Merge branch 'develop' of github.com:OpenBankProject/OBP-API into develop

This commit is contained in:
simonredfern 2024-11-30 07:34:50 +01:00
commit 5f08430c31
2 changed files with 17 additions and 19 deletions

View File

@ -3100,7 +3100,7 @@ object MockedRabbitMqAdapter extends App with MdcLoggable{
connection = factory.newConnection()
channel = connection.createChannel()
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, RabbitMQUtils.args)
channel.basicQos(1)
// stop after one consumed message since this is example code
val serverCallback = new ServerCallback(channel)

View File

@ -40,10 +40,13 @@ object RabbitMQUtils extends MdcLoggable{
// automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time.
args.put("x-expires", Integer.valueOf(60000))
args.put("x-message-ttl", Integer.valueOf(60000))
//args.put("x-queue-type", "quorum") // use the classic first.
private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats
val RPC_QUEUE_NAME: String = "obp_rpc_queue"
val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = "obp_reply_queue"
class ResponseCallback(val rabbitCorrelationId: String, channel: Channel) extends DeliverCallback {
@ -82,13 +85,20 @@ object RabbitMQUtils extends MdcLoggable{
val connection = RabbitMQConnectionPool.borrowConnection()
val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message.
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, args)
channel.queueDeclare(
RPC_QUEUE_NAME, // Queue name
true, // durable: non-persis, here set durable = true
false, // exclusive: non-excl4, here set exclusive = false
false, // autoDelete: delete, here set autoDelete = false
args // extra arguments,
)
val replyQueueName:String = channel.queueDeclare(
"", // Queue name
false, // durable: non-persistent
true, // exclusive: non-exclusive
true, // autoDelete: delete when no consumers
args // extra arguments
s"${RPC_REPLY_TO_QUEUE_NAME_PREFIX}_${messageId.replace("obp_","")}_${UUID.randomUUID.toString}", // Queue name, it will be a unique name for each queue
false, // durable: non-persis, here set durable = false
true, // exclusive: non-excl4, here set exclusive = true
true, // autoDelete: delete, here set autoDelete = true
args // extra arguments,
).getQueue
val rabbitResponseJsonFuture = {
@ -106,19 +116,7 @@ object RabbitMQUtils extends MdcLoggable{
val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel)
channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback)
// // Add a timeout mechanism here:
// val timeout = 10 // seconds
// val start = System.currentTimeMillis()
// while (true) {
// Thread.sleep(100)
// if (System.currentTimeMillis() - start > timeout * 1000) {
// println("Request timed out")
// channel.close();
// }
// }
responseCallback.take()
} catch {
case e: Throwable =>{
logger.debug(s"${RabbitMQConnector_vOct2024.toString} inBoundJson exception: $messageId = ${e}")