diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala index 662a1c651..bfd67dbe1 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala @@ -31,6 +31,7 @@ class ServerCallback(val ch: Channel) extends DeliverCallback with MdcLoggable{ val replyProps = new BasicProperties.Builder() .correlationId(delivery.getProperties.getCorrelationId) .contentType("application/json") + .expiration("60000") .messageId(obpMessageId) .build val message = new String(delivery.getBody, "UTF-8") @@ -3099,7 +3100,7 @@ object MockedRabbitMqAdapter extends App with MdcLoggable{ connection = factory.newConnection() channel = connection.createChannel() - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) + channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, RabbitMQUtils.args) channel.basicQos(1) // stop after one consumed message since this is example code val serverCallback = new ServerCallback(channel) diff --git a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala index 0fa125cb5..7b238a8c5 100644 --- a/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala +++ b/obp-api/src/main/scala/code/bankconnectors/rabbitmq/RabbitMQUtils.scala @@ -34,6 +34,12 @@ object RabbitMQUtils extends MdcLoggable{ val keystorePassword = APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd) val truststorePath = APIUtil.getPropsValue("truststore.path").getOrElse("") val truststorePassword = APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd) + + val args = new util.HashMap[String, AnyRef]() + //60s It sets the time (in milliseconds) after which the queue will + // automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time. + args.put("x-expires", Integer.valueOf(60000)) + args.put("x-message-ttl", Integer.valueOf(60000)) private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats @@ -74,15 +80,9 @@ object RabbitMQUtils extends MdcLoggable{ val rabbitRequestJsonString: String = write(outBound) // convert OutBound to json string - val args = new util.HashMap[String, AnyRef]() - //60s It sets the time (in milliseconds) after which the queue will - // automatically be deleted if it is not used, i.e., if no consumer is connected to it during that time. - args.put("x-expires", Integer.valueOf(60000)) - - val connection = RabbitMQConnectionPool.borrowConnection() val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message. - channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) + channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, args) val replyQueueName:String = channel.queueDeclare( "", // Queue name false, // durable: non-persistent @@ -106,6 +106,16 @@ object RabbitMQUtils extends MdcLoggable{ val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel) channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback) +// // Add a timeout mechanism here: +// val timeout = 10 // seconds +// val start = System.currentTimeMillis() +// while (true) { +// Thread.sleep(100) +// if (System.currentTimeMillis() - start > timeout * 1000) { +// println("Request timed out") +// channel.close(); +// } +// } responseCallback.take()