Trying with seek

This commit is contained in:
Petar Bozin 2017-04-21 19:25:50 +00:00
parent 327c7a693f
commit e1ff747ffc

View File

@ -45,18 +45,13 @@ class KafkaHelper extends MdcLoggable {
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
var producer = new KafkaProducer[String, String](producerProps)
var consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(util.Arrays.asList(responseTopic))
implicit val formats = DefaultFormats
def getResponse(reqId: String): json.JValue = {
var consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(util.Arrays.asList(responseTopic))
//if (consumer == null) {
// consumer = new KafkaConsumer[String, String](consumerProps)
// consumer.subscribe(util.Arrays.asList(responseTopic))
//}
//if (consumer == null)
// return json.parse("""{"error":"kafka consumer unavailable"}""")
consumer.seekToBeginning(consumer.assignment())
val consumerMap = consumer.poll(100)
val it = consumerMap.iterator