Merge remote-tracking branch 'upstream/develop' into develop

Marko Milić 2025-03-10 06:45:14 +01:00
commit b7b6fffed9
62 changed files with 160 additions and 11967 deletions

View File

@ -69,9 +69,6 @@ jobs:
echo COUNTERPARTY_OTP_INSTRUCTION_TRANSPORT=dummy >> obp-api/src/main/resources/props/test.default.props
echo SEPA_CREDIT_TRANSFERS_OTP_INSTRUCTION_TRANSPORT=dummy >> obp-api/src/main/resources/props/test.default.props
echo kafka.akka.timeout = 9 >> obp-api/src/main/resources/props/test.default.props
echo remotedata.timeout = 10 >> obp-api/src/main/resources/props/test.default.props
echo allow_oauth2_login=true >> obp-api/src/main/resources/props/test.default.props
echo oauth2.jwk_set.url=https://www.googleapis.com/oauth2/v3/certs >> obp-api/src/main/resources/props/test.default.props

View File

@ -69,9 +69,6 @@ jobs:
echo COUNTERPARTY_OTP_INSTRUCTION_TRANSPORT=dummy >> obp-api/src/main/resources/props/test.default.props
echo SEPA_CREDIT_TRANSFERS_OTP_INSTRUCTION_TRANSPORT=dummy >> obp-api/src/main/resources/props/test.default.props
echo kafka.akka.timeout = 9 >> obp-api/src/main/resources/props/test.default.props
echo remotedata.timeout = 10 >> obp-api/src/main/resources/props/test.default.props
echo allow_oauth2_login=true >> obp-api/src/main/resources/props/test.default.props
echo oauth2.jwk_set.url=https://www.googleapis.com/oauth2/v3/certs >> obp-api/src/main/resources/props/test.default.props

View File

@ -65,8 +65,6 @@ jobs:
echo COUNTERPARTY_OTP_INSTRUCTION_TRANSPORT=dummy >> obp-api/src/main/resources/props/test.default.props
echo SEPA_CREDIT_TRANSFERS_OTP_INSTRUCTION_TRANSPORT=dummy >> obp-api/src/main/resources/props/test.default.props
echo kafka.akka.timeout = 9 >> obp-api/src/main/resources/props/test.default.props
echo remotedata.timeout = 10 >> obp-api/src/main/resources/props/test.default.props
echo allow_oauth2_login=true >> obp-api/src/main/resources/props/test.default.props
echo oauth2.jwk_set.url=https://www.googleapis.com/oauth2/v3/certs >> obp-api/src/main/resources/props/test.default.props

View File

@ -1,26 +0,0 @@
## Kafka Quickstart
Note that OBP with the Kafka connector also needs a bank backend connected to Kafka that implements the following: https://apiexplorersandbox.openbankproject.com/glossary#Adapter.Kafka.Intro
Otherwise OBP will display nothing but adapter errors.
##### Configuration
* Edit the OBP-API/obp-api/src/main/resources/props/default.props so that it contains the following lines:
connector=kafka_vMay2019
# kafka server location, plaintext for quickstart
kafka.host=localhost:9092
# next 2 lines for legacy reasons
kafka.request_topic=Request
kafka.response_topic=Response
# number of partitions available on Kafka. Must match the Kafka configuration!
kafka.partitions=1
# no ssl for quickstart
kafka.use.ssl=false
# start with 1 for the first instance, set to 2 for the second instance etc
api_instance_id=1
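
Before starting OBP it can be worth sanity-checking that the broker in `kafka.host` is reachable. A minimal sketch using the kafka-clients library (assuming the quickstart's plaintext broker on localhost:9092; the object name is illustrative):

```
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// Minimal broker reachability check for the kafka.host value above.
object KafkaSmokeTest extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // kafka.host
  val admin = AdminClient.create(props)
  try println(s"Topics visible on the broker: ${admin.listTopics().names().get()}")
  finally admin.close()
}
```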

View File

@ -374,17 +374,6 @@ We use 9 to run the API in production mode.
Most internal OBP model data access now occurs over Akka. This is so the machine that has JDBC access to the OBP database can be physically separated from the OBP API layer. In this configuration we run two instances of OBP-API on two different machines and they communicate over Akka. Please see README.Akka.md for instructions.
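
As a rough sketch of what this separation looks like in code, one node can look up a data actor on the remote storage node via an actor selection (hostname, port and actor name below are illustrative; real lookups go through ObpLookupSystem and use the ask pattern with a Timeout):

```
import akka.actor.ActorSystem

// Sketch: an API-tier node addressing a storage-tier actor over akka remoting.
object RemoteDataLookupSketch extends App {
  val system = ActorSystem("ObpActorSystem_example")
  // The storage-tier node (the one with JDBC access) hosts the data actors.
  val remoteData = system.actorSelection(
    "akka.tcp://ObpActorSystem_example@10.0.0.2:2662/user/mappedEntitlement")
  remoteData ! "ping" // real calls use ask (?) and wrap the reply, as in ObpActorInit
}
```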
## Using SSL Encryption with kafka
For SSL encryption we use JKS keystores. Note that the keystore and the truststore (and all keys within) must share the same unlock password; the API will stop at boot and prompt for it.
* Edit your props file(s) to contain:
```
kafka.use.ssl=true
keystore.path=/path/to/api.keystore.jks
truststore.path=/path/to/api.truststore.jks
```
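
For reference, the single-password constraint can be checked up front with the standard java.security.KeyStore API. A sketch (paths mirror the props above; the password handling is illustrative, since OBP prompts for it at boot):

```
import java.io.FileInputStream
import java.security.KeyStore

object KeystoreCheck extends App {
  // The keystore, the truststore and all keys within must share this one password.
  val password = sys.env.getOrElse("OBP_SSL_PASSWORD", "changeit").toCharArray

  def load(path: String): KeyStore = {
    val ks = KeyStore.getInstance("JKS")
    val in = new FileInputStream(path)
    try { ks.load(in, password); ks } finally in.close()
  }

  val keystore   = load("/path/to/api.keystore.jks")
  val truststore = load("/path/to/api.truststore.jks")
  println(s"keystore entries: ${keystore.size}, truststore entries: ${truststore.size}")
}
```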
## Using SSL Encryption with RabbitMq

View File

@ -24,8 +24,6 @@
[Access Control](https://apiexplorersandbox.openbankproject.com/glossary#API.Access-Control)
[OBP Kafka](https://apiexplorersandbox.openbankproject.com/glossary#Adapter.Kafka.Intro)
[OBP Akka](https://apiexplorersandbox.openbankproject.com/glossary#Adapter.Akka.Intro)
[API Explorer](https://github.com/OpenBankProject/API-Explorer/blob/develop/README.md)

View File

@ -278,10 +278,6 @@ Support for on premise OAuth2 provider e.g. MitreId. See the glossary.
### Message Docs (for Akka)
Message Docs (which define Core Banking System Akka messages) are now available independent of the connector being used on the API instance. See [here](https://apiexplorersandbox.openbankproject.com/?ignoredefcat=true&tags=#v2_2_0-getMessageDocs)
### Message Docs (for Kafka)
Message Docs (which define Core Banking System Kafka messages) are now available independent of the connector being used on the API instance. See [here](https://apiexplorersandbox.openbankproject.com/?ignoredefcat=true&tags=#v2_2_0-getMessageDocs)
### Endpoint config and cleanup
Endpoints can now be enabled / disabled explicitly using the Props file.
We removed old versions including v1.0, v1.1 and v1.2.
@ -302,15 +298,11 @@ We added Custom code folders so that bank specific forks can more easily git mer
### API Tester
API Tester is a Python/Django app for testing an OBP API instance from the outside. Particularly useful when using a non-sandbox (e.g. kafka) connector. It supports a variety of authentication methods so you can test outside a gateway. You can configure different data profiles for specifying parameters such as bank_id, account_id etc. See [here](https://github.com/OpenBankProject/API-Tester) for the source code and installation instructions.
API Tester is a Python/Django app for testing an OBP API instance from the outside. Particularly useful when using a non-sandbox (e.g. RabbitMq) connector. It supports a variety of authentication methods so you can test outside a gateway. You can configure different data profiles for specifying parameters such as bank_id, account_id etc. See [here](https://github.com/OpenBankProject/API-Tester) for the source code and installation instructions.
### Extend Swagger support
We improved the information contained in the Swagger (and Resource Doc) endpoints. They are also available from the API Explorer. See [here](https://apiexplorersandbox.openbankproject.com/?ignoredefcat=true&tags=#v1_4_0-getResourceDocsSwagger)
### Kafka versioning
The built-in Kafka connectors now provide message versioning.
### Akka Remote data (Three tier architecture)
Most OBP data access now happens over Akka. This allows the API layer to be physically separated from the storage layer: the API layer can call only a specified set of data access functions, and only the storage layer has JDBC / SQL access.
@ -428,13 +420,6 @@ is used to explore and interact with the OBP API. See [API Explorer on Sandbox](
See [Resource Docs endpoint](https://api.openbankproject.com/obp/v1.4.0/resource-docs/obp)
### Kafka connector
* Get transactions via Kafka bus and language neutral connector on the south side of the MQ
See [Docker obp-full-kafka](https://hub.docker.com/r/openbankproject/obp-full-kafka/)
### Version 1.4.0
This version is stable. For the spec see [here](https://github.com/OpenBankProject/OBP-API/wiki/REST-API-V1.4.0) or [here](https://apiexplorersandbox.openbankproject.com/?version=1.4.0&list-all-banks=false&core=&psd2=&obwg=&ignoredefcat=true)

View File

@ -79,16 +79,6 @@
<groupId>org.slf4j</groupId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpg-jdk15on</artifactId>
@ -253,11 +243,6 @@
<artifactId>akka-remote_${scala.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_${scala.version}</artifactId>
<version>${akka-streams-kafka.version}</version>
</dependency>
<dependency>
<groupId>com.sksamuel.avro4s</groupId>
<artifactId>avro4s-core_${scala.version}</artifactId>
@ -512,14 +497,6 @@
<version>1.20.3</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.testcontainers/kafka -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.20.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -947,15 +947,11 @@ Este error no puede ser mostrado al usuario, sólo para depuración.
OBP-50004 = El método (AuthUser.getCurrentUser) no puede encontrar el usuario actual en el contexto actual!
OBP-50005 = Se ha producido un error interno o no especificado.
OBP-50006 = Kafka interrumpió la excepción.
OBP-50007 = Excepción de ejecución de Kafka.
OBP-50008 = Excepción de tiempo de espera del flujo Kafka de Akka.
OBP-50009 = Error desconocido de Kafka.
OBP-50010 = Scala devuelve la caja vacía a Liftweb.
OBP-50012 = No se puede obtener el objeto CallContext aquí.
OBP-50013 = El sistema bancario central devolvió un error o una respuesta no especificada.
OBP-50014 = No se puede actualizar el usuario.
OBP-50015 = El servidor encontró una condición inesperada que le impidió cumplir con la solicitud.
OBP-50016 = El servidor kafka no está disponible.
OBP-50017 = El punto final está prohibido en esta instancia de la API.
OBP-50018 = Error de construcción.
OBP-50019 = No se puede conectar a la base de datos OBP.
@ -981,7 +977,6 @@ OBP-50217 = no devolvió la transacción que solicitamos.
OBP-50218 = El conector no devolvió el conjunto de etiquetas de punto final que solicitamos.
OBP-50219 = El conector no devolvió las cuentas bancarias que solicitamos.
#Excepciones del adaptador (OBP-6XXXX)
#Reservado para mensajes del adaptador (al sur de Kafka)
#También se utiliza para el conector == mapeado, y mostrarlo como los errores internos.
OBP-60001 = excepción de transacción.
OBP-60002 = la Excepción de Valor de Carga.

View File

@ -31,7 +31,7 @@ connector=star
#hikari.keepaliveTime=
#hikari.maxLifetime=
## if connector = star, then you need to set which connectors will be used. For now, OBP supports rest, akka and kafka. If you set kafka, then you need to start the kafka server.
## if connector = star, then you need to set which connectors will be used. For now, OBP supports rest and akka.
starConnector_supported_types=mapped,internal
## whether to export LocalMappedConnector methods as endpoints; this is just for development, default is false
@ -125,24 +125,6 @@ long_endpoint_timeout = 55000
## Scheduler will be disabled if delay is not set.
#transaction_status_scheduler_delay=300
## If using kafka, set the brokers
#kafka.bootstrap_hosts=localhost:9092
# WARNING: if this number does not match the partitions in Kafka config, you will SUFFER !
#kafka.partitions=3
#This is the api instance id; we create the kafka topic based on this number, and each instance should have its own id. Use it in load balancing + Kafka setup
#This is also used for the scheduler.
#OBP sets the default to a non-persistent UUID string.
#api_instance_id=7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh
#If a value is set, OBP will concatenate the UUID string with the value.
#api_instance_id=1
#When the value is set to end with "final", OBP will preserve that value as-is.
#api_instance_id=1_final
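The three api_instance_id cases above can be summarised in a small sketch (this helper is hypothetical, not OBP's actual resolution code; the order of the concatenation is illustrative):

```
import java.util.UUID

object ApiInstanceIdSketch {
  def resolve(prop: Option[String]): String = prop match {
    case None                           => UUID.randomUUID().toString   // unset: non-persistent UUID
    case Some(v) if v.endsWith("final") => v                            // e.g. "1_final" is preserved as-is
    case Some(v)                        => s"${v}_${UUID.randomUUID()}" // set: value concatenated with a UUID
  }
}
```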
## DEPRECATED
## Enable user authentication via kafka
#kafka.user.authentication=true
## Enable user authentication via the connector
#connector.user.authentication=true
@ -150,9 +132,6 @@ long_endpoint_timeout = 55000
## Enable SSL for JWT, if set to true must set paths for the keystore locations
jwt.use.ssl=false
## Enable SSL for kafka, if set to true must set paths for the keystore locations
#kafka.use.ssl=true
## Enable SSL for rabbitmq, if set to true must set paths for the keystore locations
#rabbitmq.use.ssl=false
@ -710,9 +689,6 @@ autocomplete_at_login_form_enabled=false
# To BYPASS this security features (for local development only), set this property to true to skip the email address validation.
#authUser.skipEmailValidation=false
# If using Kafka but want to get counterparties from OBP, set this to true
#get_counterparties_from_OBP_DB=true
# control the create and access to public views.
# allow_public_views=false
@ -756,7 +732,7 @@ dauth.host=127.0.0.1
# }
# When it is enabled we show all messages in a chain. For instance:
# {
# "error": "OBP-30001: Bank not found. Please specify a valid value for BANK_ID. <- Full(Kafka_TimeoutExceptionjava.util.concurrent.TimeoutException: The stream has not been completed in 1550 milliseconds.)"
# "error": "OBP-30001: Bank not found. Please specify a valid value for BANK_ID. <- Full(TimeoutExceptionjava.util.concurrent.TimeoutException: The stream has not been completed in 1550 milliseconds.)"
# }
display_internal_errors=false
# -------------------------------------- Display internal errors --

View File

@ -18,7 +18,6 @@
#which data connector to use
#connector=rest
#connector=kafka
#connector=obpjvm
## proxy connector get data from LocalMappedConnector, and set the follow corresponding fields to be null: @optional, inbound.optional.fields props, outbound.optional.fields props
#connector=proxy
@ -28,17 +27,6 @@ starConnector_supported_types = mapped,internal
# Connector cache time-to-live in seconds, caching disabled if not set
#connector.cache.ttl.seconds=3
# OBP-JVM transport type. currently supported: kafka, mock
#obpjvm.transport=kafka
#if using kafka, set zookeeper host and brokers
#defaults to "localhost:2181" if not set
#kafka.zookeeper_host=localhost:2181
#kafka.bootstrap_hosts=localhost:9092
#if using kafka, the following is mandatory
#kafka.request_topic=Request
#kafka.response_topic=Response
#this is needed for oauth to work. it's important to access the api over this url, e.g.
# if this is 127.0.0.1 don't use localhost to access it.

View File

@ -80,7 +80,6 @@ import code.endpointTag.EndpointTag
import code.entitlement.{Entitlement, MappedEntitlement}
import code.entitlementrequest.MappedEntitlementRequest
import code.fx.{MappedCurrency, MappedFXRate}
import code.kafka.{KafkaHelperActors, OBPKafkaConsumer}
import code.kycchecks.MappedKycCheck
import code.kycdocuments.MappedKycDocument
import code.kycmedias.MappedKycMedia
@ -540,13 +539,6 @@ class Boot extends MdcLoggable {
}
if (connector.startsWith("kafka") || (connector == "star" && APIUtil.getPropsValue("starConnector_supported_types","").split(",").contains("kafka"))) {
logger.info(s"KafkaHelperActors.startLocalKafkaHelperWorkers( ${actorSystem} ) starting")
KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem)
// Start North Side Consumer if it's not already started
OBPKafkaConsumer.primaryConsumer.start()
}
// API Metrics (logs of API calls)
// If set to true we will write each URL with params to a datastore / log file
if (APIUtil.getPropsAsBoolValue("write_metrics", false)) {

View File

@ -1,53 +0,0 @@
package code.actorsystem
import akka.util.Timeout
import code.api.APIFailure
import code.api.util.APIUtil
import code.util.Helper.MdcLoggable
import net.liftweb.common._
import com.openbankproject.commons.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.reflect.ClassTag
trait ObpActorInit extends MdcLoggable{
// Default is 3 seconds, which should be more than enough for slower systems
val ACTOR_TIMEOUT: Long = APIUtil.getPropsAsLongValue("remotedata.timeout").openOr(3)
val actorName = CreateActorNameFromClassName(this.getClass.getName)
val actor = ObpLookupSystem.getActor(actorName)
logger.debug(s"Create this Actor: $actorName: ${actor}")
val TIMEOUT = (ACTOR_TIMEOUT seconds)
implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds))
/**
* This function extracts the payload from a Future and wraps it in a Box.
* It is used by Old Style Endpoints in the Kafka connector.
* @param f The payload wrapped into Future
* @tparam T The type of the payload
* @return The payload wrapped into Box
*/
def extractFutureToBox[T: ClassTag](f: Future[Any]): Box[T] = {
val r: Future[Box[T]] = f.map {
case f@ (_: ParamFailure[_] | _: APIFailure) => Empty ~> f
case f: Failure => f
case Empty => Empty
case t: T => Full(t)
case _ => Empty ~> APIFailure("future extraction to box failed", 501)
}
Await.result(r, TIMEOUT)
}
def getValueFromFuture[T](f: Future[T]): T = {
Await.result(f, TIMEOUT)
}
def CreateActorNameFromClassName(c: String): String = {
val n = c.replaceFirst("^.*Remotedata", "").replaceAll("\\$.*", "")
Character.toLowerCase(n.charAt(0)) + n.substring(1)
}
}
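
A hypothetical caller of this (now removed) trait would have looked something like the following sketch; the message and payload type are illustrative, since real Remotedata actors exchange specific case classes:

```
import akka.pattern.ask
import net.liftweb.common.Box

// Sketch only: ask the looked-up actor and convert the Future reply into a Box.
object RemotedataExample extends ObpActorInit {
  def greeting(name: String): Box[String] =
    extractFutureToBox[String](actor ? s"hello $name") // blocks up to TIMEOUT
}
```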

View File

@ -28,34 +28,6 @@ trait ObpLookupSystem extends MdcLoggable {
obpLookupSystem
}
def getKafkaActor(actorName: String) = {
val actorPath: String = {
val hostname = ObpActorConfig.localHostname
val port = ObpActorConfig.localPort
val props_hostname = Helper.getHostname
if (port == 0) {
logger.error("Failed to connect to local Kafka actor")
}
s"akka.tcp://ObpActorSystem_${props_hostname}@${hostname}:${port}/user/${actorName}"
}
this.obpLookupSystem.actorSelection(actorPath)
}
def getKafkaActorChild(actorName: String, actorChildName: String) = {
val actorPath: String = {
val hostname = ObpActorConfig.localHostname
val port = ObpActorConfig.localPort
val props_hostname = Helper.getHostname
if (port == 0) {
logger.error("Failed to connect to local Kafka actor")
}
s"akka.tcp://ObpActorSystem_${props_hostname}@${hostname}:${port}/user/${actorName}/${actorChildName}"
}
this.obpLookupSystem.actorSelection(actorPath)
}
def getActor(actorName: String) = {
val actorPath: String = {

View File

@ -246,7 +246,7 @@ trait OBPRestHelper extends RestHelper with MdcLoggable {
# }
# When it is enabled we show all messages in a chain. For instance:
# {
# "error": "OBP-30001: Bank not found. Please specify a valid value for BANK_ID. <- Full(Kafka_TimeoutExceptionjava.util.concurrent.TimeoutException: The stream has not been completed in 1550 milliseconds.)"
# "error": "OBP-30001: Bank not found. Please specify a valid value for BANK_ID. <- Full(TimeoutExceptionjava.util.concurrent.TimeoutException: The stream has not been completed in 1550 milliseconds.)"
# }
*/
implicit def jsonResponseBoxToJsonResponse(box: Box[JsonResponse]): JsonResponse = {

View File

@ -421,8 +421,7 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth
val (tags, partialFunctions, locale, contentParam, apiCollectionIdParam) = ResourceDocsAPIMethodsUtil.getParams()
cc =>
implicit val ec = EndpointContext(Some(cc))
val resourceDocs = getApiLevelResourceDocs(cc,requestedApiVersionString, tags, partialFunctions, locale, contentParam, apiCollectionIdParam,false)
resourceDocs
getApiLevelResourceDocs(cc,requestedApiVersionString, tags, partialFunctions, locale, contentParam, apiCollectionIdParam,false)
}
}
@ -446,8 +445,7 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth
val (tags, partialFunctions, locale, contentParam, apiCollectionIdParam) = ResourceDocsAPIMethodsUtil.getParams()
cc =>
implicit val ec = EndpointContext(Some(cc))
val resourceDocs = getApiLevelResourceDocs(cc,requestedApiVersionString, tags, partialFunctions, locale, contentParam, apiCollectionIdParam,true)
resourceDocs
getApiLevelResourceDocs(cc,requestedApiVersionString, tags, partialFunctions, locale, contentParam, apiCollectionIdParam,true)
}
}
@ -492,61 +490,63 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth
Some(isVersion4OrHigher)
)
json <- locale match {
case _ if (apiCollectionIdParam.isDefined) =>
val operationIds = MappedApiCollectionEndpointsProvider.getApiCollectionEndpoints(apiCollectionIdParam.getOrElse("")).map(_.operationId).map(getObpFormatOperationId)
val resourceDocs = ResourceDoc.getResourceDocs(operationIds)
val resourceDocsJson = JSONFactory1_4_0.createResourceDocsJson(resourceDocs, isVersion4OrHigher, locale)
val resourceDocsJsonJValue = Full(resourceDocsJsonToJsonResponse(resourceDocsJson))
Future(resourceDocsJsonJValue.map(successJsonResponse(_)))
case _ if (apiCollectionIdParam.isDefined) =>
NewStyle.function.tryons(s"$UnknownError Can not prepare OBP resource docs.", 500, callContext) {
val operationIds = MappedApiCollectionEndpointsProvider.getApiCollectionEndpoints(apiCollectionIdParam.getOrElse("")).map(_.operationId).map(getObpFormatOperationId)
val resourceDocs = ResourceDoc.getResourceDocs(operationIds)
val resourceDocsJson = JSONFactory1_4_0.createResourceDocsJson(resourceDocs, isVersion4OrHigher, locale)
val resourceDocsJsonJValue = Full(resourceDocsJsonToJsonResponse(resourceDocsJson))
resourceDocsJsonJValue.map(successJsonResponse(_))
}
case _ =>
contentParam match {
case Some(DYNAMIC) =>{
val cacheValueFromRedis = Caching.getDynamicResourceDocCache(cacheKey)
val dynamicDocs: Box[JValue] =
if (cacheValueFromRedis.isDefined) {
Full(json.parse(cacheValueFromRedis.get))
} else {
val resourceDocJson = getResourceDocsObpDynamicCached(tags, partialFunctions, locale, None, false)
val resourceDocJsonJValue = resourceDocJson.map(resourceDocsJsonToJsonResponse).head
val jsonString = json.compactRender(resourceDocJsonJValue)
Caching.setDynamicResourceDocCache(cacheKey, jsonString)
Full(resourceDocJsonJValue)
}
Future(dynamicDocs.map(successJsonResponse(_)))
NewStyle.function.tryons(s"$UnknownError Can not prepare OBP resource docs.", 500, callContext) {
val cacheValueFromRedis = Caching.getDynamicResourceDocCache(cacheKey)
val dynamicDocs: Box[JValue] =
if (cacheValueFromRedis.isDefined) {
Full(json.parse(cacheValueFromRedis.get))
} else {
val resourceDocJson = getResourceDocsObpDynamicCached(tags, partialFunctions, locale, None, false)
val resourceDocJsonJValue = resourceDocJson.map(resourceDocsJsonToJsonResponse).head
val jsonString = json.compactRender(resourceDocJsonJValue)
Caching.setDynamicResourceDocCache(cacheKey, jsonString)
Full(resourceDocJsonJValue)
}
dynamicDocs.map(successJsonResponse(_))
}
}
case Some(STATIC) => {
val cacheValueFromRedis = Caching.getStaticResourceDocCache(cacheKey)
val staticDocs: Box[JValue] =
if (cacheValueFromRedis.isDefined) {
Full(json.parse(cacheValueFromRedis.get))
} else {
val resourceDocJson = getStaticResourceDocsObpCached(requestedApiVersionString, tags, partialFunctions, locale, isVersion4OrHigher)
val resourceDocJsonJValue = resourceDocJson.map(resourceDocsJsonToJsonResponse).head
val jsonString = json.compactRender(resourceDocJsonJValue)
Caching.setStaticResourceDocCache(cacheKey, jsonString)
Full(resourceDocJsonJValue)
}
Future(staticDocs.map(successJsonResponse(_)))
NewStyle.function.tryons(s"$UnknownError Can not prepare OBP resource docs.", 500, callContext) {
val cacheValueFromRedis = Caching.getStaticResourceDocCache(cacheKey)
val staticDocs: Box[JValue] =
if (cacheValueFromRedis.isDefined) {
Full(json.parse(cacheValueFromRedis.get))
} else {
val resourceDocJson = getStaticResourceDocsObpCached(requestedApiVersionString, tags, partialFunctions, locale, isVersion4OrHigher)
val resourceDocJsonJValue = resourceDocJson.map(resourceDocsJsonToJsonResponse).head
val jsonString = json.compactRender(resourceDocJsonJValue)
Caching.setStaticResourceDocCache(cacheKey, jsonString)
Full(resourceDocJsonJValue)
}
staticDocs.map(successJsonResponse(_))
}
}
case _ => {
val cacheValueFromRedis = Caching.getAllResourceDocCache(cacheKey)
val bothStaticAndDyamicDocs: Box[JValue] =
if (cacheValueFromRedis.isDefined) {
Full(json.parse(cacheValueFromRedis.get))
} else {
val resourceDocJson = getAllResourceDocsObpCached(requestedApiVersionString, tags, partialFunctions, locale, contentParam, isVersion4OrHigher)
val resourceDocJsonJValue = resourceDocJson.map(resourceDocsJsonToJsonResponse).head
val jsonString = json.compactRender(resourceDocJsonJValue)
Caching.setAllResourceDocCache(cacheKey, jsonString)
Full(resourceDocJsonJValue)
}
Future(bothStaticAndDyamicDocs.map(successJsonResponse(_)))
NewStyle.function.tryons(s"$UnknownError Can not prepare OBP resource docs.", 500, callContext) {
val cacheValueFromRedis = Caching.getAllResourceDocCache(cacheKey)
val bothStaticAndDyamicDocs: Box[JValue] =
if (cacheValueFromRedis.isDefined) {
Full(json.parse(cacheValueFromRedis.get))
} else {
val resourceDocJson = getAllResourceDocsObpCached(requestedApiVersionString, tags, partialFunctions, locale, contentParam, isVersion4OrHigher)
val resourceDocJsonJValue = resourceDocJson.map(resourceDocsJsonToJsonResponse).head
val jsonString = json.compactRender(resourceDocJsonJValue)
Caching.setAllResourceDocCache(cacheKey, jsonString)
Full(resourceDocJsonJValue)
}
bothStaticAndDyamicDocs.map(successJsonResponse(_))
}
}
}
}
@ -681,27 +681,7 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth
} else {
Future.successful(true)
}
isVersion4OrHigher = true
resourceDocsJsonFiltered <- locale match {
case _ if (apiCollectionIdParam.isDefined) =>
val operationIds = MappedApiCollectionEndpointsProvider.getApiCollectionEndpoints(apiCollectionIdParam.getOrElse("")).map(_.operationId).map(getObpFormatOperationId)
val resourceDocs = ResourceDoc.getResourceDocs(operationIds)
val resourceDocsJson = JSONFactory1_4_0.createResourceDocsJson(resourceDocs, isVersion4OrHigher, locale)
Future(resourceDocsJson.resource_docs)
case _ =>
contentParam match {
case Some(DYNAMIC) =>
Future(getResourceDocsObpDynamicCached(resourceDocTags, partialFunctions, locale, None, isVersion4OrHigher).head.resource_docs)
case Some(STATIC) => {
Future(getStaticResourceDocsObpCached(requestedApiVersionString, resourceDocTags, partialFunctions, locale, isVersion4OrHigher).head.resource_docs)
}
case _ => {
Future(getAllResourceDocsObpCached(requestedApiVersionString, resourceDocTags, partialFunctions, locale, contentParam, isVersion4OrHigher).head.resource_docs)
}
}
}
cacheKey = APIUtil.createResourceDocCacheKey(
None,
requestedApiVersionString,
@ -710,14 +690,32 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth
locale,
contentParam,
apiCollectionIdParam,
None
Some(isVersion4OrHigher)
)
swaggerJValue <- NewStyle.function.tryons(s"$UnknownError Can not convert internal swagger file.", 400, cc.callContext) {
val cacheValueFromRedis = Caching.getStaticSwaggerDocCache(cacheKey)
if (cacheValueFromRedis.isDefined) {
json.parse(cacheValueFromRedis.get)
} else {
cacheValueFromRedis = Caching.getStaticSwaggerDocCache(cacheKey)
swaggerJValue <- if (cacheValueFromRedis.isDefined) {
NewStyle.function.tryons(s"$UnknownError Can not convert internal swagger file from cache.", 400, cc.callContext) {json.parse(cacheValueFromRedis.get)}
} else {
NewStyle.function.tryons(s"$UnknownError Can not convert internal swagger file.", 400, cc.callContext) {
val resourceDocsJsonFiltered = locale match {
case _ if (apiCollectionIdParam.isDefined) =>
val operationIds = MappedApiCollectionEndpointsProvider.getApiCollectionEndpoints(apiCollectionIdParam.getOrElse("")).map(_.operationId).map(getObpFormatOperationId)
val resourceDocs = ResourceDoc.getResourceDocs(operationIds)
val resourceDocsJson = JSONFactory1_4_0.createResourceDocsJson(resourceDocs, isVersion4OrHigher, locale)
resourceDocsJson.resource_docs
case _ =>
contentParam match {
case Some(DYNAMIC) =>
getResourceDocsObpDynamicCached(resourceDocTags, partialFunctions, locale, None, isVersion4OrHigher).head.resource_docs
case Some(STATIC) => {
getStaticResourceDocsObpCached(requestedApiVersionString, resourceDocTags, partialFunctions, locale, isVersion4OrHigher).head.resource_docs
}
case _ => {
getAllResourceDocsObpCached(requestedApiVersionString, resourceDocTags, partialFunctions, locale, contentParam, isVersion4OrHigher).head.resource_docs
}
}
}
convertResourceDocsToSwaggerJvalueAndSetCache(cacheKey, requestedApiVersionString, resourceDocsJsonFiltered)
}
}
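
The recurring shape in this refactor is a read-through cache wrapped in tryons: check Redis by key, otherwise compute the docs JSON and populate the cache. Distilled as a sketch (getOrCompute is a hypothetical helper, not part of OBP):

```
object ResourceDocCachePattern {
  // Read-through cache: return the cached JSON string, or compute and store it.
  def getOrCompute(key: String,
                   get: String => Option[String],
                   set: (String, String) => Unit)(compute: => String): String =
    get(key) match {
      case Some(cached) => cached   // cache hit: reuse the serialized JSON
      case None =>
        val fresh = compute         // cache miss: build the resource-docs JSON
        set(key, fresh)             // store for subsequent requests
        fresh
    }
}
```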

View File

@ -618,7 +618,7 @@ object SwaggerDefinitionsJSON {
val messageDocJson = MessageDocJson(
process = "getAccounts",
message_format = "KafkaV2017",
message_format = "rest_vMar2019",
inbound_topic = Some("from.obp.api.1.to.adapter.mf.caseclass.OutboundGetAccounts"),
outbound_topic = Some("to.obp.api.1.caseclass.OutboundGetAccounts"),
description = "get Banks",
@ -4020,7 +4020,7 @@ object SwaggerDefinitionsJSON {
user_auth_contexts = List(userAuthContextJson)
)
val obpApiLoopbackJson = ObpApiLoopbackJson("kafka_vSept2018","f0acd4be14cdcb94be3433ec95c1ad65228812a0","10 ms")
val obpApiLoopbackJson = ObpApiLoopbackJson("rest_vMar2019","f0acd4be14cdcb94be3433ec95c1ad65228812a0","10 ms")
val refresUserJson = RefreshUserJson("10 ms")

View File

@ -57,14 +57,6 @@ object Caching extends MdcLoggable {
}
}
def getLocalisedResourceDocCache(key: String) = {
use(JedisMethod.GET, (LOCALISED_RESOURCE_DOC_PREFIX + key).intern(), Some(CREATE_LOCALISED_RESOURCE_DOC_JSON_TTL))
}
def setLocalisedResourceDocCache(key:String, value: String)= {
use(JedisMethod.SET, (LOCALISED_RESOURCE_DOC_PREFIX+key).intern(), Some(CREATE_LOCALISED_RESOURCE_DOC_JSON_TTL), Some(value))
}
def getDynamicResourceDocCache(key: String) = {
use(JedisMethod.GET, (DYNAMIC_RESOURCE_DOC_CACHE_KEY_PREFIX + key).intern(), Some(GET_DYNAMIC_RESOURCE_DOCS_TTL))
}

View File

@ -13,7 +13,7 @@ import com.openbankproject.commons.ExecutionContext.Implicits.global
object InMemory extends MdcLoggable {
val underlyingGuavaCache = CacheBuilder.newBuilder().maximumSize(10000L).build[String, Object]
val underlyingGuavaCache = CacheBuilder.newBuilder().maximumSize(100000L).build[String, Object]
implicit val scalaCache = ScalaCache(GuavaCache(underlyingGuavaCache))
def memoizeSyncWithInMemory[A](cacheKey: Option[String])(@cacheKeyExclude ttl: Duration)(@cacheKeyExclude f: => A): A = {
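
Usage of the memoizer follows directly from the signature (the workload below is a stand-in; the cache key and TTL are illustrative):

```
import scala.concurrent.duration._

object MemoizeExample extends App {
  def expensiveBankLookup(): String = { Thread.sleep(500); "banks-json" } // stand-in workload

  val banks: String = InMemory.memoizeSyncWithInMemory(Some("banks-v1"))(30.seconds) {
    expensiveBankLookup() // recomputed at most once per TTL window for this key
  }
  println(banks)
}
```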

View File

@ -2057,7 +2057,6 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
Glossary.glossaryItems.toList.sortBy(_.title)
}
// Used to document the KafkaMessage calls
case class MessageDoc(
process: String,
messageFormat: String,
@ -3389,7 +3388,6 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
/**
* This method is used for caching at the connector level.
* eg: KafkaMappedConnector_vJune2017.bankTTL
* The default cache time unit is seconds.
*/
def getSecondsCache(cacheType: String) : Int = {

View File

@ -256,7 +256,6 @@ object DynamicUtil extends MdcLoggable{
|import code.api.dynamic.endpoint.helper.MockResponseHolder
|import code.bankconnectors._
|import code.customer.internalMapping.MappedCustomerIdMappingProvider
|import code.kafka.KafkaHelper
|import code.model.dataAccess.internalMapping.MappedAccountIdMappingProvider
|import code.util.AkkaHttpClient._
|import code.util.Helper.MdcLoggable

View File

@ -682,21 +682,15 @@ object ErrorMessages {
// Exceptions (OBP-50XXX)
val UnknownError = "OBP-50000: Unknown Error."
val FutureTimeoutException = "OBP-50001: Future Timeout Exception."
val KafkaMessageClassCastException = "OBP-50002: Kafka Response Message Class Cast Exception."
val AdapterOrCoreBankingSystemException = "OBP-50003: Adapter Or Core Banking System Exception. Failed to get a valid response from the south side Adapter or Core Banking System."
// This error may not be shown to user, just for debugging.
val CurrentUserNotFoundException = "OBP-50004: Method (AuthUser.getCurrentUser) can not find the current user in the current context!"
val AnUnspecifiedOrInternalErrorOccurred = "OBP-50005: An unspecified or internal error occurred."
val KafkaInterruptedException = "OBP-50006: Kafka interrupted exception."
val KafkaExecutionException = "OBP-50007: Kafka execution exception."
val KafkaStreamTimeoutException = "OBP-50008: Akka Kafka stream timeout exception."
val KafkaUnknownError = "OBP-50009: Kafka Unknown Error."
val ScalaEmptyBoxToLiftweb = "OBP-50010: Scala return Empty box to Liftweb."
val NoCallContext = "OBP-50012: Can not get the CallContext object here."
val UnspecifiedCbsError = "OBP-50013: The Core Banking System returned an unspecified error or response."
val RefreshUserError = "OBP-50014: Can not refresh User."
val InternalServerError = "OBP-50015: The server encountered an unexpected condition which prevented it from fulfilling the request."
val KafkaServerUnavailable = "OBP-50016: The kafka server is unavailable."
val NotAllowedEndpoint = "OBP-50017: The endpoint is forbidden at this API instance."
val UnderConstructionError = "OBP-50018: Under Construction Error."
val DatabaseConnectionClosedError = "OBP-50019: Cannot connect to the OBP database."
@ -727,7 +721,6 @@ object ErrorMessages {
val InvalidConnectorResponseForGetStatus = "OBP-50222: Connector method getStatus did not return the data we requested."
// Adapter Exceptions (OBP-6XXXX)
// Reserved for adapter (south of Kafka) messages
// Also used for connector == mapped, and show it as the Internal errors.
val GetStatusException = "OBP-60001: Save Transaction Exception. "
val GetChargeValueException = "OBP-60002: Get ChargeValue Exception. "

View File

@ -129,10 +129,10 @@ object Glossary extends MdcLoggable {
// NOTE! Some glossary items are defined in ExampleValue.scala
val latestKafkaConnector : String = "kafka_vSept2018"
val latestConnector : String = "rest_vMar2019"
def messageDocLink(process: String) : String = {
s"""<a href="/message-docs?connector=$latestKafkaConnector#$process">$process</a>"""
s"""<a href="/message-docs?connector=$latestConnector#$process">$process</a>"""
}
val latestAkkaConnector : String = "akka_vDec2018"
@ -171,8 +171,6 @@ object Glossary extends MdcLoggable {
|
|[Access Control](/glossary#API.Access-Control)
|
|[OBP Kafka](/glossary#Adapter.Kafka.Intro)
|
|[OBP Akka](/glossary#Adapter.Akka.Intro)
|
|[API Explorer](https://github.com/OpenBankProject/API-Explorer/blob/develop/README.md)
@ -289,159 +287,6 @@ object Glossary extends MdcLoggable {
|
""")
glossaryItems += GlossaryItem(
title = "Adapter.Kafka.Intro",
description =
s"""
|## Use Kafka as an interface between OBP and your Core Banking System (CBS).
|
|
|For an introduction to Kafka see [here](https://kafka.apache.org/)
|
|### Installation Prerequisites
|
|
|* You have OBP-API running and it is connected to a Kafka installation.
| You can check OBP -> Kafka connectivity using the <a href="/#OBPv3_1_0-getObpConnectorLoopback">"loopback" endpoint</a>.
|
|* Ideally you have API Explorer running (the application serving this page) but it's not necessary - you could use any other REST client.
|* You might want to also run API Manager as it makes it easier to grant yourself roles, but it's not necessary - you could use the API Explorer / any REST client instead.
|
|### Create a Customer User and an Admin User
|
|* Register a User who will use the API as a Customer.
|* Register another User that will use the API as an Admin. The Admin user will need some Roles. See [here](/index#OBPv2_0_0-addEntitlement). You can bootstrap an Admin user by editing the Props file. See the README for that.
|
|### Add some authentication context to the Customer User
|
|* As the Admin User, use the [Create Auth Context](/index#OBPv3_1_0-createUserAuthContext) endpoint to add one or more attributes to the Customer User.
|For instance you could add the name/value pair CUSTOMER_NUMBER/889763 and this will be sent to the Adapter / CBS inside the AuthInfo object.
|
|
|Now you should be able to use the [Get Auth Contexts](/index#OBPv3_1_0-getUserAuthContexts) endpoint to see the data you added.
|
|### Write or Build an Adapter to respond to the following messages.
|
| When getting started, we suggest that you implement the messages in the following order:
|
|1) Core (Prerequisites) - Get Adapter, Get Banks, Get Bank
|
|* ${messageDocLink("obp.getAdapterInfo")}
|
|Now you should be able to use the [Adapter Info](/index#OBPv3_1_0-getAdapterInfo) endpoint
|
|* ${messageDocLink("obp.getBanks")}
|
|Now you should be able to use the [Get Banks](/index#OBPv3_0_0-getBanks) endpoint
|
|* ${messageDocLink("obp.getBank")}
|
|Now you should be able to use the [Get Bank](/index#OBPv3_0_0-bankById) endpoint
|
|
|2) Core (Authentications) - The step 1 APIs all allow anonymous access. If you need to link bank customer data to the OBP user,
| then you need to link the OBP user with the Bank user/customer using the [Create User Auth Context](/index#OBPv3_1_0-createUserAuthContext). Also
| check the description for this endpoint. Once you create the user-auth-context for one user, these user-auth-context key/value pairs
| can be propagated over connector messages. Then the Adapter can use them to map the OBP user to the Bank user/customer.
|
|* ${messageDocLink("obp.getBankAccountsForUser")}
|
|Now you should be able to use the [Refresh User](/index#OBPv3_1_0-refreshUser) endpoint
|
|3) Customers for logged in User
|
|* ${messageDocLink("obp.getCustomersByUserIdBox")}
|
|Now you should be able to use the [Get Customers](/index#OBPv3_0_0-getCustomersForUser) endpoint.
|
|
|4) Get Accounts
|
|Now you should already be able to use the [Get Accounts at Bank (IDs only).](/index#OBPv3_0_0-getPrivateAccountIdsbyBankId) endpoint.
|
|* ${messageDocLink("obp.getCoreBankAccounts")}
|
| The above messages should enable at least the following endpoints:
|
|* [Get Accounts at Bank (Minimal).](/index#OBPv3_0_0-privateAccountsAtOneBank)
|* [Get Accounts at all Banks (private)](/index#OBPv3_0_0-corePrivateAccountsAllBanks)
|
|5) Get Account
|
|* ${messageDocLink("obp.checkBankAccountExists")}
|* ${messageDocLink("obp.getBankAccount")}
|
| The above message should enable at least the following endpoints:
|
|* [Get Account by Id - Core](/index#OBPv3_0_0-getCoreAccountById)
|* [Get Account by Id - Full](/index#OBPv3_0_0-getPrivateAccountById)
|
|6) Get Transactions
|
|* ${messageDocLink("obp.getTransactions")}
|* ${messageDocLink("obp.getTransaction")}
|
|7) Manage Counterparties
|
|* ${messageDocLink("obp.getCounterparties")}
|* ${messageDocLink("obp.getCounterpartyByCounterpartyId")}
|* ${messageDocLink("obp.createCounterparty")}
|
|8) Get Transaction Request Types
|
|* This is configured using OBP Props - No messages required
|
|9) Get Challenge Threshold (CBS)
|
|* ${messageDocLink("obp.getChallengeThreshold")}
|
|10) Make Payment (used by Create Transaction Request)
|
|* ${messageDocLink("obp.makePaymentv210")}
|* This also requires 8,9,10 for high value payments.
|
|11) Get Transaction Requests.
|
|* ${messageDocLink("obp.getTransactionRequests210")}
|
|12) Generate Security Challenges (CBS)
|
|* ${messageDocLink("obp.createChallenge")}
|
|13) Answer Security Challenges (Validate)
|
|* Optional / Internal OBP (No additional messages required)
|
|14) Manage Counterparty Metadata
|
|* Internal OBP (No additional messages required)
|
|15) Get Entitlements
|
|* Internal OBP (No additional messages required)
|
|16) Manage Roles
|
|* Internal OBP (No additional messages required)
|
|17) Manage Entitlements
|
|* Internal OBP (No additional messages required)
|
|18) Manage Views
|
|* Internal OBP (No additional messages required)
|
|19) Manage Transaction Metadata
|
|* Internal OBP (No additional messages required)
|
|"""
)
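
For orientation only, a transport-level skeleton of such an adapter on the legacy Request/Response topics from the quickstart might look like the sketch below. The reply payload is a placeholder; a real adapter must implement the obp.* message contracts listed in the glossary item above.

```
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object MinimalKafkaAdapter extends App {
  val brokers = "localhost:9092"

  val consumerProps = new Properties()
  consumerProps.put("bootstrap.servers", brokers)
  consumerProps.put("group.id", "obp-adapter")
  consumerProps.put("key.deserializer", classOf[StringDeserializer].getName)
  consumerProps.put("value.deserializer", classOf[StringDeserializer].getName)

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", brokers)
  producerProps.put("key.serializer", classOf[StringSerializer].getName)
  producerProps.put("value.serializer", classOf[StringSerializer].getName)

  val consumer = new KafkaConsumer[String, String](consumerProps)
  val producer = new KafkaProducer[String, String](producerProps)
  consumer.subscribe(Collections.singletonList("Request")) // legacy request topic

  while (true) {
    val records = consumer.poll(java.time.Duration.ofMillis(500))
    records.forEach { record =>
      // A real adapter would decode the outbound case-class JSON here and answer
      // with the matching inbound structure (e.g. for obp.getBanks).
      val reply = s"""{"status":"ok","echoed":${record.value()}}"""
      producer.send(new ProducerRecord[String, String]("Response", record.key(), reply))
    }
  }
}
```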
glossaryItems += GlossaryItem(
title = "Adapter.Stored_Procedure.Intro",
description =
@ -488,14 +333,14 @@ object Glossary extends MdcLoggable {
|
|However, there are multiple available connector implementations - and you can also mix and create your own.|
|
|E.g. Kafka
|E.g. RabbitMq
|
|<pre>
|[=============] [============] [============] [============] [============]
|[ ] [ ] [ ] [ ] [ ]
|[ OBP API ] ===> Kafka Connector ===> [ Kafka ] ===> [ Kafka ] [ OBP Kafka ] ===> [ CBS ]
|[ OBP API ] ===> RabbitMq Connector ===> [ RabbitMq ] ===> [ RabbitMq ] [ OBP RabbitMq] ===> [ CBS ]
|[ ] Puts OBP Messages [ Connector ] [ Cluster ] [ Adapter ] [ ]
|[=============] onto a Kafka [============] [============] [============] [============]
|[=============] onto a RabbitMq [============] [============] [============] [============]
|
|</pre>
|
@ -691,7 +536,7 @@ object Glossary extends MdcLoggable {
|It SHOULD be a UUID. It MUST be unique in combination with the BANK_ID. ACCOUNT_ID is used in many URLS so it should be considered public.
|(We do NOT use account number in URLs since URLs are cached and logged all over the internet.)
|In local / sandbox mode, ACCOUNT_ID is generated as a UUID and stored in the database.
|In non sandbox modes (Kafka etc.), ACCOUNT_ID is mapped to core banking account numbers / identifiers at the South Side Adapter level.
|In non sandbox modes (RabbitMq etc.), ACCOUNT_ID is mapped to core banking account numbers / identifiers at the South Side Adapter level.
|ACCOUNT_ID is used to link Metadata and Views so it must be persistent and known to the North Side (OBP-API).
|
| Example value: ${accountIdExample.value}
@ -3172,7 +3017,7 @@ object Glossary extends MdcLoggable {
|
|The OBP Connector is a core part of the OBP-API and is written in Scala / Java and potentially other JVM languages.
|
|The OBP Connector implements multiple functions / methods in a style that satisfies a particular transport / protocol such as HTTP REST, Akka or Kafka.
|The OBP Connector implements multiple functions / methods in a style that satisfies a particular transport / protocol such as HTTP REST, Akka or RabbitMq.
|
|An OBP Adapter is a separate software component written in any programming language that responds to requests from the OBP Connector.
|
@ -3193,7 +3038,7 @@ object Glossary extends MdcLoggable {
| 1) The Name of the internal OBP function / method e.g. getAccountsForUser
| 2) The Outbound Message structure.
| 3) The Inbound Message structure.
| 4) The Connector name which denotes the protocol / transport used (e.g. REST, Akka, Kafka etc)
| 4) The Connector name which denotes the protocol / transport used (e.g. REST, Akka, RabbitMq etc)
| 5) Outbound / Inbound Topic
| 6) A list of required Inbound fields
| 7) A list of dependent endpoints.
@ -3233,7 +3078,7 @@ object Glossary extends MdcLoggable {
|This contains the named fields and their values which are specific to each Function / Message Doc.
|
|
|The Outbound / Inbound Topics are used for routing in multi OBP instance / Kafka installations (so OBP nodes listen only to the correct Topics).
|The Outbound / Inbound Topics are used for routing in multi OBP instance / RabbitMq installations (so OBP nodes listen only to the correct Topics).
|
|The dependent endpoints are listed to facilitate navigation in the API Explorer so integrators can test endpoints during integration.
|
@ -3247,7 +3092,7 @@ object Glossary extends MdcLoggable {
s"""
|
| Open Bank Project can have different connectors to connect to different data sources.
| We support several sources at the moment, e.g. databases, rest services, stored procedures and kafka.
| We support several sources at the moment, e.g. databases, rest services, stored procedures and RabbitMq.
|
| If OBP sets connector=star, then you can use this method routing to switch the sources.
| And we also provide the fields mapping inside the endpoints. If the fields in the source are different from connector,

View File

@ -1385,7 +1385,7 @@ object NewStyle extends MdcLoggable{
def getTransactionRequestImpl(transactionRequestId: TransactionRequestId, callContext: Option[CallContext]): OBPReturnType[TransactionRequest] =
{
//Note: this method is not over kafka yet, so use Future here.
//Note: this method is not over CBS yet, so use Future here.
Future{ Connector.connector.vend.getTransactionRequestImpl(transactionRequestId, callContext)} map {
unboxFullOrFail(_, callContext, s"$InvalidTransactionRequestId Current TransactionRequestId($transactionRequestId) ")
}

View File

@ -135,7 +135,7 @@ object WriteMetricUtil extends MdcLoggable {
Empty
}
// TODO This should use Elastic Search or Kafka not an RDBMS
// TODO This should use Elastic Search not an RDBMS
val u: User = user.orNull
val userId = if (u != null) u.userId else "null"
val userName = if (u != null) u.name else "null"

View File

@ -1,5 +1,6 @@
package code.api.v1_4_0
import code.api.Constant.{CREATE_LOCALISED_RESOURCE_DOC_JSON_TTL, LOCALISED_RESOURCE_DOC_PREFIX}
import code.api.berlin.group.v1_3.JvalueCaseClass
import code.api.cache.Caching
import java.util.Date
@ -519,17 +520,17 @@ object JSONFactory1_4_0 extends MdcLoggable{
jsonFieldsDescription.mkString(jsonTitleType,"","\n")
}
//cache key will contain "operationId + locale + isVersion4OrHigher"
def createLocalisedResourceDocJsonCached(
operationId: String, // this will be in the cacheKey
locale: Option[String],// this will be in the cacheKey
resourceDocUpdatedTags: ResourceDoc,
isVersion4OrHigher:Boolean,
isVersion4OrHigher:Boolean,// this will be in the cacheKey
urlParametersI18n:String ,
jsonRequestBodyFieldsI18n:String,
jsonResponseBodyFieldsI18n:String
): ResourceDocJson = {
val cacheKey = LOCALISED_RESOURCE_DOC_PREFIX + s"operationId:${operationId}-locale:$locale- isVersion4OrHigher:$isVersion4OrHigher".intern()
Caching.memoizeSyncWithImMemory(Some(cacheKey))(CREATE_LOCALISED_RESOURCE_DOC_JSON_TTL seconds) {
val fieldsDescription =
if (resourceDocUpdatedTags.tags.toString.contains("Dynamic-Entity")
|| resourceDocUpdatedTags.tags.toString.contains("Dynamic-Endpoint")
@ -588,7 +589,7 @@ object JSONFactory1_4_0 extends MdcLoggable{
logger.trace(s"createLocalisedResourceDocJsonCached value is $resourceDoc")
resourceDoc
}
}}
def createLocalisedResourceDocJson(rd: ResourceDoc, isVersion4OrHigher:Boolean, locale: Option[String], urlParametersI18n:String ,jsonRequestBodyFieldsI18n:String, jsonResponseBodyFieldsI18n:String) : ResourceDocJson = {
@ -596,26 +597,15 @@ object JSONFactory1_4_0 extends MdcLoggable{
val userDefinedEndpointTags = getAllEndpointTagsBox(rd.operationId).map(endpointTag =>ResourceDocTag(endpointTag.tagName))
val resourceDocWithUserDefinedEndpointTags: ResourceDoc = rd.copy(tags = userDefinedEndpointTags++ rd.tags)
val cacheKey = s"operationId:${resourceDocWithUserDefinedEndpointTags.operationId}-locale:$locale- isVersion4OrHigher:$isVersion4OrHigher".intern()
val cacheValueFromRedis = Caching.getLocalisedResourceDocCache(cacheKey)
if(cacheValueFromRedis.isDefined){
json.parse(cacheValueFromRedis.get).extract[ResourceDocJson]
}else{
val resourceDocJson = createLocalisedResourceDocJsonCached(
resourceDocWithUserDefinedEndpointTags.operationId,
locale: Option[String],
resourceDocWithUserDefinedEndpointTags,
isVersion4OrHigher: Boolean,
urlParametersI18n: String,
jsonRequestBodyFieldsI18n: String,
jsonResponseBodyFieldsI18n: String
)
val jsonString = json.compactRender(Extraction.decompose(resourceDocJson))
Caching.setLocalisedResourceDocCache(cacheKey,jsonString)
resourceDocJson
}
createLocalisedResourceDocJsonCached(
resourceDocWithUserDefinedEndpointTags.operationId,
locale: Option[String],
resourceDocWithUserDefinedEndpointTags,
isVersion4OrHigher: Boolean,
urlParametersI18n: String,
jsonRequestBodyFieldsI18n: String,
jsonResponseBodyFieldsI18n: String
)
}

View File

@ -438,12 +438,12 @@ trait APIMethods220 {
"GET",
"/message-docs/CONNECTOR",
"Get Message Docs",
"""These message docs provide example messages sent by OBP to the (Kafka) message queue for processing by the Core Banking / Payment system Adapter - together with an example expected response and possible error codes.
"""These message docs provide example messages sent by OBP to the (RabbitMq) message queue for processing by the Core Banking / Payment system Adapter - together with an example expected response and possible error codes.
| Integrators can use these messages to build Adapters that provide core banking services to OBP.
|
| Note: API Explorer provides a Message Docs page where these messages are displayed.
|
| `CONNECTOR`: kafka_vSept2018, stored_procedure_vDec2019 ...
| `CONNECTOR`: rest_vMar2019, stored_procedure_vDec2019 ...
""".stripMargin,
EmptyBody,
messageDocsJson,
@ -457,7 +457,7 @@ trait APIMethods220 {
implicit val ec = EndpointContext(Some(cc))
for {
connectorObject <- Future(tryo{Connector.getConnectorInstance(connector)}) map { i =>
val msg = s"$InvalidConnector Current Input is $connector. It should be eg: kafka_vSept2018..."
val msg = s"$InvalidConnector Current Input is $connector. It should be eg: rest_vMar2019..."
unboxFullOrFail(i, cc.callContext, msg)
}
} yield {

View File

@ -32,7 +32,6 @@ import code.consent.{ConsentRequests, ConsentStatus, Consents, MappedConsent}
import code.consumer.Consumers
import code.context.UserAuthContextUpdateProvider
import code.entitlement.Entitlement
import code.kafka.KafkaHelper
import code.loginattempts.LoginAttempt
import code.methodrouting.{MethodRouting, MethodRoutingCommons, MethodRoutingParam, MethodRoutingT}
import code.metrics.APIMetrics
@ -1862,14 +1861,7 @@ trait APIMethods310 {
"GET",
"/connector/loopback",
"Get Connector Status (Loopback)",
s"""This endpoint makes a call to the Connector to check the backend transport (e.g. Kafka) is reachable.
|
|Currently this is only implemented for Kafka based connectors.
|
|For Kafka based connectors, this endpoint writes a message to Kafka and reads it again.
|
|In the future, this endpoint may also return information about database connections etc.
|
s"""This endpoint makes a call to the Connector to check the backend transport is reachable. (WIP)
|
|${userAuthenticationMessage(true)}
|
@ -1888,12 +1880,13 @@ trait APIMethods310 {
(_, callContext) <- anonymousAccess(cc)
connectorVersion = APIUtil.getPropsValue("connector").openOrThrowException("connector props field `connector` not set")
starConnectorProps = APIUtil.getPropsValue("starConnector_supported_types").openOr("notfound")
obpApiLoopback <- connectorVersion.contains("kafka") || (connectorVersion.contains("star") && starConnectorProps.contains("kafka")) match {
case false => throw new IllegalStateException(s"${NotImplemented}for connector ${connectorVersion}")
case true => KafkaHelper.echoKafkaServer.recover {
case e: Throwable => throw new IllegalStateException(s"${KafkaServerUnavailable} Timeout error, because kafka do not return message to OBP-API. ${e.getMessage}")
}
}
//TODO we need to decide what kind of connector we should use.
obpApiLoopback = ObpApiLoopback(
connectorVersion ="Unknown",
gitCommit ="Unknown",
durationTime ="Unknown"
)
_ = throw new IllegalStateException(s"${NotImplemented}")
} yield {
(createObpApiLoopbackJson(obpApiLoopback), HttpCode.`200`(callContext))
}
@ -3181,14 +3174,6 @@ trait APIMethods310 {
implicit val ec = EndpointContext(Some(cc))
for {
(_, callContext) <- anonymousAccess(cc)
convertedToResourceDocs = RestConnector_vMar2019.messageDocs.map(toResourceDoc).toList
resourceDocListFiltered = ResourceDocsAPIMethodsUtil.filterResourceDocs(convertedToResourceDocs, resourceDocTags, partialFunctions)
resourceDocJsonList = JSONFactory1_4_0.createResourceDocsJson(resourceDocListFiltered, true, None).resource_docs
swaggerResourceDoc <- Future {SwaggerJSONFactory.createSwaggerResourceDoc(resourceDocJsonList, ApiVersion.v3_1_0)}
//For this connector swagger, it shares some basic fields with api swagger, eg: BankId, AccountId. So it need to merge here.
allSwaggerDefinitionCaseClasses = MessageDocsSwaggerDefinitions.allFields++SwaggerDefinitionsJSON.allFields
cacheKey = APIUtil.createResourceDocCacheKey(
None,
restConnectorVersion,
@ -3199,11 +3184,19 @@ trait APIMethods310 {
apiCollectionIdParam,
None
)
swaggerJValue <- NewStyle.function.tryons(s"$UnknownError Can not convert internal swagger file.", 400, cc.callContext) {
val cacheValueFromRedis = Caching.getStaticSwaggerDocCache(cacheKey)
if (cacheValueFromRedis.isDefined) {
cacheValueFromRedis = Caching.getStaticSwaggerDocCache(cacheKey)
swaggerJValue <- if (cacheValueFromRedis.isDefined) {
NewStyle.function.tryons(s"$UnknownError Can not convert internal swagger file from cache.", 400, cc.callContext) {
json.parse(cacheValueFromRedis.get)
} else {
}
} else {
NewStyle.function.tryons(s"$UnknownError Can not convert internal swagger file.", 400, cc.callContext) {
val convertedToResourceDocs = RestConnector_vMar2019.messageDocs.map(toResourceDoc).toList
val resourceDocListFiltered = ResourceDocsAPIMethodsUtil.filterResourceDocs(convertedToResourceDocs, resourceDocTags, partialFunctions)
val resourceDocJsonList = JSONFactory1_4_0.createResourceDocsJson(resourceDocListFiltered, true, None).resource_docs
val swaggerResourceDoc = SwaggerJSONFactory.createSwaggerResourceDoc(resourceDocJsonList, ApiVersion.v3_1_0)
//For this connector swagger, it shares some basic fields with api swagger, eg: BankId, AccountId. So it need to merge here.
val allSwaggerDefinitionCaseClasses = MessageDocsSwaggerDefinitions.allFields ++ SwaggerDefinitionsJSON.allFields
val jsonAST = SwaggerJSONFactory.loadDefinitions(resourceDocJsonList, allSwaggerDefinitionCaseClasses)
val swaggerDocJsonJValue = Extraction.decompose(swaggerResourceDoc) merge jsonAST
val jsonString = json.compactRender(swaggerDocJsonJValue)
@ -3211,7 +3204,6 @@ trait APIMethods310 {
swaggerDocJsonJValue
}
}
} yield {
// Merge both results and return
(swaggerJValue, HttpCode.`200`(callContext))

View File

@ -1043,7 +1043,7 @@ trait APIMethods400 extends MdcLoggable {
|4) `answer` : must be `123` in case that Strong Customer Authentication method for OTP challenge is dummy.
| For instance: SANDBOX_TAN_OTP_INSTRUCTION_TRANSPORT=dummy
| Possible values are dummy,email and sms
| In kafka mode, the answer can be received via phone message or other SCA methods.
| In CBS mode, the answer can be received via phone message or other SCA methods.
|
|Note that each Transaction Request Type can have its own OTP_INSTRUCTION_TRANSPORT method.
|OTP_INSTRUCTION_TRANSPORT methods are set in Props. See sample.props.template for instructions.
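
Resolving the transport for a given Transaction Request Type then follows the <TYPE>_OTP_INSTRUCTION_TRANSPORT naming seen above (this helper is illustrative, not OBP's actual resolution code):

```
import code.api.util.APIUtil

object OtpTransportSketch {
  // e.g. otpTransportFor("SANDBOX_TAN") reads SANDBOX_TAN_OTP_INSTRUCTION_TRANSPORT.
  def otpTransportFor(transactionRequestType: String): String =
    APIUtil.getPropsValue(s"${transactionRequestType}_OTP_INSTRUCTION_TRANSPORT")
      .openOr("dummy") // documented values: dummy, email, sms
}
```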

View File

@ -12,8 +12,6 @@ import code.bankconnectors.akka.AkkaConnector_vDec2018
import code.bankconnectors.rabbitmq.RabbitMQConnector_vOct2024
import code.bankconnectors.rest.RestConnector_vMar2019
import code.bankconnectors.storedprocedure.StoredProcedureConnector_vDec2019
import code.bankconnectors.vMay2019.KafkaMappedConnector_vMay2019
import code.bankconnectors.vSept2018.KafkaMappedConnector_vSept2018
import com.openbankproject.commons.model.CounterpartyLimitTrait
import com.openbankproject.commons.model.CustomerAccountLinkTrait
import com.openbankproject.commons.model.EndpointTagT
@ -46,7 +44,7 @@ import scala.reflect.runtime.universe.{MethodSymbol, typeOf}
So we can switch between different sources of resources e.g.
- Mapper ORM for connecting to RDBMS (via JDBC) https://www.assembla.com/wiki/show/liftweb/Mapper
- MongoDB
- KafkaMQ
- RabbitMq
etc.
Note: We also have individual providers for resources like Branches and Products.
@ -64,8 +62,6 @@ object Connector extends SimpleInjector {
val nameToConnector: Map[String, Connector] = Map(
"mapped" -> LocalMappedConnector,
"akka_vDec2018" -> AkkaConnector_vDec2018,
"kafka_vSept2018" -> KafkaMappedConnector_vSept2018,
"kafka_vMay2019" -> KafkaMappedConnector_vMay2019,
"rest_vMar2019" -> RestConnector_vMar2019,
"stored_procedure_vDec2019" -> StoredProcedureConnector_vDec2019,
"rabbitmq_vOct2024" -> RabbitMQConnector_vOct2024,
@ -695,8 +691,6 @@ trait Connector extends MdcLoggable {
callContext: Option[CallContext]
): OBPReturnType[Box[PhysicalCardTrait]] = Future{(Failure{setUnimplementedError(nameOf(updatePhysicalCard _))}, callContext)}
//Note: v210 was introduced here for the kafka connectors; it uses callContext and returns a Future.
def makePaymentv210(fromAccount: BankAccount,
toAccount: BankAccount,
transactionRequestId: TransactionRequestId,

View File

@ -1311,7 +1311,7 @@ object LocalMappedConnector extends Connector with MdcLoggable {
/**
* This is used to create or update the special bankAccount for COUNTERPARTY stuff (toAccountProvider != "OBP") and (Connector = Kafka)
* This is used to create or update the special bankAccount for COUNTERPARTY stuff (toAccountProvider != "OBP") and (Connector = RabbitMq)
* details in createTransactionRequest - V210 ,case COUNTERPARTY.toString
*
*/

View File

@ -3,7 +3,6 @@ package code.bankconnectors.generator
import code.api.util.CodeGenerateUtils.createDocExample
import code.api.util.{APIUtil, CallContext}
import code.bankconnectors.{Connector, LocalMappedConnector}
import code.bankconnectors.vSept2018.KafkaMappedConnector_vSept2018
import com.openbankproject.commons.util.ReflectUtils
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils.uncapitalize
@ -192,7 +191,7 @@ object ConnectorBuilderUtil {
private[this] val cacheMethodName = if(resultType.startsWith("Box[")) "memoizeSyncWithProvider" else "memoizeWithProvider"
private[this] val timeoutFieldName = uncapitalize(methodName.replaceFirst("^[a-z]+", "")) + "TTL"
private[this] val cacheTimeout = ReflectUtils.findMethod(ru.typeOf[KafkaMappedConnector_vSept2018], timeoutFieldName)(_ => true)
private[this] val cacheTimeout = ReflectUtils.findMethod(ru.typeOf[code.bankconnectors.rabbitmq.RabbitMQConnector_vOct2024], timeoutFieldName)(_ => true)
.map(_.name.toString)
.getOrElse("accountTTL")

View File

@ -54,12 +54,12 @@ trait RabbitMQConnector_vOct2024 extends Connector with MdcLoggable {
implicit override val nameOfConnector = RabbitMQConnector_vOct2024.toString
// "Versioning" of the messages sent by this or similar connector works like this:
// Use Case Classes (e.g. KafkaInbound... KafkaOutbound... as below to describe the message structures.
// Use Case Classes (e.g. Inbound..., Outbound... below) to describe the message structures.
// Each connector has a separate file like this one.
// Once the message format is STABLE, freeze the key/value pair names there. For now, new keys may be added but none modified.
// If we want to add a new message format, create a new file e.g. March2017_messages.scala
// Then add a suffix to the connector value i.e. instead of kafka we might have kafka_march_2017.
// Then in this file, populate the different case classes depending on the connector name and send to Kafka
// Then add a suffix to the connector value i.e. instead of rabbitmq we might have rabbitmq_vOct2024.
// Then in this file, populate the different case classes depending on the connector name and send them to the CBS
val messageFormat: String = "Oct2024"
override val messageDocs = ArrayBuffer[MessageDoc]()
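// ---------------------------------------------------------------------------
// Hedged illustration, not part of this diff, of the Outbound/Inbound pairing
// described in the comments above. The case-class names are invented;
// TopicTrait is the marker trait from com.openbankproject.commons.model.
case class OutBoundPingExample(correlationId: String)
  extends com.openbankproject.commons.model.TopicTrait
case class InBoundPingExample(correlationId: String, status: String)
// Once the "Oct2024" format is frozen, a structural change would ship as a new
// case-class pair plus a new connector suffix, e.g. a hypothetical rabbitmq_vMar2026.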

View File

@ -42,7 +42,6 @@ import code.api.util.{APIUtil, CallContext, OBPQueryParam}
import code.bankconnectors._
import code.context.UserAuthContextProvider
import code.customer.internalMapping.MappedCustomerIdMappingProvider
import code.kafka.KafkaHelper
import code.model.dataAccess.internalMapping.MappedAccountIdMappingProvider
import code.util.AkkaHttpClient._
import code.util.Helper
@ -72,19 +71,19 @@ import scala.language.postfixOps
import scala.reflect.runtime.universe._
trait RestConnector_vMar2019 extends Connector with KafkaHelper with MdcLoggable {
trait RestConnector_vMar2019 extends Connector with MdcLoggable {
//this one import is needed for implicit conversions, don't delete it
import com.openbankproject.commons.model.{AmountOfMoney, CreditLimit, CreditRating, CustomerFaceImage}
implicit override val nameOfConnector = RestConnector_vMar2019.toString
// "Versioning" of the messages sent by this or similar connector works like this:
// Use Case Classes (e.g. KafkaInbound... KafkaOutbound... as below to describe the message structures.
// Use Case Classes (e.g. Inbound..., Outbound... below) to describe the message structures.
// Each connector has a separate file like this one.
// Once the message format is STABLE, freeze the key/value pair names there. For now, new keys may be added but none modified.
// If we want to add a new message format, create a new file e.g. March2017_messages.scala
// Then add a suffix to the connector value i.e. instead of kafka we might have kafka_march_2017.
// Then in this file, populate the different case classes depending on the connector name and send to Kafka
// Then add a suffix to the connector value i.e. instead of Rest we might have rest_vMar2019.
// Then in this file, populate the different case classes depending on the connector name and send them to the CBS
val messageFormat: String = "March2019"
override val messageDocs = ArrayBuffer[MessageDoc]()

View File

@ -59,12 +59,12 @@ trait StoredProcedureConnector_vDec2019 extends Connector with MdcLoggable {
implicit override val nameOfConnector = StoredProcedureConnector_vDec2019.toString
// "Versioning" of the messages sent by this or similar connector works like this:
// Use Case Classes (e.g. KafkaInbound... KafkaOutbound... as below to describe the message structures.
// Use Case Classes (e.g. Inbound..., Outbound... below) to describe the message structures.
// Each connector has a separate file like this one.
// Once the message format is STABLE, freeze the key/value pair names there. For now, new keys may be added but none modified.
// If we want to add a new message format, create a new file e.g. March2017_messages.scala
// Then add a suffix to the connector value i.e. instead of kafka we might have kafka_march_2017.
// Then in this file, populate the different case classes depending on the connector name and send to Kafka
// Then add a suffix to the connector value i.e. instead of stored_procedure we might have stored_procedure_vDec2019.
// Then in this file, populate the different case classes depending on the connector name and send them to the stored procedure
val messageFormat: String = "Dec2019"
override val messageDocs = ArrayBuffer[MessageDoc]()

View File

@ -1,31 +0,0 @@
package code.bankconnectors.vMay2019
import code.bankconnectors.generator.ConnectorBuilderUtil._
import scala.collection.immutable.List
import scala.language.postfixOps
object KafkaConnectorBuilder extends App {
val genMethodNames = List(
"getAdapterInfo",
"getBank",
"getBanks",
"getBankAccountsBalances",
"getBranch",
"getBranches",
"getAtm",
"getAtms",
"getCustomersByUserId",
"getCustomerByCustomerId",
"getCustomerByCustomerNumber"
)
generateMethods(commonMethodNames,
"src/main/scala/code/bankconnectors/vMay2019/KafkaMappedConnector_vMay2019.scala",
"processRequest[InBound](req)", true)
}

View File

@ -1,19 +0,0 @@
package code.bankconnectors.vSept2018
import code.bankconnectors.generator.ConnectorBuilderUtil._
import scala.collection.immutable.List
import scala.language.postfixOps
object KafkaConnectorBuilder extends App {
generateMethods(commonMethodNames,
"src/main/scala/code/bankconnectors/vSept2018/KafkaMappedConnector_vSept2018.scala",
"processRequest[InBound](req)", true)
}

View File

@ -1,457 +0,0 @@
package code.bankconnectors.vSept2018
import java.util.Date
import code.api.util.APIUtil
import code.branches.Branches.{DriveUpString, LobbyString}
import code.model.dataAccess.MappedBankAccountData
import com.openbankproject.commons.model.{CounterpartyTrait, Customer, UserAuthContext, _}
import net.liftweb.mapper.By
import net.liftweb.util.Helpers.today
import scala.collection.immutable.List
/**
* case classes used to define topics; these are outbound kafka messages
*/
case class OutboundGetAdapterInfo(date: String) extends TopicTrait
case class OutboundGetBanks(authInfo: AuthInfo) extends TopicTrait
case class OutboundGetBank(authInfo: AuthInfo, bankId: String) extends TopicTrait
case class OutboundGetUserByUsernamePassword(authInfo: AuthInfo, password: String) extends TopicTrait
case class OutboundGetAccounts(authInfo: AuthInfo, customers:InternalBasicCustomers) extends TopicTrait
case class OutboundGetAccountbyAccountID(authInfo: AuthInfo, bankId: String, accountId: String)extends TopicTrait
case class OutboundCheckBankAccountExists(authInfo: AuthInfo, bankId: String, accountId: String)extends TopicTrait
case class OutboundGetCoreBankAccounts(authInfo: AuthInfo, bankIdAccountIds: List[BankIdAccountId])extends TopicTrait
case class OutboundGetBankAccountsHeld(authInfo: AuthInfo, bankIdAccountIds: List[BankIdAccountId])extends TopicTrait
case class OutboundGetTransactions(authInfo: AuthInfo,bankId: String, accountId: String, limit: Int, fromDate: String, toDate: String) extends TopicTrait
case class OutboundGetTransaction(authInfo: AuthInfo, bankId: String, accountId: String, transactionId: String) extends TopicTrait
case class OutboundGetBranches(authInfo: AuthInfo,bankId: String) extends TopicTrait
case class OutboundGetBranch(authInfo: AuthInfo, bankId: String, branchId: String)extends TopicTrait
case class OutboundGetAtms(authInfo: AuthInfo,bankId: String) extends TopicTrait
case class OutboundGetAtm(authInfo: AuthInfo,bankId: String, atmId: String) extends TopicTrait
case class OutboundGetChallengeThreshold(
authInfo: AuthInfo,
bankId: String,
accountId: String,
viewId: String,
transactionRequestType: String,
currency: String,
userId: String,
userName: String
) extends TopicTrait
case class OutboundCreateTransaction(
authInfo: AuthInfo,
// fromAccount
fromAccountBankId : String,
fromAccountId : String,
// transaction details
transactionRequestType: String,
transactionChargePolicy: String,
transactionRequestCommonBody: TransactionRequestCommonBodyJSON,
// toAccount or toCounterparty
toCounterpartyId: String,
toCounterpartyName: String,
toCounterpartyCurrency: String,
toCounterpartyRoutingAddress: String,
toCounterpartyRoutingScheme: String,
toCounterpartyBankRoutingAddress: String,
toCounterpartyBankRoutingScheme: String
) extends TopicTrait
case class OutboundCreateChallengeSept2018(
authInfo: AuthInfo,
bankId: String,
accountId: String,
userId: String,
username: String,
transactionRequestType: String,
transactionRequestId: String
) extends TopicTrait
case class OutboundCreateCounterparty(
authInfo: AuthInfo,
counterparty: OutboundCounterparty
) extends TopicTrait
case class OutboundGetTransactionRequests210(
authInfo: AuthInfo,
counterparty: OutboundTransactionRequests
) extends TopicTrait
case class OutboundGetCounterparties(
authInfo: AuthInfo,
counterparty: InternalOutboundGetCounterparties
) extends TopicTrait
case class OutboundGetCounterpartyByCounterpartyId(
authInfo: AuthInfo,
counterparty: OutboundGetCounterpartyById
) extends TopicTrait
case class OutboundGetCounterparty(authInfo: AuthInfo, thisBankId: String, thisAccountId: String, counterpartyId: String) extends TopicTrait
case class OutboundGetCustomersByUserId(
authInfo: AuthInfo
) extends TopicTrait
case class OutboundGetCheckbookOrderStatus(
authInfo: AuthInfo,
bankId: String,
accountId: String,
originatorApplication: String,
originatorStationIP: String,
primaryAccount: String
)extends TopicTrait
case class OutboundGetCreditCardOrderStatus(
authInfo: AuthInfo,
bankId: String,
accountId: String,
originatorApplication: String,
originatorStationIP: String,
primaryAccount: String
)extends TopicTrait
/**
* case classes used in Kafka messages; these are InBound Kafka messages
*/
//AdapterInfo has no AuthInfo, because it just gets data from the Adapter; no AuthInfo is needed
case class InboundAdapterInfo(data: InboundAdapterInfoInternal)
case class InboundGetUserByUsernamePassword(inboundAuthInfo: InboundAuthInfo, data: InboundValidatedUser)
case class InboundGetBanks(inboundAuthInfo: InboundAuthInfo, status: Status,data: List[InboundBank])
case class InboundGetBank(inboundAuthInfo: InboundAuthInfo, status: Status, data: InboundBank)
case class InboundGetAccounts(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[InboundAccountSept2018])
case class InboundGetAccountbyAccountID(inboundAuthInfo: InboundAuthInfo, status: Status, data: Option[InboundAccountSept2018])
case class InboundGetBankAccountsHeld(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[AccountHeld])
case class InboundCheckBankAccountExists(inboundAuthInfo: InboundAuthInfo, status: Status, data: Option[InboundAccountSept2018])
case class InboundGetCoreBankAccounts(inboundAuthInfo: InboundAuthInfo, data: List[InternalInboundCoreAccount])
case class InboundGetTransactions(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[InternalTransaction_vSept2018])
case class InboundGetTransaction(inboundAuthInfo: InboundAuthInfo, status: Status, data: Option[InternalTransaction_vSept2018])
case class InboundCreateChallengeSept2018(inboundAuthInfo: InboundAuthInfo, data: InternalCreateChallengeSept2018)
case class InboundCreateCounterparty(inboundAuthInfo: InboundAuthInfo, status: Status, data: Option[InternalCounterparty])
case class InboundGetTransactionRequests210(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[TransactionRequest])
case class InboundGetCounterparties(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[InternalCounterparty])
case class InboundGetCounterparty(inboundAuthInfo: InboundAuthInfo, status: Status, data: Option[InternalCounterparty])
case class InboundGetCustomersByUserId(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[InternalCustomer])
case class InboundGetBranches(inboundAuthInfo: InboundAuthInfo,status: Status,data: List[InboundBranchVSept2018])
case class InboundGetBranch(inboundAuthInfo: InboundAuthInfo,status: Status, data: Option[InboundBranchVSept2018])
case class InboundGetAtms(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[InboundAtmSept2018])
case class InboundGetAtm(inboundAuthInfo: InboundAuthInfo, status: Status, data: Option[InboundAtmSept2018])
case class InboundGetChecksOrderStatus(inboundAuthInfo: InboundAuthInfo, status: Status, data: CheckbookOrdersJson)
case class InboundGetCreditCardOrderStatus(inboundAuthInfo: InboundAuthInfo, status: Status, data: List[InboundCardDetails])
case class InboundGetChallengeThreshold(inboundAuthInfo: InboundAuthInfo, status: Status, data: AmountOfMoney)
////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////
// These are case classes, used in internal message mapping
case class InternalInboundCoreAccount(
errorCode: String,
backendMessages: List[InboundStatusMessage],
id : String,
label : String,
bankId : String,
accountType: String,
accountRoutings: List[AccountRouting]
)
case class InboundAuthInfo(
cbsToken: String = "",
sessionId: String = ""
)
case class InboundAccountSept2018(
errorCode: String,
cbsToken: String, //TODO: this may be moved to AuthInfo, but it is used in GatewayLogin
bankId: String,
branchId: String,
accountId: String,
accountNumber: String,
accountType: String,
balanceAmount: String,
balanceCurrency: String,
owners: List[String],
viewsToGenerate: List[String],
bankRoutingScheme: String,
bankRoutingAddress: String,
branchRoutingScheme: String,
branchRoutingAddress: String,
accountRoutingScheme: String,
accountRoutingAddress: String,
accountRouting: List[AccountRouting],
accountRules: List[AccountRule]
) extends InboundMessageBase with InboundAccount
case class BankAccountSept2018(r: InboundAccountSept2018) extends BankAccount {
def accountId: AccountId = AccountId(r.accountId)
def accountType: String = r.accountType
def balance: BigDecimal = BigDecimal(r.balanceAmount)
def currency: String = r.balanceCurrency
def name: String = r.owners.head
// Note: swift_bic--> swiftBic, but it extends from BankAccount
def swift_bic: Option[String] = Some("swift_bic")
// Note: deprecated, extends from BankAccount
def iban: Option[String] = Some("iban")
def number: String = r.accountNumber
def bankId: BankId = BankId(r.bankId)
def lastUpdate: Date = APIUtil.DateWithMsFormat.parse(today.getTime.toString)
def accountHolder: String = r.owners.head
// Fields modifiable from OBP are stored in mapper
def label: String = (for {
d <- MappedBankAccountData.find(By(MappedBankAccountData.accountId, r.accountId))
} yield {
d.getLabel
}).getOrElse(r.accountNumber)
def accountRoutingScheme: String = r.accountRoutingScheme
def accountRoutingAddress: String = r.accountRoutingAddress
def accountRoutings: List[AccountRouting] = List()
def branchId: String = r.branchId
def accountRules: List[AccountRule] = r.accountRules
}
case class InternalCreateChallengeSept2018(
errorCode: String,
backendMessages: List[InboundStatusMessage],
answer : String
)
case class InternalGetTransactionRequests(
errorCode: String,
backendMessages: List[InboundStatusMessage],
transactionRequests:List[TransactionRequest]
)
case class OutboundCounterparty(
name: String,
description: String,
currency: String,
createdByUserId: String,
thisBankId: String,
thisAccountId: String,
thisViewId: String,
otherAccountRoutingScheme: String,
otherAccountRoutingAddress: String,
otherAccountSecondaryRoutingScheme: String,
otherAccountSecondaryRoutingAddress: String,
otherBankRoutingScheme: String,
otherBankRoutingAddress: String,
otherBranchRoutingScheme: String,
otherBranchRoutingAddress: String,
isBeneficiary:Boolean,
bespoke: List[CounterpartyBespoke]
)
case class InternalOutboundGetCounterparties(
thisBankId: String,
thisAccountId: String,
viewId :String
)
case class OutboundGetCounterpartyById(
counterpartyId : String
)
case class OutboundTransactionRequests(
accountId: String,
accountType: String,
currency: String,
iban: String,
number: String,
bankId: String,
branchId: String,
accountRoutingScheme: String,
accountRoutingAddress: String
)
case class InternalCounterparty(
createdByUserId: String,
name: String,
thisBankId: String,
thisAccountId: String,
thisViewId: String,
counterpartyId: String,
otherAccountRoutingScheme: String,
otherAccountRoutingAddress: String,
otherBankRoutingScheme: String,
otherBankRoutingAddress: String,
otherBranchRoutingScheme: String,
otherBranchRoutingAddress: String,
isBeneficiary: Boolean,
description: String,
currency: String,
otherAccountSecondaryRoutingScheme: String,
otherAccountSecondaryRoutingAddress: String,
bespoke: List[CounterpartyBespoke]) extends CounterpartyTrait
case class InboundBranchVSept2018(
branchId: BranchId,
bankId: BankId,
name: String,
address: Address,
location: Location,
lobbyString: Option[LobbyString],
driveUpString: Option[DriveUpString],
meta: Meta,
branchRouting: Option[Routing],
lobby: Option[Lobby],
driveUp: Option[DriveUp],
// Easy access for people who use wheelchairs etc.
isAccessible : Option[Boolean],
accessibleFeatures: Option[String],
branchType : Option[String],
moreInfo : Option[String],
phoneNumber : Option[String],
isDeleted : Option[Boolean]
) extends BranchT
case class InboundAtmSept2018(
atmId : AtmId,
bankId : BankId,
name : String,
address : Address,
location : Location,
meta : Meta,
OpeningTimeOnMonday : Option[String],
ClosingTimeOnMonday : Option[String],
OpeningTimeOnTuesday : Option[String],
ClosingTimeOnTuesday : Option[String],
OpeningTimeOnWednesday : Option[String],
ClosingTimeOnWednesday : Option[String],
OpeningTimeOnThursday : Option[String],
ClosingTimeOnThursday: Option[String],
OpeningTimeOnFriday : Option[String],
ClosingTimeOnFriday : Option[String],
OpeningTimeOnSaturday : Option[String],
ClosingTimeOnSaturday : Option[String],
OpeningTimeOnSunday: Option[String],
ClosingTimeOnSunday : Option[String],
isAccessible : Option[Boolean],
locatedAt : Option[String],
moreInfo : Option[String],
hasDepositCapability : Option[Boolean],
supportedLanguages: Option[List[String]]= None,
services: Option[List[String]] = None,
accessibilityFeatures: Option[List[String]] = None,
supportedCurrencies: Option[List[String]] = None,
notes: Option[List[String]] = None,
minimumWithdrawal: Option[String] = None,
branchIdentification: Option[String] = None,
locationCategories: Option[List[String]] = None,
siteIdentification: Option[String] = None,
siteName: Option[String] = None,
cashWithdrawalNationalFee: Option[String] = None,
cashWithdrawalInternationalFee: Option[String] = None,
balanceInquiryFee: Option[String] = None,
atmType: Option[String] = None,
phone: Option[String] = None,
) extends AtmT
case class InternalTransaction_vSept2018(
transactionId: String,
accountId: String,
amount: String,
bankId: String,
completedDate: String,
counterpartyId: String,
counterpartyName: String,
currency: String,
description: String,
newBalanceAmount: String,
newBalanceCurrency: String,
postedDate: String,
`type`: String,
userId: String
)
case class InboundCardDetails(
orderId: String,
creditCardType: String,
cardDescription: String,
useType: String,
orderDate: String,
deliveryStatus: String,
statusDate: String,
branch: String
)
case class InternalTransactionId(
id : String
)
case class InboundCreateTransactionId(inboundAuthInfo: InboundAuthInfo, status: Status, data: InternalTransactionId)
object JsonFactory_vSept2018 {
def createCustomerJson(customer : Customer) : InternalBasicCustomer = {
InternalBasicCustomer(
bankId=customer.bankId,
customerId = customer.customerId,
customerNumber = customer.number,
legalName = customer.legalName,
dateOfBirth = customer.dateOfBirth
)
}
def createUserJson(user : User) : InternalBasicUser = {
InternalBasicUser(
user.userId,
user.emailAddress,
user.name,
)
}
def createBasicCustomerJson(customer : Customer) : BasicCustomer = {
BasicCustomer(
customerId = customer.customerId,
customerNumber = customer.number,
legalName = customer.legalName,
)
}
def createBasicUserAuthContext(userAuthContest : UserAuthContext) : BasicUserAuthContext = {
BasicUserAuthContext(
key = userAuthContest.key,
value = userAuthContest.value
)
}
def createCustomersJson(customers : List[Customer]) : InternalBasicCustomers = {
InternalBasicCustomers(customers.map(createCustomerJson))
}
def createUsersJson(users : List[User]) : InternalBasicUsers = {
InternalBasicUsers(users.map(createUserJson))
}
def createBasicCustomerJson(customers : List[Customer]) : List[BasicCustomer] = {
customers.map(createBasicCustomerJson)
}
def createBasicUserAuthContextJson(userAuthContexts : List[UserAuthContext]) : List[BasicUserAuthContext] = {
userAuthContexts.map(createBasicUserAuthContext)
}
}

View File

@ -1,22 +0,0 @@
package code.kafka
import code.api.Constant
import code.api.util.{APIUtil, ErrorMessages}
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
/**
* Basic kafka configuration utility
*/
trait KafkaConfig {
val bootstrapServers = APIUtil.getPropsValue("kafka.bootstrap_hosts").openOr("localhost:9092")
val groupId = APIUtil.getPropsValue("kafka.group.id").openOr("obp-api")
val apiInstanceId = Constant.ApiInstanceId
val partitions = APIUtil.getPropsAsIntValue("kafka.partitions", 10)
val clientId = s"obp.api.$apiInstanceId"
val autoOffsetResetConfig = "earliest"
val maxWakeups = 50
//TODO should be less than the container's timeout
val completionTimeout = FiniteDuration(APIUtil.getPropsAsIntValue("kafka.akka.timeout", 2)*1000 - 450, MILLISECONDS)
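// e.g. with the default kafka.akka.timeout = 2 this evaluates to 2 * 1000 - 450 = 1550 milliseconds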
}

View File

@ -1,168 +0,0 @@
package code.kafka
import akka.pattern.{AskTimeoutException, ask}
import code.actorsystem.{ObpActorInit, ObpLookupSystem}
import code.api.APIFailureNewStyle
import code.api.util.APIUtil.{fullBoxOrException, gitCommit, unboxFull, unboxFullOrFail}
import code.api.util.{APIUtil, CallContext, CustomJsonFormats}
import code.api.util.ErrorMessages._
import code.util.Helper.MdcLoggable
import com.openbankproject.commons.model.{ObpApiLoopback, TopicTrait}
import net.liftweb
import net.liftweb.common._
import net.liftweb.json
import net.liftweb.json.JsonAST.JNull
import net.liftweb.json.{Extraction, JValue, MappingException}
import scala.concurrent.Future
import net.liftweb.json.JsonParser.ParseException
import net.liftweb.util.Helpers
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors._
object KafkaHelper extends KafkaHelper
trait KafkaHelper extends ObpActorInit with MdcLoggable {
override val actorName = "KafkaStreamsHelperActor" //CreateActorNameFromClassName(this.getClass.getName)
override val actor = ObpLookupSystem.getKafkaActor(actorName)
/**
* This function exists just to keep compatibility with KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala.
* In KafkaMappedConnector.scala we used Map[String, String]; now we use case classes instead.
* eg: case class Company(name: String, address: String) -->
* Company("TESOBE","Berlin")
* Map(name -> "TESOBE", address -> "Berlin")
*
* @param caseClassObject
* @return Map[String, String]
*/
def transferCaseClassToMap(caseClassObject: scala.Product) =
caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names
.zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values
def process(request: scala.Product): JValue = {
val mapRequest:Map[String, String] = transferCaseClassToMap(request)
process(mapRequest)
}
/**
* This function is used for Old Style Endpoints.
* It processes Kafka's Outbound message to JValue.
* @param request The request we send to Kafka
* @return Kafka's Inbound message as JValue
*/
def process (request: Map[String, String]): JValue = {
val boxedJValue = processToBox(request)
fullBoxOrException(boxedJValue)
// fullBoxOrException(boxedJValue) already processes Empty and Failure, so the following openOrThrowException message is just a stub.
boxedJValue.openOrThrowException("future extraction to box failed")
}
/**
* This function is used for Old Style Endpoints at Kafka connector.
* It processes Kafka's Outbound message to JValue wrapped into Box.
* @param request The request we send to Kafka
* @return Kafka's Inbound message as JValue wrapped into Box
*/
def processToBox(request: Any): Box[JValue] = {
extractFutureToBox[JValue](actor ? request)
}
/**
* This function is used for Old Style Endpoints at Kafka connector.
* It processes Kafka's Outbound message to JValue wrapped into a Future.
* @param request The request we send to Kafka
* @tparam T the type of the Outbound message
* @return Kafka's Inbound message as JValue wrapped into Future
*/
def processToFuture[T](request: T): Future[JValue] = {
(actor ? request).mapTo[JValue]
}
/**
* This function is used to send a request to Kafka and extract the result into a Box.
* @param request The request we send to Kafka
* @tparam T the type of the Inbound message
* @return Kafka's Inbound message as Future[Box[T]]
*/
def processRequest[T: Manifest](request: TopicTrait): Future[Box[T]] = {
import com.openbankproject.commons.ExecutionContext.Implicits.global
import liftweb.json.compactRender
implicit val formats = CustomJsonFormats.nullTolerateFormats
val tp = manifest[T].runtimeClass
(actor ? request)
.mapTo[JValue]
.map {jvalue =>
try {
if (jvalue == JNull)
throw new Exception("Adapter can not return `null` value to OBP-API!")
else
Full(jvalue.extract[T])
} catch {
case e: Exception => {
val errorMsg = s"${InvalidConnectorResponse} extract response payload to type ${tp} fail. the payload content: ${compactRender(jvalue)}. $e"
sendOutboundAdapterError(errorMsg, request)
Failure(errorMsg, Full(e), Empty)
}
}
}
.recoverWith {
case e: ParseException => {
val errorMsg = s"${InvalidConnectorResponse} parse response payload to JValue fail. ${e.getMessage}"
sendOutboundAdapterError(errorMsg, request)
Future(Failure(errorMsg, Box !! (e.getCause) or Full(e), Empty))
}
case e: AskTimeoutException => {
echoKafkaServer
.map { _ => {
val errorMsg = s"${AdapterUnknownError} Timeout error, because Adapter do not return proper message to Kafka. ${e.getMessage}"
sendOutboundAdapterError(errorMsg, request)
Failure(errorMsg, Full(e), Empty)
}
}
.recover{
case e: Throwable => Failure(s"${KafkaServerUnavailable} Timeout error, because kafka do not return message to OBP-API. ${e.getMessage}", Full(e), Empty)
}
}
case e @ (_:AuthenticationException| _:AuthorizationException|
_:IllegalStateException| _:InterruptException|
_:SerializationException| _:TimeoutException|
_:KafkaException| _:ApiException)
=> Future(Failure(s"${KafkaUnknownError} OBP-API send message to kafka server failed. ${e.getMessage}", Full(e), Empty))
}
}
def sendOutboundAdapterError(error: String): Unit = actor ! OutboundAdapterError(error)
def sendOutboundAdapterError(error: String, request: TopicTrait): Unit = {
implicit val formats = CustomJsonFormats.formats
val requestJson = json.compactRender(Extraction.decompose(request))
// delegate to the single-argument overload so the composed message is actually sent to the actor
sendOutboundAdapterError(
s"""$error
|The request is: ${requestJson}
""".stripMargin)
}
/**
* Check the Kafka server by sending a request and awaiting the response.
* @return ObpApiLoopback with the round-trip duration
*/
def echoKafkaServer: Future[ObpApiLoopback] = {
import com.openbankproject.commons.ExecutionContext.Implicits.global
implicit val formats = CustomJsonFormats.formats
for{
connectorVersion <- Future {APIUtil.getPropsValue("connector").openOrThrowException("connector props field `connector` not set")}
startTime = Helpers.now
req = ObpApiLoopback(connectorVersion, gitCommit, "")
obpApiLoopbackResponse <- (actor ? req)
.map(_.asInstanceOf[JValue].extract[ObpApiLoopback])
.map(_.copy(durationTime = (Helpers.now.getTime - startTime.getTime).toString))
} yield {
obpApiLoopbackResponse
}
}
}
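// ---------------------------------------------------------------------------
// Hedged usage sketch, not part of this diff: processRequest as the generated
// connectors invoked it (compare the "processRequest[InBound](req)" template
// in the connector builders above). The Echo message pair is invented for
// illustration only.
object ProcessRequestUsageSketch {
  import com.openbankproject.commons.model.TopicTrait
  import net.liftweb.common.Box
  import scala.concurrent.Future

  case class OutBoundEcho(text: String) extends TopicTrait // hypothetical message
  case class InBoundEcho(text: String)                     // hypothetical message

  def echo(text: String): Future[Box[InBoundEcho]] =
    KafkaHelper.processRequest[InBoundEcho](OutBoundEcho(text))
}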

View File

@ -1,27 +0,0 @@
package code.kafka
import akka.actor.{ActorSystem, Props => ActorProps}
import code.util.Helper
import code.util.Helper.MdcLoggable
object KafkaHelperActors extends MdcLoggable with KafkaHelper{
val props_hostname = Helper.getHostname
def startKafkaHelperActors(actorSystem: ActorSystem) = {
// List all the ActorSystems used in Kafka, for now, we have Kafka and KafkaStreams
val actorsKafkaHelper = Map(
//ActorProps[KafkaHelperActor] -> actorName //KafkaHelper.actorName, we use kafka-stream now.
ActorProps[KafkaStreamsHelperActor] -> actorName //KafkaHelper.actorName
)
//Create the actorSystem for all up list Kafka
actorsKafkaHelper.foreach { a => logger.info(actorSystem.actorOf(a._1, name = a._2)) }
}
//This method is called in Boot.scala when OBP-API starts; if the connector is kafka_*, it creates the ActorSystem for Kafka
def startLocalKafkaHelperWorkers(system: ActorSystem): Unit = {
logger.info("Starting local KafkaHelper workers")
startKafkaHelperActors(system)
}
}

View File

@ -1,7 +0,0 @@
package code.kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
trait MessageProcessorTrait[K, V] {
def processMessage(record: ConsumerRecord[K, V]): Unit
}

View File

@ -1,20 +0,0 @@
package code.kafka
import code.actorsystem.ObpLookupSystem
import code.kafka.actor.RequestResponseActor.Response
import code.util.Helper.MdcLoggable
import org.apache.kafka.clients.consumer.ConsumerRecord
/**
* This class implements behavior of North Side Consumer
* i.e. how the consumer processes a received Kafka message
*/
class NorthSideConsumerMessageProcessor extends MessageProcessorTrait[String, String] with MdcLoggable with KafkaHelper {
override def processMessage(record: ConsumerRecord[String, String]): Unit = {
val backendRequestId = record.key()
val payload = record.value()
logger.debug(s"kafka consumer :$record")
// Try to find a child actor of "KafkaStreamsHelperActor" with a name equal to value of backendRequestId
ObpLookupSystem.getKafkaActorChild(actorName, backendRequestId) ! Response(backendRequestId, payload)
}
}

View File

@ -1,124 +0,0 @@
package code.kafka
import java.util.regex.Pattern
import code.api.util.APIUtil
import code.util.ClassScanUtils
import code.util.Helper.MdcLoggable
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
object NorthSideConsumer {
private[this] val outboundNamePattern= Pattern.compile("""com\.openbankproject\.commons\..*(OutBound.+)""")
val listOfTopics : List[String] = (Set(
"OutboundGetAdapterInfo",
"OutboundGetBanks",
"OutboundGetBank",
"OutboundGetUserByUsernamePassword",
"OutboundGetAccounts",
"OutboundGetAccountbyAccountID",
"OutboundCheckBankAccountExists",
"OutboundGetCoreBankAccounts",
"OutboundGetCoreBankAccounts",
"OutboundGetTransactions",
"OutboundGetTransaction",
"OutboundCreateTransaction",
"OutboundGetBranches",
"OutboundGetBranch",
"OutboundGetAtms",
"OutboundGetAtm",
"OutboundCreateChallengeJune2017",
"OutboundCreateCounterparty",
"OutboundGetTransactionRequests210",
"OutboundGetCounterparties",
"OutboundGetCounterpartyByCounterpartyId",
"OutboundGetCounterparty",
"OutboundCounterparty",
"OutboundGetCounterpartyById",
"OutboundTransactionRequests",
"OutboundGetCustomersByUserId",
"OutboundGetCheckbookOrderStatus",
"OutboundGetCreditCardOrderStatus",
"OutboundGetBankAccountsHeld",
"OutboundGetChallengeThreshold",
"OutboundCreateChallengeSept2018",
"ObpApiLoopback" //This topic is tricky now, it is just used in api side: api produce and consumer it. Not used over adapter. Only for test api <--> kafka.
) ++ ClassScanUtils.findTypes(classInfo => outboundNamePattern.matcher(classInfo.name).matches())
.map(outboundNamePattern.matcher(_).replaceFirst("$1"))).toList
def consumerProperties(brokers: String, group: String, keyDeserealizer: String, valueDeserealizer: String): Map[String, String] = {
if (APIUtil.getPropsValue("kafka.use.ssl").getOrElse("false") == "true") {
Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> OBPKafkaConsumer.autoOffsetResetConfig,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> keyDeserealizer,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> valueDeserealizer,
"security.protocol" -> "SSL",
"ssl.truststore.location" -> APIUtil.getPropsValue("truststore.path").getOrElse(""),
"ssl.truststore.password" -> APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd),
"ssl.keystore.location" -> APIUtil.getPropsValue("keystore.path").getOrElse(""),
"ssl.keystore.password" -> APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd)
)
} else {
Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> OBPKafkaConsumer.autoOffsetResetConfig,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> keyDeserealizer,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> valueDeserealizer
)
}
}
def apply[K, V](brokers: String, topic: String, group: String, processor: MessageProcessorTrait[K, V]): NorthSideConsumer[K, V] =
new NorthSideConsumer[K, V](brokers, topic, group, classOf[StringDeserializer].getName, classOf[StringDeserializer].getName, processor)
}
class NorthSideConsumer[K, V](brokers: String, topic: String, group: String, keyDeserealizer: String, valueDeserealizer: String,
processor: MessageProcessorTrait[K, V]) extends Runnable with MdcLoggable with KafkaConfig {
import NorthSideConsumer._
import scala.collection.JavaConversions._
val consumer = new KafkaConsumer[K, V](consumerProperties(brokers, group, keyDeserealizer, valueDeserealizer))
//The following topic is for loopback, only for testing api <--> kafka
val apiLoopbackTopic = s"from.${clientId}.to.adapter.mf.caseclass.ObpApiLoopback"
val allTopicsOverAdapter= listOfTopics.map(t => s"to.${clientId}.caseclass.$t")
//we use the same topic to send to Kafka and listen on the same topic to get the message back.
//So there is no to.obp.api.1.caseclass.ObpApiLoopback at all. Just use `apiLoopbackTopic` as the response topic.
val allTopicsApiListening: List[String] = allTopicsOverAdapter :+ apiLoopbackTopic
consumer.subscribe(allTopicsApiListening)
@volatile var completed = false
@volatile var started = false
def complete(): Unit = {
completed = true
}
override def run(): Unit = {
while (!completed) {
val records = consumer.poll(100)
for (record <- records) {
processor.processMessage(record)
}
}
consumer.close()
logger.info("Consumer closed")
}
def start(): Unit = {
if(!started) {
logger.info("Consumer started")
val t = new Thread(this)
t.start()
started = true
}
}
}

View File

@ -1,5 +0,0 @@
package code.kafka
object OBPKafkaConsumer extends KafkaConfig {
lazy val primaryConsumer = NorthSideConsumer(bootstrapServers, "", groupId + "-north.side.consumer", new NorthSideConsumerMessageProcessor())
}

View File

@ -1,53 +0,0 @@
package code.kafka.actor
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, PoisonPill}
import code.util.Helper.MdcLoggable
import shapeless.ops.zipper.Down
import scala.concurrent.duration.DurationInt
object RequestResponseActor {
case class Request(backendRequestId: String, payload: String)
case class Response(backendRequestId: String, payload: String)
}
/**
* This Actor acts in next way:
* 1. Someone sends a message to it, i.e. "thisActor ? Request(backendRequestId, requestMessage)"
* 2. The actor logs the request
* 3. The actor immediately starts listening for a Response(backendRequestId, responseMessage)
* without returning an answer to "thisActor ? Request(backendRequestId, requestMessage)"
* 4. The actor receives the Response(backendRequestId, responseMessage)
* 5. The actor sends the answer to "thisActor ? Request(backendRequestId, requestMessage)"
* 6. The actor destroys itself
*
* Please note that this type of Actor during its life cycle:
* - lives for up to 60 seconds
* - serves only one Kafka message
*/
class RequestResponseActor extends Actor with ActorLogging with MdcLoggable {
import RequestResponseActor._
def receive = waitingForRequest
private def waitingForRequest: Receive = {
case Request(backendRequestId, payload) =>
implicit val ec = context.dispatcher
val timeout = context.system.scheduler.scheduleOnce(60.second, self, Down)
context become waitingForResponse(sender, timeout)
logger.info(s"Request (backendRequestId, payload) = ($backendRequestId, $payload) was sent.")
}
private def waitingForResponse(origin: ActorRef, timeout: Cancellable): Receive = {
case Response(backendRequestId, payload) =>
timeout.cancel()
origin ! payload
self ! PoisonPill
logger.info(s"Response (backendRequestId, payload) = ($backendRequestId, $payload) was processed.")
case Down =>
self ! PoisonPill
logger.info(s"Actor $self was destroyed by the scheduler.")
}
}
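// ---------------------------------------------------------------------------
// Hedged, runnable sketch, not part of this diff, of the correlation flow
// described in the scaladoc above; it mirrors how KafkaStreamsHelperActor
// creates one listener per backendRequestId and NorthSideConsumer routes the
// matching Response back to it. The id and payload values are invented.
object RequestResponseFlowSketch extends App {
  import akka.actor.{ActorSystem, Props}
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration.DurationInt

  implicit val timeout: Timeout = Timeout(5.seconds)
  val system = ActorSystem("request-response-sketch")
  import system.dispatcher

  // partition-prefixed key, in the shape keyAndPartition builds (invented value)
  val backendRequestId = "3_some-uuid"
  val listener = system.actorOf(Props[RequestResponseActor], backendRequestId)

  // 1. ask: the actor stores the origin and waits for the matching Response
  val reply = (listener ? RequestResponseActor.Request(backendRequestId, "ping")).mapTo[String]
  // 2. in production, NorthSideConsumerMessageProcessor does this from a Kafka record:
  system.scheduler.scheduleOnce(100.millis)(listener ! RequestResponseActor.Response(backendRequestId, "pong"))

  reply.foreach { payload => println(payload); system.terminate() } // prints: pong
}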

View File

@ -1,262 +0,0 @@
package code.kafka
import java.util.concurrent.{Future => JFuture}
import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.ProducerSettings
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import code.actorsystem.{ObpActorHelper, ObpActorInit}
import code.api.util.{APIUtil, CustomJsonFormats}
import code.bankconnectors.AvroSerializer
import code.kafka.actor.RequestResponseActor
import code.kafka.actor.RequestResponseActor.Request
import code.util.Helper.MdcLoggable
import com.openbankproject.commons.model.TopicTrait
import net.liftweb.json
import net.liftweb.json.JsonParser.ParseException
import net.liftweb.json.{Extraction, JsonAST}
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.{ExecutionException, Future}
import scala.util.Try
/**
* Actor for accessing kafka from North side.
*/
class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable with KafkaConfig with AvroSerializer {
implicit val formats = CustomJsonFormats.formats
implicit val materializer = ActorMaterializer()
import materializer._
/**
* Randomly select a partition number from 0 to the kafka.partitions value.
* The selected partition number is embedded in the Key.
*/
private def keyAndPartition = scala.util.Random.nextInt(partitions) + "_" + APIUtil.generateUUID()
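// e.g. yields a key like "3_f47ac10b-58cc-4372-a567-0e02b2c3d479" when kafka.partitions >= 4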
private val producerSettings = if (APIUtil.getPropsValue("kafka.use.ssl").getOrElse("false") == "true") {
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
.withProperty("batch.size", "0")
.withParallelism(3)
.withProperty("security.protocol","SSL")
.withProperty("ssl.truststore.location", APIUtil.getPropsValue("truststore.path").getOrElse(""))
.withProperty("ssl.truststore.password", APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd))
.withProperty("ssl.keystore.location",APIUtil.getPropsValue("keystore.path").getOrElse(""))
.withProperty("ssl.keystore.password", APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd))
} else {
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
.withProperty("batch.size", "0")
.withParallelism(3)
}
private val producer = producerSettings.createKafkaProducer()
/**
* communication with Kafka, send and receive message.
* This method will send message to Kafka, using the specified key and partition for each topic
* And get the message from the specified partition and filter by key
*/
private val sendRequestAndGetResponseFromKafka: ((TopicPair, String, String) => Future[String]) = { (topic, key, value) =>
//When we send the RequestTopic message we embed the partition in it, and we read the ResponseTopic according to that partition.
val requestTopic = topic.request
val responseTopic = topic.response
if (NorthSideConsumer.listOfTopics.exists(_ == responseTopic)) {
logger.error(s"North Kafka Consumer is not subscribed to a topic: $responseTopic")
}
// This actor is used to listen to a message which will be sent by NorthSideConsumer
val actorListener = context.actorOf(Props[RequestResponseActor], key)
/**
* This function is used to send a Kafka message asynchronously to a Kafka broker.
* In case the broker cannot accept the message, an error is logged.
* @param requestTopic A topic used to send Kafka message to Adapter side
* @param key Kafka Message key
* @param value Kafka Message value
*/
def sendAsync(requestTopic: String, key: String, value: String): JFuture[RecordMetadata] = {
val message = new ProducerRecord[String, String](requestTopic, key, value)
logger.debug(s" kafka producer : $message")
producer.send(message, (_: RecordMetadata, e: Exception) => {
if (e != null) {
e.printStackTrace()
logger.error(s"unknown error happened in kafka producer, the following message failed to be produced properly: $message")
actorListener ! PoisonPill
}
})
}
def listenResponse: Future[String] = {
import akka.pattern.ask
// Listen to a message which will be sent by NorthSideConsumer
(actorListener ? Request(key, value)).mapTo[String] // this future will be fail future with AskTimeoutException
}
//producer publishes the message to a broker
try {
import scala.util.{Failure => JFailure, Success => JSuccess}
val jFuture = sendAsync(requestTopic, key, value)
if(jFuture.isDone) Try(jFuture.get()) match {
case JSuccess(_) => listenResponse
// see the KafkaProducer#send source code: it may return KafkaProducer#FutureFailure; in that case we return a failed future of the underlying ApiException
case JFailure(e: ExecutionException) => Future.failed(e.getCause)
case JFailure(e) => Future.failed(e) // impossible case, just add this case as insurance
} else {
listenResponse // this does not block, so no thread is held up synchronously
}
} catch {
case e:Throwable => Future.failed(e)
}
}
private val stringToJValueF: (String => Future[JsonAST.JValue]) = { r =>
logger.debug("kafka-consumer-stringToJValueF:" + r)
Future(json.parse(r)).recover {
case e: ParseException => throw new ParseException(s"parse json fail, the wrong json String is: $r", e)
}
}
val extractJValueToAnyF: (JsonAST.JValue => Future[Any]) = { r =>
logger.debug("kafka-consumer-extractJValueToAnyF:" + r)
Future(extractResult(r))
}
val anyToJValueF: (Any => Future[json.JValue]) = { m =>
logger.debug("kafka-produce-anyToJValueF:" + m)
Future(Extraction.decompose(m))
}
val serializeF: (json.JValue => Future[String]) = { m =>
logger.debug("kafka-produce-serializeF:" + m)
Future(json.compactRender(m))
}
//private val RESP: String = "{\"count\": \"\", \"data\": [], \"state\": \"\", \"pager\": \"\", \"target\": \"banks\"}"
override def preStart(): Unit = {
super.preStart()
val conn = {
val c = APIUtil.getPropsValue("connector").openOr("June2017")
if (c.contains("_")) c.split("_")(1) else c
}
//configuration optimization is postponed
//self ? conn
}
def receive = {
case value: String =>
logger.debug("kafka_request[value]: " + value)
for {
t <- Future(Topics.topicPairHardCode) // Just have two Topics: obp.request.version and obp.response.version
r <- sendRequestAndGetResponseFromKafka(t, keyAndPartition, value)
jv <- stringToJValueF(r)
} yield {
logger.debug("South Side recognises version info")
jv
}
// This is for KafkaMappedConnector_vJune2017, the request is TopicTrait
/**
* the following matched cases pipe their result to the sender, so all exceptions arrive inside the Future; the exceptions mean:
* > net.liftweb.json.JsonParser.ParseException: the response failed to parse to JValue
* > AskTimeoutException: timed out without a response being returned
* > (AuthenticationException| AuthorizationException| IllegalStateException| InterruptException| SerializationException| TimeoutException| KafkaException| ApiException): sending the message to the kafka server failed
*/
case request: TopicTrait =>
logger.debug("kafka_request[TopicCaseClass]: " + request)
val f = for {
t <- Future(Topics.createTopicByClassName(request.getClass.getSimpleName))
d <- anyToJValueF(request)
s <- serializeF(d)
r <- sendRequestAndGetResponseFromKafka(t, keyAndPartition, s) //send s to the kafka server and get the response; this may produce failed Futures:
jv <- stringToJValueF(r)// String to JValue; may return a failed Future of net.liftweb.json.JsonParser.ParseException
} yield {
jv
}
f pipeTo sender
// This is for KafkaMappedConnector_JVMcompatible, KafkaMappedConnector_vMar2017 and KafkaMappedConnector, the request is Map[String, String]
case request: Map[_, _] =>
logger.debug("kafka_request[Map[String, String]]: " + request)
val orgSender = sender
val f = for {
t <- Future(Topics.topicPairFromProps) // Just have two Topics: Request and Response
d <- anyToJValueF(request)
v <- serializeF(d)
r <- sendRequestAndGetResponseFromKafka(t, keyAndPartition, v)
jv <- stringToJValueF(r)
} yield {
jv
}
f pipeTo orgSender
// This is used to send an Outbound Adapter Error to the Kafka topic responsible for it
case request: OutboundAdapterError =>
val key = APIUtil.generateUUID()
val value = request.error
val topic = s"from.obp.api.${apiInstanceId}.to.adapter.mf.caseclass.OutboundAdapterError"
val message = new ProducerRecord[String, String](topic, key, value)
logger.debug(s" kafka producer's OutboundAdapterError : $message")
producer.send(message, new Callback {
override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
e.printStackTrace()
logger.error(s"unknown error happened in kafka producer's OutboundAdapterError, the following message failed to be produced properly: $message")
}
}
})
}
}
/**
* This case class describes an error sent to the Kafka topic "from.obp.api.${apiInstanceId}.to.adapter.mf.caseclass.OutboundAdapterError"
* @param error the error message sent to Kafka
*/
case class OutboundAdapterError(error: String)
/**
* This case class describes a pair of topics, one for each of the North and South sides.
* They form a pair:
* @param request eg: obp.June2017.N.GetBanks
* @param response eg: obp.June2017.S.GetBanks
*/
case class TopicPair(request: String, response: String)
object Topics extends KafkaConfig {
/**
* Two topics:
* Request : North is producer, South is the consumer. North --> South
* Response: South is producer, North is the consumer. South --> North
*/
private val requestTopic = APIUtil.getPropsValue("kafka.request_topic").openOr("Request")
private val responseTopic = APIUtil.getPropsValue("kafka.response_topic").openOr("Response")
/**
* set in props, we have two topics: Request and Response
*/
val topicPairFromProps = TopicPair(requestTopic, responseTopic)
def topicPairHardCode = TopicPair("obp.Request.version", "obp.Response.version")
def createTopicByClassName(className: String): TopicPair = {
/**
* eg:
* from.obp.api.1.to.adapter.mf.caseclass.GetBank
* to.obp.api.1.caseclass.GetBank
*/
TopicPair(
s"from.obp.api.${apiInstanceId}.to.adapter.mf.caseclass.${className.replace("$", "")}",
s"to.obp.api.${apiInstanceId}.caseclass.${className.replace("$", "")}"
)
}
}

View File

@ -1122,7 +1122,7 @@ def restoreSomeSessions(): Unit = {
S.error(S.?(ErrorMessages.UsernameHasBeenLocked))
loginRedirect(ObpS.param("Referer").or(S.param("Referer")))
// Check if user came from kafka/obpjvm/stored_procedure and
// Check if user came from CBS and
// if User is NOT locked. Then check username and password
// from connector in case they changed on the south-side
case Full(user) if externalUserIsValidatedAndNotLocked(usernameFromGui, user) && testExternalPassword(usernameFromGui, passwordFromGui) =>
@ -1131,7 +1131,7 @@ def restoreSomeSessions(): Unit = {
val preLoginState = capturePreLoginState()
logger.info("login redirect: " + loginRedirect.get)
val redirect = redirectUri(user.user.foreign)
//This method is used for connector = kafka* || obpjvm*
//This method is used for connector = cbs* || obpjvm*
//It will update the views and createAccountHolder ....
registeredUserHelper(user.getProvider(),user.username.get)
// User init actions
@ -1150,8 +1150,7 @@ def restoreSomeSessions(): Unit = {
// If user cannot be found locally, try to authenticate user via connector
case Empty if (APIUtil.getPropsAsBoolValue("connector.user.authentication", false) ||
APIUtil.getPropsAsBoolValue("kafka.user.authentication", false) ) =>
case Empty if (APIUtil.getPropsAsBoolValue("connector.user.authentication", false)) =>
val preLoginState = capturePreLoginState()
logger.info("login redirect: " + loginRedirect.get)
@ -1235,7 +1234,7 @@ def restoreSomeSessions(): Unit = {
* This method will update the views and createAccountHolder ....
*/
def registeredUserHelper(provider: String, username: String) = {
if (connector.startsWith("kafka")) {
if (connector.startsWith("rest_vMar2019")) {
for {
u <- Users.users.vend.getUserByProviderAndUsername(provider, username)
} yield {

View File

@ -30,7 +30,7 @@ trait ChallengeProvider {
def getChallengesByBasketId(basketId: String): Box[List[ChallengeTrait]]
/**
* There is another method: Connector.validateChallengeAnswer, it validate the challenge over Kafka.
* There is another method: Connector.validateChallengeAnswer, it validates the challenge over CBS.
* This method, will validate the answer in OBP side.
*/
def validateChallenge(challengeId: String, challengeAnswer: String, userId: Option[String]) : Box[ChallengeTrait]

View File

@ -235,7 +235,6 @@ object Helper extends Loggable {
/**
* Used for version extraction from props string
*/
val matchAnyKafka = "kafka.*|star".r
val matchAnyStoredProcedure = "stored_procedure.*|star".r
/**

View File

@ -93,12 +93,12 @@ trait Views {
final def getPrivateBankAccountsFuture(user : User, bankId : BankId) : Future[List[BankIdAccountId]] = Future {getPrivateBankAccounts(user, bankId)}
/**
* @param bankIdAccountId the IncomingAccount from Kafka
* @param bankIdAccountId the IncomingAccount from CBS
* @param viewId This field should be selected one from Owner/Public/Accountant/Auditor, only support
* these four values.
* @return This will insert a View (e.g. the owner view) for an Account (BankAccount), and return the view
* Note:
* updateUserAccountViews would call createAccountView once per View specified in the IncomingAccount from Kafka.
* updateUserAccountViews would call createAccountView once per View specified in the IncomingAccount from CBS.
* We should cache this function because the available views on an account will change rarely.
*
*/

View File

@ -423,25 +423,7 @@ class API2_2_0Test extends V220ServerSetup with DefaultUsers {
val response: APIResponse = makeGetRequest(request)
response.code should be (200)
}
scenario("Get Message Docs - kafka_vSept2018") {
val request = (v2_2Request / "message-docs" / "kafka_vSept2018" )
val response: APIResponse = makeGetRequest(request)
response.code should be (200)
}
scenario("Get Message Docs - kafka_vMay2019") {
val request = (v2_2Request / "message-docs" / "kafka_vMay2019" )
val response: APIResponse = makeGetRequest(request)
response.code should be (200)
}
scenario("Get Message Docs - rest_vMar2019") {
val request = (v2_2Request / "message-docs" / "rest_vMar2019" )
val response: APIResponse = makeGetRequest(request)
response.code should be (200)
}
}
scenario("Get Message Docs - stored_procedure_vDec2019") {
val request = (v2_2Request / "message-docs" / "stored_procedure_vDec2019" )
val response: APIResponse = makeGetRequest(request)

View File

@ -53,7 +53,7 @@ class ObpApiLoopbackTest extends V310ServerSetup {
Then("We should get a 400")
response310.code should equal(400)
val connectorVersion = APIUtil.getPropsValue("connector").openOrThrowException("connector props field `connector` not set")
val errorMessage = s"${NotImplemented}for connector ${connectorVersion}"
val errorMessage = s"${NotImplemented}"
And("error should be " + errorMessage)
response310.body.extract[ErrorMessage].message should equal (errorMessage)
}

View File

@ -1,79 +0,0 @@
package code.container
import code.api.v5_0_0.V500ServerSetup
import code.setup.DefaultUsers
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.Ignore
import org.testcontainers.kafka.KafkaContainer
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
@Ignore
class EmbeddedKafka extends V500ServerSetup with DefaultUsers {
val kafkaContainer: KafkaContainer = new KafkaContainer("apache/kafka-native:3.8.0")
// It registers a shutdown hook, which is a block of code (or function) that runs when the application terminates,
// - either normally (e.g., when the main method completes)
// - or due to an external signal (e.g., Ctrl + C or termination by the operating system).
sys.addShutdownHook {
kafkaContainer.stop()
}
override def beforeAll(): Unit = {
super.beforeAll()
// Start Kafka container
kafkaContainer.start()
}
override def afterAll(): Unit = {
super.afterAll()
kafkaContainer.stop()
}
feature(s"test EmbeddedKafka") {
scenario("Publish and Consume Message") {
val bootstrapServers: String = kafkaContainer.getBootstrapServers
// Kafka producer properties
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
// Kafka consumer properties
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// Create a producer
val producer = new KafkaProducer[String, String](producerProps)
val topic = "test-topic"
val key = "test-key"
val value = "Hello, Kafka!"
// Produce a message
producer.send(new ProducerRecord[String, String](topic, key, value))
producer.close()
// Create a consumer
val consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(Collections.singletonList(topic))
// Consume the message
val records = consumer.poll(5000L)
consumer.close()
val messages = records.asScala.map(record => record.value())
messages should contain(value)
}
}
}

View File

@ -101,7 +101,6 @@ case class TransactionRequestStatusValue(value : String) {
override def toString = value
}
//Note: change case class -> trait, for kafka extends it
trait TransactionRequestStatus{
def transactionRequestId : String
def bulkTransactionsStatus: List[TransactionStatus]

View File

@ -44,7 +44,7 @@ trait JsonFieldReName
/**
*
* This is the base class for all kafka outbound case class
* This is the base class for all CBS outbound case classes
* action and messageFormat are mandatory
* The optionalFields can be any other new fields .
*/
@ -251,7 +251,7 @@ trait CustomerAddress {
def insertDate: Date
}
// This is the common InboundAccount from all Kafka/remote, not finished yet.
// This is the common InboundAccount from all CBS/remote, not finished yet.
trait InboundAccount{
def bankId: String
def branchId: String
@ -392,8 +392,6 @@ trait RoutingT {
def address: String
}
// @see 'case request: TopicTrait' in code/bankconnectors/kafkaStreamsHelper.scala
// This is for Kafka topics for both North and South sides.
// In OBP-API, these topics will be created automatically.
trait TopicTrait {

View File

@ -234,7 +234,6 @@ trait View {
* 2nd: the view can grant access to any other (non-owner) users. eg: Simon's accountant view can grant access to Carola, then Carola can see Simon's accountant data
* also look into some createView methods in code, you can understand more:
* create1: code.bankconnectors.Connector.createViews
* need also look into here KafkaMappedConnector_vMar2017.updateUserAccountViewsOld
* after createViews method, always need call addPermission(v.uid, user). This will create this field
* Create2: code.model.dataAccess.BankAccountCreation.createOwnerView
* after create view, always need call `addPermission(ownerViewUID, user)`, this will create this field

View File

@ -13,8 +13,6 @@
<scala.version>2.12</scala.version>
<scala.compiler>2.12.12</scala.compiler>
<akka.version>2.5.32</akka.version>
<akka-streams-kafka.version>2.0.5</akka-streams-kafka.version>
<kafka.version>1.1.0</kafka.version>
<avro.version>1.8.2</avro.version>
<lift.version>3.5.0</lift.version>
<jetty.version>9.4.50.v20221201</jetty.version>

View File

@ -3,6 +3,8 @@
### Most recent changes at top of file
```
Date Commit Action
17/03/2025 166e4f2a Removed Kafka commits: 166e4f2a,7f24802e,6f0a3b53,f22763c3,
76fd73f7,7d1db2c2,dde267b1,7f259e49,00885604,a2847ce2,89ee59ac
17/02/2025 5877d2f2 Bootstrap Super User
Added props super_admin_username=TomWilliams
Added props super_admin_inital_password=681aeeb9f681aeeb9f681aeeb9

View File

@ -36,7 +36,6 @@ See [completed_developments.md](completed_developments.md)
* Clarify Account Customer Owners.
* Auto feed of Firehose Accounts/Transactions/Customers into Elastic Search
* Kafka Stream API for Accounts/Transactions/Customers
### SDK Documentation Upgrade