From 92f2b2684eb60efdd4bc1c2ac609a8e7fe66057d Mon Sep 17 00:00:00 2001 From: hongwei Date: Wed, 27 Nov 2024 15:11:44 +0100 Subject: [PATCH] refactor/set the default RabbitMq queue to quorum --- .../Adapter/MockedRabbitMqAdapter.scala | 2 +- .../rabbitmq/RabbitMQUtils.scala | 33 +++++++++---------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala index bfd67dbe1..ca7646c3f 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala @@ -3100,7 +3100,7 @@ object MockedRabbitMqAdapter extends App with MdcLoggable{ connection = factory.newConnection() channel = connection.createChannel() - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, RabbitMQUtils.args) + channel.basicQos(1) // stop after one consumed message since this is example code val serverCallback = new ServerCallback(channel) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index 7b238a8c5..fd811e4c7 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -40,6 +40,8 @@ object RabbitMQUtils extends MdcLoggable{ // automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time. args.put("x-expires", Integer.valueOf(60000)) args.put("x-message-ttl", Integer.valueOf(60000)) + args.put("x-queue-type", "quorum") + private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats @@ -82,13 +84,20 @@ object RabbitMQUtils extends MdcLoggable{ val connection = RabbitMQConnectionPool.borrowConnection() val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message. - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, args) + channel.queueDeclare( + RPC_QUEUE_NAME, // Queue name + true, // durable: non-persis + false, // exclusive: non-excl4 + false, // autoDelete: delete + args // extra arguments + ) + val replyQueueName:String = channel.queueDeclare( - "", // Queue name - false, // durable: non-persistent - true, // exclusive: non-exclusive - true, // autoDelete: delete when no consumers - args // extra arguments + "amq.gen-"+UUID.randomUUID.toString, // Queue name, rabbitMq will create a unique name for this queue, eg: + true, // durable: non-persistent + false, // exclusive: non-exclusive + false, // autoDelete: delete when no consumers + args // extra arguments ).getQueue val rabbitResponseJsonFuture = { @@ -106,19 +115,7 @@ object RabbitMQUtils extends MdcLoggable{ val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel) channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback) -// // Add a timeout mechanism here: -// val timeout = 10 // seconds -// val start = System.currentTimeMillis() -// while (true) { -// Thread.sleep(100) -// if (System.currentTimeMillis() - start > timeout * 1000) { -// println("Request timed out") -// channel.close(); -// } -// } responseCallback.take() - - } catch { case e: Throwable =>{ logger.debug(s"${RabbitMQConnector_vOct2024.toString} inBoundJson exception: $messageId = ${e}")