diff --git a/obp-api/pom.xml b/obp-api/pom.xml index 9abb85490..fcfd4dee0 100644 --- a/obp-api/pom.xml +++ b/obp-api/pom.xml @@ -341,6 +341,14 @@ geocalc 0.5.7 + + + net.manub + scalatest-embedded-kafka_2.12 + 1.1.1 + test + + diff --git a/obp-api/src/main/scala/code/actorsystem/ObpActorSystem.scala b/obp-api/src/main/scala/code/actorsystem/ObpActorSystem.scala index fa1d83ce7..6995e0af2 100644 --- a/obp-api/src/main/scala/code/actorsystem/ObpActorSystem.scala +++ b/obp-api/src/main/scala/code/actorsystem/ObpActorSystem.scala @@ -13,7 +13,9 @@ object ObpActorSystem extends MdcLoggable { var obpActorSystem: ActorSystem = _ var northSideAkkaConnectorActorSystem: ActorSystem = _ - def startLocalActorSystem(): ActorSystem = { + def startLocalActorSystem() = localActorSystem + + lazy val localActorSystem: ActorSystem = { logger.info("Starting local actor system") val localConf = ObpActorConfig.localConf logger.info(localConf) diff --git a/obp-api/src/main/scala/code/api/util/ExampleValue.scala b/obp-api/src/main/scala/code/api/util/ExampleValue.scala index e340a40b1..7ae96acbd 100644 --- a/obp-api/src/main/scala/code/api/util/ExampleValue.scala +++ b/obp-api/src/main/scala/code/api/util/ExampleValue.scala @@ -1,29 +1,30 @@ package code.api.util -import code.api.util.Glossary.{GlossaryItem, glossaryItems, makeGlossaryItem} +import code.api.util.Glossary.{glossaryItems, makeGlossaryItem} + +case class ConnectorField(value: String, description: String) { + + // def valueAndDescription: String = { + // s"${value} : ${description}".toString + // } + +} object ExampleValue { - case class ConnectorField(value: String, description: String) { -// def valueAndDescription: String = { -// s"${value} : ${description}".toString -// } + lazy val bankIdGlossary = glossaryItems.find(_.title == "Bank.bank_id").map(_.textDescription) - } + lazy val bankIdExample = ConnectorField("GENODEM1GLS", s"A string that MUST uniquely identify the bank on this OBP instance. It COULD be a UUID but is generally a short string that easily identifies the bank / brand it represents.") - val bankIdGlossary = glossaryItems.find(_.title == "Bank.bank_id").map(_.textDescription) - - val bankIdExample = ConnectorField("GENODEM1GLS", s"A string that MUST uniquely identify the bank on this OBP instance. It COULD be a UUID but is generally a short string that easily identifies the bank / brand it represents.") - - val accountIdExample = ConnectorField("8ca8a7e4-6d02-40e3-a129-0b2bf89de9f0", s"A string that, in combination with the bankId MUST uniquely identify the account on this OBP instance. SHOULD be a UUID. MUST NOT be able to guess accountNumber from accountID. OBP-API or Adapter keeps a mapping between accountId and accountNumber. AccountId is a non reversible hash of the human readable account number.") + lazy val accountIdExample = ConnectorField("8ca8a7e4-6d02-40e3-a129-0b2bf89de9f0", s"A string that, in combination with the bankId MUST uniquely identify the account on this OBP instance. SHOULD be a UUID. MUST NOT be able to guess accountNumber from accountID. OBP-API or Adapter keeps a mapping between accountId and accountNumber. AccountId is a non reversible hash of the human readable account number.") val accountNumberExample = ConnectorField("546387432", s"A human friendly string that identifies the account at the bank, possibly in combination with the branch and account type.") val sessionIdExample = ConnectorField("b4e0352a-9a0f-4bfa-b30b-9003aa467f50", s"A string that MUST uniquely identify the session on this OBP instance, can be used in all cache. ") - val userIdExample = ConnectorField("9ca9a7e4-6d02-40e3-a129-0b2bf89de9b1", s"A string that MUST uniquely identify the user on this OBP instance.") + lazy val userIdExample = ConnectorField("9ca9a7e4-6d02-40e3-a129-0b2bf89de9b1", s"A string that MUST uniquely identify the user on this OBP instance.") glossaryItems += makeGlossaryItem("Adapter.userId", userIdExample) @@ -34,7 +35,7 @@ object ExampleValue { glossaryItems += makeGlossaryItem("API.correlation_id", correlationIdExample) - val customerIdExample = ConnectorField("7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh", s"A non human friendly string that identifies the customer and is used in URLs. This SHOULD NOT be the customer number. The combination of customerId and bankId MUST be unique on an OBP instance. customerId SHOULD be unique on an OBP instance. Ideally customerId is a UUID. A mapping between customer number and customer id is kept in OBP.") + lazy val customerIdExample = ConnectorField("7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh", s"A non human friendly string that identifies the customer and is used in URLs. This SHOULD NOT be the customer number. The combination of customerId and bankId MUST be unique on an OBP instance. customerId SHOULD be unique on an OBP instance. Ideally customerId is a UUID. A mapping between customer number and customer id is kept in OBP.") glossaryItems += makeGlossaryItem("Adapter.customerId", customerIdExample) val consumerIdExample = ConnectorField("7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh", s"A non human friendly string that identifies the consumer. It is the app which calls the apis") diff --git a/obp-api/src/main/scala/code/api/util/Glossary.scala b/obp-api/src/main/scala/code/api/util/Glossary.scala index 3bc6443fe..e58b974b1 100644 --- a/obp-api/src/main/scala/code/api/util/Glossary.scala +++ b/obp-api/src/main/scala/code/api/util/Glossary.scala @@ -1,7 +1,7 @@ package code.api.util import code.api.util.APIUtil.{getOAuth2ServerUrl, getObpApiRoot, getServerUrl} -import code.api.util.ExampleValue._ +import code.api.util.ExampleValue.{accountIdExample, bankIdExample, customerIdExample, userIdExample} import scala.collection.mutable.ArrayBuffer diff --git a/obp-api/src/main/scala/code/bankconnectors/vSept2018/KafkaMappedConnector_vSept2018.scala b/obp-api/src/main/scala/code/bankconnectors/vSept2018/KafkaMappedConnector_vSept2018.scala index a837f7dbd..e0bc3f02d 100644 --- a/obp-api/src/main/scala/code/bankconnectors/vSept2018/KafkaMappedConnector_vSept2018.scala +++ b/obp-api/src/main/scala/code/bankconnectors/vSept2018/KafkaMappedConnector_vSept2018.scala @@ -191,7 +191,7 @@ trait KafkaMappedConnector_vSept2018 extends Connector with KafkaHelper with Mdc val viewBasicExample = ViewBasic("owner","Owner", "This is the owner view") val internalBasicCustomerExample = InternalBasicCustomer( - bankId = bankIdExample.value, + bankId = ExampleValue.bankIdExample.value, customerId = customerIdExample.value, customerNumber = customerNumberExample.value, legalName = legalNameExample.value, diff --git a/obp-api/src/main/scala/code/kafka/NorthSideConsumer.scala b/obp-api/src/main/scala/code/kafka/NorthSideConsumer.scala index 779039523..e514e9b8d 100644 --- a/obp-api/src/main/scala/code/kafka/NorthSideConsumer.scala +++ b/obp-api/src/main/scala/code/kafka/NorthSideConsumer.scala @@ -92,8 +92,8 @@ class NorthSideConsumer[K, V](brokers: String, topic: String, group: String, key val allTopicsApiListening: List[String] = allTopicsOverAdapter :+ apiLoopbackTopic consumer.subscribe(allTopicsApiListening) - var completed = false - var started = false + @volatile var completed = false + @volatile var started = false def complete(): Unit = { completed = true diff --git a/obp-api/src/test/scala/code/kafka/KafkaTest.scala b/obp-api/src/test/scala/code/kafka/KafkaTest.scala new file mode 100644 index 000000000..b9d2f0e60 --- /dev/null +++ b/obp-api/src/test/scala/code/kafka/KafkaTest.scala @@ -0,0 +1,61 @@ +package code.kafka + +import java.util.Date + +import code.api.util.{CallContext, ExampleValue} +import code.bankconnectors.vMar2017.{InboundBank} +import code.bankconnectors.vSept2018._ +import code.setup.KafkaSetup +import com.openbankproject.commons.dto.InBoundGetKycStatuses +import com.openbankproject.commons.model._ +import net.liftweb.common.{Box, Full} +import net.liftweb.json + +import scala.collection.immutable.List +import scala.concurrent.Await +import scala.concurrent.duration.{Duration, _} + +class KafkaTest extends KafkaSetup { + val waitTime: Duration = (10 second) + + + feature("Send and retrieve message") { + scenario("Send and retrieve message directly to and from kafka") { + val emptyStatusMessage = InboundStatusMessage("", "", "", "") + val inBound = InboundGetBanks(InboundAuthInfo("", ""), Status("", List(emptyStatusMessage)), List(InboundBank("1", "2", "3", "4"))) + When("send a OutboundGetBanks message") + + dispathResponse(inBound) + val req = OutboundGetBanks(AuthInfo()) + + val future = processToFuture[OutboundGetBanks](req) + val result:json.JValue = Await.result(future, waitTime) + + val banks = result.extract[InboundGetBanks] + banks should be equals (inBound) + } + + /** + * override val bankId: String, + * override val customerId: String, + * override val customerNumber : String, + * override val ok : Boolean, + * override val date : Date + */ + scenario("Send and retrieve api message") { + When("send a OutboundGetKycStatuses api message") + val emptyStatusMessage = InboundStatusMessage("", "", "", "") + val kycStatusCommons = KycStatusCommons(bankId = "hello_bank_id", customerId = "hello_customer_id", customerNumber = "hello_customer_number", ok = true, date = new Date()) + val singleInboundBank = List(kycStatusCommons) + val inboundAdapterCallContext = InboundAdapterCallContext(correlationId="some_correlationId") + val inBound = InBoundGetKycStatuses(inboundAdapterCallContext, Status("", List(emptyStatusMessage)), singleInboundBank) + + dispathResponse(inBound) + + val future = KafkaMappedConnector_vSept2018.getKycStatuses(kycStatusCommons.customerId, Some(CallContext())) + val result: (Box[List[KycStatus]], Option[CallContext]) = Await.result(future, waitTime) + val expectResult = Full(singleInboundBank) + result._1 should be equals(expectResult) + } + } +} diff --git a/obp-api/src/test/scala/code/setup/KafkaSetup.scala b/obp-api/src/test/scala/code/setup/KafkaSetup.scala new file mode 100644 index 000000000..69eb3790a --- /dev/null +++ b/obp-api/src/test/scala/code/setup/KafkaSetup.scala @@ -0,0 +1,71 @@ +package code.setup + +import code.actorsystem.ObpActorSystem +import code.api.util.CustomJsonFormats +import code.kafka._ +import code.util.Helper.MdcLoggable +import net.liftweb.json +import net.liftweb.json.{DefaultFormats, Extraction} +import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.scalatest.{FeatureSpec, _} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +trait KafkaSetup extends FeatureSpec with EmbeddedKafka with KafkaHelper + with BeforeAndAfterEach with GivenWhenThen + with Matchers with MdcLoggable { + + + + implicit val formats = CustomJsonFormats.formats + implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181) //TODO the port should read from test.default.props, but fail + implicit val stringSerializer = new StringSerializer + implicit val stringDeserializer = new StringDeserializer + + lazy val requestMapResponseTopics:Map[String, String] = NorthSideConsumer.listOfTopics + .map(Topics.createTopicByClassName) + .map(pair => (pair.request, pair.response)) + .toMap + lazy val requestTopics = requestMapResponseTopics.keySet + + override def beforeEach(): Unit = { + super.beforeEach() + + EmbeddedKafka.start() + + createCustomTopic("Request", Map.empty, 10, 1) + createCustomTopic("Response", Map.empty, 10, 1) + if(!OBPKafkaConsumer.primaryConsumer.started){ + val actorSystem = ObpActorSystem.startLocalActorSystem + KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem) + // Start North Side Consumer if it's not already started + OBPKafkaConsumer.primaryConsumer.start() + } + } + + override def afterEach(): Unit = { + super.afterEach() + OBPKafkaConsumer.primaryConsumer.complete() + EmbeddedKafka.stop() + } + + /** + * send an object to kafka as response + * + * @param inBound inBound object that will send to kafka as a response + * @tparam T Outbound type + */ + def dispathResponse(inBound: AnyRef): Unit = { + val inBoundStr = json.compactRender(Extraction.decompose(inBound)) + Future{ + val requestKeyValue = consumeNumberKeyedMessagesFromTopics(requestTopics, 1, true) + val (requestTopic, keyValueList) = requestKeyValue.find(_._2.nonEmpty).get + val (key, _) = keyValueList.head + val responseTopic = requestMapResponseTopics(requestTopic) + publishToKafka(responseTopic, key, inBoundStr) + } + } + +}