While exploring Microsoft Orleans for potential use in developing a CQRS & Event Sourced application, I was delighted to see Implicit Streams being supported as a method for propagating Events to Grains. Unfortunately, at the time of this writing, the documentation is a little lacking in how to properly setup Implicit Streaming. Fortunately, there are official samples that cover most use cases, including Implicit Streaming, but those require using Event Hubs, which is hardly a “download & run” scenario.

As such, I have provided a completely barebones sample to demonstrate how Implicit Streaming is setup.

https://github.com/jsedlak/orleans-samples/tree/main/Streaming/ImplicitStreams

The solution contains three projects: A Shared Library containing all the common code, an Orleans Client, and an Orleans Host. Here are the key points to know.

We create a few constants to ensure we’re always referencing the correct stream.

public static class Constants
{
    public const string StreamProvider = "StreamProvider";
    public const string StreamNamespace = "COUNTER";
}

In our Silo configuration, we’re going to add Memory Streams with our custom Provider Name, and inform the framework that this provider will be Implicit Only, allowing us to proceed without a PubSubStore.

var host = Host.CreateDefaultBuilder()
    .UseOrleans(silo =>
    {
        silo.UseLocalhostClustering()
            .AddMemoryGrainStorageAsDefault()
            .AddMemoryStreams(Constants.StreamProvider, configurator => configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly));
    })
    .ConfigureLogging(logging => logging.AddConsole())
    .UseConsoleLifetime()
    .Build();

Our Producer Grain is going to create the Stream and register a Grain Timer to introduce stream data every N=1 seconds. It is also worth noting the sample uses 8.2.0, which utilizes the new RegisterGrainTimer method. We expose this as a Grain Method in the IProducerGrain interface to ensure the Client can control when stream data starts producing.

public Task StartProducing()
{
    // Get the stream
    var streamId = StreamId.Create(Constants.StreamNamespace, this.GetGrainId().GetGuidKey());
    _stream = this
        .GetStreamProvider(Constants.StreamProvider)
        .GetStream<int>(streamId);

    // Register a timer that produce an event every second
    var period = TimeSpan.FromSeconds(1);
    _timer = this.RegisterGrainTimer<object>(TimerTick, new { }, period, period);

    _logger.LogInformation("I will produce a new event every {Period}", period);

    return Task.CompletedTask;
}

On every tick of the timer, we simply increment a count and send it to the Stream using OnNextAsync.

private async Task TimerTick(object _)
{
    var value = _counter++;
    
    _logger.LogInformation("Sending event {EventNumber}", value);
    
    if (_stream is not null)
    {
        await _stream.OnNextAsync(value);
    }
}

What is not clear in the documentation, is how a consuming Grain needs to subscribe to the Stream implicitly, other than the ImplicitStreamSubscription attribute. To do this, the Grain needs to implement two interfaces: IAsyncObserver<T> to observe incoming data, and IStreamSubscriptionObserver to handle subscriptions.

To handle incoming data, we implement the OnNextAsync method.

public Task OnNextAsync(int item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"[{nameof(ConsumerGrain)}] OnNextAsync: item: {item}, token = {token}");
    _readCount = item;
    return Task.CompletedTask;
}

And to handle subscribing, we implement the OnSubscribed method.

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<int>();
    await handle.ResumeAsync(this);
}

It is important to note that without calling ResumeAsync, the Stream will not see any subscribers and will drop the data “on the floor”.

If everything goes well, you should see log entries like the following:

info: ImplicitStreams.Shared.IProducerGrain[0]
      I will produce a new event every 00:00:01
info: ImplicitStreams.Shared.IProducerGrain[0]
      Sending event 0
info: ImplicitStreams.Shared.IConsumerGrain[0]
      [ConsumerGrain] OnNextAsync: item: 0, token = [EventSequenceToken: SeqNum=638581061883068071, EventIndex=0]
info: ImplicitStreams.Shared.IConsumerGrain[0]
      [SecondConsumerGrain] OnNextAsync: item: 0, token = [EventSequenceToken: SeqNum=638581061883068071, EventIndex=0]
info: ImplicitStreams.Shared.IProducerGrain[0]
      Sending event 1
info: ImplicitStreams.Shared.IConsumerGrain[0]
      [ConsumerGrain] OnNextAsync: item: 1, token = [EventSequenceToken: SeqNum=638581061883068072, EventIndex=0]
info: ImplicitStreams.Shared.IConsumerGrain[0]
      [SecondConsumerGrain] OnNextAsync: item: 1, token = [EventSequenceToken: SeqNum=638581061883068072, EventIndex=0]
info: ImplicitStreams.Shared.IProducerGrain[0]

Hopefully this has provided some clarity surrounding how to get started with Implicit Streaming in Microsoft Orleans!