Тематический объект Kafka в искровое преобразование кадра данных и запись в HDFS

Я пытаюсь создать потребителя kafka в искровом кодировании, при создании я получаю исключение. Моя цель - мне нужно прочитать из темы и записать в путь HDFS.

scala> df2.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

scala> print(df1)
[key: binary, value: binary ... 5 more fields]

Я не ввожу никаких комментариев в тему, хотя в качестве входных данных используются эти 6 значений.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import spark.implicits._
object Read {  
  def main(args: Array[String]): Unit = {  

    val spark = SparkSession.builder()
    .appName("spark Oracle Kafka")
    .master("local")
    .getOrCreate()
val df2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka server ip address i have given")
  .option("subscribe", "topic20190904")
  .load()

print(df1)//it is return some values 
df2.show() it's throwing exception i hope it's not dataframe.
df2.write.parquet("/user/xrrn5/abcd")// I am getting java.lang.AbstractMethodError
java.lang.AbstractMethodError  at rg.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala)

person SR RAVINDRAN    schedule 03.09.2019    source источник
comment
Рассматривали ли вы использование Kafka Connect для записи данных в HDFS? Он специально разработан для этого и является частью Apache Kafka.   -  person Robin Moffatt    schedule 03.09.2019
comment
Спасибо, Робин ... Могу ли я иметь какой-нибудь фрагмент кода, чтобы сделать то же самое в kafka connect ... Моя цель - написать его из темы kafka в HDFS, это может быть scala или kafka conncect от Spark ...   -  person SR RAVINDRAN    schedule 04.09.2019


Ответы (1)


Для записи данных из Kafka в HDFS вам на самом деле не нужен код - вы можете просто использовать Kafka Connect, который является частью Apache Kafka. Вот пример конфигурации:

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "name": "hdfs-sink"
  }
}

См. здесь для документации по коннектору и здесь для общего введения и обзора использования Kafka Connect.

person Robin Moffatt    schedule 04.09.2019