Fixed issue with null entry in kafka queue, when either key or message is null

This commit is contained in:
Petar Bozin 2016-08-24 11:21:22 +02:00
parent 81ce96a9b7
commit 4e4398cf2f

View File

@ -73,18 +73,24 @@ class KafkaConsumer(val zookeeper: String = Props.get("kafka.zookeeper_host").op
// wait for message
while (it.hasNext()) {
val mIt = it.next()
val msg = new String(mIt.message(), "UTF8")
val key = new String(mIt.key(), "UTF8")
// check if the id matches
if (key == reqId) {
// Parse JSON message
val j = json.parse(msg)
// disconnect from Kafka
consumer.shutdown()
// return as JSON
return j
// skip null entries
if (mIt != null && mIt.key != null && mIt.message != null) {
val msg = new String(mIt.message(), "UTF8")
val key = new String(mIt.key(), "UTF8")
// check if the id matches
if (key == reqId) {
// Parse JSON message
val j = json.parse(msg)
// disconnect from Kafka
consumer.shutdown()
// return as JSON
return j
}
} else {
logger.warn("KafkaConsumer: Got null value/key from kafka. Might be south-side connector issue.")
}
}
return json.parse("""{"error":"KafkaConsumer could not fetch response"}""") //TODO: replace with standard message
}
catch {
case e:kafka.consumer.ConsumerTimeoutException =>