feature/Add MetricsArchive table

This commit is contained in:
Marko Milić 2022-11-16 13:21:58 +01:00
parent 9336f96009
commit c438920257
9 changed files with 309 additions and 6 deletions

View File

@ -1204,4 +1204,7 @@ user_account_validated_redirect_url =
# Defines whether the user is automatically validated (without the SCA flow) after the user account is created
# If not defined, the default value is false
user_account_is_validated = false
user_account_is_validated = false
# Defines the number of days we keep rows in the table "MetricsArchive"
retain_archive_metrics_days = 365

View File

@ -84,7 +84,7 @@ import code.metadata.tags.MappedTag
import code.metadata.transactionimages.MappedTransactionImage
import code.metadata.wheretags.MappedWhereTag
import code.methodrouting.MethodRouting
import code.metrics.{MappedConnectorMetric, MappedMetric}
import code.metrics.{MappedConnectorMetric, MappedMetric, MetricsArchive}
import code.migration.MigrationScriptLog
import code.model.{Consumer, _}
import code.model.dataAccess._
@ -96,7 +96,7 @@ import code.productcollectionitem.MappedProductCollectionItem
import code.products.MappedProduct
import code.ratelimiting.RateLimiting
import code.remotedata.RemotedataActors
import code.scheduler.DatabaseDriverScheduler
import code.scheduler.{DatabaseDriverScheduler, MetricsArchiveScheduler}
import code.scope.{MappedScope, MappedUserScope}
import code.apicollectionendpoint.ApiCollectionEndpoint
import code.apicollection.ApiCollection
@ -683,6 +683,7 @@ class Boot extends MdcLoggable {
case Full(i) => DatabaseDriverScheduler.start(i)
case _ => // Do not start it
}
MetricsArchiveScheduler.start(intervalInSeconds = 86400)
APIUtil.akkaSanityCheck() match {
@ -951,6 +952,7 @@ object ToSchemify {
MappedTransactionRequest,
TransactionRequestAttribute,
MappedMetric,
MetricsArchive,
MapperAccountHolders,
MappedEntitlement,
MappedConnectorMetric,

View File

@ -58,6 +58,21 @@ trait APIMetrics {
verb: String,
httpCode: Option[Int],
correlationId: String): Unit
def saveMetricsArchive(primaryKey: Long,
userId: String,
url: String,
date: Date,
duration: Long,
userName: String,
appName: String,
developerEmail: String,
consumerId: String,
implementedByPartialFunction: String,
implementedInVersion: String,
verb: String,
httpCode: Option[Int],
correlationId: String): Unit
// //TODO: ordering of list? should this be by date? currently not enforced
// def getAllGroupedByUrl() : Map[String, List[APIMetric]]
@ -70,6 +85,8 @@ trait APIMetrics {
def getAllMetrics(queryParams: List[OBPQueryParam]): List[APIMetric]
def getAllMetricsArchive(queryParams: List[OBPQueryParam]): List[APIMetric]
def getAllAggregateMetricsFuture(queryParams: List[OBPQueryParam]): Future[Box[List[AggregateMetrics]]]
def getTopApisFuture(queryParams: List[OBPQueryParam]): Future[Box[List[TopApi]]]
@ -82,10 +99,12 @@ trait APIMetrics {
class RemotedataMetricsCaseClasses {
case class saveMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String)
case class saveMetricsArchive(primaryKey: Long, userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String)
// case class getAllGroupedByUrl()
// case class getAllGroupedByDay()
// case class getAllGroupedByUserId()
case class getAllMetrics(queryParams: List[OBPQueryParam])
case class getAllMetricsArchive(queryParams: List[OBPQueryParam])
case class getAllAggregateMetricsFuture(queryParams: List[OBPQueryParam])
case class getTopApisFuture(queryParams: List[OBPQueryParam])
case class getTopConsumersFuture(queryParams: List[OBPQueryParam])
@ -96,6 +115,7 @@ object RemotedataMetricsCaseClasses extends RemotedataMetricsCaseClasses
trait APIMetric {
def getPrimaryKey() : Long
def getUrl() : String
def getDate() : Date
def getDuration(): Long

View File

@ -19,6 +19,7 @@ object ElasticsearchMetrics extends APIMetrics {
es.indexMetric(userId, url, date, duration, userName, appName, developerEmail, correlationId)
}
}
override def saveMetricsArchive(primaryKey: Long, userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String): Unit = ???
// override def getAllGroupedByUserId(): Map[String, List[APIMetric]] = {
// //TODO: replace the following with valid ES query
@ -53,6 +54,24 @@ object ElasticsearchMetrics extends APIMetrics {
MappedMetric.findAll(optionalParams: _*)
}
override def getAllMetricsArchive(queryParams: List[OBPQueryParam]): List[APIMetric] = {
  //TODO: replace the following with valid ES query
  // Translate each recognised OBPQueryParam into its Lift Mapper counterpart.
  val maxRows   = queryParams.collectFirst { case OBPLimit(v)     => MaxRows[MetricsArchive](v) }
  val startAt   = queryParams.collectFirst { case OBPOffset(v)    => StartAt[MetricsArchive](v) }
  val notBefore = queryParams.collectFirst { case OBPFromDate(d)  => By_>=(MetricsArchive.date, d) }
  val notAfter  = queryParams.collectFirst { case OBPToDate(d)    => By_<=(MetricsArchive.date, d) }
  // Only the requested direction matters here; sorting is always on the finish date for now.
  val sortByDate = queryParams.collect {
    case OBPOrdering(_, OBPAscending)  => OrderBy(MetricsArchive.date, Ascending)
    case OBPOrdering(_, OBPDescending) => OrderBy(MetricsArchive.date, Descending)
  }
  val mapperParams: Seq[QueryParam[MetricsArchive]] =
    maxRows.toSeq ++ startAt.toSeq ++ notBefore.toSeq ++ notAfter.toSeq ++ sortByDate
  MetricsArchive.findAll(mapperParams: _*)
}
override def getAllAggregateMetricsFuture(queryParams: List[OBPQueryParam]): Future[Box[List[AggregateMetrics]]] = ???

View File

@ -64,6 +64,29 @@ object MappedMetrics extends APIMetrics with MdcLoggable{
}
metric.save
}
/**
 * Upserts one row into the table "MetricsArchive", mirroring a row of the table "Metric".
 *
 * @param primaryKey the id of the SOURCE row in "Metric"; it is the idempotency key, so
 *                   re-copying the same row (the archiver re-reads a two-day window every
 *                   run) updates the existing archive row instead of inserting a duplicate
 * @param httpCode   optional; when None the column is left at its Mapper default
 */
override def saveMetricsArchive(primaryKey: Long, userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String): Unit = {
  // FIX: look up by the `primaryKey` column, not by MetricsArchive's own auto-generated
  // IdPK `id`. The two sequences are unrelated, so By(MetricsArchive.id, primaryKey)
  // could match an arbitrary archive row (and overwrite it) or miss and insert a
  // duplicate on every nightly re-run.
  val metric = MetricsArchive.find(By(MetricsArchive.primaryKey, primaryKey)).getOrElse(MetricsArchive.create)
  metric
    .primaryKey(primaryKey)
    .userId(userId)
    .url(url)
    .date(date)
    .duration(duration)
    .userName(userName)
    .appName(appName)
    .developerEmail(developerEmail)
    .consumerId(consumerId)
    .implementedByPartialFunction(implementedByPartialFunction)
    .implementedInVersion(implementedInVersion)
    .verb(verb)
    .correlationId(correlationId)
  // httpCode is optional; set the column only when a value is present.
  httpCode.foreach(code => metric.httpCode(code))
  metric.save
}
private lazy val getDbConnectionParameters: (String, String, String) = {
val dbUrl = APIUtil.getPropsValue("db.url") openOr Constant.h2DatabaseDefaultUrlValue
@ -143,6 +166,78 @@ object MappedMetrics extends APIMetrics with MdcLoggable{
values.map(NotBy(MappedMetric.appName, _))
}.headOption
Seq(
offset.toSeq,
fromDate.toSeq,
toDate.toSeq,
ordering,
consumerId.toSeq,
userId.toSeq,
bankId.toSeq,
url.toSeq,
appName.toSeq,
implementedInVersion.toSeq,
implementedByPartialFunction.toSeq,
verb.toSeq,
limit.toSeq,
correlationId.toSeq,
duration.toSeq,
anon.toSeq,
excludeAppNames.toSeq.flatten
).flatten
}
//TODO, maybe move to `APIUtil.scala`
private def getQueryParamsMetricsArchive(queryParams: List[OBPQueryParam]) = {
val limit = queryParams.collect { case OBPLimit(value) => MaxRows[MetricsArchive](value) }.headOption
val offset = queryParams.collect { case OBPOffset(value) => StartAt[MetricsArchive](value) }.headOption
val fromDate = queryParams.collect { case OBPFromDate(date) => By_>=(MetricsArchive.date, date) }.headOption
val toDate = queryParams.collect { case OBPToDate(date) => By_<=(MetricsArchive.date, date) }.headOption
val ordering = queryParams.collect {
case OBPOrdering(field, dir) =>
val direction = dir match {
case OBPAscending => Ascending
case OBPDescending => Descending
}
field match {
case Some(s) if s == "user_id" => OrderBy(MetricsArchive.userId, direction)
case Some(s) if s == "user_name" => OrderBy(MetricsArchive.userName, direction)
case Some(s) if s == "developer_email" => OrderBy(MetricsArchive.developerEmail, direction)
case Some(s) if s == "app_name" => OrderBy(MetricsArchive.appName, direction)
case Some(s) if s == "url" => OrderBy(MetricsArchive.url, direction)
case Some(s) if s == "date" => OrderBy(MetricsArchive.date, direction)
case Some(s) if s == "consumer_id" => OrderBy(MetricsArchive.consumerId, direction)
case Some(s) if s == "verb" => OrderBy(MetricsArchive.verb, direction)
case Some(s) if s == "implemented_in_version" => OrderBy(MetricsArchive.implementedInVersion, direction)
case Some(s) if s == "implemented_by_partial_function" => OrderBy(MetricsArchive.implementedByPartialFunction, direction)
case Some(s) if s == "correlation_id" => OrderBy(MetricsArchive.correlationId, direction)
case Some(s) if s == "duration" => OrderBy(MetricsArchive.duration, direction)
case _ => OrderBy(MetricsArchive.date, Descending)
}
}
// The optional filter variables:
val consumerId = queryParams.collect { case OBPConsumerId(value) => value}.headOption
.flatMap(consumerIdToPrimaryKey)
.map(By(MetricsArchive.consumerId, _) )
val bankId = queryParams.collect { case OBPBankId(value) => Like(MetricsArchive.url, s"%banks/$value%") }.headOption
val userId = queryParams.collect { case OBPUserId(value) => By(MetricsArchive.userId, value) }.headOption
val url = queryParams.collect { case OBPUrl(value) => By(MetricsArchive.url, value) }.headOption
val appName = queryParams.collect { case OBPAppName(value) => By(MetricsArchive.appName, value) }.headOption
val implementedInVersion = queryParams.collect { case OBPImplementedInVersion(value) => By(MetricsArchive.implementedInVersion, value) }.headOption
val implementedByPartialFunction = queryParams.collect { case OBPImplementedByPartialFunction(value) => By(MetricsArchive.implementedByPartialFunction, value) }.headOption
val verb = queryParams.collect { case OBPVerb(value) => By(MetricsArchive.verb, value) }.headOption
val correlationId = queryParams.collect { case OBPCorrelationId(value) => By(MetricsArchive.correlationId, value) }.headOption
val duration = queryParams.collect { case OBPDuration(value) => By(MetricsArchive.duration, value) }.headOption
val anon = queryParams.collect {
case OBPAnon(true) => By(MetricsArchive.userId, "null")
case OBPAnon(false) => NotBy(MetricsArchive.userId, "null")
}.headOption
val excludeAppNames = queryParams.collect {
case OBPExcludeAppNames(values) =>
values.map(NotBy(MetricsArchive.appName, _))
}.headOption
Seq(
offset.toSeq,
fromDate.toSeq,
@ -180,6 +275,22 @@ object MappedMetrics extends APIMetrics with MdcLoggable{
}
}
}
// TODO Cache this as long as fromDate and toDate are in the past (before now)
// Cached read of the table "MetricsArchive". Mirrors getAllMetrics: the OBP query
// params are translated to Lift Mapper QueryParams and the result is memoized for
// `cachedAllMetrics` days under a macro-generated cache key.
override def getAllMetricsArchive(queryParams: List[OBPQueryParam]): List[APIMetric] = {
/**
* Please note that "var cacheKey = (randomUUID().toString, randomUUID().toString, randomUUID().toString)"
* is just a temporary value field with UUID values in order to prevent any ambiguity.
* The real value will be assigned by Macro during compile time at this line of a code:
* https://github.com/OpenBankProject/scala-macros/blob/master/macros/src/main/scala/com/tesobe/CacheKeyFromArgumentsMacro.scala#L49
*/
var cacheKey = (randomUUID().toString, randomUUID().toString, randomUUID().toString)
CacheKeyFromArguments.buildCacheKey {
Caching.memoizeSyncWithProvider(Some(cacheKey.toString()))(cachedAllMetrics days){
// Build the Mapper query params (limit/offset/date range/ordering/filters) and run the query.
val optionalParams = getQueryParamsMetricsArchive(queryParams)
MetricsArchive.findAll(optionalParams: _*)
}
}
}
private def extendCurrentQuery (length: Int) ={
@ -473,6 +584,7 @@ object MappedMetrics extends APIMetrics with MdcLoggable{
}
class MappedMetric extends APIMetric with LongKeyedMapper[MappedMetric] with IdPK {
override def getSingleton = MappedMetric
object userId extends UUIDString(this)
@ -495,6 +607,7 @@ class MappedMetric extends APIMetric with LongKeyedMapper[MappedMetric] with IdP
object correlationId extends MappedUUID(this)
override def getPrimaryKey(): Long = id.get
override def getUrl(): String = url.get
override def getDate(): Date = date.get
override def getDuration(): Long = duration.get
@ -511,6 +624,57 @@ class MappedMetric extends APIMetric with LongKeyedMapper[MappedMetric] with IdP
}
object MappedMetric extends MappedMetric with LongKeyedMetaMapper[MappedMetric] {
//override def dbIndexes = Index(userId) :: Index(url) :: Index(date) :: Index(userName) :: Index(appName) :: Index(developerEmail) :: super.dbIndexes
// Please note that the old table name was "MappedMetric".
// Renaming implications:
// - at an existing sandbox the table "MappedMetric" still exists with its rows until this change is deployed,
//   after which new rows are stored in the table "Metric"
// - at a fresh sandbox the table "MappedMetric" does not exist; only "Metric" is present
override def dbTableName = "Metric" // define the DB table name
override def dbIndexes = Index(date) :: Index(consumerId) :: super.dbIndexes
}
// Long-term archive of API metrics. Rows are copied here from the table "Metric" by
// MetricsArchiveScheduler and retained for "retain_archive_metrics_days" days.
// Note: `primaryKey` holds the id of the SOURCE row in "Metric"; the Mapper `id`
// (from IdPK) is this table's own auto-generated key and the two are unrelated.
class MetricsArchive extends APIMetric with LongKeyedMapper[MetricsArchive] with IdPK {
override def getSingleton = MetricsArchive
// Primary key of the originating row in the table "Metric" (not this table's own id).
object primaryKey extends MappedLong(this)
object userId extends UUIDString(this)
object url extends MappedString(this, 2000) // TODO Introduce / use class for Mapped URLs
object date extends MappedDateTime(this)
object duration extends MappedLong(this)
object userName extends MappedString(this, 64) // TODO constrain source value length / truncate value on insert
object appName extends MappedString(this, 64) // TODO constrain source value length / truncate value on insert
object developerEmail extends MappedString(this, 64) // TODO constrain source value length / truncate value on insert
//The consumerId, Foreign key to Consumer, not a key of this table
object consumerId extends UUIDString(this)
//name of the Scala Partial Function being used for the endpoint
object implementedByPartialFunction extends MappedString(this, 128)
//name of the version where the call is implemented -- S.request.get.view
object implementedInVersion extends MappedString(this, 16)
//(GET, POST etc.) --S.request.get.requestType
object verb extends MappedString(this, 16)
object httpCode extends MappedInt(this)
object correlationId extends MappedUUID(this)
// APIMetric accessors. getPrimaryKey() deliberately returns the archived source id,
// not IdPK's id, so callers see the same key as on the original Metric row.
override def getPrimaryKey(): Long = primaryKey.get
override def getUrl(): String = url.get
override def getDate(): Date = date.get
override def getDuration(): Long = duration.get
override def getUserId(): String = userId.get
override def getUserName(): String = userName.get
override def getAppName(): String = appName.get
override def getDeveloperEmail(): String = developerEmail.get
override def getConsumerId(): String = consumerId.get
override def getImplementedByPartialFunction(): String = implementedByPartialFunction.get
override def getImplementedInVersion(): String = implementedInVersion.get
override def getVerb(): String = verb.get
override def getHttpCode(): Int = httpCode.get
override def getCorrelationId(): String = correlationId.get
}
object MetricsArchive extends MetricsArchive with LongKeyedMetaMapper[MetricsArchive] {
  // Also index `primaryKey`: archive rows are keyed by the source Metric row's primary
  // key and the nightly archiver performs one lookup per copied row, which would
  // otherwise be a full table scan on a table that grows for up to a year.
  override def dbIndexes =
    Index(primaryKey) :: Index(userId) :: Index(consumerId) :: Index(url) :: Index(date) ::
    Index(userName) :: Index(appName) :: Index(developerEmail) :: super.dbIndexes
}

View File

@ -31,13 +31,13 @@ import java.util.Date
import code.api.util.OBPQueryParam
import net.liftweb.common.Box
import net.liftweb.mongodb.record.field.{DateField, ObjectIdPk}
import net.liftweb.mongodb.record.field.{DateField, LongPk}
import net.liftweb.mongodb.record.{MongoMetaRecord, MongoRecord}
import net.liftweb.record.field.{IntField, LongField, StringField}
import scala.concurrent.Future
private class MongoAPIMetric extends MongoRecord[MongoAPIMetric] with ObjectIdPk[MongoAPIMetric] with APIMetric {
private class MongoAPIMetric extends MongoRecord[MongoAPIMetric] with LongPk[MongoAPIMetric] with APIMetric {
def meta = MongoAPIMetric
object userId extends StringField(this,255)
object url extends StringField(this,255)
@ -58,6 +58,7 @@ import scala.concurrent.Future
object correlationId extends StringField(this,255)
def getPrimaryKey(): Long = id.get
def getUrl() = url.get
def getDate() = date.get
def getDuration(): Long = duration.get
@ -91,6 +92,8 @@ private object MongoAPIMetric extends MongoAPIMetric with MongoMetaRecord[MongoA
correlationId(correlationId)
saveTheRecord()
}
override def saveMetricsArchive(primaryKey: Long, userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String): Unit = ???
// def getAllGroupedByUrl() : Map[String, List[APIMetric]] = {
// MongoAPIMetric.findAll.groupBy[String](_.url.get)
@ -107,6 +110,8 @@ private object MongoAPIMetric extends MongoAPIMetric with MongoMetaRecord[MongoA
override def getAllMetrics(queryParams: List[OBPQueryParam]): List[APIMetric] = {
MongoAPIMetric.findAll
}
override def getAllMetricsArchive(queryParams: List[OBPQueryParam]): List[APIMetric] = ???
override def bulkDeleteMetrics(): Boolean = ???
override def getAllAggregateMetricsFuture(queryParams: List[OBPQueryParam]): Future[Box[List[AggregateMetrics]]] = ???

View File

@ -18,6 +18,10 @@ object RemotedataMetrics extends ObpActorInit with APIMetrics {
def saveMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String) : Unit = getValueFromFuture(
(actor ? cc.saveMetric(userId, url, date, duration, userName, appName, developerEmail, consumerId, implementedByPartialFunction, implementedInVersion, verb, httpCode, correlationId)).mapTo[Unit]
)
def saveMetricsArchive(primaryKey: Long, userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String) : Unit = getValueFromFuture(
(actor ? cc.saveMetricsArchive(primaryKey, userId, url, date, duration, userName, appName, developerEmail, consumerId, implementedByPartialFunction, implementedInVersion, verb, httpCode, correlationId)).mapTo[Unit]
)
// def getAllGroupedByUrl() : Map[String, List[APIMetric]] =
// extractFuture(actor ? cc.getAllGroupedByUrl())
@ -32,6 +36,10 @@ object RemotedataMetrics extends ObpActorInit with APIMetrics {
(actor ? cc.getAllMetrics(queryParams)).mapTo[List[APIMetric]]
)
def getAllMetricsArchive(queryParams: List[OBPQueryParam]): List[APIMetric] = getValueFromFuture(
(actor ? cc.getAllMetricsArchive(queryParams)).mapTo[List[APIMetric]]
)
override def getAllAggregateMetricsFuture(queryParams: List[OBPQueryParam]): Future[Box[List[AggregateMetrics]]] ={
logger.debug(s"RemotedataMetrics.getAllAggregateMetrics($queryParams)")
(actor ? cc.getAllAggregateMetricsFuture(queryParams)).mapTo[Box[List[AggregateMetrics]]]

View File

@ -40,6 +40,10 @@ class RemotedataMetricsActor extends Actor with ObpActorHelper with MdcLoggable
case cc.getAllMetrics(queryParams) =>
logger.debug("getAllMetrics()")
sender ! (mapper.getAllMetrics(queryParams))
case cc.getAllMetricsArchive(queryParams) =>
logger.debug("getAllMetricsArchive()")
sender ! (mapper.getAllMetricsArchive(queryParams))
case cc.getAllAggregateMetricsFuture(queryParams: List[OBPQueryParam]) =>
logger.debug(s"RemotedataMetricsActor.getAllAggregateMetricsFuture($queryParams)")

View File

@ -0,0 +1,78 @@
package code.scheduler
import java.util.concurrent.TimeUnit
import java.util.{Calendar, Date}
import code.actorsystem.ObpLookupSystem
import code.api.util.{APIUtil, OBPFromDate}
import code.metrics.{APIMetrics, MetricsArchive}
import code.util.Helper.MdcLoggable
import net.liftweb.mapper.By_<=
import scala.concurrent.duration._
/**
 * Periodic job that mirrors recent rows of the table "Metric" into the long-term
 * table "MetricsArchive" and prunes archive rows older than the retention window.
 */
object MetricsArchiveScheduler extends MdcLoggable {

  private lazy val actorSystem = ObpLookupSystem.obpLookupSystem
  implicit lazy val executor = actorSystem.dispatcher
  private lazy val scheduler = actorSystem.scheduler

  // One day in milliseconds; keeps the window/retention arithmetic below free of magic numbers.
  private val OneDayInMillis: Long = 86400000L

  /**
   * Schedules the job: the first run fires at the next local midnight, then it
   * repeats every `intervalInSeconds` (86400 == daily).
   */
  def start(intervalInSeconds: Long): Unit = {
    scheduler.schedule(
      initialDelay = Duration(getMillisTillMidnight(), TimeUnit.MILLISECONDS),
      interval = Duration(intervalInSeconds, TimeUnit.SECONDS),
      runnable = new Runnable {
        def run(): Unit = {
          copyDataToMetricsArchive()
          deleteOutdatedRowsFromMetricsArchive()
        }
      }
    )
  }

  /**
   * Copies the last two days of rows from the table "Metric" (former "MappedMetric")
   * into "MetricsArchive". The two-day window overlaps consecutive daily runs, so
   * saveMetricsArchive must be idempotent per source primary key.
   * NOTE(review): getAllMetrics is called with only a from-date filter — confirm no
   * implicit row limit is applied downstream, or a busy day could be archived partially.
   */
  def copyDataToMetricsArchive(): Unit = {
    val now = new Date()
    val twoDaysAgo: Date = new Date(now.getTime - 2 * OneDayInMillis)
    val recentMetrics = APIMetrics.apiMetrics.vend.getAllMetrics(List(OBPFromDate(twoDaysAgo)))
    // `foreach`, not `map`: we archive purely for the side effect and discard the results.
    recentMetrics.foreach { i =>
      // Copy each metric row to the table "MetricsArchive"
      APIMetrics.apiMetrics.vend.saveMetricsArchive(
        i.getPrimaryKey(),
        i.getUserId(),
        i.getUrl(),
        i.getDate(),
        i.getDuration(),
        i.getUserName(),
        i.getAppName(),
        i.getDeveloperEmail(),
        i.getConsumerId(),
        i.getImplementedByPartialFunction(),
        i.getImplementedInVersion(),
        i.getVerb(),
        Some(i.getHttpCode()),
        i.getCorrelationId()
      )
    }
  }

  /**
   * Deletes rows from the table "MetricsArchive" whose date is older than the
   * retention window (props key "retain_archive_metrics_days", default 365 days).
   */
  def deleteOutdatedRowsFromMetricsArchive() = {
    val retainDays = APIUtil.getPropsAsLongValue("retain_archive_metrics_days", 365)
    val cutoff: Date = new Date(new Date().getTime - retainDays * OneDayInMillis)
    MetricsArchive.bulkDelete_!!(By_<=(MetricsArchive.date, cutoff))
  }

  // Milliseconds from now until the next midnight in the JVM's default time zone.
  private def getMillisTillMidnight(): Long = {
    val c = Calendar.getInstance
    c.add(Calendar.DAY_OF_MONTH, 1)
    c.set(Calendar.HOUR_OF_DAY, 0)
    c.set(Calendar.MINUTE, 0)
    c.set(Calendar.SECOND, 0)
    c.set(Calendar.MILLISECOND, 0)
    c.getTimeInMillis - System.currentTimeMillis
  }
}