From 92f2b2684eb60efdd4bc1c2ac609a8e7fe66057d Mon Sep 17 00:00:00 2001 From: hongwei Date: Wed, 27 Nov 2024 15:11:44 +0100 Subject: [PATCH 1/4] refactor/set the default RabbitMq queue to quorum --- .../Adapter/MockedRabbitMqAdapter.scala | 2 +- .../rabbitmq/RabbitMQUtils.scala | 33 +++++++++---------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala index bfd67dbe1..ca7646c3f 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala @@ -3100,7 +3100,7 @@ object MockedRabbitMqAdapter extends App with MdcLoggable{ connection = factory.newConnection() channel = connection.createChannel() - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, RabbitMQUtils.args) + channel.basicQos(1) // stop after one consumed message since this is example code val serverCallback = new ServerCallback(channel) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index 7b238a8c5..fd811e4c7 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -40,6 +40,8 @@ object RabbitMQUtils extends MdcLoggable{ // automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time. args.put("x-expires", Integer.valueOf(60000)) args.put("x-message-ttl", Integer.valueOf(60000)) + args.put("x-queue-type", "quorum") + private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats @@ -82,13 +84,20 @@ object RabbitMQUtils extends MdcLoggable{ val connection = RabbitMQConnectionPool.borrowConnection() val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message. - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, args) + channel.queueDeclare( + RPC_QUEUE_NAME, // Queue name + true, // durable: non-persis + false, // exclusive: non-excl4 + false, // autoDelete: delete + args // extra arguments + ) + val replyQueueName:String = channel.queueDeclare( - "", // Queue name - false, // durable: non-persistent - true, // exclusive: non-exclusive - true, // autoDelete: delete when no consumers - args // extra arguments + "amq.gen-"+UUID.randomUUID.toString, // Queue name, rabbitMq will create a unique name for this queue, eg: + true, // durable: non-persistent + false, // exclusive: non-exclusive + false, // autoDelete: delete when no consumers + args // extra arguments ).getQueue val rabbitResponseJsonFuture = { @@ -106,19 +115,7 @@ object RabbitMQUtils extends MdcLoggable{ val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel) channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback) -// // Add a timeout mechanism here: -// val timeout = 10 // seconds -// val start = System.currentTimeMillis() -// while (true) { -// Thread.sleep(100) -// if (System.currentTimeMillis() - start > timeout * 1000) { -// println("Request timed out") -// channel.close(); -// } -// } responseCallback.take() - - } catch { case e: Throwable =>{ logger.debug(s"${RabbitMQConnector_vOct2024.toString} inBoundJson exception: $messageId = ${e}") From 3a972f0816621b3506c2c5f88d2f0838406b2415 Mon Sep 17 00:00:00 2001 From: hongwei Date: Thu, 28 Nov 2024 08:53:16 +0100 Subject: [PATCH 2/4] refactor/added comments for the field --- .../rabbitmq/RabbitMQUtils.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index fd811e4c7..8a4a36ad2 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -86,18 +86,18 @@ object RabbitMQUtils extends MdcLoggable{ val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message. channel.queueDeclare( RPC_QUEUE_NAME, // Queue name - true, // durable: non-persis - false, // exclusive: non-excl4 - false, // autoDelete: delete - args // extra arguments + true, // durable: non-persis, here set durable = true + false, // exclusive: non-excl4, here set exclusive = false + false, // autoDelete: delete, here set autoDelete = false + args // extra arguments, ) val replyQueueName:String = channel.queueDeclare( - "amq.gen-"+UUID.randomUUID.toString, // Queue name, rabbitMq will create a unique name for this queue, eg: - true, // durable: non-persistent - false, // exclusive: non-exclusive - false, // autoDelete: delete when no consumers - args // extra arguments + "amq.gen-"+UUID.randomUUID.toString, // Queue name, it will be a unique name for each queue + true, // durable: non-persis, here set durable = true + false, // exclusive: non-excl4, here set exclusive = false + false, // autoDelete: delete, here set autoDelete = false + args // extra arguments, ).getQueue val rabbitResponseJsonFuture = { From 69558928a41df06b7ce4f6a24e5699ed42e6b0a7 Mon Sep 17 00:00:00 2001 From: hongwei Date: Thu, 28 Nov 2024 14:00:43 +0100 Subject: [PATCH 3/4] refactor/tweaked the reply queue name --- .../main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index 8a4a36ad2..cd2cc6103 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -93,7 +93,7 @@ object RabbitMQUtils extends MdcLoggable{ ) val replyQueueName:String = channel.queueDeclare( - "amq.gen-"+UUID.randomUUID.toString, // Queue name, it will be a unique name for each queue + "obp.gen-"+UUID.randomUUID.toString, // Queue name, it will be a unique name for each queue true, // durable: non-persis, here set durable = true false, // exclusive: non-excl4, here set exclusive = false false, // autoDelete: delete, here set autoDelete = false From 5ae8942d620cb1d663c10c6dd885774aa784df97 Mon Sep 17 00:00:00 2001 From: hongwei Date: Thu, 28 Nov 2024 15:41:04 +0100 Subject: [PATCH 4/4] refactor/revert the rabbitMq queue to classic --- .../code/bankconnectors/rabbitmq/RabbitMQUtils.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index cd2cc6103..bf70af473 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -40,12 +40,13 @@ object RabbitMQUtils extends MdcLoggable{ // automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time. args.put("x-expires", Integer.valueOf(60000)) args.put("x-message-ttl", Integer.valueOf(60000)) - args.put("x-queue-type", "quorum") + //args.put("x-queue-type", "quorum") // use the classic first. private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats val RPC_QUEUE_NAME: String = "obp_rpc_queue" + val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = "obp_reply_queue" class ResponseCallback(val rabbitCorrelationId: String, channel: Channel) extends DeliverCallback { @@ -93,10 +94,10 @@ object RabbitMQUtils extends MdcLoggable{ ) val replyQueueName:String = channel.queueDeclare( - "obp.gen-"+UUID.randomUUID.toString, // Queue name, it will be a unique name for each queue - true, // durable: non-persis, here set durable = true - false, // exclusive: non-excl4, here set exclusive = false - false, // autoDelete: delete, here set autoDelete = false + s"${RPC_REPLY_TO_QUEUE_NAME_PREFIX}_${messageId.replace("obp_","")}_${UUID.randomUUID.toString}", // Queue name, it will be a unique name for each queue + false, // durable: non-persis, here set durable = false + true, // exclusive: non-excl4, here set exclusive = true + true, // autoDelete: delete, here set autoDelete = true args // extra arguments, ).getQueue