Проблема интеграции PostgreSQL и Kafka Connect

Я тестирую коннектор JDBC Sink для дампа записей из Kafka в PostgreSQL. Вот конфигурация коннектора:

{
    "name": "jdbc-sink-postgresql-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "role",
        "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=&password=",
        "auto.create": "false",                                                   
        "insert.mode": "upsert",
        "mode":"incrementing",
        "table.name.format":"role",
        "pk.mode":"record_value",
        "pk.fields":"role_id"
    }
}

Когда я запускаю соединитель, я получаю следующее исключение:

java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "role" ("role_id","role_name") VALUES (123,'admin') ON CONFLICT ("role_id") DO UPDATE SET "role_name"=EXCLUDED."role_name" was aborted.  
   Call getNextException to see the cause.
   at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2778))

Любые указатели на то, что мне здесь не хватает? Пожалуйста, дайте мне знать, если потребуется дополнительная информация.


person user123    schedule 07.10.2018    source источник
comment
Можете ли вы подтвердить правильность названия вашей таблицы и схемы?   -  person Giorgos Myrianthous    schedule 07.10.2018
comment
@Giorgos Myrianthous - Спасибо за быстрый ответ. Название таблицы и темы - роль. Имя схемы является общедоступным, а имя базы данных - postgres.   -  person user123    schedule 07.10.2018
comment
Можете ли вы попробовать запустить INSERT INTO роль (role_id, role_name) VALUES (123, 'admin') ON CONFLICT (role_id) DO UPDATE SET role_name = EXCLUDED.role_name в своей базе данных и посмотреть, что произойдет?   -  person Giorgos Myrianthous    schedule 07.10.2018
comment
Кроме того, какая у вас версия Kafka?   -  person Giorgos Myrianthous    schedule 07.10.2018
comment
Это было первое, что я сделал после того, как заметил проблему, и все сработало нормально, без каких-либо проблем. Я использую confluent Kafka 4.0.0   -  person user123    schedule 07.10.2018
comment
Можете ли вы включить batch.size: 0 в конфигурацию соединителя приемника? Затем остановите и снова запустите свой соединитель. Это не решит вашу проблему, но может дать более подробную информацию об ошибке.   -  person Giorgos Myrianthous    schedule 07.10.2018
comment
Спасибо @GiorgosMyrianthous за указание на это. Я смог найти первопричину. Я ответил на этот вопрос в надежде, что когда-нибудь кто-то сочтет это полезным.   -  person user123    schedule 07.10.2018


Ответы (1)


Итак, проблема была в таблице. Вот как я сначала создал таблицу:

CREATE TABLE role(
 role_id int PRIMARY KEY,
 role_name VARCHAR (255) UNIQUE NOT NULL
);

Тестовые данные в теме выглядели так:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic role --property schema.registry.url=http://localhost:8081/  --property value.schema='{"type":"record","name":"myRecord","fields": [{"name": "role_id","type": "int"},{"name": "role_name","type": "string"}]}' --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer io.confluent.kafka.serializers.KafkaAvroSerializer --property print.key=true
{"role_id":122, "role_name":"admin"}
{"role_id":123, "role_name":"admin"}
{"role_id":124, "role_name":"admin"}
{"role_id":125, "role_name":"admin"}
{"role_id":126, "role_name":"admin"}

Итак, когда мои тестовые данные снова и снова имели одно и то же значение для поля role_name, это нарушало уникальное ограничение и, следовательно, ошибку.

Что все, что я сделал?

Я уронил стол.

Создана новая таблица без ограничения уникального ключа, и вышеуказанные данные были без проблем отправлены в PostgreSQL.

person user123    schedule 07.10.2018