feature/Remove Akka in case of Webhooks 2

This commit is contained in:
Marko Milić 2023-01-10 13:56:50 +01:00
parent 58435d30e7
commit d12abfcc46
3 changed files with 81 additions and 132 deletions

View File

@ -498,6 +498,12 @@
<artifactId>jackson-databind</artifactId>
<version>2.12.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,30 @@
package code.webhook
import java.io.IOException
import code.webhook.WebhookActor.WebhookRequestTrait
import okhttp3._
object OkHttpWebhookClient {
private val client = new OkHttpClient
@throws[Exception]
def makeAsynchronousRequest(request: Request, webhookRequest: WebhookRequestTrait): Unit = {
client.newCall(request).enqueue(new Callback() {
def onFailure(call: Call, e: IOException): Unit = {
WebhookAction.webhookFailure(e.getMessage, webhookRequest)
}
@throws[IOException]
def onResponse(call: Call, response: Response): Unit = {
val responseBody = response.body
try {
if (!response.isSuccessful) throw new IOException("Unexpected code " + response)
org.scalameta.logger.elem(responseBody.string)
WebhookAction.webhookResponse(response.code().toString, webhookRequest)
} finally if (responseBody != null) responseBody.close()
}
})
}
}

View File

@ -1,23 +1,14 @@
package code.webhook
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.util.ByteString
import code.actorsystem.ObpLookupSystem
import code.api.util.ApiTrigger.{OnBalanceChange, OnCreateTransaction, OnCreditTransaction, OnDebitTransaction}
import code.api.util.{ApiTrigger, CustomJsonFormats}
import code.util.Helper.MdcLoggable
import code.webhook.WebhookActor.{AccountNotificationWebhookRequest, WebhookFailure, WebhookRequest, WebhookRequestTrait, WebhookResponse}
import code.webhook.WebhookActor.{AccountNotificationWebhookRequest, WebhookRequest, WebhookRequestTrait}
import net.liftweb
import net.liftweb.json.Extraction
import net.liftweb.mapper.By
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
import okhttp3.{MediaType, Request, RequestBody}
import code.webhook.OkHttpWebhookClient._
object WebhookHttpClient extends MdcLoggable {
@ -57,7 +48,7 @@ object WebhookHttpClient extends MdcLoggable {
logger.debug("WebhookHttpClient.startEvent(WebhookRequestTrait) i.httpProtocol: " + i.httpProtocol)
val payload = getEventPayload(request)
logger.debug("WebhookHttpClient.startEvent(WebhookRequestTrait) payload: " + payload.toString)
makeRequest(getHttpRequest(i.url, i.httpMethod, i.httpProtocol, getEventPayload(request)), request)
makeAsynchronousRequest(composeRequest(i.url, i.httpMethod, i.httpProtocol, payload), request)
}
}
@ -90,7 +81,7 @@ object WebhookHttpClient extends MdcLoggable {
logger.debug("WebhookHttpClient.startEvent(AccountNotificationWebhookRequest) i.httpProtocol: " + i.httpProtocol)
val payload = getEventPayload(request)
logger.debug("WebhookHttpClient.startEvent(AccountNotificationWebhookRequest) payload: " + payload.toString)
makeRequest(getHttpRequest(i.url, i.httpMethod, i.httpProtocol, payload), request)
makeAsynchronousRequest(composeRequest(i.url, i.httpMethod, i.httpProtocol, payload), request)
}
}
}
@ -107,58 +98,51 @@ object WebhookHttpClient extends MdcLoggable {
* }
*
*/
private def getEventPayload(request: WebhookRequestTrait): RequestEntity = {
def getEventPayload(request: WebhookRequestTrait): Option[String] = {
request.trigger match {
case OnBalanceChange() | OnCreditTransaction() | OnDebitTransaction() | OnCreateTransaction() =>
implicit val formats = CustomJsonFormats.formats
val json = liftweb.json.compactRender(Extraction.decompose(request.toEventPayload))
val entity: RequestEntity = HttpEntity(ContentTypes.`application/json`, json)
entity
Some(json)
case _ =>
HttpEntity.Empty
None
}
}
/**
* This function makes HttpRequest object according to the DB's data related to an account's webhook
* @param uri In most cases it's a URL
* @param method GET/POST/POST/DELETE
* @param httpProtocol HTTP/1.0 / HTTP/1.1 / HTTP/2.0
* @param entity For instance:
* {
* "event_name":"OnCreditTransaction",
* "event_id":"fc7e4a71-5ff1-4006-95bb-7fd9e4adaef9",
* "bank_id":"gh.29.uk.x",
* "account_id":"private_01",
* "amount":"50.00 EUR",
* "balance":"739.00 EUR"
* }
* Please note it's empty in case of GET
* @return HttpRequest object
*/
private def getHttpRequest(uri: String, method: String, httpProtocol: String, entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
* This function makes HttpRequest object according to the DB's data related to an account's webhook
* @param uri In most cases it's a URL
* @param method GET/POST/POST/DELETE
* @param httpProtocol HTTP/1.0 / HTTP/1.1 / HTTP/2.0
* @param json For instance:
* {
* "event_name":"OnCreditTransaction",
* "event_id":"fc7e4a71-5ff1-4006-95bb-7fd9e4adaef9",
* "bank_id":"gh.29.uk.x",
* "account_id":"private_01",
* "amount":"50.00 EUR",
* "balance":"739.00 EUR"
* }
* Please note it's empty in case of GET
* @return HttpRequest object
*/
def composeRequest(uri: String, method: String, httpProtocol: String, json: Option[String]): Request = {
val jsonType = MediaType.parse("application/json; charset=utf-8");
val body = RequestBody.create(jsonType, json.getOrElse(""))
method match {
case m: String if m.toUpperCase == "GET" =>
HttpRequest(uri = uri, method = GET, protocol = getHttpProtocol(httpProtocol))
new Request.Builder().url(uri).build
case m: String if m.toUpperCase == "POST" =>
HttpRequest(uri = uri, method = POST, entity = entity, protocol = getHttpProtocol(httpProtocol))
new Request.Builder().url(uri).post(body).build
case m: String if m.toUpperCase == "PUT" =>
HttpRequest(uri = uri, method = PUT, entity = entity, protocol = getHttpProtocol(httpProtocol))
new Request.Builder().url(uri).put(body).build
case m: String if m.toUpperCase == "DELETE" =>
HttpRequest(uri = uri, method = DELETE, entity = entity, protocol = getHttpProtocol(httpProtocol))
new Request.Builder().url(uri).delete(body).build
case _ =>
HttpRequest(uri = uri, method = GET, protocol = getHttpProtocol(httpProtocol))
new Request.Builder().url(uri).build
}
}
private def getHttpProtocol(httpProtocol: String): HttpProtocol = {
httpProtocol match {
case m: String if m.toUpperCase == "HTTP/1.0" => HttpProtocols.`HTTP/1.0`
case m: String if m.toUpperCase == "HTTP/1.1" => HttpProtocols.`HTTP/1.1`
case m: String if m.toUpperCase == "HTTP/2.0" => HttpProtocols.`HTTP/2.0`
case _ => HttpProtocols.`HTTP/1.1`
}
}
private def logEvent(request: WebhookRequestTrait): Unit = {
logger.debug("TRIGGER: " + request.trigger)
@ -173,73 +157,11 @@ object WebhookHttpClient extends MdcLoggable {
logger.debug("RELATED_ENTITIES: " + request.asInstanceOf[AccountNotificationWebhookRequest].relatedEntities)
}else{
}
}
lazy implicit val system = ObpLookupSystem.obpLookupSystem
implicit lazy val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit lazy val executionContext = system.dispatcher
private def makeRequest(httpRequest: HttpRequest, request: WebhookRequestTrait): Unit = {
makeHttpRequest(httpRequest).onComplete {
case Success(res@HttpResponse(status, headers, entity, protocol)) =>
WebhookAction.webhookResponse(status.toString(), request)
res.discardEntityBytes()
case Failure(error) =>
WebhookAction.webhookFailure(error.getMessage, request)
}
}
private lazy val poolSettingsWithHttpsProxy =
ConnectionPoolSettings.apply(system)
/*
# The minimum duration to backoff new connection attempts after the previous connection attempt failed.
#
# The pool uses an exponential randomized backoff scheme. After the first failure, the next attempt will only be
# tried after a random duration between the base connection backoff and twice the base connection backoff. If that
# attempt fails as well, the next attempt will be delayed by twice that amount. The total delay is capped using the
# `max-connection-backoff` setting.
#
# The backoff applies for the complete pool. I.e. after one failed connection attempt, further connection attempts
# to that host will backoff for all connections of the pool. After the service recovered, connections will come out
# of backoff one by one due to the random extra backoff time. This is to avoid overloading just recently recovered
# services with new connections ("thundering herd").
#
# Example: base-connection-backoff = 100ms, max-connection-backoff = 10 seconds
# - After 1st failure, backoff somewhere between 100ms and 200ms
# - After 2nd, between 200ms and 400ms
# - After 3rd, between 200ms and 400ms
# - After 4th, between 400ms and 800ms
# - After 5th, between 800ms and 1600ms
# - After 6th, between 1600ms and 3200ms
# - After 7th, between 3200ms and 6400ms
# - After 8th, between 5000ms and 10 seconds (max capped by max-connection-backoff, min by half of that)
# - After 9th, etc., stays between 5000ms and 10 seconds
#
# This setting only applies to the new pool implementation and is ignored for the legacy one.
*/
.withBaseConnectionBackoff(1.second)
/*
# Maximum backoff duration between failed connection attempts. For more information see the above comment for the
# `base-connection-backoff` setting.
#
# This setting only applies to the new pool implementation and is ignored for the legacy one.
*/
.withMaxConnectionBackoff(1.minute)
/*
# The maximum number of times failed requests are attempted again,
# (if the request can be safely retried) before giving up and returning an error.
# Set to zero to completely disable request retries.
*/
.withMaxRetries(5)
private def makeHttpRequest(httpRequest: HttpRequest): Future[HttpResponse] =
Http().singleRequest(request = httpRequest, settings = poolSettingsWithHttpsProxy)
def main(args: Array[String]): Unit = {
val uri = "https://www.openbankproject.com"
val uri = "https://publicobject.com/helloworld.txt"
val request = WebhookRequest(
trigger=ApiTrigger.onBalanceChange ,
eventId="418044f2-f74e-412f-a4e1-a78cdacdef9c",
@ -248,34 +170,25 @@ object WebhookHttpClient extends MdcLoggable {
amount="10000",
balance="21000"
)
makeRequest(getHttpRequest(uri, "GET", "HTTP/1.1"), request)
makeAsynchronousRequest(
composeRequest(uri, "GET", "HTTP/1.1", None),
request
)
implicit val formats = CustomJsonFormats.formats
case class User(name: String, job: String)
val user = User("morpheus", "leader")
val json = liftweb.json.compactRender(Extraction.decompose(user))
val entity: RequestEntity = HttpEntity(ContentTypes.`application/json`, json)
makeHttpRequest(getHttpRequest("https://reqres.in/api/users", "POST", "HTTP/1.1", entity)) map {
`POST response` =>
org.scalameta.logger.elem(`POST response`.status)
`POST response`.entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
val `Got POST response, body: ` = body.utf8String
org.scalameta.logger.elem(`Got POST response, body: `)
}
}
makeAsynchronousRequest(
composeRequest("https://reqres.in/api/users", "POST", "HTTP/1.1", Some(json)),
request)
val user2 = User("morpheus", "zion resident")
val json2 = liftweb.json.compactRender(Extraction.decompose(user2))
val entity2: RequestEntity = HttpEntity(ContentTypes.`application/json`, json2)
makeHttpRequest(getHttpRequest("https://reqres.in/api/users", "PUT", "HTTP/1.1", entity2)) map {
`PUT response` =>
org.scalameta.logger.elem(`PUT response`.status)
`PUT response`.entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
val `Got PUT response, body: ` = body.utf8String
org.scalameta.logger.elem(`Got PUT response, body: `)
}
}
makeAsynchronousRequest(
composeRequest("https://reqres.in/api/users/2", "PUT", "HTTP/1.1", Some(json2)),
request
)
}
}