From de9b8ae8a556590adc77228c8cfc15f86ead00e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Wed, 28 Jan 2026 16:48:52 +0100 Subject: [PATCH 01/10] feature/RabitMQ adapter tweaks --- .../resources/props/sample.props.template | 2 ++ .../rabbitmq/RabbitMQUtils.scala | 36 ++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/obp-api/src/main/resources/props/sample.props.template b/obp-api/src/main/resources/props/sample.props.template index c0f151d83..77a420868 100644 --- a/obp-api/src/main/resources/props/sample.props.template +++ b/obp-api/src/main/resources/props/sample.props.template @@ -1047,6 +1047,8 @@ featured_apis=elasticSearchWarehouseV300 # rabbitmq_connector.username=obp # rabbitmq_connector.password=obp # rabbitmq_connector.virtual_host=/ +# rabbitmq_connector.request_queue=obp_rpc_queue +# rabbitmq_connector.response_queue_prefix=obp_reply_queue # -- RabbitMQ Adapter -------------------------------------------- #rabbitmq.adapter.enabled=false diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index 52d0b1975..d32137991 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -53,8 +53,8 @@ object RabbitMQUtils extends MdcLoggable{ private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats - val RPC_QUEUE_NAME: String = "obp_rpc_queue" - val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = "obp_reply_queue" + val RPC_QUEUE_NAME: String = APIUtil.getPropsValue("rabbitmq_connector.request_queue", "obp_rpc_queue") + val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = APIUtil.getPropsValue("rabbitmq_connector.response_queue_prefix", "obp_reply_queue") class ResponseCallback(val rabbitCorrelationId: String, channel: Channel) extends DeliverCallback { @@ -92,14 +92,30 @@ object RabbitMQUtils extends MdcLoggable{ val rabbitRequestJsonString: String = write(outBound) // convert OutBound to json string val connection = RabbitMQConnectionPool.borrowConnection() + // Check if queue already exists using a temporary channel (passive declare closes channel on failure) + val queueExists = try { + val tempChannel = connection.createChannel() + try { + tempChannel.queueDeclarePassive(RPC_QUEUE_NAME) + true + } finally { + if (tempChannel.isOpen) tempChannel.close() + } + } catch { + case _: java.io.IOException => false + } + val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message. - channel.queueDeclare( - RPC_QUEUE_NAME, // Queue name - true, // durable: non-persis, here set durable = true - false, // exclusive: non-excl4, here set exclusive = false - false, // autoDelete: delete, here set autoDelete = false - rpcQueueArgs // extra arguments, - ) + // Only declare queue if it doesn't already exist (avoids argument conflicts with external adapters) + if (!queueExists) { + channel.queueDeclare( + RPC_QUEUE_NAME, // Queue name + true, // durable: non-persis, here set durable = true + false, // exclusive: non-excl4, here set exclusive = false + false, // autoDelete: delete, here set autoDelete = false + rpcQueueArgs // extra arguments, + ) + } val replyQueueName:String = channel.queueDeclare( s"${RPC_REPLY_TO_QUEUE_NAME_PREFIX}_${messageId.replace("obp_","")}_${UUID.randomUUID.toString}", // Queue name, it will be a unique name for each queue @@ -112,6 +128,7 @@ object RabbitMQUtils extends MdcLoggable{ val rabbitResponseJsonFuture = { try { logger.debug(s"${RabbitMQConnector_vOct2024.toString} outBoundJson: $messageId = $rabbitRequestJsonString") + logger.info(s"[RabbitMQ] Sending message to queue: $RPC_QUEUE_NAME, messageId: $messageId, replyTo: $replyQueueName") val rabbitMQCorrelationId = UUID.randomUUID().toString val rabbitMQProps = new BasicProperties.Builder() @@ -121,6 +138,7 @@ object RabbitMQUtils extends MdcLoggable{ .replyTo(replyQueueName) .build() channel.basicPublish("", RPC_QUEUE_NAME, rabbitMQProps, rabbitRequestJsonString.getBytes("UTF-8")) + logger.info(s"[RabbitMQ] Message published, correlationId: $rabbitMQCorrelationId, waiting for response on: $replyQueueName") val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel) channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback) From 4ad712e21794c4306128e38fe3496667b25ba5af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 3 Feb 2026 14:31:09 +0100 Subject: [PATCH 02/10] feature/check resource_docs_requires_role props is applied consistently across all swagger / openapi / yaml --- .../ResourceDocs1_4_0/ResourceDocs140.scala | 4 +- .../ResourceDocsAPIMethods.scala | 20 +++++ .../ResourceDocs1_4_0/SwaggerDocsTest.scala | 84 ++++++++++++++++++- .../scala/code/setup/SendServerRequests.scala | 20 +++-- 4 files changed, 119 insertions(+), 9 deletions(-) diff --git a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala index bec0f2ade..4f4e46793 100644 --- a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala +++ b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala @@ -2,7 +2,7 @@ package code.api.ResourceDocs1_4_0 import scala.language.reflectiveCalls import code.api.Constant.HostName -import code.api.OBPRestHelper +import code.api.{OBPRestHelper, ResponseHeader} import code.api.cache.Caching import code.api.util.APIUtil._ import code.api.util.{APIUtil, ApiVersionUtils, YAMLUtils} @@ -236,7 +236,7 @@ object ResourceDocs300 extends OBPRestHelper with ResourceDocsAPIMethods with Md yamlResult } - val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType) + val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) val bytes = yamlString.getBytes("UTF-8") InMemoryResponse(bytes, headers, Nil, 200) } diff --git a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocsAPIMethods.scala b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocsAPIMethods.scala index 648800604..26822fb7f 100644 --- a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocsAPIMethods.scala +++ b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocsAPIMethods.scala @@ -677,6 +677,16 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth implicit val ec = EndpointContext(Some(cc)) val (resourceDocTags, partialFunctions, locale, contentParam, apiCollectionIdParam) = ResourceDocsAPIMethodsUtil.getParams() for { + (u: Box[User], callContext: Option[CallContext]) <- if (resourceDocsRequireRole) { + authenticatedAccess(cc) + } else { + anonymousAccess(cc) + } + _ <- if (resourceDocsRequireRole) { + NewStyle.function.hasAtLeastOneEntitlement(failMsg = UserHasMissingRoles + canReadResourceDoc.toString)("", u.map(_.userId).getOrElse(""), ApiRole.canReadResourceDoc :: Nil, cc.callContext) + } else { + Future(()) + } requestedApiVersion <- NewStyle.function.tryons(s"$InvalidApiVersionString Current Version is $requestedApiVersionString", 400, cc.callContext) { ApiVersionUtils.valueOf(requestedApiVersionString) } @@ -871,6 +881,16 @@ trait ResourceDocsAPIMethods extends MdcLoggable with APIMethods220 with APIMeth } else { Future.successful(true) } + (u: Box[User], callContext: Option[CallContext]) <- if (resourceDocsRequireRole) { + authenticatedAccess(cc) + } else { + anonymousAccess(cc) + } + _ <- if (resourceDocsRequireRole) { + NewStyle.function.hasAtLeastOneEntitlement(failMsg = UserHasMissingRoles + canReadResourceDoc.toString)("", u.map(_.userId).getOrElse(""), ApiRole.canReadResourceDoc :: Nil, cc.callContext) + } else { + Future(()) + } requestedApiVersion <- NewStyle.function.tryons(s"$InvalidApiVersionString Current Version is $requestedApiVersionString", 400, cc.callContext) { ApiVersionUtils.valueOf(requestedApiVersionString) } diff --git a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala index b96a1acf3..b07f3024e 100644 --- a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala +++ b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala @@ -1,9 +1,12 @@ package code.api.ResourceDocs1_4_0 import code.api.ResourceDocs1_4_0.ResourceDocs140.ImplementationsResourceDocs +import code.api.util.ErrorMessages.{AuthenticatedUserIsRequired, UserHasMissingRoles} import code.api.util.{ApiRole, CustomJsonFormats} import code.setup.{DefaultUsers, PropsReset} import com.github.dwickern.macros.NameOf.nameOf +import code.api.util.APIUtil.OAuth._ +import code.entitlement.Entitlement import com.openbankproject.commons.util.{ApiVersion, Functions} import io.swagger.parser.OpenAPIParser import net.liftweb.json @@ -256,4 +259,83 @@ class SwaggerDocsTest extends ResourceDocsV140ServerSetup with PropsReset with D (errors, warnings, allMessages) } -} + + // Additional tests to verify that the Swagger/OpenAPI endpoints respect the resource_docs_requires_role prop. + // These are minimal checks that mirror the behaviour validated elsewhere (Lift/http4s tests). + feature(s"Swagger & OpenAPI access control for resource_docs_requires_role") { + scenario("Swagger - public access when resource_docs_requires_role is false", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "false", + ) + val requestGetSwagger = (ResourceDocsV5_1Request / "resource-docs" / "v5.1.0" / "swagger").GET + val responseGetSwagger = makeGetRequest(requestGetSwagger) + responseGetSwagger.code should equal(200) + } + + scenario("Swagger - unauthenticated rejected when resource_docs_requires_role is true", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "true", + ) + val requestGetSwagger = (ResourceDocsV5_1Request / "resource-docs" / "v5.1.0" / "swagger").GET + val responseGetSwagger = makeGetRequest(requestGetSwagger) + // Lift endpoints typically return 401 with AuthenticatedUserIsRequired message when auth required + responseGetSwagger.code should equal(401) + responseGetSwagger.body.toString should include(AuthenticatedUserIsRequired) + } + + scenario("Swagger - authenticated but missing role gets 403", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "true", + ) + val requestGetSwagger = (ResourceDocsV5_1Request / "resource-docs" / "v5.1.0" / "swagger").GET <@ (user1) + val responseGetSwagger = makeGetRequest(requestGetSwagger) + responseGetSwagger.code should equal(403) + responseGetSwagger.body.toString should include(UserHasMissingRoles) + responseGetSwagger.body.toString should include(ApiRole.canReadResourceDoc.toString()) + } + + scenario("Swagger - authenticated and entitled canReadResourceDoc returns 200", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "true", + ) + // grant the entitlement to the resource user used in tests + Entitlement.entitlement.vend.addEntitlement("", resourceUser1.userId, ApiRole.canReadResourceDoc.toString) + val requestGetSwagger = (ResourceDocsV5_1Request / "resource-docs" / "v5.1.0" / "swagger").GET <@ (user1) + val responseGetSwagger = makeGetRequest(requestGetSwagger) + responseGetSwagger.code should equal(200) + } + + // OpenAPI JSON checks (v6.0.0 used elsewhere for OpenAPI tests) + scenario("OpenAPI JSON - public access when resource_docs_requires_role is false", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "false", + ) + val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET + val responseGetOpenAPI = makeGetRequest(requestGetOpenAPI) + responseGetOpenAPI.code should equal(200) + } + + scenario("OpenAPI JSON - unauthenticated rejected when resource_docs_requires_role is true", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "true", + ) + val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET + val responseGetOpenAPI = makeGetRequest(requestGetOpenAPI) + responseGetOpenAPI.code should equal(401) + responseGetOpenAPI.body.toString should include(AuthenticatedUserIsRequired) + } + + scenario("OpenAPI YAML - raw response: public access when resource_docs_requires_role is false", ApiEndpoint1, VersionOfApi) { + setPropsValues( + "resource_docs_requires_role" -> "false", + ) + val requestGetOpenAPIYAML = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi.yaml").GET + val responseGetOpenAPIYAML = makeGetRequest(requestGetOpenAPIYAML) + responseGetOpenAPIYAML.code should equal(200) + // body should be non-empty YAML + responseGetOpenAPIYAML.body.toString.trim.nonEmpty should be (true) + } + + } + +} diff --git a/obp-api/src/test/scala/code/setup/SendServerRequests.scala b/obp-api/src/test/scala/code/setup/SendServerRequests.scala index a3a8325df..e16909f82 100644 --- a/obp-api/src/test/scala/code/setup/SendServerRequests.scala +++ b/obp-api/src/test/scala/code/setup/SendServerRequests.scala @@ -190,12 +190,20 @@ trait SendServerRequests { case _ => } - val parsedBody = tryo { - parse(body) - } - parsedBody match { - case Full(b) => APIResponse(response.getStatusCode, b, Some(response.getHeaders())) - case _ => throw new Exception(s"couldn't parse response from ${req.url} : $body") + // Handle YAML responses: don't try to parse as JSON. Wrap YAML as a JString so tests + // that expect a JValue can still receive the body. + val contentTypeList = response.getHeaders("Content-Type").asScala.toList.map(_.toLowerCase) + val isYaml = contentTypeList.exists(_.contains("yaml")) + if (isYaml) { + APIResponse(response.getStatusCode, JString(body), Some(response.getHeaders())) + } else { + val parsedBody = tryo { + parse(body) + } + parsedBody match { + case Full(b) => APIResponse(response.getStatusCode, b, Some(response.getHeaders())) + case _ => throw new Exception(s"couldn't parse response from ${req.url} : $body") + } } } } From f417982d1f206ceb4179747310426677ad177714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 3 Feb 2026 16:12:55 +0100 Subject: [PATCH 03/10] refactor/Refine log at tests --- .../scala/code/setup/SendServerRequests.scala | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/obp-api/src/test/scala/code/setup/SendServerRequests.scala b/obp-api/src/test/scala/code/setup/SendServerRequests.scala index e16909f82..122db2e2a 100644 --- a/obp-api/src/test/scala/code/setup/SendServerRequests.scala +++ b/obp-api/src/test/scala/code/setup/SendServerRequests.scala @@ -186,7 +186,28 @@ trait SendServerRequests { // Check that every response has a correlationId at Response Header val list = response.getHeaders(ResponseHeader.`Correlation-Id`).asScala.toList list match { - case Nil => throw new Exception(s"There is no ${ResponseHeader.`Correlation-Id`} in response header. Couldn't parse response from ${req.url} : $body") + case Nil => + // Improve diagnostic information: include HTTP status, all response headers and a snippet of the body. + val status = response.getStatusCode + val headersStr = try { + // response.getHeaders().entries() returns a Java collection of header entries + response.getHeaders().entries().asScala.map(h => s"${h.getKey}: ${h.getValue}").mkString(", ") + } catch { + case _: Throwable => "unable to read headers" + } + val bodySnippet = if (body == null) { + "" + } else { + val maxLen = 1000 + if (body.length > maxLen) body.take(maxLen) + "..." else body + } + throw new Exception( + s"""There is no ${ResponseHeader.`Correlation-Id`} in response header. + |Couldn't parse response from ${req.url} + |status=$status + |headers=[$headersStr] + |body-snippet=${bodySnippet}""".stripMargin + ) case _ => } From 77b10d75408ab8076922e39e3fdc445b02e2fc99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 3 Feb 2026 17:07:56 +0100 Subject: [PATCH 04/10] feature/Set workflow JAVA_TOOL_OPTIONS: -Xmx3G --- .github/workflows/build_container.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build_container.yml b/.github/workflows/build_container.yml index 8a027bbb9..df5dd3339 100644 --- a/.github/workflows/build_container.yml +++ b/.github/workflows/build_container.yml @@ -7,6 +7,7 @@ on: [push] env: DOCKER_HUB_ORGANIZATION: ${{ vars.DOCKER_HUB_ORGANIZATION }} DOCKER_HUB_REPOSITORY: obp-api + JAVA_TOOL_OPTIONS: -Xmx3G -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp jobs: build: From e6009facca984afb697f0488adf364c389adbe8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 3 Feb 2026 18:45:25 +0100 Subject: [PATCH 05/10] refactor/Implement a streaming YAML generation to avoid large memory spikes --- .../ResourceDocs1_4_0/ResourceDocs140.scala | 43 ++++-- .../main/scala/code/api/util/YAMLUtils.scala | 127 ++++++++++++++++-- 2 files changed, 148 insertions(+), 22 deletions(-) diff --git a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala index 4f4e46793..9ac04c42a 100644 --- a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala +++ b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala @@ -11,7 +11,7 @@ import code.apicollectionendpoint.MappedApiCollectionEndpointsProvider import code.util.Helper.{MdcLoggable, SILENCE_IS_GOLDEN} import com.openbankproject.commons.model.enums.ContentParam.{DYNAMIC, STATIC} import com.openbankproject.commons.util.{ApiVersion, ApiVersionStatus} -import net.liftweb.http.{GetRequest, InMemoryResponse, PlainTextResponse, Req, S} +import net.liftweb.http.{GetRequest, InMemoryResponse, PlainTextResponse, Req, S, StreamingResponse} object ResourceDocs140 extends OBPRestHelper with ResourceDocsAPIMethods with MdcLoggable { @@ -198,10 +198,15 @@ object ResourceDocs300 extends OBPRestHelper with ResourceDocsAPIMethods with Md ) val cacheValueFromRedis = Caching.getStaticSwaggerDocCache(cacheKey) - val yamlString = if (cacheValueFromRedis.isDefined) { - cacheValueFromRedis.get + if (cacheValueFromRedis.isDefined) { + // If we already have a cached YAML string, serve it as before. + val yamlString = cacheValueFromRedis.get + val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) + val bytes = yamlString.getBytes("UTF-8") + InMemoryResponse(bytes, headers, Nil, 200) } else { - // Generate OpenAPI JSON and convert to YAML + // Generate OpenAPI JSON JValue (this may be large) but stream YAML generation to avoid + // building a huge YAML string in memory. val openApiJValue = try { val resourceDocsJsonFiltered = locale match { case _ if (apiCollectionIdParam.isDefined) => @@ -231,14 +236,30 @@ object ResourceDocs300 extends OBPRestHelper with ResourceDocsAPIMethods with Md throw e } - val yamlResult = YAMLUtils.jValueToYAMLSafe(openApiJValue, s"# Error converting OpenAPI to YAML: ${openApiJValue.toString}") - Caching.setStaticSwaggerDocCache(cacheKey, yamlResult) - yamlResult + // Attempt to obtain an InputStream that streams YAML + YAMLUtils.jValueToYAMLInputStream(openApiJValue) match { + case scala.util.Success(in) => + val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) + // StreamingResponse takes a function that returns an InputStream when called by Lift + // StreamingResponse constructor expects: data, onEnd, size, headers, cookies, code + // Provide the InputStream directly as `data`, an onEnd that closes the stream, + // -1L for unknown size, the headers, empty cookies list, and HTTP 200 code. + StreamingResponse(in, () => { try { in.close() } catch { case _: Throwable => () } }, -1L, headers, Nil, 200) + case scala.util.Failure(e) => + logger.error(s"Error streaming OpenAPI YAML: ${e.getMessage}", e) + // Fallback: try a safe conversion to a string (may be memory heavy) and return that, or an error. + // We attempt a safe string conversion as a last resort. + val yamlResult = YAMLUtils.jValueToYAMLSafe(openApiJValue, s"# Error converting OpenAPI to YAML: ${openApiJValue.toString}") + if (yamlResult.nonEmpty) { + Caching.setStaticSwaggerDocCache(cacheKey, yamlResult) + val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) + val bytes = yamlResult.getBytes("UTF-8") + InMemoryResponse(bytes, headers, Nil, 200) + } else { + PlainTextResponse(s"Error generating OpenAPI YAML: ${e.getMessage}", 500) + } + } } - - val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) - val bytes = yamlString.getBytes("UTF-8") - InMemoryResponse(bytes, headers, Nil, 200) } } catch { case _: Exception => diff --git a/obp-api/src/main/scala/code/api/util/YAMLUtils.scala b/obp-api/src/main/scala/code/api/util/YAMLUtils.scala index 16714ee50..ce7a00eab 100644 --- a/obp-api/src/main/scala/code/api/util/YAMLUtils.scala +++ b/obp-api/src/main/scala/code/api/util/YAMLUtils.scala @@ -28,17 +28,22 @@ package code.api.util import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import com.fasterxml.jackson.dataformat.yaml.YAMLFactory -import net.liftweb.json.JsonAST.JValue +import com.fasterxml.jackson.core.{JsonGenerator, JsonFactory} +import net.liftweb.json.JsonAST.{JObject, JArray, JBool, JNull, JNothing, JDouble, JInt, JString, JField, JValue} import net.liftweb.json._ import net.liftweb.json.compactRender import code.util.Helper.MdcLoggable import scala.util.{Try, Success, Failure} +import java.io.{OutputStream, InputStream, PipedInputStream, PipedOutputStream} +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global /** * Utility object for YAML conversion operations * * This utility provides methods to convert Lift's JValue objects to YAML format - * using Jackson's YAML support. + * using Jackson's YAML support. It provides both simple string-based conversion + * and streaming conversion APIs to avoid building huge intermediate strings. */ object YAMLUtils extends MdcLoggable { @@ -46,21 +51,121 @@ object YAMLUtils extends MdcLoggable { private val yamlMapper = new ObjectMapper(new YAMLFactory()) /** - * Converts a JValue to YAML string + * Convert a Lift JValue by writing it token-by-token to a Jackson JsonGenerator. + * This avoids creating a very large intermediate JSON string. + */ + private def writeJValueToGenerator(gen: JsonGenerator, j: JValue): Unit = { + j match { + case JObject(fields) => + gen.writeStartObject() + fields.foreach { + case JField(name, value) => + gen.writeFieldName(name) + writeJValueToGenerator(gen, value) + } + gen.writeEndObject() + case JArray(items) => + gen.writeStartArray() + items.foreach(item => writeJValueToGenerator(gen, item)) + gen.writeEndArray() + case JString(s) => + gen.writeString(s) + case JInt(num) => + // Jackson supports BigInteger via writeNumber(String) + gen.writeNumber(num.toString) + case JDouble(d) => + gen.writeNumber(d) + // JDecimal is not available in this Lift version; high-precision decimals will + // fall through to the fallback case (written via compactRender) or be represented + // as JDouble/JInt depending on creation site. + case JBool(b) => + gen.writeBoolean(b) + case JNull | JNothing => + gen.writeNull() + case other => + // fallback: write compact rendering as string + gen.writeString(compactRender(other)) + } + } + + /** + * Stream a JValue as YAML into a supplied OutputStream. + * The caller is responsible for closing the OutputStream when appropriate. + * + * @param jValue the JValue to serialize + * @param out the OutputStream to write YAML bytes to + * @return Try[Unit] indicating success or failure + */ + def jValueToYAMLStream(jValue: JValue, out: OutputStream): Try[Unit] = { + Try { + val gen = yamlMapper.getFactory.createGenerator(out) + try { + writeJValueToGenerator(gen, jValue) + gen.flush() + } finally { + // Do not close the provided OutputStream here; just close the generator + try { gen.close() } catch { case _: Throwable => } + } + }.recoverWith { + case ex: Exception => + logger.error(s"Failed to stream JValue to YAML: ${ex.getMessage}", ex) + Failure(new RuntimeException(s"YAML streaming failed: ${ex.getMessage}", ex)) + } + } + + /** + * Provide an InputStream that streams YAML representation of the provided JValue. + * Writing is performed on a background thread into a PipedOutputStream connected to + * the returned PipedInputStream. Caller must close the InputStream when done. + * + * @param jValue the JValue to serialize + * @return Try[InputStream] that will yield the YAML bytes + */ + def jValueToYAMLInputStream(jValue: JValue): Try[InputStream] = { + Try { + val in = new PipedInputStream(64 * 1024) + val out = new PipedOutputStream(in) + // Write in a background thread so the caller can read as we generate + val writerThread = new Thread(new Runnable { + override def run(): Unit = { + try { + jValueToYAMLStream(jValue, out) match { + case Success(_) => // done + case Failure(e) => + // attempt to write an error message into the stream so the reader sees something useful + try { + val msg = s"# Error generating YAML: ${e.getMessage}\n" + out.write(msg.getBytes("UTF-8")) + } catch { case _: Throwable => } + } + } finally { + try { out.close() } catch { case _: Throwable => } + } + } + }, "yaml-stream-writer") + writerThread.setDaemon(true) + writerThread.start() + in + }.recoverWith { + case ex: Exception => + logger.error(s"Failed to create YAML InputStream: ${ex.getMessage}", ex) + Failure(new RuntimeException(s"Failed to create YAML InputStream: ${ex.getMessage}", ex)) + } + } + + /** + * Converts a JValue to YAML string (keeps compatibility). This method uses the streaming + * generator internally but still accumulates into a String (for callers that need a String). + * Prefer streaming APIs for large documents. * * @param jValue The Lift JValue to convert * @return Try containing the YAML string or error */ def jValueToYAML(jValue: JValue): Try[String] = { Try { - // First convert JValue to JSON string - val jsonString = compactRender(jValue) - - // Parse JSON string to Jackson JsonNode - val jsonNode: JsonNode = jsonMapper.readTree(jsonString) - - // Convert JsonNode to YAML string - yamlMapper.writeValueAsString(jsonNode) + val baos = new java.io.ByteArrayOutputStream() + jValueToYAMLStream(jValue, baos).get + baos.toString("UTF-8") }.recoverWith { case ex: Exception => logger.error(s"Failed to convert JValue to YAML: ${ex.getMessage}", ex) From 1d62f2d6f5179fc5ca3e44aad0a661273969b823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 3 Feb 2026 18:45:41 +0100 Subject: [PATCH 06/10] Revert "feature/Set workflow JAVA_TOOL_OPTIONS: -Xmx3G" This reverts commit 77b10d75408ab8076922e39e3fdc445b02e2fc99. --- .github/workflows/build_container.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build_container.yml b/.github/workflows/build_container.yml index df5dd3339..8a027bbb9 100644 --- a/.github/workflows/build_container.yml +++ b/.github/workflows/build_container.yml @@ -7,7 +7,6 @@ on: [push] env: DOCKER_HUB_ORGANIZATION: ${{ vars.DOCKER_HUB_ORGANIZATION }} DOCKER_HUB_REPOSITORY: obp-api - JAVA_TOOL_OPTIONS: -Xmx3G -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp jobs: build: From c907e1744b8e0234849c16438e1a7bd731799361 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Wed, 4 Feb 2026 07:40:45 +0100 Subject: [PATCH 07/10] test/Limit openapi yaml response to tags=Consumer --- .../scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala index b07f3024e..34e4cc5c9 100644 --- a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala +++ b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala @@ -311,7 +311,8 @@ class SwaggerDocsTest extends ResourceDocsV140ServerSetup with PropsReset with D "resource_docs_requires_role" -> "false", ) val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET - val responseGetOpenAPI = makeGetRequest(requestGetOpenAPI) + val params = ("tags", "Consumer") :: Nil + val responseGetOpenAPI = makeGetRequest(requestGetOpenAPI, params) responseGetOpenAPI.code should equal(200) } From 0db2888d61d9b037cbcceabc93feea69cb4bded1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Wed, 4 Feb 2026 08:45:00 +0100 Subject: [PATCH 08/10] Revert "test/Limit openapi yaml response to tags=Consumer" This reverts commit c907e1744b8e0234849c16438e1a7bd731799361. --- .../scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala index 34e4cc5c9..b07f3024e 100644 --- a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala +++ b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala @@ -311,8 +311,7 @@ class SwaggerDocsTest extends ResourceDocsV140ServerSetup with PropsReset with D "resource_docs_requires_role" -> "false", ) val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET - val params = ("tags", "Consumer") :: Nil - val responseGetOpenAPI = makeGetRequest(requestGetOpenAPI, params) + val responseGetOpenAPI = makeGetRequest(requestGetOpenAPI) responseGetOpenAPI.code should equal(200) } From 8c377f3b520336f717cc8f0d74bf1f095dfcda7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Wed, 4 Feb 2026 09:15:10 +0100 Subject: [PATCH 09/10] test/Limit resource docs to openapi yaml response --- .../scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala index b07f3024e..ee3326ace 100644 --- a/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala +++ b/obp-api/src/test/scala/code/api/ResourceDocs1_4_0/SwaggerDocsTest.scala @@ -310,7 +310,7 @@ class SwaggerDocsTest extends ResourceDocsV140ServerSetup with PropsReset with D setPropsValues( "resource_docs_requires_role" -> "false", ) - val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET + val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET < "true", ) - val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET + val requestGetOpenAPI = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi").GET < "false", ) - val requestGetOpenAPIYAML = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi.yaml").GET + val requestGetOpenAPIYAML = (ResourceDocsV6_0Request / "resource-docs" / "v6.0.0" / "openapi.yaml").GET < Date: Wed, 4 Feb 2026 09:45:55 +0100 Subject: [PATCH 10/10] Revert "refactor/Implement a streaming YAML generation to avoid large memory spikes" This reverts commit e6009facca984afb697f0488adf364c389adbe8b. --- .../ResourceDocs1_4_0/ResourceDocs140.scala | 43 ++---- .../main/scala/code/api/util/YAMLUtils.scala | 127 ++---------------- 2 files changed, 22 insertions(+), 148 deletions(-) diff --git a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala index 9ac04c42a..4f4e46793 100644 --- a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala +++ b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/ResourceDocs140.scala @@ -11,7 +11,7 @@ import code.apicollectionendpoint.MappedApiCollectionEndpointsProvider import code.util.Helper.{MdcLoggable, SILENCE_IS_GOLDEN} import com.openbankproject.commons.model.enums.ContentParam.{DYNAMIC, STATIC} import com.openbankproject.commons.util.{ApiVersion, ApiVersionStatus} -import net.liftweb.http.{GetRequest, InMemoryResponse, PlainTextResponse, Req, S, StreamingResponse} +import net.liftweb.http.{GetRequest, InMemoryResponse, PlainTextResponse, Req, S} object ResourceDocs140 extends OBPRestHelper with ResourceDocsAPIMethods with MdcLoggable { @@ -198,15 +198,10 @@ object ResourceDocs300 extends OBPRestHelper with ResourceDocsAPIMethods with Md ) val cacheValueFromRedis = Caching.getStaticSwaggerDocCache(cacheKey) - if (cacheValueFromRedis.isDefined) { - // If we already have a cached YAML string, serve it as before. - val yamlString = cacheValueFromRedis.get - val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) - val bytes = yamlString.getBytes("UTF-8") - InMemoryResponse(bytes, headers, Nil, 200) + val yamlString = if (cacheValueFromRedis.isDefined) { + cacheValueFromRedis.get } else { - // Generate OpenAPI JSON JValue (this may be large) but stream YAML generation to avoid - // building a huge YAML string in memory. + // Generate OpenAPI JSON and convert to YAML val openApiJValue = try { val resourceDocsJsonFiltered = locale match { case _ if (apiCollectionIdParam.isDefined) => @@ -236,30 +231,14 @@ object ResourceDocs300 extends OBPRestHelper with ResourceDocsAPIMethods with Md throw e } - // Attempt to obtain an InputStream that streams YAML - YAMLUtils.jValueToYAMLInputStream(openApiJValue) match { - case scala.util.Success(in) => - val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) - // StreamingResponse takes a function that returns an InputStream when called by Lift - // StreamingResponse constructor expects: data, onEnd, size, headers, cookies, code - // Provide the InputStream directly as `data`, an onEnd that closes the stream, - // -1L for unknown size, the headers, empty cookies list, and HTTP 200 code. - StreamingResponse(in, () => { try { in.close() } catch { case _: Throwable => () } }, -1L, headers, Nil, 200) - case scala.util.Failure(e) => - logger.error(s"Error streaming OpenAPI YAML: ${e.getMessage}", e) - // Fallback: try a safe conversion to a string (may be memory heavy) and return that, or an error. - // We attempt a safe string conversion as a last resort. - val yamlResult = YAMLUtils.jValueToYAMLSafe(openApiJValue, s"# Error converting OpenAPI to YAML: ${openApiJValue.toString}") - if (yamlResult.nonEmpty) { - Caching.setStaticSwaggerDocCache(cacheKey, yamlResult) - val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) - val bytes = yamlResult.getBytes("UTF-8") - InMemoryResponse(bytes, headers, Nil, 200) - } else { - PlainTextResponse(s"Error generating OpenAPI YAML: ${e.getMessage}", 500) - } - } + val yamlResult = YAMLUtils.jValueToYAMLSafe(openApiJValue, s"# Error converting OpenAPI to YAML: ${openApiJValue.toString}") + Caching.setStaticSwaggerDocCache(cacheKey, yamlResult) + yamlResult } + + val headers = List("Content-Type" -> YAMLUtils.getYAMLContentType, (ResponseHeader.`Correlation-Id` -> getCorrelationId())) + val bytes = yamlString.getBytes("UTF-8") + InMemoryResponse(bytes, headers, Nil, 200) } } catch { case _: Exception => diff --git a/obp-api/src/main/scala/code/api/util/YAMLUtils.scala b/obp-api/src/main/scala/code/api/util/YAMLUtils.scala index ce7a00eab..16714ee50 100644 --- a/obp-api/src/main/scala/code/api/util/YAMLUtils.scala +++ b/obp-api/src/main/scala/code/api/util/YAMLUtils.scala @@ -28,22 +28,17 @@ package code.api.util import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import com.fasterxml.jackson.dataformat.yaml.YAMLFactory -import com.fasterxml.jackson.core.{JsonGenerator, JsonFactory} -import net.liftweb.json.JsonAST.{JObject, JArray, JBool, JNull, JNothing, JDouble, JInt, JString, JField, JValue} +import net.liftweb.json.JsonAST.JValue import net.liftweb.json._ import net.liftweb.json.compactRender import code.util.Helper.MdcLoggable import scala.util.{Try, Success, Failure} -import java.io.{OutputStream, InputStream, PipedInputStream, PipedOutputStream} -import scala.concurrent.Future -import scala.concurrent.ExecutionContext.Implicits.global /** * Utility object for YAML conversion operations * * This utility provides methods to convert Lift's JValue objects to YAML format - * using Jackson's YAML support. It provides both simple string-based conversion - * and streaming conversion APIs to avoid building huge intermediate strings. + * using Jackson's YAML support. */ object YAMLUtils extends MdcLoggable { @@ -51,121 +46,21 @@ object YAMLUtils extends MdcLoggable { private val yamlMapper = new ObjectMapper(new YAMLFactory()) /** - * Convert a Lift JValue by writing it token-by-token to a Jackson JsonGenerator. - * This avoids creating a very large intermediate JSON string. - */ - private def writeJValueToGenerator(gen: JsonGenerator, j: JValue): Unit = { - j match { - case JObject(fields) => - gen.writeStartObject() - fields.foreach { - case JField(name, value) => - gen.writeFieldName(name) - writeJValueToGenerator(gen, value) - } - gen.writeEndObject() - case JArray(items) => - gen.writeStartArray() - items.foreach(item => writeJValueToGenerator(gen, item)) - gen.writeEndArray() - case JString(s) => - gen.writeString(s) - case JInt(num) => - // Jackson supports BigInteger via writeNumber(String) - gen.writeNumber(num.toString) - case JDouble(d) => - gen.writeNumber(d) - // JDecimal is not available in this Lift version; high-precision decimals will - // fall through to the fallback case (written via compactRender) or be represented - // as JDouble/JInt depending on creation site. - case JBool(b) => - gen.writeBoolean(b) - case JNull | JNothing => - gen.writeNull() - case other => - // fallback: write compact rendering as string - gen.writeString(compactRender(other)) - } - } - - /** - * Stream a JValue as YAML into a supplied OutputStream. - * The caller is responsible for closing the OutputStream when appropriate. - * - * @param jValue the JValue to serialize - * @param out the OutputStream to write YAML bytes to - * @return Try[Unit] indicating success or failure - */ - def jValueToYAMLStream(jValue: JValue, out: OutputStream): Try[Unit] = { - Try { - val gen = yamlMapper.getFactory.createGenerator(out) - try { - writeJValueToGenerator(gen, jValue) - gen.flush() - } finally { - // Do not close the provided OutputStream here; just close the generator - try { gen.close() } catch { case _: Throwable => } - } - }.recoverWith { - case ex: Exception => - logger.error(s"Failed to stream JValue to YAML: ${ex.getMessage}", ex) - Failure(new RuntimeException(s"YAML streaming failed: ${ex.getMessage}", ex)) - } - } - - /** - * Provide an InputStream that streams YAML representation of the provided JValue. - * Writing is performed on a background thread into a PipedOutputStream connected to - * the returned PipedInputStream. Caller must close the InputStream when done. - * - * @param jValue the JValue to serialize - * @return Try[InputStream] that will yield the YAML bytes - */ - def jValueToYAMLInputStream(jValue: JValue): Try[InputStream] = { - Try { - val in = new PipedInputStream(64 * 1024) - val out = new PipedOutputStream(in) - // Write in a background thread so the caller can read as we generate - val writerThread = new Thread(new Runnable { - override def run(): Unit = { - try { - jValueToYAMLStream(jValue, out) match { - case Success(_) => // done - case Failure(e) => - // attempt to write an error message into the stream so the reader sees something useful - try { - val msg = s"# Error generating YAML: ${e.getMessage}\n" - out.write(msg.getBytes("UTF-8")) - } catch { case _: Throwable => } - } - } finally { - try { out.close() } catch { case _: Throwable => } - } - } - }, "yaml-stream-writer") - writerThread.setDaemon(true) - writerThread.start() - in - }.recoverWith { - case ex: Exception => - logger.error(s"Failed to create YAML InputStream: ${ex.getMessage}", ex) - Failure(new RuntimeException(s"Failed to create YAML InputStream: ${ex.getMessage}", ex)) - } - } - - /** - * Converts a JValue to YAML string (keeps compatibility). This method uses the streaming - * generator internally but still accumulates into a String (for callers that need a String). - * Prefer streaming APIs for large documents. + * Converts a JValue to YAML string * * @param jValue The Lift JValue to convert * @return Try containing the YAML string or error */ def jValueToYAML(jValue: JValue): Try[String] = { Try { - val baos = new java.io.ByteArrayOutputStream() - jValueToYAMLStream(jValue, baos).get - baos.toString("UTF-8") + // First convert JValue to JSON string + val jsonString = compactRender(jValue) + + // Parse JSON string to Jackson JsonNode + val jsonNode: JsonNode = jsonMapper.readTree(jsonString) + + // Convert JsonNode to YAML string + yamlMapper.writeValueAsString(jsonNode) }.recoverWith { case ex: Exception => logger.error(s"Failed to convert JValue to YAML: ${ex.getMessage}", ex)