diff --git a/src/main/scala/code/bankconnectors/KafkaHelper.scala b/src/main/scala/code/bankconnectors/KafkaHelper.scala index 5352c53fe..8674f1d59 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelper.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelper.scala @@ -45,18 +45,13 @@ class KafkaHelper extends MdcLoggable { consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") var producer = new KafkaProducer[String, String](producerProps) + var consumer = new KafkaConsumer[String, String](consumerProps) + consumer.subscribe(util.Arrays.asList(responseTopic)) implicit val formats = DefaultFormats def getResponse(reqId: String): json.JValue = { - var consumer = new KafkaConsumer[String, String](consumerProps) - consumer.subscribe(util.Arrays.asList(responseTopic)) - //if (consumer == null) { - // consumer = new KafkaConsumer[String, String](consumerProps) - // consumer.subscribe(util.Arrays.asList(responseTopic)) - //} - //if (consumer == null) - // return json.parse("""{"error":"kafka consumer unavailable"}""") + consumer.seekToBeginning(consumer.assignment()) val consumerMap = consumer.poll(100) val it = consumerMap.iterator