Currently all broken, just a backup-commit

Petar Bozin 2017-04-28 18:09:24 +02:00
parent e5bbf8295c
commit 445db575f0
8 changed files with 142 additions and 120 deletions

View File

@@ -6,7 +6,7 @@
</encoder>
</appender>
<root level="INFO">
<root level="DEBUG">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
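The only change to logback.xml is the root logger level, which moves from INFO to DEBUG so that the new debug statement in Boot.scala below becomes visible. A minimal standalone check, assuming a plain SLF4J logger backed by this configuration (the object name is hypothetical):

import org.slf4j.LoggerFactory

object LoggingLevelCheck extends App {
  private val logger = LoggerFactory.getLogger(getClass)
  // With <root level="DEBUG"> this line reaches STDOUT; with level="INFO" it is suppressed.
  logger.debug("If you can read this, logging level is debug")
  // Info-level messages are emitted under either root level.
  logger.info("info is emitted either way")
}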

View File

@@ -198,23 +198,9 @@ class Boot extends MdcLoggable {
logger.info("running mode: " + runningMode)
logger.info(s"ApiPathZero (the bit before version) is $ApiPathZero")
if (runningMode == "Production mode")
System.setProperty("log_dir", Helper.getHostname)
logger.debug(s"If you can read this, logging level is debug")
val actorSystem = ObpActorSystem.startLocalActorSystem()
KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem)
if (!Props.getBool("remotedata.enable", false)) {
try {
logger.info(s"RemotedataActors.startLocalRemotedataWorkers(actorSystem) starting")
RemotedataActors.startLocalRemotedataWorkers(actorSystem)
} catch {
case ex: Exception => logger.warn(s"RemotedataActors.startLocalRemotedataWorkers(${actorSystem}) could not start: $ex")
}
}
// where to search snippets
LiftRules.addToPackages("code")
@@ -281,6 +267,21 @@ class Boot extends MdcLoggable {
}
}
if (connector.startsWith("kafka")) {
logger.info(s"KafkaHelperActors.startLocalKafkaHelperWorkers( ${actorSystem} ) starting")
KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem)
}
if (!Props.getBool("remotedata.enable", false)) {
try {
logger.info(s"RemotedataActors.startLocalRemotedataWorkers( ${actorSystem} ) starting")
RemotedataActors.startLocalRemotedataWorkers(actorSystem)
} catch {
case ex: Exception => logger.warn(s"RemotedataActors.startLocalRemotedataWorkers( ${actorSystem} ) could not start: $ex")
}
}
// API Metrics (logs of API calls)
// If set to true we will write each URL with params to a datastore / log file
if (Props.getBool("write_metrics", false)) {

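The Boot.scala change moves worker start-up later in boot: Kafka helper workers are now started only when the configured connector name begins with "kafka", and local Remotedata workers only when remotedata.enable is false. A minimal sketch of that gating, assuming the usual Lift props file; the object name and the "mapped" fallback are hypothetical:

import net.liftweb.util.Props

object WorkerStartupSketch {
  // Reads the same props keys Boot.scala consults; the actual start calls are
  // left as comments because they need the ActorSystem created earlier in boot.
  def decideWorkers(): Unit = {
    val connector = Props.get("connector").openOr("mapped")
    if (connector.startsWith("kafka")) {
      // KafkaHelperActors.startLocalKafkaHelperWorkers(actorSystem)
    }
    if (!Props.getBool("remotedata.enable", false)) {
      // RemotedataActors.startLocalRemotedataWorkers(actorSystem)
    }
  }
}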
View File

@@ -0,0 +1,30 @@
package code.actorsystem
import akka.util.Timeout
import code.api.APIFailure
import net.liftweb.common._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
trait ActorHelper {
def extractResult[T](in: T) = {
in match {
case pf: ParamFailure[_] =>
pf.param match {
case af: APIFailure => af
case f: Failure => f
case _ => pf
}
case af: APIFailure => af
case f: Failure => f
case l: List[T] => l
case s: Set[T] => s
case Full(r) => r
case t: T => t
case _ => APIFailure(s"result extraction failed", 501)
}
}
}
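The new ActorHelper trait keeps only extractResult, which prepares a value for an actor reply: Full boxes are unboxed, Failure/APIFailure/ParamFailure pass through, lists and sets are returned as-is, and anything unrecognised becomes an APIFailure with code 501. A hedged usage sketch; the actor and message are hypothetical:

import akka.actor.Actor
import code.actorsystem.ActorHelper
import net.liftweb.common.{Box, Full}

class ExampleWorkerActor extends Actor with ActorHelper {
  def receive: Receive = {
    case "ping" =>
      // extractResult(Full("pong")) replies with the bare "pong";
      // a Failure would be forwarded to the caller unchanged.
      val result: Box[String] = Full("pong")
      sender() ! extractResult(result)
  }
}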

View File

@@ -1,62 +0,0 @@
package code.actorsystem
import akka.util.Timeout
import code.api.APIFailure
import net.liftweb.common._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
trait ActorUtils {
var ACTOR_TIMEOUT: Long = 3
val TIMEOUT: FiniteDuration = ACTOR_TIMEOUT seconds
implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds))
import scala.concurrent.ExecutionContext.Implicits.global
def extractFuture[T](f: Future[Any]): T = {
val r = f.map {
case s: Set[T] => s
case l: List[T] => l
case t: T => t
case _ => Empty ~> APIFailure(s"future extraction failed", 501)
}
Await.result(r, TIMEOUT).asInstanceOf[T]
}
def extractFutureToBox[T](f: Future[Any]): Box[T] = {
val r = f.map {
case pf: ParamFailure[_] => Empty ~> pf
case af: APIFailure => Empty ~> af
case f: Failure => f
case Empty => Empty
case t: T => Full(t)
case _ => Empty ~> APIFailure(s"future extraction to box failed", 501)
}
Await.result(r, TIMEOUT)
}
}
trait ActorHelper {
def extractResult[T](in: T) = {
in match {
case pf: ParamFailure[_] =>
pf.param match {
case af: APIFailure => af
case f: Failure => f
case _ => pf
}
case af: APIFailure => af
case f: Failure => f
case l: List[T] => l
case s: Set[T] => s
case Full(r) => r
case t: T => t
case _ => APIFailure(s"result extraction failed", 501)
}
}
}

View File

@@ -1,18 +1,57 @@
package code.bankconnectors
import akka.actor.ActorSelection
import net.liftweb.common.{Full, _}
import akka.pattern.ask
import akka.util.Timeout
import code.api.APIFailure
import code.util.Helper.MdcLoggable
import net.liftweb.json.JValue
import net.liftweb.util.Props
import scala.concurrent.Await
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object KafkaHelper extends KafkaHelper
trait KafkaHelper extends KafkaHelperActorInit with MdcLoggable {
trait KafkaHelper extends MdcLoggable {
// Default is 3 seconds, which should be more than enough for slower systems
val ACTOR_TIMEOUT: Long = Props.getLong("connector.timeout").openOr(3)
val TIMEOUT: FiniteDuration = ACTOR_TIMEOUT seconds
def extractFuture[T](f: Future[Any]): T = {
val r = f.map {
case s: Set[T] => s
case l: List[T] => l
case t: T => t
case _ => Empty ~> APIFailure(s"future extraction failed", 501)
}
Await.result(r, TIMEOUT).asInstanceOf[T]
}
def extractFutureToBox[T](f: Future[Any]): Box[T] = {
val r = f.map {
case pf: ParamFailure[_] => Empty ~> pf
case af: APIFailure => Empty ~> af
case f: Failure => f
case Empty => Empty
case t: T => Full(t)
case _ => Empty ~> APIFailure(s"future extraction to box failed", 501)
}
Await.result(r, TIMEOUT)
}
val actorName: String = CreateActorNameFromClassName(this.getClass.getName)
val actor: ActorSelection = KafkaHelperLookupSystem.getKafkaHelperActor(actorName)
def CreateActorNameFromClassName(c: String): String = {
val n = c.substring(c.lastIndexOf('.') + 1 ).replaceAll("\\$", "")
val name = Character.toLowerCase(n.charAt(0)) + n.substring(1)
name
}
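The derivation above turns the implementing class's fully-qualified name into an actor name by dropping the package, stripping the '$' of companion objects, and lower-casing the first character. A worked example (hypothetical REPL-style check):

// "code.bankconnectors.KafkaHelper$" -> "KafkaHelper" -> "kafkaHelper"
val c = "code.bankconnectors.KafkaHelper$"
val n = c.substring(c.lastIndexOf('.') + 1).replaceAll("\\$", "")
val name = Character.toLowerCase(n.charAt(0)) + n.substring(1)
assert(name == "kafkaHelper")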
/**
* This function exists only to keep compatibility with KafkaMappedConnector_vMar2017 and KafkaMappedConnector.scala
@@ -33,8 +72,9 @@ trait KafkaHelper extends KafkaHelperActorInit with MdcLoggable {
process(mapRequest)
}
implicit val timeout: Timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds))
def process (request: Map[String, String]): JValue ={
extractFuture(actor ? processRequest(request))
extractFuture(actor ? processRequest(request))
}
case class processRequest (

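process now holds the whole ask-and-block pattern in this trait: the request map is wrapped in processRequest, sent with ? under the implicit timeout, and the reply is unwrapped by extractFuture. A self-contained sketch of the same pattern, using a hypothetical echo actor instead of the Kafka helper actor:

import akka.actor.{Actor, ActorSystem, Props => ActorProps}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskSketch extends App {
  implicit val system: ActorSystem = ActorSystem("ask-sketch")
  implicit val timeout: Timeout = Timeout(3.seconds)

  class EchoActor extends Actor {
    def receive: Receive = { case m => sender() ! m }
  }

  // Send, wait up to the timeout, cast the reply - the same shape as
  // extractFuture(actor ? processRequest(request)) in KafkaHelper.process.
  val echo = system.actorOf(ActorProps(new EchoActor))
  val reply = Await.result(echo ? "hello", 3.seconds).asInstanceOf[String]
  println(reply)
  system.terminate()
}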
View File

@@ -19,7 +19,10 @@ import org.apache.kafka.common.errors.WakeupException
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionException, Future}
class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable {
class KafkaHelperActor extends Actor with MdcLoggable {
implicit val formats = DefaultFormats
val requestTopic = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set")
val responseTopic = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set")
@@ -44,7 +47,7 @@ class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable {
var producer = new KafkaProducer[String, String](producerProps)
var consumer = new KafkaConsumer[String, String](consumerProps)
implicit val formats = DefaultFormats
def getResponse(reqId: String): String = {
var res = """{"error":"KafkaConsumer could not fetch response"}"""
@@ -87,6 +90,7 @@ class KafkaHelperActor extends Actor with ActorHelper with MdcLoggable {
Await.result(futureResponse, Duration("3 seconds"))
}
def process(request: Map[String,String]): String = {
val reqId = UUID.randomUUID().toString
val jsonRequest = Extraction.decompose(request)

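KafkaHelperActor serialises each request map to JSON, publishes it to the configured request topic keyed by a fresh UUID (reqId), and getResponse(reqId) then polls the response topic for a matching record, falling back to the error JSON above. A minimal sketch of the publishing half; the broker address, topic name and payload are all assumptions:

import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaRequestSketch extends App {
  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", "localhost:9092")  // assumed broker address
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](producerProps)
  // The record key is the request id the consumer side later matches on.
  val reqId = UUID.randomUUID().toString
  producer.send(new ProducerRecord[String, String]("Request", reqId, """{"example":"payload"}"""))
  producer.close()
}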
View File

@@ -1,27 +1,8 @@
package code.bankconnectors
import akka.actor.{ActorSystem, Props => ActorProps}
import code.actorsystem.ActorUtils
import code.actorsystem.ObpActorConfig
import code.remotedata._
import code.util.Helper
import code.util.Helper.MdcLoggable
import com.typesafe.config.ConfigFactory
import net.liftweb.util.Props
trait KafkaHelperActorInit extends ActorUtils {
// Default is 3 seconds, which should be more than enough for slower systems
ACTOR_TIMEOUT = Props.getLong("connector.timeout").openOr(3)
val actorName = CreateActorNameFromClassName(this.getClass.getName)
val actor = KafkaHelperLookupSystem.getKafkaHelperActor(actorName)
def CreateActorNameFromClassName(c: String): String = {
val n = c.replaceFirst("^.*KafkaHelper", "")
Character.toLowerCase(n.charAt(0)) + n.substring(1)
}
}
object KafkaHelperActors extends MdcLoggable {
@@ -36,9 +17,7 @@ object KafkaHelperActors extends MdcLoggable {
def startLocalKafkaHelperWorkers(system: ActorSystem): Unit = {
logger.info("Starting local KafkaHelper workers")
//logger.info(ObpActorConfig.localConf)
//val system = ActorSystem.create(s"ObpActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.localConf)))
startKafkaHelperActors(system)
startKafkaHelperActors(system)
}

View File

@@ -2,10 +2,11 @@ package code.remotedata
import java.util.concurrent.TimeUnit
import akka.actor.{ActorSystem, Props => ActorProps}
import akka.actor.{ActorSelection, ActorSystem, Props => ActorProps}
import akka.util.Timeout
import bootstrap.liftweb.ToSchemify
import code.actorsystem.ActorUtils
import code.actorsystem.ObpActorConfig
import code.api.APIFailure
import code.util.Helper
import com.typesafe.config.ConfigFactory
import net.liftweb.common._
@@ -17,32 +18,62 @@ import net.liftweb.util.Props
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import code.util.Helper.MdcLoggable
import scala.concurrent.{Await, Future}
trait RemotedataActorInit extends ActorUtils {
trait RemotedataActorInit {
// Default is 3 seconds, which should be more than enough for slower systems
ACTOR_TIMEOUT = Props.getLong("remotedata.timeout").openOr(3)
val ACTOR_TIMEOUT: Long = Props.getLong("remotedata.timeout").openOr(3)
val actorName = CreateActorNameFromClassName(this.getClass.getName)
val actor = RemotedataLookupSystem.getRemotedataActor(actorName)
val actorName: String = CreateRemotedataActorNameFromClassName(this.getClass.getName)
val actor: ActorSelection = RemotedataLookupSystem.getRemotedataActor(actorName)
val TIMEOUT: FiniteDuration = ACTOR_TIMEOUT seconds
implicit val timeout = Timeout(ACTOR_TIMEOUT * (1000 milliseconds))
def CreateActorNameFromClassName(c: String): String = {
val n = c.replaceFirst("^.*Remotedata", "")
Character.toLowerCase(n.charAt(0)) + n.substring(1)
def CreateRemotedataActorNameFromClassName(c: String): String = {
val n = c.replaceFirst("^.*Remotedata", "").replaceAll("\\$", "")
val name = Character.toLowerCase(n.charAt(0)) + n.substring(1)
name
}
def getRemotedataActorName(): String = {
"test" //actorName
}
def extractFuture[T](f: Future[Any]): T = {
val r = f.map {
case s: Set[T] => s
case l: List[T] => l
case t: T => t
case _ => Empty ~> APIFailure(s"future extraction failed", 501)
}
Await.result(r, TIMEOUT).asInstanceOf[T]
}
def extractFutureToBox[T](f: Future[Any]): Box[T] = {
val r = f.map {
case pf: ParamFailure[_] => Empty ~> pf
case af: APIFailure => Empty ~> af
case f: Failure => f
case Empty => Empty
case t: T => Full(t)
case _ => Empty ~> APIFailure(s"future extraction to box failed", 501)
}
Await.result(r, TIMEOUT)
}
}
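CreateRemotedataActorNameFromClassName now also strips the trailing '$' of companion objects, which is why the hard-coded "accountHolders" in the actor map below matches what the helper derives for RemotedataAccountHolders. A worked example (hypothetical REPL-style check):

// "code.remotedata.RemotedataAccountHolders$" -> "AccountHolders" -> "accountHolders"
val c = "code.remotedata.RemotedataAccountHolders$"
val n = c.replaceFirst("^.*Remotedata", "").replaceAll("\\$", "")
val name = Character.toLowerCase(n.charAt(0)) + n.substring(1)
assert(name == "accountHolders")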
object RemotedataActors extends MdcLoggable {
val props_hostname = Helper.getHostname
def startRemotedataActors(actorSystem: ActorSystem) = {
//val t = RemotedataAccountHolders.cc
println("=============================> " + actorSystem)
val actorsRemotedata = Map(
ActorProps[RemotedataAccountHoldersActor] -> RemotedataAccountHolders.actorName,
ActorProps[RemotedataAccountHoldersActor] -> "accountHolders", //RemotedataAccountHolders.actorName,
ActorProps[RemotedataCommentsActor] -> RemotedataComments.actorName,
ActorProps[RemotedataCounterpartiesActor] -> RemotedataCounterparties.actorName,
ActorProps[RemotedataTagsActor] -> RemotedataTags.actorName,
@@ -67,12 +98,11 @@ object RemotedataActors extends MdcLoggable {
def startLocalRemotedataWorkers( system: ActorSystem ): Unit = {
logger.info("Starting local Remotedata actors")
//logger.info(ObpActorConfig.localConf)
//val system = ActorSystem.create(s"RemotedataActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.localConf)))
startRemotedataActors(system)
}
def startRemoteWorkerSystem(): Unit = {
val props_hostname = Helper.getHostname
logger.info("Starting remote RemotedataLookupSystem")
logger.info(ObpActorConfig.remoteConf)
val system = ActorSystem(s"RemotedataActorSystem_${props_hostname}", ConfigFactory.load(ConfigFactory.parseString(ObpActorConfig.remoteConf)))
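startRemoteWorkerSystem builds its ActorSystem from a config string (ObpActorConfig.remoteConf) rather than from application.conf alone. A simplified, hedged sketch of that bootstrap; the config content here is a trivial stand-in, not the real remoting settings:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteSystemSketch extends App {
  // Stand-in for ObpActorConfig.remoteConf; the real value carries the remoting settings.
  val conf = ConfigFactory.parseString("akka.loglevel = INFO")
  val system = ActorSystem("RemotedataActorSystem_sketch", ConfigFactory.load(conf))
  println(s"started ${system.name}")
  system.terminate()
}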