Может ли кто-нибудь объяснить внутреннюю работу искры при чтении данных из одной таблицы и записи их в другую в кассандре.
Вот мой вариант использования:
Я загружаю данные, поступающие с платформы IOT, в cassandra через тему kafka. У меня есть небольшой скрипт python, который анализирует каждое сообщение от kafka, чтобы получить имя таблицы, к которой оно принадлежит, подготавливает запрос и записывает его в cassandra с помощью драйвера datastax cassandra для python. С помощью этого скрипта я могу принимать около 300000 записей в минуту в кассандру. Однако моя входящая скорость передачи данных составляет 510000 записей в минуту, поэтому задержка потребителя kafka продолжает увеличиваться.
Сценарий Python уже выполняет одновременные вызовы cassandra. Если я увеличу количество исполнителей python, драйвер cassandra начнет давать сбой, потому что узлы cassandra станут для него недоступны. Я предполагаю, что есть лимит вызовов Кассандры в секунду, на которые я попадаю. Вот сообщение об ошибке, которое я получаю:
ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"
Недавно я выполнил задание pyspark по копированию данных из пары столбцов одной таблицы в другую. В таблице было около 168 миллионов записей. Работа Pyspark завершена примерно за 5 часов. Таким образом, он обрабатывал более 550000 записей в минуту.
Вот код pyspark, который я использую:
df = spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table=sourcetable, keyspace=sourcekeyspace)\
.load().cache()
df.createOrReplaceTempView("data")
query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value from data " )
vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table=newtable, keyspace=newkeyspace)\
.save()
Версии:
- Кассандра 3.9.
- Spark 2.1.0.
- Datastax's Spark-Cassandra-Connector 2.0.1
- Scala версии 2.11
Кластер:
- Настройка Spark с 3 рабочими и 1 главным узлом.
- На 3 рабочих узлах также установлен кластер cassandra. (каждый узел кассандры с одним рабочим узлом искры)
- Каждому рабочему было разрешено 10 ГБ оперативной памяти и 3 ядра.
Так что мне интересно:
Сначала искра читает все данные из кассандры, а затем записывает их в новую таблицу, или есть ли какая-то оптимизация в соединителе искры кассандры, которая позволяет перемещать данные по таблицам кассандры без чтения всех записей?
Если я заменю свой скрипт python заданием потоковой передачи искр, в котором я анализирую пакет, чтобы получить имя таблицы для кассандры, поможет ли это мне быстрее загружать данные в кассандру?