Производитель Kafka с использованием HiveStorageHandler

Я относительно новичок в hive/hadoop

Я читал эти обработчики хранилища Hive.

Теперь я пытаюсь написать пользовательскую реализацию HiveStorageHandler для запроса и отправки сообщений в Kafka с использованием таблицы Hive.

Я видел, что существуют другие реализации HiveStorageHandler, которые позволяют нам запрашивать и записывать в базы данных NoSQL, используя таблицы кустов.

Я пытаюсь воспроизвести это для Кафки. Я нашел проект на нем

HiveKa — запрашивать Kafka с помощью Hive

Здесь они пытаются прочитать данные из Kafka с помощью запросов к таблице улья. Я хочу написать на тему кафки, используя вставку на столе.

Может ли кто-нибудь помочь мне в этом?


person raizsh    schedule 25.11.2018    source источник


Ответы (2)


Я хочу написать на тему кафки, используя вставку на столе.

Это возможно с помощью Kafka HiveStorageHandler. Ниже приведены общие варианты использования этой функции.

  1. Запрос тем Kafka
  2. Запросить данные из тем Kafka и вставить в управляемую/внешнюю таблицу куста
  3. Запрашивайте данные из тем Kafka и добавляйте их в другие темы Kafka.
  4. Запросить данные из внешней/управляемой таблицы куста и отправить их в темы Kafka.

Вы пытаетесь выполнить третий вариант использования.

Сначала создайте две внешние таблицы для исходной и конечной тем Kafka.

create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);


create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);

Затем используйте запрос на слияние, чтобы вставить данные в целевую тему Kafka.

merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);

Примечание:

  1. Используется внешняя несобственная таблица Hive

  2. В дополнение к определяемой пользователем схеме полезной нагрузки обработчик хранилища Kafka добавляет 4 дополнительных столбца (__key, __partition, __offset, __timestmap), которые пользователи могут использовать для запроса полей метаданных Kafka.

  3. Пользователи должны установить свойство таблицы kafka.serde.class, если данные не в формате csv.

  4. Пользователи также могут установить свойство таблицы kafka.write.semantic, которое допускает значение NONE, AT_LEAST_ONCE или EXACTLY_ONCE.

person arunkvelu    schedule 04.04.2019

Если я правильно понимаю, вы хотите читать события из Hive и пушить в Kafka. У меня нет опыта работы с обработчиками хранилища, но я бы предпочел написать соответствующий код для создания в Kafka, а затем передавать эти события в Hadoop/Hive.

В Kafka есть фреймворк под названием Kafka Connect, который записывает во внешние системы. Confluent написала такой Connector для HDFS, который предлагает поддержку Hive путем обновления хранилища метаданных Hive всякий раз, когда файл записывается в HDFS.

Без написания обработчика хранилища вы можете попробовать использовать коннектор JDBC Source или Spark/Flink, чтобы прочитать эти данные из Hive и передать их в Kafka.

Однако, как правило, Hadoop является местом назначения для событий CDC, а не их источником. В основном потому, что это просто медленный запрос... Если вы хотите создавать события для вставок, обычно требуется сканирование таблицы, поэтому генерация событий из Cassandra/Hbase может быть лучшим вариантом.

person OneCricketeer    schedule 25.11.2018
comment
Я хочу писать в Кафку, используя операторы вставки в таблицу кустов. Я не собираюсь расспрашивать Кафку. - person raizsh; 25.11.2018
comment
Я понял эту часть. Думаю, я просто пытаюсь понять, почему, видя, что вы можете сразу же исправить события вставки в Kafka, а затем поместить их в Hive, а не наоборот. - person OneCricketeer; 25.11.2018