Является ли ManagedChannel потокобезопасным в Transformer Kafka

Вот мой трансформатор:

public class DataEnricher implements Transformer < byte[], EnrichedData, KeyValue < byte[], EnrichedData >> {

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector < String,
    InfoResponse > cache;


    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue < byte[],
    EnrichedData > transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval.", e.getMessage());
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue < > (key, enrichedData);
    }

    @Override
    public KeyValue < byte[],
    DataEnricher > punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        client.shutdown();
    }
}

В Kafka Streams каждый поток потока инициализирует свою собственную копию топологии потока, а затем создает экземпляр этой топологии для каждого ProcessorContext, то есть для каждой задачи, то есть для каждого раздела. Итак, не будет ли init() вызван и перезаписан / пропущен канал для каждого раздела, и, поскольку у нас есть несколько потоков, даже будет гонка за созданием channel/client? Есть ли способ предотвратить это?

это вызывается в методе run():

public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    //other configuration is setup here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(
        StreamsConfig.NUM_STREAM_THREADS_CONFIG,
        3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream < byte[], EnrichedData > requestsStream = streamsBuilder
        .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream < byte[], EnrichedData > enrichedRequestsStream = requestsStream
        .filter((key, request) - > {
            return Objects.nonNull(request);
        }
        .transform(() - > dataEnricher);

    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray()));

    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}

person Alex P.    schedule 05.03.2020    source источник


Ответы (2)


Не относится к ManagedChannel, но вы должны предоставить новый момент DataEnricher для ProcessContext в TransformerSupplier.

KStream.transform(DataEnricher::new);

Как только я столкнусь с некоторыми исключениями потока Kafka, связанными с этим, я попытаюсь воссоздать его.

И ИМО, если вы не используете пунктуацию для отправки большего количества записей в нисходящий поток, а новый ключ такой же, как входная запись, вы должны использовать transformValues(), потому что transform() может привести к повторному разделению, когда применяется операция на основе ключа, такая как агрегация, соединение.

person Tuyen Luong    schedule 06.03.2020
comment
Спасибо, я изменил это, дам вам знать, если это сработает, и приму ваш ответ - person Alex P.; 06.03.2020

Я предполагаю, что TransformerSupplier создает по одному экземпляру Transformer для каждой топологии (или ProcessorContext) и, следовательно, по одному channel для каждой топологии. В этом случае нет опасности channel перезаписи. Также я предполагаю, что ваш client.shutdown() также закрывает свой канал.

person San P    schedule 06.03.2020
comment
Я обновил вопрос, с конфигурацией kafka. Таким образом, количество потоковых потоков равно 3 - person Alex P.; 06.03.2020