From b83084a794db1cbc170ed0ab90cd58567bb9b9ae Mon Sep 17 00:00:00 2001 From: Petar Bozin Date: Sat, 22 Apr 2017 14:55:10 +0000 Subject: [PATCH] More improvements --- .../scala/code/bankconnectors/KafkaHelper.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/scala/code/bankconnectors/KafkaHelper.scala b/src/main/scala/code/bankconnectors/KafkaHelper.scala index a53d6ac6b..4cc64c84e 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelper.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelper.scala @@ -40,11 +40,15 @@ class KafkaHelper extends MdcLoggable { val consumerProps = new Properties() consumerProps.put("bootstrap.servers", Props.get("kafka.host")openOr("localhost:9092")) consumerProps.put("enable.auto.commit", "false") + consumerProps.put("group.id", UUID.randomUUID.toString) consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") var producer = new KafkaProducer[String, String](producerProps) + val consumer = new KafkaConsumer[String, String](consumerProps) + consumer.subscribe(util.Arrays.asList(responseTopic)) + implicit val formats = DefaultFormats /* @@ -186,11 +190,11 @@ messages with an offset greater than 5. For this it would call kafkaConsumer.see */ def getResponse(reqId: String): json.JValue = { - println("RECEIVING...") - val tempProps = consumerProps - tempProps.put("group.id", UUID.randomUUID.toString) - val consumer = new KafkaConsumer[String, String](tempProps) - consumer.subscribe(util.Arrays.asList(responseTopic)) + println("RECEIVING... " + reqId) + //val tempProps = consumerProps + //tempProps.put("group.id", UUID.randomUUID.toString) + //val consumer = new KafkaConsumer[String, String](tempProps) + //consumer.subscribe(util.Arrays.asList(responseTopic)) while (true) { val consumerMap = consumer.poll(100) val records = consumerMap.records(responseTopic).iterator @@ -198,6 +202,7 @@ messages with an offset greater than 5. For this it would call kafkaConsumer.see val record = records.next println("FILTERING..." + record) if (record.key == reqId) + println("FOUND >>> " + record) return json.parse(record.value) \\ "data" } }