diff --git a/src/main/scala/code/bankconnectors/KafkaHelper.scala b/src/main/scala/code/bankconnectors/KafkaHelper.scala index 34c328c67..27572ea61 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelper.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelper.scala @@ -1,104 +1,44 @@ package code.bankconnectors -import java.util -import java.util.{Properties, UUID} - -import akka.actor.Actor -import code.actorsystem.ActorHelper +import net.liftweb.common.{Full, _} +import akka.pattern.ask +import akka.util.Timeout import code.util.Helper.MdcLoggable -import net.liftweb.json -import net.liftweb.json._ -import net.liftweb.util.Props -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.errors.WakeupException +import net.liftweb.json.JValue -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionException, Future} +import scala.concurrent.Await +import scala.concurrent.duration._ -//object KafkaHelperActor extends KafkaHelperActor with KafkaHelperActorInit { -// -//} -object KafkaHelper extends Actor with KafkaHelperActorInit with MdcLoggable { +object KafkaHelper extends KafkaHelper - def receive = { +trait KafkaHelper extends KafkaHelperActorInit with MdcLoggable { - case message => logger.warn("[KAFKA ACTOR ERROR - REQUEST NOT RECOGNIZED] " + message) + /** + * Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala + * In KafkaMappedConnector.scala, we use Map[String, String]. Now we change to case class + * eg: case class Company(name: String, address: String) --> + * Company("TESOBE","Berlin") + * Map(name->"TESOBE", address->"2") + * + * @param caseClassObject + * @return Map[String, String] + */ + def transferCaseClassToMap(caseClassObject: scala.Product) = + caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names + .zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values + + def process(request: scala.Product): JValue = { + val mapRequest:Map[String, String] = transferCaseClassToMap(request) + process(mapRequest) } - - val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set") - val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set") - - val producerProps = new Properties() - producerProps.put("bootstrap.servers", Props.get("kafka.bootstrap_hosts")openOr("localhost:9092")) - producerProps.put("acks", "all") - producerProps.put("retries", "0") - producerProps.put("batch.size", "16384") - producerProps.put("linger.ms", "1") - producerProps.put("buffer.memory", "33554432") - producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - val consumerProps = new Properties() - consumerProps.put("bootstrap.servers", Props.get("kafka.bootstrap_hosts")openOr("localhost:9092")) - consumerProps.put("enable.auto.commit", "false") - consumerProps.put("group.id", UUID.randomUUID.toString) - consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") - consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") - - var producer = new KafkaProducer[String, String](producerProps) - var consumer = new KafkaConsumer[String, String](consumerProps) - - implicit val formats = DefaultFormats - - def getResponse(reqId: String): String = { - var res = """{"error":"KafkaConsumer could not fetch response"}""" - try { - consumer.synchronized { - consumer.subscribe(util.Arrays.asList(responseTopic)) - var run = true - var retries = 1 - while (run && retries > 0) { - val consumerMap = consumer.poll(100) - val records = consumerMap.records(responseTopic).iterator - while (records.hasNext) { - val record = records.next - println("FILTERING: " + record + " => " + reqId) - retries = retries - 1 - if (record.key == reqId) - println("FOUND >>> " + record) - run = false - res = record.value - } - } - } - } catch { - case e: WakeupException => logger.error(e) - } - res + def process (request: Map[String, String]): JValue ={ + extractFuture(actor ? processRequest(request)) } - def processRequest(jsonRequest: JValue, reqId: String): String = { - import scala.concurrent.ExecutionContext.Implicits.global - val futureResponse = Future { getResponse(reqId) } - try { - val record = new ProducerRecord(requestTopic, reqId, json.compactRender(jsonRequest)) - producer.send(record).get - } catch { - case ie: InterruptedException => return s"""{"error":"sending message to kafka interrupted: ${ie}"}""" - case ex: ExecutionException => return s"""{"error":"could not send message to kafka: ${ex}"}""" - case t:Throwable => return s"""{"error":"unexpected error sending message to kafka: ${t}"}""" - } - Await.result(futureResponse, Duration("3 seconds")) - } - - def process(request: Map[String,String]): String = { - val reqId = UUID.randomUUID().toString - val jsonRequest = Extraction.decompose(request) - processRequest(jsonRequest, reqId) - } + case class processRequest ( + request: Map[String, String] + ) } - diff --git a/src/main/scala/code/bankconnectors/KafkaHelperActor.scala b/src/main/scala/code/bankconnectors/KafkaHelperActor.scala index f520b3674..12a4593b1 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelperActor.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelperActor.scala @@ -19,10 +19,8 @@ import org.apache.kafka.common.errors.WakeupException import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionException, Future} - class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable { - val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set") val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set") diff --git a/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala b/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala index 88a1bbb40..fac1d7206 100644 --- a/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala +++ b/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala @@ -55,12 +55,7 @@ import net.liftweb.util.Props import code.util.Helper.MdcLoggable import akka.pattern.ask -object KafkaMappedConnector extends Connector with KafkaHelperActorInit with MdcLoggable { - - def process(request: Map[String,String]): json.JValue = { - val result = actor ? request - json.parse(extractFuture(result)) \\ "data" - } +object KafkaMappedConnector extends Connector with KafkaHelper with MdcLoggable { type AccountType = KafkaBankAccount diff --git a/src/main/scala/code/bankconnectors/KafkaMappedConnector_JVMcompatible.scala b/src/main/scala/code/bankconnectors/KafkaMappedConnector_JVMcompatible.scala index 9914c7bf2..63698df5a 100644 --- a/src/main/scala/code/bankconnectors/KafkaMappedConnector_JVMcompatible.scala +++ b/src/main/scala/code/bankconnectors/KafkaMappedConnector_JVMcompatible.scala @@ -74,12 +74,7 @@ import net.liftweb.json.Extraction._ import code.util.Helper.MdcLoggable import akka.pattern.ask -object KafkaMappedConnector_JVMcompatible extends Connector with KafkaHelperActorInit with MdcLoggable { - - def process(request: Map[String,String]): json.JValue = { - val result = actor ? request - json.parse(extractFuture(result)) \\ "data" - } +object KafkaMappedConnector_JVMcompatible extends Connector with KafkaHelper with MdcLoggable { type AccountType = KafkaBankAccount diff --git a/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala b/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala index 2be64214f..2fefca469 100644 --- a/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala +++ b/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala @@ -62,30 +62,8 @@ import scala.collection.mutable.ArrayBuffer import code.util.Helper.MdcLoggable import akka.pattern.ask -object KafkaMappedConnector_vMar2017 extends Connector with KafkaHelperActorInit with MdcLoggable { - - /** - * Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala - * In KafkaMappedConnector.scala, we use Map[String, String]. Now we change to case class - * eg: case class Company(name: String, address: String) --> - * Company("TESOBE","Berlin") - * Map(name->"TESOBE", address->"2") - * - * @param caseClassObject - * @return Map[String, String] - */ - def transferCaseClassToMap(caseClassObject: scala.Product) = - caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names - .zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values - - def process(request: scala.Product): json.JValue = { - val reqId = UUID.randomUUID().toString - val mapRequest= transferCaseClassToMap(request) - val jsonRequest = Extraction.decompose(mapRequest) - val result = actor ? jsonRequest - json.parse(extractFuture(result)) \\ "data" - } +object KafkaMappedConnector_vMar2017 extends Connector with KafkaHelper with MdcLoggable { type AccountType = BankAccount2