getBanks works well now, some gliches come from OBP-Kafka-Python

This commit is contained in:
Petar Bozin 2017-04-22 21:48:30 +02:00
parent 9e823000d6
commit 5ae45d5a07

View File

@ -19,9 +19,9 @@ import scala.concurrent.{Await, ExecutionException, Future}
object KafkaHelper extends KafkaHelper {
}
class KafkaHelper extends MdcLoggable {
val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set")
@ -50,10 +50,12 @@ class KafkaHelper extends MdcLoggable {
implicit val formats = DefaultFormats
def getResponse(reqId: String): json.JValue = {
if (consumer == null )
consumer = new KafkaConsumer[String, String](consumerProps)
if (consumer == null )
return json.parse("""{"error":"kafka consumer unavailable"}""")
//if (consumer == null )
// consumerProps.remove("group.id")
// consumerProps.put("group.id", UUID.randomUUID.toString)
// consumer = new KafkaConsumer[String, String](consumerProps)
//if (consumer == null )
// return json.parse("""{"error":"kafka consumer unavailable"}""")
var res = json.parse("""{"error":"KafkaConsumer could not fetch response"}""")
try {
consumer.synchronized {
@ -82,10 +84,10 @@ class KafkaHelper extends MdcLoggable {
def processRequest(jsonRequest: JValue, reqId: String): JValue = {
if (producer == null )
producer = new KafkaProducer[String, String](producerProps)
if (producer == null )
return json.parse("""{"error":"kafka producer unavailable"}""")
//if (producer == null )
// producer = new KafkaProducer[String, String](producerProps)
//if (producer == null )
// return json.parse("""{"error":"kafka producer unavailable"}""")
import scala.concurrent.ExecutionContext.Implicits.global
val futureResponse = Future { getResponse(reqId) }
try {
@ -96,10 +98,10 @@ class KafkaHelper extends MdcLoggable {
case ex: ExecutionException => return json.parse(s"""{"error":"could not send message to kafka: ${ex}"}""")
case t:Throwable => return json.parse(s"""{"error":"unexpected error sending message to kafka: ${t}"}""")
}
Await.result(futureResponse, Duration("1 second"))
Await.result(futureResponse, Duration("3 seconds"))
}
def process(request: scala.Product): JValue = {
val reqId = UUID.randomUUID().toString
val mapRequest= stransferCaseClassToMap(request)
@ -107,12 +109,14 @@ class KafkaHelper extends MdcLoggable {
processRequest(jsonRequest, reqId)
}
def process(request: Map[String,String]): JValue = {
val reqId = UUID.randomUUID().toString
val jsonRequest = Extraction.decompose(request)
processRequest(jsonRequest, reqId)
}
/**
* Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala
* In KafkaMappedConnector.scala, we use Map[String, String]. Now we change to case class