11package otoroshi .statefulclients
22
3- import com .sksamuel .pulsar4s .{ DefaultPulsarClient , PulsarClient , PulsarClientConfig }
4- import org . apache . pulsar . client . impl . auth .{ AuthenticationBasic , AuthenticationToken }
3+ import com .sksamuel .pulsar4s .PulsarClient
4+ import otoroshi . events .{ PulsarConfig , PulsarSetting }
55import otoroshi .utils .syntax .implicits ._
6- import play .api .libs .json .{ JsObject , Json }
6+ import play .api .libs .json .JsObject
77
88import java .util .concurrent .atomic .AtomicBoolean
99
1010object PulsarStatefulClientConfig {
11- def apply (obj : JsObject ) = new PulsarStatefulClientConfig (
12- uri = obj.select(" uri" ).asString,
13- tlsTrustCertsFilePath = obj.select(" tlsTrustCertsFilePath" ).asOpt[String ],
14- token = obj.select(" token" ).asOpt[String ].filter(_.trim.nonEmpty),
15- username = obj.select(" username" ).asOpt[String ].filter(_.trim.nonEmpty),
16- password = obj.select(" password" ).asOpt[String ].filter(_.trim.nonEmpty),
17- tlsHostnameVerification = obj.select(" tlsHostnameVerification" ).asOpt[Boolean ].getOrElse(false ),
18- tlsAllowInsecure = obj.select(" tlsAllowInsecure" ).asOpt[Boolean ].getOrElse(false )
19- )
11+ def apply (obj : JsObject ) = new PulsarStatefulClientConfig (PulsarConfig .format.reads(obj).get)
2012}
2113
22- case class PulsarStatefulClientConfig (
23- uri : String ,
24- tlsTrustCertsFilePath : Option [String ] = None ,
25- token : Option [String ] = None ,
26- username : Option [String ] = None ,
27- password : Option [String ] = None ,
28- tlsHostnameVerification : Boolean = false ,
29- tlsAllowInsecure : Boolean = false
30- ) extends StatefulClientConfig [PulsarClient ] {
14+ case class PulsarStatefulClientConfig (config : PulsarConfig ) extends StatefulClientConfig [PulsarClient ] {
3115
3216 private val open = new AtomicBoolean (false )
3317
34- override def start (): PulsarClient = {
35- val client = if (tlsTrustCertsFilePath.isDefined) {
36- val builder = org.apache.pulsar.client.api.PulsarClient
37- .builder()
38- .serviceUrl(uri)
39- .enableTlsHostnameVerification(tlsHostnameVerification)
40- .allowTlsInsecureConnection(tlsAllowInsecure)
41- tlsTrustCertsFilePath.foreach(builder.tlsTrustCertsFilePath)
42- token.foreach(t => builder.authentication(new AuthenticationToken (t)))
43- new DefaultPulsarClient (builder.build())
44- } else {
45- val config = PulsarClientConfig (
46- serviceUrl = uri,
47- authentication = token.map(t => new AuthenticationToken (t))
48- .orElse(for {
49- u <- username
50- p <- password
51- } yield {
52- val auth = new AuthenticationBasic ()
53- auth.configure(Json .stringify(Json .obj(" userId" -> u, " password" -> p)))
54- auth
55- })
56- )
57- PulsarClient (config)
58- }
18+ override def start (env : otoroshi.env.Env ): PulsarClient = {
19+ val client = PulsarSetting .client(env, config)
5920 open.set(true )
6021 client
6122 }
@@ -68,12 +29,7 @@ case class PulsarStatefulClientConfig(
6829 override def isOpen (client : PulsarClient ): Boolean = open.get()
6930
7031 override def sameConfig (other : StatefulClientConfig [_]): Boolean = other match {
71- case p : PulsarStatefulClientConfig =>
72- p.uri == uri &&
73- p.tlsTrustCertsFilePath == tlsTrustCertsFilePath &&
74- p.token == token &&
75- p.username == username &&
76- p.password == password
32+ case p : PulsarStatefulClientConfig => p.config == config
7733 case _ => false
7834 }
7935}
0 commit comments