The journeylism of @yreynhout

On CQRS, DDD(D), ES, …

Object Inheritance

When mentioning “object inheritance” most people immediately think of “class inheritance”. Alas, that’s not what it is. Quoting from Streamlined Object Modeling (*):

Object inheritance allows two objects representing a single entity to be treated as one object by the rest of the system.

Put a different way, where class inheritance allows for code reuse, object inheritance allows for data reuse.

Application

Where could this be useful? Depending on the domain you are in, I’d say quite a few places. Whenever you replicate data from one object to another object, you should stop and consider if “object inheritance” could be applicable. While object inheritance does impose some constraints, it also saves you from writing reconciliation code to synchronize two or more objects (especially in systems that heavily rely on messaging).
A common example I like to use is the one of a video title and a video tape (**). From a real world point of view, a video tape has both the attributes of the tape itself and the title. Yet, if I model this as two separate objects, I run into trouble (a.k.a. synchronization woes). If I copy the title information to the tape upon creation of the tape, and I made an error in, say, the release date of the title, I now have to replicate the correction to the “affected” tapes. Don’t get me wrong, sometimes this kind of behavior is desirable, i.e. it makes sense from a business perspective. But what if it doesn’t? That’s where “object inheritance” comes in. To a large extent, it can cover the scenarios that a synchronization based solution can, IF constraints are met.

Constraints

  • Localized data: “Object inheritance” assumes that the data of both objects lives close together. That might be a deal breaker for larger systems, or at least an indication that you’d have to consider co-locating the data of “both” objects.
  • Immutable data: The data of one of the objects, the “parent”, is immutable during object inheritance. From an OOP perspective that means you can only invoke side-effect free functions on a “parent”.
  • “Parent” object responsibilities: the parent object contains information and behaviors that are valid across multiple contexts, multiple interactions, and multiple variations of an object.
  • “Child” object responsibilities: the child object represents the parent in a specialized context, in a particular interaction, or as a distinct variation.

The aforementioned book also states other constraints such as the child object exhibiting the parent object’s “profile” (think interface), but I find those less desirable in an environment that uses CQRS. For more in-depth information, I strongly suggest you read it. It has a wealth of information on this very topic.

Code

In its simplest form “object inheritance” looks and feels like object composition.
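A minimal sketch of what that could look like for the video example, not a definitive implementation. The class names `VideoTitle` and `VideoTape` and all of their members are illustrative assumptions:

```csharp
using System;

// Parent: immutable title data, shared by every tape of that title.
public sealed class VideoTitle {
  public VideoTitle(string name, DateTime releaseDate) {
    Name = name;
    ReleaseDate = releaseDate;
  }

  public string Name { get; }
  public DateTime ReleaseDate { get; }

  // Side-effect free function: reads state, changes nothing.
  public bool IsReleasedOn(DateTime date) {
    return date >= ReleaseDate;
  }
}

// Child: holds only tape-specific data and asks its parent for the rest.
public sealed class VideoTape {
  private readonly VideoTitle _title;

  public VideoTape(string barcode, VideoTitle title) {
    if (title == null) throw new ArgumentNullException(nameof(title));
    Barcode = barcode;
    _title = title;
  }

  public string Barcode { get; }

  // "Inherited" data: delegated to the parent, never copied.
  public string Name { get { return _title.Name; } }
  public DateTime ReleaseDate { get { return _title.ReleaseDate; } }

  public bool CanBeRentedOn(DateTime date) {
    return _title.IsReleasedOn(date);
  }
}
```

Because the tape never copies the title's data, there is nothing to synchronize when a title turns out to have been entered incorrectly and is replaced by a corrected one.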

The composition can be hidden from child object consuming code behind a repository’s facade as shown below.
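One way such a facade might look, as a sketch under assumptions: the repository is in-memory, and `VideoTapeRepository`, its dictionaries and the trimmed-down `VideoTitle`/`VideoTape` classes are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;

public sealed class VideoTitle {
  public VideoTitle(Guid id, string name) { Id = id; Name = name; }
  public Guid Id { get; }
  public string Name { get; }
}

public sealed class VideoTape {
  private readonly VideoTitle _title;
  public VideoTape(Guid id, VideoTitle title) { Id = id; _title = title; }
  public Guid Id { get; }
  public string Name { get { return _title.Name; } }
}

// Callers ask for a tape by id and never see that a title
// had to be looked up in order to compose it.
public sealed class VideoTapeRepository {
  private readonly IDictionary<Guid, VideoTitle> _titles;
  // Stored state of a tape: only its own id and a reference to its title.
  private readonly IDictionary<Guid, Guid> _titleIdByTapeId;

  public VideoTapeRepository(
    IDictionary<Guid, VideoTitle> titles,
    IDictionary<Guid, Guid> titleIdByTapeId) {
    _titles = titles;
    _titleIdByTapeId = titleIdByTapeId;
  }

  public VideoTape Get(Guid tapeId) {
    var titleId = _titleIdByTapeId[tapeId];
    return new VideoTape(tapeId, _titles[titleId]);
  }
}
```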

The gist of “object inheritance” is that the child object (the video tape) asks the parent (the video) for data or invokes side-effect free functions on the parent to accomplish its task.

Lunch -> Not Free

Whether you go down the route of “object inheritance” or (message based) “synchronization”, you will have to think about object life-cycles (both parent and child). Sometimes it’s desirable for the child to have a still view of the parent, a snapshot if you will. Other times you may want the child to always see the parent in its most current form. Children can even change parents during their life-cycle. Other children may want a more “controlled” view of the parent, rolling forward or backward based on their needs or context. You can get pretty sophisticated with this technique, especially in an event sourced domain model since it’s very well suited to roll up to a certain point in time or a certain revision of a parent. In the same breath, I should also say that you can very well shoot yourself in the foot with this technique, especially if the composition takes place in the user interface, in which case there’s no point in using this. It’s also very easy to bury yourself in the pit of abstraction when talking about “child” and “parent”, so do replace those words with nouns from your domain, and get your archetypes right.

All in all, I’ve found this a good tool in the box that plays nicely with aggregates, event-sourcing, even state-based models that track the concept of time. It’s not something I use every day, but whenever I did, I ended up with less code. YMMV.

(*) Streamlined Object Modeling is a successor to Peter Coad’s “Modeling In Color”. A Kindle version of the former is available.
(**) I’m from the “cassette & video” ages.

The Money Box

Once in a while I hear/read people struggle with projection performance. The root causes of their performance issues are diverse:

  • using the same persistence behavior during projection rebuild as during live/production build
  • use of ill-fitting technology (here’s looking at you, EF) or usage of said technology in the wrong way
  • too wide a persistence interface which makes it difficult to optimize for performance
  • no batching support or batching as an afterthought
  • not thinking about the implication of performing reads during projection

Ever heard of the book Refactoring to Patterns? It has a nice refactoring in there, called Move Accumulation to Collecting Parameter that refers to the Collecting Parameter pattern on C2. How would this help with thy projections? Well, what if you could decouple the act of performing an action from collecting what is required to be able to perform an action? Put another way, what if you decouple the act of executing SQL DML statements from collecting those statements during projection (a.k.a. event handling)? So, instead of …

… we add another level of indirection …
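A minimal sketch of that indirection, under stated assumptions: only the name `IProjectionSqlOperations` comes from the text. The `Insert`/`Update`/`Delete` signatures, the `PortfolioRenamed` event, the handler and the recording implementation are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape: a small surface covering INSERT, UPDATE and DELETE.
public interface IProjectionSqlOperations {
  void Insert(string table, IDictionary<string, object> values);
  void Update(string table, IDictionary<string, object> key, IDictionary<string, object> values);
  void Delete(string table, IDictionary<string, object> key);
}

// Hypothetical event.
public sealed class PortfolioRenamed {
  public Guid PortfolioId { get; set; }
  public string NewName { get; set; }
}

// The handler only states what should happen. It performs no reads and
// does not know when, or with which technology, the operation is executed.
public sealed class PortfolioProjectionHandler {
  private readonly IProjectionSqlOperations _operations;

  public PortfolioProjectionHandler(IProjectionSqlOperations operations) {
    _operations = operations;
  }

  public void Handle(PortfolioRenamed @event) {
    _operations.Update(
      "Portfolio",
      new Dictionary<string, object> { { "Id", @event.PortfolioId } },
      new Dictionary<string, object> { { "Name", @event.NewName } });
  }
}

// Stand-in strategy that merely records what was requested.
public sealed class RecordingSqlOperations : IProjectionSqlOperations {
  public readonly List<string> Requested = new List<string>();

  public void Insert(string table, IDictionary<string, object> values) {
    Requested.Add("INSERT " + table);
  }
  public void Update(string table, IDictionary<string, object> key, IDictionary<string, object> values) {
    Requested.Add("UPDATE " + table);
  }
  public void Delete(string table, IDictionary<string, object> key) {
    Requested.Add("DELETE " + table);
  }
}
```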

The most noticeable differences are the decoupling from persistence technology(*), no reads, and no promises with regard to when the requested operations will be executed/flushed to storage. Usually, the IProjectionSqlOperations interface will have a very small surface (i.e. low member count), covering INSERT, UPDATE and DELETE. During live/production projection building you could have an implementation of this interface (a.k.a. strategy) that flushes as soon as an operation is requested.

However, the more interesting implementations are the ones that are used during rebuild.

This implementation translates the requested operations into sql statement objects (abstracted by ISqlStatement) and pushes them onto something that observes these sql statements. The observer couldn’t care less what the actual sql statements are (that happened in the projection handler above). The simplest observer implementation could look something like this …
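A sketch of such a collecting setup, assuming the standard `IObserver<T>` interface is used for the observing side. Only the name `ISqlStatement` comes from the text; its members, `SqlStatement`, `ObservableSqlOperations` and `CollectingObserver` are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical contract for a statement: text plus named parameters.
public interface ISqlStatement {
  string Text { get; }
  IReadOnlyDictionary<string, object> Parameters { get; }
}

public sealed class SqlStatement : ISqlStatement {
  public SqlStatement(string text, IReadOnlyDictionary<string, object> parameters) {
    Text = text;
    Parameters = parameters;
  }
  public string Text { get; }
  public IReadOnlyDictionary<string, object> Parameters { get; }
}

// Translates a requested operation into a statement object and
// pushes it onto whatever is observing.
public sealed class ObservableSqlOperations {
  private readonly IObserver<ISqlStatement> _observer;

  public ObservableSqlOperations(IObserver<ISqlStatement> observer) {
    _observer = observer;
  }

  public void DeletePortfolio(Guid id) {
    _observer.OnNext(new SqlStatement(
      "DELETE FROM [Portfolio] WHERE [Id] = @Id",
      new Dictionary<string, object> { { "@Id", id } }));
  }
}

// Simplest possible observer: remember every statement, execute nothing.
public sealed class CollectingObserver : IObserver<ISqlStatement> {
  private readonly List<ISqlStatement> _statements = new List<ISqlStatement>();

  public IReadOnlyList<ISqlStatement> Statements { get { return _statements; } }

  public void OnNext(ISqlStatement value) { _statements.Add(value); }
  public void OnError(Exception error) { }
  public void OnCompleted() { }
}
```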

Of course, collecting in and by itself is not all that useful. You have to do something with what you’ve collected (“the money in the box”). Let’s look at another observer that takes a slightly different approach.
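A sketch of such an observer, not a definitive implementation. It assumes a hypothetical `flush` delegate in place of a real database connection, an arbitrary threshold of 3, and a generic statement type so the example stays independent of any data provider.

```csharp
using System;
using System.Collections.Generic;

public sealed class ThresholdFlushingObserver<TStatement> : IObserver<TStatement> {
  private const int Threshold = 3; // arbitrary value for the sketch
  private readonly Action<IReadOnlyList<TStatement>> _flush;
  private readonly List<TStatement> _pending = new List<TStatement>();

  public ThresholdFlushingObserver(Action<IReadOnlyList<TStatement>> flush) {
    if (flush == null) throw new ArgumentNullException(nameof(flush));
    _flush = flush;
  }

  public void OnNext(TStatement value) {
    _pending.Add(value);
    if (_pending.Count >= Threshold) Flush();
  }

  // End of the stream: flush whatever is left below the threshold.
  public void OnCompleted() { Flush(); }

  // This sketch simply drops what has not been flushed yet.
  public void OnError(Exception error) { _pending.Clear(); }

  private void Flush() {
    if (_pending.Count == 0) return;
    _flush(_pending.ToArray()); // hand over a copy as one batch
    _pending.Clear();
  }
}
```

In a real rebuild the `flush` delegate would be the place to send the batch to the database in a single roundtrip.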

Without diving too much into the details, this observer flushes statements to the database as soon as a hard-coded threshold is reached. It does so in a batch-like fashion to minimize the number of roundtrips, but still adhering to the limitations that come with this particular ADO.NET data provider. Other implementations use SqlBulkCopy to maximize performance (but come with their own limitations). Depending on what memory resources a server has you could get pretty creative as to which strategy you choose to rebuild a large projection.

Conclusion

I’ve shown you SQL centric projections, but please, do step out of the box. Nothing is stopping you from producing and collecting “HttpStatements” for your favorite key-value store or “FileOperations” for your file-based projections. Nothing is stopping you from making different choices for the producing and consuming side. Nothing is stopping you from doing the “statement” execution in an asynchronous and/or parallel fashion. It’s just a matter of exploring your options and using what works in your environment. Next time I’ll show you how reading fits into all this …

(*): Yeah, yeah, I know, too much abstraction kills kittens …

Using Windows Azure to build a simple EventStore

Continuing my journey, I figured it shouldn’t be that hard to apply “Your EventStream is a linked list” using Windows Azure Storage Services.

Disclaimer: Again, this is just a proof of concept, a way to get started, not production code … obviously. I’m assuming you know your way around Windows Azure (how to set up an account, stuff like that) and how the .NET SDK works.

Just like with Amazon Web Services, the first thing to create are the containers that will harbor the changesets and aggregate heads. Unlike the AWS example, I’ve chosen to store both using the same service, Windows Azure Blob Storage. Why? Because it offers optimistic concurrency by leveraging ETags. The only unfortunate side-effect is that it seems to force me to break CQS, but I can live with that for now. The Windows Azure Blob Storage provides convenience methods to only create the containers when they don’t exist.

//Setting up the Windows Azure Blob Storage container to store changesets and aggregate heads

const string PrimaryAccessKey = "your-key-here-for-testing-purposes-only-ofcourse-;-)";
const string AccountName = "youraccountname";
const string ChangesetContainerName = "changesets";
const string AggregateContainerName = "aggregates";

var storageAccount =
  new CloudStorageAccount(
      new StorageCredentialsAccountAndKey(
            AccountName,
            PrimaryAccessKey),
      false);

var blobClient = storageAccount.CreateCloudBlobClient();

var changesetContainer = blobClient.GetContainerReference(ChangesetContainerName);
changesetContainer.CreateIfNotExist();

var aggregateContainer = blobClient.GetContainerReference(AggregateContainerName);
aggregateContainer.CreateIfNotExist();

Now that we’ve set up the infrastructure, let’s tackle the scenario of storing a changeset. The flow is pretty simple. First we try to store the changeset in the changeset container as a blob.

public class ChangesetDocument {
      public const long InitialVersion = 0;

      //Exposing internals to make the sample easier
      public Guid ChangesetId { get; set; }
      public Guid? ParentChangesetId { get; set; }
      public Guid AggregateId { get; set; }
      public long AggregateVersion { get; set; }
      public string AggregateETag { get; set; }

      public byte[] Content { get; set; }
}

//Assuming there's a changeset document we want to store,
//going by the variable name 'document'.

var changesetBlob = changesetContainer.GetBlobReference(document.ChangesetId.ToString());
var changesetUploadOptions = new BlobRequestOptions {
      AccessCondition = AccessCondition.None,
      BlobListingDetails = BlobListingDetails.None,
      CopySourceAccessCondition = AccessCondition.None,
      DeleteSnapshotsOption = DeleteSnapshotsOption.None,
      RetryPolicy = RetryPolicies.NoRetry(),
      Timeout = TimeSpan.FromSeconds(90),
      UseFlatBlobListing = false
};
changesetBlob.UploadByteArray(document.Content, changesetUploadOptions);

const string AggregateIdMetaName = "aggregateid";
const string AggregateVersionMetaName = "aggregateversion";
const string ChangesetIdMetaName = "changesetid";
const string ParentChangesetIdMetaName = "parentchangesetid";

//Set the meta-data of the changeset
//Notice how this doesn't need to be transactional
changesetBlob.Metadata[AggregateIdMetaName] = document.AggregateId.ToString();
changesetBlob.Metadata[AggregateVersionMetaName] = document.AggregateVersion.ToString();
changesetBlob.Metadata[ChangesetIdMetaName] = document.ChangesetId.ToString();
if(document.ParentChangesetId.HasValue)
      changesetBlob.Metadata[ParentChangesetIdMetaName] = document.ParentChangesetId.Value.ToString();
changesetBlob.SetMetadata();

If that goes well, we try to upsert the head of the aggregate to get it to point to this changeset. Below, we’re using the ETag of the aggregate head blob as a way of doing optimistic concurrency checking. It caters for both the initial (competing inserters) and update (competing updaters) concurrency.

public static class ExtensionsForChangeDocument {
      public static AccessCondition ToAccessCondition(this ChangesetDocument document) {
            if(document.AggregateVersion == ChangesetDocument.InitialVersion) {
                  return AccessCondition.IfNoneMatch("*");
            }
            return AccessCondition.IfMatch(document.AggregateETag);
      }
}

//Upsert the aggregate
var aggregateBlob = aggregateContainer.GetBlobReference(document.AggregateId.ToString());
var aggregateUploadOptions = new BlobRequestOptions {
      AccessCondition = document.ToAccessCondition(),
      BlobListingDetails = BlobListingDetails.None,
      CopySourceAccessCondition = AccessCondition.None,
      DeleteSnapshotsOption = DeleteSnapshotsOption.None,
      RetryPolicy = RetryPolicies.NoRetry(),
      Timeout = TimeSpan.FromSeconds(90),
      UseFlatBlobListing = false
};
aggregateBlob.UploadByteArray(document.ChangesetId.ToByteArray(), aggregateUploadOptions);
//Here's where we are breaking CQS if we'd like to cache the aggregate.
//This won't be a problem if we're re-reading the aggregate upon each behavior.
var eTag = aggregateBlob.Properties.ETag;

If the UploadByteArray operation throws a StorageClientException indicating that “The condition specified using HTTP conditional header(s) is not met.”, we know there was some form of optimistic concurrency conflict. In such a case, it’s best to repeat the entire operation.
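Repeating the entire operation could be sketched as a small retry helper. This is an assumption-laden illustration: `OptimisticConcurrency.Retry`, its parameters and the `isConflict` predicate are hypothetical, and recognising the conflict (for a failed conditional header that is an HTTP 412 Precondition Failed response) is left to the caller.

```csharp
using System;

public static class OptimisticConcurrency {
  // Runs 'attempt' until it succeeds, until an exception is not a
  // conflict, or until 'maxAttempts' attempts have been made.
  public static void Retry(Action attempt, Func<Exception, bool> isConflict, int maxAttempts) {
    if (attempt == null) throw new ArgumentNullException(nameof(attempt));
    if (isConflict == null) throw new ArgumentNullException(nameof(isConflict));
    if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

    for (var attemptNumber = 1; ; attemptNumber++) {
      try {
        attempt();
        return;
      } catch (Exception exception) when (attemptNumber < maxAttempts && isConflict(exception)) {
        // Lost the race: loop around and redo the whole operation.
      }
    }
  }
}
```

The `attempt` delegate should cover the whole cycle (re-read the aggregate head, re-run the behavior, store a new changeset, update the head), since the state the previous attempt was based on is stale.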
Now that we’ve dealt with writing, let’s take a look at reading. First we need to fetch the pointer to the last stored and approved changeset identifier.

var aggregateBlob = aggregateContainer.GetBlobReference(aggregateId.ToString());
var aggregateDownloadOptions = new BlobRequestOptions {
      AccessCondition = AccessCondition.None,
      BlobListingDetails = BlobListingDetails.None,
      CopySourceAccessCondition = AccessCondition.None,
      DeleteSnapshotsOption = DeleteSnapshotsOption.None,
      RetryPolicy = RetryPolicies.NoRetry(),
      Timeout = TimeSpan.FromSeconds(90),
      UseFlatBlobListing = false
};
var changesetId = new Guid?(new Guid(aggregateBlob.DownloadByteArray(aggregateDownloadOptions)));
var eTag = aggregateBlob.Properties.ETag;

Now that we’ve bootstrapped the reading process, we keep reading each changeset, until there’s no more changeset to read. Each approved changeset contains metadata that points to the previous approved changeset. It’s the responsibility of the calling code to do something useful with the read changesets (e.g. deserialize the content and replay each embedded event into the corresponding aggregate).

while(changesetId.HasValue) {
      var changesetBlob = changesetContainer.GetBlobReference(changesetId.Value.ToString());
      var changesetDownloadOptions = new BlobRequestOptions {
            AccessCondition = AccessCondition.None,
            BlobListingDetails = BlobListingDetails.None,
            CopySourceAccessCondition = AccessCondition.None,
            DeleteSnapshotsOption = DeleteSnapshotsOption.None,
            RetryPolicy = RetryPolicies.NoRetry(),
            Timeout = TimeSpan.FromSeconds(90),
            UseFlatBlobListing = false
      };
      var content = changesetBlob.DownloadByteArray(changesetDownloadOptions);
      changesetBlob.FetchAttributes();
      var document = new ChangesetDocument {
            AggregateETag = eTag,
            AggregateId = new Guid(changesetBlob.Metadata[AggregateIdMetaName]),
            AggregateVersion = Convert.ToInt64(changesetBlob.Metadata[AggregateVersionMetaName]),
            ChangesetId = new Guid(changesetBlob.Metadata[ChangesetIdMetaName]),
            Content = content,
      };
      if (changesetBlob.Metadata[ParentChangesetIdMetaName] != null)
            document.ParentChangesetId = new Guid(changesetBlob.Metadata[ParentChangesetIdMetaName]);

      yield return document;

      changesetId = document.ParentChangesetId;
}

The only *weird* thing with this code is that I’m propagating the aggregate’s head ETag using the changesets. It’s a modeling issue I’ll have to revisit ;-) .
But that’s basically all there is to it. In reality, you’ll need a lot more metadata and error handling to make this a success. I should point out that the performance of this service consumed on premise was better than what I experienced with AWS.

Conclusion

Using Windows Azure Storage Services is not so different from the Amazon Web Services in this case. However, this overall technique suffers from a few drawbacks. As mentioned before, upon concurrency, you might be wasting some storage space. Another drawback is the fact that you need to read all changesets that make up the history of an aggregate (or eventsource if you want to decouple it from DDD terminology) before being able to apply the first event. There are ways around this, such as storing all the changeset identifiers in the aggregate head if the total number of behaviors in an aggregate is low on average. You could even partition the aggregate head into multiple documents using nothing but its version number or count of applied behaviors to partition, but that’s the subject of a future exploration. I apologize if this post is somewhat of a copy-paste of its AWS counterpart, but given the goal and similarities that was to be expected :-) .

Using Amazon Web Services to build a simple Event Store

My post on “Your EventStream is a linked list” might have been somewhat abstract (a more prosaic version can be found here). By way of testing my theory, I set out to apply it using Amazon’s Web Services (AWS) stack. I used AWS DynamoDB as the transactional medium that would handle the optimistic concurrency, and AWS S3 to store the changesets.

Disclaimer: This is just a proof of concept, a way to get started. It’s not production code … obviously :-) I’m assuming you know your way around AWS (how to set up an account, stuff like that) and how the .NET SDK works.

The first thing to do is to create the container for the changesets. In AWS S3 those things are called buckets. This is a one time operation. You might want to make this conditional at the startup of your application/worker role/what-have-you. The AWS S3 API provides mechanisms to query for buckets, so you should be able to pull that off. On the other hand, when creating buckets using the REST API the HTTP PUT verb is used, and we all know that PUT is supposed to be idempotent. You might want to read up on the PUT Bucket API. One last thing: bucket names are unique across all of S3.

//Setting up the Amazon S3 bucket to store changesets in
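//Bucket names must be DNS-compliant (lowercase letters, digits, hyphens, periods), so no underscores
const string ChangesetBucketName = "yourorganization-yourboundedcontextname-changesets";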

var s3Client = AWSClientFactory.CreateAmazonS3Client();

s3Client.PutBucket(
  new PutBucketRequest().
    WithBucketName(ChangesetBucketName).
    WithBucketRegion(S3Region.EU)); //Your region might vary

The same thing needs to happen in AWS DynamoDB. There, the containers are called tables. Again, this is a one time, conditional operation. You might wanna read up on the specifics of table naming and uniqueness. The throughput provisioning is not something you want to be static. Monitor the load on your system (they have notification events/alerts for that), both in terms of number of reads/writes and bytes used for storage, and use that information to tune the table throughput settings.

//Setting up the Amazon DynamoDB table to store aggregates in
const string AggregateTableName = "yourboundedcontextname_aggregates";
const string AggregateIdName = "aggregate-id";

//At the time of writing, DynamoDBClient didn't make it yet
//into the AWSClientFactory.
var dynamoDbClient = new AmazonDynamoDBClient();

dynamoDbClient.CreateTable(
  new CreateTableRequest().
    WithTableName(AggregateTableName).
    WithKeySchema(
      new KeySchema().
        WithHashKeyElement(
          new KeySchemaElement().
            WithAttributeName(AggregateIdName).
            WithAttributeType("S")
        ).
        WithRangeKeyElement(null)
    ).
    WithProvisionedThroughput(
      new ProvisionedThroughput().
        WithReadCapacityUnits(300).
        WithWriteCapacityUnits(500)
    )
  );

Now that we’ve set up the infrastructure, let’s tackle the scenario of storing a changeset. The flow is pretty simple. First we try to store the changeset in AWS S3 as an object in the configured bucket.

//An internal representation of the changeset
public class ChangesetDocument {
  public const long InitialValue = 0;

  //Exposing internal state here to
  //simplify the example.
  public Guid AggregateId { get; set; }
  public long AggregateVersion { get; set; }
  public Guid ChangesetId { get; set; }
  public Guid? ParentChangesetId { get; set; }

  public byte[] Content { get; set; }

  public Stream GetContentStream() {
    return new MemoryStream(Content, writable: false);
  }
}

//Assuming there's a changeset document we want to store,
//going by the variable name 'document'.

const string AggregateIdMetaName = "x-amz-meta-aggregate-id";
const string AggregateVersionMetaName = "x-amz-meta-aggregate-version";
const string ChangesetIdMetaName = "x-amz-meta-changeset-id";
const string ParentChangesetIdMetaName = "x-amz-meta-parent-changeset-id";

var putObjectRequest = new PutObjectRequest().
  WithBucketName(ChangesetBucketName).
  WithGenerateChecksum(true).
  WithKey(document.ChangesetId.ToString()).
  WithMetaData(ChangesetIdMetaName, document.ChangesetId.ToString()).
  WithMetaData(AggregateIdMetaName, document.AggregateId.ToString()).
  WithMetaData(AggregateVersionMetaName, Convert.ToString(document.AggregateVersion));

if (document.ParentChangesetId.HasValue) {
  putObjectRequest.WithMetaData(ParentChangesetIdMetaName, document.ParentChangesetId.Value.ToString());
}

putObjectRequest.WithInputStream(document.GetContentStream());

s3Client.PutObject(putObjectRequest);

If that goes well, we try to create an item in the configured AWS DynamoDB table. The expected values below are the AWS DynamoDB way of doing optimistic concurrency checking. They cater for both the initial (competing inserters) and update (competing updaters) concurrency.

In this example I’m using an incrementing version number to do that. Strictly speaking, I could be using the changeset identifier for that, but an incrementing version number is easier to relate to when you come from an ORM/database background.

const string AggregateVersionName = "aggregate-version";
const string ChangesetIdName = "changeset-id";

public static class ExtensionsForChangesetDocument {
  public static KeyValuePair<string, ExpectedAttributeValue>[] ToExpectedValues(this ChangesetDocument document) {
    var dictionary = new Dictionary<string, ExpectedAttributeValue>();
    if(document.AggregateVersion == ChangesetDocument.InitialValue) {
      //Make sure we're the first to create the aggregate
      dictionary.Add(AggregateIdName,
        new ExpectedAttributeValue().
          WithExists(false)
      );
    } else {
      //Make sure nobody changed the aggregate behind our back 
      dictionary.Add(AggregateIdName,
        new ExpectedAttributeValue().
          WithExists(true).
          WithValue(
            new AttributeValue().
              WithS(document.AggregateId.ToString()))
      );
      dictionary.Add(AggregateVersionName,
        new ExpectedAttributeValue().
          WithValue(
            new AttributeValue().
              WithN(Convert.ToString(document.AggregateVersion - 1)))
      );
    }
    return dictionary.ToArray();
  }

  public static KeyValuePair<string, AttributeValue>[] ToItemValues(this ChangesetDocument document) {
    //The relevant values to store in the item.
    var dictionary = new Dictionary<string, AttributeValue> {
      {AggregateIdName, new AttributeValue().WithS(document.AggregateId.ToString())},
      {AggregateVersionName, new AttributeValue().WithN(Convert.ToString(document.AggregateVersion))},
      {ChangesetIdName, new AttributeValue().WithS(document.ChangesetId.ToString())},
    };
    return dictionary.ToArray();
  }
}

dynamoDbClient.PutItem(
  new PutItemRequest().
    WithTableName(AggregateTableName).
    WithExpected(document.ToExpectedValues()).
    WithItem(document.ToItemValues())
  );

If the PutItem operation throws an exception indicating “ConditionalCheckFailed”, we know there was some form of optimistic concurrency conflict. In such a case, it’s best to repeat the entire operation.

I’ve left duplicate command processing elimination as an exercise for the reader ;-)

The only thing left to do is showing the reverse operation, reading. First we need to fetch the pointer to the last stored and approved changeset identifier. I hope you do realize that a consistent read is not strictly necessary, since the likelihood of concurrency should be low.

var getItemResponse = dynamoDbClient.GetItem(
  new GetItemRequest().
    WithTableName(AggregateTableName).
    WithKey(
      new Key().
        WithHashKeyElement(
          new AttributeValue().
            WithS(aggregateId.ToString())).
        WithRangeKeyElement(null)).
    WithConsistentRead(false).
    WithAttributesToGet(ChangesetIdName)
  );

Now that we’ve bootstrapped the reading process, we keep reading each changeset, until there’s no more changeset to read. Each approved changeset contains metadata that points to the previous approved changeset. It’s the responsibility of the calling code to do something useful with the read changesets (e.g. deserialize the content and replay each embedded event into the corresponding aggregate).

var changesetId = new Guid?(new Guid(getItemResponse.GetItemResult.Item[ChangesetIdName].S));
while(changesetId.HasValue) {
  var getObjectResponse = s3Client.GetObject(
    new GetObjectRequest().
      WithBucketName(ChangesetBucketName).
      WithKey(changesetId.Value.ToString()));
  var document = new ChangesetDocument {
    AggregateId = new Guid(getObjectResponse.Metadata[AggregateIdMetaName]),
    AggregateVersion = Convert.ToInt64(getObjectResponse.Metadata[AggregateVersionMetaName]),
    ChangesetId = new Guid(getObjectResponse.Metadata[ChangesetIdMetaName]),
    //ToByteArray() is not a member of Stream; a custom extension method is assumed here
    Content = getObjectResponse.ResponseStream.ToByteArray()
  };
  var parentChangesetIdAsString = getObjectResponse.Metadata[ParentChangesetIdMetaName];
  if(parentChangesetIdAsString != null) {
    document.ParentChangesetId = new Guid(parentChangesetIdAsString);
  }

  changesetId = document.ParentChangesetId;

  yield return document;
}

And that’s basically all there is to it. In reality, you’ll need a lot more metadata and error handling to make this a success. I should point out that the performance of these services consumed on premise was abominable. I’m assuming (hoping) that if you consume them from an AWS EC2 instance, performance will be much better.

Conclusion

A clever reader will notice that this technique can easily be transposed to other document/data stores. The specifics will vary, but the theme will be the same. The reason for applying this technique has mainly to do with the inherent constraints of certain datastores. The transactional ones – especially in cloudy environments – are limited with regard to the number of bytes they can store, which makes them less desirable for storing payloads in (prediction of payload size can be hard at times). There are easy ways to side-step concurrency problems such as serializing all aggregate access to one worker/queue, but that’s another discussion.

Your EventStream is a linked list

A week ago I had an interesting twonversation with Jérémie Chassaing and Rinat Abdullin about event streams. I mentioned how I had been toying with eventstreams as being linked lists of changesets (to the aficionados of Jonathan Oliver’s EventStore, this is very much akin to what he calls Commits) in the past. As a way of documenting some of my thoughts on the subject I’m putting up some schematics here.

Model of an event stream

Model

From the above picture you can deduce that an event stream is really a simple thing: a collection of changesets. A changeset itself is a collection of events that occurred (how you got there is not important at this point). Event streams and changesets both have a form of unique identity. Head marks the identity of the latest known changeset. A changeset is immutable. For the sake of simplicity I’ve omitted any form of headers/properties you can attach to an event stream, a changeset and/or an event.

Changesets as a LinkedList

EventStream - Model

Each changeset knows its “parent”, i.e. the changeset that it should be appended to. Except for the very first changeset, which obviously does not have a “parent” (strictly speaking you could have an explicit terminator “parent”). Chronologically, changeset 1 came before changeset 2, changeset 2 came before changeset 3, and so on.
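The model can be sketched in a few lines of code. This is an illustration under assumptions: `Changeset`, `EventStreamReader` and the `load` delegate are hypothetical names, and events are plain objects.

```csharp
using System;
using System.Collections.Generic;

// An immutable changeset that knows the identity of its parent.
public sealed class Changeset {
  public Changeset(Guid id, Guid? parentId, IReadOnlyList<object> events) {
    Id = id;
    ParentId = parentId;
    Events = events;
  }

  public Guid Id { get; }
  public Guid? ParentId { get; } // null for the very first changeset
  public IReadOnlyList<object> Events { get; }
}

public static class EventStreamReader {
  // Walks the chain from the head back to the first changeset (newest
  // first), then reverses the result to get chronological order.
  public static IReadOnlyList<Changeset> ReadChronologically(
    Guid? head, Func<Guid, Changeset> load) {
    var changesets = new List<Changeset>();
    var next = head;
    while (next.HasValue) {
      var changeset = load(next.Value);
      changesets.Add(changeset);
      next = changeset.ParentId;
    }
    changesets.Reverse();
    return changesets;
  }
}
```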

Looking at the write side of a system that uses event streams for storage, there are two main scenarios:

  1. Writing a changeset of a given event stream: concerns here are duplicate changeset elimination & detecting conflicts, besides the act of actually writing.
  2. Reading the entire event stream: the main concern here is reading all changesets of the event stream as fast as we can, in order.

I’m well aware I’ve omitted other concerns such as automatic event upgrading, event dispatching, snapshotting which, frankly, are distractions at this point.

Reading

Changesets As Files

Supposing that each changeset is say a file on disk, how would I know where to start reading? Various options, really. The picture above illustrates one option where – by using “<streamid>.txt” as a convention – the Stream Head File is loaded to bootstrap “walking the chain”, by virtue of having it point to the latest changeset document (represented as LatestChangesetRef) that makes up that stream. As each Changeset File is read, it provides a reference/pointer to the next Changeset File to read (represented by ParentRef). That reference is really the identity of the next changeset.

I hope I don’t need to explain why you need to keep those identifiers logical. Don’t make it a “physical” thing, like the path to the next changeset file. That would be really painful if you were ever to restructure the changeset files on disk. Instead you should delegate the responsibility of translating/resolving a logical identity into its physical location.
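A small sketch of that delegation. The interface name, the sharded directory layout and the “.txt” extension used here are assumptions made for illustration only:

```csharp
using System;
using System.IO;

// The only place that knows how a logical identity maps to a location.
public interface IChangesetLocationResolver {
  string Resolve(Guid changesetId);
}

// One possible layout: <root>/<first two hex digits>/<id>.txt
public sealed class ShardedDirectoryResolver : IChangesetLocationResolver {
  private readonly string _root;

  public ShardedDirectoryResolver(string root) {
    if (root == null) throw new ArgumentNullException(nameof(root));
    _root = root;
  }

  public string Resolve(Guid changesetId) {
    var name = changesetId.ToString("N"); // 32 hex digits, no dashes
    return Path.Combine(_root, name.Substring(0, 2), name + ".txt");
  }
}
```

Restructuring the files on disk then only means swapping in another resolver; the identifiers stored in the stream head and in the changesets stay untouched.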

Other options for “where to start reading” could be:

  1. keeping the head of each stream in memory (causing “sticky” streams and dealing with recovery mechanisms).
  2. storing the head as a record in a database or blob storage with concurrency control

Now, reading each changeset file could become a bit costly if they’re scattered all over the disk. There’s nothing stopping you from throwing all those changeset documents in one big file, even asynchronously. This is where immutability and resolving identities can really help you out. It’s important to distinguish between what happens at the logical level and what happens at the physical level.

Alternative Changesets As Files

Yet another approach might be to keep an index file of all changesets (above represented by the Stream Index File) that make up the event stream (in an append only fashion), thus relieving the changeset documents from having to know their parents.

Writing

Basically, this operation can be split up into writing the changeset document and updating the head (or index) of the event stream. The advantage here is that storing the changeset document does not require any form of transaction. This allows you to choose from a broader range of data-stores as there really isn’t a requirement beyond the guarantee that they will remember and serve what you asked them to store. Updating the head of the event stream does require you to at least be able to detect that concurrent writes are happening or have happened, depending on how you want to resolve conflicts. As such, there’s no need to store both of them in the same data-store. Also notice that the duration of the transaction is reduced by taking the changeset itself out of the equation.
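The two steps can be sketched with an in-memory stand-in for both stores. This is an assumption-heavy illustration: `InMemoryEventStore` and `TryAppend` are hypothetical, and a compare-and-swap on a `ConcurrentDictionary` plays the role of whatever concurrency control the real head store offers.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class InMemoryEventStore {
  // Changeset storage: no transaction needed, content is immutable and
  // is expected to carry the reference to its parent.
  private readonly ConcurrentDictionary<Guid, byte[]> _changesets =
    new ConcurrentDictionary<Guid, byte[]>();
  // Stream heads: the only place where concurrent writers are detected.
  private readonly ConcurrentDictionary<Guid, Guid> _heads =
    new ConcurrentDictionary<Guid, Guid>();

  // Returns false when another writer moved the head first.
  // The changeset written in step 1 then stays behind, unreferenced.
  public bool TryAppend(Guid streamId, Guid? expectedHead, Guid changesetId, byte[] content) {
    // Step 1: store the changeset.
    _changesets[changesetId] = content;
    // Step 2: move the head, but only if it is still where we expect it.
    return expectedHead.HasValue
      ? _heads.TryUpdate(streamId, changesetId, expectedHead.Value)
      : _heads.TryAdd(streamId, changesetId);
  }

  public Guid? HeadOf(Guid streamId) {
    Guid head;
    return _heads.TryGetValue(streamId, out head) ? head : (Guid?)null;
  }
}
```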

When picking a changeset identity, you might be tempted to reuse the identifier of the message that caused the changes to happen (usually the Command’s message identifier). Don’t. Remember, retrying that same command might produce a different set of changes. How are you going to differentiate between rectifying a previous failure with a retry and some other thread trying to process the same message? It’s best to use identities/identifiers for just one purpose.

Model - Failure

What happens when you can’t update the head to your newly inserted changeset identity? You’ll be left with a dangling changeset that didn’t make it into “the circle of trust”. No harm done, except for wasting some storage. If the changesets were blobs in the cloud it might be useful to have a special purpose daemon to hunt these down and remove them (depending on how much storage costs vs the cost of building the daemon). In general you should optimize for having as few “concurrency conflicts” as possible (it’s bad for business).
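For what it’s worth, the hunting part of such a daemon could look something like the sketch below. It assumes the parent-pointer model (each changeset knows its parent) and that you can enumerate all stored changesets and all stream heads; the `DanglingChangesets` class and its input shapes are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class DanglingChangesets {
  // parents: changeset identifier -> parent identifier
  //          (null for the first changeset of a stream)
  // heads:   the current head of every stream
  public static ISet<Guid> Find(IDictionary<Guid, Guid?> parents, IEnumerable<Guid> heads) {
    var reachable = new HashSet<Guid>();
    foreach (var head in heads) {
      Guid? current = head;
      // Walk back towards the start of the stream,
      // stopping early at anything already visited
      while (current.HasValue && reachable.Add(current.Value)) {
        Guid? parent;
        current = parents.TryGetValue(current.Value, out parent) ? parent : null;
      }
    }
    // Whatever is stored but not reachable from a head is dangling
    var dangling = new HashSet<Guid>(parents.Keys);
    dangling.ExceptWith(reachable);
    return dangling;
  }
}
```

Mind that a changeset whose head update is still in flight looks dangling as well, so a real daemon would probably only remove changesets older than some threshold.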

Conclusion

I know there are a lot of holes in this story. That’s intentional, I have a lot of unanswered questions myself. I’m especially interested in any comments that can point me to prior art outside the realm of source control systems. I have no intention of using this in production. It’s just a mental exercise.

Acknowledgements

Most of what I mention here exists in one shape or another. By now some of you will have noticed the resemblance to certain aspects of Git’s internal object model, although I’m inclined to say it’s closer to what Mercurial does. The convention-based file storage can be found in the NoDB implementation by Chris Nicola. Concepts such as commits and event streams (and much more) can be found in the EventStore project.

Message contract DSL

A long time ago, I read this post by Rinat Abdullin on teaching Microsoft Visual Studio your own language. Unfortunately, at the time I was struggling with more basic CQRS concepts and so it got somewhat pushed to the background. Today, however, after authoring the gazillionth command and event by hand, I knew something had to change. This was just too tedious, manual labor.
Granted, it only takes 5 to 10 minutes at first to define a command or event. But as you progress, you start refactoring towards composable types, creating builders for those types, and before you know it, you’ve spent 45 minutes on data structures. Not my idea of productivity.
As I reread Rinat’s article, I knew I had reached that point where automation needs to kick in (it just took me a little longer). I wasn’t particularly keen on using ANTLR and an AST to define my grammar, so I started looking for alternatives. The usual suspects passed by: XML (too verbose), JSON (better, but still too verbose), YAML (dismissed because of lack of familiarity – your mileage may vary). I continued hacking in Notepad until I hit something that looked pretty much like the example below.

[MakeDogBarkCommand]
DogId=Guid
Times=Int32

[DogBarkedEvent]
Id=Guid
Version=Int64
NameOfDog=String (Called)
Times=Int32 (ForNumberOfTimes)

If you’ve been developing on Microsoft Windows systems for some time, this format should look familiar. Yeah, it’s good old INI file format with conventions sprinkled all over it. Section names denote types such as events, commands, data structures, enumerations. Key names denote properties of the type, while the values denote each property’s datatype. There’s no translation here, those are just native .NET types or your own (a big plus compared to all the type conversion between XML/JSON and the .NET type system). Conventions applied include:

  • anything that ends with ‘Event’ is an event
  • anything that ends with ‘Command’ is a command
  • anything that ends with ‘Fragment’ is a fragment to be inserted at the location of reference
  • anything that has a number for each key value is an enumeration
  • anything else is just a data structure
  • values with a ‘type (name)’ signature denote a property type and a custom name to be used as the builder method name

If you’re using your events and commands directly as interop datastructures, this approach might not work for you.

The beauty of this technique lies in the fact that you can define a type and reuse it pretty much everywhere, thus composing commands and events from existing types. It doesn’t take a genius to figure out that you can go from the above message contract definition to the code below.

[Serializable] [ProtoContract]
public class MakeDogBarkCommand : ICommand {
  [ProtoMember(1)]
  public Guid DogId { get; private set; }
  [ProtoMember(2)]
  public Int32 Times { get; private set; }

  MakeDogBarkCommand() {}
  public MakeDogBarkCommand(Guid dogId, Int32 times) {
    DogId = dogId;
    Times = times;
  }
}

[Serializable] [ProtoContract]
public class DogBarkedEvent : IEvent {
  [ProtoMember(1)]
  public Guid Id { get; private set; }
  [ProtoMember(2)]
  public Int64 Version { get; private set; }
  [ProtoMember(3)]
  public String NameOfDog { get; private set; }
  [ProtoMember(4)]
  public Int32 Times { get; private set; }

  DogBarkedEvent() {}
  public DogBarkedEvent(Guid id, Int64 version, String nameOfDog, Int32 times) {
    Id = id;
    Version = version;
    NameOfDog = nameOfDog;
    Times = times;
  }
}

public class DogBarkedEventBuilder : IEventBuilder<DogBarkedEvent> {
  String _nameOfDog;
  Int32 _times;

  public DogBarkedEventBuilder Called(String value) {
    _nameOfDog = value;
    return this;
  }

  public DogBarkedEventBuilder ForNumberOfTimes(Int32 value) {
    _times = value;
    return this;
  }

  public DogBarkedEvent Build(Guid id, Int64 version) {
    return new DogBarkedEvent(id, version, _nameOfDog, _times);
  }
}

Below is a more advanced example demonstrating the use of arrays, enumerations and composing a type from fragments, other custom and native types.

[AddProductSetToCartCommand]
CartId=Guid
ProductSet=ProductSet

[ProductSet]
Id=Guid
Name=String
Items=ProductSetItem[]

[ProductIdNameCategoryFragment]
ProductId=Guid
ProductName=String
ProductCategory=ProductCategory

[ProductSetItem]
Id=Int32
Product=ProductIdNameCategoryFragment
Amount=Int32

[ProductCategory]
Toys=1
ComputerGames=2
DVDs=3
CDs=4

As a side-note, to make the transition to this DSL a little easier, I threw together a bit of reflection-based code over my existing events/commands to spit out a file in the INI format.
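That reflection code isn’t shown here, but a rough sketch of the idea could look like this. The `ContractDumper` class is a hypothetical name, and the sketch only covers the simple case: it ignores enumerations, fragments and custom builder method names.

```csharp
using System;
using System.Reflection;
using System.Text;

public static class ContractDumper {
  // Emits one INI section for the given type:
  // [TypeName] followed by a Property=Type line per public property.
  public static string ToIniSection(Type type) {
    var ini = new StringBuilder();
    ini.Append('[').Append(type.Name).Append("]\n");
    var properties = type.GetProperties(BindingFlags.Public | BindingFlags.Instance);
    foreach (var property in properties) {
      // PropertyType.Name yields e.g. "Guid", "Int32" or "ProductSetItem[]"
      ini.Append(property.Name).Append('=').Append(property.PropertyType.Name).Append('\n');
    }
    return ini.ToString();
  }
}

// Example contract, only here to have something to reflect over
public class MakeDogBarkCommand {
  public Guid DogId { get; private set; }
  public Int32 Times { get; private set; }
}
```

Calling `ContractDumper.ToIniSection(typeof(MakeDogBarkCommand))` gives you a section you can paste into the contract file. Reflection makes no promise about the order of the properties, so you might have to rearrange a line or two by hand.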

The thing that stood out the most for me is the terseness with which a message contract can be authored. Combined with conventions and old-fashioned code generation this can be a real time-saver.
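To give an idea of how little machinery the conventions described earlier need, here is a minimal sketch of reading the INI text and classifying each section. `ContractKind` and `ContractConventions` are hypothetical names, the parser is deliberately naive, and the actual code generation is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum ContractKind { Event, Command, Fragment, Enumeration, DataStructure }

public static class ContractConventions {
  // Splits the INI text into sections: section name -> (key, value) pairs
  public static Dictionary<string, List<KeyValuePair<string, string>>> Parse(string ini) {
    var sections = new Dictionary<string, List<KeyValuePair<string, string>>>();
    List<KeyValuePair<string, string>> current = null;
    foreach (var raw in ini.Split('\n')) {
      var line = raw.Trim();
      if (line.Length == 0) continue;
      if (line[0] == '[' && line[line.Length - 1] == ']') {
        current = new List<KeyValuePair<string, string>>();
        sections[line.Substring(1, line.Length - 2)] = current;
      } else if (current != null) {
        var separator = line.IndexOf('=');
        if (separator < 0) continue; // not a key=value line, skip it
        current.Add(new KeyValuePair<string, string>(
          line.Substring(0, separator).Trim(),
          line.Substring(separator + 1).Trim()));
      }
    }
    return sections;
  }

  // Applies the naming conventions, in order of precedence
  public static ContractKind Classify(string name, IEnumerable<KeyValuePair<string, string>> entries) {
    if (name.EndsWith("Event", StringComparison.Ordinal)) return ContractKind.Event;
    if (name.EndsWith("Command", StringComparison.Ordinal)) return ContractKind.Command;
    if (name.EndsWith("Fragment", StringComparison.Ordinal)) return ContractKind.Fragment;
    var list = entries.ToList();
    if (list.Count > 0 && list.All(entry => IsNumber(entry.Value))) return ContractKind.Enumeration;
    return ContractKind.DataStructure;
  }

  static bool IsNumber(string value) {
    int ignored;
    return int.TryParse(value, out ignored);
  }
}
```

From there on it’s a matter of feeding each classified section to a template that spits out the class, and the builder where applicable.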

Message interaction diagrams

The other day, while commenting on my EventBuilders post, Thorsten Krüger questioned how one would keep an overview of all the messages (commands, events) floating around the system in a meaningful way.

Anybody who has built larger messaging systems (or parts thereof), or used messaging to integrate with other systems, will acknowledge that some form of contract/protocol negotiation has taken place, such that all parties can communicate with one another. At times, on the surface of it, it might look like nothing more than a form of data-exchange. But if you question why that data goes to and fro, usually there’ll be some greater goal, a business function that data supports. Other times it might really be behavior going over the wire, in the form of serialized messages that want your end or their end to mutate state and reply or publish upon completion (success/failure).

Nevertheless, as the title of this post suggests, visual documentation is what will keep the bigger picture clear. At a high level, visual documentation of communication between bounded contexts will function somewhat like a context map, but it’s more focused on the command/event interactions supporting a specific use case/user story/business function. Mind you that this way of documenting your system’s interactions is not limited to communication between bounded contexts. It could as well be breaking down a process that affects multiple aggregates, tied together by a saga or several eventhandlers triggering one command after another.
The most useful tool I’ve found to document the flow of messages is by far interaction diagrams, and more specifically sequence diagrams. Below you’ll find an example using the excellent http://www.websequencediagrams.com/ (seriously, try it and if you like it, consider buying it).
Communication between and in bounded contexts

EventBuilders

Some people have asked me to clarify what I’ve been saying over on twitter about using event builders. This post is an attempt at showing how I’ve been using them.

This is just one way of creating fluent builders for your events. If you search the net, you’ll find many more (start here).

Let’s dive in. I’m assuming there’s an event in place (called BeganTemplateRolloutEvent in this example) along with some minor infrastructure that tells me the class I’m dealing with is indeed an event.

//Could be your own, the one from your favourite messaging framework, 
//or none at all

public interface IMessage {} 
public interface IEvent : IMessage {}

public class BeganTemplateRolloutEvent : IEvent {
  public readonly Guid Id;
  public readonly long Version;
  public readonly Events.RolloutPeriod PeriodToRollout;

  public BeganTemplateRolloutEvent(Guid id, long version, Events.RolloutPeriod periodToRollout) {
    Id = id; Version = version; PeriodToRollout = periodToRollout;
  }
}

public class RolloutPeriod  { 
  //i.e. Events.RolloutPeriod, 
  //the event representation of a value object

  public readonly DateTime StartDate;
  public readonly DateTime EndDate;

  public RolloutPeriod(DateTime startDate, DateTime endDate) {
    StartDate = startDate; EndDate = endDate;
  }
}

So, given that event, what does its builder look like?

public interface IEventBuilder<out TEvent> //covariant: TEvent is only ever returned
  where TEvent : IEvent
{
  TEvent Build(Guid id, long version); 
}

public class BeganTemplateRolloutEventBuilder : IEventBuilder<BeganTemplateRolloutEvent> {
  Events.RolloutPeriod _periodToRollout;

  public BeganTemplateRolloutEventBuilder ForPeriod(RolloutPeriodBuilder builder) { 
    //Notice the builder? Makes it more fluent, that's all.
    _periodToRollout = builder.Build();
    return this;
  }

  public BeganTemplateRolloutEvent Build(Guid id, long version) {
    return new BeganTemplateRolloutEvent(id, version, _periodToRollout);
  }
}

public class RolloutPeriodBuilder {
  DateTime _startsOn;
  DateTime _endsOn;

  public RolloutPeriodBuilder StartsOn(DateTime value) { _startsOn = value; return this; }
  public RolloutPeriodBuilder EndsOn(DateTime value) { _endsOn = value; return this; }

  public Events.RolloutPeriod Build() { return new Events.RolloutPeriod(_startsOn, _endsOn); }
}

public abstract class AggregateRootEntity {
  Guid _id;
  long _version;

  protected void ApplyEvent(IEventBuilder<IEvent> builder) {
    ApplyEvent(builder.Build(_id, ++_version));
  }

  private void ApplyEvent(IEvent @event) {
    /* You know what goes here ... */
  }
}

Why the explicit event builder interface? Basically this is a contract between the derived aggregate root entities and an AggregateRootEntity base class, where the latter provides event sourcing capabilities as found in Greg Young’s m-r. It allows the derived classes to concern themselves with the purely domain specific part of an event, while the base class can provide event identity (i.e. the aggregate identifier) and versioning (i.e. incrementing a version number upon each change or calculating a version hash or …). The base class makes no assumption about how the information is stored inside the event, by virtue of that Build method.
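One detail worth spelling out: for a base class method that accepts `IEventBuilder<IEvent>` to take a builder of one specific event, the interface has to be covariant in its event type. Below is a stripped-down, self-contained sketch of just that mechanism. `DogBarkedEvent`, `DogBarkedEventBuilder` and `Recorder` are simplified stand-ins for illustration, not the classes used elsewhere in this post.

```csharp
using System;

public interface IEvent {}

// "out" makes TEvent covariant, which is allowed because
// TEvent only ever appears as a return type.
public interface IEventBuilder<out TEvent> where TEvent : IEvent {
  TEvent Build(Guid id, long version);
}

public class DogBarkedEvent : IEvent {
  public readonly Guid Id;
  public readonly long Version;

  public DogBarkedEvent(Guid id, long version) {
    Id = id; Version = version;
  }
}

public class DogBarkedEventBuilder : IEventBuilder<DogBarkedEvent> {
  public DogBarkedEvent Build(Guid id, long version) {
    return new DogBarkedEvent(id, version);
  }
}

// Plays the part of the aggregate root base class:
// it owns identity and versioning, nothing else.
public class Recorder {
  readonly Guid _id = Guid.NewGuid();
  long _version;

  // Any IEventBuilder<SomeEvent> converts to IEventBuilder<IEvent>
  public IEvent Apply(IEventBuilder<IEvent> builder) {
    return builder.Build(_id, ++_version);
  }
}
```

Passing a `DogBarkedEventBuilder` to `Apply` compiles precisely because of that `out` modifier. Without it, the base class would need a generic method or a cast.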

Having event builders is great, but where can event builders be used in your codebase? The short answer is: pretty much everywhere you’d be using an event. Be it in a command handler test to assert the proper event was produced, inside an event handler/denormalizer test to properly compose an event to be handled, or inside an aggregate to build up events in a readable manner before turning them over to the ApplyEvent(…) method. Below is an example of its usage inside an aggregate.

public class Template : AggregateRootEntity {
  public void BeginRollout(Domain.RolloutPeriod periodToRollout) {
    Guard.Against(
      IsPeriodRolledOut(periodToRollout), 
      ErrorCode.PeriodAlreadyRolledOut);

    ApplyEvent(
      Build.BeganTemplateRollout.
        ForPeriod(periodToRollout) //Double dispatch
    );
  }

  public bool IsPeriodRolledOut(Domain.RolloutPeriod periodToRollout) {
    /* Do what is necessary */
    return false;
  }
  
  void ApplyEvent(BeganTemplateRolloutEvent @event) { 
    // change internal state here
    // if you care for it
  }
}

public class RolloutPeriod { 
  //i.e. Domain.RolloutPeriod

  private DateTime _startDate;
  private DateTime _endDate;

  internal void BuildEvent(BeganTemplateRolloutEventBuilder builder) {
    builder.ForPeriod(
      Build.RolloutPeriod.
        StartsOn(_startDate).
        EndsOn(_endDate));
  }
}

internal static class Build {
  public static BeganTemplateRolloutEventBuilder BeganTemplateRollout { 
    get { return new BeganTemplateRolloutEventBuilder(); } 
  }
  public static RolloutPeriodBuilder RolloutPeriod { 
    get { return new RolloutPeriodBuilder(); } 
  }
}

public static class EventBuilderExtensions {
  public static BeganTemplateRolloutEventBuilder ForPeriod(
    this BeganTemplateRolloutEventBuilder builder, 
    Domain.RolloutPeriod value) 
  {
    value.BuildEvent(builder); //Double dispatch magic
    return builder;
  }
}

How do you control time?

Granted, in some systems this might not be an issue. But if you want to be able to test your system’s behavior in a deterministic fashion, it becomes important to be able to control time.

So how do you control time in your code? Simple, you define your own Clock and use it instead of the System.DateTime static members.

public static class Clock {
  static Func<DateTime> _nowValueProvider;

  static Clock() { _nowValueProvider = () => DateTime.Now; }

  public static void Initialize(Func<DateTime> nowValueProvider) {
    _nowValueProvider = nowValueProvider;
  }

  public static DateTime Now { get { return _nowValueProvider(); } }
  public static DateTime Today { get { return Now.Date; } }
}

Now, I’m not gonna take credit for this code. It was authored by my coworker Wouter Naessens and inspired by an old Ayende post (http://ayende.com/blog/3408/dealing-with-time-in-tests). People using NodaTime will call this a cheap rip-off (http://code.google.com/p/noda-time/source/browse/src/NodaTime/Clock.cs). If you search the net, you’ll find other similar examples. Regardless of how you decide to implement this, what’s important is that you do.

Those that have a copy of NDepend can scan their code for usages of DateTime.Now, DateTime.Today, … and fail the build if any of those are detected.

Now imagine you’re using messaging and you want to test your system in a deterministic way. Setting that clock in unit tests is all fine and dandy, but what about rolling time forward (or backward if need be) between two consecutive commands sent to your system? Simple, you define a system command called SetClock.

public class SetClock : IMessage {
  public DateTime Now { get; private set; }
  public SetClock(DateTime now) {
    Now = now;
  }
}

public class SetClockHandler : IHandle<SetClock> {
  public void Handle(SetClock message) {
    Clock.Initialize(() => message.Now);
    //If you want to keep time rolling forward
    //from message.Now onwards you'll have to be
    //more inventive.
  }
}
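As for being “more inventive”: one way to keep time rolling forward from a given instant is to add the elapsed real time to it. The sketch below assumes a hypothetical `RollingClock` helper that produces a provider you can hand to `Clock.Initialize`.

```csharp
using System;
using System.Diagnostics;

public static class RollingClock {
  // Returns a provider that starts at 'start' and advances
  // along with real elapsed time from the moment of this call.
  public static Func<DateTime> StartingAt(DateTime start) {
    var elapsed = Stopwatch.StartNew();
    return () => start + elapsed.Elapsed;
  }
}

// Inside the handler this would become:
//   Clock.Initialize(RollingClock.StartingAt(message.Now));
```

Mind that this trades away some determinism: two reads of the clock now differ by however long the code in between took. For tests that assert on exact timestamps, the fixed provider remains the safer choice.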

You could also embed the clock’s time as a header in the command’s envelope, allowing a message handler to initialize the clock off the header value. There are lots of variations possible.

Using this technique you should be able to replay use cases that involve time, or do acceptance/integration testing that involves time.

Command Design Survey

A while back, I did a command design survey to get a feeling as to how people were designing their commands in a CQRS environment. A total of 28 people participated in the survey (excluding myself, as I didn’t want to bias the results in any way). I want to thank each one of you for taking the time to do so. Here are the results:



To the question “What else is special about your commands?” I got this compiled list:

  • Nothing
  • For now I use what ncqrs requires
  • They contain correlation id, user, role, time stamp, context reference
  • Use data annotations for validations. Commands are treated as value objects.
  • They are DTOs, no behaviour
  • Nothing, they are just standard nCQRS CommandBase implementations at the moment.
  • Simple, no behavior in commands, handlers may throw.
  • Validates them using a nsb message mutator on the sending side and a msg handler on the receiving side
  • Immutable, self describing, explicit
  • They are dead simple :)
  • The base class/interface contains an Id property of type Guid.
  • Getters/Setters to use the command as the ASP.NET MVC view model; nCQRS base class has a marker interface, so both really; no constructor, so no exception from the constructor. Validation is done by ASP.NET MVC so the user can see the validation errors. I validate again in a command service interceptor, to catch base commands from other sources.
  • I have a ICommand.Validate method that gets called by the dispatcher (returns IEnumerable)

Come to your own conclusions.
