Как spark копира данни между касандрови таблици?

Може ли някой да обясни вътрешната работа на spark при четене на данни от една таблица и записването им в друга в cassandra.

Ето моят случай на употреба:

Поглъщам данни, идващи от IOT платформа в cassandra чрез kafka тема. Имам малък python скрипт, който анализира всяко съобщение от kafka, за да получи името на таблицата, към която принадлежи, подготвя заявка и я записва в cassandra, използвайки касандровия драйвер на datastax за python. С този скрипт мога да поглъщам около 300 000 записа на минута в cassandra. Скоростта на входящите ми данни обаче е 510 000 записа в минута, така че забавянето на потребителите на Kafka продължава да се увеличава.

Скриптът на Python вече извършва едновременни извиквания към cassandra. Ако увелича броя на изпълнителите на Python, cassandra-driver започва да се проваля, защото възлите на cassandra стават недостъпни за него. Предполагам, че има ограничение на обажданията на cassandra в секунда, което удрям там. Ето съобщението за грешка, което получавам:

ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"

Наскоро стартирах задача на pyspark, за да копирам данни от няколко колони в една таблица в друга. Таблицата имаше около 168 милиона записа в нея. Работата на Pyspark приключи за около 5 часа. Така той обработва над 550 000 записа в минута.

Ето кода на pyspark, който използвам:

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

df.createOrReplaceTempView("data")

query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value  from data  " )

vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()

Версии:

  • Касандра 3.9.
  • Искра 2.1.0.
  • Spark-cassandra-connector 2.0.1 на Datastax
  • Scala версия 2.11

Клъстер:

  • Настройка на Spark с 3 работни и 1 главен възел.
  • 3 работни възли също имат инсталиран касандров клъстер. (всеки възел cassandra с един възел Spark Worker)
  • На всеки работник бяха разрешени 10 GB ram и 3 ядра.

Така че се чудя:

  • Spark първо чете ли всички данни от cassandra и след това ги записва в новата таблица или има някакъв вид оптимизация в spark cassandra конектора, който му позволява да премества данните около cassandra таблици, без да чете всички записи?

  • Ако заменя своя скрипт на Python с задание за стрийминг на искра, в което анализирам пакета, за да получа името на таблицата за cassandra, това ще ми помогне ли да приемам данни по-бързо в cassandra?


person farazmateen    schedule 01.06.2018    source източник


Отговори (1)


Spark конекторът е оптимизиран, защото паралелизира обработката и четенето/вмъкването на данни във възли, които притежават данните. Може да получите по-добра производителност, като използвате Cassandra Spark Connector, но това ще изисква повече ресурси.

Говорейки за вашата задача - 300 000 вмъквания/минута са 5000/секунда и това не е много голямо число, честно казано - можете да увеличите пропускателната способност, като поставите различни оптимизации:

  • Използване на асинхронни повиквания за изпращане на заявки. Трябва само да се уверите, че изпращате повече заявки, които могат да бъдат обработени от една връзка (но можете също да увеличите този брой - не съм сигурен как да го направя в Python, но моля, проверете документ за Java драйвер, за да получите представа).
  • използвайте правилно ниво на последователност (LOCAL_ONE трябва да ви даде много добра производителност)
  • използвайте правилна правила за балансиране на натоварването
  • можете да стартирате няколко копия на вашия скрипт паралелно, като се уверите, че всички те са в една и съща потребителска група Kafka.
person Alex Ott    schedule 01.06.2018
comment
Благодаря. Ще пробвам тези оптимизации. Вече използвам асинхронни повиквания. Използвам и едновременната библиотека на python, която ми позволява да имам множество изпълнители в един и същи скрипт. Въпреки това, когато увелича броя на изпълнителите от определен брой (да речем 7), започвам да получавам грешка „Не мога да завърша операцията срещу нито един хост“. Сякаш възелът е станал офлайн. Може ли да се дължи на определен брой обаждания на cassandra, който достигам? - person farazmateen; 02.06.2018
comment
Трябва да проверите регистрационните файлове от страната на Касандра, за да видите какво причинява. Най-вероятно възелът е претоварен, но може да има много причини за това - зависи какво ниво на последователност използвате за писане и т.н. - person Alex Ott; 02.06.2018