Idiomatic, typesafe, and reactive Scala client for Apache Pulsar


CleverCloud/pulsar4s


pulsar4s is a concise, idiomatic, reactive, type-safe Scala client for Apache Pulsar. As a simple wrapper over the Java client, we benefit from the reliability and performance of that client while providing better integration with the Scala ecosystem and idioms.

Warning!!

(This disclaimer was written on 2023-01-05.)

Starting in version 2.9.0, we support Scala 3. This means we had to perform some "aggressive" bumps on libs:

Libs that were bumped for everyone:

  • play-json 2.10 (Currently in RC7)
  • cats-effect 3.3 (was 2.x)
  • ZIO 2.0 (was 1.x) & zio-cats-interop 23.0.0

Libs that come in different versions across scala versions:

  • avro4s
    • for Scala 3: 5.0+
    • for Scala 2: 4.1+
  • scala-java8-compat
    • for Scala ≥2.13: 1.0.2
    • for Scala 2.12: 0.8.0 (was already the case before Scala 3)

Check carefully that bumping pulsar4s will not break your build, especially with cats-effect!

Using the client

The first step is to create a client attached to the Pulsar cluster, providing the service URL.

val client = PulsarClient("pulsar://localhost:6650")

Alternatively, you can use an instance of PulsarClientConfig if you need to set further configuration options such as authentication, TLS, timeouts and so on.

val config = PulsarClientConfig("pulsar://localhost:6650", ...)
val client = PulsarClient(config)

Then we can create either a producer or a consumer from the client. We need an implicit schema in scope - more on that later.

To create a producer, we need the topic, and an instance of ProducerConfig. We can set further options on the config object, such as max pending messages, router mode, producer name and so on.

implicit val schema: Schema[String] = Schema.STRING

val topic = Topic("persistent://sample/standalone/ns1/b")
val producerConfig = ProducerConfig(topic, ...)
val producer = client.producer[String](producerConfig)

To create a consumer, we need one or more topics to subscribe to, the subscription name, and an instance of ConsumerConfig. We can set further options on the config object, such as subscription type, consumer name, queue size and so on.

implicit val schema: Schema[String] = Schema.STRING

val topic = Topic("persistent://sample/standalone/ns1/b")
val consumerConfig = ConsumerConfig(topics = Seq(topic), subscriptionName = Subscription("mysub"), ...)
val consumer = client.consumer[String](consumerConfig)

Note: Call close() on the client, producer, and consumer once you are finished. The client and producer also implement AutoCloseable and Closeable.
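The closing discipline above can be sketched with scala.util.Using (Scala 2.13+), which closes AutoCloseable resources in reverse order of acquisition even when the body throws. This is only an illustration under assumptions: Resource is a hypothetical stand-in that records when it is closed, not a pulsar4s type. The note above only states that the client and producer are AutoCloseable, so a real consumer may need an explicit close() in a finally block instead.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Try, Using}

object CloseOrderSketch {
  // Hypothetical stand-in for a client, producer, or consumer:
  // it only records the moment it is closed.
  final class Resource(name: String, log: ListBuffer[String]) extends AutoCloseable {
    override def close(): Unit = log += s"closed $name"
  }

  // Acquires three resources and returns the order in which they were closed.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val result: Try[Unit] = Using.Manager { use =>
      val client   = use(new Resource("client", log))
      val producer = use(new Resource("producer", log))
      val consumer = use(new Resource("consumer", log))
      // ... send and receive here ...
      ()
    }
    result.get // rethrows if the body or a close() call failed
    log.toList
  }
}
```

Calling CloseOrderSketch.run() closes the consumer first, then the producer, then the client, so nothing outlives the client it was created from.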

Schemas

A message must be the correct type for the producer or consumer. When a producer or consumer is created, an implicit Schema typeclass must be available. In the earlier examples, you saw that we added an implicit schema for String using implicit val schema: Schema[String] = Schema.STRING.

There are built-in schemas for bytes and strings, but other complex types require a custom schema. Some people prefer to write custom typeclasses manually for the types they need to support. Other people like to just have it done automagically. For those people, pulsar4s provides extensions for the well-known Scala JSON libraries that can be used to generate messages where the body is a JSON representation of the class.
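For the manual route, the typeclass pattern itself can be sketched as follows. Everything here is an assumption made for illustration: Codec is a hypothetical stand-in rather than the Pulsar Schema interface (a real schema implements org.apache.pulsar.client.api.Schema[T]), the Food fields are invented names, and the pipe-delimited encoding is deliberately naive.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ManualSchemaSketch {
  // Hypothetical stand-in typeclass, NOT the Pulsar Schema interface.
  trait Codec[T] {
    def encode(value: T): Array[Byte]
    def decode(bytes: Array[Byte]): T
  }

  // Hypothetical message type; the field names are assumptions.
  final case class Food(name: String, topping: String)

  // Hand-written instance. The encoding is naive: it breaks if name contains '|'.
  implicit val foodCodec: Codec[Food] = new Codec[Food] {
    def encode(food: Food): Array[Byte] =
      s"${food.name}|${food.topping}".getBytes(UTF_8)

    def decode(bytes: Array[Byte]): Food =
      new String(bytes, UTF_8).split("\\|", 2) match {
        case Array(name, topping) => Food(name, topping)
        case _ => throw new IllegalArgumentException("malformed payload")
      }
  }

  // Like client.producer[T], this compiles only when an implicit
  // instance for T is in scope.
  def roundTrip[T](value: T)(implicit codec: Codec[T]): T =
    codec.decode(codec.encode(value))
}
```

With import ManualSchemaSketch._ in scope, roundTrip(Food("pizza", "ham and pineapple")) resolves foodCodec implicitly, while roundTrip(42) fails to compile because no Codec[Int] exists. That is the same kind of compile-time check the implicit schema gives you.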

An example of creating a producer for a complex type using the circe json library to generate the schema:

import io.circe.generic.auto._
import com.sksamuel.pulsar4s.circe._

val topic = Topic("persistent://sample/standalone/ns1/b")
val producer = client.producer[Food](ProducerConfig(topic))
producer.send(Food("pizza", "ham and pineapple"))

Note: The imports bring into scope a method that will generate an implicit schema when required.

The following extension modules can be used for automatic schemas:

| Library | Module | Import |
|---|---|---|
| Circe | pulsar4s-circe | import io.circe.generic.auto._ and import com.sksamuel.pulsar4s.circe._ |
| Jackson | pulsar4s-jackson | import com.sksamuel.pulsar4s.jackson._ |
| Json4s | pulsar4s-json4s | import com.sksamuel.pulsar4s.json4s._ |
| Spray Json | pulsar4s-spray-json | import com.sksamuel.pulsar4s.sprayjson._ |
| Play Json | pulsar4s-play-json | import com.sksamuel.pulsar4s.playjson._ |

Producing

There are two ways to send a message - either with a plain value, or with an instance of ProducerMessage. If you do not need to specify extra options on the message - such as key, event time, headers, etc - then you can just send a plain value, and the client will wrap the value in a Pulsar message. Alternatively, you can create an instance of ProducerMessage to specify extra options.

Each method can be synchronous or asynchronous. The asynchronous methods return a scala.concurrent.Future. If you are using another effect library, such as cats, scalaz or monix, then pulsar4s also supports those effects. See the section on #effects.

If the send method is successful, you will receive the MessageId of the generated message. If an exception is generated, then in the synchronous methods, you will receive a Failure with the error. In the asynchronous methods the exception will be surfaced as a failed Future.

To send a plain value, we just invoke send with the value:
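The two failure styles described above can be handled along these lines. This sketch does not call pulsar4s: sendSync and sendAsync are hypothetical stand-ins with the result shapes the text describes (a Try for the synchronous call, a Future for the asynchronous one). A String plays the role of MessageId, and an empty payload simulates a failed send.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object SendResultSketch {
  // Hypothetical stand-in for a synchronous send: Success carries the id,
  // Failure carries the error.
  def sendSync(value: String): Try[String] =
    if (value.nonEmpty) Success(s"id-of-$value")
    else Failure(new IllegalArgumentException("empty payload"))

  // Hypothetical stand-in for an asynchronous send: errors surface as a failed Future.
  def sendAsync(value: String): Future[String] =
    Future.fromTry(sendSync(value))

  // Synchronous style: pattern match on the Try.
  def describe(result: Try[String]): String = result match {
    case Success(id) => s"sent as $id"
    case Failure(e)  => s"send failed: ${e.getMessage}"
  }

  // Asynchronous style: map the success, recover the failure.
  def describeAsync(value: String)(implicit ec: ExecutionContext): Future[String] =
    sendAsync(value)
      .map(id => s"sent as $id")
      .recover { case NonFatal(e) => s"send failed: ${e.getMessage}" }
}
```

In both styles the error stays a value until you choose to handle it, so nothing is thrown at the call site.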

producer.send("wibble")

Or to send a message, we first create an instance of ProducerMessage.

val message = DefaultProducerMessage(Some("mykey"), "wibble", eventTime = Some(EventTime(System.currentTimeMillis)))
producer.send(message)

Consuming

To receive a message, create a consumer and invoke either the receive, receive(Duration), or receiveAsync methods. The first two are synchronous and return an instance of ConsumerMessage, blocking if necessary, and the last is asynchronous, returning a Future (or other effect) with the ConsumerMessage once ready.

val message: ConsumerMessage[T] = consumer.receive

or

val message: Future[ConsumerMessage[T]] = consumer.receiveAsync

Once a message has been consumed, it is important to acknowledge the message by using the message id with the ack methods.

consumer.acknowledge(message.messageId)

Akka Streams

Pulsar4s integrates with the outstanding akka-streams library - it provides both a source and a sink. To use this, you need to add a dependency on the pulsar4s-akka-streams module.

Sources

To create a source, all that is required is a function that will create a consumer on demand and the message id to seek. The function must return a fresh consumer each time it is invoked. The consumer is just a regular pulsar4s Consumer and can be created in the normal way, for example:

val topic = Topic("persistent://sample/standalone/ns1/b")
val consumerFn = () => client.consumer(ConsumerConfig(topics = Seq(topic), subscriptionName = subscription))

We pass that function into the source method, providing the seek. Note the imports.

import com.sksamuel.pulsar4s.akka.streams._

val pulsarSource = source(consumerFn, Some(MessageId.earliest))

The materialized value of the source is an instance of Control, which provides a method called 'close' that can be used to stop consuming messages. Once the akka streams source is completed (or fails) the consumer will be automatically closed.

Sinks

To create a sink, we need a producer function similar to the source's consumer function. Again, the producer used is just a regular pulsar4s Producer. The function must return a fresh producer each time it is invoked.

val topic = Topic("persistent://sample/standalone/ns1/b")
val producerFn = () => client.producer(ProducerConfig(topic))

We pass that function into the sink method. Once again, take note of the imports.

import com.sksamuel.pulsar4s.akka.streams._

val pulsarSink = sink(producerFn)

A sink requires messages of type ProducerMessage[T] where T is the value type of the message. For example, if we were producing String messages, then we would map our upstream messages into instances of ProducerMessage[String] before passing them to the sink.

import com.sksamuel.pulsar4s.akka.streams._

Source.fromIterator(() => List("a", "b", "c", "d").iterator)
  .map(string => ProducerMessage(string))
  .runWith(sink(producerFn))

A sink will run until the upstream source completes. In other words, to terminate the sink, the source must be cancelled or completed. Once the sink completes the producer will be automatically closed.

The materialized value of the sink is a Future[Done] which will be completed once the upstream source has completed.

There is also an implementation of a 'multi-sink'. A multi-sink lets you produce to multiple topics in Pulsar while using just one sink. In addition to a ProducerMessage[T], a multi-sink expects a Topic, so the input format is (Topic, ProducerMessage[T]). All producers in the sink are created lazily, once a tuple with a new topic is received. You can also provide a collection of topics to the constructing function, to create those producers ahead of time if the names are known. Producers for new topics read from the stream will still be created on the fly.

Example usage of a multi-sink:

import com.sksamuel.pulsar4s.akka.streams._

val topic1 = Topic("persistent://sample/standalone/ns1/b")
val topic2 = Topic("persistent://sample/standalone/ns1/bb")
val producerFn = (topic: Topic) => client.producer(ProducerConfig(topic))
val pulsarMultiSink = multiSink(producerFn)
// or, to create those topics ahead of time:
val pulsarMultiSink2 = multiSink(producerFn, Set(topic1, topic2))
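The lazy, per-topic creation that the multi-sink description mentions can be pictured with a small cache keyed by topic. This is a sketch of the idea only, not the pulsar4s implementation: Topic, Producer, and ProducerCache below are hypothetical stand-ins, and the cache is not thread-safe.

```scala
import scala.collection.mutable

object LazyProducersSketch {
  // Hypothetical stand-ins; these are not the pulsar4s Topic and Producer types.
  final case class Topic(name: String)
  final class Producer(val topic: Topic) {
    val sent: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]
    def send(value: String): Unit = sent += value
  }

  // Keeps one producer per topic, creating it the first time that topic is seen.
  // Topics passed as initialTopics get their producer up front.
  final class ProducerCache(create: Topic => Producer, initialTopics: Set[Topic] = Set.empty) {
    private val producers = mutable.Map.empty[Topic, Producer]
    initialTopics.foreach(topic => producers.getOrElseUpdate(topic, create(topic)))

    def send(topic: Topic, value: String): Unit =
      producers.getOrElseUpdate(topic, create(topic)).send(value)

    def size: Int = producers.size
  }
}
```

Sending to a topic that already has a producer reuses it, so the create function runs once per distinct topic regardless of how many messages flow through.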

Full Example

Here is a full example of consuming from a topic for 10 seconds, publishing the messages back into another topic. Obviously this is a bit of a toy example, but it shows everything in one place.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.sksamuel.pulsar4s.{ConsumerConfig, MessageId, ProducerConfig, ProducerMessage, PulsarClient, Subscription, Topic}
import com.sksamuel.pulsar4s.akka.streams._
import org.apache.pulsar.client.api.Schema

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val schema: Schema[Array[Byte]] = Schema.BYTES

val client = PulsarClient("pulsar://localhost:6650")

val intopic = Topic("persistent://sample/standalone/ns1/in")
val outtopic = Topic("persistent://sample/standalone/ns1/out")

val consumerFn = () => client.consumer(ConsumerConfig(topics = Seq(intopic), subscriptionName = Subscription("mysub")))
val producerFn = () => client.producer(ProducerConfig(outtopic))

// Copy every consumed message body into the output topic.
val control = source(consumerFn, Some(MessageId.earliest))
  .map { consumerMessage => ProducerMessage(consumerMessage.data) }
  .to(sink(producerFn)).run()

// Let the stream run for 10 seconds, then stop consuming.
Thread.sleep(10000)
control.close()

FS2 Support

Pulsar4s integrates with the fs2 library - it provides both a source and a sink. To use this, you need to add a dependency on both the pulsar4s-{effect} and pulsar4s-fs2 modules.

Example

// _root_ avoids a name clash with the com.sksamuel.pulsar4s.cats package imported below
import _root_.cats.effect.IO
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._
import com.sksamuel.pulsar4s.fs2.Streams
import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition}

implicit val schema: Schema[Array[Byte]] = Schema.BYTES

val client = PulsarClient("pulsar://localhost:6650")

val intopic = Topic("persistent://sample/standalone/ns1/in")
val outtopic = Topic("persistent://sample/standalone/ns1/out")

Streams.batch[IO, Array[Byte]](client.consumerAsync[Array[Byte], IO](ConsumerConfig(
  subscriptionName = Subscription("mysub"),
  topics = Seq(intopic),
  subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
)))
  // an explicit lambda is needed here: ProducerMessage(_.value) would wrap a function, not the value
  .map(_.map(message => ProducerMessage(message.value)))
  .through(Streams.committableSink(client.producerAsync[Array[Byte], IO](ProducerConfig(outtopic))))
  .compile
  .drain

Example SBT Setup

val pulsar4sVersion = "x.x.x"

libraryDependencies ++= Seq(
  "com.clever-cloud.pulsar4s" %% "pulsar4s-core" % pulsar4sVersion,

  // for the akka-streams integration
  "com.clever-cloud.pulsar4s" %% "pulsar4s-akka-streams" % pulsar4sVersion,

  // if you want to use avro for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-avro" % pulsar4sVersion,

  // if you want to use circe for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-circe" % pulsar4sVersion,

  // if you want to use json4s for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-json4s" % pulsar4sVersion,

  // if you want to use jackson for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-jackson" % pulsar4sVersion,

  // if you want to use spray-json for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-spray-json" % pulsar4sVersion,

  // if you want to use play-json for schemas
  "com.clever-cloud.pulsar4s" %% "pulsar4s-play-json" % pulsar4sVersion,

  // if you want to use monix effects
  "com.clever-cloud.pulsar4s" %% "pulsar4s-monix" % pulsar4sVersion,

  // if you want to use scalaz effects
  "com.clever-cloud.pulsar4s" %% "pulsar4s-scalaz" % pulsar4sVersion,

  // if you want to use cats effects
  "com.clever-cloud.pulsar4s" %% "pulsar4s-cats-effect" % pulsar4sVersion,

  // if you want to use fs2
  "com.clever-cloud.pulsar4s" %% "pulsar4s-fs2" % pulsar4sVersion,

  // if you want to use zio
  "com.clever-cloud.pulsar4s" %% "pulsar4s-zio" % pulsar4sVersion
)

Contributions

Contributions to pulsar4s are always welcome. Good ways to contribute include:

  • Raising bugs and feature requests
  • Improving the performance of pulsar4s
  • Adding to the documentation

License

This software is licensed under the Apache 2 license, quoted below.

Copyright 2017-2018 Stephen Samuel

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
