From d64afc97063aa4942e0e04d451e7b442e1037904 Mon Sep 17 00:00:00 2001
From: slavisa
Date: Tue, 30 May 2017 09:51:49 +0200
Subject: [PATCH] improved performances, improved kafka request processing

---
 .../bankconnectors/kafkaStremsHelper.scala    | 46 +++++++++++++++----
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git a/src/main/scala/code/bankconnectors/kafkaStremsHelper.scala b/src/main/scala/code/bankconnectors/kafkaStremsHelper.scala
index 088060545..b41e18e9f 100644
--- a/src/main/scala/code/bankconnectors/kafkaStremsHelper.scala
+++ b/src/main/scala/code/bankconnectors/kafkaStremsHelper.scala
@@ -18,12 +18,15 @@
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
 import scala.concurrent.{ExecutionException, Future, TimeoutException}
 
+import akka.stream.ThrottleMode.Shaping
+
 /**
   * Actor for accessing kafka from North side.
   */
-class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer{
+class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer {
 
   implicit val formats = DefaultFormats
 
@@ -50,23 +53,28 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
   private val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
     .withBootstrapServers(bootstrapServers)
-  //  .withProperty("batch.size", "0")
+    .withProperty("batch.size", "0")
+    .withParallelism(20)
   //.withProperty("auto.create.topics.enable", "true")
 
-  private val producer = producerSettings.createKafkaProducer()
+  private val producer = producerSettings
+    .createKafkaProducer()
 
   private val flow: (String => Source[String, Consumer.Control]) = { key =>
     consumer
-      .filter(msg => msg.key() == key)
+      // .filter(msg => msg.key() == key)
       .map { msg =>
-        logger.info(s"${Topics.connectorTopic} with $msg")
-        msg.value
-      }
+          logger.debug(s"${Topics.connectorTopic} with $msg")
+          msg.value
+        }
   }
+
   private val sendRequest: ((Topic, String, String) => Future[String]) = { (topic, key, value) =>
-    producer.send(new ProducerRecord[String, String](topic.request, key, value))
-    flow(key).runWith(Sink.head)
+    producer.send(new ProducerRecord[String, String](topic.request, 0, key, value))
+    flow(key)
+      // .throttle(1, FiniteDuration(10, MILLISECONDS), 1, Shaping)
+      .runWith(Sink.head)
   }
 
   private val parseF: (String => Future[JsonAST.JValue]) = { r =>
@@ -77,15 +85,33 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
     Future(extractResult(r))
   }
 
+  val decomposeF: (Map[String, String] => Future[json.JValue]) = { m =>
+    Future(Extraction.decompose(m))
+  }
+
+  val serializeF: (json.JValue => Future[String]) = { m =>
+    Future(json.compactRender(m))
+  }
+
   private val RESP: String = "{\"count\": \"\", \"data\": [], \"state\": \"\", \"pager\": \"\", \"target\": \"banks\"}"
+
+  import akka.pattern.ask
+
+  override def preStart(): Unit = {
+    super.preStart()
+    self ? Map()
+  }
+
 
   def receive = {
     case request: Map[String, String] =>
       logger.info("kafka_request: " + request)
       val orgSender = sender
       val f = for {
         key <- makeKeyFuture
-        r <- sendRequest(Topics.connectorTopic, key, json.compactRender(Extraction.decompose(request)))
+        d <- decomposeF(request)
+        v <- serializeF(d)
+        r <- sendRequest(Topics.connectorTopic, key, v)
         jv <- parseF(r)
         any <- extractF(jv)
       } yield {
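
Note for readers of this patch: below is a minimal, self-contained sketch of the request/reply round trip the changed code performs with akka-stream-kafka, namely publish a keyed request, then complete a Future with the first reply on a response topic that carries the same key. It is illustrative only, not OBP code: the bootstrap address, the topic names "obp.request"/"obp.response", the group id, and the object name are assumptions. It also keeps the key filter that this patch comments out, because without it Sink.head completes with whichever reply arrives first, so concurrent requests can receive each other's responses.

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

import scala.concurrent.Future

object RequestReplySketch extends App {
  implicit val system = ActorSystem("request-reply-sketch")
  implicit val materializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"   // assumption
  val requestTopic     = "obp.request"      // assumption
  val responseTopic    = "obp.response"     // assumption

  // Producer tuned as in the patch: batch.size=0 disables client-side
  // batching, so each record is sent immediately (lower latency at the
  // cost of throughput); withParallelism(20) allows 20 in-flight sends.
  private val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
    .withProperty("batch.size", "0")
    .withParallelism(20)
  private val producer = producerSettings.createKafkaProducer()

  private val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("request-reply-sketch")    // assumption

  // Send a keyed request, then wait for the first response with the same key.
  def sendRequest(key: String, value: String): Future[String] = {
    producer.send(new ProducerRecord[String, String](requestTopic, key, value))
    Consumer.plainSource(consumerSettings, Subscriptions.topics(responseTopic))
      .filter(_.key == key)   // correlate reply to request; the patch disables this
      .map(_.value)
      .runWith(Sink.head)     // completes with the first matching reply
  }
}

If re-enabled, the throttle left commented out in the patch, .throttle(1, FiniteDuration(10, MILLISECONDS), 1, Shaping), would shape the reply stream to at most one element per 10 ms.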