Событие канала было усечено

Здесь я столкнулся с проблемой, что я получаю сообщение из источника Kafka и пишу перехватчик для извлечения двух полей (dataSoure и businessType) из сообщения kafka (формат json). Здесь я использую gson.fromJson(). Но проблема в том, что я получил ошибку ниже.

Здесь я хочу знать, обрезает ли Flume событие Flume, когда оно превышает лимит? Если да, то как настроить его на большее значение. Поскольку мое сообщение кафки всегда очень длинное, около 60 КБ.

С нетерпением жду ответа. Заранее спасибо!

09.12.2015, 11:48:05,665 (PollableSourceRunner-KafkaSource-apply) [ОШИБКА - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] ИСКЛЮЧЕНИЕ KafkaSource, {} com.google. gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Незавершенная строка в строке 1 столбца 4096 в com.google.gson.Gson.fromJson(Gson.java:809) в com.google.gson.Gson.fromJson(Gson .java:761) по адресу com.google.gson.Gson.fromJson(Gson.java:710) по адресу com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:43) по адресу com.xxx.flume.interceptor. JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:61) в org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62) в org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:146) в org. .apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)


person Tough    schedule 10.12.2015    source источник


Ответы (1)


Наконец, я нахожу первопричину путем отладки исходного кода. Это потому, что я пытался преобразовать event.getBody() в карту с помощью Gson, что неверно, поскольку event.getBody() - это байт [], а не строка, которую нельзя преобразовать. Правильный код должен быть следующим:

String body = new String(event.getBody(), "UTF-8");   
Map<String, Object> map = gson.fromJson(body, new TypeToken<Map<String, Object>>() {}.getType());
person Tough    schedule 13.12.2015