Как правильно отправить задание pyspark потоковой передачи kafka в Google Dataproc

Я пытаюсь отправить задание pyspark через пользовательский интерфейс Dataproc и продолжаю получать сообщение об ошибке, похоже, он не загружает потоковый пакет kafka.

Вот команда REST, предоставляемая пользовательским интерфейсом в моей работе: POST /v1/projects/projectname/regions/global/jobs:submit/ { "projectId": "projectname", "job": { "placement": { "clusterName": "cluster-main" }, "reference": { "jobId": "job-33ab811a" }, "pysparkJob": { "mainPythonFileUri": "gs://projectname/streaming.py", "args": [ "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0" ], "jarFileUris": [ "gs://projectname/spark-streaming-kafka-0-10_2.11-2.2.0.jar" ] } } }

Я попытался передать пакет kafka как args, так и как файл jar.

Вот мой код (streaming.py):

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json


sc = SparkContext()

spark = SparkSession.builder.master("local").appName("Spark-Kafka-Integration").getOrCreate()

# < ip > is masked
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<ip>:9092") \
    .option("subscribe", "rsvps") \
    .option("startingOffsets", "earliest") \
    .load()
df.printSchema()

ошибка:: java.lang.ClassNotFoundException: не удалось найти источник данных: kafka. Пакеты можно найти на странице http://spark.apache.org/third-party-projects.html

полная трассировка: https://pastebin.com/Uz3iGy2N


person tasha    schedule 11.03.2018    source источник


Ответы (1)


Вы, вероятно, столкнетесь с проблемой, когда "--packages" является синтаксическим сахаром в spark-submit, который плохо взаимодействует, когда инструменты более высокого уровня (Dataproc) программно вызывают отправку Spark, с альтернативным синтаксисом, описанным в моем ответе здесь: использовать внешнюю библиотеку в задании pyspark в кластере Spark из google-dataproc

Короче говоря, вы можете использовать properties для указания эквивалентного spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 в вашем запросе Dataproc вместо передачи --properties в аргументах задания.

person Dennis Huo    schedule 11.03.2018