Improved connection checking for Kafka connector

Petar Bozin 2016-08-22 14:51:35 +02:00
parent e1a4644510
commit 5ec69c44fd
2 changed files with 23 additions and 25 deletions


@@ -29,6 +29,7 @@ import kafka.consumer.{Consumer, _}
 import kafka.message._
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.utils.Json
+import net.liftweb.common.Loggable
 import net.liftweb.json
 import net.liftweb.json._
 import net.liftweb.util.Props
@@ -36,7 +37,7 @@ import net.liftweb.util.Props
 class KafkaConsumer(val zookeeper: String = Props.get("kafka.zookeeper_host").openOrThrowException("no kafka.zookeeper_host set"),
                     val topic: String = Props.get("kafka.response_topic").openOrThrowException("no kafka.response_topic set"),
-                    val delay: Long = 0) {
+                    val delay: Long = 0) extends Loggable {
   val zkProps = new Properties()
   zkProps.put("log4j.logger.org.apache.zookeeper", "ERROR")
@@ -86,18 +87,20 @@ class KafkaConsumer(val zookeeper: String = Props.get("kafka.zookeeper_host").op
         }
       }
       catch {
-        case e: kafka.consumer.ConsumerTimeoutException => println("Exception: " + e.toString)
-          return json.parse("""{"error":"timeout"}""") //TODO: replace with standard message
+        case e: kafka.consumer.ConsumerTimeoutException =>
+          logger.error("KafkaConsumer: timeout")
+          return json.parse("""{"error":"KafkaConsumer timeout"}""") //TODO: replace with standard message
       }
     }
     // disconnect from kafka
     consumer.shutdown()
-    return json.parse("""{"info":"disconnected"}""") //TODO: replace with standard message
+    logger.info("KafkaConsumer: shutdown")
+    return json.parse("""{"info":"KafkaConsumer shutdown"}""") //TODO: replace with standard message
   }
 }
-case class KafkaProducer(
+class KafkaProducer(
   topic: String = Props.get("kafka.request_topic").openOrThrowException("no kafka.request_topic set"),
   brokerList: String = Props.get("kafka.host").openOr("localhost:9092"),
   clientId: String = UUID.randomUUID().toString,
@@ -106,7 +109,7 @@ case class KafkaProducer(
   batchSize: Integer = 200,
   messageSendMaxRetries: Integer = 3,
   requestRequiredAcks: Integer = -1
-  ) {
+  ) extends Loggable {
   // determine compression codec
@@ -138,7 +141,7 @@ case class KafkaProducer(
   implicit val formats = DefaultFormats
-  def send(key: String, request: String, arguments: Map[String, String], partition: String = null): Unit = {
+  def send(key: String, request: String, arguments: Map[String, String], partition: String = null): Boolean = {
     // create message using request and arguments strings
     val reqCommand = Map(request -> arguments)
     val message = Json.encode(reqCommand)
@@ -146,14 +149,20 @@
     send(key.getBytes("UTF8"), message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
   }
-  def send(key: Array[Byte], message: Array[Byte], partition: Array[Byte]): Unit = {
+  def send(key: Array[Byte], message: Array[Byte], partition: Array[Byte]): Boolean = {
     try {
       // actually send the message to kafka
       producer.send(kafkaMesssage(key, message, partition))
     } catch {
-      case e: Exception =>
+      case e: kafka.common.FailedToSendMessageException =>
+        logger.error("KafkaProducer: Failed to send message")
+        return false
+      case e: Throwable =>
+        logger.error("KafkaProducer: Unknown error while trying to send message")
         e.printStackTrace()
+        return false
     }
+    true
   }
 }
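
Taken together, the producer-side changes replace a fire-and-forget send with one that reports delivery success to the caller. Below is a minimal, self-contained sketch of that pattern, not the connector code itself: the transport is stubbed, and the names SendCheckSketch, FlakyTransport and sendChecked are hypothetical.

    // Sketch of the "send returns Boolean" pattern (assumed, simplified stand-in for KafkaProducer.send).
    object SendCheckSketch {
      // Hypothetical stand-in for producer.send(...); throws when delivery fails.
      class FlakyTransport {
        def deliver(message: String): Unit =
          if (message.isEmpty) throw new RuntimeException("failed to send message")
      }

      val transport = new FlakyTransport

      // Catch delivery failures, log them, and report false instead of letting the exception escape.
      def sendChecked(message: String): Boolean = {
        try {
          transport.deliver(message)
        } catch {
          case e: Throwable =>
            println(s"producer sketch: ${e.getMessage}")
            return false
        }
        true
      }

      def main(args: Array[String]): Unit = {
        println(sendChecked("ping")) // true: delivered
        println(sendChecked(""))     // false: delivery failed
      }
    }

Returning Boolean rather than throwing keeps the caller's happy path simple: it can branch on the result, as the connector change below does.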


@@ -874,23 +874,12 @@ object KafkaMappedConnector extends Connector with CreateViewImpls with Loggable
   def process(reqId: String, command: String, argList: Map[String,String]): json.JValue = { //List[Map[String,String]] = {
-    var retries:Int = 3
-    while (consumer == null && retries > 0 ) {
-      retries -= 1
-      consumer = new KafkaConsumer()
+    if (producer.send(reqId, command, argList, "1")) {
+      // Request sent, now we wait for response with the same reqId
+      val res = consumer.getResponse(reqId)
+      return res
     }
-    retries = 3
-    while (producer == null && retries > 0) {
-      retries -= 1
-      producer = new KafkaProducer()
-    }
-    if (producer == null || consumer == null)
-      return json.parse("""{"error":"connection failed. try again later."}""")
-    // Send request to Kafka
-    producer.send(reqId, command, argList, "1")
-    // Request sent, now we wait for response with the same reqId
-    val res = consumer.getResponse(reqId)
-    res
+    return json.parse("""{"error":"could not send message to kafka"}""")
   }
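
With the connection-retry loops gone, the request path in process reduces to one guarded call. Read only from the added lines above, the method after this change has roughly the following shape (a reconstruction for readability, not a verbatim copy of the file; producer and consumer belong to the surrounding KafkaMappedConnector object):

    def process(reqId: String, command: String, argList: Map[String, String]): json.JValue = {
      if (producer.send(reqId, command, argList, "1")) {
        // Request sent, now we wait for response with the same reqId
        val res = consumer.getResponse(reqId)
        return res
      }
      // send() reported failure, so return an error instead of blocking on a response that will never arrive
      return json.parse("""{"error":"could not send message to kafka"}""")
    }

How producer and consumer are initialised once the in-method retry loops are removed is not shown in this diff.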