Spark за обработка на rdd парче по парче от json файлове и публикуване в темата на Kafka

Нов съм в Spark & ​​scala. Имам изискване да обработя брой json файлове, да речем от s3 местоположение. Тези данни са основно пакетни данни, които ще бъдат запазени за повторна обработка по-късно. Сега моята работа на Spark трябва да обработи тези файлове по такъв начин, че да избере 5 необработени json записа и да изпрати съобщение до темата Kafka. Причината за избора на само 5 записа е, че темата kafka обработва едновременно данни в реално време и пакетни данни по една и съща тема. така че пакетната обработка не трябва да забавя обработката в реално време.

Трябва да обработя целия json файл последователно и затова бих избрал само 5 записа наведнъж и ще публикувам съобщение до kafka и ще избера следващите 5 записа на json файл и така нататък...

Написах част от код, който ще чете от json файлове и ще го публикува в kafka тема.

        val jsonRDD = sc.textFile(s3Location)

        var count = 0

        val buf = new StringBuilder

        jsonRDD.collect().foreach(line => {
            count += 1
                    buf ++= line
                    if (count > 5) {
                        println(s"Printing 5 jsons $buf")
                        count = 0
                        buf.setLength(0)
                        SendMessagetoKakfaTopic(buf) // psuedo cod for sending message to kafkatopic 
                        Thread.sleep(10000)
                    }
        })
        if (buf != null) {
            println(s"Printing remaining jsons $buf")
            SendMessagetoKakfaTopic(buf)
        }

Вярвам, че има по-ефективен начин за обработка на JSON в Spark.

Освен това трябва да търся други параметри като памет, ресурси и т.н. Тъй като данните може да надхвърлят 100 гигабайта.


person ShivRaj Nag    schedule 19.05.2017    source източник


Отговори (1)


Това изглежда като случай за Spark Streaming или (препоръчително) Spark Структурирано поточно предаване.

И в двата случая наблюдавате директория и обработвате нови файлове на всеки партиден интервал (може да се конфигурира).


Можете да се справите с него, като използвате SparkContext.textFile (със заместващи знаци) или SparkContext.wholeTextFiles. И в двата случая в крайна сметка ще получите RDD[String] за представяне на редовете във вашите JSON файлове (един ред на JSON файл).

Ако вашето изискване е да обработвате файловете последователно, 5-редова част по 5-редова част, можете да направите конвейера за трансформация малко по-ефективен, като използвате RDD.toLocalIterator:

toLocalIterator: Iterator[T]

Връща итератор, който съдържа всички елементи в този RDD. Итераторът ще консумира толкова памет, колкото най-големият дял в този RDD.

Вижте RDD API.

С Итератор на JSON, бихте направили sliding с 5 елемента.

Това ще ви даде доста ефективен тръбопровод.


Още веднъж силно препоръчвам да прочетете за структурирано поточно предаване в Структурирано поточно предаване + Ръководство за интегриране на Kafka (брокер Kafka версия 0.10.0 или по-нова) (за четене, но се поддържа и писане).

person Jacek Laskowski    schedule 19.05.2017
comment
Благодаря много @Jacek. Има ли начин, по който мога да контролирам зареждането на входни файлове в rdd. Например Ако има 100 файла, тогава 10 файла трябва да се заредят веднъж по един и след това останалите 10 файла и така нататък. Видях, че има само една опция за разделяне с метода sc.textFile(). - person ShivRaj Nag; 19.05.2017
comment
не Няма начин да го контролираш. По преценка на textFile е да зареди толкова, колкото са налични. Можете да опитате да ограничите файловете, заредени от regex в пътя на директорията, напр. s3Location/*.2017.06.[0-5]*. - person Jacek Laskowski; 19.05.2017