Може ли някой да обясни вътрешната работа на spark при четене на данни от една таблица и записването им в друга в cassandra.
Ето моят случай на употреба:
Поглъщам данни, идващи от IOT платформа в cassandra чрез kafka тема. Имам малък python скрипт, който анализира всяко съобщение от kafka, за да получи името на таблицата, към която принадлежи, подготвя заявка и я записва в cassandra, използвайки касандровия драйвер на datastax за python. С този скрипт мога да поглъщам около 300 000 записа на минута в cassandra. Скоростта на входящите ми данни обаче е 510 000 записа в минута, така че забавянето на потребителите на Kafka продължава да се увеличава.
Скриптът на Python вече извършва едновременни извиквания към cassandra. Ако увелича броя на изпълнителите на Python, cassandra-driver започва да се проваля, защото възлите на cassandra стават недостъпни за него. Предполагам, че има ограничение на обажданията на cassandra в секунда, което удрям там. Ето съобщението за грешка, което получавам:
ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"
Наскоро стартирах задача на pyspark, за да копирам данни от няколко колони в една таблица в друга. Таблицата имаше около 168 милиона записа в нея. Работата на Pyspark приключи за около 5 часа. Така той обработва над 550 000 записа в минута.
Ето кода на pyspark, който използвам:
df = spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table=sourcetable, keyspace=sourcekeyspace)\
.load().cache()
df.createOrReplaceTempView("data")
query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value from data " )
vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table=newtable, keyspace=newkeyspace)\
.save()
Версии:
- Касандра 3.9.
- Искра 2.1.0.
- Spark-cassandra-connector 2.0.1 на Datastax
- Scala версия 2.11
Клъстер:
- Настройка на Spark с 3 работни и 1 главен възел.
- 3 работни възли също имат инсталиран касандров клъстер. (всеки възел cassandra с един възел Spark Worker)
- На всеки работник бяха разрешени 10 GB ram и 3 ядра.
Така че се чудя:
Spark първо чете ли всички данни от cassandra и след това ги записва в новата таблица или има някакъв вид оптимизация в spark cassandra конектора, който му позволява да премества данните около cassandra таблици, без да чете всички записи?
Ако заменя своя скрипт на Python с задание за стрийминг на искра, в което анализирам пакета, за да получа името на таблицата за cassandra, това ще ми помогне ли да приемам данни по-бързо в cassandra?