Stream

The Stream is the central Pravega abstraction. A Stream must be explicitly created within a Scope.

The example below creates a stream named "events" in the "sales" scope.

Creating a stream

Streams are created in a scope, and must be explicitly created.

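import io.pravega.client.stream.{ScalingPolicy, StreamConfiguration}

// Stream properties: a fixed scaling policy with 3 segments
val streamConfiguration = StreamConfiguration.builder
    .scalingPolicy(ScalingPolicy.fixed(3))
    .build

val aStream: RIO[PravegaStreamManager, Boolean] =
    PravegaStreamManager.createStream("sales", "events", streamConfiguration)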

When creating a stream, we need to provide a StreamConfiguration that defines the stream properties. In this case we use a fixed scaling policy with 3 segments.

Scaling is one of the most important features of Pravega: it lets a stream adapt dynamically to the load by adding or removing segments.

Scaling is a complex topic, and we will cover it in a dedicated section.

Put simply, a ScalingPolicy can be:

  • fixed: the number of segments is fixed, and the segments are evenly distributed across the nodes.
  • by event rate: the number of segments is dynamically adjusted to the rate of events written to the stream.
  • by data rate: the number of segments is dynamically adjusted to the volume of data written to the stream per second.
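
As a rough illustration, the three kinds of policy correspond to factory methods on the Pravega client's ScalingPolicy class, the last one being expressed as a data rate in KB/s. The numeric values below (target rates, scale factor, minimum segment count) are arbitrary placeholders chosen for this sketch, not recommendations.

```scala
import io.pravega.client.stream.ScalingPolicy

// Fixed: the stream always has exactly 3 segments.
val fixedPolicy: ScalingPolicy = ScalingPolicy.fixed(3)

// By event rate: (targetRate in events/s per segment, scaleFactor, minNumSegments).
val byEventRate: ScalingPolicy = ScalingPolicy.byEventRate(100, 2, 3)

// By data rate: (targetKBps per segment, scaleFactor, minNumSegments).
val byDataRate: ScalingPolicy = ScalingPolicy.byDataRate(1024, 2, 3)
```

Any of these values can be passed to StreamConfiguration.builder.scalingPolicy(...) in place of ScalingPolicy.fixed(3).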

aStream is a RIO[PravegaStreamManager, Boolean] that:

  • will produce true if the stream was created or false if it already existed.
  • depends on the PravegaStreamManager capability.

As before, we need to provide the capability, which is the role of ZLayer.

Reader groups

A Reader Group is a named collection of Readers, which together perform parallel reads from a given Stream.

It must be created explicitly.

val readerGroup: RIO[PravegaReaderGroupManager, Boolean] =
    PravegaReaderGroupManager.createReaderGroup(
      "stats-application",
      "sales", "cancellation"
    )

readerGroup is a RIO[PravegaReaderGroupManager, Boolean] that:

  • will produce true if the reader group was created or false if it already existed.
  • depends on the PravegaReaderGroupManager capability.

As before, we need to provide the capability, which is the role of ZLayer:

val readerGroupManager: RLayer[Scope & ClientConfig, PravegaReaderGroupManager] =
    PravegaReaderGroupManager.live("sales")

Note that the PravegaReaderGroupManager capability depends on the Scope and ClientConfig capabilities, which are provided by other layers. Note also that PravegaReaderGroupManager is parameterized by the scope name, which is "sales" in this case.
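
Because both creation effects are plain ZIO values, they can be sequenced into a single setup step whose environment is the combination of the two capabilities. The following is a minimal sketch that reuses the calls shown above; the zio.pravega import is an assumption about the package layout (it may differ between library versions), and the name setup is purely illustrative.

```scala
import io.pravega.client.stream.{ScalingPolicy, StreamConfiguration}
import zio._
import zio.pravega._ // assumed package for PravegaStreamManager and PravegaReaderGroupManager

val streamConfiguration = StreamConfiguration.builder
  .scalingPolicy(ScalingPolicy.fixed(3))
  .build

// Creates the stream, then the reader group, and logs whether each one was new.
// Nothing runs until both capabilities are provided as layers.
val setup: RIO[PravegaStreamManager & PravegaReaderGroupManager, Unit] =
  for {
    streamCreated <- PravegaStreamManager.createStream("sales", "events", streamConfiguration)
    groupCreated  <- PravegaReaderGroupManager.createReaderGroup(
                       "stats-application",
                       "sales", "cancellation"
                     )
    _             <- ZIO.logInfo(s"Stream created: $streamCreated, reader group created: $groupCreated")
  } yield ()
```

Since both calls return false rather than failing when the resource already exists, a setup step of this shape can be run again safely.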

The stream manager can create, delete, list, seal, and truncate streams.

Sealing a stream

A stream must be sealed before it can be deleted.

val sealStream: RIO[PravegaStreamManager, Boolean] =
    PravegaStreamManager
      .sealStream("sales", "events")
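
Since sealing is a precondition for deletion, the two steps are often chained. The sketch below assumes that PravegaStreamManager exposes a deleteStream method with the same (scope, stream name) parameters as sealStream; that signature, the zio.pravega import, and the name retireStream are assumptions made for illustration.

```scala
import zio._
import zio.pravega._ // assumed package for PravegaStreamManager

// Seals the stream, then deletes it. If sealing fails, the
// for-comprehension short-circuits and deletion is never attempted.
val retireStream: RIO[PravegaStreamManager, Boolean] =
  for {
    _       <- PravegaStreamManager.sealStream("sales", "events")
    deleted <- PravegaStreamManager.deleteStream("sales", "events") // assumed signature
  } yield deleted
```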

Truncation

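Truncation is a mechanism to remove data from a Stream: everything before a given StreamCut is discarded. The example below truncates each stream read by the reader group "g1" at the StreamCut reported by that group.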

import scala.jdk.CollectionConverters._ // provides .asScala on Java collections

for {
  readerGroup <- PravegaReaderGroupManager.openReaderGroup("g1")
  // One StreamCut per stream read by the group
  streamCuts   = readerGroup.getStreamCuts()
  _ <- ZIO.foreach(streamCuts.asScala.toList) { case (stream, streamCut) =>
         ZIO.logDebug(s"Stream: ${stream.getStreamName}, StreamCut: $streamCut") *>
           PravegaStreamManager.truncateStream("sales", stream.getStreamName(), streamCut)
       }
} yield ()