feature/embed_akka_adapter: embed akka adapter finished.

This commit is contained in:
shuang 2020-07-24 21:54:27 +08:00
parent 14416174e8
commit 75007228a6
5 changed files with 83 additions and 56 deletions

View File

@ -1,3 +1,3 @@
install:
- echo "Running a custom install command"
- mvn clean source:jar install -pl .,obp-commons && mvn source:jar install -DskipTests -pl obp-api
- mvn clean source:jar install -pl .,obp-commons && mvn source:jar install -U -DskipTests -pl obp-api

View File

@ -27,6 +27,19 @@
<groupId>com.tesobe</groupId>
<artifactId>obp-commons</artifactId>
</dependency>
<!--embed akka adapter start-->
<dependency>
<groupId>com.github.OpenBankProject.OBP-Adapter-Akka-SpringBoot</groupId>
<artifactId>adapter-akka-commons</artifactId>
<version>develop-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--embed akka adapter end-->
<dependency>
<groupId>com.github.everit-org.json-schema</groupId>
<artifactId>org.everit.json.schema</artifactId>

View File

@ -716,6 +716,9 @@ featured_apis=elasticSearchWarehouseV300
# akka_connector.name_of_actor=SOME_ACTOR_NAME
# akka connector timeout seconds, default is 3 seconds
# akka_connector.timeout=10
## When akka_connector.embed_adapter set true, will start embed akka adapter,
## Outbound akka message will be sent to this embed akka adapter's Actor. the default value is false
#akka_connector.embed_adapter=true
# --------------------------------------------------------------

View File

@ -2,10 +2,12 @@ package code.actorsystem
import akka.actor.{ActorSelection, ActorSystem}
import code.api.util.APIUtil
import code.bankconnectors.LocalMappedOutInBoundTransfer
import code.bankconnectors.akka.actor.{AkkaConnectorActorConfig, AkkaConnectorHelperActor}
import code.util.Helper
import code.util.Helper.MdcLoggable
import code.webhook.WebhookHelperActors
import com.openbankproject.adapter.akka.commons.config.AkkaConfig
import com.typesafe.config.ConfigFactory
import net.liftweb.common.Full
@ -106,8 +108,10 @@ trait ObpLookupSystem extends MdcLoggable {
val hostname = APIUtil.getPropsValue("akka_connector.hostname")
val port = APIUtil.getPropsValue("akka_connector.port")
val embedAdapter = APIUtil.getPropsAsBoolValue("akka_connector.embed_adapter", false)
val actorPath: String = (hostname, port) match {
case (Full(h), Full(p)) =>
case (Full(h), Full(p)) if !embedAdapter =>
val hostname = h
val port = p
val akka_connector_hostname = Helper.getAkkaConnectorHostname
@ -120,7 +124,13 @@ trait ObpLookupSystem extends MdcLoggable {
if (port == 0) {
logger.error("Failed to find an available port.")
}
AkkaConnectorHelperActor.startAkkaConnectorHelperActors(ObpActorSystem.northSideAkkaConnectorActorSystem)
if(embedAdapter) {
AkkaConfig(LocalMappedOutInBoundTransfer, Some(ObpActorSystem.northSideAkkaConnectorActorSystem))
} else {
AkkaConnectorHelperActor.startAkkaConnectorHelperActors(ObpActorSystem.northSideAkkaConnectorActorSystem)
}
s"akka.tcp://SouthSideAkkaConnector_${props_hostname}@${hostname}:${port}/user/${actorName}"
}
this.obpLookupSystem.actorSelection(actorPath)

View File

@ -64,64 +64,65 @@ object ConnectorUtils {
jObj.extract[InBoundTrait[Any]](formats, mainFest).data
}
}
lazy val outInboundTransferLocalMapped: OutInBoundTransfer = new OutInBoundTransfer{
private val ConnectorMethodRegex = "(?i)OutBound(.)(.+)".r
private val connector:Connector = LocalMappedConnector
private val queryParamType = universe.typeOf[List[OBPQueryParam]]
private val callContextType = universe.typeOf[Option[CallContext]]
private implicit val formats = CustomJsonFormats.nullTolerateFormats
object LocalMappedOutInBoundTransfer extends OutInBoundTransfer {
private val ConnectorMethodRegex = "(?i)OutBound(.)(.+)".r
private lazy val connector: Connector = LocalMappedConnector
private val queryParamType = universe.typeOf[List[OBPQueryParam]]
private val callContextType = universe.typeOf[Option[CallContext]]
private implicit val formats = CustomJsonFormats.nullTolerateFormats
override def transfer(outbound: TopicTrait): Future[InBoundTrait[_]] = {
val connectorMethod: String = outbound.getClass.getSimpleName match {
case ConnectorMethodRegex(x, y) => s"${x.toLowerCase()}$y"
case x => x
}
implicit val inboundMainFest = ManifestFactory.classType[InBoundTrait[_]](Class.forName(s"com.openbankproject.commons.dto.OutBound${connectorMethod.capitalize}"))
connector.implementedMethods.get(connectorMethod) match {
case None => Future.failed(new IllegalArgumentException(s"Outbound instance $outbound have no corresponding method in the ${connector.getClass.getSimpleName}"))
case Some(method) =>
val nameToValue = outbound.nameToValue.toMap
val argNameToType: List[(String, universe.Type)] = method.paramLists.head.map(it => it.name.decodedName.toString.trim -> it.info)
val connectorMethodArgs: List[Any] = argNameToType collect {
case (_, tp) if tp <:< callContextType => None // For connector method parameter `callContext: Option[CallContext]`, just pass None
case (_, tp) if tp <:< queryParamType =>
val limit = nameToValue("limit").asInstanceOf[Int]
val offset = nameToValue("offset").asInstanceOf[Int]
val fromDate = nameToValue("fromDate").asInstanceOf[String]
val toDate = nameToValue("toDate").asInstanceOf[String]
val queryParams: List[OBPQueryParam] = OBPQueryParam.toOBPQueryParams(limit, offset, fromDate, toDate)
queryParams
case (name, _) => nameToValue(name)
}
val connectorResult = ReflectUtils.invokeMethod(connector, method, connectorMethodArgs: _*)
val futureResult: Future[_] = transferConnectorResult(connectorResult)
futureResult.asInstanceOf[Future[InBoundTrait[_]]]
}
override def transfer(outbound: TopicTrait): Future[InBoundTrait[_]] = {
val connectorMethod: String = outbound.getClass.getSimpleName match {
case ConnectorMethodRegex(x, y) => s"${x.toLowerCase()}$y"
case x => x
}
val clazz = Class.forName(s"com.openbankproject.commons.dto.InBound${connectorMethod.capitalize}")
implicit val inboundMainFest: Manifest[InBoundTrait[_]] = ManifestFactory.classType[InBoundTrait[_]](clazz)
private def transferConnectorResult(any: Any)(implicit inboundMainFest: Manifest[Any]): Future[_] = any match {
case x: Future[_] => x.map { it =>
val dataJson = json.Extraction.decompose(getData(it))
val inboundJson: JObject = "data" -> dataJson
inboundJson.extract[Any](formats, inboundMainFest)
}
case x =>
Future{
val dataJson = json.Extraction.decompose(getData(x))
val inboundJson: JObject = "data" -> dataJson
inboundJson.extract[Any](formats, inboundMainFest)
connector.implementedMethods.get(connectorMethod) match {
case None => Future.failed(new IllegalArgumentException(s"Outbound instance $outbound have no corresponding method in the ${connector.getClass.getSimpleName}"))
case Some(method) =>
val nameToValue = outbound.nameToValue.toMap
val argNameToType: List[(String, universe.Type)] = method.paramLists.head.map(it => it.name.decodedName.toString.trim -> it.info)
val connectorMethodArgs: List[Any] = argNameToType collect {
case (_, tp) if tp <:< callContextType => None // For connector method parameter `callContext: Option[CallContext]`, just pass None
case (_, tp) if tp <:< queryParamType =>
val limit = nameToValue("limit").asInstanceOf[Int]
val offset = nameToValue("offset").asInstanceOf[Int]
val fromDate = nameToValue("fromDate").asInstanceOf[String]
val toDate = nameToValue("toDate").asInstanceOf[String]
val queryParams: List[OBPQueryParam] = OBPQueryParam.toOBPQueryParams(limit, offset, fromDate, toDate)
queryParams
case (name, _) => nameToValue(name)
}
}
// connector methods return different type value, this method just extract value for InboundXX#data
private def getData(any: Any): Any = any match {
case (Full(v), _: Option[CallContext]) => v
case (v, _: Option[CallContext]) => v
case Full((v, _: Option[CallContext])) => v
case Full(v) => v
case v => v
val connectorResult = ReflectUtils.invokeMethod(connector, method, connectorMethodArgs: _*)
val futureResult: Future[InBoundTrait[_]] = transferConnectorResult(connectorResult)
futureResult
}
}
private def transferConnectorResult(any: Any)(implicit inboundMainFest: Manifest[InBoundTrait[_]]): Future[InBoundTrait[_]] = any match {
case x: Future[_] => x.map { it =>
val dataJson = json.Extraction.decompose(getData(it))
val inboundJson: JObject = "data" -> dataJson
inboundJson.extract[InBoundTrait[_]](formats, inboundMainFest)
}
case x =>
Future{
val dataJson = json.Extraction.decompose(getData(x))
val inboundJson: JObject = "data" -> dataJson
inboundJson.extract[InBoundTrait[_]](formats, inboundMainFest)
}
}
// connector methods return different type value, this method just extract value for InboundXX#data
private def getData(any: Any): Any = any match {
case (Full(v), _: Option[CallContext]) => v
case (v, _: Option[CallContext]) => v
case Full((v, _: Option[CallContext])) => v
case Full(v) => v
case v => v
}
}