mirror of
https://github.com/OpenBankProject/OBP-API.git
synced 2026-02-06 17:37:00 +00:00
Merge remote-tracking branch 'upstream/develop' into develop
This commit is contained in:
commit
17cee52e3b
@ -19,5 +19,5 @@ trait KafkaConfig {
|
||||
val autoOffsetResetConfig = "earliest"
|
||||
val maxWakeups = 50
|
||||
//TODO should be less then container's timeout
|
||||
val completionTimeout = FiniteDuration(Math.max(Props.getInt("kafka.akka.timeout", 3) - 1, 1), SECONDS)
|
||||
val completionTimeout = FiniteDuration(Props.getInt("kafka.akka.timeout", 2)*1000 - 450, MILLISECONDS)
|
||||
}
|
||||
@ -18,12 +18,15 @@ import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
|
||||
|
||||
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
|
||||
import scala.concurrent.{ExecutionException, Future, TimeoutException}
|
||||
|
||||
import akka.stream.ThrottleMode.Shaping
|
||||
|
||||
/**
|
||||
* Actor for accessing kafka from North side.
|
||||
*/
|
||||
class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer{
|
||||
class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer {
|
||||
|
||||
implicit val formats = DefaultFormats
|
||||
|
||||
@ -50,23 +53,28 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
|
||||
|
||||
private val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
// .withProperty("batch.size", "0")
|
||||
.withProperty("batch.size", "0")
|
||||
.withParallelism(20)
|
||||
//.withProperty("auto.create.topics.enable", "true")
|
||||
|
||||
private val producer = producerSettings.createKafkaProducer()
|
||||
private val producer = producerSettings
|
||||
.createKafkaProducer()
|
||||
|
||||
private val flow: (String => Source[String, Consumer.Control]) = { key =>
|
||||
consumer
|
||||
.filter(msg => msg.key() == key)
|
||||
// .filter(msg => msg.key() == key)
|
||||
.map { msg =>
|
||||
logger.info(s"${Topics.connectorTopic} with $msg")
|
||||
msg.value
|
||||
}
|
||||
logger.debug(s"${Topics.connectorTopic} with $msg")
|
||||
msg.value
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private val sendRequest: ((Topic, String, String) => Future[String]) = { (topic, key, value) =>
|
||||
producer.send(new ProducerRecord[String, String](topic.request, key, value))
|
||||
flow(key).runWith(Sink.head)
|
||||
producer.send(new ProducerRecord[String, String](topic.request, 0, key, value))
|
||||
flow(key)
|
||||
// .throttle(1, FiniteDuration(10, MILLISECONDS), 1, Shaping)
|
||||
.runWith(Sink.head)
|
||||
}
|
||||
|
||||
private val parseF: (String => Future[JsonAST.JValue]) = { r =>
|
||||
@ -77,15 +85,33 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
|
||||
Future(extractResult(r))
|
||||
}
|
||||
|
||||
val decomposeF: (Map[String, String] => Future[json.JValue]) = { m =>
|
||||
Future(Extraction.decompose(m))
|
||||
}
|
||||
|
||||
val serializeF: (json.JValue => Future[String]) = { m =>
|
||||
Future(json.compactRender(m))
|
||||
}
|
||||
|
||||
private val RESP: String = "{\"count\": \"\", \"data\": [], \"state\": \"\", \"pager\": \"\", \"target\": \"banks\"}"
|
||||
|
||||
|
||||
import akka.pattern.ask
|
||||
|
||||
override def preStart(): Unit = {
|
||||
super.preStart()
|
||||
self ? Map()
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case request: Map[String, String] =>
|
||||
logger.info("kafka_request: " + request)
|
||||
val orgSender = sender
|
||||
val f = for {
|
||||
key <- makeKeyFuture
|
||||
r <- sendRequest(Topics.connectorTopic, key, json.compactRender(Extraction.decompose(request)))
|
||||
d <- decomposeF(request)
|
||||
v <- serializeF(d)
|
||||
r <- sendRequest(Topics.connectorTopic, key, v)
|
||||
jv <- parseF(r)
|
||||
any <- extractF(jv)
|
||||
} yield {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user