Event Sources Module
Warning: Event sources are experimental and may change in future releases based on usage and feedback.
Acquaintance provides a mechanism to monitor a source of events and publish them to the bus at regular intervals. This mechanism is called “Event Sources”. An Event Source is a callback which is invoked at regular intervals on a dedicated thread. The event source callback will do work and will be able to publish messages to the bus as they are available.
Event Source Callbacks
There are two signatures for callbacks in an event source. The first takes a context object which can be used to control the operation of the event source. A context exposes the ability to publish an event to the bus, but does not expose all the methods of IMessageBus
. The context can also control when the next iteration happens and when the event source is complete.
var token = messageBus.RunEventSource(context => {
// Publish as many messages per iteration as you need
context.Publish<MyEvent>("topic", new MyEvent { ... });
// Specify a time delay before the next iteration (defaults to 1s)
context.IterationDelayMs = 5000;
// Mark the event source complete so that it stops iterating
context.Complete();
});
The second signature also takes a CancellationToken
, which you can use to abort the event source early:
var token = messageBus.RunEventSource((context, cancellationToken) => {
// check the token to break out of a long-running operation:
while(!cancellationToken.IsCancellationRequested)
{
...
}
});
Event sources with the context are conceptually similar to the IEnumerable
/IEnumerator
interfaces in core .NET. context.Publish
is analogous to the yield return
construct in an enumerator, and context.Complete()
is analogous to the yield break
construct. The primary difference is that enumerators tend to operate on a pull model while Event Sources operate on a poll/push model instead.
IEventSource
You can create an object to serve as an event source, and that object will be kept alive by the system so that it can hold state between calls:
public class MyEventSource : IEventSource
{
public void CheckForEvents(
IEventSourceContext context,
CancellationToken cancellationToken)
{
...
}
}
You can register your source object with the system:
var token = messageBus.RunEventSource(new MyEventSource());
Use Cases
Bulk DB Processing
I have an application which needs to read records from a DB and distribute those records to worker threads for fast bulk processing.
// Setup worker threads
messageBus.Subscribe<MyDbRecord>(b => b
.WithTopc("Process")
.Invoke(r => Process(r))
.OnThreadPool());
// Setup the event souce
messageBus.RunEventSource((c, t) => {
while (!t.IsCancellationRequested) {
var record = dataSource.GetNextRecord();
if (record == null)
{
c.Complete();
return;
}
messageBus.Publish("Process", record);
}
});
Polling a Webservice
I have a remote webservice whose state needs to be monitored. If the webservice responds to a ping, we can publish events that the webservice is healthy. If the webservice does not respond, we can publish an event that the service is unhealthy. I want to ping the webservice every minute. This ping process should continue indefinitely.
messageBus.RunEventSource(c => {
c.IterationDelayMs = 60000; // 60 secs/min * 1000 ms/sec = 60000 ms/min
var result = myWebServiceGateway.SendPing();
if (result != null)
c.Publish(new MyWebserviceHealthMessage(isHealthy: true));
else
c.Publish(new MyWebserviceHealthMessage(isHealthy: false));
});