From 445db575f0716a7c520a025f96da21cda5f8f251 Mon Sep 17 00:00:00 2001 From: Petar Bozin Date: Fri, 28 Apr 2017 18:09:24 +0200 Subject: [PATCH] Currently all broken, just a backup-commit --- src/main/resources/default.logback.xml | 2 +- src/main/scala/bootstrap/liftweb/Boot.scala | 29 ++++----- .../scala/code/actorsystem/ActorHelper.scala | 30 +++++++++ .../scala/code/actorsystem/ActorUtils.scala | 62 ------------------- .../code/bankconnectors/KafkaHelper.scala | 48 ++++++++++++-- .../bankconnectors/KafkaHelperActor.scala | 8 ++- .../bankconnectors/KafkaHelperActors.scala | 23 +------ .../code/remotedata/RemotedataActors.scala | 60 +++++++++++++----- 8 files changed, 142 insertions(+), 120 deletions(-) create mode 100644 src/main/scala/code/actorsystem/ActorHelper.scala delete mode 100644 src/main/scala/code/actorsystem/ActorUtils.scala diff --git a/src/main/resources/default.logback.xml b/src/main/resources/default.logback.xml index e54458242..dea516d0b 100644 --- a/src/main/resources/default.logback.xml +++ b/src/main/resources/default.logback.xml @@ -6,7 +6,7 @@ - + diff --git a/src/main/scala/bootstrap/liftweb/Boot.scala b/src/main/scala/bootstrap/liftweb/Boot.scala index c2dc80fbf..b4ed7048a 100644 --- a/src/main/scala/bootstrap/liftweb/Boot.scala +++ b/src/main/scala/bootstrap/liftweb/Boot.scala @@ -198,23 +198,9 @@ class Boot extends MdcLoggable { logger.info("running mode: " + runningMode) logger.info(s"ApiPathZero (the bit before version) is $ApiPathZero") - if (runningMode == "Production mode") - System.setProperty("log_dir", Helper.getHostname) - logger.debug(s"If you can read this, logging level is debug") - val actorSystem = ObpActorSystem.startLocalActorSystem() - KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem) - - if (!Props.getBool("remotedata.enable", false)) { - try { - logger.info(s"RemotedataActors.startLocalRemotedataWorkers(actorSystem) starting") - RemotedataActors.startLocalRemotedataWorkers(actorSystem) - } catch { - case ex: Exception => logger.warn(s"RemotedataActors.startLocalRemotedataWorkers(${actorSystem}) could not start: $ex") - } - } // where to search snippets LiftRules.addToPackages("code") @@ -281,6 +267,21 @@ class Boot extends MdcLoggable { } } + if (connector.startsWith("kafka")) { + logger.info(s"KafkaHelperActors.startLocalKafkaHelperWorkers( ${actorSystem} ) starting") + KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem) + } + + if (!Props.getBool("remotedata.enable", false)) { + try { + logger.info(s"RemotedataActors.startLocalRemotedataWorkers( ${actorSystem} ) starting") + RemotedataActors.startLocalRemotedataWorkers(actorSystem) + } catch { + case ex: Exception => logger.warn(s"RemotedataActors.startLocalRemotedataWorkers( ${actorSystem} ) could not start: $ex") + } + } + + // API Metrics (logs of API calls) // If set to true we will write each URL with params to a datastore / log file if (Props.getBool("write_metrics", false)) { diff --git a/src/main/scala/code/actorsystem/ActorHelper.scala b/src/main/scala/code/actorsystem/ActorHelper.scala new file mode 100644 index 000000000..e12429ff6 --- /dev/null +++ b/src/main/scala/code/actorsystem/ActorHelper.scala @@ -0,0 +1,30 @@ +package code.actorsystem + +import akka.util.Timeout +import code.api.APIFailure +import net.liftweb.common._ + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +trait ActorHelper { + + def extractResult[T](in: T) = { + in match { + case pf: ParamFailure[_] => + pf.param match { + case af: APIFailure => af + case f: Failure => f + case _ => pf + } + case af: APIFailure => af + case f: Failure => f + case l: List[T] => l + case s: Set[T] => s + case Full(r) => r + case t: T => t + case _ => APIFailure(s"result extraction failed", 501) + } + } +} \ No newline at end of file diff --git a/src/main/scala/code/actorsystem/ActorUtils.scala b/src/main/scala/code/actorsystem/ActorUtils.scala deleted file mode 100644 index a09b5e2da..000000000 --- a/src/main/scala/code/actorsystem/ActorUtils.scala +++ /dev/null @@ -1,62 +0,0 @@ -package code.actorsystem - -import akka.util.Timeout -import code.api.APIFailure -import net.liftweb.common._ - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ - -trait ActorUtils { - - var ACTOR_TIMEOUT: Long = 3 - val TIMEOUT: FiniteDuration = ACTOR_TIMEOUT seconds - implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds)) - import scala.concurrent.ExecutionContext.Implicits.global - - def extractFuture[T](f: Future[Any]): T = { - val r = f.map { - case s: Set[T] => s - case l: List[T] => l - case t: T => t - case _ => Empty ~> APIFailure(s"future extraction failed", 501) - } - Await.result(r, TIMEOUT).asInstanceOf[T] - } - - def extractFutureToBox[T](f: Future[Any]): Box[T] = { - val r = f.map { - case pf: ParamFailure[_] => Empty ~> pf - case af: APIFailure => Empty ~> af - case f: Failure => f - case Empty => Empty - case t: T => Full(t) - case _ => Empty ~> APIFailure(s"future extraction to box failed", 501) - } - Await.result(r, TIMEOUT) - } - -} - - -trait ActorHelper { - - def extractResult[T](in: T) = { - in match { - case pf: ParamFailure[_] => - pf.param match { - case af: APIFailure => af - case f: Failure => f - case _ => pf - } - case af: APIFailure => af - case f: Failure => f - case l: List[T] => l - case s: Set[T] => s - case Full(r) => r - case t: T => t - case _ => APIFailure(s"result extraction failed", 501) - } - } -} \ No newline at end of file diff --git a/src/main/scala/code/bankconnectors/KafkaHelper.scala b/src/main/scala/code/bankconnectors/KafkaHelper.scala index 27572ea61..5c2ef2e7d 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelper.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelper.scala @@ -1,18 +1,57 @@ package code.bankconnectors +import akka.actor.ActorSelection import net.liftweb.common.{Full, _} import akka.pattern.ask import akka.util.Timeout +import code.api.APIFailure import code.util.Helper.MdcLoggable import net.liftweb.json.JValue +import net.liftweb.util.Props -import scala.concurrent.Await +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ - +import scala.concurrent.ExecutionContext.Implicits.global object KafkaHelper extends KafkaHelper -trait KafkaHelper extends KafkaHelperActorInit with MdcLoggable { +trait KafkaHelper extends MdcLoggable { + + // Deafult is 3 seconds, which should be more than enough for slower systems + val ACTOR_TIMEOUT: Long = Props.getLong("connector.timeout").openOr(3) + val TIMEOUT: FiniteDuration = ACTOR_TIMEOUT seconds + + + def extractFuture[T](f: Future[Any]): T = { + val r = f.map { + case s: Set[T] => s + case l: List[T] => l + case t: T => t + case _ => Empty ~> APIFailure(s"future extraction failed", 501) + } + Await.result(r, TIMEOUT).asInstanceOf[T] + } + + def extractFutureToBox[T](f: Future[Any]): Box[T] = { + val r = f.map { + case pf: ParamFailure[_] => Empty ~> pf + case af: APIFailure => Empty ~> af + case f: Failure => f + case Empty => Empty + case t: T => Full(t) + case _ => Empty ~> APIFailure(s"future extraction to box failed", 501) + } + Await.result(r, TIMEOUT) + } + + val actorName: String = CreateActorNameFromClassName(this.getClass.getName) + val actor: ActorSelection = KafkaHelperLookupSystem.getKafkaHelperActor(actorName) + + def CreateActorNameFromClassName(c: String): String = { + val n = c.substring(c.lastIndexOf('.') + 1 ).replaceAll("\\$", "") + val name = Character.toLowerCase(n.charAt(0)) + n.substring(1) + name + } /** * Have this function just to keep compatibility for KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala @@ -33,8 +72,9 @@ trait KafkaHelper extends KafkaHelperActorInit with MdcLoggable { process(mapRequest) } + implicit val timeout: Timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds)) def process (request: Map[String, String]): JValue ={ - extractFuture(actor ? processRequest(request)) + extractFuture(actor ? processRequest(request)) } case class processRequest ( diff --git a/src/main/scala/code/bankconnectors/KafkaHelperActor.scala b/src/main/scala/code/bankconnectors/KafkaHelperActor.scala index 12a4593b1..2a61431ca 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelperActor.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelperActor.scala @@ -19,7 +19,10 @@ import org.apache.kafka.common.errors.WakeupException import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionException, Future} -class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable { +class KafkaHelperActor extends Actor with MdcLoggable { + + implicit val formats = DefaultFormats + val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set") val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set") @@ -44,7 +47,7 @@ class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable { var producer = new KafkaProducer[String, String](producerProps) var consumer = new KafkaConsumer[String, String](consumerProps) - implicit val formats = DefaultFormats + def getResponse(reqId: String): String = { var res = """{"error":"KafkaConsumer could not fetch response"}""" @@ -87,6 +90,7 @@ class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable { Await.result(futureResponse, Duration("3 seconds")) } + def process(request: Map[String,String]): String = { val reqId = UUID.randomUUID().toString val jsonRequest = Extraction.decompose(request) diff --git a/src/main/scala/code/bankconnectors/KafkaHelperActors.scala b/src/main/scala/code/bankconnectors/KafkaHelperActors.scala index 151610b0f..c5393badf 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelperActors.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelperActors.scala @@ -1,27 +1,8 @@ package code.bankconnectors import akka.actor.{ActorSystem, Props => ActorProps} -import code.actorsystem.ActorUtils -import code.actorsystem.ObpActorConfig -import code.remotedata._ import code.util.Helper import code.util.Helper.MdcLoggable -import com.typesafe.config.ConfigFactory -import net.liftweb.util.Props - -trait KafkaHelperActorInit extends ActorUtils { - - // Deafult is 3 seconds, which should be more than enough for slower systems - ACTOR_TIMEOUT = Props.getLong("connector.timeout").openOr(3) - - val actorName = CreateActorNameFromClassName(this.getClass.getName) - val actor = KafkaHelperLookupSystem.getKafkaHelperActor(actorName) - - def CreateActorNameFromClassName(c: String): String = { - val n = c.replaceFirst("^.*KafkaHelper", "") - Character.toLowerCase(n.charAt(0)) + n.substring(1) - } -} object KafkaHelperActors extends MdcLoggable { @@ -36,9 +17,7 @@ object KafkaHelperActors extends MdcLoggable { def startLocalKafkaHelperWorkers(system: ActorSystem): Unit = { logger.info("Starting local KafkaHelper workers") - //logger.info(ObpActorConfig.localConf) - //val system = ActorSystem.create(s"ObpActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.localConf))) - startKafkaHelperActors(system) + startKafkaHelperActors(system) } diff --git a/src/main/scala/code/remotedata/RemotedataActors.scala b/src/main/scala/code/remotedata/RemotedataActors.scala index 37666dcd6..d910960e6 100644 --- a/src/main/scala/code/remotedata/RemotedataActors.scala +++ b/src/main/scala/code/remotedata/RemotedataActors.scala @@ -2,10 +2,11 @@ package code.remotedata import java.util.concurrent.TimeUnit -import akka.actor.{ActorSystem, Props => ActorProps} +import akka.actor.{ActorSelection, ActorSystem, Props => ActorProps} +import akka.util.Timeout import bootstrap.liftweb.ToSchemify -import code.actorsystem.ActorUtils import code.actorsystem.ObpActorConfig +import code.api.APIFailure import code.util.Helper import com.typesafe.config.ConfigFactory import net.liftweb.common._ @@ -17,32 +18,62 @@ import net.liftweb.util.Props import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import code.util.Helper.MdcLoggable +import scala.concurrent.{Await, Future} - -trait RemotedataActorInit extends ActorUtils { +trait RemotedataActorInit { // Deafult is 3 seconds, which should be more than enough for slower systems - ACTOR_TIMEOUT = Props.getLong("remotedata.timeout").openOr(3) + val ACTOR_TIMEOUT: Long = Props.getLong("remotedata.timeout").openOr(3) - val actorName = CreateActorNameFromClassName(this.getClass.getName) - val actor = RemotedataLookupSystem.getRemotedataActor(actorName) + val actorName: String = CreateRemotedataActorNameFromClassName(this.getClass.getName) + val actor: ActorSelection = RemotedataLookupSystem.getRemotedataActor(actorName) + val TIMEOUT: FiniteDuration = ACTOR_TIMEOUT seconds + implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds)) - def CreateActorNameFromClassName(c: String): String = { - val n = c.replaceFirst("^.*Remotedata", "") - Character.toLowerCase(n.charAt(0)) + n.substring(1) + def CreateRemotedataActorNameFromClassName(c: String): String = { + val n = c.replaceFirst("^.*Remotedata", "").replaceAll("\\$", "") + val name = Character.toLowerCase(n.charAt(0)) + n.substring(1) + name } + + def getRemotedataActorName(): String = { + "test" //actorName + } + + def extractFuture[T](f: Future[Any]): T = { + val r = f.map { + case s: Set[T] => s + case l: List[T] => l + case t: T => t + case _ => Empty ~> APIFailure(s"future extraction failed", 501) + } + Await.result(r, TIMEOUT).asInstanceOf[T] + } + + def extractFutureToBox[T](f: Future[Any]): Box[T] = { + val r = f.map { + case pf: ParamFailure[_] => Empty ~> pf + case af: APIFailure => Empty ~> af + case f: Failure => f + case Empty => Empty + case t: T => Full(t) + case _ => Empty ~> APIFailure(s"future extraction to box failed", 501) + } + Await.result(r, TIMEOUT) + } + } object RemotedataActors extends MdcLoggable { - val props_hostname = Helper.getHostname - def startRemotedataActors(actorSystem: ActorSystem) = { + //val t = RemotedataAccountHolders.cc + println("=============================> " + actorSystem) val actorsRemotedata = Map( - ActorProps[RemotedataAccountHoldersActor] -> RemotedataAccountHolders.actorName, + ActorProps[RemotedataAccountHoldersActor] -> "accountHolders", //RemotedataAccountHolders.actorName, ActorProps[RemotedataCommentsActor] -> RemotedataComments.actorName, ActorProps[RemotedataCounterpartiesActor] -> RemotedataCounterparties.actorName, ActorProps[RemotedataTagsActor] -> RemotedataTags.actorName, @@ -67,12 +98,11 @@ object RemotedataActors extends MdcLoggable { def startLocalRemotedataWorkers( system: ActorSystem ): Unit = { logger.info("Starting local Remotedata actors") - //logger.info(ObpActorConfig.localConf) - //val system = ActorSystem.create(s"RemotedataActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.localConf))) startRemotedataActors(system) } def startRemoteWorkerSystem(): Unit = { + val props_hostname = Helper.getHostname logger.info("Starting remote RemotedataLookupSystem") logger.info(ObpActorConfig.remoteConf) val system = ActorSystem(s"RemotedataActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.remoteConf)))