diff --git a/src/main/scala/code/bankconnectors/Connector.scala b/src/main/scala/code/bankconnectors/Connector.scala index 02b661d10..8e3c18533 100644 --- a/src/main/scala/code/bankconnectors/Connector.scala +++ b/src/main/scala/code/bankconnectors/Connector.scala @@ -43,8 +43,14 @@ Could consider a Map of ("resourceType" -> "provider") - this could tell us whic initialise MongoDB etc. resourceType might be sub devided to allow for different account types coming from different internal APIs, MQs. */ -object Connector extends SimpleInjector { +object Connector extends SimpleInjector { + import scala.reflect.runtime.universe._ + def getObjectInstance(clsName: String):Connector = { + val mirror = runtimeMirror(getClass.getClassLoader) + val module = mirror.staticModule(clsName) + mirror.reflectModule(module).instance.asInstanceOf[Connector] + } val connector = new Inject(buildOne _) {} @@ -54,8 +60,9 @@ object Connector extends SimpleInjector { connectorProps match { case "mapped" => LocalMappedConnector case "mongodb" => LocalConnector - case "kafka" => KafkaMappedConnector case "obpjvm" => ObpJvmMappedConnector + case "kafka" => KafkaMappedConnector + case matchKafkaVersion(version) => getObjectInstance(s"""code.bankconnectors.KafkaMappedConnector_v${version}""") } } diff --git a/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala b/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala new file mode 100644 index 000000000..1ec8f9231 --- /dev/null +++ b/src/main/scala/code/bankconnectors/KafkaMappedConnector_vMar2017.scala @@ -0,0 +1,1524 @@ +package code.bankconnectors + +/* +Open Bank Project - API +Copyright (C) 2011-2016, TESOBE Ltd + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see http://www.gnu.org/licenses/. + +Email: contact@tesobe.com +TESOBE Ltd +Osloerstrasse 16/17 +Berlin 13359, Germany +*/ + +import java.text.SimpleDateFormat +import java.util.{Date, Locale, UUID} + +import code.accountholder.AccountHolders +import code.api.util.ErrorMessages +import code.api.v2_1_0.{BranchJsonPost, BranchJsonPut, TransactionRequestCommonBodyJSON} +import code.branches.Branches.{Branch, BranchId} +import code.branches.MappedBranch +import code.fx.{FXRate, fx} +import code.management.ImporterAPI.ImporterTransaction +import code.metadata.comments.{Comments, MappedComment} +import code.metadata.counterparties.{Counterparties, CounterpartyTrait, MappedCounterparty} +import code.metadata.narrative.MappedNarrative +import code.metadata.tags.{MappedTag, Tags} +import code.metadata.transactionimages.{MappedTransactionImage, TransactionImages} +import code.metadata.wheretags.{MappedWhereTag, WhereTags} +import code.model._ +import code.model.dataAccess._ +import code.products.Products.ProductCode +import code.transaction.MappedTransaction +import code.transactionrequests.{MappedTransactionRequest, TransactionRequestTypeCharge} +import code.transactionrequests.TransactionRequests._ +import code.util.{Helper, TTLCache} +import code.views.Views +import net.liftweb.json +import net.liftweb.mapper._ +import net.liftweb.util.Helpers._ +import net.liftweb.util.Props +import net.liftweb.json._ +import net.liftweb.common._ +import code.products.MappedProduct +import code.products.Products.{Product, ProductCode} +import code.products.MappedProduct +import code.products.Products.{Product, ProductCode} +import code.users.Users + +object KafkaMappedConnector_vMar2017 extends Connector with Loggable { + + var producer = new KafkaProducer() + var consumer = new KafkaConsumer() + type AccountType = KafkaBankAccount + + // Local TTL Cache + val cacheTTL = Props.get("connector.cache.ttl.seconds", "0").toInt + val cachedUser = TTLCache[KafkaInboundValidatedUser](cacheTTL) + val cachedBank = TTLCache[KafkaInboundBank](cacheTTL) + val cachedAccount = TTLCache[KafkaInboundAccount](cacheTTL) + val cachedBanks = TTLCache[List[KafkaInboundBank]](cacheTTL) + val cachedAccounts = TTLCache[List[KafkaInboundAccount]](cacheTTL) + val cachedPublicAccounts = TTLCache[List[KafkaInboundAccount]](cacheTTL) + val cachedUserAccounts = TTLCache[List[KafkaInboundAccount]](cacheTTL) + val cachedFxRate = TTLCache[KafkaInboundFXRate](cacheTTL) + val cachedCounterparty = TTLCache[KafkaInboundCounterparty](cacheTTL) + val cachedTransactionRequestTypeCharge = TTLCache[KafkaInboundTransactionRequestTypeCharge](cacheTTL) + + + // + // "Versioning" of the messages sent by this or similar connector might work like this: + // Use Case Classes (e.g. KafkaInbound... KafkaOutbound... as below to describe the message structures. + // Probably should be in a separate file e.g. Nov2016_messages.scala + // Once the message format is STABLE, freeze the key/value pair names there. For now, new keys may be added but none modified. + // If we want to add a new message format, create a new file e.g. March2017_messages.scala + // Then add a suffix to the connector value i.e. instead of kafka we might have kafka_march_2017. + // Then in this file, populate the different case classes depending on the connector name and send to Kafka + // + + val formatVersion: String = "Mar2017" + + implicit val formats = net.liftweb.json.DefaultFormats + + + // TODO Create and use a case class for each Map so we can document each structure. + + + def getUser( username: String, password: String ): Box[InboundUser] = { + for { + req <- tryo {Map[String, String]( + "action" -> "obp.getUser", + "version" -> formatVersion, // rename version to messageFormat or maybe connector (see above) + "username" -> username, + "password" -> password + )} + u <- tryo{cachedUser.getOrElseUpdate( req.toString, () => process(req).extract[KafkaInboundValidatedUser])} + recUsername <- tryo{u.displayName} + } yield { + if (username == u.displayName) new InboundUser( recUsername, password, recUsername) + else null + } + } + + def updateUserAccountViews( user: ResourceUser ) = { + val accounts: List[KafkaInboundAccount] = getBanks.flatMap { bank => { + val bankId = bank.bankId.value + logger.info(s"ObpJvm updateUserAccountViews for user.email ${user.email} user.name ${user.name} at bank ${bankId}") + for { + username <- tryo {user.name} + req <- tryo { Map[String, String]( + "action" -> "obp.getAccounts", + "version" -> formatVersion, + "username" -> user.name, + "userId" -> user.userId, + "bankId" -> bankId)} + // Generate random uuid to be used as request-response match id + } yield { + cachedUserAccounts.getOrElseUpdate(req.toString, () => process(req).extract[List[KafkaInboundAccount]]) + } + } + }.flatten + + val views = for { + acc <- accounts + username <- tryo {user.name} + views <- tryo {createViews( BankId(acc.bankId), + AccountId(acc.accountId), + acc.owners.contains(username), + acc.generate_public_view, + acc.generate_accountants_view, + acc.generate_auditors_view + )} + existing_views <- tryo {Views.views.vend.views(BankAccountUID(BankId(acc.bankId), AccountId(acc.accountId)))} + } yield { + setAccountOwner(username, BankId(acc.bankId), AccountId(acc.accountId), acc.owners) + views.foreach(v => { + Views.views.vend.addPermission(v.uid, user) + logger.info(s"------------> updated view ${v.uid} for resourceuser ${user} and account ${acc}") + }) + existing_views.filterNot(_.users.contains(user.resourceUserId)).foreach (v => { + Views.views.vend.addPermission(v.uid, user) + logger.info(s"------------> added resourceuser ${user} to view ${v.uid} for account ${acc}") + }) + } + } + + + //gets banks handled by this connector + override def getBanks: List[Bank] = { + val req = Map( + "action" -> "obp.getBanks", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername + ) + + logger.debug(s"Kafka getBanks says: req is: $req") + + logger.debug(s"Kafka getBanks before cachedBanks.getOrElseUpdate") + val rList = { + cachedBanks.getOrElseUpdate( req.toString, () => process(req).extract[List[KafkaInboundBank]]) + } + + logger.debug(s"Kafka getBanks says rList is $rList") + + // Loop through list of responses and create entry for each + val res = { for ( r <- rList ) yield { + new KafkaBank(r) + } + } + // Return list of results + + logger.debug(s"Kafka getBanks says res is $res") + res + } + + // Gets current challenge level for transaction request + override def getChallengeThreshold(bankId: String, accountId: String, viewId: String, transactionRequestType: String, currency: String, userId: String, userName: String): AmountOfMoney = { + // Create argument list + val req = Map( + "action" -> "obp.getChallengeThreshold", + "version" -> formatVersion, + "bankId" -> bankId, + "accountId" -> accountId, + "viewId" -> viewId, + "transactionRequestType" -> transactionRequestType, + "currency" -> currency, + "userId" -> userId, + "username" -> userName + ) + val r: Option[KafkaInboundChallengeLevel] = process(req).extractOpt[KafkaInboundChallengeLevel] + // Return result + r match { + // Check does the response data match the requested data + case Some(x) => AmountOfMoney(x.currency, x.limit) + case _ => { + val limit = BigDecimal("0") + val rate = fx.exchangeRate ("EUR", currency) + val convertedLimit = fx.convert(limit, rate) + AmountOfMoney(currency,convertedLimit.toString()) + } + } + } + + override def getChargeLevel(bankId: BankId, + accountId: AccountId, + viewId: ViewId, + userId: String, + userName: String, + transactionRequestType: String, + currency: String): Box[AmountOfMoney] = { + // Create argument list + val req = Map( + "action" -> "obp.getChargeLevel", + "version" -> formatVersion, + "bankId" -> bankId.value, + "accountId" -> accountId.value, + "viewId" -> viewId.value, + "transactionRequestType" -> transactionRequestType, + "currency" -> currency, + "userId" -> userId, + "username" -> userName + ) + val r: Option[KafkaInboundChargeLevel] = process(req).extractOpt[KafkaInboundChargeLevel] + // Return result + val chargeValue = r match { + // Check does the response data match the requested data + case Some(x) => AmountOfMoney(x.currency, x.amount) + case _ => { + AmountOfMoney("EUR", "0.0001") + } + } + Full(chargeValue) + } + + override def createChallenge(bankId: BankId, accountId: AccountId, userId: String, transactionRequestType: TransactionRequestType, transactionRequestId: String) : Box[String] = { + // Create argument list + val req = Map( + "action" -> "obp.createChallenge", + "version" -> formatVersion, + "bankId" -> bankId.value, + "accountId" -> accountId.value, + "userId" -> userId, + "username" -> AuthUser.getCurrentUserUsername, + "transactionRequestType" -> transactionRequestType.value, + "transactionRequestId" -> transactionRequestId + ) + val r: Option[KafkaInboundCreateChallange] = process(req).extractOpt[KafkaInboundCreateChallange] + // Return result + r match { + // Check does the response data match the requested data + case Some(x) => Full(x.challengeId) + case _ => Empty + } + } + + override def validateChallengeAnswer(challengeId: String, hashOfSuppliedAnswer: String) : Box[Boolean] = { + // Create argument list + val req = Map( + "action" -> "obp.validateChallengeAnswer", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "challengeId" -> challengeId, + "hashOfSuppliedAnswer" -> hashOfSuppliedAnswer + ) + val r: Option[KafkaInboundValidateChallangeAnswer] = process(req).extractOpt[KafkaInboundValidateChallangeAnswer] + // Return result + r match { + // Check does the response data match the requested data + case Some(x) => Full(x.answer.toBoolean) + case _ => Empty + } + } + + // Gets bank identified by bankId + override def getBank(id: BankId): Box[Bank] = { + // Create argument list + val req = Map( + "action" -> "obp.getBank", + "version" -> formatVersion, + "bankId" -> id.toString, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername + ) + val r = { + cachedBank.getOrElseUpdate( req.toString, () => process(req).extract[KafkaInboundBank]) + } + // Return result + Full(new KafkaBank(r)) + } + + // Gets transaction identified by bankid, accountid and transactionId + def getTransaction(bankId: BankId, accountId: AccountId, transactionId: TransactionId): Box[Transaction] = { + val req = Map( + "action" -> "obp.getTransaction", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "bankId" -> bankId.toString, + "accountId" -> accountId.toString, + "transactionId" -> transactionId.toString + ) + // Since result is single account, we need only first list entry + val r = process(req).extractOpt[KafkaInboundTransaction] + r match { + // Check does the response data match the requested data + case Some(x) if transactionId.value != x.transactionId => Failure(ErrorMessages.InvalidGetTransactionConnectorResponse, Empty, Empty) + case Some(x) if transactionId.value == x.transactionId => createNewTransaction(x) + case _ => Failure(ErrorMessages.ConnectorEmptyResponse, Empty, Empty) + } + + } + + override def getTransactions(bankId: BankId, accountId: AccountId, queryParams: OBPQueryParam*): Box[List[Transaction]] = { + val limit = queryParams.collect { case OBPLimit(value) => MaxRows[MappedTransaction](value) }.headOption + val offset = queryParams.collect { case OBPOffset(value) => StartAt[MappedTransaction](value) }.headOption + val fromDate = queryParams.collect { case OBPFromDate(date) => By_>=(MappedTransaction.tFinishDate, date) }.headOption + val toDate = queryParams.collect { case OBPToDate(date) => By_<=(MappedTransaction.tFinishDate, date) }.headOption + val ordering = queryParams.collect { + //we don't care about the intended sort field and only sort on finish date for now + case OBPOrdering(_, direction) => + direction match { + case OBPAscending => OrderBy(MappedTransaction.tFinishDate, Ascending) + case OBPDescending => OrderBy(MappedTransaction.tFinishDate, Descending) + } + } + val optionalParams : Seq[QueryParam[MappedTransaction]] = Seq(limit.toSeq, offset.toSeq, fromDate.toSeq, toDate.toSeq, ordering.toSeq).flatten + val mapperParams = Seq(By(MappedTransaction.bank, bankId.value), By(MappedTransaction.account, accountId.value)) ++ optionalParams + + val req = Map( + "action" -> "obp.getTransactions", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "bankId" -> bankId.toString, + "accountId" -> accountId.toString, + "queryParams" -> queryParams.toString + ) + implicit val formats = net.liftweb.json.DefaultFormats + val rList = process(req).extract[List[KafkaInboundTransaction]] + // Check does the response data match the requested data + val isCorrect = rList.forall(x=>x.accountId == accountId.value && x.bankId == bankId.value) + if (!isCorrect) throw new Exception(ErrorMessages.InvalidGetTransactionsConnectorResponse) + // Populate fields and generate result + val res = for { + r <- rList + transaction <- createNewTransaction(r) + } yield { + transaction + } + Full(res) + //TODO is this needed updateAccountTransactions(bankId, accountId) + } + + override def getBankAccount(bankId: BankId, accountId: AccountId): Box[KafkaBankAccount] = { + // Generate random uuid to be used as request-response match id + val req = Map( + "action" -> "obp.getBankAccount", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "bankId" -> bankId.toString, + "accountId" -> accountId.value + ) + // Since result is single account, we need only first list entry + implicit val formats = net.liftweb.json.DefaultFormats + val r = { + cachedAccount.getOrElseUpdate( req.toString, () => process(req).extract[KafkaInboundAccount]) + } + // Check does the response data match the requested data + val accResp = List((BankId(r.bankId), AccountId(r.accountId))).toSet + val acc = List((bankId, accountId)).toSet + if ((accResp diff acc).size > 0) throw new Exception(ErrorMessages.InvalidGetBankAccountConnectorResponse) + + createMappedAccountDataIfNotExisting(r.bankId, r.accountId, r.label) + + Full(new KafkaBankAccount(r)) + } + + override def getBankAccounts(accts: List[(BankId, AccountId)]): List[KafkaBankAccount] = { + val primaryUserIdentifier = AuthUser.getCurrentUserUsername + + val r:List[KafkaInboundAccount] = accts.flatMap { a => { + + logger.info (s"KafkaMappedConnnector.getBankAccounts with params ${a._1.value} and ${a._2.value} and primaryUserIdentifier is $primaryUserIdentifier") + + val req = Map( + "action" -> "obp.getBankAccounts", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "bankId" -> a._1.value, + "accountId" -> a._2.value + ) + implicit val formats = net.liftweb.json.DefaultFormats + val r = { + cachedAccounts.getOrElseUpdate( req.toString, () => process(req).extract[List[KafkaInboundAccount]]) + } + r + } + } + + + // Check does the response data match the requested data + val accRes = for(row <- r) yield { + (BankId(row.bankId), AccountId(row.accountId)) + } + if ((accRes.toSet diff accts.toSet).size > 0) throw new Exception(ErrorMessages.InvalidGetBankAccountsConnectorResponse) + + r.map { t => + createMappedAccountDataIfNotExisting(t.bankId, t.accountId, t.label) + new KafkaBankAccount(t) } + } + + private def getAccountByNumber(bankId : BankId, number : String) : Box[AccountType] = { + // Generate random uuid to be used as request-respose match id + val req = Map( + "action" -> "obp.getBankAccount", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "bankId" -> bankId.toString, + "number" -> number + ) + // Since result is single account, we need only first list entry + implicit val formats = net.liftweb.json.DefaultFormats + val r = { + cachedAccount.getOrElseUpdate( req.toString, () => process(req).extract[KafkaInboundAccount]) + } + createMappedAccountDataIfNotExisting(r.bankId, r.accountId, r.label) + Full(new KafkaBankAccount(r)) + } + + def getCounterpartyFromTransaction(thisBankId : BankId, thisAccountId : AccountId, metadata : CounterpartyMetadata) : Box[Counterparty] = { + //because we don't have a db backed model for OtherBankAccounts, we need to construct it from an + //OtherBankAccountMetadata and a transaction + val t = getTransactions(thisBankId, thisAccountId).map { t => + t.filter { e => + if (e.otherAccount.thisAccountId == metadata.getAccountNumber) + true + else + false + } + }.get.head + + val res = new Counterparty( + //counterparty id is defined to be the id of its metadata as we don't actually have an id for the counterparty itself + counterPartyId = metadata.metadataId, + label = metadata.getHolder, + nationalIdentifier = t.otherAccount.nationalIdentifier, + otherBankRoutingAddress = None, + otherAccountRoutingAddress = t.otherAccount.otherAccountRoutingAddress, + thisAccountId = AccountId(metadata.getAccountNumber), + thisBankId = t.otherAccount.thisBankId, + kind = t.otherAccount.kind, + otherBankId = thisBankId, + otherAccountId = thisAccountId, + alreadyFoundMetadata = Some(metadata), + name = "", + otherBankRoutingScheme = "", + otherAccountRoutingScheme="", + otherAccountProvider = "", + isBeneficiary = true + ) + Full(res) + } + + /** + * + * refreshes transactions via hbci if the transaction info is sourced from hbci + * + * Checks if the last update of the account was made more than one hour ago. + * if it is the case we put a message in the message queue to ask for + * transactions updates + * + * It will be used each time we fetch transactions from the DB. But the test + * is performed in a different thread. + */ + /* + private def updateAccountTransactions(bankId : BankId, accountId : AccountId) = { + + for { + bank <- getBank(bankId) + account <- getBankAccountType(bankId, accountId) + } { + spawn{ + val useMessageQueue = Props.getBool("messageQueue.updateBankAccountsTransaction", false) + val outDatedTransactions = Box!!account.lastUpdate match { + case Full(l) => now after time(l.getTime + hours(Props.getInt("messageQueue.updateTransactionsInterval", 1))) + case _ => true + } + //if(outDatedTransactions && useMessageQueue) { + // UpdatesRequestSender.sendMsg(UpdateBankAccount(account.number, bank.national_identifier.get)) + //} + } + } + } + */ + + + // Get all counterparties related to an account + override def getCounterpartiesFromTransaction(bankId: BankId, accountId: AccountId): List[Counterparty] = + Counterparties.counterparties.vend.getMetadatas(bankId, accountId).flatMap(getCounterpartyFromTransaction(bankId, accountId, _)) + + // Get one counterparty related to a bank account + override def getCounterpartyFromTransaction(bankId: BankId, accountId: AccountId, counterpartyID: String): Box[Counterparty] = + // Get the metadata and pass it to getOtherBankAccount to construct the other account. + Counterparties.counterparties.vend.getMetadata(bankId, accountId, counterpartyID).flatMap(getCounterpartyFromTransaction(bankId, accountId, _)) + + def getCounterparty(thisBankId: BankId, thisAccountId: AccountId, couterpartyId: String): Box[Counterparty] = { + //note: kafka mode just used the mapper data + LocalMappedConnector.getCounterparty(thisBankId, thisAccountId, couterpartyId) + } + + // Get one counterparty by the Counterparty Id + override def getCounterpartyByCounterpartyId(counterpartyId: CounterpartyId): Box[CounterpartyTrait] = { + + if (Props.getBool("get_counterparties_from_OBP_DB", true)) { + MappedCounterparty.find(By(MappedCounterparty.mCounterPartyId, counterpartyId.value)) + } else { + val req = Map( + "action" -> "obp.getCounterpartyByCounterpartyId", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "action" -> "obp.getCounterpartyByCounterpartyId", + "counterpartyId" -> counterpartyId.toString + ) + // Since result is single account, we need only first list entry + implicit val formats = net.liftweb.json.DefaultFormats + val r = { + cachedCounterparty.getOrElseUpdate( req.toString, () => process(req).extract[KafkaInboundCounterparty]) + } + Full(new KafkaCounterparty(r)) + } + } + + + + + + override def getCounterpartyByIban(iban: String): Box[CounterpartyTrait] = { + + if (Props.getBool("get_counterparties_from_OBP_DB", true)) { + MappedCounterparty.find( + By(MappedCounterparty.mOtherAccountRoutingAddress, iban), + By(MappedCounterparty.mOtherAccountRoutingScheme, "IBAN") + ) + } else { + val req = Map( + "action" -> "obp.getCounterpartyByIban", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "action" -> "obp.getCounterpartyByIban", + "otherAccountRoutingAddress" -> iban, + "otherAccountRoutingScheme" -> "IBAN" + ) + + val r = process(req).extract[KafkaInboundCounterparty] + + Full(new KafkaCounterparty(r)) + } + } + + override def getCounterparties(thisBankId: BankId, thisAccountId: AccountId,viewId :ViewId): Box[List[CounterpartyTrait]] = { + //note: kafka mode just used the mapper data + LocalMappedConnector.getCounterparties(thisBankId, thisAccountId, viewId) + } + override def getPhysicalCards(user: User): List[PhysicalCard] = + List() + + override def getPhysicalCardsForBank(bank: Bank, user: User): List[PhysicalCard] = + List() + + def AddPhysicalCard(bankCardNumber: String, + nameOnCard: String, + issueNumber: String, + serialNumber: String, + validFrom: Date, + expires: Date, + enabled: Boolean, + cancelled: Boolean, + onHotList: Boolean, + technology: String, + networks: List[String], + allows: List[String], + accountId: String, + bankId: String, + replacement: Option[CardReplacementInfo], + pinResets: List[PinResetInfo], + collected: Option[CardCollectionInfo], + posted: Option[CardPostedInfo] + ) : Box[PhysicalCard] = { + Empty + } + + + protected override def makePaymentImpl(fromAccount: KafkaBankAccount, + toAccount: KafkaBankAccount, + toCounterparty: CounterpartyTrait, + amt: BigDecimal, + description: String, + transactionRequestType: TransactionRequestType, + chargePolicy: String): Box[TransactionId] = { + + val sentTransactionId = saveTransaction(fromAccount, + toAccount, + toCounterparty, + -amt, + description, + transactionRequestType, + chargePolicy) + + sentTransactionId + } + + + /** + * Saves a transaction with amount @amount and counterparty @counterparty for account @account. Returns the id + * of the saved transaction. + */ + private def saveTransaction(fromAccount: KafkaBankAccount, + toAccount: KafkaBankAccount, + toCounterparty: CounterpartyTrait, + amount: BigDecimal, + description: String, + transactionRequestType: TransactionRequestType, + chargePolicy: String) = { + + val transactionTime = now + val currency = fromAccount.currency + + //update the balance of the account for which a transaction is being created + //val newAccountBalance : Long = account.balance.toLong + Helper.convertToSmallestCurrencyUnits(amount, account.currency) + //account.balance = newAccountBalance + + val req: Map[String, String] = Map( + "action" -> "obp.putTransaction", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "description" -> description, + "transactionRequestType" -> transactionRequestType.value, + "toCurrency" -> currency, //Now, http request currency must equal fromAccount.currency + "toAmount" -> amount.toString, + "chargePolicy" -> chargePolicy, + //fromAccount + "fromBankId" -> fromAccount.bankId.value, + "fromAccountId" -> fromAccount.accountId.value, + //toAccount + "toBankId" -> toAccount.bankId.value, + "toAccountId" -> toAccount.accountId.value, + //toCounterty + "toCounterpartyId" -> toCounterparty.counterpartyId, + "toCounterpartyOtherBankRoutingAddress" -> toCounterparty.otherBankRoutingAddress, + "toCounterpartyOtherAccountRoutingAddress" -> toCounterparty.otherAccountRoutingAddress, + "toCounterpartyOtherAccountRoutingScheme" -> toCounterparty.otherAccountRoutingScheme, + "toCounterpartyOtherBankRoutingScheme" -> toCounterparty.otherBankRoutingScheme, + "type" -> "AC") + + + // Since result is single account, we need only first list entry + val r = process(req) + + r.extract[KafkaInboundTransactionId] match { + case r: KafkaInboundTransactionId => Full(TransactionId(r.transactionId)) + case _ => Full(TransactionId("0")) + } + + } + + /* + Transaction Requests + */ + override def getTransactionRequestStatusesImpl() : Box[Map[String, String]] = { + val req : Map[String,String] = Map( + "action" -> "obp.fetch", + "version" -> formatVersion + ) + + val r = process(req) + try { + Full(r.extract[Map[String, String]]) + } catch { + case mpex: net.liftweb.json.MappingException => Empty + } + + //try { + // r.extract[KafkaInboundTransactionRequestStatus] match { + // case status: KafkaInboundTransactionRequestStatus => Full(status.transactionRequestId, status.bulkTransactionsStatus.map( x => TransactionStatus(x.transactionId, x.transactionStatus, x.transactionTimestamp)))) + // case _ => Empty + // } + //} catch { + // case mpex: net.liftweb.json.MappingException => Empty + //} + } + + override def createTransactionRequestImpl(transactionRequestId: TransactionRequestId, transactionRequestType: TransactionRequestType, + account : BankAccount, counterparty : BankAccount, body: TransactionRequestBody, + status: String, charge: TransactionRequestCharge) : Box[TransactionRequest] = { + val mappedTransactionRequest = MappedTransactionRequest.create + .mTransactionRequestId(transactionRequestId.value) + .mType(transactionRequestType.value) + .mFrom_BankId(account.bankId.value) + .mFrom_AccountId(account.accountId.value) + .mTo_BankId(counterparty.bankId.value) + .mTo_AccountId(counterparty.accountId.value) + .mBody_Value_Currency(body.value.currency) + .mBody_Value_Amount(body.value.amount) + .mBody_Description(body.description) + .mStatus(status) + .mStartDate(now) + .mEndDate(now).saveMe + Full(mappedTransactionRequest).flatMap(_.toTransactionRequest) + } + + + //Note: now call the local mapper to store data + protected override def createTransactionRequestImpl210(transactionRequestId: TransactionRequestId, + transactionRequestType: TransactionRequestType, + fromAccount: BankAccount, + toAccount: BankAccount, + toCounterparty: CounterpartyTrait, + transactionRequestCommonBody: TransactionRequestCommonBodyJSON, + details: String, status: String, + charge: TransactionRequestCharge, + chargePolicy: String): Box[TransactionRequest] = { + + LocalMappedConnector.createTransactionRequestImpl210(transactionRequestId: TransactionRequestId, + transactionRequestType: TransactionRequestType, + fromAccount: BankAccount, toAccount: BankAccount, + toCounterparty: CounterpartyTrait, + transactionRequestCommonBody: TransactionRequestCommonBodyJSON, + details: String, + status: String, + charge: TransactionRequestCharge, + chargePolicy: String) + } + //Note: now call the local mapper to store data + override def saveTransactionRequestTransactionImpl(transactionRequestId: TransactionRequestId, transactionId: TransactionId): Box[Boolean] = { + LocalMappedConnector.saveTransactionRequestTransactionImpl(transactionRequestId: TransactionRequestId, transactionId: TransactionId) + } + + override def saveTransactionRequestChallengeImpl(transactionRequestId: TransactionRequestId, challenge: TransactionRequestChallenge): Box[Boolean] = { + val mappedTransactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + mappedTransactionRequest match { + case Full(tr: MappedTransactionRequest) => Full{ + tr.mChallenge_Id(challenge.id) + tr.mChallenge_AllowedAttempts(challenge.allowed_attempts) + tr.mChallenge_ChallengeType(challenge.challenge_type).save + } + case _ => Failure(s"Couldn't find transaction request ${transactionRequestId} to set transactionId") + } + } + + override def saveTransactionRequestStatusImpl(transactionRequestId: TransactionRequestId, status: String): Box[Boolean] = { + val mappedTransactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + mappedTransactionRequest match { + case Full(tr: MappedTransactionRequest) => Full(tr.mStatus(status).save) + case _ => Failure(s"Couldn't find transaction request ${transactionRequestId} to set status") + } + } + + + override def getTransactionRequestsImpl(fromAccount : BankAccount) : Box[List[TransactionRequest]] = { + val transactionRequests = MappedTransactionRequest.findAll(By(MappedTransactionRequest.mFrom_AccountId, fromAccount.accountId.value), + By(MappedTransactionRequest.mFrom_BankId, fromAccount.bankId.value)) + + Full(transactionRequests.flatMap(_.toTransactionRequest)) + } + + override def getTransactionRequestsImpl210(fromAccount : BankAccount) : Box[List[TransactionRequest]] = { + val transactionRequests = MappedTransactionRequest.findAll(By(MappedTransactionRequest.mFrom_AccountId, fromAccount.accountId.value), + By(MappedTransactionRequest.mFrom_BankId, fromAccount.bankId.value)) + + Full(transactionRequests.flatMap(_.toTransactionRequest)) + } + + override def getTransactionRequestImpl(transactionRequestId: TransactionRequestId): Box[TransactionRequest] = { + val transactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + transactionRequest.flatMap(_.toTransactionRequest) + } + + + override def getTransactionRequestTypesImpl(fromAccount: BankAccount): Box[List[TransactionRequestType]] = { + val validTransactionRequestTypes = Props.get("transactionRequests_supported_types", "").split(",").map(x => TransactionRequestType(x)).toList + Full(validTransactionRequestTypes) + } + + /* + Bank account creation + */ + + //creates a bank account (if it doesn't exist) and creates a bank (if it doesn't exist) + //again assume national identifier is unique + override def createBankAndAccount(bankName: String, bankNationalIdentifier: String, accountNumber: String, + accountType: String, accountLabel: String, currency: String, accountHolderName: String): (Bank, BankAccount) = { + //don't require and exact match on the name, just the identifier + val bank: Bank = MappedBank.find(By(MappedBank.national_identifier, bankNationalIdentifier)) match { + case Full(b) => + logger.info(s"bank with id ${b.bankId} and national identifier ${b.nationalIdentifier} found") + b + case _ => + logger.info(s"creating bank with national identifier $bankNationalIdentifier") + //TODO: need to handle the case where generatePermalink returns a permalink that is already used for another bank + MappedBank.create + .permalink(Helper.generatePermalink(bankName)) + .fullBankName(bankName) + .shortBankName(bankName) + .national_identifier(bankNationalIdentifier) + .saveMe() + } + + //TODO: pass in currency as a parameter? + val account = createAccountIfNotExisting( + bank.bankId, + AccountId(UUID.randomUUID().toString), + accountNumber, + accountType, + accountLabel, + currency, + 0L, + accountHolderName + ) + + (bank, account) + } + + //for sandbox use -> allows us to check if we can generate a new test account with the given number + override def accountExists(bankId: BankId, accountNumber: String): Boolean = { + getAccountByNumber(bankId, accountNumber) != null + } + + //remove an account and associated transactions + override def removeAccount(bankId: BankId, accountId: AccountId) : Boolean = { + //delete comments on transactions of this account + val commentsDeleted = Comments.comments.vend.bulkDeleteComments(bankId, accountId) + + //delete narratives on transactions of this account + val narrativesDeleted = MappedNarrative.bulkDelete_!!( + By(MappedNarrative.bank, bankId.value), + By(MappedNarrative.account, accountId.value) + ) + + //delete narratives on transactions of this account + val tagsDeleted = Tags.tags.vend.bulkDeleteTags(bankId, accountId) + + //delete WhereTags on transactions of this account + val whereTagsDeleted = WhereTags.whereTags.vend.bulkDeleteWhereTags(bankId, accountId) + + //delete transaction images on transactions of this account + val transactionImagesDeleted = TransactionImages.transactionImages.vend.bulkDeleteTransactionImage(bankId, accountId) + + //delete transactions of account + val transactionsDeleted = MappedTransaction.bulkDelete_!!( + By(MappedTransaction.bank, bankId.value), + By(MappedTransaction.account, accountId.value) + ) + + //remove view privileges + val privilegesDeleted = Views.views.vend.removeAllPermissions(bankId, accountId) + + //delete views of account + val viewsDeleted = Views.views.vend.removeAllViews(bankId, accountId) + + //delete account + val account = getBankAccount(bankId, accountId) + + val accountDeleted = account match { + case acc => true //acc.delete_! //TODO + case _ => false + } + + commentsDeleted && narrativesDeleted && tagsDeleted && whereTagsDeleted && transactionImagesDeleted && + transactionsDeleted && privilegesDeleted && viewsDeleted && accountDeleted +} + + //creates a bank account for an existing bank, with the appropriate values set. Can fail if the bank doesn't exist + override def createSandboxBankAccount(bankId: BankId, accountId: AccountId, accountNumber: String, + accountType: String, accountLabel: String, currency: String, + initialBalance: BigDecimal, accountHolderName: String): Box[BankAccount] = { + + for { + bank <- getBank(bankId) //bank is not really used, but doing this will ensure account creations fails if the bank doesn't + } yield { + + val balanceInSmallestCurrencyUnits = Helper.convertToSmallestCurrencyUnits(initialBalance, currency) + createAccountIfNotExisting(bankId, accountId, accountNumber, accountType, accountLabel, currency, balanceInSmallestCurrencyUnits, accountHolderName) + } + + } + + //sets a user as an account owner/holder + override def setAccountHolder(bankAccountUID: BankAccountUID, user: User): Unit = { + AccountHolders.accountHolders.vend.createAccountHolder(user.resourceUserId.value, bankAccountUID.accountId.value, bankAccountUID.bankId.value) + } + + private def createAccountIfNotExisting(bankId: BankId, accountId: AccountId, accountNumber: String, + accountType: String, accountLabel: String, currency: String, + balanceInSmallestCurrencyUnits: Long, accountHolderName: String) : BankAccount = { + getBankAccount(bankId, accountId) match { + case Full(a) => + logger.info(s"account with id $accountId at bank with id $bankId already exists. No need to create a new one.") + a + case _ => null //TODO + /* + new KafkaBankAccount + .bank(bankId.value) + .theAccountId(accountId.value) + .accountNumber(accountNumber) + .accountType(accountType) + .accountLabel(accountLabel) + .accountCurrency(currency) + .accountBalance(balanceInSmallestCurrencyUnits) + .holder(accountHolderName) + .saveMe() + */ + } + } + + private def createMappedAccountDataIfNotExisting(bankId: String, accountId: String, label: String) : Boolean = { + MappedBankAccountData.find(By(MappedBankAccountData.accountId, accountId), + By(MappedBankAccountData.bankId, bankId)) match { + case Empty => + val data = new MappedBankAccountData + data.setAccountId(accountId) + data.setBankId(bankId) + data.setLabel(label) + data.save() + true + case _ => + logger.info(s"account data with id $accountId at bank with id $bankId already exists. No need to create a new one.") + false + } + } + + /* + End of bank account creation + */ + + + /* + Transaction importer api + */ + + //used by the transaction import api + override def updateAccountBalance(bankId: BankId, accountId: AccountId, newBalance: BigDecimal): Boolean = { + + //this will be Full(true) if everything went well + val result = for { + acc <- getBankAccount(bankId, accountId) + bank <- getBank(bankId) + } yield { + //acc.balance = newBalance + setBankAccountLastUpdated(bank.nationalIdentifier, acc.number, now) + } + + result.getOrElse(false) + } + + //transaction import api uses bank national identifiers to uniquely indentify banks, + //which is unfortunate as theoretically the national identifier is unique to a bank within + //one country + private def getBankByNationalIdentifier(nationalIdentifier : String) : Box[Bank] = { + MappedBank.find(By(MappedBank.national_identifier, nationalIdentifier)) + } + + + private val bigDecimalFailureHandler : PartialFunction[Throwable, Unit] = { + case ex : NumberFormatException => { + logger.warn(s"could not convert amount to a BigDecimal: $ex") + } + } + + //used by transaction import api call to check for duplicates + override def getMatchingTransactionCount(bankNationalIdentifier : String, accountNumber : String, amount: String, completed: Date, otherAccountHolder: String): Int = { + //we need to convert from the legacy bankNationalIdentifier to BankId, and from the legacy accountNumber to AccountId + val count = for { + bankId <- getBankByNationalIdentifier(bankNationalIdentifier).map(_.bankId) + account <- getAccountByNumber(bankId, accountNumber) + amountAsBigDecimal <- tryo(bigDecimalFailureHandler)(BigDecimal(amount)) + } yield { + + val amountInSmallestCurrencyUnits = + Helper.convertToSmallestCurrencyUnits(amountAsBigDecimal, account.currency) + + MappedTransaction.count( + By(MappedTransaction.bank, bankId.value), + By(MappedTransaction.account, account.accountId.value), + By(MappedTransaction.amount, amountInSmallestCurrencyUnits), + By(MappedTransaction.tFinishDate, completed), + By(MappedTransaction.counterpartyAccountHolder, otherAccountHolder)) + } + + //icky + count.map(_.toInt) getOrElse 0 + } + + //used by transaction import api + override def createImportedTransaction(transaction: ImporterTransaction): Box[Transaction] = { + //we need to convert from the legacy bankNationalIdentifier to BankId, and from the legacy accountNumber to AccountId + val obpTransaction = transaction.obp_transaction + val thisAccount = obpTransaction.this_account + val nationalIdentifier = thisAccount.bank.national_identifier + val accountNumber = thisAccount.number + for { + bank <- getBankByNationalIdentifier(transaction.obp_transaction.this_account.bank.national_identifier) ?~! + s"No bank found with national identifier $nationalIdentifier" + bankId = bank.bankId + account <- getAccountByNumber(bankId, accountNumber) + details = obpTransaction.details + amountAsBigDecimal <- tryo(bigDecimalFailureHandler)(BigDecimal(details.value.amount)) + newBalanceAsBigDecimal <- tryo(bigDecimalFailureHandler)(BigDecimal(details.new_balance.amount)) + amountInSmallestCurrencyUnits = Helper.convertToSmallestCurrencyUnits(amountAsBigDecimal, account.currency) + newBalanceInSmallestCurrencyUnits = Helper.convertToSmallestCurrencyUnits(newBalanceAsBigDecimal, account.currency) + otherAccount = obpTransaction.other_account + mappedTransaction = MappedTransaction.create + .bank(bankId.value) + .account(account.accountId.value) + .transactionType(details.kind) + .amount(amountInSmallestCurrencyUnits) + .newAccountBalance(newBalanceInSmallestCurrencyUnits) + .currency(account.currency) + .tStartDate(details.posted.`$dt`) + .tFinishDate(details.completed.`$dt`) + .description(details.label) + .counterpartyAccountNumber(otherAccount.number) + .counterpartyAccountHolder(otherAccount.holder) + .counterpartyAccountKind(otherAccount.kind) + .counterpartyNationalId(otherAccount.bank.national_identifier) + .counterpartyBankName(otherAccount.bank.name) + .counterpartyIban(otherAccount.bank.IBAN) + .saveMe() + transaction <- mappedTransaction.toTransaction(account) + } yield transaction + } + + override def setBankAccountLastUpdated(bankNationalIdentifier: String, accountNumber : String, updateDate: Date) : Boolean = { + val result = for { + bankId <- getBankByNationalIdentifier(bankNationalIdentifier).map(_.bankId) + account <- getAccountByNumber(bankId, accountNumber) + } yield { + val acc = getBankAccount(bankId, account.accountId) + acc match { + case a => true //a.lastUpdate = updateDate //TODO + case _ => logger.warn("can't set bank account.lastUpdated because the account was not found"); false + } + } + result.getOrElse(false) + } + + /* + End of transaction importer api + */ + + + override def updateAccountLabel(bankId: BankId, accountId: AccountId, label: String): Boolean = { + //this will be Full(true) if everything went well + val result = for { + acc <- getBankAccount(bankId, accountId) + bank <- getBank(bankId) + d <- MappedBankAccountData.find(By(MappedBankAccountData.accountId, accountId.value), By(MappedBankAccountData.bankId, bank.bankId.value)) + } yield { + d.setLabel(label) + d.save() + } + result.getOrElse(false) + } + + + override def getProducts(bankId: BankId): Box[List[Product]] = Empty + + override def getProduct(bankId: BankId, productCode: ProductCode): Box[Product] = Empty + + override def createOrUpdateBranch(branch: BranchJsonPost ): Box[Branch] = Empty + + override def getBranch(bankId : BankId, branchId: BranchId) : Box[MappedBranch]= Empty + + override def getConsumerByConsumerId(consumerId: Long): Box[Consumer] = Empty + + // get the latest FXRate specified by fromCurrencyCode and toCurrencyCode. + override def getCurrentFxRate(fromCurrencyCode: String, toCurrencyCode: String): Box[FXRate] = { + // Create request argument list + val req = Map( + "action" -> "obp.getCurrentFxRate", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "fromCurrencyCode" -> fromCurrencyCode, + "toCurrencyCode" -> toCurrencyCode + ) + val r = { + cachedFxRate.getOrElseUpdate(req.toString, () => process(req).extract[KafkaInboundFXRate]) + } + // Return result + Full(new KafkaFXRate(r)) + } + + //get the current charge specified by bankId, accountId, viewId and transactionRequestType + override def getTransactionRequestTypeCharge(bankId: BankId, accountId: AccountId, viewId: ViewId, transactionRequestType: TransactionRequestType): Box[TransactionRequestTypeCharge] = { + + // Create request argument list + val req = Map( + "action" -> "obp.getTransactionRequestTypeCharge", + "version" -> formatVersion, + "userId" -> AuthUser.getCurrentResourceUserUserId, + "username" -> AuthUser.getCurrentUserUsername, + "bankId" -> bankId.value, + "accountId" -> accountId.value, + "viewId" -> viewId.value, + "transactionRequestType" -> transactionRequestType.value + ) + // send the request to kafka and get response + // TODO the error handling is not good enough, it should divide the error, empty and no-response. + val r = tryo { + Full(cachedTransactionRequestTypeCharge.getOrElseUpdate(req.toString, () => process(req).extract[KafkaInboundTransactionRequestTypeCharge])) + } + + // Return result + val result = r match { + case Full(f) => KafkaTransactionRequestTypeCharge(f.get) + case _ => + val fromAccountCurrency: String = getBankAccount(bankId, accountId).get.currency + KafkaTransactionRequestTypeCharge(KafkaInboundTransactionRequestTypeCharge(transactionRequestType.value, bankId.value, fromAccountCurrency, "0.00", "Warning! Default value!")) + } + + // result + Full(result) + } + + override def getTransactionRequestTypeCharges(bankId: BankId, accountId: AccountId, viewId: ViewId, transactionRequestTypes: List[TransactionRequestType]): Box[List[TransactionRequestTypeCharge]] = { + Full(transactionRequestTypes.map(getTransactionRequestTypeCharge(bankId, accountId, viewId,_).get)) + } + + override def getEmptyBankAccount(): Box[AccountType] = { + Full(new KafkaBankAccount(KafkaInboundAccount(accountId = "", + bankId = "", + label = "", + number = "", + `type` = "", + balanceAmount = "", + balanceCurrency = "", + iban = "", + owners = Nil, + generate_public_view = true, + generate_accountants_view = true, + generate_auditors_view = true))) + } + + ///////////////////////////////////////////////////////////////////////////// + + + + // Helper for creating a transaction + def createNewTransaction(r: KafkaInboundTransaction):Box[Transaction] = { + var datePosted: Date = null + if (r.postedDate != null) // && r.details.posted.matches("^[0-9]{8}$")) + datePosted = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(r.postedDate) + + var dateCompleted: Date = null + if (r.completedDate != null) // && r.details.completed.matches("^[0-9]{8}$")) + dateCompleted = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(r.completedDate) + + for { + counterpartyId <- tryo{r.counterpartyId} + counterpartyName <- tryo{r.counterpartyName} + thisAccount <- getBankAccount(BankId(r.bankId), AccountId(r.accountId)) + //creates a dummy OtherBankAccount without an OtherBankAccountMetadata, which results in one being generated (in OtherBankAccount init) + dummyOtherBankAccount <- tryo{createCounterparty(counterpartyId, counterpartyName, thisAccount, None)} + //and create the proper OtherBankAccount with the correct "id" attribute set to the metadataId of the OtherBankAccountMetadata object + //note: as we are passing in the OtherBankAccountMetadata we don't incur another db call to get it in OtherBankAccount init + counterparty <- tryo{createCounterparty(counterpartyId, counterpartyName, thisAccount, Some(dummyOtherBankAccount.metadata))} + } yield { + // Create new transaction + new Transaction( + r.transactionId, // uuid:String + TransactionId(r.transactionId), // id:TransactionId + thisAccount, // thisAccount:BankAccount + counterparty, // otherAccount:OtherBankAccount + r.`type`, // transactionType:String + BigDecimal(r.amount), // val amount:BigDecimal + thisAccount.currency, // currency:String + Some(r.description), // description:Option[String] + datePosted, // startDate:Date + dateCompleted, // finishDate:Date + BigDecimal(r.newBalanceAmount) // balance:BigDecimal) + ) + } + } + + + case class KafkaBank(r: KafkaInboundBank) extends Bank { + def fullName = r.name + def shortName = r.name + def logoUrl = r.logo + def bankId = BankId(r.bankId) + def nationalIdentifier = "None" //TODO + def swiftBic = "None" //TODO + def websiteUrl = r.url + } + + // Helper for creating other bank account + def createCounterparty(counterpartyId: String, counterpartyName: String, o: KafkaBankAccount, alreadyFoundMetadata : Option[CounterpartyMetadata]) = { + new Counterparty( + counterPartyId = alreadyFoundMetadata.map(_.metadataId).getOrElse(""), + label = counterpartyName, + nationalIdentifier = "", + otherBankRoutingAddress = None, + otherAccountRoutingAddress = None, + thisAccountId = AccountId(counterpartyId), + thisBankId = BankId(""), + kind = "", + otherBankId = o.bankId, + otherAccountId = o.accountId, + alreadyFoundMetadata = alreadyFoundMetadata, + name = "", + otherBankRoutingScheme = "", + otherAccountRoutingScheme="", + otherAccountProvider = "", + isBeneficiary = true + ) + } + case class KafkaBankAccount(r: KafkaInboundAccount) extends BankAccount { + def accountId : AccountId = AccountId(r.accountId) + def accountType : String = r.`type` + def balance : BigDecimal = BigDecimal(r.balanceAmount) + def currency : String = r.balanceCurrency + def name : String = r.owners.head + def swift_bic : Option[String] = Some("swift_bic") //TODO + def iban : Option[String] = Some(r.iban) + def number : String = r.number + def bankId : BankId = BankId(r.bankId) + def lastUpdate : Date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(today.getTime.toString) + def accountHolder : String = r.owners.head + + // Fields modifiable from OBP are stored in mapper + def label : String = (for { + d <- MappedBankAccountData.find(By(MappedBankAccountData.accountId, r.accountId)) + } yield { + d.getLabel + }).getOrElse(r.number) + + } + + case class KafkaFXRate(kafkaInboundFxRate: KafkaInboundFXRate) extends FXRate { + def fromCurrencyCode : String= kafkaInboundFxRate.from_currency_code + def toCurrencyCode : String= kafkaInboundFxRate.to_currency_code + def conversionValue : Double= kafkaInboundFxRate.conversion_value + def inverseConversionValue : Double= kafkaInboundFxRate.inverse_conversion_value + //TODO need to add error handling here for String --> Date transfer + def effectiveDate : Date= new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(kafkaInboundFxRate.effective_date) + } + + case class KafkaCounterparty(counterparty: KafkaInboundCounterparty) extends CounterpartyTrait { + def createdByUserId: String = counterparty.created_by_user_id + def name: String = counterparty.name + def thisBankId: String = counterparty.this_bank_id + def thisAccountId: String = counterparty.this_account_id + def thisViewId: String = counterparty.this_view_id + def counterpartyId: String = counterparty.counterparty_id + def otherAccountRoutingScheme: String = counterparty.other_account_routing_scheme + def otherAccountRoutingAddress: String = counterparty.other_account_routing_address + def otherBankRoutingScheme: String = counterparty.other_bank_routing_scheme + def otherBankRoutingAddress: String = counterparty.other_bank_routing_address + def isBeneficiary : Boolean = counterparty.is_beneficiary + } + + case class KafkaTransactionRequestTypeCharge(kafkaInboundTransactionRequestTypeCharge: KafkaInboundTransactionRequestTypeCharge) extends TransactionRequestTypeCharge{ + def transactionRequestTypeId: String = kafkaInboundTransactionRequestTypeCharge.transaction_request_type_id + def bankId: String = kafkaInboundTransactionRequestTypeCharge.bank_id + def chargeCurrency: String = kafkaInboundTransactionRequestTypeCharge.charge_currency + def chargeAmount: String = kafkaInboundTransactionRequestTypeCharge.charge_amount + def chargeSummary: String = kafkaInboundTransactionRequestTypeCharge.charge_summary + } + + case class KafkaInboundBank( + bankId : String, + name : String, + logo : String, + url : String) + + + /** Bank Branches + * + * @param id Uniquely identifies the Branch within the Bank. SHOULD be url friendly (no spaces etc.) Used in URLs + * @param bank_id MUST match bank_id in Banks + * @param name Informal name for the Branch + * @param address Address + * @param location Geolocation + * @param meta Meta information including the license this information is published under + * @param lobby Info about when the lobby doors are open + * @param driveUp Info about when automated facilities are open e.g. cash point machine + */ + case class KafkaInboundBranch( + id : String, + bank_id: String, + name : String, + address : KafkaInboundAddress, + location : KafkaInboundLocation, + meta : KafkaInboundMeta, + lobby : Option[KafkaInboundLobby], + driveUp : Option[KafkaInboundDriveUp]) + + case class KafkaInboundLicense( + id : String, + name : String) + + case class KafkaInboundMeta( + license : KafkaInboundLicense) + + case class KafkaInboundLobby( + hours : String) + + case class KafkaInboundDriveUp( + hours : String) + + /** + * + * @param line_1 Line 1 of Address + * @param line_2 Line 2 of Address + * @param line_3 Line 3 of Address + * @param city City + * @param county County i.e. Division of State + * @param state State i.e. Division of Country + * @param post_code Post Code or Zip Code + * @param country_code 2 letter country code: ISO 3166-1 alpha-2 + */ + case class KafkaInboundAddress( + line_1 : String, + line_2 : String, + line_3 : String, + city : String, + county : String, // Division of State + state : String, // Division of Country + post_code : String, + country_code: String) + + case class KafkaInboundLocation( + latitude : Double, + longitude : Double) + + case class KafkaInboundValidatedUser(email: String, + displayName: String) + + // TODO Be consistent use camelCase + + case class KafkaInboundAccount( + accountId : String, + bankId : String, + label : String, + number : String, + `type` : String, + balanceAmount: String, + balanceCurrency: String, + iban : String, + owners : List[String], + generate_public_view : Boolean, + generate_accountants_view : Boolean, + generate_auditors_view : Boolean) + + case class KafkaInboundTransaction( + transactionId : String, + accountId : String, + amount: String, + bankId : String, + completedDate: String, + counterpartyId: String, + counterpartyName: String, + currency: String, + description: String, + newBalanceAmount: String, + newBalanceCurrency: String, + postedDate: String, + `type`: String, + userId: String + ) + + case class KafkaInboundAtm( + id : String, + bank_id: String, + name : String, + address : KafkaInboundAddress, + location : KafkaInboundLocation, + meta : KafkaInboundMeta + ) + + case class KafkaInboundProduct( + bank_id : String, + code: String, + name : String, + category : String, + family : String, + super_family : String, + more_info_url : String, + meta : KafkaInboundMeta + ) + + case class KafkaInboundAccountData( + banks : List[KafkaInboundBank], + users : List[InboundUser], + accounts : List[KafkaInboundAccount] + ) + + // We won't need this. TODO clean up. + case class KafkaInboundData( + banks : List[KafkaInboundBank], + users : List[InboundUser], + accounts : List[KafkaInboundAccount], + transactions : List[KafkaInboundTransaction], + branches: List[KafkaInboundBranch], + atms: List[KafkaInboundAtm], + products: List[KafkaInboundProduct], + crm_events: List[KafkaInboundCrmEvent] + ) + + case class KafkaInboundCrmEvent( + id : String, // crmEventId + bank_id : String, + customer: KafkaInboundCustomer, + category : String, + detail : String, + channel : String, + actual_date: String + ) + + case class KafkaInboundCustomer( + name: String, + number : String // customer number, also known as ownerId (owner of accounts) aka API User? + ) + + case class KafkaInboundTransactionId( + transactionId : String + ) + + case class KafkaOutboundTransaction( + action: String, + version: String, + userId: String, + userName: String, + accountId: String, + currency: String, + amount: String, + otherAccountId: String, + otherAccountCurrency: String, + transactionType: String) + + case class KafkaInboundChallengeLevel( + limit: String, + currency: String + ) + case class KafkaInboundTransactionRequestStatus( + transactionRequestId : String, + bulkTransactionsStatus: List[KafkaInboundTransactionStatus] + ) + case class KafkaInboundTransactionStatus( + transactionId : String, + transactionStatus: String, + transactionTimestamp: String + ) + case class KafkaInboundCreateChallange(challengeId: String) + case class KafkaInboundValidateChallangeAnswer(answer: String) + + case class KafkaInboundChargeLevel( + currency: String, + amount: String + ) + + case class KafkaInboundFXRate( + from_currency_code: String, + to_currency_code: String, + conversion_value: Double, + inverse_conversion_value: Double, + effective_date: String + ) + + case class KafkaInboundCounterparty( + name: String, + created_by_user_id: String, + this_bank_id: String, + this_account_id: String, + this_view_id: String, + counterparty_id: String, + other_bank_routing_scheme: String, + other_account_routing_scheme: String, + other_bank_routing_address: String, + other_account_routing_address: String, + is_beneficiary: Boolean + ) + + + case class KafkaInboundTransactionRequestTypeCharge( + transaction_request_type_id: String, + bank_id: String, + charge_currency: String, + charge_amount: String, + charge_summary: String + ) + + def process(request: Map[String,String]): json.JValue = { + val reqId = UUID.randomUUID().toString + if (producer.send(reqId, request, "1")) { + // Request sent, now we wait for response with the same reqId + val res = consumer.getResponse(reqId) + return res + } + return json.parse("""{"error":"could not send message to kafka"}""") + } + +} + diff --git a/src/main/scala/code/model/dataAccess/AuthUser.scala b/src/main/scala/code/model/dataAccess/AuthUser.scala index 918edfc38..ba5f12042 100644 --- a/src/main/scala/code/model/dataAccess/AuthUser.scala +++ b/src/main/scala/code/model/dataAccess/AuthUser.scala @@ -443,7 +443,7 @@ import net.liftweb.util.Helpers._ case Full(user) if (user.getProvider() != Props.get("hostname","")) => connector match { - case "kafka" if ( Props.getBool("kafka.user.authentication", false) && + case Helper.matchAnyKafka() if ( Props.getBool("kafka.user.authentication", false) && ! LoginAttempt.userIsLocked(username) ) => val userId = for { kafkaUser <- getUserFromConnector(username, password) kafkaUserId <- tryo{kafkaUser.user} } yield { @@ -625,7 +625,7 @@ import net.liftweb.util.Helpers._ S.error(S.?("account.validation.error")) // If not found locally, try to authenticate user via Kafka, if enabled in props - case Empty if (connector == "kafka" || connector == "obpjvm") && + case Empty if (connector.startsWith("kafka") || connector == "obpjvm") && (Props.getBool("kafka.user.authentication", false) || Props.getBool("obpjvm.user.authentication", false)) => val preLoginState = capturePreLoginState() @@ -682,7 +682,7 @@ import net.liftweb.util.Helpers._ def testExternalPassword(usernameFromGui: Box[String], passwordFromGui: Box[String]): Box[Boolean] = { - if (connector == "kafka" || connector == "obpjvm") { + if (connector.startsWith("kafka") || connector == "obpjvm") { val res = for { username <- usernameFromGui password <- passwordFromGui @@ -697,7 +697,7 @@ import net.liftweb.util.Helpers._ def externalUserHelper(name: String, password: String): Box[AuthUser] = { - if (connector == "kafka" || connector == "obpjvm") { + if (connector.startsWith("kafka") || connector == "obpjvm") { for { user <- getUserFromConnector(name, password) u <- Users.users.vend.getUserByUserName(username) @@ -710,7 +710,7 @@ import net.liftweb.util.Helpers._ def registeredUserHelper(username: String) = { - if (connector == "kafka" || connector == "obpjvm") { + if (connector.startsWith("kafka") || connector == "obpjvm") { for { u <- Users.users.vend.getUserByUserName(username) v <- tryo {Connector.connector.vend.updateUserAccountViews(u)} diff --git a/src/main/scala/code/util/Helper.scala b/src/main/scala/code/util/Helper.scala index 7b590cd0f..21a980ae5 100644 --- a/src/main/scala/code/util/Helper.scala +++ b/src/main/scala/code/util/Helper.scala @@ -155,4 +155,11 @@ object Helper{ validUrls.contains(extractCleanURL) } + + /** + * Used for version extraction from props string + */ + val matchKafkaVersion = "kafka_v([0-9a-zA-Z_]+)".r + val matchAnyKafka = "^kafka.*$".r + } \ No newline at end of file