feature/RabitMQ adapter tweaks

This commit is contained in:
Marko Milić 2026-01-28 16:48:52 +01:00
parent a81e208cb1
commit de9b8ae8a5
2 changed files with 29 additions and 9 deletions

View File

@ -1047,6 +1047,8 @@ featured_apis=elasticSearchWarehouseV300
# rabbitmq_connector.username=obp
# rabbitmq_connector.password=obp
# rabbitmq_connector.virtual_host=/
# rabbitmq_connector.request_queue=obp_rpc_queue
# rabbitmq_connector.response_queue_prefix=obp_reply_queue
# -- RabbitMQ Adapter --------------------------------------------
#rabbitmq.adapter.enabled=false

View File

@ -53,8 +53,8 @@ object RabbitMQUtils extends MdcLoggable{
private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats
val RPC_QUEUE_NAME: String = "obp_rpc_queue"
val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = "obp_reply_queue"
val RPC_QUEUE_NAME: String = APIUtil.getPropsValue("rabbitmq_connector.request_queue", "obp_rpc_queue")
val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = APIUtil.getPropsValue("rabbitmq_connector.response_queue_prefix", "obp_reply_queue")
class ResponseCallback(val rabbitCorrelationId: String, channel: Channel) extends DeliverCallback {
@ -92,14 +92,30 @@ object RabbitMQUtils extends MdcLoggable{
val rabbitRequestJsonString: String = write(outBound) // convert OutBound to json string
val connection = RabbitMQConnectionPool.borrowConnection()
// Check if queue already exists using a temporary channel (passive declare closes channel on failure)
val queueExists = try {
val tempChannel = connection.createChannel()
try {
tempChannel.queueDeclarePassive(RPC_QUEUE_NAME)
true
} finally {
if (tempChannel.isOpen) tempChannel.close()
}
} catch {
case _: java.io.IOException => false
}
val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message.
channel.queueDeclare(
RPC_QUEUE_NAME, // Queue name
true, // durable: non-persis, here set durable = true
false, // exclusive: non-excl4, here set exclusive = false
false, // autoDelete: delete, here set autoDelete = false
rpcQueueArgs // extra arguments,
)
// Only declare queue if it doesn't already exist (avoids argument conflicts with external adapters)
if (!queueExists) {
channel.queueDeclare(
RPC_QUEUE_NAME, // Queue name
true, // durable: non-persis, here set durable = true
false, // exclusive: non-excl4, here set exclusive = false
false, // autoDelete: delete, here set autoDelete = false
rpcQueueArgs // extra arguments,
)
}
val replyQueueName:String = channel.queueDeclare(
s"${RPC_REPLY_TO_QUEUE_NAME_PREFIX}_${messageId.replace("obp_","")}_${UUID.randomUUID.toString}", // Queue name, it will be a unique name for each queue
@ -112,6 +128,7 @@ object RabbitMQUtils extends MdcLoggable{
val rabbitResponseJsonFuture = {
try {
logger.debug(s"${RabbitMQConnector_vOct2024.toString} outBoundJson: $messageId = $rabbitRequestJsonString")
logger.info(s"[RabbitMQ] Sending message to queue: $RPC_QUEUE_NAME, messageId: $messageId, replyTo: $replyQueueName")
val rabbitMQCorrelationId = UUID.randomUUID().toString
val rabbitMQProps = new BasicProperties.Builder()
@ -121,6 +138,7 @@ object RabbitMQUtils extends MdcLoggable{
.replyTo(replyQueueName)
.build()
channel.basicPublish("", RPC_QUEUE_NAME, rabbitMQProps, rabbitRequestJsonString.getBytes("UTF-8"))
logger.info(s"[RabbitMQ] Message published, correlationId: $rabbitMQCorrelationId, waiting for response on: $replyQueueName")
val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel)
channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback)