mirror of
https://github.com/OpenBankProject/OBP-API.git
synced 2026-02-06 17:37:00 +00:00
refactor/set the default RabbitMq queue to quorum
This commit is contained in:
parent
9752973220
commit
92f2b2684e
@ -3100,7 +3100,7 @@ object MockedRabbitMqAdapter extends App with MdcLoggable{
|
||||
|
||||
connection = factory.newConnection()
|
||||
channel = connection.createChannel()
|
||||
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, RabbitMQUtils.args)
|
||||
|
||||
channel.basicQos(1)
|
||||
// stop after one consumed message since this is example code
|
||||
val serverCallback = new ServerCallback(channel)
|
||||
|
||||
@ -40,6 +40,8 @@ object RabbitMQUtils extends MdcLoggable{
|
||||
// automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time.
|
||||
args.put("x-expires", Integer.valueOf(60000))
|
||||
args.put("x-message-ttl", Integer.valueOf(60000))
|
||||
args.put("x-queue-type", "quorum")
|
||||
|
||||
|
||||
private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats
|
||||
|
||||
@ -82,13 +84,20 @@ object RabbitMQUtils extends MdcLoggable{
|
||||
|
||||
val connection = RabbitMQConnectionPool.borrowConnection()
|
||||
val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message.
|
||||
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, args)
|
||||
channel.queueDeclare(
|
||||
RPC_QUEUE_NAME, // Queue name
|
||||
true, // durable: non-persis
|
||||
false, // exclusive: non-excl4
|
||||
false, // autoDelete: delete
|
||||
args // extra arguments
|
||||
)
|
||||
|
||||
val replyQueueName:String = channel.queueDeclare(
|
||||
"", // Queue name
|
||||
false, // durable: non-persistent
|
||||
true, // exclusive: non-exclusive
|
||||
true, // autoDelete: delete when no consumers
|
||||
args // extra arguments
|
||||
"amq.gen-"+UUID.randomUUID.toString, // Queue name, rabbitMq will create a unique name for this queue, eg:
|
||||
true, // durable: non-persistent
|
||||
false, // exclusive: non-exclusive
|
||||
false, // autoDelete: delete when no consumers
|
||||
args // extra arguments
|
||||
).getQueue
|
||||
|
||||
val rabbitResponseJsonFuture = {
|
||||
@ -106,19 +115,7 @@ object RabbitMQUtils extends MdcLoggable{
|
||||
|
||||
val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel)
|
||||
channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback)
|
||||
// // Add a timeout mechanism here:
|
||||
// val timeout = 10 // seconds
|
||||
// val start = System.currentTimeMillis()
|
||||
// while (true) {
|
||||
// Thread.sleep(100)
|
||||
// if (System.currentTimeMillis() - start > timeout * 1000) {
|
||||
// println("Request timed out")
|
||||
// channel.close();
|
||||
// }
|
||||
// }
|
||||
responseCallback.take()
|
||||
|
||||
|
||||
} catch {
|
||||
case e: Throwable =>{
|
||||
logger.debug(s"${RabbitMQConnector_vOct2024.toString} inBoundJson exception: $messageId = ${e}")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user