Как Spark копирует данные между таблицами cassandra?

Может ли кто-нибудь объяснить внутреннюю работу искры при чтении данных из одной таблицы и записи их в другую в кассандре.

Вот мой вариант использования:

Я загружаю данные, поступающие с платформы IOT, в cassandra через тему kafka. У меня есть небольшой скрипт python, который анализирует каждое сообщение от kafka, чтобы получить имя таблицы, к которой оно принадлежит, подготавливает запрос и записывает его в cassandra с помощью драйвера datastax cassandra для python. С помощью этого скрипта я могу принимать около 300000 записей в минуту в кассандру. Однако моя входящая скорость передачи данных составляет 510000 записей в минуту, поэтому задержка потребителя kafka продолжает увеличиваться.

Сценарий Python уже выполняет одновременные вызовы cassandra. Если я увеличу количество исполнителей python, драйвер cassandra начнет давать сбой, потому что узлы cassandra станут для него недоступны. Я предполагаю, что есть лимит вызовов Кассандры в секунду, на которые я попадаю. Вот сообщение об ошибке, которое я получаю:

ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"

Недавно я выполнил задание pyspark по копированию данных из пары столбцов одной таблицы в другую. В таблице было около 168 миллионов записей. Работа Pyspark завершена примерно за 5 часов. Таким образом, он обрабатывал более 550000 записей в минуту.

Вот код pyspark, который я использую:

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

df.createOrReplaceTempView("data")

query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value  from data  " )

vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()

Версии:

  • Кассандра 3.9.
  • Spark 2.1.0.
  • Datastax's Spark-Cassandra-Connector 2.0.1
  • Scala версии 2.11

Кластер:

  • Настройка Spark с 3 рабочими и 1 главным узлом.
  • На 3 рабочих узлах также установлен кластер cassandra. (каждый узел кассандры с одним рабочим узлом искры)
  • Каждому рабочему было разрешено 10 ГБ оперативной памяти и 3 ядра.

Так что мне интересно:

  • Сначала искра читает все данные из кассандры, а затем записывает их в новую таблицу, или есть ли какая-то оптимизация в соединителе искры кассандры, которая позволяет перемещать данные по таблицам кассандры без чтения всех записей?

  • Если я заменю свой скрипт python заданием потоковой передачи искр, в котором я анализирую пакет, чтобы получить имя таблицы для кассандры, поможет ли это мне быстрее загружать данные в кассандру?


person farazmateen    schedule 01.06.2018    source источник


Ответы (1)


Коннектор Spark оптимизирован, поскольку он распараллеливает обработку и чтение / вставку данных в узлы, которым принадлежат данные. Вы можете повысить пропускную способность, используя Cassandra Spark Connector, но для этого потребуется больше ресурсов.

Говоря о вашей задаче - 300000 вставок в минуту - это 5000 в секунду, и это, честно говоря, не очень большое число - вы можете увеличить пропускную способность, добавив различные оптимизации:

  • Использование асинхронных вызовов для отправки запросов. Вам нужно только убедиться, что вы отправляете больше запросов, которые могут быть обработаны одним соединением (но вы также можете увеличить это число - я не уверен, как это сделать в Python, но, пожалуйста, отметьте Документ драйвера Java, чтобы получить представление).
  • используйте правильный уровень согласованности (LOCAL_ONE должен дать вам очень хорошую производительность)
  • используйте правильную политику балансировки нагрузки
  • вы можете запустить несколько копий вашего скрипта параллельно, убедившись, что все они находятся в одной группе потребителей Kafka.
person Alex Ott    schedule 01.06.2018
comment
Благодарю. Я попробую эти оптимизации. Я уже использую асинхронные вызовы. И я также использую параллельную библиотеку python, которая позволяет мне иметь несколько исполнителей в одном скрипте. Однако, когда я увеличиваю количество исполнителей с определенного числа (скажем, 7), я начинаю получать ошибку «Невозможно завершить операцию против любых хостов». Как будто нода перешла в оффлайн. Может ли это быть из-за установленного мной лимита вызовов cassandra? - person farazmateen; 02.06.2018
comment
Вам нужно проверить журналы на стороне Cassandra, чтобы узнать, что это вызывает. Скорее всего, узел перегружен, но для этого может быть много причин - это зависит от того, какой уровень согласованности вы используете для записи и т. Д. - person Alex Ott; 02.06.2018