Как создать хранилище состояний с HashMap в качестве значения в потоках Kafka?

Мне нужно создать хранилище состояний со строковым ключом HashMap в качестве значения. Я попробовал следующие два метода.

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
          .withKeys(Serdes.String())
          .withValues(HashMap.class)
          .persistent()
          .build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
          .withKeys(Serdes.String())
          .withValues(Serdes.serdeFrom(h.getClass()))
          .persistent()
          .build();

Код компилируется нормально, без ошибок, но я получаю ошибку во время выполнения

io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

Может ли кто-нибудь предложить мне, как правильно создать хранилище состояний?


person Samy    schedule 29.08.2016    source источник


Ответы (1)


Если вы хотите создать хранилище состояний, вам необходимо предоставить классы сериализатора и десериализатора для типа, который вы хотите использовать. В Kafka Stream существует единственная абстракция под названием Serde, которая объединяет сериализатор и десериализатор в один класс.

Если вы используете .withValues(Class<K> keyClass), он должен содержать это

@param keyClass класс для ключей, который должен быть одним из типов, для которых у Kafka есть встроенные серверы.

Поскольку нет встроенного Serdes для HashMap, вам нужно сначала реализовать один (возможно, называемый HashMapSerde) и передать этот класс методу .withValues(Serde<K> keySerde). Более того, вы также должны реализовать настоящий сериализатор и десериализатор для HashMap. Если вы знаете общие типы вашего HashMap, вы должны указать их (что значительно упрощает реализацию сериализатора и десериализатора.

Что-то вроде этого (просто набросок, общие типы опущены):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;

public class HashMapSerde implements Serde<HashMap> {

    void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    void close() {
        /* put your code here */
    }

    Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public byte[] serialize(String topic, T data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }

    Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public T deserialize(String topic, byte[] data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }
}

Если вы хотите увидеть примеры реализации (де)сериализаторов и Serde, загляните в https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization и https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

person Matthias J. Sax    schedule 29.08.2016
comment
Спасибо @matthias-j-sax. Я попробую этот. - person Samy; 30.08.2016
comment
@matthias: Как мы можем определить serde для Hashmap‹?,?›? - person Nishu Tayal; 12.02.2018
comment
Реализовать Serde интерфейс. - person Matthias J. Sax; 12.02.2018