add embeded kafka unit test, but not passed yet

This commit is contained in:
shuang 2019-05-14 20:52:50 +08:00
parent cfa06ced0f
commit 13cd4c3976
8 changed files with 161 additions and 18 deletions

View File

@ -341,6 +341,14 @@
<artifactId>geocalc</artifactId>
<version>0.5.7</version>
</dependency>
<!-- embeded kafka for unit test start -->
<dependency>
<groupId>net.manub</groupId>
<artifactId>scalatest-embedded-kafka_2.12</artifactId>
<version>1.1.1</version>
<scope>test</scope>
</dependency>
<!-- embeded kafka for unit test end -->
</dependencies>
<build>

View File

@ -13,7 +13,9 @@ object ObpActorSystem extends MdcLoggable {
var obpActorSystem: ActorSystem = _
var northSideAkkaConnectorActorSystem: ActorSystem = _
def startLocalActorSystem(): ActorSystem = {
def startLocalActorSystem() = localActorSystem
lazy val localActorSystem: ActorSystem = {
logger.info("Starting local actor system")
val localConf = ObpActorConfig.localConf
logger.info(localConf)

View File

@ -1,29 +1,30 @@
package code.api.util
import code.api.util.Glossary.{GlossaryItem, glossaryItems, makeGlossaryItem}
import code.api.util.Glossary.{glossaryItems, makeGlossaryItem}
case class ConnectorField(value: String, description: String) {
// def valueAndDescription: String = {
// s"${value} : ${description}".toString
// }
}
object ExampleValue {
case class ConnectorField(value: String, description: String) {
// def valueAndDescription: String = {
// s"${value} : ${description}".toString
// }
lazy val bankIdGlossary = glossaryItems.find(_.title == "Bank.bank_id").map(_.textDescription)
}
lazy val bankIdExample = ConnectorField("GENODEM1GLS", s"A string that MUST uniquely identify the bank on this OBP instance. It COULD be a UUID but is generally a short string that easily identifies the bank / brand it represents.")
val bankIdGlossary = glossaryItems.find(_.title == "Bank.bank_id").map(_.textDescription)
val bankIdExample = ConnectorField("GENODEM1GLS", s"A string that MUST uniquely identify the bank on this OBP instance. It COULD be a UUID but is generally a short string that easily identifies the bank / brand it represents.")
val accountIdExample = ConnectorField("8ca8a7e4-6d02-40e3-a129-0b2bf89de9f0", s"A string that, in combination with the bankId MUST uniquely identify the account on this OBP instance. SHOULD be a UUID. MUST NOT be able to guess accountNumber from accountID. OBP-API or Adapter keeps a mapping between accountId and accountNumber. AccountId is a non reversible hash of the human readable account number.")
lazy val accountIdExample = ConnectorField("8ca8a7e4-6d02-40e3-a129-0b2bf89de9f0", s"A string that, in combination with the bankId MUST uniquely identify the account on this OBP instance. SHOULD be a UUID. MUST NOT be able to guess accountNumber from accountID. OBP-API or Adapter keeps a mapping between accountId and accountNumber. AccountId is a non reversible hash of the human readable account number.")
val accountNumberExample = ConnectorField("546387432", s"A human friendly string that identifies the account at the bank, possibly in combination with the branch and account type.")
val sessionIdExample = ConnectorField("b4e0352a-9a0f-4bfa-b30b-9003aa467f50", s"A string that MUST uniquely identify the session on this OBP instance, can be used in all cache. ")
val userIdExample = ConnectorField("9ca9a7e4-6d02-40e3-a129-0b2bf89de9b1", s"A string that MUST uniquely identify the user on this OBP instance.")
lazy val userIdExample = ConnectorField("9ca9a7e4-6d02-40e3-a129-0b2bf89de9b1", s"A string that MUST uniquely identify the user on this OBP instance.")
glossaryItems += makeGlossaryItem("Adapter.userId", userIdExample)
@ -34,7 +35,7 @@ object ExampleValue {
glossaryItems += makeGlossaryItem("API.correlation_id", correlationIdExample)
val customerIdExample = ConnectorField("7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh", s"A non human friendly string that identifies the customer and is used in URLs. This SHOULD NOT be the customer number. The combination of customerId and bankId MUST be unique on an OBP instance. customerId SHOULD be unique on an OBP instance. Ideally customerId is a UUID. A mapping between customer number and customer id is kept in OBP.")
lazy val customerIdExample = ConnectorField("7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh", s"A non human friendly string that identifies the customer and is used in URLs. This SHOULD NOT be the customer number. The combination of customerId and bankId MUST be unique on an OBP instance. customerId SHOULD be unique on an OBP instance. Ideally customerId is a UUID. A mapping between customer number and customer id is kept in OBP.")
glossaryItems += makeGlossaryItem("Adapter.customerId", customerIdExample)
val consumerIdExample = ConnectorField("7uy8a7e4-6d02-40e3-a129-0b2bf89de8uh", s"A non human friendly string that identifies the consumer. It is the app which calls the apis")

View File

@ -1,7 +1,7 @@
package code.api.util
import code.api.util.APIUtil.{getOAuth2ServerUrl, getObpApiRoot, getServerUrl}
import code.api.util.ExampleValue._
import code.api.util.ExampleValue.{accountIdExample, bankIdExample, customerIdExample, userIdExample}
import scala.collection.mutable.ArrayBuffer

View File

@ -191,7 +191,7 @@ trait KafkaMappedConnector_vSept2018 extends Connector with KafkaHelper with Mdc
val viewBasicExample = ViewBasic("owner","Owner", "This is the owner view")
val internalBasicCustomerExample = InternalBasicCustomer(
bankId = bankIdExample.value,
bankId = ExampleValue.bankIdExample.value,
customerId = customerIdExample.value,
customerNumber = customerNumberExample.value,
legalName = legalNameExample.value,

View File

@ -92,8 +92,8 @@ class NorthSideConsumer[K, V](brokers: String, topic: String, group: String, key
val allTopicsApiListening: List[String] = allTopicsOverAdapter :+ apiLoopbackTopic
consumer.subscribe(allTopicsApiListening)
var completed = false
var started = false
@volatile var completed = false
@volatile var started = false
def complete(): Unit = {
completed = true

View File

@ -0,0 +1,61 @@
package code.kafka
import java.util.Date
import code.api.util.{CallContext, ExampleValue}
import code.bankconnectors.vMar2017.{InboundBank}
import code.bankconnectors.vSept2018._
import code.setup.KafkaSetup
import com.openbankproject.commons.dto.InBoundGetKycStatuses
import com.openbankproject.commons.model._
import net.liftweb.common.{Box, Full}
import net.liftweb.json
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, _}
class KafkaTest extends KafkaSetup {
val waitTime: Duration = (10 second)
feature("Send and retrieve message") {
scenario("Send and retrieve message directly to and from kafka") {
val emptyStatusMessage = InboundStatusMessage("", "", "", "")
val inBound = InboundGetBanks(InboundAuthInfo("", ""), Status("", List(emptyStatusMessage)), List(InboundBank("1", "2", "3", "4")))
When("send a OutboundGetBanks message")
dispathResponse(inBound)
val req = OutboundGetBanks(AuthInfo())
val future = processToFuture[OutboundGetBanks](req)
val result:json.JValue = Await.result(future, waitTime)
val banks = result.extract[InboundGetBanks]
banks should be equals (inBound)
}
/**
* override val bankId: String,
* override val customerId: String,
* override val customerNumber : String,
* override val ok : Boolean,
* override val date : Date
*/
scenario("Send and retrieve api message") {
When("send a OutboundGetKycStatuses api message")
val emptyStatusMessage = InboundStatusMessage("", "", "", "")
val kycStatusCommons = KycStatusCommons(bankId = "hello_bank_id", customerId = "hello_customer_id", customerNumber = "hello_customer_number", ok = true, date = new Date())
val singleInboundBank = List(kycStatusCommons)
val inboundAdapterCallContext = InboundAdapterCallContext(correlationId="some_correlationId")
val inBound = InBoundGetKycStatuses(inboundAdapterCallContext, Status("", List(emptyStatusMessage)), singleInboundBank)
dispathResponse(inBound)
val future = KafkaMappedConnector_vSept2018.getKycStatuses(kycStatusCommons.customerId, Some(CallContext()))
val result: (Box[List[KycStatus]], Option[CallContext]) = Await.result(future, waitTime)
val expectResult = Full(singleInboundBank)
result._1 should be equals(expectResult)
}
}
}

View File

@ -0,0 +1,71 @@
package code.setup
import code.actorsystem.ObpActorSystem
import code.api.util.CustomJsonFormats
import code.kafka._
import code.util.Helper.MdcLoggable
import net.liftweb.json
import net.liftweb.json.{DefaultFormats, Extraction}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.{FeatureSpec, _}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
trait KafkaSetup extends FeatureSpec with EmbeddedKafka with KafkaHelper
with BeforeAndAfterEach with GivenWhenThen
with Matchers with MdcLoggable {
implicit val formats = CustomJsonFormats.formats
implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181) //TODO the port should read from test.default.props, but fail
implicit val stringSerializer = new StringSerializer
implicit val stringDeserializer = new StringDeserializer
lazy val requestMapResponseTopics:Map[String, String] = NorthSideConsumer.listOfTopics
.map(Topics.createTopicByClassName)
.map(pair => (pair.request, pair.response))
.toMap
lazy val requestTopics = requestMapResponseTopics.keySet
override def beforeEach(): Unit = {
super.beforeEach()
EmbeddedKafka.start()
createCustomTopic("Request", Map.empty, 10, 1)
createCustomTopic("Response", Map.empty, 10, 1)
if(!OBPKafkaConsumer.primaryConsumer.started){
val actorSystem = ObpActorSystem.startLocalActorSystem
KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem)
// Start North Side Consumer if it's not already started
OBPKafkaConsumer.primaryConsumer.start()
}
}
override def afterEach(): Unit = {
super.afterEach()
OBPKafkaConsumer.primaryConsumer.complete()
EmbeddedKafka.stop()
}
/**
* send an object to kafka as response
*
* @param inBound inBound object that will send to kafka as a response
* @tparam T Outbound type
*/
def dispathResponse(inBound: AnyRef): Unit = {
val inBoundStr = json.compactRender(Extraction.decompose(inBound))
Future{
val requestKeyValue = consumeNumberKeyedMessagesFromTopics(requestTopics, 1, true)
val (requestTopic, keyValueList) = requestKeyValue.find(_._2.nonEmpty).get
val (key, _) = keyValueList.head
val responseTopic = requestMapResponseTopics(requestTopic)
publishToKafka(responseTopic, key, inBoundStr)
}
}
}