feature/added RabbitMQ-step1

This commit is contained in:
hongwei 2024-10-04 06:59:13 +02:00
parent 18eedd3a33
commit e7e0a5a55a
3 changed files with 158 additions and 0 deletions

View File

@ -493,6 +493,14 @@
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.22.0</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,75 @@
package code.bankconnectors.vSept2018
import java.util.UUID
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._
class ResponseCallback(val corrId: String) extends DeliverCallback {
val response: BlockingQueue[String] = new ArrayBlockingQueue[String](1)
override def handle(consumerTag: String, message: Delivery): Unit = {
if (message.getProperties.getCorrelationId.equals(corrId)) {
response.offer(new String(message.getBody, "UTF-8"))
}
}
def take(): String = {
response.take();
}
}
class RPCClient(host: String) {
val factory = new ConnectionFactory()
factory.setHost(host)
val connection: Connection = factory.newConnection()
val channel: Channel = connection.createChannel()
val requestQueueName: String = "rpc_queue"
val replyQueueName: String = channel.queueDeclare().getQueue
def call(message: String): String = {
val corrId = UUID.randomUUID().toString
val props = new BasicProperties.Builder().correlationId(corrId)
.replyTo(replyQueueName)
.build()
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"))
val responseCallback = new ResponseCallback(corrId)
channel.basicConsume(replyQueueName, true, responseCallback, _ => { })
responseCallback.take()
}
def close() {
connection.close()
}
}
object RPCClient {
def main(argv: Array[String]) {
var fibonacciRpc: RPCClient = null
var response: String = null
try {
val host = if (argv.isEmpty) "localhost" else argv(0)
fibonacciRpc = new RPCClient(host)
println(" [x] Requesting fib(30)")
response = fibonacciRpc.call("30")
println(" [.] Got '" + response + "'")
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close()
} catch {
case ignore: Exception =>
}
}
}
}
}

View File

@ -0,0 +1,75 @@
package code.bankconnectors.vSept2018
import java.util.concurrent.CountDownLatch
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._
class ServerCallback(val ch: Channel, val latch: CountDownLatch) extends DeliverCallback {
override def handle(consumerTag: String, delivery: Delivery): Unit = {
var response: String = null
val replyProps = new BasicProperties.Builder()
.correlationId(delivery.getProperties.getCorrelationId)
.build
try {
val message = new String(delivery.getBody, "UTF-8")
val n = java.lang.Integer.parseInt(message)
println(" [.] fib(" + message + ")")
response = "" + Fibonacci.fib(n)
} catch {
case e: Exception => {
println(" [.] " + e.toString)
response = ""
}
} finally {
ch.basicPublish("", delivery.getProperties.getReplyTo, replyProps, response.getBytes("UTF-8"))
ch.basicAck(delivery.getEnvelope.getDeliveryTag, false)
latch.countDown()
}
}
}
object Fibonacci {
def fib(n: Int): Int = {
if (n == 0) return 0
if (n == 1) return 1
fib(n - 1) + fib(n - 2)
}
}
object RPCServer {
private val RPC_QUEUE_NAME = "rpc_queue"
def main(argv: Array[String]) {
var connection: Connection = null
var channel: Channel = null
try {
val factory = new ConnectionFactory()
factory.setHost("localhost")
connection = factory.newConnection()
channel = connection.createChannel()
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null)
channel.basicQos(1)
// stop after one consumed message since this is example code
val latch = new CountDownLatch(1)
val serverCallback = new ServerCallback(channel, latch)
channel.basicConsume(RPC_QUEUE_NAME, false, serverCallback, _ => { })
println(" [x] Awaiting RPC requests")
latch.await()
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
try {
// connection.close()
} catch {
case ignore: Exception =>
}
}
}
}
}