More improvements

This commit is contained in:
Petar Bozin 2017-04-22 14:55:10 +00:00
parent c642bde22f
commit b83084a794

View File

@ -40,11 +40,15 @@ class KafkaHelper extends MdcLoggable {
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", Props.get("kafka.host")openOr("localhost:9092"))
consumerProps.put("enable.auto.commit", "false")
consumerProps.put("group.id", UUID.randomUUID.toString)
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
var producer = new KafkaProducer[String, String](producerProps)
val consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(util.Arrays.asList(responseTopic))
implicit val formats = DefaultFormats
/*
@ -186,11 +190,11 @@ messages with an offset greater than 5. For this it would call kafkaConsumer.see
*/
def getResponse(reqId: String): json.JValue = {
println("RECEIVING...")
val tempProps = consumerProps
tempProps.put("group.id", UUID.randomUUID.toString)
val consumer = new KafkaConsumer[String, String](tempProps)
consumer.subscribe(util.Arrays.asList(responseTopic))
println("RECEIVING... " + reqId)
//val tempProps = consumerProps
//tempProps.put("group.id", UUID.randomUUID.toString)
//val consumer = new KafkaConsumer[String, String](tempProps)
//consumer.subscribe(util.Arrays.asList(responseTopic))
while (true) {
val consumerMap = consumer.poll(100)
val records = consumerMap.records(responseTopic).iterator
@ -198,6 +202,7 @@ messages with an offset greater than 5. For this it would call kafkaConsumer.see
val record = records.next
println("FILTERING..." + record)
if (record.key == reqId)
println("FOUND >>> " + record)
return json.parse(record.value) \\ "data"
}
}