refactor/Replace library elastic4s-http with elastic4s-client-esjava

This commit is contained in:
Marko Milić 2022-07-13 22:29:55 +02:00
parent 45fc67ce46
commit d9c74a7b69
3 changed files with 35 additions and 119 deletions

View File

@ -217,25 +217,11 @@
<artifactId>elasticsearch</artifactId>
<version>6.8.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.sksamuel.elastic4s/elastic4s-client-esjava -->
<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-http_${scala.version}</artifactId>
<version>6.1.1</version>
<!-- TODO try to switch to slf4j from log4j
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
</exclusions> -->
<artifactId>elastic4s-client-esjava_${scala.version}</artifactId>
<version>8.2.1</version>
</dependency>
<!-- for LiftConsole -->
<dependency>

View File

@ -1,33 +1,26 @@
package code.search
import java.nio.charset.Charset
import dispatch.{Http, url}
import code.util.Helper.MdcLoggable
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import net.liftweb.http.{InMemoryResponse, JsonResponse, LiftResponse}
import net.liftweb.json.JsonAST._
import net.liftweb.util.Helpers
import net.liftweb.util.Props
import dispatch._
import Defaults._
import net.liftweb.json
import java.util.Date
import code.api.util.APIUtil
import code.api.util.ErrorMessages._
import com.sksamuel.elastic4s.ElasticsearchClientUri
import org.elasticsearch.common.settings.Settings
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.mappings.FieldType._
import com.sksamuel.elastic4s.http.ElasticDsl._
import dispatch.as.String.charset
import code.util.Helper.MdcLoggable
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import dispatch.Defaults._
import dispatch.{Http, url, _}
import net.liftweb.common.{Box, Empty, Failure, Full}
import net.liftweb.http.provider.HTTPCookie
import net.liftweb.http.{InMemoryResponse, JsonResponse, LiftResponse}
import net.liftweb.json
import net.liftweb.json.JsonAST
import net.liftweb.json.JsonAST._
import net.liftweb.util.Helpers
import org.elasticsearch.common.settings.Settings
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.control.NoStackTrace
@ -250,22 +243,23 @@ class elasticsearchMetrics extends elasticsearch {
if (esIndex.contains(",")) throw new RuntimeException("Props error: es.metrics.index can not be a list")
var client:HttpClient = null
val props = ElasticProperties(s"http://$esHost:${esPortTCP.toInt}")
val client = ElasticClient(JavaClient(props))
// we must import the dsl
import com.sksamuel.elastic4s.ElasticDsl._
if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) && APIUtil.getPropsAsBoolValue("allow_elasticsearch_metrics", false) ) {
val settings = Settings.builder().put("cluster.name", APIUtil.getPropsValue("es.cluster.name", "elasticsearch")).build()
client = HttpClient(ElasticsearchClientUri(esHost, esPortTCP.toInt))
try {
client.execute {
createIndex(esIndex).mappings(
mapping("request") as (
textField("userId"),
textField("url"),
dateField("date"),
textField("userName"),
textField("appName"),
textField("developerEmail"),
textField("correlationId")
createIndex(s"$esIndex/request").mapping(
properties (
textField("userId"),
textField("url"),
dateField("date"),
textField("userName"),
textField("appName"),
textField("developerEmail"),
textField("correlationId")
)
)
}
@ -278,8 +272,10 @@ class elasticsearchMetrics extends elasticsearch {
def indexMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, correlationId: String) {
if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) && APIUtil.getPropsAsBoolValue("allow_elasticsearch_metrics", false) ) {
try {
// we must import the dsl
import com.sksamuel.elastic4s.ElasticDsl._
client.execute {
indexInto(esIndex / "request") fields (
indexInto(s"$esIndex/request") fields (
"userId" -> userId,
"url" -> url,
"date" -> date,
@ -304,77 +300,12 @@ class elasticsearchWarehouse extends elasticsearch {
override val esPortTCP = APIUtil.getPropsValue("es.warehouse.port.tcp","9300")
override val esPortHTTP = APIUtil.getPropsValue("es.warehouse.port.http","9200")
override val esIndex = APIUtil.getPropsValue("es.warehouse.index", "warehouse")
var client:HttpClient = null
val props = ElasticProperties(s"http://$esHost:${esPortTCP.toInt}")
var client: ElasticClient = null
if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) && APIUtil.getPropsAsBoolValue("allow_elasticsearch_warehouse", false) ) {
val settings = Settings.builder().put("cluster.name", APIUtil.getPropsValue("es.cluster.name", "elasticsearch")).build()
client = HttpClient(ElasticsearchClientUri(esHost, esPortTCP.toInt))
client = ElasticClient(JavaClient(props))
}
}
/*
class elasticsearchOBP extends elasticsearch {
override val esHost = APIUtil.getPropsValue("es.obp.host","localhost")
override val esPortTCP = APIUtil.getPropsValue("es.obp.port.tcp","9300")
override val esPortHTTP = APIUtil.getPropsValue("es.obp.port.tcp","9200")
override val esIndex = APIUtil.getPropsValue("es.obp.index", "obp")
val accountIndex = "account_v1.2.1"
val transactionIndex = "transaction_v1.2.1"
var client:TcpClient = null
if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) ) {
client = TcpClient.transport("elasticsearch://" + esHost + ":" + esPortTCP + ",")
client.execute {
create index accountIndex mappings (
"account" as (
"viewId" typed StringType,
"account" typed ObjectType
)
)
}
client.execute {
create index transactionIndex mappings (
"transaction" as (
"viewId" typed StringType,
"transaction" typed ObjectType
)
)
}
}
/*
Index objects in Elastic Search.
Use **the same** representations that we return in the REST API.
Use the name singular_object_name-version e.g. transaction-v1.2.1 for the index name / type
*/
// Index a Transaction
// Put into a index that has the viewId and version in the name.
def indexTransaction(viewId: String, transaction: TransactionJSON) {
if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) ) {
client.execute {
index into transactionIndex / "transaction" fields (
"viewId" -> viewId,
"transaction" -> transaction
)
}
}
}
// Index an Account
// Put into a index that has the viewId and version in the name.
def indexAccount(viewId: String, account: AccountJSON) {
if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) ) {
client.execute {
index into accountIndex / "account" fields (
"viewId" -> viewId,
"account" -> account
)
}
}
}
}
*/

View File

@ -1,6 +1,5 @@
package code.users
import cats.Now
import code.util.Helper.MdcLoggable
import net.liftweb.common.{Box, Full}
import net.liftweb.mapper.By