From 81ecee84e7dbda83e83a0eb9086a1c2f17ba9982 Mon Sep 17 00:00:00 2001 From: sorinmanole Date: Thu, 25 Aug 2016 12:35:46 +0200 Subject: [PATCH] issue #139 - new KafkaLibMapeedConnector --- pom.xml | 5 + .../resources/props/sample.props.template | 1 + .../scala/code/bankconnectors/Connector.scala | 17 +- .../KafkaLibMappedConnector.scala | 1049 +++++++++++++++++ 4 files changed, 1067 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/code/bankconnectors/KafkaLibMappedConnector.scala diff --git a/pom.xml b/pom.xml index 192e5b2af..8cade95e9 100644 --- a/pom.xml +++ b/pom.xml @@ -209,6 +209,11 @@ signpost-commonshttp4 1.2.1.2 + + com.tesobe.obp + obp-ri-kafka + 2016.0-SNAPSHOT + diff --git a/src/main/resources/props/sample.props.template b/src/main/resources/props/sample.props.template index f19b28ffe..ab0105121 100644 --- a/src/main/resources/props/sample.props.template +++ b/src/main/resources/props/sample.props.template @@ -10,6 +10,7 @@ connector=mapped #connector=mongodb #connector=kafka +#connector=kafka_lib_v0 #connector=... #if using kafka connector, set zookeeper host diff --git a/src/main/scala/code/bankconnectors/Connector.scala b/src/main/scala/code/bankconnectors/Connector.scala index 1dd545864..5b4d74cea 100644 --- a/src/main/scala/code/bankconnectors/Connector.scala +++ b/src/main/scala/code/bankconnectors/Connector.scala @@ -37,12 +37,19 @@ object Connector extends SimpleInjector { val connector = new Inject(buildOne _) {} - def buildOne: Connector = - Props.get("connector").openOrThrowException("no connector set") match { - case "mapped" => LocalMappedConnector - case "mongodb" => LocalConnector - case "kafka" => KafkaMappedConnector + def buildOne: Connector = { + val connectorProps = Props.get("connector").openOrThrowException("no connector set") + + if (connectorProps.startsWith("kafka_lib")) { + KafkaLibMappedConnector + } else { + connectorProps match { + case "mapped" => LocalMappedConnector + case "mongodb" => LocalConnector + case "kafka" => KafkaMappedConnector + } } + } } diff --git a/src/main/scala/code/bankconnectors/KafkaLibMappedConnector.scala b/src/main/scala/code/bankconnectors/KafkaLibMappedConnector.scala new file mode 100644 index 000000000..779c0d511 --- /dev/null +++ b/src/main/scala/code/bankconnectors/KafkaLibMappedConnector.scala @@ -0,0 +1,1049 @@ +package code.bankconnectors + +import java.text.SimpleDateFormat +import java.util.{Optional, Date, Locale, UUID} + +import code.api.util.ErrorMessages +import code.management.ImporterAPI.ImporterTransaction +import code.metadata.comments.MappedComment +import code.metadata.counterparties.Counterparties +import code.metadata.narrative.MappedNarrative +import code.metadata.tags.MappedTag +import code.metadata.transactionimages.MappedTransactionImage +import code.metadata.wheretags.MappedWhereTag +import code.model._ +import code.model.dataAccess._ +import code.sandbox.{CreateViewImpls, Saveable} +import code.transaction.MappedTransaction +import code.transactionrequests.{MappedTransactionRequest210, MappedTransactionRequest} +import code.transactionrequests.TransactionRequests._ +import code.util.{Helper, TTLCache} +import code.views.Views +import com.tesobe.obp.kafka.SimpleNorth +import com.tesobe.obp.transport.Transport +import com.tesobe.obp.transport.Transport.Factory +import net.liftweb.common._ +import net.liftweb.json +import net.liftweb.mapper._ +import net.liftweb.util.Helpers._ +import net.liftweb.util.Props +import net.liftweb.json._ + +import scala.collection.JavaConversions._ + +object KafkaLibMappedConnector extends Connector with CreateViewImpls with Loggable { + type JBank = com.tesobe.obp.transport.Bank + type JTransaction = com.tesobe.obp.transport.Transaction + type JAccount = com.tesobe.obp.transport.Account + type OutboundContext = com.tesobe.obp.transport.OutboundContext + + val version = getVersion() + + val factory : Factory = Transport.factory(version, Transport.Encoding.json).get + //todo get topic names from the props + val north: SimpleNorth = new SimpleNorth("Request", "Response") // Kafka + val connector : com.tesobe.obp.transport.Connector = factory.connector(north) + + north.receive() // start Kafka + + var producer = new KafkaProducer() + var consumer = new KafkaConsumer() + type AccountType = KafkaBankAccount + + // Local TTL Cache + val cacheTTL = Props.get("kafka.cache.ttl.seconds", "3").toInt + val cachedUser = TTLCache[KafkaInboundValidatedUser](cacheTTL) + val cachedBank = TTLCache[KafkaInboundBank](cacheTTL) + val cachedAccount = TTLCache[KafkaInboundAccount](cacheTTL) + val cachedBanks = TTLCache[List[KafkaInboundBank]](cacheTTL) + val cachedAccounts = TTLCache[List[KafkaInboundAccount]](cacheTTL) + val cachedPublicAccounts = TTLCache[List[KafkaInboundAccount]](cacheTTL) + val cachedUserAccounts = TTLCache[List[KafkaInboundAccount]](cacheTTL) + + implicit val formats = net.liftweb.json.DefaultFormats + + def getVersion(): Transport.Version = { + val versionProp = Props.get("connector").openOrThrowException("no connector set") + val version = versionProp.split("_").last + + version.toLowerCase match { + case "v0" => Transport.Version.v0 + case _ => Transport.Version.v0 + } + } + + //gets user handled by this connector + def getUser( username: String, password: String ): Box[KafkaInboundUser] = { + val user = connector.getUser(username, new OutboundContext(null, null, null)).user + + if (user.isPresent) { + val u = user.get + Full(new KafkaInboundUser(u.email(), u.password(), u.email())) + } else { + Empty + } + } + + //gets banks handled by this connector + override def getBanks: List[Bank] = { + val banks = connector.getBanks(new OutboundContext(null, null, null)).banks + + //Loop through list of responses and create entry for each + val res = { + for (r <- banks) yield { + KafkaBank(KafkaInboundBank(r.id, r.shortName, r.fullName, r.logo, r.url)) + } + } + // Return list of results + + logger.debug(s"Kafka getBanks says res is $res") + res.toList + } + + // Gets bank identified by bankId + override def getBank(id: BankId): Box[Bank] = { + val bank = connector.getBank(id.value, new OutboundContext(null, null, null)).bank + + if(bank.isPresent) { + val b = bank.get + Full(KafkaBank(KafkaInboundBank(b.id, b.shortName, b.fullName, b.logo, b.url))) + } else { + Empty + } + } + + // Gets transaction identified by bankid, accountid and transactionId + def getTransaction(bankId: BankId, accountID: AccountId, transactionId: TransactionId): Box[Transaction] = { + val transaction = connector.getTransaction(bankId.value, accountID.value, transactionId.value, new OutboundContext(null, null, null)).transaction + + if(transaction.isPresent) { + val t = transaction.get + val kafkaInboundTransaction = new KafkaInboundTransaction(t.id(), + new KafkaInboundAccountId(t.account_number(), t.bank()), + Some(new KafkaInboundTransactionCounterparty(Some(t.name()), Some(t.account_number()))), + new KafkaInboundTransactionDetails(t.`type`(), t.description(), t.posted().toString, t.completed().toString, t.new_balance(), t.value()) + ) + + if (transactionId.value == t.id()){ + createNewTransaction(kafkaInboundTransaction) + } else { + Failure(ErrorMessages.InvalidGetTransactionConnectorResponse, Empty, Empty) + } + } else { + Empty + } + } + + // Gets transaction identified by bankid and accountid + override def getTransactions(bankId: BankId, accountID: AccountId, queryParams: OBPQueryParam*): Box[List[Transaction]] = { + val transactions = connector.getTransactions(bankId.value, accountID.value, new OutboundContext(null, null, null)).transactions + + // Check does the response data match the requested data + val isCorrect = transactions.forall(x => x.account_number() == accountID.value && x.bank() == bankId.value) + if (!isCorrect) throw new Exception(ErrorMessages.InvalidGetTransactionsConnectorResponse) + // Populate fields and generate result + val res = for { + t <- transactions + kafkaInboundTransaction = new KafkaInboundTransaction(t.id(), + new KafkaInboundAccountId(t.account_number(), t.bank()), + Some(new KafkaInboundTransactionCounterparty(Some(t.name()), Some(t.account_number()))), + new KafkaInboundTransactionDetails(t.`type`(), t.description(), t.posted().toString, t.completed().toString, t.new_balance(), t.value()) + ) + transaction <- createNewTransaction(kafkaInboundTransaction) + } yield { + transaction + } + Full(res.toList) + } + + // Gets account identified by bankid and accountid + override def getBankAccount(bankId: BankId, accountID: AccountId): Box[KafkaBankAccount] = { + val account = connector.getAccount(bankId.value, accountID.value, new OutboundContext(null, null, null)).account + + if (account.isPresent) { + val a = account.get + // Check does the response data match the requested data + val accResp = List((BankId(a.bank()), AccountId(a.id()))).toSet + val acc = List((bankId, accountID)).toSet + if ((accResp diff acc).size > 0) throw new Exception(ErrorMessages.InvalidGetBankAccountConnectorResponse) + + createMappedAccountDataIfNotExisting(a.bank(), a.id(), a.label()) + + Full(new KafkaBankAccount(new KafkaInboundAccount(a.id(), a.bank(), a.label(), a.number(), a.`type`(), + new KafkaInboundBalance(a.currency(), a.amount()), a.iban(), Nil, false, false, false))) //TODO figure out the generate parameters + } else { + Empty + } + } + + // Gets accounts identified by bankid and accountid + override def getBankAccounts(accts: List[(BankId, AccountId)]): List[KafkaBankAccount] = { + var accounts: List[JAccount] = Nil + + for (ba <- accts){ + val account = connector.getAccount(ba._1.value, ba._2.value, new OutboundContext(null, null, null)).account + if (account.isPresent){ + accounts = accounts ::: List[JAccount](account.get) + } + } + + // Check does the response data match the requested data + val accRes = for(row <- accounts) yield { + (BankId(row.bank), AccountId(row.id)) + } + if ((accRes.toSet diff accts.toSet).size > 0) throw new Exception(ErrorMessages.InvalidGetBankAccountsConnectorResponse) + + accounts.map { a => + createMappedAccountDataIfNotExisting(a.bank, a.id, a.label) + new KafkaBankAccount(new KafkaInboundAccount(a.id(), a.bank(), a.label(), a.number(), a.`type`(), + new KafkaInboundBalance(a.currency(), a.amount()), a.iban(), Nil, false, false, false)) } + } + + private def getAccountByNumber(bankId : BankId, number : String) : Box[AccountType] = { + // Generate random uuid to be used as request-respose match id + val reqId: String = UUID.randomUUID().toString + // Create argument list with reqId + // in order to fetch corresponding response + val argList = Map("bankId" -> bankId.toString, + "username" -> OBPUser.getCurrentUserUsername, + "number" -> number) + // Since result is single account, we need only first list entry + implicit val formats = net.liftweb.json.DefaultFormats + val r = { + cachedAccount.getOrElseUpdate( argList.toString, () => process(reqId, "getBankAccount", argList).extract[KafkaInboundAccount]) + } + createMappedAccountDataIfNotExisting(r.bank, r.id, r.label) + Full(new KafkaBankAccount(r)) + } + + def getOtherBankAccount(thisAccountBankId : BankId, thisAccountId : AccountId, metadata : OtherBankAccountMetadata) : Box[OtherBankAccount] = { + //because we don't have a db backed model for OtherBankAccounts, we need to construct it from an + //OtherBankAccountMetadata and a transaction + val t = getTransactions(thisAccountBankId, thisAccountId).map { t => + t.filter { e => + if (e.otherAccount.number == metadata.getAccountNumber) + true + else + false + } + }.get.head + + val res = new OtherBankAccount( + //counterparty id is defined to be the id of its metadata as we don't actually have an id for the counterparty itself + id = metadata.metadataId, + label = metadata.getHolder, + nationalIdentifier = t.otherAccount.nationalIdentifier, + swift_bic = None, + iban = t.otherAccount.iban, + number = metadata.getAccountNumber, + bankName = t.otherAccount.bankName, + kind = t.otherAccount.kind, + originalPartyBankId = thisAccountBankId, + originalPartyAccountId = thisAccountId, + alreadyFoundMetadata = Some(metadata) + ) + Full(res) + } + + /** + * + * refreshes transactions via hbci if the transaction info is sourced from hbci + * + * Checks if the last update of the account was made more than one hour ago. + * if it is the case we put a message in the message queue to ask for + * transactions updates + * + * It will be used each time we fetch transactions from the DB. But the test + * is performed in a different thread. + */ + /* + private def updateAccountTransactions(bankId : BankId, accountID : AccountId) = { + + for { + bank <- getBank(bankId) + account <- getBankAccountType(bankId, accountID) + } { + spawn{ + val useMessageQueue = Props.getBool("messageQueue.updateBankAccountsTransaction", false) + val outDatedTransactions = Box!!account.lastUpdate match { + case Full(l) => now after time(l.getTime + hours(Props.getInt("messageQueue.updateTransactionsInterval", 1))) + case _ => true + } + //if(outDatedTransactions && useMessageQueue) { + // UpdatesRequestSender.sendMsg(UpdateBankAccount(account.number, bank.national_identifier.get)) + //} + } + } + } + */ + + //gets the users who are the legal owners/holders of the account + override def getAccountHolders(bankId: BankId, accountID: AccountId): Set[User] = + MappedAccountHolder.findAll( + By(MappedAccountHolder.accountBankPermalink, bankId.value), + By(MappedAccountHolder.accountPermalink, accountID.value)).map(accHolder => accHolder.user.obj).flatten.toSet + + + // Get all counterparties related to an account + override def getOtherBankAccounts(bankId: BankId, accountID: AccountId): List[OtherBankAccount] = + Counterparties.counterparties.vend.getMetadatas(bankId, accountID).flatMap(getOtherBankAccount(bankId, accountID, _)) + + // Get one counterparty related to a bank account + override def getOtherBankAccount(bankId: BankId, accountID: AccountId, otherAccountID: String): Box[OtherBankAccount] = + // Get the metadata and pass it to getOtherBankAccount to construct the other account. + Counterparties.counterparties.vend.getMetadata(bankId, accountID, otherAccountID).flatMap(getOtherBankAccount(bankId, accountID, _)) + + override def getPhysicalCards(user: User): Set[PhysicalCard] = + Set.empty + + override def getPhysicalCardsForBank(bankId: BankId, user: User): Set[PhysicalCard] = + Set.empty + + + override def makePaymentImpl(fromAccount: AccountType, toAccount: AccountType, amt: BigDecimal, description : String): Box[TransactionId] = { + val fromTransAmt = -amt //from account balance should decrease + val toTransAmt = amt //to account balance should increase + + //we need to save a copy of this payment as a transaction in each of the accounts involved, with opposite amounts + val sentTransactionId = saveTransaction(fromAccount, toAccount, fromTransAmt, description) + saveTransaction(toAccount, fromAccount, toTransAmt, description) + + sentTransactionId + } + + + /** + * Saves a transaction with amount @amt and counterparty @counterparty for account @account. Returns the id + * of the saved transaction. + */ + private def saveTransaction(account : AccountType, + counterparty : BankAccount, + amt : BigDecimal, + description : String) : Box[TransactionId] = { + + val transactionTime = now + val currency = account.currency + + //update the balance of the account for which a transaction is being created + val newAccountBalance : Long = account.balance.toLong + Helper.convertToSmallestCurrencyUnits(amt, account.currency) + //account.balance = newAccountBalance + + val reqId: String = UUID.randomUUID().toString + // Create argument list with reqId + // in order to fetch corresponding response + val argObj = KafkaOutboundTransaction(username = OBPUser.getCurrentUserUsername, + accountId = account.accountId.value, + currency = currency, + amount = amt.toString, + otherAccountId = counterparty.accountId.value, + otherAccountCurrency = counterparty.currency, + transactionType = "AC") + + // Since result is single account, we need only first list entry + implicit val formats = net.liftweb.json.DefaultFormats + val argMap = Extraction.decompose(argObj).values + val r = process(reqId, "saveTransaction", argMap.asInstanceOf[Map[String,String]]) //.extract[KafkaInboundTransactionId] + + r.extract[KafkaInboundTransactionId] match { + case r: KafkaInboundTransactionId => Full(TransactionId(r.transactionId)) + case _ => Full(TransactionId("0")) + } + + } + + /* + Transaction Requests + */ + + override def createTransactionRequestImpl(transactionRequestId: TransactionRequestId, transactionRequestType: TransactionRequestType, + account : BankAccount, counterparty : BankAccount, body: TransactionRequestBody, + status: String, charge: TransactionRequestCharge) : Box[TransactionRequest] = { + val mappedTransactionRequest = MappedTransactionRequest.create + .mTransactionRequestId(transactionRequestId.value) + .mType(transactionRequestType.value) + .mFrom_BankId(account.bankId.value) + .mFrom_AccountId(account.accountId.value) + .mBody_To_BankId(counterparty.bankId.value) + .mBody_To_AccountId(counterparty.accountId.value) + .mBody_Value_Currency(body.value.currency) + .mBody_Value_Amount(body.value.amount) + .mBody_Description(body.description) + .mStatus(status) + .mStartDate(now) + .mEndDate(now).saveMe + Full(mappedTransactionRequest).flatMap(_.toTransactionRequest) + } + + override def createTransactionRequestImpl210(transactionRequestId: TransactionRequestId, transactionRequestType: TransactionRequestType, + account : BankAccount, details: String, + status: String, charge: TransactionRequestCharge) : Box[TransactionRequest210] = { + val mappedTransactionRequest = MappedTransactionRequest210.create + .mTransactionRequestId(transactionRequestId.value) + .mType(transactionRequestType.value) + .mFrom_BankId(account.bankId.value) + .mFrom_AccountId(account.accountId.value) + .mDetails(details) + .mStatus(status) + .mStartDate(now) + .mEndDate(now).saveMe + Full(mappedTransactionRequest).flatMap(_.toTransactionRequest210) + } + + override def saveTransactionRequestTransactionImpl(transactionRequestId: TransactionRequestId, transactionId: TransactionId): Box[Boolean] = { + val mappedTransactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + mappedTransactionRequest match { + case Full(tr: MappedTransactionRequest) => Full(tr.mTransactionIDs(transactionId.value).save) + case _ => Failure("Couldn't find transaction request ${transactionRequestId}") + } + } + + override def saveTransactionRequestChallengeImpl(transactionRequestId: TransactionRequestId, challenge: TransactionRequestChallenge): Box[Boolean] = { + val mappedTransactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + mappedTransactionRequest match { + case Full(tr: MappedTransactionRequest) => Full{ + tr.mChallenge_Id(challenge.id) + tr.mChallenge_AllowedAttempts(challenge.allowed_attempts) + tr.mChallenge_ChallengeType(challenge.challenge_type).save + } + case _ => Failure(s"Couldn't find transaction request ${transactionRequestId} to set transactionId") + } + } + + override def saveTransactionRequestStatusImpl(transactionRequestId: TransactionRequestId, status: String): Box[Boolean] = { + val mappedTransactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + mappedTransactionRequest match { + case Full(tr: MappedTransactionRequest) => Full(tr.mStatus(status).save) + case _ => Failure(s"Couldn't find transaction request ${transactionRequestId} to set status") + } + } + + + override def getTransactionRequestsImpl(fromAccount : BankAccount) : Box[List[TransactionRequest]] = { + val transactionRequests = MappedTransactionRequest.findAll(By(MappedTransactionRequest.mFrom_AccountId, fromAccount.accountId.value), + By(MappedTransactionRequest.mFrom_BankId, fromAccount.bankId.value)) + + Full(transactionRequests.flatMap(_.toTransactionRequest)) + } + + override def getTransactionRequestsImpl210(fromAccount : BankAccount) : Box[List[TransactionRequest210]] = { + val transactionRequests = MappedTransactionRequest210.findAll(By(MappedTransactionRequest210.mFrom_AccountId, fromAccount.accountId.value), + By(MappedTransactionRequest210.mFrom_BankId, fromAccount.bankId.value)) + + Full(transactionRequests.flatMap(_.toTransactionRequest210)) + } + + override def getTransactionRequestImpl(transactionRequestId: TransactionRequestId) : Box[TransactionRequest] = { + val transactionRequest = MappedTransactionRequest.find(By(MappedTransactionRequest.mTransactionRequestId, transactionRequestId.value)) + transactionRequest.flatMap(_.toTransactionRequest) + } + + + override def getTransactionRequestTypesImpl(fromAccount : BankAccount) : Box[List[TransactionRequestType]] = { + //TODO: write logic / data access + Full(List(TransactionRequestType("SANDBOX_TAN"))) + } + + /* + Bank account creation + */ + + //creates a bank account (if it doesn't exist) and creates a bank (if it doesn't exist) + //again assume national identifier is unique + override def createBankAndAccount(bankName: String, bankNationalIdentifier: String, accountNumber: String, + accountType: String, accountLabel: String, currency: String, accountHolderName: String): (Bank, BankAccount) = { + //don't require and exact match on the name, just the identifier + val bank: Bank = MappedBank.find(By(MappedBank.national_identifier, bankNationalIdentifier)) match { + case Full(b) => + logger.info(s"bank with id ${b.bankId} and national identifier ${b.nationalIdentifier} found") + b + case _ => + logger.info(s"creating bank with national identifier $bankNationalIdentifier") + //TODO: need to handle the case where generatePermalink returns a permalink that is already used for another bank + MappedBank.create + .permalink(Helper.generatePermalink(bankName)) + .fullBankName(bankName) + .shortBankName(bankName) + .national_identifier(bankNationalIdentifier) + .saveMe() + } + + //TODO: pass in currency as a parameter? + val account = createAccountIfNotExisting( + bank.bankId, + AccountId(UUID.randomUUID().toString), + accountNumber, + accountType, + accountLabel, + currency, + 0L, + accountHolderName + ) + + (bank, account) + } + + //for sandbox use -> allows us to check if we can generate a new test account with the given number + override def accountExists(bankId: BankId, accountNumber: String): Boolean = { + getAccountByNumber(bankId, accountNumber) != null + } + + //remove an account and associated transactions + override def removeAccount(bankId: BankId, accountID: AccountId) : Boolean = { + //delete comments on transactions of this account + val commentsDeleted = MappedComment.bulkDelete_!!( + By(MappedComment.bank, bankId.value), + By(MappedComment.account, accountID.value) + ) + + //delete narratives on transactions of this account + val narrativesDeleted = MappedNarrative.bulkDelete_!!( + By(MappedNarrative.bank, bankId.value), + By(MappedNarrative.account, accountID.value) + ) + + //delete narratives on transactions of this account + val tagsDeleted = MappedTag.bulkDelete_!!( + By(MappedTag.bank, bankId.value), + By(MappedTag.account, accountID.value) + ) + + //delete WhereTags on transactions of this account + val whereTagsDeleted = MappedWhereTag.bulkDelete_!!( + By(MappedWhereTag.bank, bankId.value), + By(MappedWhereTag.account, accountID.value) + ) + + //delete transaction images on transactions of this account + val transactionImagesDeleted = MappedTransactionImage.bulkDelete_!!( + By(MappedTransactionImage.bank, bankId.value), + By(MappedTransactionImage.account, accountID.value) + ) + + //delete transactions of account + val transactionsDeleted = MappedTransaction.bulkDelete_!!( + By(MappedTransaction.bank, bankId.value), + By(MappedTransaction.account, accountID.value) + ) + + //remove view privileges (get views first) + val views = ViewImpl.findAll( + By(ViewImpl.bankPermalink, bankId.value), + By(ViewImpl.accountPermalink, accountID.value) + ) + + //loop over them and delete + var privilegesDeleted = true + views.map (x => { + privilegesDeleted &&= ViewPrivileges.bulkDelete_!!(By(ViewPrivileges.view, x.id_)) + }) + + //delete views of account + val viewsDeleted = ViewImpl.bulkDelete_!!( + By(ViewImpl.bankPermalink, bankId.value), + By(ViewImpl.accountPermalink, accountID.value) + ) + + //delete account + val account = getBankAccount(bankId, accountID) + + val accountDeleted = account match { + case acc => true //acc.delete_! //TODO + case _ => false + } + + commentsDeleted && narrativesDeleted && tagsDeleted && whereTagsDeleted && transactionImagesDeleted && + transactionsDeleted && privilegesDeleted && viewsDeleted && accountDeleted + } + + //creates a bank account for an existing bank, with the appropriate values set. Can fail if the bank doesn't exist + override def createSandboxBankAccount(bankId: BankId, accountID: AccountId, accountNumber: String, + accountType: String, accountLabel: String, currency: String, + initialBalance: BigDecimal, accountHolderName: String): Box[BankAccount] = { + + for { + bank <- getBank(bankId) //bank is not really used, but doing this will ensure account creations fails if the bank doesn't + } yield { + + val balanceInSmallestCurrencyUnits = Helper.convertToSmallestCurrencyUnits(initialBalance, currency) + createAccountIfNotExisting(bankId, accountID, accountNumber, accountType, accountLabel, currency, balanceInSmallestCurrencyUnits, accountHolderName) + } + + } + + //sets a user as an account owner/holder + override def setAccountHolder(bankAccountUID: BankAccountUID, user: User): Unit = { + MappedAccountHolder.createMappedAccountHolder(user.apiId.value, bankAccountUID.accountId.value, bankAccountUID.bankId.value) + } + + private def createAccountIfNotExisting(bankId: BankId, accountID: AccountId, accountNumber: String, + accountType: String, accountLabel: String, currency: String, + balanceInSmallestCurrencyUnits: Long, accountHolderName: String) : BankAccount = { + getBankAccount(bankId, accountID) match { + case Full(a) => + logger.info(s"account with id $accountID at bank with id $bankId already exists. No need to create a new one.") + a + case _ => null //TODO + /* + new KafkaBankAccount + .bank(bankId.value) + .theAccountId(accountID.value) + .accountNumber(accountNumber) + .accountType(accountType) + .accountLabel(accountLabel) + .accountCurrency(currency) + .accountBalance(balanceInSmallestCurrencyUnits) + .holder(accountHolderName) + .saveMe() + */ + } + } + + private def createMappedAccountDataIfNotExisting(bankId: String, accountId: String, label: String) : Boolean = { + MappedKafkaBankAccountData.find(By(MappedKafkaBankAccountData.accountId, accountId), + By(MappedKafkaBankAccountData.bankId, bankId)) match { + case Empty => + val data = new MappedKafkaBankAccountData + data.setAccountId(accountId) + data.setBankId(bankId) + data.setLabel(label) + data.save() + true + case _ => + logger.info(s"account data with id $accountId at bank with id $bankId already exists. No need to create a new one.") + false + } + } + + /* + End of bank account creation + */ + + + /* + Transaction importer api + */ + + //used by the transaction import api + override def updateAccountBalance(bankId: BankId, accountID: AccountId, newBalance: BigDecimal): Boolean = { + + //this will be Full(true) if everything went well + val result = for { + acc <- getBankAccount(bankId, accountID) + bank <- getBank(bankId) + } yield { + //acc.balance = newBalance + setBankAccountLastUpdated(bank.nationalIdentifier, acc.number, now) + } + + result.getOrElse(false) + } + + //transaction import api uses bank national identifiers to uniquely indentify banks, + //which is unfortunate as theoretically the national identifier is unique to a bank within + //one country + private def getBankByNationalIdentifier(nationalIdentifier : String) : Box[Bank] = { + MappedBank.find(By(MappedBank.national_identifier, nationalIdentifier)) + } + + + private val bigDecimalFailureHandler : PartialFunction[Throwable, Unit] = { + case ex : NumberFormatException => { + logger.warn(s"could not convert amount to a BigDecimal: $ex") + } + } + + //used by transaction import api call to check for duplicates + override def getMatchingTransactionCount(bankNationalIdentifier : String, accountNumber : String, amount: String, completed: Date, otherAccountHolder: String): Int = { + //we need to convert from the legacy bankNationalIdentifier to BankId, and from the legacy accountNumber to AccountId + val count = for { + bankId <- getBankByNationalIdentifier(bankNationalIdentifier).map(_.bankId) + account <- getAccountByNumber(bankId, accountNumber) + amountAsBigDecimal <- tryo(bigDecimalFailureHandler)(BigDecimal(amount)) + } yield { + + val amountInSmallestCurrencyUnits = + Helper.convertToSmallestCurrencyUnits(amountAsBigDecimal, account.currency) + + MappedTransaction.count( + By(MappedTransaction.bank, bankId.value), + By(MappedTransaction.account, account.accountId.value), + By(MappedTransaction.amount, amountInSmallestCurrencyUnits), + By(MappedTransaction.tFinishDate, completed), + By(MappedTransaction.counterpartyAccountHolder, otherAccountHolder)) + } + + //icky + count.map(_.toInt) getOrElse 0 + } + + //used by transaction import api + override def createImportedTransaction(transaction: ImporterTransaction): Box[Transaction] = { + //we need to convert from the legacy bankNationalIdentifier to BankId, and from the legacy accountNumber to AccountId + val obpTransaction = transaction.obp_transaction + val thisAccount = obpTransaction.this_account + val nationalIdentifier = thisAccount.bank.national_identifier + val accountNumber = thisAccount.number + for { + bank <- getBankByNationalIdentifier(transaction.obp_transaction.this_account.bank.national_identifier) ?~! + s"No bank found with national identifier $nationalIdentifier" + bankId = bank.bankId + account <- getAccountByNumber(bankId, accountNumber) + details = obpTransaction.details + amountAsBigDecimal <- tryo(bigDecimalFailureHandler)(BigDecimal(details.value.amount)) + newBalanceAsBigDecimal <- tryo(bigDecimalFailureHandler)(BigDecimal(details.new_balance.amount)) + amountInSmallestCurrencyUnits = Helper.convertToSmallestCurrencyUnits(amountAsBigDecimal, account.currency) + newBalanceInSmallestCurrencyUnits = Helper.convertToSmallestCurrencyUnits(newBalanceAsBigDecimal, account.currency) + otherAccount = obpTransaction.other_account + mappedTransaction = MappedTransaction.create + .bank(bankId.value) + .account(account.accountId.value) + .transactionType(details.kind) + .amount(amountInSmallestCurrencyUnits) + .newAccountBalance(newBalanceInSmallestCurrencyUnits) + .currency(account.currency) + .tStartDate(details.posted.`$dt`) + .tFinishDate(details.completed.`$dt`) + .description(details.label) + .counterpartyAccountNumber(otherAccount.number) + .counterpartyAccountHolder(otherAccount.holder) + .counterpartyAccountKind(otherAccount.kind) + .counterpartyNationalId(otherAccount.bank.national_identifier) + .counterpartyBankName(otherAccount.bank.name) + .counterpartyIban(otherAccount.bank.IBAN) + .saveMe() + transaction <- mappedTransaction.toTransaction(account) + } yield transaction + } + + override def setBankAccountLastUpdated(bankNationalIdentifier: String, accountNumber : String, updateDate: Date) : Boolean = { + val result = for { + bankId <- getBankByNationalIdentifier(bankNationalIdentifier).map(_.bankId) + account <- getAccountByNumber(bankId, accountNumber) + } yield { + val acc = getBankAccount(bankId, account.accountId) + acc match { + case a => true //a.lastUpdate = updateDate //TODO + case _ => logger.warn("can't set bank account.lastUpdated because the account was not found"); false + } + } + result.getOrElse(false) + } + + /* + End of transaction importer api + */ + + + override def updateAccountLabel(bankId: BankId, accountID: AccountId, label: String): Boolean = { + //this will be Full(true) if everything went well + val result = for { + acc <- getBankAccount(bankId, accountID) + bank <- getBank(bankId) + d <- MappedKafkaBankAccountData.find(By(MappedKafkaBankAccountData.accountId, accountID.value), By(MappedKafkaBankAccountData.bankId, bank.bankId.value)) + } yield { + d.setLabel(label) + d.save() + } + result.getOrElse(false) + } + + + + + ///////////////////////////////////////////////////////////////////////////// + + + + def process(reqId: String, command: String, argList: Map[String,String]): json.JValue = { //List[Map[String,String]] = { + var retries:Int = 3 + while (consumer == null && retries > 0 ) { + retries -= 1 + consumer = new KafkaConsumer() + } + retries = 3 + while (producer == null && retries > 0) { + retries -= 1 + producer = new KafkaProducer() + } + if (producer == null || consumer == null) + return json.parse("""{"error":"connection failed. try again later."}""") + // Send request to Kafka + producer.send(reqId, command, argList, "1") + // Request sent, now we wait for response with the same reqId + val res = consumer.getResponse(reqId) + res + } + + + // Helper for creating a transaction + def createNewTransaction(r: KafkaInboundTransaction):Box[Transaction] = { + var datePosted: Date = null + if (r.details.posted != null) // && r.details.posted.matches("^[0-9]{8}$")) + datePosted = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(r.details.posted) + + var dateCompleted: Date = null + if (r.details.completed != null) // && r.details.completed.matches("^[0-9]{8}$")) + dateCompleted = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(r.details.completed) + + for { + counterparty <- tryo{r.counterparty} + thisAccount <- getBankAccount(BankId(r.this_account.bank), AccountId(r.this_account.id)) + //creates a dummy OtherBankAccount without an OtherBankAccountMetadata, which results in one being generated (in OtherBankAccount init) + dummyOtherBankAccount <- tryo{createOtherBankAccount(counterparty.get, thisAccount, None)} + //and create the proper OtherBankAccount with the correct "id" attribute set to the metadataId of the OtherBankAccountMetadata object + //note: as we are passing in the OtherBankAccountMetadata we don't incur another db call to get it in OtherBankAccount init + otherAccount <- tryo{createOtherBankAccount(counterparty.get, thisAccount, Some(dummyOtherBankAccount.metadata))} + } yield { + // Create new transaction + new Transaction( + r.id, // uuid:String + TransactionId(r.id), // id:TransactionId + thisAccount, // thisAccount:BankAccount + otherAccount, // otherAccount:OtherBankAccount + r.details.`type`, // transactionType:String + BigDecimal(r.details.value), // val amount:BigDecimal + thisAccount.currency, // currency:String + Some(r.details.description), // description:Option[String] + datePosted, // startDate:Date + dateCompleted, // finishDate:Date + BigDecimal(r.details.new_balance) // balance:BigDecimal) + ) + } + } + + + case class KafkaBank(r: KafkaInboundBank) extends Bank { + def fullName = r.full_name + def shortName = r.short_name + def logoUrl = r.logo + def bankId = BankId(r.id) + def nationalIdentifier = "None" //TODO + def swiftBic = "None" //TODO + def websiteUrl = r.website + } + + // Helper for creating other bank account + def createOtherBankAccount(c: KafkaInboundTransactionCounterparty, o: KafkaBankAccount, alreadyFoundMetadata : Option[OtherBankAccountMetadata]) = { + new OtherBankAccount( + id = alreadyFoundMetadata.map(_.metadataId).getOrElse(""), + label = c.account_number.getOrElse(c.name.getOrElse("")), + nationalIdentifier = "", + swift_bic = None, + iban = None, + number = c.account_number.getOrElse(""), + bankName = "", + kind = "", + originalPartyBankId = BankId(o.bankId.value), + originalPartyAccountId = AccountId(o.accountId.value), + alreadyFoundMetadata = alreadyFoundMetadata + ) + } + + case class KafkaBankAccount(r: KafkaInboundAccount) extends BankAccount { + def accountId : AccountId = AccountId(r.id) + def accountType : String = r.`type` + def balance : BigDecimal = BigDecimal(r.balance.amount) + def currency : String = r.balance.currency + def name : String = r.owners.head + def swift_bic : Option[String] = Some("swift_bic") //TODO + def iban : Option[String] = Some(r.IBAN) + def number : String = r.number + def bankId : BankId = BankId(r.bank) + def lastUpdate : Date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ENGLISH).parse(today.getTime.toString) + def accountHolder : String = r.owners.head + + // Fields modifiable from OBP are stored in mapper + def label : String = (for { + d <- MappedKafkaBankAccountData.find(By(MappedKafkaBankAccountData.accountId, r.id)) + } yield { + d.getLabel + }).getOrElse(r.number) + + } + + + case class KafkaInboundBank( + id : String, + short_name : String, + full_name : String, + logo : String, + website : String) + + + /** Bank Branches + * + * @param id Uniquely identifies the Branch within the Bank. SHOULD be url friendly (no spaces etc.) Used in URLs + * @param bank_id MUST match bank_id in Banks + * @param name Informal name for the Branch + * @param address Address + * @param location Geolocation + * @param meta Meta information including the license this information is published under + * @param lobby Info about when the lobby doors are open + * @param driveUp Info about when automated facilities are open e.g. cash point machine + */ + case class KafkaInboundBranch( + id : String, + bank_id: String, + name : String, + address : KafkaInboundAddress, + location : KafkaInboundLocation, + meta : KafkaInboundMeta, + lobby : Option[KafkaInboundLobby], + driveUp : Option[KafkaInboundDriveUp]) + + case class KafkaInboundLicense( + id : String, + name : String) + + case class KafkaInboundMeta( + license : KafkaInboundLicense) + + case class KafkaInboundLobby( + hours : String) + + case class KafkaInboundDriveUp( + hours : String) + + /** + * + * @param line_1 Line 1 of Address + * @param line_2 Line 2 of Address + * @param line_3 Line 3 of Address + * @param city City + * @param county County i.e. Division of State + * @param state State i.e. Division of Country + * @param post_code Post Code or Zip Code + * @param country_code 2 letter country code: ISO 3166-1 alpha-2 + */ + case class KafkaInboundAddress( + line_1 : String, + line_2 : String, + line_3 : String, + city : String, + county : String, // Division of State + state : String, // Division of Country + post_code : String, + country_code: String) + + case class KafkaInboundLocation( + latitude : Double, + longitude : Double) + + case class KafkaInboundUser( + email : String, + password : String, + display_name : String) + + case class KafkaInboundValidatedUser( + email : String, + display_name : String) + + case class KafkaInboundAccount( + id : String, + bank : String, + label : String, + number : String, + `type` : String, + balance : KafkaInboundBalance, + IBAN : String, + owners : List[String], + generate_public_view : Boolean, + generate_accountants_view : Boolean, + generate_auditors_view : Boolean) + + case class KafkaInboundBalance( + currency : String, + amount : String) + + case class KafkaInboundTransaction( + id : String, + this_account : KafkaInboundAccountId, + counterparty : Option[KafkaInboundTransactionCounterparty], + details : KafkaInboundTransactionDetails) + + case class KafkaInboundTransactionCounterparty( + name : Option[String], // Also known as Label + account_number : Option[String]) + + case class KafkaInboundAccountId( + id : String, + bank : String) + + case class KafkaInboundTransactionDetails( + `type` : String, + description : String, + posted : String, + completed : String, + new_balance : String, + value : String) + + + case class KafkaInboundAtm( + id : String, + bank_id: String, + name : String, + address : KafkaInboundAddress, + location : KafkaInboundLocation, + meta : KafkaInboundMeta + ) + + + case class KafkaInboundProduct( + bank_id : String, + code: String, + name : String, + category : String, + family : String, + super_family : String, + more_info_url : String, + meta : KafkaInboundMeta + ) + + + case class KafkaInboundAccountData( + banks : List[KafkaInboundBank], + users : List[KafkaInboundUser], + accounts : List[KafkaInboundAccount] + ) + + // We won't need this. TODO clean up. + case class KafkaInboundData( + banks : List[KafkaInboundBank], + users : List[KafkaInboundUser], + accounts : List[KafkaInboundAccount], + transactions : List[KafkaInboundTransaction], + branches: List[KafkaInboundBranch], + atms: List[KafkaInboundAtm], + products: List[KafkaInboundProduct], + crm_events: List[KafkaInboundCrmEvent] + ) + + + case class KafkaInboundCrmEvent( + id : String, // crmEventId + bank_id : String, + customer: KafkaInboundCustomer, + category : String, + detail : String, + channel : String, + actual_date: String + ) + + case class KafkaInboundCustomer( + name: String, + number : String // customer number, also known as ownerId (owner of accounts) aka API User? + ) + + + case class KafkaInboundTransactionId( + transactionId : String + ) + case class KafkaOutboundTransaction(username: String, + accountId: String, + currency: String, + amount: String, + otherAccountId: String, + otherAccountCurrency: String, + transactionType: String) +} +