diff --git a/src/main/scala/code/bankconnectors/KafkaHelper.scala b/src/main/scala/code/bankconnectors/KafkaHelper.scala index 05a3aa896..39e6544ff 100644 --- a/src/main/scala/code/bankconnectors/KafkaHelper.scala +++ b/src/main/scala/code/bankconnectors/KafkaHelper.scala @@ -29,6 +29,7 @@ import kafka.consumer.{Consumer, _} import kafka.message._ import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.utils.Json +import net.liftweb.common.Loggable import net.liftweb.json import net.liftweb.json._ import net.liftweb.util.Props @@ -36,7 +37,7 @@ import net.liftweb.util.Props class KafkaConsumer(val zookeeper: String = Props.get("kafka.zookeeper_host").openOrThrowException("no kafka.zookeeper_host set"), val topic: String = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set"), - val delay: Long = 0) { + val delay: Long = 0) extends Loggable { val zkProps = new Properties() zkProps.put("log4j.logger.org.apache.zookeeper", "ERROR") @@ -86,18 +87,20 @@ class KafkaConsumer(val zookeeper: String = Props.get("kafka.zookeeper_host").op } } catch { - case e:kafka.consumer.ConsumerTimeoutException => println("Exception: " + e.toString) - return json.parse("""{"error":"timeout"}""") //TODO: replace with standard message + case e:kafka.consumer.ConsumerTimeoutException => + logger.error("KafkaConsumer: timeout") + return json.parse("""{"error":"KafkaConsumer timeout"}""") //TODO: replace with standard message } } // disconnect from kafka consumer.shutdown() - return json.parse("""{"info":"disconnected"}""") //TODO: replace with standard message + logger.info("KafkaProducer: shutdown") + return json.parse("""{"info":"KafkaConsumer shutdown"}""") //TODO: replace with standard message } } -case class KafkaProducer( +class KafkaProducer( topic: String = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set"), brokerList: String = Props.get("kafka.host")openOr("localhost:9092"), clientId: String = UUID.randomUUID().toString, @@ -106,7 +109,7 @@ case class KafkaProducer( batchSize: Integer = 200, messageSendMaxRetries: Integer = 3, requestRequiredAcks: Integer = -1 - ) { + ) extends Loggable { // determine compression codec @@ -138,7 +141,7 @@ case class KafkaProducer( implicit val formats = DefaultFormats - def send(key: String, request: String, arguments: Map[String, String], partition: String = null): Unit = { + def send(key: String, request: String, arguments: Map[String, String], partition: String = null): Boolean = { // create message using request and arguments strings val reqCommand = Map(request -> arguments) val message = Json.encode(reqCommand) @@ -146,14 +149,20 @@ case class KafkaProducer( send(key.getBytes("UTF8"), message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8")) } - def send(key: Array[Byte], message: Array[Byte], partition: Array[Byte]): Unit = { + def send(key: Array[Byte], message: Array[Byte], partition: Array[Byte]): Boolean = { try { // actually send the message to kafka producer.send(kafkaMesssage(key, message, partition)) } catch { - case e: Exception => + case e: kafka.common.FailedToSendMessageException => + logger.error("KafkaProducer: Failed to send message") + return false + case e: Throwable => + logger.error("KafkaProducer: Unknown error while trying to send message") e.printStackTrace() + return false } + true } } diff --git a/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala b/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala index d13c7b5df..3eee4adca 100644 --- a/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala +++ b/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala @@ -874,23 +874,12 @@ object KafkaMappedConnector extends Connector with CreateViewImpls with Loggable def process(reqId: String, command: String, argList: Map[String,String]): json.JValue = { //List[Map[String,String]] = { - var retries:Int = 3 - while (consumer == null && retries > 0 ) { - retries -= 1 - consumer = new KafkaConsumer() + if (producer.send(reqId, command, argList, "1")) { + // Request sent, now we wait for response with the same reqId + val res = consumer.getResponse(reqId) + return res } - retries = 3 - while (producer == null && retries > 0) { - retries -= 1 - producer = new KafkaProducer() - } - if (producer == null || consumer == null) - return json.parse("""{"error":"connection failed. try again later."}""") - // Send request to Kafka - producer.send(reqId, command, argList, "1") - // Request sent, now we wait for response with the same reqId - val res = consumer.getResponse(reqId) - res + return json.parse("""{"error":"could not send message to kafka"}""") }