mirror of
https://github.com/OpenBankProject/OBP-API.git
synced 2026-02-06 19:16:53 +00:00
fixed performance issues by reusing consumer settings
This commit is contained in:
parent
353d183535
commit
af49d5c26a
@ -2,16 +2,16 @@ package code.bankconnectors
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.{Actor, ActorRef}
|
||||
import akka.kafka.scaladsl.Consumer
|
||||
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
|
||||
import akka.kafka.{ConsumerSettings, KafkaConsumerActor, ProducerSettings, Subscriptions}
|
||||
import akka.pattern.pipe
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import code.actorsystem.{ObpActorHelper, ObpActorInit}
|
||||
import code.util.Helper.MdcLoggable
|
||||
import net.liftweb.json
|
||||
import net.liftweb.json.{DefaultFormats, Extraction}
|
||||
import net.liftweb.json.{DefaultFormats, Extraction, JsonAST}
|
||||
import net.liftweb.util.Props
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
@ -40,9 +40,11 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
|
||||
.withMaxWakeups(maxWakeups)
|
||||
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig)
|
||||
|
||||
private lazy val consumer: (String => Source[ConsumerRecord[String, String], Consumer.Control]) = { topic =>
|
||||
val assignment = Subscriptions.assignmentWithOffset(new TopicPartition(topic, 0), 0)
|
||||
Consumer.plainSource[String, String](consumerSettings, assignment)
|
||||
private val consumerActor: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))
|
||||
|
||||
private val consumer: Source[ConsumerRecord[String, String], Consumer.Control] = {
|
||||
val assignment = Subscriptions.assignmentWithOffset(new TopicPartition(Topics.connectorTopic.response, 0), 0)
|
||||
Consumer.plainExternalSource(consumerActor, assignment)
|
||||
.completionTimeout(completionTimeout)
|
||||
}
|
||||
|
||||
@ -53,38 +55,49 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
|
||||
|
||||
private val producer = producerSettings.createKafkaProducer()
|
||||
|
||||
private val flow: ((String, String, String) => Source[String, Consumer.Control]) = { (topic, key, value) =>
|
||||
consumer(topic)
|
||||
private val flow: (String => Source[String, Consumer.Control]) = { key =>
|
||||
consumer
|
||||
.filter(msg => msg.key() == key)
|
||||
.map { msg =>
|
||||
logger.info(s"$topic with $msg")
|
||||
logger.info(s"${Topics.connectorTopic} with $msg")
|
||||
msg.value
|
||||
}
|
||||
}
|
||||
|
||||
private val sendRequest: ((Topic, String, String) => Future[String]) = { (topic, key, value) =>
|
||||
producer.send(new ProducerRecord[String, String](topic.request, key, value))
|
||||
flow(topic.response, key, value).runWith(Sink.head)
|
||||
flow(key).runWith(Sink.head)
|
||||
}
|
||||
|
||||
private val parseF: (String => Future[JsonAST.JValue]) = { r =>
|
||||
Future(json.parse(r) \\ "data")
|
||||
}
|
||||
|
||||
val extractF: (JsonAST.JValue => Future[Any]) = { r =>
|
||||
Future(extractResult(r))
|
||||
}
|
||||
|
||||
private val RESP: String = "{\"count\": \"\", \"data\": [], \"state\": \"\", \"pager\": \"\", \"target\": \"banks\"}"
|
||||
|
||||
def receive = {
|
||||
case request: Map[String, String] =>
|
||||
logger.info("kafka_request: " + request)
|
||||
val orgSender = sender
|
||||
val f = for {
|
||||
key <- makeKeyFuture
|
||||
r <- sendRequest(Topics.getBanksTopic, key, json.compactRender(Extraction.decompose(request)))
|
||||
r <- sendRequest(Topics.connectorTopic, key, json.compactRender(Extraction.decompose(request)))
|
||||
jv <- parseF(r)
|
||||
any <- extractF(jv)
|
||||
} yield {
|
||||
json.parse(r) \\ "data"
|
||||
any
|
||||
}
|
||||
|
||||
f.recover {
|
||||
f recover {
|
||||
case ie: InterruptedException => json.parse(s"""{"error":"sending message to kafka interrupted: ${ie}"}""")
|
||||
case ex: ExecutionException => json.parse(s"""{"error":"could not send message to kafka: ${ex}"}""")
|
||||
case te: TimeoutException => json.parse(s"""{"error":"receiving message from kafka timed out: ${te}"}""")
|
||||
case t: Throwable => json.parse(s"""{"error":"unexpected error sending message to kafka: ${t}"}""")
|
||||
}
|
||||
f pipeTo orgSender
|
||||
} pipeTo orgSender
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,7 +107,7 @@ object Topics {
|
||||
private val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set")
|
||||
private val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set")
|
||||
|
||||
val getBanksTopic = Topic(requestTopic, responseTopic)
|
||||
val connectorTopic = Topic(requestTopic, responseTopic)
|
||||
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user