Merge branch 'develop' into test300

# Conflicts:
#	release_notes.md
#	src/main/scala/code/views/MapperViews.scala
This commit is contained in:
hongwei1 2017-05-24 01:29:39 +02:00
commit fc4f56052c
75 changed files with 1034 additions and 740 deletions

21
pom.xml
View File

@ -13,7 +13,7 @@
<properties>
<scala.version>2.11</scala.version>
<scala.compiler>2.11.8</scala.compiler>
<akka.version>2.4.10</akka.version>
<akka.version>2.4.17</akka.version>
<lift.version>2.6.3</lift.version>
<obp-ri.version>2016.11-RC6-SNAPSHOT</obp-ri.version>
<!-- Common plugin settings -->
@ -89,18 +89,13 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>

View File

@ -16,6 +16,6 @@ Date Commit Action
20/02/2017 d8b6907 added new pair to props : # If true, get counterparties from OBP db, else put message on Kafka queue. <--> get_counterparties_from_OBP_DB = true
05/04/2017 added 8 new caching props to sample.props.template which start with connector.cache.ttl.seconds.* and end with function names (getBank, getBanks, getAccount, getAccounts, getTransaction, getTransactions, getCounterpartyFromTransaction, getCounterpartiesFromTransaction). If it's omitted default value is 0 i.e. no caching.
02/05/2017 3084827 added 1 new caching props to sample.props.template connector.cache.ttl.seconds.APIMethods121.getTransactions. If it's omitted default value is 0 i.e. no caching. This cacahe is from API level.
09/05/2017 46317aa Added allow_public_views=false, we will not create the public views when it is false. And when we access the existing public view, we also need check it.
10/05/2017 7f95a5c added allow_public_views=false, we will not create the public views and will not access them (if public views are exsiting)when it is false.
```

View File

@ -36,12 +36,14 @@ import java.util.Locale
import javax.mail.internet.MimeMessage
import code.accountholder.MapperAccountHolders
import code.actorsystem.ObpActorSystem
import code.api.Constant._
import code.api.ResourceDocs1_4_0.ResourceDocs
import code.api._
import code.api.sandbox.SandboxApiCalls
import code.api.util.{APIUtil, ErrorMessages}
import code.atms.MappedAtm
import code.bankconnectors.KafkaHelperActors
import code.branches.MappedBranch
import code.cards.{MappedPhysicalCard, PinReset}
import code.crm.MappedCrmEvent
@ -196,21 +198,9 @@ class Boot extends MdcLoggable {
logger.info("running mode: " + runningMode)
logger.info(s"ApiPathZero (the bit before version) is $ApiPathZero")
if (runningMode == "Production mode")
System.setProperty("log_dir", Helper.getHostname)
logger.debug(s"If you can read this, logging level is debug")
if (!Props.getBool("remotedata.enable", false)) {
try {
logger.info(s"RemoteDataActors.startLocalWorkerSystem() starting")
RemotedataActors.startLocalWorkerSystem()
} catch {
case ex: Exception => logger.warn(s"RemoteDataActors.startLocalWorkerSystem() could not start: $ex")
}
}
val actorSystem = ObpActorSystem.startLocalActorSystem()
// where to search snippets
LiftRules.addToPackages("code")
@ -278,6 +268,21 @@ class Boot extends MdcLoggable {
}
}
if (connector.startsWith("kafka")) {
logger.info(s"KafkaHelperActors.startLocalKafkaHelperWorkers( ${actorSystem} ) starting")
KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem)
}
if (!Props.getBool("remotedata.enable", false)) {
try {
logger.info(s"RemotedataActors.startLocalRemotedataWorkers( ${actorSystem} ) starting")
RemotedataActors.startActors(actorSystem)
} catch {
case ex: Exception => logger.warn(s"RemotedataActors.startLocalRemotedataWorkers( ${actorSystem} ) could not start: $ex")
}
}
// API Metrics (logs of API calls)
// If set to true we will write each URL with params to a datastore / log file
if (Props.getBool("write_metrics", false)) {

View File

@ -1,13 +1,10 @@
package code.remotedata
package code.actorsystem
import akka.actor.ActorSystem
import code.util.Helper
import com.typesafe.config.ConfigFactory
import net.liftweb.util.Props
object RemotedataConfig {
object ObpActorConfig {
val remoteHostname = Props.get("remotedata.hostname").openOr("127.0.0.1")
val remotePort = Props.get("remotedata.port").openOr("2662")

View File

@ -0,0 +1,27 @@
package code.actorsystem
import code.api.APIFailure
import net.liftweb.common._
import net.liftweb.json.JsonAST.JValue
trait ObpActorHelper {
def extractResult[T](in: T) = {
in match {
case pf: ParamFailure[_] =>
pf.param match {
case af: APIFailure => af
case f: Failure => f
case _ => pf
}
case af: APIFailure => af
case f: Failure => f
case l: List[T] => l
case s: Set[T] => s
case Full(r) => r
case j: JValue => j
case t: T => t
case _ => APIFailure(s"result extraction failed", 501)
}
}
}

View File

@ -0,0 +1,48 @@
package code.actorsystem
import akka.util.Timeout
import code.api.APIFailure
import net.liftweb.common._
import net.liftweb.util.Props
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
trait ObpActorInit {
// Deafult is 3 seconds, which should be more than enough for slower systems
val ACTOR_TIMEOUT: Long = Props.getLong("remotedata.timeout").openOr(3)
val actorName = CreateActorNameFromClassName(this.getClass.getName)
val actor = ObpLookupSystem.getRemotedataActor(actorName)
val TIMEOUT = (ACTOR_TIMEOUT seconds)
implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds))
def extractFuture[T](f: Future[Any]): T = {
val r = f.map {
case s: Set[T] => s
case l: List[T] => l
case t: T => t
case _ => Empty ~> APIFailure(s"future extraction failed", 501)
}
Await.result(r, TIMEOUT).asInstanceOf[T]
}
def extractFutureToBox[T](f: Future[Any]): Box[T] = {
val r = f.map {
case pf: ParamFailure[_] => Empty ~> pf
case af: APIFailure => Empty ~> af
case f: Failure => f
case Empty => Empty
case t: T => Full(t)
case _ => Empty ~> APIFailure(s"future extraction to box failed", 501)
}
Await.result(r, TIMEOUT)
}
def CreateActorNameFromClassName(c: String): String = {
val n = c.replaceFirst("^.*Remotedata", "").replaceAll("\\$.*", "")
Character.toLowerCase(n.charAt(0)) + n.substring(1)
}
}

View File

@ -0,0 +1,20 @@
package code.actorsystem
import akka.actor.ActorSystem
import code.util.Helper
import code.util.Helper.MdcLoggable
import com.typesafe.config.ConfigFactory
object ObpActorSystem extends MdcLoggable {
val props_hostname = Helper.getHostname
var obpActorSystem: ActorSystem = _
def startLocalActorSystem(): ActorSystem = {
logger.info("Starting local actor system")
logger.info(ObpActorConfig.localConf)
obpActorSystem = ActorSystem.create(s"ObpActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.localConf)))
obpActorSystem
}
}

View File

@ -0,0 +1,64 @@
package code.actorsystem
import akka.actor.ActorSystem
import code.util.Helper
import code.util.Helper.MdcLoggable
import com.typesafe.config.ConfigFactory
import net.liftweb.util.Props
object ObpLookupSystem extends ObpLookupSystem {
this.init
}
trait ObpLookupSystem extends MdcLoggable {
var obpLookupSystem: ActorSystem = null
val props_hostname = Helper.getHostname
def init (): ActorSystem = {
if (obpLookupSystem == null ) {
val system = ActorSystem("ObpLookupSystem", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.lookupConf)))
logger.info(ObpActorConfig.lookupConf)
obpLookupSystem = system
}
obpLookupSystem
}
def getKafkaActor(actorName: String) = {
val actorPath: String = {
val hostname = ObpActorConfig.localHostname
val port = ObpActorConfig.localPort
val props_hostname = Helper.getHostname
if (port == 0) {
logger.error("Failed to connect to local Kafka actor")
}
s"akka.tcp://ObpActorSystem_${props_hostname}@${hostname}:${port}/user/${actorName}"
}
this.obpLookupSystem.actorSelection(actorPath)
}
def getRemotedataActor(actorName: String) = {
val actorPath: String = Props.getBool("remotedata.enable", false) match {
case true =>
val hostname = ObpActorConfig.remoteHostname
val port = ObpActorConfig.remotePort
val remotedata_hostname = Helper.getRemotedataHostname
s"akka.tcp://RemotedataActorSystem_${remotedata_hostname}@${hostname}:${port}/user/${actorName}"
case false =>
val hostname = ObpActorConfig.localHostname
val port = ObpActorConfig.localPort
val props_hostname = Helper.getHostname
if (port == 0) {
logger.error("Failed to connect to local Remotedata actor")
}
s"akka.tcp://ObpActorSystem_${props_hostname}@${hostname}:${port}/user/${actorName}"
}
this.obpLookupSystem.actorSelection(actorPath)
}
}

View File

@ -545,7 +545,7 @@ object SwaggerDefinitionsJSON {
on_hot_list = true,
technology = "String",
networks = List("String"),
allows = List("String"),
allows = List("credit"),
account_id = "String",
replacement = replacementJSON,
pin_reset = List(pinResetJSON),

View File

@ -324,10 +324,6 @@ object APIUtil extends MdcLoggable {
val consumerId = if (u != null) c.id.toString() else "null"
var appName = if (u != null) c.name.toString() else "null"
var developerEmail = if (u != null) c.developerEmail.toString() else "null"
//TODO no easy way to get it, make it later
//name of the Scala Partial Function being used for the endpoint
val implementedByPartialFunction = rd match {
case Some(r) => r.apiFunction
case _ => ""
@ -337,7 +333,6 @@ object APIUtil extends MdcLoggable {
//(GET, POST etc.) --S.request.get.requestType.method
val verb = S.request.get.requestType.method
APIMetrics.apiMetrics.vend.saveMetric(userId, S.uriAndQueryString.getOrElse(""), date, duration: Long, userName, appName, developerEmail, consumerId, implementedByPartialFunction, implementedInVersion, verb)
}
}

View File

@ -17,7 +17,7 @@ import code.api.v2_1_0.JSONFactory210._
import code.api.v2_2_0.{CounterpartyJsonV220, JSONFactory220}
import code.atms.Atms
import code.atms.Atms.AtmId
import code.bankconnectors._
import code.bankconnectors.{OBPQueryParam, _}
import code.branches.Branches
import code.branches.Branches.BranchId
import code.consumer.Consumers
@ -40,7 +40,7 @@ import net.liftweb.util.Helpers.tryo
import net.liftweb.util.Props
import scala.collection.immutable.Nil
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
// Makes JValue assignment to Nil work
import code.api.util.APIUtil._
import code.api.{APIFailure, ChargePolicy}
@ -915,7 +915,7 @@ trait APIMethods210 {
for {
u <- user ?~! UserNotLoggedIn
isValidBankIdFormat <- tryo(assert(isValidID(bankId.value)))?~! InvalidBankIdFormat
canCreateCardsForBank <- booleanToBox(hasEntitlement("", u.userId, CanCreateCardsForBank), UserDoesNotHaveRole +CanCreateCardsForBank)
canCreateCardsForBank <- booleanToBox(hasEntitlement(bankId.value, u.userId, CanCreateCardsForBank), UserDoesNotHaveRole +CanCreateCardsForBank)
postJson <- tryo {json.extract[PostPhysicalCardJSON]} ?~! {InvalidJsonFormat}
postedAllows <- postJson.allows match {
case List() => booleanToBox(true)
@ -1682,8 +1682,7 @@ trait APIMethods210 {
u <- user ?~! UserNotLoggedIn
hasEntitlement <- booleanToBox(hasEntitlement("", u.userId, ApiRole.CanReadMetrics), UserDoesNotHaveRole + CanReadMetrics )
//Note: Filters Part 1:
//?start_date=100&end_date=1&limit=200&offset=0
//Note: Filters Part 1: //eg: /management/metrics?start_date=2010-05-22&end_date=2017-05-22&limit=200&offset=0
inputDateFormat <- Full(new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH))
// set the long,long ago as the default date.
@ -1705,39 +1704,14 @@ trait APIMethods210 {
}
) ?~! s"${InvalidNumber } limit:${S.param("limit").get }"
// default0, start from page 0
offset <- tryo(S.param("offset").getOrElse("0").toInt) ?~!
s"${InvalidNumber } offset:${S.param("offset").get }"
metrics <- Full(APIMetrics.apiMetrics.vend.getAllMetrics(List(OBPLimit(limit), OBPOffset(offset), OBPFromDate(startDate), OBPToDate(endDate))))
offset <- tryo(S.param("offset").getOrElse("0").toInt) ?~! s"${InvalidNumber } offset:${S.param("offset").get }"
//Because of "rd.getDate().before(startDatePlusOneDay)" exclude the startDatePlusOneDay, so we need to plus one day more then today.
// add because of endDate is yyyy-MM-dd format, it started from 0, so it need to add 2 days.
//startDatePlusOneDay <- Full(inputDateFormat.parse((new Date(endDate.getTime + 1000 * 60 * 60 * 24 * 2)).toInstant.toString))
///filterByDate <- Full(metrics.toList.filter(rd => (rd.getDate().after(startDate)) && (rd.getDate().before(startDatePlusOneDay))))
/** pages:
* eg: total=79
* offset=0, limit =50
* filterByDate.slice(0,50)
* offset=1, limit =50
* filterByDate.slice(50*1,50+50*1)--> filterByDate.slice(50,100)
* offset=2, limit =50
* filterByDate.slice(50*2,50+50*2)-->filterByDate.slice(100,150)
*/
//filterByPages <- Full(filterByDate.slice(offset * limit, (offset * limit + limit)))
//Filters Part 2.
//eg: /management/metrics?start_date=100&end_date=1&limit=200&offset=0
// &user_id=c7b6cb47-cb96-4441-8801-35b57456753a&consumer_id=78&app_name=hognwei&implemented_in_version=v2.1.0&verb=GET&anon=true
// consumer_id (if null ignore)
// user_id (if null ignore)
// anon true => return where user_id is null. false => return where where user_id is not null(if null ignore)
// url (if null ignore)
// app_name (if null ignore)
// implemented_by_partial_function (if null ignore)
// implemented_in_version (if null ignore)
// verb (if null ignore)
//Filters Part 2. -- the optional varibles:
//eg: /management/metrics?start_date=2010-05-22&end_date=2017-05-22&limit=200&offset=0&user_id=c7b6cb47-cb96-4441-8801-35b57456753a&consumer_id=78&app_name=hognwei&implemented_in_version=v2.1.0&verb=GET&anon=true
consumerId <- Full(S.param("consumer_id")) //(if null ignore)
userId <- Full(S.param("user_id")) //(if null ignore)
anon <- Full(S.param("anon")) // (if null ignore) true => return where user_id is null.false => return where user_id is not null.
@ -1751,17 +1725,33 @@ trait APIMethods210 {
assert(anon.get.equals("true") || anon.get.equals("false"))
}) ?~! s"value anon:${anon.get } is Wrong . anon only have two value true or false or omit anon field"
parameters = new collection.mutable.ListBuffer[OBPQueryParam]()
setFilterPart1 <- Full(parameters += OBPLimit(limit) +=OBPOffset(offset) += OBPFromDate(startDate)+= OBPToDate(endDate))
setFilterPart2 <- if (!consumerId.isEmpty)
Full(parameters += OBPConsumerId(consumerId.get))
else if (!userId.isEmpty)
Full(parameters += OBPUserId(userId.get))
else if (!url.isEmpty)
Full(parameters += OBPUrl(url.get))
else if (!appName.isEmpty)
Full(parameters += OBPAppName(appName.get))
else if (!implementedInVersion.isEmpty)
Full(parameters += OBPImplementedInVersion(implementedInVersion.get))
else if (!implementedByPartialFunction.isEmpty)
Full(parameters += OBPImplementedByPartialFunction(implementedByPartialFunction.get))
else if (!verb.isEmpty)
Full(parameters += OBPVerb(verb.get))
else
Full(parameters)
metrics <- Full(APIMetrics.apiMetrics.vend.getAllMetrics(parameters.toList))
// the anon field is not in database, so here use different way to filer it.
filterByFields: List[APIMetric] = metrics
.filter(rd => (if (!consumerId.isEmpty) rd.getConsumerId().equals(consumerId.get) else true))
.filter(rd => (if (!userId.isEmpty) rd.getUserId().equals(userId.get) else true))
.filter(rd => (if (!anon.isEmpty && anon.get.equals("true")) (rd.getUserId().equals("null")) else true))
.filter(rd => (if (!anon.isEmpty && anon.get.equals("false")) (!rd.getUserId().equals("null")) else true))
//TODO url can not contain '&', if url is /management/metrics?start_date=100&end_date=1&limit=200&offset=0, it can not work.
.filter(rd => (if (!url.isEmpty) rd.getUrl().equals(url.get) else true))
.filter(rd => (if (!appName.isEmpty) rd.getAppName.equals(appName.get) else true))
.filter(rd => (if (!implementedByPartialFunction.isEmpty) rd.getImplementedByPartialFunction().equals(implementedByPartialFunction.get) else true))
.filter(rd => (if (!implementedInVersion.isEmpty) rd.getImplementedInVersion().equals(implementedInVersion.get) else true))
.filter(rd => (if (!verb.isEmpty) rd.getVerb().equals(verb.get) else true))
} yield {
val json = JSONFactory210.createMetricsJson(filterByFields)
successJsonResponse(Extraction.decompose(json)(DateFormatWithCurrentTimeZone))

View File

@ -3,6 +3,7 @@ package code.api.v2_2_0
import java.text.SimpleDateFormat
import java.util.{Date, Locale, UUID}
import code.actorsystem.ObpActorConfig
import code.api.ResourceDocs1_4_0.SwaggerDefinitionsJSON._
import code.api.util.APIUtil.{isValidCurrencyISOCode, _}
import code.api.util.ApiRole._
@ -15,7 +16,6 @@ import code.consumer.Consumers
import code.metrics.{ConnMetric, ConnMetrics}
import code.model.dataAccess.BankAccountCreation
import code.model.{BankId, ViewId, _}
import code.remotedata.RemotedataConfig
import code.util.Helper._
import net.liftweb.common.{Box, Full}
import net.liftweb.http.rest.RestHelper
@ -46,8 +46,8 @@ trait APIMethods220 {
val f7 = CachedFunctionJSON("getCounterpartyFromTransaction", Props.get("connector.cache.ttl.seconds.getCounterpartyFromTransaction", "0").toInt)
val f8 = CachedFunctionJSON("getCounterpartiesFromTransaction", Props.get("connector.cache.ttl.seconds.getCounterpartiesFromTransaction", "0").toInt)
val akkaPorts = PortJSON("remotedata.local.port", RemotedataConfig.localPort.toString) :: PortJSON("remotedata.port", RemotedataConfig.remotePort) :: Nil
val akka = AkkaJSON(akkaPorts, RemotedataConfig.akka_loglevel)
val akkaPorts = PortJSON("remotedata.local.port", ObpActorConfig.localPort.toString) :: PortJSON("remotedata.port", ObpActorConfig.remotePort) :: Nil
val akka = AkkaJSON(akkaPorts, ObpActorConfig.akka_loglevel)
val cache = f1::f2::f3::f4::f5::f6::f7::f8::Nil
val metrics = MetricsJSON("es.metrics.port.tcp", Props.get("es.metrics.port.tcp", "9300")) ::
@ -596,8 +596,8 @@ trait APIMethods220 {
case "management" :: "connector" :: "metrics" :: Nil JsonGet _ => {
user => {
for {
// u <- user ?~! ErrorMessages.UserNotLoggedIn
// _ <- booleanToBox(hasEntitlement("", u.userId, ApiRole.CanGetConnectorMetrics), s"$CanGetConnectorMetrics entitlement required")
u <- user ?~! ErrorMessages.UserNotLoggedIn
_ <- booleanToBox(hasEntitlement("", u.userId, ApiRole.CanGetConnectorMetrics), s"$CanGetConnectorMetrics entitlement required")
//Note: Filters Part 1:
//?start_date=100&end_date=1&limit=200&offset=0

View File

@ -85,6 +85,13 @@ case class OBPOffset(value: Int) extends OBPQueryParam
case class OBPFromDate(value: Date) extends OBPQueryParam
case class OBPToDate(value: Date) extends OBPQueryParam
case class OBPOrdering(field: Option[String], order: OBPOrder) extends OBPQueryParam
case class OBPConsumerId(value: String) extends OBPQueryParam
case class OBPUserId(value: String) extends OBPQueryParam
case class OBPUrl(value: String) extends OBPQueryParam
case class OBPAppName(value: String) extends OBPQueryParam
case class OBPImplementedByPartialFunction(value: String) extends OBPQueryParam
case class OBPImplementedInVersion(value: String) extends OBPQueryParam
case class OBPVerb(value: String) extends OBPQueryParam
//Note: this is used for connector method: 'def getUser(name: String, password: String): Box[InboundUser]'
case class InboundUser(
@ -220,14 +227,8 @@ trait Connector {
//because we don't have a db backed model for OtherBankAccounts, we need to construct it from an
//OtherBankAccountMetadata and a transaction
for {
tlist <- getTransactions(thisBankId, thisAccountId).map { t =>
t.filter { e =>
if (e.otherAccount.thisAccountId.value == metadata.getAccountNumber)
true
else
false
}
}
//TODO, performance issue, when many metadata and many transactions, this will course a big problem .
tlist <- getTransactions(thisBankId, thisAccountId).map(_.filter(_.otherAccount.thisAccountId.value == metadata.getAccountNumber))
} yield {
tlist match {
case list: List[Transaction] if list.nonEmpty =>

View File

@ -0,0 +1,37 @@
package code.bankconnectors
import akka.pattern.ask
import code.actorsystem.{ObpActorInit, ObpLookupSystem}
import code.util.Helper.MdcLoggable
import net.liftweb.json.JValue
object KafkaHelper extends KafkaHelper
trait KafkaHelper extends ObpActorInit with MdcLoggable {
override val actorName = "kafkaHelper" //CreateActorNameFromClassName(this.getClass.getName)
override val actor = ObpLookupSystem.getRemotedataActor(actorName)
/**
* Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala
* In KafkaMappedConnector.scala, we use Map[String, String]. Now we change to case class
* eg: case class Company(name: String, address: String) -->
* Company("TESOBE","Berlin")
* Map(name->"TESOBE", address->"2")
*
* @param caseClassObject
* @return Map[String, String]
*/
def transferCaseClassToMap(caseClassObject: scala.Product) =
caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names
.zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values
def process(request: scala.Product): JValue = {
val mapRequest:Map[String, String] = transferCaseClassToMap(request)
process(mapRequest)
}
def process (request: Map[String, String]): JValue ={
extractFuture(actor ? request)
}
}

View File

@ -0,0 +1,104 @@
package code.bankconnectors
import java.util
import java.util.{Properties, UUID}
import akka.actor.Actor
import code.actorsystem.{ObpActorHelper, ObpActorInit}
import code.util.Helper.MdcLoggable
import net.liftweb.json.{DefaultFormats, Extraction, JValue}
import akka.pattern.ask
import net.liftweb.json
import net.liftweb.util.Props
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.WakeupException
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionException, Future}
/**
* Created by petar on 5/8/17.
*/
class KafkaHelperActor extends Actor with ObpActorInit with ObpActorHelper with MdcLoggable {
implicit val formats = DefaultFormats
val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set")
val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set")
val producerProps = new Properties()
producerProps.put("bootstrap.servers", Props.get("kafka.bootstrap_hosts")openOr("localhost:9092"))
producerProps.put("acks", "all")
producerProps.put("retries", "0")
producerProps.put("batch.size", "16384")
producerProps.put("linger.ms", "1")
producerProps.put("buffer.memory", "33554432")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", Props.get("kafka.bootstrap_hosts")openOr("localhost:9092"))
consumerProps.put("enable.auto.commit", "false")
consumerProps.put("group.id", UUID.randomUUID.toString)
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
var producer = new KafkaProducer[String, String](producerProps)
var consumer = new KafkaConsumer[String, String](consumerProps)
def getResponse(reqId: String): String = {
var res = """{"error":"KafkaConsumer could not fetch response"}"""
try {
consumer.synchronized {
consumer.subscribe(util.Arrays.asList(responseTopic))
var run = true
var retries = 1
while (run && retries > 0) {
val consumerMap = consumer.poll(100)
val records = consumerMap.records(responseTopic).iterator
while (records.hasNext) {
val record = records.next
println("FILTERING: " + record + " => " + reqId)
retries = retries - 1
if (record.key == reqId)
println("FOUND >>> " + record)
run = false
res = record.value
}
}
}
} catch {
case e: WakeupException => logger.error(e)
}
res
}
def processRequest(jsonRequest: JValue, reqId: String): JValue = {
import scala.concurrent.ExecutionContext.Implicits.global
val futureResponse = Future { getResponse(reqId) }
try {
val record = new ProducerRecord(requestTopic, reqId, json.compactRender(jsonRequest))
producer.send(record).get
} catch {
case ie: InterruptedException => return json.parse(s"""{"error":"sending message to kafka interrupted: ${ie}"}""")
case ex: ExecutionException => return json.parse(s"""{"error":"could not send message to kafka: ${ex}"}""")
case t:Throwable => return json.parse(s"""{"error":"unexpected error sending message to kafka: ${t}"}""")
}
json.parse(Await.result(futureResponse, Duration("3 seconds"))) \\ "data"
}
def kafkaProcess(request: Map[String,String]): JValue = {
val reqId = UUID.randomUUID().toString
val jsonRequest = Extraction.decompose(request)
processRequest(jsonRequest, reqId)
}
def receive = {
case request: Map[String, String] =>
logger.info("kafka_request: " + request )
sender ! extractResult(kafkaProcess(request))
}
}

View File

@ -0,0 +1,23 @@
package code.bankconnectors
import akka.actor.{ActorSystem, Props => ActorProps}
import code.util.Helper
import code.util.Helper.MdcLoggable
object KafkaHelperActors extends MdcLoggable {
val props_hostname = Helper.getHostname
def startKafkaHelperActors(actorSystem: ActorSystem) = {
val actorsKafkaHelper = Map(
ActorProps[KafkaHelperActor] -> "kafkaHelper" //KafkaHelper.actorName
)
actorsKafkaHelper.foreach { a => logger.info(actorSystem.actorOf(a._1, name = a._2)) }
}
def startLocalKafkaHelperWorkers(system: ActorSystem): Unit = {
logger.info("Starting local KafkaHelper workers")
startKafkaHelperActors(system)
}
}

View File

@ -54,10 +54,8 @@ import net.liftweb.util.Helpers._
import net.liftweb.util.Props
import code.util.Helper.MdcLoggable
object KafkaMappedConnector extends Connector with MdcLoggable {
object KafkaMappedConnector extends Connector with KafkaHelper with MdcLoggable {
lazy val producer = new KafkaProducer()
lazy val consumer = new KafkaConsumer()
type AccountType = KafkaBankAccount
implicit override val nameOfConnector = KafkaMappedConnector.getClass.getSimpleName
@ -148,11 +146,11 @@ object KafkaMappedConnector extends Connector with MdcLoggable {
setAccountOwner(username, BankId(acc.bankId), AccountId(acc.accountId), acc.owners)
views.foreach(v => {
Views.views.vend.addPermission(v.uid, user)
logger.info(s"------------> updated view ${v.uid} for resourceuser ${user} and account ${acc}")
//logger.info(s"------------> updated view ${v.uid} for resourceuser ${user} and account ${acc}")
})
existing_views.filterNot(_.users.contains(user.resourceUserId)).foreach (v => {
Views.views.vend.addPermission(v.uid, user)
logger.info(s"------------> added resourceuser ${user} to view ${v.uid} for account ${acc}")
//logger.info(s"------------> added resourceuser ${user} to view ${v.uid} for account ${acc}")
})
}
}
@ -1480,173 +1478,7 @@ object KafkaMappedConnector extends Connector with MdcLoggable {
charge_summary: String
)
def process(request: Map[String,String]): json.JValue = {
val reqId = UUID.randomUUID().toString
if (producer.send(reqId, request, "1")) {
// Request sent, now we wait for response with the same reqId
val res = consumer.getResponse(reqId)
return res
}
return json.parse("""{"error":"could not send message to kafka"}""")
}
}
import java.util.{Properties, UUID}
import kafka.consumer.{Consumer, _}
import kafka.message._
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.utils.Json
import net.liftweb.json
import net.liftweb.json._
import net.liftweb.util.Props
class KafkaConsumer(val zookeeper: String = Props.get("kafka.zookeeper_host").openOrThrowException("no kafka.zookeeper_host set"),
val topic: String = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set"),
val delay: Long = 0) extends MdcLoggable {
def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("auto.offset.reset", "smallest")
props.put("auto.commit.enable", "true")
props.put("zookeeper.sync.time.ms", "2000")
props.put("auto.commit.interval.ms", "1000")
props.put("zookeeper.session.timeout.ms", "6000")
props.put("zookeeper.connection.timeout.ms", "6000")
props.put("consumer.timeout.ms", "20000")
val config = new ConsumerConfig(props)
config
}
def getResponse(reqId: String): json.JValue = {
// create consumer with unique groupId in order to prevent race condition with kafka
val config = createConsumerConfig(zookeeper, UUID.randomUUID.toString)
val consumer = Consumer.create(config)
// recreate stream for topic if not existing
val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
val streams = consumerMap.get(topic).get
// process streams
for (stream <- streams) {
val it = stream.iterator()
try {
// wait for message
while (it.hasNext()) {
val mIt = it.next()
// skip null entries
if (mIt != null && mIt.key != null && mIt.message != null) {
val msg = new String(mIt.message(), "UTF8")
val key = new String(mIt.key(), "UTF8")
// check if the id matches
if (key == reqId) {
// Parse JSON message
val j = json.parse(msg)
// disconnect from Kafka
consumer.shutdown()
// return as JSON
return j \\ "data"
}
} else {
logger.warn("KafkaConsumer: Got null value/key from kafka. Might be south-side connector issue.")
}
}
return json.parse("""{"error":"KafkaConsumer could not fetch response"}""") //TODO: replace with standard message
}
catch {
case e:kafka.consumer.ConsumerTimeoutException =>
logger.error("KafkaConsumer: timeout")
return json.parse("""{"error":"KafkaConsumer timeout"}""") //TODO: replace with standard message
}
}
// disconnect from kafka
consumer.shutdown()
logger.info("KafkaProducer: shutdown")
return json.parse("""{"info":"KafkaConsumer shutdown"}""") //TODO: replace with standard message
}
}
class KafkaProducer(
topic: String = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set"),
brokerList: String = Props.get("kafka.host")openOr("localhost:9092"),
clientId: String = UUID.randomUUID().toString,
synchronously: Boolean = true,
compress: Boolean = true,
batchSize: Integer = 200,
messageSendMaxRetries: Integer = 3,
requestRequiredAcks: Integer = -1
) extends MdcLoggable {
// determine compression codec
val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
// configure producer
val props = new Properties()
props.put("compression.codec", codec.toString)
props.put("producer.type", if (synchronously) "sync" else "async")
props.put("metadata.broker.list", brokerList)
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", messageSendMaxRetries.toString)
props.put("request.required.acks", requestRequiredAcks.toString)
props.put("client.id", clientId.toString)
// create producer
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
// create keyed message since we will use the key as id for matching response to a request
def kafkaMesssage(key: Array[Byte], message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
if (partition == null) {
// no partiton specified
new KeyedMessage(topic, key, message)
} else {
// specific partition
new KeyedMessage(topic, key, partition, message)
}
}
implicit val formats = DefaultFormats
def send(key: String, request: Map[String, String], partition: String = null): Boolean = {
val message = Json.encode(request)
// translate strings to utf8 before sending to kafka
send(key.getBytes("UTF8"), message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
}
def send(key: Array[Byte], message: Array[Byte], partition: Array[Byte]): Boolean = {
try {
// actually send the message to kafka
producer.send(kafkaMesssage(key, message, partition))
} catch {
case e: kafka.common.FailedToSendMessageException =>
logger.error("KafkaProducer: Failed to send message")
return false
case e: Throwable =>
logger.error("KafkaProducer: Unknown error while trying to send message")
e.printStackTrace()
return false
}
true
}
}

View File

@ -33,7 +33,6 @@ import code.accountholder.{AccountHolders, MapperAccountHolders}
import code.api.util.APIUtil.saveConnectorMetric
import code.api.util.ErrorMessages
import code.api.v2_1_0.{BranchJsonPost, TransactionRequestCommonBodyJSON}
import code.bankconnectors.KafkaMappedConnector_JVMcompatible
import code.branches.Branches.{Branch, BranchId}
import code.branches.MappedBranch
import code.fx.{FXRate, fx}
@ -74,11 +73,8 @@ import com.tesobe.obp.transport.spi.{DefaultPager, DefaultSorter, TimestampFilte
import net.liftweb.json.Extraction._
import code.util.Helper.MdcLoggable
object KafkaMappedConnector_JVMcompatible extends Connector with KafkaHelper with MdcLoggable {
object KafkaMappedConnector_JVMcompatible extends Connector with MdcLoggable {
lazy val producer = new KafkaProducer()
lazy val consumer = new KafkaConsumer()
type AccountType = KafkaBankAccount
implicit override val nameOfConnector = KafkaMappedConnector_JVMcompatible.getClass.getSimpleName
@ -1538,14 +1534,4 @@ object KafkaMappedConnector_JVMcompatible extends Connector with MdcLoggable {
charge_summary: String
)
def process(request: Map[String,String]): json.JValue = {
val reqId = UUID.randomUUID().toString
if (producer.send(reqId, request, "1")) {
// Request sent, now we wait for response with the same reqId
val res = consumer.getResponse(reqId)
return res
}
return json.parse("""{"error":"could not send message to kafka"}""")
}
}

View File

@ -61,10 +61,9 @@ import scala.collection.immutable.Nil
import scala.collection.mutable.ArrayBuffer
import code.util.Helper.MdcLoggable
object KafkaMappedConnector_vMar2017 extends Connector with MdcLoggable {
lazy val producer = new KafkaProducer()
lazy val consumer = new KafkaConsumer()
object KafkaMappedConnector_vMar2017 extends Connector with KafkaHelper with MdcLoggable {
type AccountType = BankAccount2
implicit override val nameOfConnector = KafkaMappedConnector_vMar2017.getClass.getSimpleName
@ -1830,30 +1829,5 @@ object KafkaMappedConnector_vMar2017 extends Connector with MdcLoggable {
)
}
def process(request: scala.Product): json.JValue = {
val reqId = UUID.randomUUID().toString
val requestToMap= stransferCaseClassToMap(request)
if (producer.send(reqId, requestToMap, "1")) {
// Request sent, now we wait for response with the same reqId
val res = consumer.getResponse(reqId)
return res
}
return json.parse("""{"error":"could not send message to kafka"}""")
}
/**
* Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala
* In KafkaMappedConnector.scala, we use Map[String, String]. Now we change to case class
* eg: case class Company(name: String, address: String) -->
* Company("TESOBE","Berlin")
* Map(name->"TESOBE", address->"2")
*
* @param caseClassObject
* @return Map[String, String]
*/
def stransferCaseClassToMap(caseClassObject: scala.Product) = caseClassObject.getClass.getDeclaredFields.map(_.getName) // all field names
.zip(caseClassObject.productIterator.to).toMap.asInstanceOf[Map[String, String]] // zipped with all values
}

View File

@ -34,12 +34,25 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.math.BigInt
import code.api.util.APIUtil.saveConnectorMetric
import scalacache.ScalaCache
import scalacache.guava.GuavaCache
import scalacache._
import concurrent.duration._
import language.postfixOps
import memoization._
import com.google.common.cache.CacheBuilder
import code.util.Helper.MdcLoggable
object LocalMappedConnector extends Connector with MdcLoggable {
type AccountType = MappedBankAccount
val maxBadLoginAttempts = Props.get("max.bad.login.attempts") openOr "10"
val underlyingGuavaCache = CacheBuilder.newBuilder().maximumSize(10000L).build[String, Object]
implicit val scalaCache = ScalaCache(GuavaCache(underlyingGuavaCache))
val getTransactionsTTL = Props.get("connector.cache.ttl.seconds.getTransactions", "0").toInt * 1000 // Miliseconds
//This is the implicit parameter for saveConnectorMetric function.
//eg: override def getBank(bankId: BankId): Box[Bank] = saveConnectorMetric
@ -178,13 +191,18 @@ object LocalMappedConnector extends Connector with MdcLoggable {
val optionalParams : Seq[QueryParam[MappedTransaction]] = Seq(limit.toSeq, offset.toSeq, fromDate.toSeq, toDate.toSeq, ordering.toSeq).flatten
val mapperParams = Seq(By(MappedTransaction.bank, bankId.value), By(MappedTransaction.account, accountId.value)) ++ optionalParams
val mappedTransactions = MappedTransaction.findAll(mapperParams: _*)
updateAccountTransactions(bankId, accountId)
for (account <- getBankAccount(bankId, accountId))
yield mappedTransactions.flatMap(_.toTransaction(account))
def getTransactionsCached(bankId: BankId, accountId: AccountId, optionalParams : Seq[QueryParam[MappedTransaction]]): Box[List[Transaction]] = memoizeSync(getTransactionsTTL millisecond){
val mappedTransactions = MappedTransaction.findAll(mapperParams: _*)
updateAccountTransactions(bankId, accountId)
for (account <- getBankAccount(bankId, accountId))
yield mappedTransactions.flatMap(_.toTransaction(account))
}
getTransactionsCached(bankId: BankId, accountId: AccountId, optionalParams)
}
/**
@ -254,6 +272,7 @@ object LocalMappedConnector extends Connector with MdcLoggable {
// Get all counterparties related to an account
override def getCounterpartiesFromTransaction(bankId: BankId, accountId: AccountId): List[Counterparty] =
//TODO, performance issue, when many metadata and many transactions, this will course a big problem .
Counterparties.counterparties.vend.getMetadatas(bankId, accountId).flatMap(getCounterpartyFromTransaction(bankId, accountId, _))
// Get one counterparty related to a bank account

View File

@ -14,14 +14,15 @@ object fx extends MdcLoggable {
val exchangeRates = {
Map(
"GBP" -> Map("EUR" -> 1.16278, "USD" -> 1.24930, "JPY" -> 141.373, "AED" -> 4.58882, "INR" -> 84.0950, "KRW" -> 1433.92, "XAF" -> 762.826),
"EUR" -> Map("GBP" -> 0.860011, "USD" -> 1.07428, "JPY" -> 121.567, "AED" -> 3.94594, "INR" -> 72.3136, "KRW" -> 1233.03, "XAF" -> 655.957),
"USD" -> Map("GBP" -> 0.800446, "EUR" -> 0.930886, "JPY" -> 113.161, "AED" -> 3.67310, "INR" -> 67.3135, "KRW" -> 1147.78, "XAF" -> 610.601),
"JPY" -> Map("GBP" -> 0.00707350, "EUR" -> 0.00822592, "USD" -> 0.00883695, "AED" -> 0.0324590, "INR" -> 0.594846, "KRW" -> 10.1428, "XAF" -> 5.39585),
"AED" -> Map("GBP" -> 0.217921, "EUR" -> 0.253425, "USD" -> 0.272250, "JPY" -> 30.8081, "INR" -> 18.3255, "KRW" -> 312.482, "XAF" -> 166.236),
"INR" -> Map("GBP" -> 0.0118913, "EUR" -> 0.0138287, "USD" -> 0.0148559, "JPY" -> 1.68111, "AED" -> 0.0545671, "KRW" -> 17.0512, "XAF" -> 9.07101),
"KRW" -> Map("GBP" -> 0.000697389, "EUR" -> 0.000811008, "USD" -> 0.000871250, "JPY" -> 0.0985917, "AED" -> 0.00320019, "INR" -> 0.0586469, "XAF" -> 0.531986),
"XAF" -> Map("GBP" -> 0.00131092, "EUR" -> 0.00152449, "USD" -> 0.00163773, "JPY" -> 0.185328, "AED" -> 0.00601555, "INR" -> 0.110241, "KRW" -> 1.87975 )
"GBP" -> Map("EUR" -> 1.16278, "USD" -> 1.24930, "JPY" -> 141.373, "AED" -> 4.58882, "INR" -> 84.0950, "KRW" -> 1433.92, "XAF" -> 762.826, "JOD" -> 1.0),
"EUR" -> Map("GBP" -> 0.860011, "USD" -> 1.07428, "JPY" -> 121.567, "AED" -> 3.94594, "INR" -> 72.3136, "KRW" -> 1233.03, "XAF" -> 655.957, "JOD" -> 1.0),
"USD" -> Map("GBP" -> 0.800446, "EUR" -> 0.930886, "JPY" -> 113.161, "AED" -> 3.67310, "INR" -> 67.3135, "KRW" -> 1147.78, "XAF" -> 610.601, "JOD" -> 1.0),
"JPY" -> Map("GBP" -> 0.00707350, "EUR" -> 0.00822592, "USD" -> 0.00883695, "AED" -> 0.0324590, "INR" -> 0.594846, "KRW" -> 10.1428, "XAF" -> 5.39585, "JOD" -> 1.0),
"AED" -> Map("GBP" -> 0.217921, "EUR" -> 0.253425, "USD" -> 0.272250, "JPY" -> 30.8081, "INR" -> 18.3255, "KRW" -> 312.482, "XAF" -> 166.236, "AED" -> 1.0),
"INR" -> Map("GBP" -> 0.0118913, "EUR" -> 0.0138287, "USD" -> 0.0148559, "JPY" -> 1.68111, "AED" -> 0.0545671, "KRW" -> 17.0512, "XAF" -> 9.07101, "JOD" -> 1.0),
"KRW" -> Map("GBP" -> 0.000697389, "EUR" -> 0.000811008, "USD" -> 0.000871250, "JPY" -> 0.0985917, "AED" -> 0.00320019, "INR" -> 0.0586469, "XAF" -> 0.531986, "JOD" -> 1.0),
"XAF" -> Map("GBP" -> 0.00131092, "EUR" -> 0.00152449, "USD" -> 0.00163773, "JPY" -> 0.185328, "AED" -> 0.00601555, "INR" -> 0.110241, "KRW" -> 1.87975, "JOD" -> 1.0),
"JOD" -> Map("GBP" -> 1.0, "EUR" -> 1.0, "USD" -> 1.0, "JPY" -> 1.0, "AED" -> 1.0, "INR" -> 1.0, "KRW" -> 1.0, "XAF" -> 1.0)
)
}

View File

@ -2,7 +2,7 @@ package code.metrics
import java.util.Date
import code.bankconnectors._
import code.bankconnectors.{OBPImplementedByPartialFunction, _}
import code.util.DefaultStringField
import net.liftweb.mapper._
@ -52,7 +52,29 @@ object MappedMetrics extends APIMetrics {
case OBPDescending => OrderBy(MappedMetric.date, Descending)
}
}
val optionalParams : Seq[QueryParam[MappedMetric]] = Seq(limit.toSeq, offset.toSeq, fromDate.toSeq, toDate.toSeq, ordering).flatten
// the optional variables:
val toConsumerId = queryParams.collect {case OBPConsumerId(value) => By(MappedMetric.consumerId, value)}.headOption
val toUserId = queryParams.collect {case OBPUserId(value) => By(MappedMetric.userId, value)}.headOption
val toUrl = queryParams.collect {case OBPUrl(value) => By(MappedMetric.url, value)}.headOption
val toAppName = queryParams.collect {case OBPAppName(value) => By(MappedMetric.appName, value)}.headOption
val toImplementedInVersion = queryParams.collect {case OBPImplementedInVersion(value) => By(MappedMetric.implementedInVersion, value)}.headOption
val toImplementedByPartialFunction = queryParams.collect {case OBPImplementedByPartialFunction(value) => By(MappedMetric.implementedByPartialFunction, value)}.headOption
val toVerb = queryParams.collect {case OBPVerb(value) => By(MappedMetric.verb, value)}.headOption
val optionalParams : Seq[QueryParam[MappedMetric]] = Seq(
offset.toSeq,
fromDate.toSeq,
toDate.toSeq,
ordering,
toConsumerId.toSeq,
toUserId.toSeq,
toUrl.toSeq,
toAppName.toSeq,
toImplementedInVersion.toSeq,
toImplementedByPartialFunction.toSeq,
toVerb.toSeq,
limit.toSeq
).flatten
MappedMetric.findAll(optionalParams: _*)
}

View File

@ -2,14 +2,12 @@ package code.remotedata
import code.accountholder.{AccountHolders, RemotedataAccountHoldersCaseClasses}
import code.model.{AccountId, BankId, User}
import net.liftweb.common.{Full, _}
import net.liftweb.common.Box
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import code.actorsystem.ObpActorInit
object RemotedataAccountHolders extends ActorInit with AccountHolders {
object RemotedataAccountHolders extends ObpActorInit with AccountHolders {
val cc = RemotedataAccountHoldersCaseClasses

View File

@ -1,21 +1,13 @@
package code.remotedata
import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.event.Logging
import akka.util.Timeout
import code.accountholder.{MapperAccountHolders, RemotedataAccountHoldersCaseClasses}
import code.actorsystem.ObpActorHelper
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.common._
import net.liftweb.util.ControlHelpers.tryo
import scala.concurrent.duration._
class RemotedataAccountHoldersActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataAccountHoldersActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MapperAccountHolders
val cc = RemotedataAccountHoldersCaseClasses

View File

@ -1,47 +0,0 @@
package code.remotedata
import akka.actor.ActorSystem
import code.util.Helper
import com.typesafe.config.ConfigFactory
import net.liftweb.util.Props
import code.util.Helper.MdcLoggable
object RemotedataActorSystem extends MdcLoggable {
var obpActorSystem: ActorSystem = null
val props_hostname = Helper.getHostname
def init () = {
if (obpActorSystem == null ) {
val system = ActorSystem("LookupSystem", ConfigFactory.load(ConfigFactory.parseString(RemotedataConfig.lookupConf)))
logger.info(RemotedataConfig.lookupConf)
obpActorSystem = system
}
obpActorSystem
}
def getActor(actorName: String) = {
this.init
val actorPath: String = Props.getBool("remotedata.enable", false) match {
case true =>
val hostname = RemotedataConfig.remoteHostname
val port = RemotedataConfig.remotePort
s"akka.tcp://RemotedataActorSystem_${props_hostname}@${hostname}:${port}/user/${actorName}"
case false =>
val hostname = RemotedataConfig.localHostname
var port = RemotedataConfig.localPort
if (port == 0) {
logger.error("Failed to connect to local Remotedata actor")
}
s"akka.tcp://RemotedataActorSystem_${props_hostname}@${hostname}:${port}/user/${actorName}"
}
this.obpActorSystem.actorSelection(actorPath)
}
}

View File

@ -2,10 +2,9 @@ package code.remotedata
import java.util.concurrent.TimeUnit
import akka.actor.{ActorSystem, ExtendedActorSystem, Props => ActorProps}
import akka.util.Timeout
import akka.actor.{ActorSystem, Props => ActorProps}
import bootstrap.liftweb.ToSchemify
import code.api.APIFailure
import code.actorsystem.{ObpActorConfig}
import code.util.Helper
import com.typesafe.config.ConfigFactory
import net.liftweb.common._
@ -16,69 +15,8 @@ import net.liftweb.util.Props
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import code.util.Helper.MdcLoggable
trait ActorInit {
// Deafult is 3 seconds, which should be more than enough for slower systems
val ACTOR_TIMEOUT: Long = Props.getLong("remotedata.timeout").openOr(3)
val actorName = CreateActorNameFromClassName(this.getClass.getName)
val actor = RemotedataActorSystem.getActor(actorName)
val TIMEOUT = (ACTOR_TIMEOUT seconds)
implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds))
def CreateActorNameFromClassName(c: String): String = {
val n = c.replaceFirst("^.*Remotedata", "")
Character.toLowerCase(n.charAt(0)) + n.substring(1)
}
def extractFuture[T](f: Future[Any]): T = {
val r = f.map {
case s: Set[T] => s
case l: List[T] => l
case t: T => t
case _ => Empty ~> APIFailure(s"future extraction failed", 501)
}
Await.result(r, TIMEOUT).asInstanceOf[T]
}
def extractFutureToBox[T](f: Future[Any]): Box[T] = {
val r = f.map {
case pf: ParamFailure[_] => Empty ~> pf
case af: APIFailure => Empty ~> af
case f: Failure => f
case Empty => Empty
case t: T => Full(t)
case _ => Empty ~> APIFailure(s"future extraction to box failed", 501)
}
Await.result(r, TIMEOUT)
}
}
trait ActorHelper {
def extractResult[T](in: T) = {
in match {
case pf: ParamFailure[_] =>
pf.param match {
case af: APIFailure => af
case f: Failure => f
case _ => pf
}
case af: APIFailure => af
case f: Failure => f
case l: List[T] => l
case s: Set[T] => s
case Full(r) => r
case t: T => t
case _ => APIFailure(s"result extraction failed", 501)
}
}
}
object RemotedataActors extends MdcLoggable {
@ -111,17 +49,11 @@ object RemotedataActors extends MdcLoggable {
actorsRemotedata.foreach { a => logger.info(actorSystem.actorOf(a._1, name = a._2)) }
}
def startLocalWorkerSystem(): Unit = {
logger.info("Starting local RemotedataActorSystem")
logger.info(RemotedataConfig.localConf)
val system = ActorSystem.create(s"RemotedataActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(RemotedataConfig.localConf)))
startActors(system)
}
def startRemoteWorkerSystem(): Unit = {
logger.info("Starting remote RemotedataActorSystem")
logger.info(RemotedataConfig.remoteConf)
val system = ActorSystem(s"RemotedataActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(RemotedataConfig.remoteConf)))
logger.info(ObpActorConfig.remoteConf)
val hostname = Helper.getRemotedataHostname
val system = ActorSystem(s"RemotedataActorSystem_${hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.remoteConf)))
startActors(system)
logger.info("Started")
}

View File

@ -2,20 +2,16 @@ package code.remotedata
import java.util.Date
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.metadata.comments.{Comments, RemotedataCommentsCaseClasses}
import code.model._
import net.liftweb.common.{Full, _}
import net.liftweb.common.Box
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
object RemotedataComments extends ActorInit with Comments {
object RemotedataComments extends ObpActorInit with Comments {
val cc = RemotedataCommentsCaseClasses

View File

@ -1,13 +1,13 @@
package code.remotedata
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.metadata.comments.{MappedComments, RemotedataCommentsCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
class RemotedataCommentsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataCommentsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedComments
val cc = RemotedataCommentsCaseClasses

View File

@ -3,11 +3,12 @@ package code.remotedata
import java.util.Date
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.bankconnectors.OBPQueryParam
import code.metrics.{ConnMetrics, MappedConnectorMetric, RemotedataConnectorMetricsCaseClasses}
object RemotedataConnectorMetrics extends ActorInit with ConnMetrics {
object RemotedataConnectorMetrics extends ObpActorInit with ConnMetrics {
val cc = RemotedataConnectorMetricsCaseClasses

View File

@ -3,12 +3,11 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.metrics.{ConnectorMetrics, RemotedataConnectorMetricsCaseClasses}
import code.util.Helper.MdcLoggable
class RemotedataConnectorMetricsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataConnectorMetricsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = ConnectorMetrics
val cc = RemotedataConnectorMetricsCaseClasses

View File

@ -1,12 +1,13 @@
package code.remotedata
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.consumer.{ConsumersProvider, RemotedataConsumersCaseClasses}
import code.model._
import net.liftweb.common._
object RemotedataConsumers extends ActorInit with ConsumersProvider {
object RemotedataConsumers extends ObpActorInit with ConsumersProvider {
val cc = RemotedataConsumersCaseClasses

View File

@ -1,13 +1,12 @@
package code.remotedata
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.consumer.RemotedataConsumersCaseClasses
import code.model.{MappedConsumersProvider, _}
import code.util.Helper.MdcLoggable
class RemotedataConsumersActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataConsumersActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedConsumersProvider
val cc = RemotedataConsumersCaseClasses

View File

@ -1,21 +1,15 @@
package code.remotedata
import java.util.Date
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.metadata.counterparties.{Counterparties, CounterpartyTrait, RemotedataCounterpartiesCaseClasses}
import code.model._
import net.liftweb.common.{Full, _}
import net.liftweb.common.Box
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
object RemotedataCounterparties extends ActorInit with Counterparties {
object RemotedataCounterparties extends ObpActorInit with Counterparties {
val cc = RemotedataCounterpartiesCaseClasses

View File

@ -3,13 +3,13 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.metadata.counterparties.{MapperCounterparties, RemotedataCounterpartiesCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
class RemotedataCounterpartiesActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataCounterpartiesActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MapperCounterparties
val cc = RemotedataCounterpartiesCaseClasses

View File

@ -3,14 +3,13 @@ package code.remotedata
import java.util.Date
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.customer.{AmountOfMoney, CreditRating, Customer, CustomerFaceImage, CustomerProvider, RemotedataCustomerProviderCaseClasses}
import code.model._
import net.liftweb.common.{Full, _}
import net.liftweb.common.Box
object RemotedataCustomers extends ActorInit with CustomerProvider {
object RemotedataCustomers extends ObpActorInit with CustomerProvider {
val cc = RemotedataCustomerProviderCaseClasses

View File

@ -3,14 +3,12 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.customer.{AmountOfMoney, _}
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.util.ControlHelpers._
class RemotedataCustomersActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataCustomersActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedCustomerProvider
val cc = RemotedataCustomerProviderCaseClasses

View File

@ -1,13 +1,13 @@
package code.remotedata
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.entitlement.{Entitlement, EntitlementProvider, RemotedataEntitlementsCaseClasses}
import net.liftweb.common.Box
import scala.collection.immutable.List
object RemotedataEntitlements extends ActorInit with EntitlementProvider {
object RemotedataEntitlements extends ObpActorInit with EntitlementProvider {
val cc = RemotedataEntitlementsCaseClasses

View File

@ -1,12 +1,12 @@
package code.remotedata
import akka.actor.Actor
import code.actorsystem.ObpActorHelper
import code.entitlement.{Entitlement, MappedEntitlementsProvider, RemotedataEntitlementsCaseClasses}
import code.util.Helper.MdcLoggable
import net.liftweb.common.Box
class RemotedataEntitlementsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataEntitlementsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedEntitlementsProvider
val cc = RemotedataEntitlementsCaseClasses

View File

@ -3,12 +3,13 @@ package code.remotedata
import java.util.Date
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.bankconnectors.OBPQueryParam
import code.customer.{AmountOfMoney => _}
import code.metrics.{APIMetric, APIMetrics, RemotedataMetricsCaseClasses}
object RemotedataMetrics extends ActorInit with APIMetrics {
object RemotedataMetrics extends ObpActorInit with APIMetrics {
val cc = RemotedataMetricsCaseClasses

View File

@ -3,13 +3,13 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.bankconnectors.OBPQueryParam
import code.metrics.{MappedMetrics, RemotedataMetricsCaseClasses}
import code.util.Helper.MdcLoggable
class RemotedataMetricsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataMetricsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedMetrics
val cc = RemotedataMetricsCaseClasses

View File

@ -1,13 +1,11 @@
package code.remotedata
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.metadata.narrative.{Narrative, RemoteNarrativesCaseClasses}
import code.model._
import scala.concurrent.Await
object RemotedataNarratives extends ActorInit with Narrative {
object RemotedataNarratives extends ObpActorInit with Narrative {
val cc = RemoteNarrativesCaseClasses

View File

@ -1,16 +1,12 @@
package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.metadata.narrative.{MappedNarratives, RemoteNarrativesCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.util.ControlHelpers.tryo
class RemotedataNarrativesActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataNarrativesActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedNarratives
val cc = RemoteNarrativesCaseClasses

View File

@ -6,9 +6,10 @@ import code.model.Nonce
import code.nonce.{NoncesProvider, RemotedataNoncesCaseClasses}
import net.liftweb.common.Box
import akka.pattern.ask
import code.actorsystem.ObpActorInit
object RemotedataNonces extends ActorInit with NoncesProvider {
object RemotedataNonces extends ObpActorInit with NoncesProvider {
val cc = RemotedataNoncesCaseClasses

View File

@ -3,13 +3,12 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.model._
import code.nonce.RemotedataNoncesCaseClasses
import code.util.Helper.MdcLoggable
class RemotedataNoncesActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataNoncesActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedNonceProvider
val cc = RemotedataNoncesCaseClasses

View File

@ -1,11 +1,12 @@
package code.remotedata
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.sanitycheck.{RemotedataSanityCheckCaseClasses, SanityChecks}
import net.liftweb.common.Box
object RemotedataSanityCheck extends ActorInit with SanityChecks {
object RemotedataSanityCheck extends ObpActorInit with SanityChecks {
val cc = RemotedataSanityCheckCaseClasses

View File

@ -1,12 +1,11 @@
package code.remotedata
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.sanitycheck.{RemotedataSanityCheckCaseClasses, SanityChecksImpl}
import code.util.Helper.MdcLoggable
class RemotedataSanityCheckActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataSanityCheckActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = SanityChecksImpl
val cc = RemotedataSanityCheckCaseClasses

View File

@ -1,21 +1,15 @@
package code.remotedata
import java.util.Date
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.metadata.tags.{RemotedataTagsCaseClasses, Tags}
import code.model._
import net.liftweb.common.{Full, _}
import net.liftweb.common.Box
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
object RemotedataTags extends ActorInit with Tags {
object RemotedataTags extends ObpActorInit with Tags {
val cc = RemotedataTagsCaseClasses

View File

@ -1,20 +1,12 @@
package code.remotedata
import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.event.Logging
import akka.util.Timeout
import code.actorsystem.ObpActorHelper
import code.metadata.tags.{MappedTags, RemotedataTagsCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.common._
import net.liftweb.util.ControlHelpers.tryo
import scala.concurrent.duration._
class RemotedataTagsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataTagsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedTags
val cc = RemotedataTagsCaseClasses

View File

@ -1,15 +1,15 @@
package code.remotedata
import java.util.Date
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.token.{RemotedataTokensCaseClasses, TokensProvider}
import code.model.Token
import code.model.TokenType.TokenType
import net.liftweb.common._
import net.liftweb.common.Box
object RemotedataTokens extends ActorInit with TokensProvider {
object RemotedataTokens extends ObpActorInit with TokensProvider {
val cc = RemotedataTokensCaseClasses

View File

@ -3,14 +3,14 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.token.RemotedataTokensCaseClasses
import code.model.TokenType.TokenType
import code.model._
import code.util.Helper.MdcLoggable
class RemotedataTokensActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataTokensActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedTokenProvider
val cc = RemotedataTokensCaseClasses

View File

@ -1,21 +1,14 @@
package code.remotedata
import java.util.Date
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.metadata.transactionimages.{RemotedataTransactionImagesCaseClasses, TransactionImages}
import code.model._
import net.liftweb.common.{Full, _}
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
import net.liftweb.common.Box
object RemotedataTransactionImages extends ActorInit with TransactionImages {
object RemotedataTransactionImages extends ObpActorInit with TransactionImages {
val cc = RemotedataTransactionImagesCaseClasses

View File

@ -3,14 +3,13 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.metadata.transactionimages.{MapperTransactionImages, RemotedataTransactionImagesCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.util.ControlHelpers.tryo
class RemotedataTransactionImagesActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataTransactionImagesActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MapperTransactionImages
val cc = RemotedataTransactionImagesCaseClasses

View File

@ -1,6 +1,7 @@
package code.remotedata
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.api.v2_1_0.TransactionRequestCommonBodyJSON
import code.metadata.counterparties.CounterpartyTrait
import code.model._
@ -8,7 +9,7 @@ import code.transactionrequests.TransactionRequests.{TransactionRequest, Transac
import code.transactionrequests.{MappedTransactionRequest, RemotedataTransactionRequestsCaseClasses, TransactionRequestProvider}
import net.liftweb.common.Box
object RemotedataTransactionRequests extends ActorInit with TransactionRequestProvider {
object RemotedataTransactionRequests extends ObpActorInit with TransactionRequestProvider {
val cc = RemotedataTransactionRequestsCaseClasses

View File

@ -1,7 +1,7 @@
package code.remotedata
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.api.v2_1_0.TransactionRequestCommonBodyJSON
import code.metadata.counterparties.CounterpartyTrait
import code.model._
@ -10,7 +10,7 @@ import code.transactionrequests.{MappedTransactionRequestProvider, RemotedataTra
import code.util.Helper.MdcLoggable
class RemotedataTransactionRequestsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataTransactionRequestsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedTransactionRequestProvider
val cc = RemotedataTransactionRequestsCaseClasses

View File

@ -2,11 +2,12 @@ package code.remotedata
import java.util.Date
import akka.pattern.ask
import code.actorsystem.ObpActorInit
import code.usercustomerlinks.{RemotedataUserCustomerLinkProviderCaseClass, UserCustomerLink, UserCustomerLinkProvider}
import net.liftweb.common._
object RemotedataUserCustomerLinks extends ActorInit with UserCustomerLinkProvider {
object RemotedataUserCustomerLinks extends ObpActorInit with UserCustomerLinkProvider {
val cc = RemotedataUserCustomerLinkProviderCaseClass

View File

@ -3,12 +3,12 @@ package code.remotedata
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import code.actorsystem.ObpActorHelper
import code.usercustomerlinks.{MappedUserCustomerLinkProvider, RemotedataUserCustomerLinkProviderCaseClass}
import code.util.Helper.MdcLoggable
class RemotedataUserCustomerLinksActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataUserCustomerLinksActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MappedUserCustomerLinkProvider
val cc = RemotedataUserCustomerLinkProviderCaseClass

View File

@ -1,20 +1,14 @@
package code.remotedata
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.model.User
import code.model.dataAccess.ResourceUser
import code.users.{RemotedataUsersCaseClasses, Users}
import net.liftweb.common.{Full, _}
import net.liftweb.common.Box
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
object RemotedataUsers extends ActorInit with Users {
object RemotedataUsers extends ObpActorInit with Users {
val cc = RemotedataUsersCaseClasses

View File

@ -1,21 +1,12 @@
package code.remotedata
import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.event.Logging
import akka.util.Timeout
import code.model._
import code.actorsystem.ObpActorHelper
import code.model.dataAccess.ResourceUser
import code.users.{LiftUsers, RemotedataUsersCaseClasses}
import code.util.Helper.MdcLoggable
import net.liftweb.common._
import net.liftweb.util.ControlHelpers.tryo
import scala.concurrent.duration._
class RemotedataUsersActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataUsersActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = LiftUsers
val cc = RemotedataUsersCaseClasses

View File

@ -1,19 +1,14 @@
package code.remotedata
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.model.{CreateViewJSON, Permission, UpdateViewJSON, _}
import code.views.{RemotedataViewsCaseClasses, Views}
import net.liftweb.common.{Full, _}
import net.liftweb.common.{Box, Full}
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
object RemotedataViews extends ActorInit with Views {
object RemotedataViews extends ObpActorInit with Views {
val cc = RemotedataViewsCaseClasses

View File

@ -1,15 +1,13 @@
package code.remotedata
import akka.actor.Actor
import code.actorsystem.ObpActorHelper
import code.views.{MapperViews, RemotedataViewsCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.common._
import scala.concurrent.duration._
class RemotedataViewsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataViewsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MapperViews
val cc = RemotedataViewsCaseClasses

View File

@ -2,20 +2,14 @@ package code.remotedata
import java.util.Date
import akka.actor.ActorKilledException
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.actorsystem.ObpActorInit
import code.metadata.wheretags.{RemotedataWhereTagsCaseClasses, WhereTags}
import code.model._
import net.liftweb.common.{Full, _}
import scala.collection.immutable.List
import scala.concurrent.Await
import scala.concurrent.duration._
import net.liftweb.common.Box
object RemotedataWhereTags extends ActorInit with WhereTags {
object RemotedataWhereTags extends ObpActorInit with WhereTags {
val cc = RemotedataWhereTagsCaseClasses

View File

@ -1,21 +1,15 @@
package code.remotedata
import java.util.concurrent.TimeUnit
import java.util.Date
import akka.actor.Actor
import akka.event.Logging
import akka.util.Timeout
import code.actorsystem.ObpActorHelper
import code.metadata.wheretags.{MapperWhereTags, RemotedataWhereTagsCaseClasses}
import code.model._
import code.util.Helper.MdcLoggable
import net.liftweb.common._
import net.liftweb.util.ControlHelpers.tryo
import scala.concurrent.duration._
class RemotedataWhereTagsActor extends Actor with ActorHelper with MdcLoggable {
class RemotedataWhereTagsActor extends Actor with ObpActorHelper with MdcLoggable {
val mapper = MapperWhereTags
val cc = RemotedataWhereTagsCaseClasses

View File

@ -208,6 +208,13 @@ object Helper{
}
}
def getRemotedataHostname(): String = {
Props.get("remotedata.hostname", "") match {
case s: String if s.nonEmpty => s.replaceAll("\\/", "").replaceAll("\\.", "-")
case _ => "unknown"
}
}
def findAvailablePort(): Int = {
val PORT_RANGE_MIN = 2552
val PORT_RANGE_MAX = 2661
@ -235,6 +242,7 @@ object Helper{
candidatePort = findRandomPort()
}
while (!isPortAvailable(candidatePort))
println("==================================> RANDOM PORT = " + candidatePort)
candidatePort
}

View File

@ -261,11 +261,16 @@ object MapperViews extends Views with MdcLoggable {
}
}
/**
* Get the view list by bankAccountUUID.
* @param bankAccountId find the views by this bankaccountUUID.
* @return if find, return the view list. Or return the Nil.
*/
def views(bankAccountId : BankAccountUID) : List[View] = {
ViewImpl.findAll(ViewImpl.accountFilter(bankAccountId.bankId, bankAccountId.accountId): _*)
//TODO to check the ALLOW_PUBLIC_VIEWS, need throw exception
//res.foreach(view => if(view.isPublic && !ALLOW_PUBLIC_VIEWS) return Failure(AllowPublicViewsNotSpecified))
if (ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(ViewImpl.accountFilter(bankAccountId.bankId, bankAccountId.accountId): _*)
else
ViewImpl.findAll(By(ViewImpl.isPublic_, false):: ViewImpl.accountFilter(bankAccountId.bankId, bankAccountId.accountId): _*)
}
/**
@ -274,7 +279,7 @@ object MapperViews extends Views with MdcLoggable {
*
* @param user the user need to be checked for the views
* @param bankAccountId the bankAccountUID, the account will be checked the views.
* @return
* @return if find, return the view list. or return Nil.
*/
def permittedViews(user: User, bankAccountId: BankAccountUID): List[View] = {
//TODO: do this more efficiently?
@ -292,50 +297,39 @@ object MapperViews extends Views with MdcLoggable {
case _ => None
}
})
// merge the nonPublic and Pulic views
// TODO why not just findAll(By(user), By(BankId), By(AccountId)) directly ? So complicated now.
// merge the nonPublic and public views
userNonPublicViewsForAccount ++ publicViews(bankAccountId)
}
def publicViews(bankAccountId : BankAccountUID) : List[View] = {
//TODO: do this more efficiently?
ViewImpl.findAll(ViewImpl.accountFilter(bankAccountId.bankId, bankAccountId.accountId): _*).filter(v => {
v.isPublic == true && ALLOW_PUBLIC_VIEWS
})
if(ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(By(ViewImpl.isPublic_,true)::ViewImpl.accountFilter(bankAccountId.bankId, bankAccountId.accountId): _*)
else
Nil
}
/**
* An account is considered public if it contains a public view
* @return the list of all bankAccountUUIDs which contains a public view
*/
def getAllPublicAccounts() : List[BankAccountUID] = {
//TODO: do this more efficiently
// An account is considered public if it contains a public view
val bankAndAccountIds : List[(BankId, AccountId)] =
if (ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(By(ViewImpl.isPublic_, true)).map(v =>(v.bankId, v.accountId)).distinct //we remove duplicates here
else
Nil
val accountsList = bankAndAccountIds.map {
case (bankId, accountId) => {
BankAccountUID(bankId, accountId)
}
}
accountsList
if (ALLOW_PUBLIC_VIEWS)
ViewImpl
.findAll(By(ViewImpl.isPublic_, true)) //find all the public views
.map(v =>BankAccountUID(v.bankId, v.accountId)) //generate the BankAccountUID
.distinct //we remove duplicates here
else
Nil
}
def getPublicBankAccounts(bank : Bank) : List[BankAccountUID] = {
//TODO: do this more efficiently
val accountIds : List[AccountId] =
if (ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(By(ViewImpl.isPublic_, true), By(ViewImpl.bankPermalink, bank.bankId.value)).map(v => {v.accountId}).distinct //we remove duplicates here
ViewImpl
.findAll(By(ViewImpl.isPublic_, true), By(ViewImpl.bankPermalink, bank.bankId.value)) //find all the public views
.map(v => {BankAccountUID(bank.bankId, v.accountId) }) //generate the BankAccountUID
.distinct //we remove duplicates here
else
Nil
val accountsList = accountIds.map(accountId => {
BankAccountUID(bank.bankId, accountId)
})
accountsList
}
/**
@ -344,31 +338,22 @@ object MapperViews extends Views with MdcLoggable {
*/
def getAllAccountsUserCanSee(user : Box[User]) : List[BankAccountUID] = {
user match {
case Full(theuser) => {
//TODO: this could be quite a bit more efficient...
val publicViewBankAndAccountIds=
case Full(user) => {
val publicViewBankAndAccounts=
if (ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(By(ViewImpl.isPublic_, true)).map(v => {(v.bankId, v.accountId)}).distinct
ViewImpl
.findAll(By(ViewImpl.isPublic_, true)) // find all the public view in ViewImpl table, it has no relevent with user, all the user can get the public view.
.map(v => {BankAccountUID(v.bankId, v.accountId)}) //generate the BankAccountUID
else
Nil
val userPrivileges : List[ViewPrivileges] = ViewPrivileges.findAll(By(ViewPrivileges.user, theuser.resourceUserId.value))
val userNonPublicViews : List[ViewImpl] = userPrivileges.map(_.view.obj).flatten.filter(!_.isPublic)
val nonPublicViewBankAndAccountIds = userNonPublicViews.map(v => {
(v.bankId, v.accountId)
}).distinct //we remove duplicates here
val visibleBankAndAccountIds =
(publicViewBankAndAccountIds ++ nonPublicViewBankAndAccountIds).distinct
val accountsList = visibleBankAndAccountIds.map {
case (bankId, accountId) => {
BankAccountUID(bankId, accountId)
}
}
accountsList
val nonPublicViewBankAndAccounts = ViewPrivileges
.findAll(By(ViewPrivileges.user, user.resourceUserId.value)) // find all the views link to the user, means the views that user can access.
.map(_.view.obj).flatten.filter(!_.isPublic) //select all the non-public views
.map(v => { BankAccountUID(v.bankId, v.accountId)}) //generate the BankAccountUID
//we remove duplicates here, because some accounts, has both public views and non-public views
(publicViewBankAndAccounts ++ nonPublicViewBankAndAccounts).distinct
}
case _ => getAllPublicAccounts()
}
@ -380,35 +365,24 @@ object MapperViews extends Views with MdcLoggable {
*/
def getAllAccountsUserCanSee(bank: Bank, user : Box[User]) : List[BankAccountUID] = {
user match {
case Full(theuser) => {
//TODO: this could be quite a bit more efficient...
val publicViewBankAndAccountIds =
case Full(user) => {
val publicViewBankAndAccounts=
if (ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(By(ViewImpl.isPublic_, true), By(ViewImpl.bankPermalink, bank.bankId.value)).map(v => {(v.bankId, v.accountId)}).distinct
else
ViewImpl
.findAll(By(ViewImpl.isPublic_, true),By(ViewImpl.bankPermalink, bank.bankId.value)) // find all the public view in ViewImpl table, it has no relevant with user, all the user can get the public view.
.map(v => {BankAccountUID(v.bankId, v.accountId)}) //generate the BankAccountUID
else
Nil
val userPrivileges : List[ViewPrivileges] = ViewPrivileges.findAll(By(ViewPrivileges.user, theuser.resourceUserId.value))
val userNonPublicViews : List[ViewImpl] = userPrivileges.map(_.view.obj).flatten.filter(v => {
!v.isPublic && v.bankId == bank.bankId
})
val nonPublicViewBankAndAccountIds = userNonPublicViews.map(v => {
(v.bankId, v.accountId)
}).distinct //we remove duplicates here
val visibleBankAndAccountIds =
(publicViewBankAndAccountIds ++ nonPublicViewBankAndAccountIds).distinct
val accountsList = visibleBankAndAccountIds.map {
case (bankId, accountId) => {
BankAccountUID(bankId, accountId)
}
}
accountsList
val nonPublicViewBankAndAccounts = ViewPrivileges
.findAll(By(ViewPrivileges.user, user.resourceUserId.value)) // find all the views link to the user, means the views that user can access.
.map(_.view.obj).flatten.filter(v => !v.isPublic && v.bankId ==bank.bankId) //select all the non-public views according to bankId
.map(v => { BankAccountUID(v.bankId, v.accountId)}) //generate the BankAccountUID
//we remove duplicates here, because some accounts, has both public views and non-public views
(publicViewBankAndAccounts ++ nonPublicViewBankAndAccounts).distinct
}
case _ => getPublicBankAccounts(bank)
case _ => getAllPublicAccounts()
}
}
@ -416,37 +390,22 @@ object MapperViews extends Views with MdcLoggable {
* @return the bank accounts where the user has at least access to a non public view (is_public==false)
*/
def getNonPublicBankAccounts(user : User) : List[BankAccountUID] = {
//TODO: make this more efficient
val userPrivileges : List[ViewPrivileges] = ViewPrivileges.findAll(By(ViewPrivileges.user, user.resourceUserId.value))
val userNonPublicViews : List[ViewImpl] = userPrivileges.map(_.view.obj).flatten.filter(!_.isPublic)
val nonPublicViewBankAndAccountIds = userNonPublicViews.map(v => {
(v.bankId, v.accountId)
}).distinct //we remove duplicates here
val accountsList = nonPublicViewBankAndAccountIds.map {
case(bankId, accountId) => {
BankAccountUID(bankId, accountId)
}
}
accountsList
ViewPrivileges
.findAll(By(ViewPrivileges.user, user.resourceUserId.value)) // find all the views link to the user, means the views that user can access.
.map(_.view.obj).flatten.filter(!_.isPublic) //select all the non-public views
.map(v => { BankAccountUID(v.bankId, v.accountId)}) //generate the BankAccountUID
.distinct//we remove duplicates here
}
/**
* @return the bank accounts where the user has at least access to a non public view (is_public==false) for a specific bank
*/
def getNonPublicBankAccounts(user : User, bankId : BankId) : List[BankAccountUID] = {
val userPrivileges : List[ViewPrivileges] = ViewPrivileges.findAll(By(ViewPrivileges.user, user.resourceUserId.value))
val userNonPublicViewsForBank : List[ViewImpl] =
userPrivileges.map(_.view.obj).flatten.filter(v => !v.isPublic && v.bankId == bankId)
val nonPublicViewAccountIds = userNonPublicViewsForBank.
map(_.accountId).distinct //we remove duplicates here
val accountsList = nonPublicViewAccountIds.map { accountId =>
BankAccountUID(bankId, accountId)
}
accountsList
ViewPrivileges
.findAll(By(ViewPrivileges.user, user.resourceUserId.value)) // find all the views link to the user, means the views that user can access.
.map(_.view.obj).flatten.filter(v => !v.isPublic && v.bankId == bankId) //select all the non-public views according to bankId
.map(v => { BankAccountUID(v.bankId, v.accountId)}) //generate the BankAccountUID
.distinct//we remove duplicates here
}
def createOwnerView(bankId: BankId, accountId: AccountId, description: String = "Owner View") : Box[View] = {
@ -567,9 +526,16 @@ object MapperViews extends Views with MdcLoggable {
}
//TODO This is used only for tests, but might impose security problem
/**
* Grant user all views in the ViewImpl table. It is only used in Scala Tests.
* @param user the user who will get the access to all views in ViewImpl table.
* @return if no exception, it always return true
*/
def grantAccessToAllExistingViews(user : User) = {
ViewImpl.findAll.foreach(v => {
//Get All the views from ViewImpl table, and create the link user <--> each view. The link record the access permission.
if ( ViewPrivileges.find(By(ViewPrivileges.view, v), By(ViewPrivileges.user, user.resourceUserId.value) ).isEmpty )
//If the user and one view has no link, it will create one .
ViewPrivileges.create.
view(v).
user(user.resourceUserId.value).
@ -577,7 +543,13 @@ object MapperViews extends Views with MdcLoggable {
})
true
}
/**
* grant one user access to specific view. It is only used in Scala Tests
* @param user The user, who will get the access of input view
* @param view
* @return
*/
def grantAccessToView(user : User, view : View) = {
val v = ViewImpl.find(view.uid).orNull
if ( ViewPrivileges.count(By(ViewPrivileges.view, v), By(ViewPrivileges.user, user.resourceUserId.value) ) == 0 )
@ -588,17 +560,28 @@ object MapperViews extends Views with MdcLoggable {
else
false
}
/**
* Find view by bankId , accountId and viewName. If it is exsting in ViewImple table, return true.
* Otherwise, return false.
*
* But not used yet !
*/
def viewExists(bankId: BankId, accountId: AccountId, name: String): Boolean = {
val res =
ViewImpl.findAll(
By(ViewImpl.bankPermalink, bankId.value),
By(ViewImpl.accountPermalink, accountId.value),
By(ViewImpl.name_, name)
)
//TODO to check the ALLOW_PUBLIC_VIEWS, need throw exception
//res.foreach(view => if(view.isPublic && !ALLOW_PUBLIC_VIEWS) return Failure(AllowPublicViewsNotSpecified))
if (ALLOW_PUBLIC_VIEWS)
ViewImpl.findAll(
By(ViewImpl.bankPermalink, bankId.value),
By(ViewImpl.accountPermalink, accountId.value),
By(ViewImpl.name_, name)
)
else
ViewImpl.findAll(
By(ViewImpl.bankPermalink, bankId.value),
By(ViewImpl.accountPermalink, accountId.value),
By(ViewImpl.name_, name),
By(ViewImpl.isPublic_, false)
)
res.nonEmpty
}

View File

@ -0,0 +1,27 @@
Helper scripts to setup, start, stop and clean kafka.
1. download and unpack kafka binaries
-------------------------------------
setup.sh
2. start kafka
--------------
kafka-start.sh
3. stop kafka
-------------
kafka-stop.sh
4. clean logs and restart kafka
-------------------------------
kafka-clean-restart.sh

View File

@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dir=tmp/kafka
log.dirs=tmp/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=1
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

View File

@ -0,0 +1,28 @@
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

View File

@ -0,0 +1,28 @@
#!/bin/bash
PARTITIONS=1
DIR=$(dirname $(readlink -f $0))
cd $DIR
VERSION=$(find . -name "kafka_2*[0-9.][0-9]" | cut -d_ -f2)
./kafka_stop.sh 2>/dev/null
sleep 1
echo "Cleaning kafka logs..."
rm -rf ${DIR}/tmp/[kz][ao]*
./kafka_start.sh
sleep 5
if [[ "$(kafka_$VERSION/bin/kafka-topics.sh --zookeeper=localhost:2181 --list | grep Response)" != "Response" ]]; then
kafka_$VERSION/bin/kafka-topics.sh --zookeeper=localhost:2181 --create --partitions ${PARTITIONS} --replication-factor 1 --topic Response
fi
if [[ "$(kafka_$VERSION/bin/kafka-topics.sh --zookeeper=localhost:2181 --list | grep Request)" != "Request" ]]; then
kafka_$VERSION/bin/kafka-topics.sh --zookeeper=localhost:2181 --create --partitions ${PARTITIONS} --replication-factor 1 --topic Request
fi

View File

@ -0,0 +1,38 @@
#!/bin/bash
trap ctrl_c INT
function ctrl_c() {
echo
kill -9 ${pids}
exit
}
DIR=$(dirname $(readlink -f $0))
cd $DIR
VERSION=$(find . -name "kafka_2*[0-9.][0-9]" | cut -d_ -f2)
topic_list=$(kafka_$VERSION/bin/kafka-topics.sh --list --zookeeper=localhost:2181)
echo "Monitoring topics:"
pids=
for topic in ${topic_list}
do
if [ "${topic}" != "__consumer_offsets" ]
then
echo ${topic}
kafka_$VERSION/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic=${topic} &
pids=${pids}\ $!
fi
done
echo "------------------"
read keypress
echo ${pids}
kill -9 ${pids}
exit

View File

@ -0,0 +1,31 @@
#!/bin/bash
DIR=$(dirname $(readlink -f $0))
cd $DIR
LOGDIR=${DIR}/logs
TMPDIR=${DIR}/tmp
mkdir -p ${LOGDIR}
mkdir -p ${TMPDIR}
VERSION=$(find . -name "kafka_2*[0-9.][0-9]" | cut -d_ -f2)
KAFKA_PID_FILE="${TMPDIR}/kafka.pid"
ZOOKEEPER_PID_FILE="${TMPDIR}/zookeeper.pid"
echo "Starting zookeeper..."
nohup kafka_$VERSION/bin/zookeeper-server-start.sh config/zookeeper.properties >> ${LOGDIR}/zookeeper.nohup.out &
echo $! > ${ZOOKEEPER_PID_FILE}
sleep 5
echo "OK"
echo "Starting kafka..."
nohup kafka_$VERSION/bin/kafka-server-start.sh config/kafka.properties >> ${LOGDIR}/kafka.nohup.out &
echo $! > ${KAFKA_PID_FILE}
sleep 5
echo "OK"
cd -

View File

@ -0,0 +1,42 @@
#!/bin/bash
DIR=$(dirname $(readlink -f $0))
cd $DIR
TMPDIR=${DIR}/tmp
KAFKA_PID_FILE="${TMPDIR}/kafka.pid"
ZOOKEEPER_PID_FILE="${TMPDIR}/zookeeper.pid"
if [ -e "$KAFKA_PID_FILE" ]; then
PID=`/bin/cat ${KAFKA_PID_FILE}`
echo "Killing kafka process ${PID}..."
/bin/kill -9 ${PID}
/bin/rm ${KAFKA_PID_FILE}
else
echo "No PID file found at ${KAFKA_PID_FILE}"
fi
if [ -e "$ZOOKEEPER_PID_FILE" ]; then
PID=`/bin/cat ${ZOOKEEPER_PID_FILE}`
echo "Killing zookeeper process ${PID}..."
/bin/kill -9 ${PID}
/bin/rm ${ZOOKEEPER_PID_FILE}
else
echo "No PID file found at ${ZOOKEEPER_PID_FILE}"
fi
PIDS=`/bin/ps auxw | grep kafka | grep -v grep | grep -v kafka_st | grep -v kafka_mo | grep -v kafka_cl | cut -b 10-16`
if [ ${#PIDS[@]} -le 1 ]
then
echo "No zombie processes found"
fi
for PID in ${PIDS}
do
echo "Killing zombie process ${PID}..."
/bin/kill -9 ${PID}
done
cd -

View File

@ -0,0 +1,3 @@
wget http://www-eu.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz && \
tar xzf kafka_2.11-0.10.2.0.tgz && \
rm -f kafka_2.11-0.10.2.0.tgz

View File

@ -31,3 +31,4 @@ create or replace view v_transaction_comment as select id numeric_transaciton_co
create or replace view v_view_privilege as select id numeric_view_privilege_id, user_c numeric_resource_user_id, view_c numeric_view_id from viewprivileges;
create or replace view v_transaction_request_type_charge as select id, mbankid bank_id, mtransactionrequesttypeid transaction_request_type_id, mchargecurrency currency , mchargeamount amount, mchargesummary summary from mappedtransactionrequesttypecharge;

View File

@ -0,0 +1,43 @@
#logo-left {
margin: 0;
text-align: center;
float: none;
display: block;
}
#logo-left img {
max-height: none;
width: 1050px;
}
#header {
padding: 0;
}
#nav,
#main-showcases,
#main-support,
#main-links,
#authorizeSection,
#registerAppSection,
#header-decoration,
#create-sandbox-account {
background-color: #b6dcdc;
}
#main-about-box {
background-color: rgba(182, 220, 220, 0.7);
}
#nav ul li:nth-child(1):before {
content: "";
height: 0px;
width: 0px;
margin-right: 0px;
}
#account-created-successfully {
color: green;
}

View File

@ -16,7 +16,12 @@ class MappedEntitlementTest extends ServerSetup {
def createEntitlement(bankId: String, userId: String, roleName: String) = Entitlement.entitlement.vend.addEntitlement(bankId, userId, roleName)
private def delete() {
MappedEntitlement.bulkDelete_!!()
val found = Entitlement.entitlement.vend.getEntitlements.openOr(List())
found.foreach {
d => {
Entitlement.entitlement.vend.deleteEntitlement(Full(d))
}
}
}
override def beforeAll() = {
@ -32,10 +37,10 @@ class MappedEntitlementTest extends ServerSetup {
feature("Getting Entitlement data") {
scenario("We try to get Entitlement") {
Given("There is no entitlements at all but we try to get it")
MappedEntitlement.findAll().size should equal(0)
Entitlement.entitlement.vend.getEntitlements().openOr(List()).size should equal(0)
When("We try to get it all")
val found = MappedEntitlementsProvider.getEntitlements.openOr(List())
val found = Entitlement.entitlement.vend.getEntitlements.openOr(List())
Then("We don't")
found.size should equal(0)
@ -45,11 +50,7 @@ class MappedEntitlementTest extends ServerSetup {
scenario("A Entitlement exists for user and we try to get it") {
Given("Create an entitlement")
val entitlement1 = createEntitlement(bankId1, userId1, role1.toString)
MappedEntitlement.find(
By(MappedEntitlement.mBankId, bankId1),
By(MappedEntitlement.mUserId, userId1),
By(MappedEntitlement.mRoleName, role1.toString)
).isDefined should equal(true)
Entitlement.entitlement.vend.getEntitlement(bankId1, userId1, role1.toString).isDefined should equal(true)
When("We try to get it by bank, user and role")
val foundOpt = Entitlement.entitlement.vend.getEntitlement(bankId1, userId1, role1.toString)
@ -87,7 +88,7 @@ class MappedEntitlementTest extends ServerSetup {
And("We try to delete all rows")
found.foreach {
d => {
MappedEntitlementsProvider.deleteEntitlement(Full(d)) should equal(Full(true))
Entitlement.entitlement.vend.deleteEntitlement(Full(d)) should equal(Full(true))
}
}
}