improved performance, improved Kafka request processing

slavisa 2017-05-30 09:51:49 +02:00
parent 505299a293
commit d64afc9706


@@ -18,12 +18,15 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
 import scala.concurrent.{ExecutionException, Future, TimeoutException}
+import akka.stream.ThrottleMode.Shaping

 /**
  * Actor for accessing Kafka from the North side.
  */
-class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer{
+class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer {

   implicit val formats = DefaultFormats
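The imports added above (FiniteDuration, MILLISECONDS, and ThrottleMode.Shaping) back the throttle call that this commit introduces, commented out, in sendRequest further down. For reference, a minimal standalone sketch of Akka Streams throttling in Shaping mode, assuming an Akka 2.4/2.5-era setup; the ThrottleSketch object and its wiring are illustrative, not part of this commit:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}

object ThrottleSketch extends App {
  implicit val system = ActorSystem("throttle-sketch")
  implicit val mat = ActorMaterializer()

  // Shaping delays elements to hold the declared rate (here one element
  // per 10 ms with a burst of 1); ThrottleMode.Enforcing would instead
  // fail the stream when the rate is exceeded.
  val done = Source(1 to 10)
    .throttle(1, FiniteDuration(10, MILLISECONDS), 1, Shaping)
    .runWith(Sink.foreach(println))

  Await.ready(done, 5.seconds)
  system.terminate()
}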
@@ -50,23 +53,28 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
   private val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
     .withBootstrapServers(bootstrapServers)
-    // .withProperty("batch.size", "0")
+    .withProperty("batch.size", "0")
+    .withParallelism(20)
     //.withProperty("auto.create.topics.enable", "true")

-  private val producer = producerSettings.createKafkaProducer()
+  private val producer = producerSettings
+    .createKafkaProducer()

   private val flow: (String => Source[String, Consumer.Control]) = { key =>
     consumer
-      .filter(msg => msg.key() == key)
+      // .filter(msg => msg.key() == key)
       .map { msg =>
-        logger.info(s"${Topics.connectorTopic} with $msg")
-        msg.value
-      }
+        logger.debug(s"${Topics.connectorTopic} with $msg")
+        msg.value
+      }
   }

   private val sendRequest: ((Topic, String, String) => Future[String]) = { (topic, key, value) =>
-    producer.send(new ProducerRecord[String, String](topic.request, key, value))
-    flow(key).runWith(Sink.head)
+    producer.send(new ProducerRecord[String, String](topic.request, 0, key, value))
+    flow(key)
+      // .throttle(1, FiniteDuration(10, MILLISECONDS), 1, Shaping)
+      .runWith(Sink.head)
   }

   private val parseF: (String => Future[JsonAST.JValue]) = { r =>
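Two details in this hunk carry the performance claim: batch.size set to 0 disables producer-side batching, so each request record goes out immediately (lower latency, lower throughput), and the four-argument ProducerRecord constructor pins every request to partition 0, preserving ordering at the cost of partition parallelism. The request/response correlation itself works by keying: publish to the request topic, then take the first matching message from the response stream. A sketch of that step, assuming the response stream is an Akka Streams Source of ConsumerRecords; sendAndAwait and its parameters are illustrative names, not the actor's real members:

import akka.kafka.scaladsl.Consumer
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.concurrent.Future

// Correlation step: publish one request record, then complete with the
// first response whose key matches the request key. The producer and the
// response-topic source are assumed to be wired up as in the actor above.
def sendAndAwait(
    producer: KafkaProducer[String, String],
    responses: Source[ConsumerRecord[String, String], Consumer.Control],
    requestTopic: String,
    key: String,
    value: String
)(implicit mat: Materializer): Future[String] = {
  // Partition 0 is pinned, matching the four-argument ProducerRecord
  // constructor used in sendRequest above.
  producer.send(new ProducerRecord[String, String](requestTopic, 0, key, value))
  responses
    .filter(_.key() == key) // correlate response to request by key
    .map(_.value())
    .runWith(Sink.head)     // first matching message completes the Future
}

Note that the commit comments the per-key filter out, so Sink.head completes with the first message on the response topic regardless of which request it belongs to; the sketch keeps the filter to show the intended correlation.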
@@ -77,15 +85,33 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
     Future(extractResult(r))
   }

+  val decomposeF: (Map[String, String] => Future[json.JValue]) = { m =>
+    Future(Extraction.decompose(m))
+  }
+
+  val serializeF: (json.JValue => Future[String]) = { m =>
+    Future(json.compactRender(m))
+  }
+
+  private val RESP: String = "{\"count\": \"\", \"data\": [], \"state\": \"\", \"pager\": \"\", \"target\": \"banks\"}"
+
+  import akka.pattern.ask
+
+  override def preStart(): Unit = {
+    super.preStart()
+    self ? Map()
+  }
+
   def receive = {
     case request: Map[String, String] =>
+      logger.info("kafka_request: " + request)
       val orgSender = sender
       val f = for {
         key <- makeKeyFuture
-        r <- sendRequest(Topics.connectorTopic, key, json.compactRender(Extraction.decompose(request)))
+        d <- decomposeF(request)
+        v <- serializeF(d)
+        r <- sendRequest(Topics.connectorTopic, key, v)
         jv <- parseF(r)
         any <- extractF(jv)
       } yield {
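The receive path now stages JSON handling as separate Futures (decomposeF, serializeF) before the Kafka round trip, so serialization runs on the dispatcher rather than inline in the actor, and preStart warms the pipeline by self-asking with an empty Map. A compressed, runnable sketch of the same staging, assuming lift-json on the classpath; PipelineSketch and the stubbed sendRequest are illustrative stand-ins, not the actor's real Kafka call:

import net.liftweb.json
import net.liftweb.json.{DefaultFormats, Extraction}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt

object PipelineSketch extends App {
  implicit val formats = DefaultFormats

  // Stand-in for the actor's Kafka request/response round trip (stub).
  def sendRequest(topic: String, key: String, value: String): Future[String] =
    Future(s"""{"topic": "$topic", "key": "$key", "echo": $value}""")

  // Each stage is its own Future, mirroring decomposeF / serializeF:
  // the JSON work is scheduled off the calling thread and composes
  // with the round trip in a single for-comprehension.
  val request = Map("north" -> "getBanks")
  val f = for {
    d <- Future(Extraction.decompose(request)) // Map -> JValue
    v <- Future(json.compactRender(d))         // JValue -> String
    r <- sendRequest("obp.request", "key-1", v)
  } yield r

  println(Await.result(f, 5.seconds))
}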