SSL 2 way auth for kafka

This commit is contained in:
tawoe 2018-01-18 11:54:09 +01:00
parent 0770b89757
commit 189942e15a
5 changed files with 71 additions and 35 deletions

View File

@ -318,6 +318,18 @@ We use jetty8 to run the API in production mode.
Most internal OBP model data access now occurs over Akka. This is so the machine that has JDBC access to the OBP database can be physically separated from the OBP API layer. In this configuration we run two instances of OBP-API on two different machines and they communicate over Akka. Please see README.Akka.md for instructions.
## Using SSL Encryption with kafka
For SSL encryption we use jks keystores.
Note that both the keystore and the truststore (and all keys within) must have the same password for unlocking, for which
the api will stop at boot up and ask for.
* Edit your props file(s) to contain:
_kafka.use.ssl=true
keystore.path=/path/to/api.keystore.jks
truststore.path=/path/to/api.truststore.jks_
## Scala / Lift

View File

@ -95,8 +95,6 @@ import code.bankconnectors.vMar2017.InboundAdapterInfoInternal
*/
class Boot extends MdcLoggable {
var clientCertificatePw = ""
def boot {
val contextPath = LiftRules.context.path
@ -131,17 +129,6 @@ class Boot extends MdcLoggable {
* Looks third in the war file, following the normal lift naming rules
*
*/
print("Enter the Password for the SSL Certificate Stores: ")
//As most IDEs do not provide a Console, we fall back to readLine
clientCertificatePw = if (true) {
try {
System.console.readPassword().toString
} catch {
case e: NullPointerException => scala.io.StdIn.readLine()
}
} else {"notused"}
val firstChoicePropsDir = for {
propsPath <- propsPath
} yield {
@ -193,6 +180,16 @@ class Boot extends MdcLoggable {
DB.defineConnectionManager(net.liftweb.util.DefaultConnectionIdentifier, vendor)
}
print("Enter the Password for the SSL Certificate Stores: ")
//As most IDEs do not provide a Console, we fall back to readLine
code.api.util.APIUtil.initPasswd = if (Props.get("kafka.use.ssl").getOrElse("") == "true") {
try {
System.console.readPassword().toString
} catch {
case e: NullPointerException => scala.io.StdIn.readLine()
}
} else {"notused"}
// ensure our relational database's tables are created/fit the schema
val connector = Props.get("connector").openOrThrowException("no connector set")

View File

@ -383,6 +383,7 @@ object APIUtil extends MdcLoggable {
val emptyObjectJson = EmptyClassJson()
val defaultFilterFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
val fallBackFilterFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
var initPasswd = ""
import code.api.util.ErrorMessages._
def httpMethod : String =

View File

@ -9,6 +9,7 @@ import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import code.actorsystem.{ObpActorHelper, ObpActorInit}
import code.api.util.APIUtil.initPasswd
import code.bankconnectors.AvroSerializer
import code.kafka.Topics.TopicTrait
import code.util.Helper.MdcLoggable
@ -40,17 +41,26 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
*/
private def keyAndPartition = scala.util.Random.nextInt(partitions) + "_" + UUID.randomUUID().toString
private val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withClientId(clientId)
.withMaxWakeups(maxWakeups)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig)
.withProperty("security.protocol","SSL")
.withProperty("ssl.truststore.location", "/home/work/kafka/api.truststore.jks")
.withProperty("ssl.truststore.password", "redf1234")
.withProperty("ssl.keystore.location","/home/work/kafka/api.keystore.jks")
.withProperty("ssl.keystore.password", "redff1234")
private val consumerSettings = if (Props.get("kafka.use.ssl").getOrElse("false") == "true") {
ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withClientId(clientId)
.withMaxWakeups(maxWakeups)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig)
.withProperty("security.protocol","SSL")
.withProperty("ssl.truststore.location", Props.get("truststore.path").getOrElse(""))
.withProperty("ssl.truststore.password", initPasswd)
.withProperty("ssl.keystore.location",Props.get("keystore.path").getOrElse(""))
.withProperty("ssl.keystore.password", initPasswd)
} else {
ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withClientId(clientId)
.withMaxWakeups(maxWakeups)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig)
}
private val consumer: ((String, Int) => Source[ConsumerRecord[String, String], Consumer.Control]) = { (topic, partition) =>
val assignment = Subscriptions.assignmentWithOffset(new TopicPartition(topic, partition), 0)
@ -58,16 +68,22 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe
.completionTimeout(completionTimeout)
}
private val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
.withProperty("batch.size", "0")
.withParallelism(3)
.withProperty("security.protocol","SSL")
.withProperty("ssl.truststore.location", "/home/work/kafka/api.truststore.jks")
.withProperty("ssl.truststore.password", "redf1234")
.withProperty("ssl.keystore.location","/home/work/kafka/api.keystore.jks")
.withProperty("ssl.keystore.password", "redff1234")
//.withProperty("auto.create.topics.enable", "true")
private val producerSettings = if (Props.get("kafka.use.ssl").getOrElse("false") == "true") {
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
.withProperty("batch.size", "0")
.withParallelism(3)
.withProperty("security.protocol","SSL")
.withProperty("ssl.truststore.location", Props.get("truststore.path").getOrElse(""))
.withProperty("ssl.truststore.password", initPasswd)
.withProperty("ssl.keystore.location",Props.get("keystore.path").getOrElse(""))
.withProperty("ssl.keystore.password", initPasswd)
} else {
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
.withProperty("batch.size", "0")
.withParallelism(3)
}
private val producer = producerSettings
.createKafkaProducer()

View File

@ -27,13 +27,23 @@ broker.id=0
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
#this will enable both ssl and plain
#listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
#use this for ssl 2 way auth
#ssl.keystore.location=/path/to/server.keystore.jks
#ssl.keystore.password=password
#ssl.key.password=password
#ssl.client.auth=required
#ssl.truststore.location=/path/to/server.truststore.jks
#ssl.truststore.password=password
# The number of threads handling network requests
num.network.threads=3