SR Queue Consumer Pattern - Part 3
Abstraction of Message Types and Transformation
This post extends the queue consumer pattern discussed here and here to externalize the message types and the message processing. This will create a reusable library allowing consumers to focus on the transformation domain.
In the previous examples the Queue Consumer Library had compile time knowledge of the MessageIn and MessageOut types, additionally the ‘transformation’ from MessageIn to MessageOut was coded into the Queue Processor. While this works for a demonstration, it does not make the sytem very useful in practice.
In this update we abstract the MessageIn and MessageOut types as well as the Message Transformation Logic out of the Queue Processor and into the ‘domain’. This will give us a re-usable Smugglers Run Queue Processor library.
The first step is to update IProcessingContainer and the implemented Pod class:
- Templatize the MessageIn and Message out types.
- Pass in a Transformer at Pod creation time. A Note here on Pod reuse: I have not left out the possibility of passing the transformer factory (discussed later) all the way down to the Pod. There are some tradeoffs here, particularly if we want to use a transformer that calls out to an external resource.
public class Pod<TMessageIn, TMessageOut> : IProcessingContainer<TMessageIn, TMessageOut>
{
//.......
public Pod(int idx, Func<TMessageIn, TMessageOut> transformer)
{
//...
}
//.......
}
We also need to update our IProcessingSystem implementation to define Message Types. Additionally an ITransformerFactory has been added, this enables the conduit to create a transformer for each pod.
public class Conduit<TMessageIn, TMessageOut> : IProcessingSystem<TMessageIn, TMessageOut>
{
//...
public Conduit(
ILogger<Conduit<TMessageIn, TMessageOut>> logger,
IOptions<ConduitConfig> options,
ITransformerFactory<TMessageIn, TMessageOut> transformerFactory)
{
//...
}
//.......
}
ITransformerFactory is defined as follows.
public interface ITransformerFactory<TMessageIn, TMessageOut>
{
Func<TMessageIn, TMessageOut> GetTransformer();
}
Implementations
Common Concerns
MessageIn.cs and MessageOut.cs have been moved into a domain library so that they can be shared across examples.
A HostingExtensions library was added to refactor some commonality into all the host processses.
Self Contained
The ./src/console project is self contained and has been updated to support the new design. In addition, to demonstrate the extrnalization of the transformation process it includes two message transformer factories.
| Factory | Description |
|---|---|
| DefaultTransformerFactory | Default Transformation logic from Parts 1 and 2 externalized from the IProcessingContainer |
| LoggingTransformerFactory | A Transformer Factory with an ILogger<> Injected. |
As with previous versions the Readme contains details on running the project. Console Details includes all the information for running the various Transformer Factories.
Services
The ./src/Processor project has also been updated to support the new design. It also includes a DefaultTransformerFactory and an ExternalServiceTransformerFactory which makes a call to a url as part of the processing. Service Details includes detailed instructions for running.
Next Steps
There are still some improvements that can be made to the system
- Force MessageOut to Implement Clone()
- Pass an IProcessingContainer Factory into Conduit so the entire system can be dependency injected and the Pod to Conduit dependency be removed.
- Observability into the processing performance and health.
- Extending ExternalServiceTransformerFactory to demonstrate other IO operations.
Some Notes
MessageReadEventArgs sends out some information about the pod. I would probably eliminate the Idx at some point, it is mostly in for demonstration purposes. and there is still some ‘audit’ functionality for demonstration purposes.