diff --git a/src/main/scala/code/bankconnectors/KafkaHelper.scala b/src/main/scala/code/bankconnectors/KafkaHelper.scala index d87a129fe..3af5a5883 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelper.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelper.scala @@ -19,9 +19,9 @@ import scala.concurrent.{Await, ExecutionException, Future} object KafkaHelper extends KafkaHelper { - } + class KafkaHelper extends MdcLoggable { val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set") @@ -50,10 +50,12 @@ class KafkaHelper extends MdcLoggable { implicit val formats = DefaultFormats def getResponse(reqId: String): json.JValue = { - if (consumer == null ) - consumer = new KafkaConsumer[String, String](consumerProps) - if (consumer == null ) - return json.parse("""{"error":"kafka consumer unavailable"}""") + //if (consumer == null ) + // consumerProps.remove("group.id") + // consumerProps.put("group.id", UUID.randomUUID.toString) + // consumer = new KafkaConsumer[String, String](consumerProps) + //if (consumer == null ) + // return json.parse("""{"error":"kafka consumer unavailable"}""") var res = json.parse("""{"error":"KafkaConsumer could not fetch response"}""") try { consumer.synchronized { @@ -82,10 +84,10 @@ class KafkaHelper extends MdcLoggable { def processRequest(jsonRequest: JValue, reqId: String): JValue = { - if (producer == null ) - producer = new KafkaProducer[String, String](producerProps) - if (producer == null ) - return json.parse("""{"error":"kafka producer unavailable"}""") + //if (producer == null ) + // producer = new KafkaProducer[String, String](producerProps) + //if (producer == null ) + // return json.parse("""{"error":"kafka producer unavailable"}""") import scala.concurrent.ExecutionContext.Implicits.global val futureResponse = Future { getResponse(reqId) } try { @@ -96,10 +98,10 @@ class KafkaHelper extends MdcLoggable { case ex: ExecutionException => return json.parse(s"""{"error":"could not send message to kafka: ${ex}"}""") case t:Throwable => return json.parse(s"""{"error":"unexpected error sending message to kafka: ${t}"}""") } - Await.result(futureResponse, Duration("1 second")) - + Await.result(futureResponse, Duration("3 seconds")) } + def process(request: scala.Product): JValue = { val reqId = UUID.randomUUID().toString val mapRequest= stransferCaseClassToMap(request) @@ -107,12 +109,14 @@ class KafkaHelper extends MdcLoggable { processRequest(jsonRequest, reqId) } + def process(request: Map[String,String]): JValue = { val reqId = UUID.randomUUID().toString val jsonRequest = Extraction.decompose(request) processRequest(jsonRequest, reqId) } + /** * Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala * In KafkaMappedConnector.scala, we use Map[String, String]. Now we change to case class