refactor/tweaked default setting for rabbitMq

This commit is contained in:
hongwei 2024-11-19 10:29:49 +01:00
parent 7854d4fedf
commit cc8a963063
2 changed files with 19 additions and 8 deletions

View File

@ -31,6 +31,7 @@ class ServerCallback(val ch: Channel) extends DeliverCallback with MdcLoggable{
val replyProps = new BasicProperties.Builder()
.correlationId(delivery.getProperties.getCorrelationId)
.contentType("application/json")
.expiration("60000")
.messageId(obpMessageId)
.build
val message = new String(delivery.getBody, "UTF-8")
@ -3099,7 +3100,7 @@ object MockedRabbitMqAdapter extends App with MdcLoggable{
connection = factory.newConnection()
channel = connection.createChannel()
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null)
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, RabbitMQUtils.args)
channel.basicQos(1)
// stop after one consumed message since this is example code
val serverCallback = new ServerCallback(channel)

View File

@ -34,6 +34,12 @@ object RabbitMQUtils extends MdcLoggable{
val keystorePassword = APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd)
val truststorePath = APIUtil.getPropsValue("truststore.path").getOrElse("")
val truststorePassword = APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd)
val args = new util.HashMap[String, AnyRef]()
//60s It sets the time (in milliseconds) after which the queue will
// automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time.
args.put("x-expires", Integer.valueOf(60000))
args.put("x-message-ttl", Integer.valueOf(60000))
private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats
@ -74,15 +80,9 @@ object RabbitMQUtils extends MdcLoggable{
val rabbitRequestJsonString: String = write(outBound) // convert OutBound to json string
val args = new util.HashMap[String, AnyRef]()
//60s It sets the time (in milliseconds) after which the queue will
// automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time.
args.put("x-expires", Integer.valueOf(60000))
val connection = RabbitMQConnectionPool.borrowConnection()
val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message.
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null)
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, args)
val replyQueueName:String = channel.queueDeclare(
"", // Queue name
false, // durable: non-persistent
@ -106,6 +106,16 @@ object RabbitMQUtils extends MdcLoggable{
val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel)
channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback)
// // Add a timeout mechanism here:
// val timeout = 10 // seconds
// val start = System.currentTimeMillis()
// while (true) {
// Thread.sleep(100)
// if (System.currentTimeMillis() - start > timeout * 1000) {
// println("Request timed out")
// channel.close();
// }
// }
responseCallback.take()