Събитието на потока беше съкратено

Тук се сблъсквам с проблем, при който получавам съобщение от източника на Kafka и пиша прехващач за извличане на две полета(dataSoure и businessType) от съобщението kafka (формат json). Тук използвам gson.fromJson(). Но проблемът е, че получих по-долу грешка.

Тук искам да знам дали Flume съкращава събитието Flume, когато надхвърли лимит? Ако да, как да го настроя на по-голяма стойност. Тъй като моето kafka съобщение винаги е много дълго, около 60K байта.

Очаквам отговор. Благодаря предварително!

2015-12-09 11:48:05,665 (PollableSourceRunner-KafkaSource-apply) [ГРЕШКА - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource ИЗКЛЮЧЕНИЕ, {} com.google. gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Незавършен низ на ред 1 колона 4096 на com.google.gson.Gson.fromJson(Gson.java:809) на com.google.gson.Gson.fromJson(Gson .java:761) на com.google.gson.Gson.fromJson(Gson.java:710) на com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:43) на com.xxx.flume.interceptor. JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:61) в org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62) в org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:146) в org .apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)


person Tough    schedule 10.12.2015    source източник


Отговори (1)


Накрая откривам първопричината чрез отстраняване на грешки в изходния код. Причината е, че се опитах да преобразувам event.getBody() в карта с помощта на Gson, което е неправилно, тъй като event.getBody() е байт [], а не низ, който не може да бъде преобразуван. Правилният код трябва да бъде както следва:

String body = new String(event.getBody(), "UTF-8");   
Map<String, Object> map = gson.fromJson(body, new TypeToken<Map<String, Object>>() {}.getType());
person Tough    schedule 13.12.2015