diff --git a/README.md b/README.md index eb37cbb1f..26226bf25 100644 --- a/README.md +++ b/README.md @@ -318,6 +318,18 @@ We use jetty8 to run the API in production mode. Most internal OBP model data access now occurs over Akka. This is so the machine that has JDBC access to the OBP database can be physically separated from the OBP API layer. In this configuration we run two instances of OBP-API on two different machines and they communicate over Akka. Please see README.Akka.md for instructions. +## Using SSL Encryption with kafka + +For SSL encryption we use jks keystores. +Note that both the keystore and the truststore (and all keys within) must have the same password for unlocking, for which +the api will stop at boot up and ask for. + +* Edit your props file(s) to contain: + + _kafka.use.ssl=true + keystore.path=/path/to/api.keystore.jks + truststore.path=/path/to/api.truststore.jks_ + ## Scala / Lift diff --git a/src/main/scala/bootstrap/liftweb/Boot.scala b/src/main/scala/bootstrap/liftweb/Boot.scala index bc7a6b5c0..0d0f07571 100644 --- a/src/main/scala/bootstrap/liftweb/Boot.scala +++ b/src/main/scala/bootstrap/liftweb/Boot.scala @@ -95,8 +95,6 @@ import code.bankconnectors.vMar2017.InboundAdapterInfoInternal */ class Boot extends MdcLoggable { - var clientCertificatePw = "" - def boot { val contextPath = LiftRules.context.path @@ -131,17 +129,6 @@ class Boot extends MdcLoggable { * Looks third in the war file, following the normal lift naming rules * */ - - print("Enter the Password for the SSL Certificate Stores: ") - //As most IDEs do not provide a Console, we fall back to readLine - clientCertificatePw = if (true) { - try { - System.console.readPassword().toString - } catch { - case e: NullPointerException => scala.io.StdIn.readLine() - } - } else {"notused"} - val firstChoicePropsDir = for { propsPath <- propsPath } yield { @@ -193,6 +180,16 @@ class Boot extends MdcLoggable { DB.defineConnectionManager(net.liftweb.util.DefaultConnectionIdentifier, vendor) } + + print("Enter the Password for the SSL Certificate Stores: ") + //As most IDEs do not provide a Console, we fall back to readLine + code.api.util.APIUtil.initPasswd = if (Props.get("kafka.use.ssl").getOrElse("") == "true") { + try { + System.console.readPassword().toString + } catch { + case e: NullPointerException => scala.io.StdIn.readLine() + } + } else {"notused"} // ensure our relational database's tables are created/fit the schema val connector = Props.get("connector").openOrThrowException("no connector set") diff --git a/src/main/scala/code/api/util/APIUtil.scala b/src/main/scala/code/api/util/APIUtil.scala index cf1f53829..c9c9068f2 100644 --- a/src/main/scala/code/api/util/APIUtil.scala +++ b/src/main/scala/code/api/util/APIUtil.scala @@ -383,6 +383,7 @@ object APIUtil extends MdcLoggable { val emptyObjectJson = EmptyClassJson() val defaultFilterFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") val fallBackFilterFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + var initPasswd = "" import code.api.util.ErrorMessages._ def httpMethod : String = diff --git a/src/main/scala/code/kafka/kafkaStreamsHelper.scala b/src/main/scala/code/kafka/kafkaStreamsHelper.scala index 06fc828b7..5893a0309 100644 --- a/src/main/scala/code/kafka/kafkaStreamsHelper.scala +++ b/src/main/scala/code/kafka/kafkaStreamsHelper.scala @@ -9,6 +9,7 @@ import akka.pattern.pipe import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} import code.actorsystem.{ObpActorHelper, ObpActorInit} +import code.api.util.APIUtil.initPasswd import code.bankconnectors.AvroSerializer import code.kafka.Topics.TopicTrait import code.util.Helper.MdcLoggable @@ -40,17 +41,26 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe */ private def keyAndPartition = scala.util.Random.nextInt(partitions) + "_" + UUID.randomUUID().toString - private val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer) - .withBootstrapServers(bootstrapServers) - .withGroupId(groupId) - .withClientId(clientId) - .withMaxWakeups(maxWakeups) - .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig) - .withProperty("security.protocol","SSL") - .withProperty("ssl.truststore.location", "/home/work/kafka/api.truststore.jks") - .withProperty("ssl.truststore.password", "redf1234") - .withProperty("ssl.keystore.location","/home/work/kafka/api.keystore.jks") - .withProperty("ssl.keystore.password", "redff1234") + private val consumerSettings = if (Props.get("kafka.use.ssl").getOrElse("false") == "true") { + ConsumerSettings(system, new StringDeserializer, new StringDeserializer) + .withBootstrapServers(bootstrapServers) + .withGroupId(groupId) + .withClientId(clientId) + .withMaxWakeups(maxWakeups) + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig) + .withProperty("security.protocol","SSL") + .withProperty("ssl.truststore.location", Props.get("truststore.path").getOrElse("")) + .withProperty("ssl.truststore.password", initPasswd) + .withProperty("ssl.keystore.location",Props.get("keystore.path").getOrElse("")) + .withProperty("ssl.keystore.password", initPasswd) + } else { + ConsumerSettings(system, new StringDeserializer, new StringDeserializer) + .withBootstrapServers(bootstrapServers) + .withGroupId(groupId) + .withClientId(clientId) + .withMaxWakeups(maxWakeups) + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig) + } private val consumer: ((String, Int) => Source[ConsumerRecord[String, String], Consumer.Control]) = { (topic, partition) => val assignment = Subscriptions.assignmentWithOffset(new TopicPartition(topic, partition), 0) @@ -58,16 +68,22 @@ class KafkaStreamsHelperActor extends Actor with ObpActorInit with ObpActorHelpe .completionTimeout(completionTimeout) } - private val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) - .withBootstrapServers(bootstrapServers) - .withProperty("batch.size", "0") - .withParallelism(3) - .withProperty("security.protocol","SSL") - .withProperty("ssl.truststore.location", "/home/work/kafka/api.truststore.jks") - .withProperty("ssl.truststore.password", "redf1234") - .withProperty("ssl.keystore.location","/home/work/kafka/api.keystore.jks") - .withProperty("ssl.keystore.password", "redff1234") - //.withProperty("auto.create.topics.enable", "true") + private val producerSettings = if (Props.get("kafka.use.ssl").getOrElse("false") == "true") { + ProducerSettings(system, new StringSerializer, new StringSerializer) + .withBootstrapServers(bootstrapServers) + .withProperty("batch.size", "0") + .withParallelism(3) + .withProperty("security.protocol","SSL") + .withProperty("ssl.truststore.location", Props.get("truststore.path").getOrElse("")) + .withProperty("ssl.truststore.password", initPasswd) + .withProperty("ssl.keystore.location",Props.get("keystore.path").getOrElse("")) + .withProperty("ssl.keystore.password", initPasswd) + } else { + ProducerSettings(system, new StringSerializer, new StringSerializer) + .withBootstrapServers(bootstrapServers) + .withProperty("batch.size", "0") + .withParallelism(3) + } private val producer = producerSettings .createKafkaProducer() diff --git a/src/main/scripts/kafka/config/kafka.properties b/src/main/scripts/kafka/config/kafka.properties index 89c573375..7a5796aa4 100644 --- a/src/main/scripts/kafka/config/kafka.properties +++ b/src/main/scripts/kafka/config/kafka.properties @@ -27,13 +27,23 @@ broker.id=0 # listeners = security_protocol://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9092 +#this will enable both ssl and plain +#listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093 + # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 +#use this for ssl 2 way auth +#ssl.keystore.location=/path/to/server.keystore.jks +#ssl.keystore.password=password +#ssl.key.password=password +#ssl.client.auth=required +#ssl.truststore.location=/path/to/server.truststore.jks +#ssl.truststore.password=password + # The number of threads handling network requests num.network.threads=3