Compile passes, execution still fails in Boot (kafka actor related)

Author: Petar Bozin, 2017-04-25 11:16:10 +02:00
parent 273a6843b1
commit e5bbf8295c
5 changed files with 33 additions and 127 deletions

KafkaHelper.scala

@@ -1,104 +1,44 @@
 package code.bankconnectors

 import java.util
 import java.util.{Properties, UUID}

 import akka.actor.Actor
 import code.actorsystem.ActorHelper
 import net.liftweb.common.{Full, _}
 import akka.pattern.ask
 import akka.util.Timeout
 import code.util.Helper.MdcLoggable
 import net.liftweb.json
 import net.liftweb.json._
 import net.liftweb.util.Props
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors.WakeupException
 import net.liftweb.json.JValue

 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionException, Future}
 import scala.concurrent.Await
 import scala.concurrent.duration._

-//object KafkaHelperActor extends KafkaHelperActor with KafkaHelperActorInit {
-//
-//}
-object KafkaHelper extends Actor with KafkaHelperActorInit with MdcLoggable {
-
-  def receive = {
-    case message => logger.warn("[KAFKA ACTOR ERROR - REQUEST NOT RECOGNIZED] " + message)
-  }
+object KafkaHelper extends KafkaHelper
+
+trait KafkaHelper extends KafkaHelperActorInit with MdcLoggable {
+
+  /**
+    * This function exists only to keep compatibility between KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala.
+    * KafkaMappedConnector.scala uses Map[String, String]; the newer code uses case classes instead, e.g.:
+    *   case class Company(name: String, address: String)
+    *   Company("TESOBE", "Berlin") --> Map(name -> "TESOBE", address -> "Berlin")
+    *
+    * @param caseClassObject
+    * @return Map[String, String]
+    */
+  def transferCaseClassToMap(caseClassObject: scala.Product) =
+    caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names
+      .zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values
+
+  def process(request: scala.Product): JValue = {
+    val mapRequest: Map[String, String] = transferCaseClassToMap(request)
+    process(mapRequest)
+  }

-  val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set")
-  val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set")
-
-  val producerProps = new Properties()
-  producerProps.put("bootstrap.servers", Props.get("kafka.bootstrap_hosts").openOr("localhost:9092"))
-  producerProps.put("acks", "all")
-  producerProps.put("retries", "0")
-  producerProps.put("batch.size", "16384")
-  producerProps.put("linger.ms", "1")
-  producerProps.put("buffer.memory", "33554432")
-  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
-  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
-
-  val consumerProps = new Properties()
-  consumerProps.put("bootstrap.servers", Props.get("kafka.bootstrap_hosts").openOr("localhost:9092"))
-  consumerProps.put("enable.auto.commit", "false")
-  consumerProps.put("group.id", UUID.randomUUID.toString)
-  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
-  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
-
-  var producer = new KafkaProducer[String, String](producerProps)
-  var consumer = new KafkaConsumer[String, String](consumerProps)
-
-  implicit val formats = DefaultFormats
-
-  def getResponse(reqId: String): String = {
-    var res = """{"error":"KafkaConsumer could not fetch response"}"""
-    try {
-      consumer.synchronized {
-        consumer.subscribe(util.Arrays.asList(responseTopic))
-        var run = true
-        var retries = 1
-        while (run && retries > 0) {
-          val consumerMap = consumer.poll(100)
-          val records = consumerMap.records(responseTopic).iterator
-          while (records.hasNext) {
-            val record = records.next
-            println("FILTERING: " + record + " => " + reqId)
-            retries = retries - 1
-            if (record.key == reqId) {
-              println("FOUND >>> " + record)
-              run = false
-              res = record.value
-            }
-          }
-        }
-      }
-    } catch {
-      case e: WakeupException => logger.error(e)
-    }
-    res
-  }
+
+  def process(request: Map[String, String]): JValue = {
+    extractFuture(actor ? processRequest(request))
+  }

-  def processRequest(jsonRequest: JValue, reqId: String): String = {
-    import scala.concurrent.ExecutionContext.Implicits.global
-    val futureResponse = Future { getResponse(reqId) }
-    try {
-      val record = new ProducerRecord(requestTopic, reqId, json.compactRender(jsonRequest))
-      producer.send(record).get
-    } catch {
-      case ie: InterruptedException => return s"""{"error":"sending message to kafka interrupted: ${ie}"}"""
-      case ex: ExecutionException => return s"""{"error":"could not send message to kafka: ${ex}"}"""
-      case t: Throwable => return s"""{"error":"unexpected error sending message to kafka: ${t}"}"""
-    }
-    Await.result(futureResponse, Duration("3 seconds"))
-  }
-
-  def process(request: Map[String, String]): String = {
-    val reqId = UUID.randomUUID().toString
-    val jsonRequest = Extraction.decompose(request)
-    processRequest(jsonRequest, reqId)
-  }
+
+  case class processRequest(
+    request: Map[String, String]
+  )
 }
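The reflection trick in transferCaseClassToMap is easiest to see in isolation. Below is a minimal, self-contained sketch using the Company example from the doc comment; Company and TransferDemo are illustrative names only, not part of the codebase:

import net.liftweb.json.{DefaultFormats, Extraction}

// Illustrative only: the Company example from the doc comment above.
case class Company(name: String, address: String)

object TransferDemo extends App {
  implicit val formats = DefaultFormats

  // Same technique as transferCaseClassToMap: zip the declared field names
  // with the product's values, then cast the result to Map[String, String].
  def transferCaseClassToMap(caseClassObject: Product): Map[String, String] =
    caseClassObject.getClass.getDeclaredFields.map(_.getName)
      .zip(caseClassObject.productIterator.toList).toMap
      .asInstanceOf[Map[String, String]]

  val map = transferCaseClassToMap(Company("TESOBE", "Berlin"))
  println(map)                       // Map(name -> TESOBE, address -> Berlin)
  println(Extraction.decompose(map)) // the JValue that would go over Kafka
}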

KafkaHelperActor.scala

@@ -19,10 +19,8 @@ import org.apache.kafka.common.errors.WakeupException
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionException, Future}

 class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable {

   val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set")
   val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set")
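Neither KafkaHelperActorInit nor ActorHelper appears in this commit, so the plumbing behind extractFuture(actor ? processRequest(request)) has to be assumed. A plausible minimal shape, reusing the three-second wait the old inline code used (KafkaHelperActorInitSketch is a hypothetical name):

import akka.actor.ActorRef
import akka.pattern.ask // enables the ? operator at call sites like actor ? msg
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch; the real KafkaHelperActorInit is not shown in this commit.
trait KafkaHelperActorInitSketch {
  implicit val timeout: Timeout = Timeout(3.seconds)

  // Reference to KafkaHelperActor, created or looked up by the real init code.
  def actor: ActorRef

  // Block on the actor's reply and cast it to the caller's expected type.
  def extractFuture[T](f: Future[Any]): T =
    Await.result(f, timeout.duration).asInstanceOf[T]
}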

KafkaMappedConnector.scala

@@ -55,12 +55,7 @@ import net.liftweb.util.Props
 import code.util.Helper.MdcLoggable
 import akka.pattern.ask

-object KafkaMappedConnector extends Connector with KafkaHelperActorInit with MdcLoggable {
-
-  def process(request: Map[String, String]): json.JValue = {
-    val result = actor ? request
-    json.parse(extractFuture(result)) \\ "data"
-  }
+object KafkaMappedConnector extends Connector with KafkaHelper with MdcLoggable {

   type AccountType = KafkaBankAccount

KafkaMappedConnector_JVMcompatible.scala

@@ -74,12 +74,7 @@ import net.liftweb.json.Extraction._
 import code.util.Helper.MdcLoggable
 import akka.pattern.ask

-object KafkaMappedConnector_JVMcompatible extends Connector with KafkaHelperActorInit with MdcLoggable {
-
-  def process(request: Map[String, String]): json.JValue = {
-    val result = actor ? request
-    json.parse(extractFuture(result)) \\ "data"
-  }
+object KafkaMappedConnector_JVMcompatible extends Connector with KafkaHelper with MdcLoggable {

   type AccountType = KafkaBankAccount
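KafkaMappedConnector and KafkaMappedConnector_JVMcompatible both drop their identical local process methods in favor of the one inherited from KafkaHelper. An illustrative call site, with hypothetical request keys not taken from this diff:

import net.liftweb.json.JValue

// Illustrative fragment: how a connector method issues a request once
// process() is supplied by the inherited KafkaHelper trait.
trait ConnectorCallSiteSketch {
  def process(request: Map[String, String]): JValue // provided by KafkaHelper

  def getBanksData: JValue = {
    // These key names are hypothetical, not taken from this diff.
    val request = Map("action" -> "obp.getBanks", "version" -> "1.0")
    process(request) \\ "data" // same "data" extraction the deleted local process did
  }
}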

KafkaMappedConnector_vMar2017.scala

@@ -62,30 +62,8 @@ import scala.collection.mutable.ArrayBuffer
 import code.util.Helper.MdcLoggable
 import akka.pattern.ask

-object KafkaMappedConnector_vMar2017 extends Connector with KafkaHelperActorInit with MdcLoggable {
-
-  /**
-    * This function exists only to keep compatibility between KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala.
-    * KafkaMappedConnector.scala uses Map[String, String]; the newer code uses case classes instead, e.g.:
-    *   case class Company(name: String, address: String)
-    *   Company("TESOBE", "Berlin") --> Map(name -> "TESOBE", address -> "Berlin")
-    *
-    * @param caseClassObject
-    * @return Map[String, String]
-    */
-  def transferCaseClassToMap(caseClassObject: scala.Product) =
-    caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names
-      .zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values
-
-  def process(request: scala.Product): json.JValue = {
-    val reqId = UUID.randomUUID().toString
-    val mapRequest = transferCaseClassToMap(request)
-    val jsonRequest = Extraction.decompose(mapRequest)
-    val result = actor ? jsonRequest
-    json.parse(extractFuture(result)) \\ "data"
-  }
+object KafkaMappedConnector_vMar2017 extends Connector with KafkaHelper with MdcLoggable {

   type AccountType = BankAccount2
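With transferCaseClassToMap hoisted into KafkaHelper, this connector can hand case classes straight to the inherited process(scala.Product) overload. A sketch with a hypothetical message class:

import net.liftweb.json.JValue

// GetBankRequest is a hypothetical outbound message; the connector's real
// message case classes live elsewhere and are not part of this diff.
case class GetBankRequest(action: String, bankId: String)

trait VMar2017CallSiteSketch {
  def process(request: scala.Product): JValue // inherited from KafkaHelper

  // The inherited overloads now run the whole chain:
  // case class -> Map[String, String] -> JValue -> actor ? processRequest(...)
  def getBank: JValue = process(GetBankRequest("obp.getBank", "gh.29.uk"))
}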