The journeylism of @yreynhout

On CQRS, DDD(D), ES, …

Category Archives: Event sourcing

A role to play

Every so often someone new arrives at the DDD/CQRS list (*) and topics such as set-based validation rear their heads, resulting in near-endless threads of discussion before a common understanding is reached. A topic that isn’t discussed as often is that of roles in the domain model and how they work in combination with event sourcing. If you want to read up on roles, Mark Seemann has some great posts on that topic on his blog, albeit in a slightly different context. There’s also this video by Udi Dahan about making roles explicit, which is more akin to what I’ll be touching upon here. Fanatics of whitepapers might get their brains washed by papers like “Role Interfaces”, “The Role Object Pattern”, “Modeling Roles” or “Mock Roles, not Objects”.

Let’s make it practical

Suppose I’m building a Realtor app that has a finite number of real estate property types (think apartment, villa, house, warehouse, etc.). I could model each type of property as a class/type (**), putting specific behavior on each of them. I’d end up with Apartment, Villa, House and Warehouse as aggregate root entity types (and hence as aggregates). What if I had some common behavior that applies to all of them, where the calling code wants to remain ignorant of the specific type? That is, the calling code is interested in the role a property plays, not in the specific type of property it represents. Let’s call that role ‘Property’ for lack of inspiration. I could implement it using a base class, an interface or even a totally separate class, as we’ll see in a moment. The calling code could be about adding a set of properties to a listing:

public class AddPropertiesToListingService {
  readonly IListingRepository _listingRepository;
  readonly IPropertyRepository _propertyRepository;

  public AddPropertiesToListingService(IListingRepository listingRepository, IPropertyRepository propertyRepository) {
    _listingRepository = listingRepository;
    _propertyRepository = propertyRepository;
  }

  public void AddPropertiesToListing(ListingId listingId, Guid[] propertyIds) {
    var listing = _listingRepository.Get(listingId);
    //ToArray() loads every property up front, so a missing one
    //surfaces before the listing is touched.
    var properties = propertyIds.Select(propertyId => _propertyRepository.Get(propertyId)).ToArray();
    listing.AddProperties(properties);
  }
}

If this were modeled using either an interface or a base class to denote the role, the repository code (***) would look a bit like this:

public interface IPropertyRepository {
  Property Get(Guid id);
}

public class PropertyRepository : IPropertyRepository {
  readonly IPropertyFactory _factory;
  readonly IEventStreamReader _reader;

  public PropertyRepository(IPropertyFactory factory, IEventStreamReader reader) {
    _factory = factory;
    _reader = reader;
  }

  public Property Get(Guid id) {
    var result = _reader.Read(id);
    if(!result.HasValue) {
      throw new PropertyNotFoundException(id);
    }
    var root = _factory.Create(result.Value);
    root.Initialize(result.Value.Events);
    return root;
  }
}

//This could also be a Func<EventStream, Property>
public interface IPropertyFactory {
  Property Create(EventStream eventStream);
}

public class EventStreamAnalyzingPropertyFactory : IPropertyFactory {
  readonly Dictionary<Type, Func<Property>> _propertyFactories;

  public EventStreamAnalyzingPropertyFactory() {
    _propertyFactories = new Dictionary<Type, Func<Property>>();
    //Assume that each of the aggregate root entities
    //has a static Factory method that creates a new instance.
    _propertyFactories.Add(typeof(ApartmentRegistered), () => Apartment.Factory());
    _propertyFactories.Add(typeof(VillaRegistered), () => Villa.Factory());
    _propertyFactories.Add(typeof(HouseRegistered), () => House.Factory());
    _propertyFactories.Add(typeof(WarehouseRegistered), () => Warehouse.Factory());
    //Remark: Yes, this is an OCP violation.
  }

  public Property Create(EventStream eventStream) {
    Func<Property> propertyFactory;
    if(!_propertyFactories.TryGetValue(eventStream.Events[0].GetType(), out propertyFactory))
      throw new PropertyUnknownException(eventStream.Id);
    return propertyFactory();
  }
}

public interface IEventStreamReader {
  Optional<EventStream> Read(Guid id);
}

public interface Optional<T> {
  bool HasValue { get; }
  T Value { get; }
}

public interface EventStream {
  Guid Id { get; }
  Int32 ExpectedVersion { get; }
  //In a real world implementation this would be streaming,
  //i.e. IEnumerable<object>
  object[] Events { get; }
}

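public class PropertyNotFoundException : Exception {
  public PropertyNotFoundException(Guid id) : base("Property " + id + " could not be found.") { }
}

public class PropertyUnknownException : Exception {
  public PropertyUnknownException(Guid id) : base("The type of property " + id + " could not be determined from its event stream.") { }
}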

//As an aside, Property could also be 
//turned into an interface to describe
//the role behavior.
public abstract class Property : AggregateRootEntity { 
  /* Common behavior can be put here */
}

//Similar for the other property types.
public class Villa : Property {
  public static readonly Func<Villa> Factory = () => new Villa();

  Villa() { /* ... */}

  /* Specific behavior can be put here */
}

Notice how the factory is made responsible for analyzing the event stream and deciding which Property type to instantiate based on the type of the first event. It should be obvious that “the type of the first event” is just one of the ways you could arrive at a decision about which Property type to instantiate.

A slightly different scenario is one where you load the stream into a dedicated class, instead of relying on a base class or an interface to fulfill the role.

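//Register and Initialize come from the AggregateRootEntity base class.
public class Property : AggregateRootEntity {
  public static readonly Func<Property> Factory = () => new Property();

  Property() { 
    /* Streams from each of the Property types can 
       be loaded into this Role class  */
    Register<ApartmentRegistered>(When);
    Register<VillaRegistered>(When);
    Register<HouseRegistered>(When);
    Register<WarehouseRegistered>(When);
    /* Notice how I haven't even brought up what 
       you could do if these were polymorphic
       messages */
  } 

  //One handler per registered event type.
  void When(ApartmentRegistered @event) { /* ... */ }
  void When(VillaRegistered @event) { /* ... */ }
  void When(HouseRegistered @event) { /* ... */ }
  void When(WarehouseRegistered @event) { /* ... */ }

  /* Common role behavior goes here */
}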

public class PropertyRepository : IPropertyRepository {
  readonly IEventStreamReader _reader;

  public PropertyRepository(IEventStreamReader reader) {
    _reader = reader;
  }

  public Property Get(Guid id) {
    var result = _reader.Read(id);
    if(!result.HasValue) {
      throw new PropertyNotFoundException(id);
    }
    var root = Property.Factory();
    root.Initialize(result.Value.Events);
    return root;
  }
}

This is exactly why you shouldn’t use the type name of an aggregate as a form of stream identification (at least not if you want to support this kind of scenario). The repository just loads the stream, totally ignorant of which class produced the events in the first place, and happily feeds it to the Property class.
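The `Register`/`When` calls in the Property role class rely on plumbing in a base class the post doesn’t show. Below is a minimal sketch of how that routing could work, assuming a dictionary of handlers keyed by the exact event type. This `AggregateRootEntity`, the `Address` field on the events and the trimming down to two property types are all hypothetical stand-ins.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the base class: routes each replayed event
// to the handler registered for its exact type.
public abstract class AggregateRootEntity {
  readonly Dictionary<Type, Action<object>> _handlers = new Dictionary<Type, Action<object>>();

  protected void Register<TEvent>(Action<TEvent> handler) {
    _handlers.Add(typeof(TEvent), @event => handler((TEvent)@event));
  }

  public void Initialize(IEnumerable<object> events) {
    foreach (var @event in events) {
      // Events the role has no handler for are simply skipped.
      if (_handlers.TryGetValue(@event.GetType(), out var handler))
        handler(@event);
    }
  }
}

// Hypothetical events: each property type registers itself with an address.
public sealed class VillaRegistered {
  public VillaRegistered(string address) { Address = address; }
  public string Address { get; }
}

public sealed class WarehouseRegistered {
  public WarehouseRegistered(string address) { Address = address; }
  public string Address { get; }
}

// The role class: streams produced by a Villa or a Warehouse both load into it.
public sealed class Property : AggregateRootEntity {
  public static readonly Func<Property> Factory = () => new Property();

  Property() {
    Register<VillaRegistered>(When);
    Register<WarehouseRegistered>(When);
  }

  // Exposed only so the sketch can be checked.
  public string Address { get; private set; }

  void When(VillaRegistered @event) { Address = @event.Address; }
  void When(WarehouseRegistered @event) { Address = @event.Address; }
}
```

Because routing is keyed on the event type only, nothing in the loading path needs to know which class originally produced the stream.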

Another scenario where this technique could prove to be useful is when your entity goes through a life cycle where each state has very different behavior or behavior needs to be limited as of a certain stage in its life cycle. Of course, this shouldn’t be used as an excuse to NOT model things explicitly.
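To make the life-cycle case a bit more concrete, here is a hypothetical sketch (none of these types come from the post) in which the same listing stream is loaded into an `OpenListing` role while the listing is open, and into a narrower `ClosedListing` role once a `ListingClosed` event appears in it. Choosing the role by scanning for a terminal event is only one assumption; a status kept in the stream’s metadata could work too.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical listing events.
public sealed class ListingOpened { }
public sealed class ListingClosed { }
public sealed class PropertyAddedToListing {
  public PropertyAddedToListing(Guid propertyId) { PropertyId = propertyId; }
  public Guid PropertyId { get; }
}

// Role for a listing that is still open: it carries the full behavior.
public sealed class OpenListing {
  readonly HashSet<Guid> _properties = new HashSet<Guid>();

  internal OpenListing(IEnumerable<object> history) {
    foreach (var added in history.OfType<PropertyAddedToListing>())
      _properties.Add(added.PropertyId);
  }

  // Returns the events to append; a property that is already listed yields none.
  public IReadOnlyList<object> AddProperty(Guid propertyId) {
    if (!_properties.Add(propertyId)) return Array.Empty<object>();
    return new object[] { new PropertyAddedToListing(propertyId) };
  }
}

// Role for a closed listing: same stream, but only a narrow, read-only behavior survives.
public sealed class ClosedListing {
  internal ClosedListing(IEnumerable<object> history) {
    PropertyCount = history.OfType<PropertyAddedToListing>().Select(e => e.PropertyId).Distinct().Count();
  }

  public int PropertyCount { get; }
}

public static class ListingRoles {
  // Picks the role class based on where the listing is in its life cycle.
  public static object Load(IReadOnlyList<object> history) =>
    history.OfType<ListingClosed>().Any()
      ? new ClosedListing(history)
      : (object)new OpenListing(history);
}
```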

Conclusion

The most important takeaway is that a stream of events does not need to be loaded into the same class all the time and that roles remain useful within a model backed by event sourcing. Like anything, this should be used with moderation and only if applicable.

(*) I’m reliving a scene with Arnold in Total Recall (1990) as I’m writing this (http://www.youtube.com/watch?feature=player_detailpage&v=WFMLGEHdIjE#t=86s).
(**) Having worked in the Realtor business, I can tell you right off the bat that having a class per real estate property type is going to hurt in the long run, but who am I to judge the usefulness of this particular model.
(***) Don’t complain if the code doesn’t compile out of the box. I used my C# brain compiler.

Object Inheritance

When mentioning “object inheritance”, most people immediately think of “class inheritance”. Alas, that’s not what it is. Quoting from Streamlined Object Modeling*:

Object inheritance allows two objects representing a single entity to be treated as one object by the rest of the system.

Put a different way, where class inheritance allows for code reuse, object inheritance allows for data reuse.

Application

Where could this be useful? Depending on the domain you are in, I’d say quite a few places. Whenever you replicate data from one object to another object, you should stop and consider if “object inheritance” could be applicable. While object inheritance does impose some constraints, it also saves you from writing reconciliation code to synchronize two or more objects (especially in systems that heavily rely on messaging).
A common example I like to use is the one of a video title and a video tape (**). From a real-world point of view, a video tape has both the attributes of the tape itself and those of the title. Yet, if I model this as two separate objects, I run into trouble (a.k.a. synchronization woes). If I copy the title information to the tape upon creation of the tape, and I make an error in, say, the release date of the title, I now have to replicate the correction to the “affected” tapes. Don’t get me wrong, sometimes this kind of behavior is desirable, i.e. it makes sense from a business perspective. But what if it doesn’t? That’s where “object inheritance” comes in. To a large extent, it can cover the scenarios that a synchronization-based solution can, IF its constraints are met.

Constraints

  • Localized data: “Object inheritance” assumes that the data of both objects lives close together. That might be a deal breaker for larger systems, or at least an indication that you’d have to consider co-locating the data of “both” objects.
  • Immutable data: the data of one of the objects, the “parent”, is immutable during object inheritance. From an OOP perspective that means you can only invoke side-effect free functions on a “parent”.
  • “Parent” object responsibilities: the parent object contains information and behaviors that are valid across multiple contexts, multiple interactions, and multiple variations of an object.
  • “Child” object responsibilities: the child object represents the parent in a specialized context, in a particular interaction, or as a distinct variation.

The aforementioned book also states other constraints such as the child object exhibiting the parent object’s “profile” (think interface), but I find those less desirable in an environment that uses CQRS. For more in-depth information, I strongly suggest you read it. It has a wealth of information on this very topic.

Code

In its simplest form “object inheritance” looks and feels like object composition.

The composition can be hidden from the code that consumes the child object, behind a repository’s facade.

The gist of “object inheritance” is that the child object (the video tape) asks the parent (the video) for data or invokes side-effect free functions on the parent to accomplish its task.
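A minimal sketch of the video tape/title example follows; all type and member names are hypothetical. The tape holds a reference to its immutable title and only calls a side-effect free function on it, while the repository does the composition. Correcting the title’s release year once is therefore visible to every tape, without any synchronization code.

```csharp
using System;
using System.Collections.Generic;

// Parent: title information that is valid across contexts, immutable from the tape's point of view.
public sealed class VideoTitle {
  public VideoTitle(Guid id, string name, int releaseYear) {
    Id = id;
    Name = name;
    ReleaseYear = releaseYear;
  }

  public Guid Id { get; }
  public string Name { get; }
  public int ReleaseYear { get; }

  // Side-effect free function the child is allowed to call.
  public string Describe() => Name + " (" + ReleaseYear + ")";
}

// Child: a physical tape, i.e. the title in the rental context. It copies nothing from the title.
public sealed class VideoTape {
  readonly VideoTitle _title;
  readonly int _copyNumber;

  internal VideoTape(VideoTitle title, int copyNumber) {
    _title = title;
    _copyNumber = copyNumber;
  }

  public string ShelfLabel() => _title.Describe() + " #" + _copyNumber;
}

// The repository hides the composition: callers ask for a tape and get one backed by its current title.
public sealed class VideoTapeRepository {
  readonly Dictionary<Guid, VideoTitle> _titles = new Dictionary<Guid, VideoTitle>();
  readonly Dictionary<Guid, (Guid TitleId, int CopyNumber)> _tapes = new Dictionary<Guid, (Guid TitleId, int CopyNumber)>();

  public void SaveTitle(VideoTitle title) => _titles[title.Id] = title;

  public void RegisterTape(Guid tapeId, Guid titleId, int copyNumber) => _tapes[tapeId] = (titleId, copyNumber);

  public VideoTape Get(Guid tapeId) {
    var tape = _tapes[tapeId];
    return new VideoTape(_titles[tape.TitleId], tape.CopyNumber);
  }
}
```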

Lunch -> Not Free

Whether you go down the route of “object inheritance” or (message-based) “synchronization”, you will have to think about object life cycles (both parent and child). Sometimes it’s desirable for the child to have a still view of the parent, a snapshot if you will. Other times you may want the child to always see the parent in its most current form. Children can even change parents during their life cycle. Other children may want a more “controlled” view of the parent, rolling forward or backward based on their needs or context. You can get pretty sophisticated with this technique, especially in an event sourced domain model, since it’s very well suited to rolling up to a certain point in time or a certain revision of a parent. In the same breath, I should also say that you can very well shoot yourself in the foot with this technique, especially if the composition is to take place in the user interface, in which case there’s no point in using it. It’s also very easy to bury yourself in the pit of abstraction when talking about “child” and “parent”, so do replace those words with nouns from your domain, and get your archetypes right.
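Rolling an event sourced parent to a certain revision can be as simple as replaying only its first N events. The sketch below assumes hypothetical `TitleRegistered` and `ReleaseYearCorrected` events and treats a revision as a count of applied events. A child that wants a snapshot would keep the revision it was created against; a child that wants the current form would pass no revision at all.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical parent events.
public sealed class TitleRegistered {
  public TitleRegistered(string name, int releaseYear) { Name = name; ReleaseYear = releaseYear; }
  public string Name { get; }
  public int ReleaseYear { get; }
}

public sealed class ReleaseYearCorrected {
  public ReleaseYearCorrected(int releaseYear) { ReleaseYear = releaseYear; }
  public int ReleaseYear { get; }
}

// Read-only view of the parent as of a given revision.
public sealed class TitleView {
  public string Name { get; private set; }
  public int ReleaseYear { get; private set; }
  public int Revision { get; private set; }

  // Replays the parent's history; a null revision means "the most current form".
  public static TitleView AsOf(IEnumerable<object> history, int? revision = null) {
    var view = new TitleView();
    var events = revision.HasValue ? history.Take(revision.Value) : history;
    foreach (var @event in events) {
      switch (@event) {
        case TitleRegistered registered:
          view.Name = registered.Name;
          view.ReleaseYear = registered.ReleaseYear;
          break;
        case ReleaseYearCorrected corrected:
          view.ReleaseYear = corrected.ReleaseYear;
          break;
      }
      view.Revision++;
    }
    return view;
  }
}
```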

All in all, I’ve found this a good tool in the box that plays nicely with aggregates, event sourcing and even state-based models that track the concept of time. It’s not something I use every day, but whenever I have, I’ve ended up with less code. YMMV.

(*) Streamlined Object Modeling is a successor to Peter Coad’s “Modeling In Color”. A Kindle version of the former is available.
(**) I’m from the “cassette & video” ages.

The Money Box

Once in a while I hear or read about people struggling with projection performance. The root causes of their performance issues are diverse:

  • using the same persistence behavior during projection rebuild as during live/production build
  • use of ill-fitting technology (here’s looking at you, EF) or use of said technology in the wrong way
  • too wide a persistence interface which makes it difficult to optimize for performance
  • no batching support or batching as an afterthought
  • not thinking about the implication of performing reads during projection

Ever heard of the book Refactoring to Patterns? It has a nice refactoring in there, called Move Accumulation to Collecting Parameter that refers to the Collecting Parameter pattern on C2. How would this help with thy projections? Well, what if you could decouple the act of performing an action from collecting what is required to be able to perform an action? Put another way, what if you decouple the act of executing SQL DML statements from collecting those statements during projection (a.k.a. event handling)? So, instead of …

… we add another level of indirection …

The most noticeable differences are the decoupling from persistence technology(*), no reads, and no promises with regard to when the requested operations will be executed/flushed to storage. Usually, the IProjectionSqlOperations interface will have a very small surface (i.e. low member count), covering INSERT, UPDATE and DELETE. During live/production projection building you could have an implementation of this interface (a.k.a. strategy) that flushes as soon as an operation is requested.
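A sketch of what such a narrow interface and a projection handler using it might look like. `IProjectionSqlOperations` is the name from the post, but its member signatures, the `PropertyListed`/`PropertyDelisted` events and the `ListedProperty` table are assumptions. The recording implementation is just one more strategy you could plug in, handy for unit testing projections without a database.

```csharp
using System;
using System.Collections.Generic;

// Small surface covering INSERT, UPDATE and DELETE; these signatures are assumptions.
public interface IProjectionSqlOperations {
  void Insert(string table, IReadOnlyDictionary<string, object> values);
  void Update(string table, IReadOnlyDictionary<string, object> changes, IReadOnlyDictionary<string, object> criteria);
  void Delete(string table, IReadOnlyDictionary<string, object> criteria);
}

// Hypothetical events.
public sealed class PropertyListed {
  public PropertyListed(Guid listingId, Guid propertyId) { ListingId = listingId; PropertyId = propertyId; }
  public Guid ListingId { get; }
  public Guid PropertyId { get; }
}

public sealed class PropertyDelisted {
  public PropertyDelisted(Guid listingId, Guid propertyId) { ListingId = listingId; PropertyId = propertyId; }
  public Guid ListingId { get; }
  public Guid PropertyId { get; }
}

// The projection only requests operations: no reads, no connection, no idea when they get flushed.
public sealed class ListedPropertyProjection {
  readonly IProjectionSqlOperations _operations;

  public ListedPropertyProjection(IProjectionSqlOperations operations) { _operations = operations; }

  public void Handle(PropertyListed @event) =>
    _operations.Insert("ListedProperty", Key(@event.ListingId, @event.PropertyId));

  public void Handle(PropertyDelisted @event) =>
    _operations.Delete("ListedProperty", Key(@event.ListingId, @event.PropertyId));

  static IReadOnlyDictionary<string, object> Key(Guid listingId, Guid propertyId) =>
    new Dictionary<string, object> { ["ListingId"] = listingId, ["PropertyId"] = propertyId };
}

// A strategy that merely records what was requested.
public sealed class RecordingSqlOperations : IProjectionSqlOperations {
  public List<string> Requested { get; } = new List<string>();

  public void Insert(string table, IReadOnlyDictionary<string, object> values) => Requested.Add("INSERT " + table);
  public void Update(string table, IReadOnlyDictionary<string, object> changes, IReadOnlyDictionary<string, object> criteria) => Requested.Add("UPDATE " + table);
  public void Delete(string table, IReadOnlyDictionary<string, object> criteria) => Requested.Add("DELETE " + table);
}
```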

However, the more interesting implementations are the ones that are used during rebuild.

This implementation translates the requested operations into SQL statement objects (abstracted by ISqlStatement) and pushes them onto something that observes these SQL statements. The observer couldn’t care less what the actual SQL statements are (that was decided by the projection handler). The simplest observer implementation could look something like this …
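Assuming `ISqlStatement` exposes the statement text and its parameters (the post doesn’t show its members), the translation of a requested INSERT and the simplest, collect-only observer could look like this. `IObserver<T>` is the standard .NET observer interface; `SqlStatement` and `CollectingSqlStatementObserver` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// ISqlStatement is named in the post; the members below are assumptions.
public interface ISqlStatement {
  string Text { get; }
  IReadOnlyDictionary<string, object> Parameters { get; }
}

public sealed class SqlStatement : ISqlStatement {
  SqlStatement(string text, IReadOnlyDictionary<string, object> parameters) {
    Text = text;
    Parameters = parameters;
  }

  public string Text { get; }
  public IReadOnlyDictionary<string, object> Parameters { get; }

  // Translates a requested INSERT into a parameterized statement.
  // Table and column names are trusted here: they come from projection code, not from users.
  public static ISqlStatement Insert(string table, IReadOnlyDictionary<string, object> values) {
    var columns = values.Keys.ToList();
    var columnList = string.Join(", ", columns.Select(column => "[" + column + "]"));
    var parameterList = string.Join(", ", columns.Select(column => "@" + column));
    return new SqlStatement(
      "INSERT INTO [" + table + "] (" + columnList + ") VALUES (" + parameterList + ")",
      values.ToDictionary(pair => "@" + pair.Key, pair => pair.Value));
  }
}

// The simplest observer: it only collects ("the money in the box") and leaves execution to others.
public sealed class CollectingSqlStatementObserver : IObserver<ISqlStatement> {
  readonly List<ISqlStatement> _collected = new List<ISqlStatement>();

  public IReadOnlyList<ISqlStatement> Collected => _collected;
  public bool IsCompleted { get; private set; }
  public Exception Error { get; private set; }

  public void OnNext(ISqlStatement value) => _collected.Add(value);
  public void OnError(Exception error) => Error = error;
  public void OnCompleted() => IsCompleted = true;
}
```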

Of course, collecting in and by itself is not all that useful. You have to do something with what you’ve collected (“the money in the box”). Let’s look at another observer that takes a slightly different approach.

Without diving too much into the details, this observer flushes statements to the database as soon as a hard-coded threshold is reached. It does so in a batch-like fashion to minimize the number of roundtrips, while still adhering to the limitations that come with this particular ADO.NET data provider. Other implementations use SqlBulkCopy to maximize performance (but come with their own limitations). Depending on the memory resources a server has, you could get pretty creative as to which strategy you choose to rebuild a large projection.
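A threshold-based observer in the same spirit, sketched under assumptions: the actual batched ADO.NET round trip is abstracted behind a hypothetical `flushBatch` delegate, `ISqlStatement` is reduced to its text, and the threshold is a constructor argument rather than hard-coded. A real implementation would also have to respect provider limits, such as the maximum number of parameters per command.

```csharp
using System;
using System.Collections.Generic;

// Members assumed; reduced to the statement text for this sketch.
public interface ISqlStatement { string Text { get; } }

public sealed class TextStatement : ISqlStatement {
  public TextStatement(string text) { Text = text; }
  public string Text { get; }
}

// Buffers statements and hands them over in batches once the threshold is reached.
public sealed class ThresholdFlushingSqlStatementObserver : IObserver<ISqlStatement> {
  readonly int _threshold;
  readonly Action<IReadOnlyList<ISqlStatement>> _flushBatch; // stands in for one batched round trip
  List<ISqlStatement> _pending = new List<ISqlStatement>();

  public ThresholdFlushingSqlStatementObserver(int threshold, Action<IReadOnlyList<ISqlStatement>> flushBatch) {
    if (threshold < 1) throw new ArgumentOutOfRangeException(nameof(threshold));
    _threshold = threshold;
    _flushBatch = flushBatch ?? throw new ArgumentNullException(nameof(flushBatch));
  }

  public void OnNext(ISqlStatement value) {
    _pending.Add(value);
    if (_pending.Count >= _threshold) Flush();
  }

  // Whatever is left when the rebuild completes still has to go out.
  public void OnCompleted() => Flush();

  // On error, drop what is pending; a failed rebuild is typically restarted from scratch.
  public void OnError(Exception error) => _pending.Clear();

  void Flush() {
    if (_pending.Count == 0) return;
    var batch = _pending;
    _pending = new List<ISqlStatement>(_threshold);
    _flushBatch(batch);
  }
}
```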

Conclusion

I’ve shown you SQL-centric projections, but please, do step out of the box. Nothing is stopping you from producing and collecting “HttpStatements” for your favorite key-value store or “FileOperations” for your file-based projections. Nothing is stopping you from making different choices for the producing and consuming side. Nothing is stopping you from doing the “statement” execution in an asynchronous and/or parallel fashion. It’s just a matter of exploring your options and using what works in your environment. Next time I’ll show you how reading fits into all this …

(*): Yeah, yeah, I know, too much abstraction kills kittens …

Value objects in an event sourced domain model

A question that comes up from time to time is what role value objects can play in an event sourced domain model and how they contribute to and interact with the events produced by said domain model. Value objects come in many shapes and sizes, but obvious ones like money (amount and currency) and period (range of time values) come to mind. Fields that change together or are used together in the ubiquitous language are prime suspects of information “clustering” and can usually be represented as value objects when they clearly lack any form of identity. They are an ideal place to DRY up some of the validation, security and contextual capturing code that surrounds these fields.
Below you’ll find a stripped-down version of a value object from my own domain. What you can’t easily spot from the code below is the clusivity (inclusive or exclusive) of the lower and upper boundary of this range-like value type, partially because I omitted the arithmetic operations. The finer details, such as which methods are required and whom to collaborate with, surface as you discuss them with domain experts.

namespace Domain {
  public class RolloutPeriod {
    readonly DateTime _from;
    readonly DateTime _to;

    public RolloutPeriod(DateTime from, DateTime to) {
      if(from > to)
        throw new ArgumentException("The from value of the rollout period must be less than or equal to the to value.", "from");
      _from = from;
      _to = to;
    }

    //Arithmetic members omitted for brevity

    public bool Equals(RolloutPeriod other) {
      if (ReferenceEquals(null, other)) return false;
      if (ReferenceEquals(this, other)) return true;
      return other._from.Equals(_from) && other._to.Equals(_to);
    }

    public override bool Equals(object obj) {
      if (ReferenceEquals(null, obj)) return false;
      if (ReferenceEquals(this, obj)) return true;
      if (obj.GetType() != typeof (RolloutPeriod)) return false;
      return Equals((RolloutPeriod) obj);
    }

    public override int GetHashCode() {
      return _from.GetHashCode() ^ _to.GetHashCode();
    }
  }
}

Something you do notice in the above code is that there are neither getters nor setters. This is deliberate. On one hand, it allows me to redefine how I represent values on the inside (i.e. the from and to value). On the other hand, I decouple any consuming code from the internals and gently steer that code into a Tell Don’t Ask style of interaction. It makes you think about the question: “Why do you want access to my internals? What is it that you want to achieve with them? Just tell me what you want to do.”
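As an illustration of that Tell Don’t Ask style, the sketch below adds two hypothetical members, `Contains` and `Overlaps`, to a stripped copy of the rollout period. It assumes both boundaries are inclusive, which is exactly the kind of detail that only surfaces in discussions with domain experts.

```csharp
using System;

// Stripped copy of the domain value object. Contains and Overlaps are hypothetical members;
// both boundaries are ASSUMED to be inclusive.
public sealed class RolloutPeriod {
  readonly DateTime _from;
  readonly DateTime _to;

  public RolloutPeriod(DateTime from, DateTime to) {
    if (from > to)
      throw new ArgumentException("The from value of the rollout period must be less than or equal to the to value.", nameof(from));
    _from = from;
    _to = to;
  }

  // Callers tell the period what they need to know instead of pulling out _from and _to.
  public bool Contains(DateTime moment) => _from <= moment && moment <= _to;

  // Reading another instance's fields stays inside the type, so nothing leaks to callers.
  public bool Overlaps(RolloutPeriod other) {
    if (other == null) throw new ArgumentNullException(nameof(other));
    return _from <= other._to && other._from <= _to;
  }
}
```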
Once you get that right, you bump into the next hurdle: using value objects in an event sourced domain model.

namespace Domain {
  public class Schedule : AggregateRootEntity {
    public void Rollout(RolloutPeriod period) {
      //Guards omitted for brevity

      ApplyEvent(Build.RolledoutSchedule.ForPeriod(period));
    }
  }

  public abstract class AggregateRootEntity {
    Guid _id;
    long _version;

    protected void ApplyEvent(IEventBuilder<IEvent> builder) {
      var @event = builder.Build(_id, _version++);
      //The usual stuff as found below goes here
      //https://github.com/gregoryyoung/m-r/blob/master/SimpleCQRS/Domain.cs
    }
  }
}

namespace Messaging {
  public static class Build {
    public static RolledoutScheduleBuilder RolledoutSchedule { get { return new RolledoutScheduleBuilder(); } }
  }

  public class RolledoutScheduleBuilder : IEventBuilder<RolledoutScheduleEvent> {
    RolloutPeriod _rolloutPeriod;

    public RolledoutScheduleEvent Build(Guid id, long version) {
      return new RolledoutScheduleEvent(id, version, _rolloutPeriod);
    }

    public RolledoutScheduleBuilder ForPeriod(RolloutPeriodBuilder builder) {
      _rolloutPeriod = builder.Build();
      return this;
    }
  }

  public interface IEventBuilder<out TEvent> where TEvent : IEvent {
    TEvent Build(Guid id, long version);
  }

  public class RolledoutScheduleEvent : IEvent {
    public RolledoutScheduleEvent(Guid id, long version, RolloutPeriod period) {
      Id = id;
      Version = version;
      Period = period;
    }

    public RolloutPeriod Period { get; private set; }

    public long Version { get; private set; }

    public Guid Id { get; private set; }
  }

  public interface IEvent { }

  public class RolloutPeriod {
    public DateTime From { get; private set; }
    public DateTime To { get; private set; }

    public RolloutPeriod(DateTime from, DateTime to) {
      From = from;
      To = to;
    }
  }

  public class RolloutPeriodBuilder {
    DateTime _from;
    DateTime _to;

    public RolloutPeriodBuilder From(DateTime value) {
      _from = value;
      return this;
    }

    public RolloutPeriodBuilder To(DateTime value) {
      _to = value;
      return this;
    }

    public RolloutPeriod Build() {
      return new RolloutPeriod(_from, _to);
    }
  }
}

The rollout period passed into the Schedule’s Rollout method is a value object that is part of the domain model. The rollout period of the event (a bit obfuscated by the syntax ‘Build.RolledoutSchedule.ForPeriod’ above) is part of the messaging bits. How do these two meet without exposing internals?

One thing I’d like to make clear: the domain value object IS NOT the data structure used inside the event. It’s a different type.

By applying double dispatch to the domain rollout period and adding a builder extension method (inside the domain model) that is aware of the domain rollout period, we get a nice translation going.

namespace Domain {
  public class RolloutPeriod {
    //... see above for other members

    internal void BuildValue(RolloutPeriodBuilder builder) {
      builder.From(_from).To(_to);
    }
  }

  public static class BuildExtensions {
    public static RolledoutScheduleBuilder ForPeriod(this RolledoutScheduleBuilder builder, RolloutPeriod period) {
      var valueBuilder = new RolloutPeriodBuilder();
      period.BuildValue(valueBuilder);
      builder.ForPeriod(valueBuilder);
      return builder;
    }
  }
}

The reverse, going from an event (or data structure within that event) to a value object, is just as easy.

namespace Domain {
  public class RolloutPeriod {
    //... see above for other members

    internal static RolloutPeriod FromEvent(Messaging.RolloutPeriod period) {
      return new RolloutPeriod(period.From, period.To);
    }
  }

  public class Schedule : AggregateRootEntity {
    RolloutPeriod _rolloutPeriod;

    void Apply(RolledoutScheduleEvent @event) {
      _rolloutPeriod = RolloutPeriod.FromEvent(@event.Period);
      //...
    }
  }
}

So, really, there’s no excuse to NOT use value objects inside an event sourced domain model nor to expose the internal state of those value objects for event sourcing purposes. Happy coding …

P.S. More information on event builders can be found here.

Using Windows Azure to build a simple EventStore

Continuing my journey, I figured it shouldn’t be that hard to apply “Your EventStream is a linked list” using Windows Azure Storage Services.

Disclaimer: Again, this is just a proof of concept, a way to get started, not production code … obviously. I’m assuming you know your way around Windows Azure (how to set up an account, stuff like that) and how the .NET SDK works.

Just like with Amazon Web Services, the first things to create are the containers that will harbor the changesets and aggregate heads. Unlike the AWS example, I’ve chosen to store both using the same service, Windows Azure Blob Storage. Why? Because it offers optimistic concurrency by leveraging ETags. The only unfortunate side-effect is that it seems to force me to break CQS, but I can live with that for now. Windows Azure Blob Storage provides convenience methods that only create the containers when they don’t exist yet.

//Setting up the Windows Azure Blob Storage container to store changesets and aggregate heads

const string PrimaryAccessKey = "your-key-here-for-testing-purposes-only-ofcourse-;-)";
const string AccountName = "youraccountname";
const string ChangesetContainerName = "changesets";
const string AggregateContainerName = "aggregates";

var storageAccount =
  new CloudStorageAccount(
      new StorageCredentialsAccountAndKey(
            AccountName,
            PrimaryAccessKey),
      false);

var blobClient = storageAccount.CreateCloudBlobClient();

var changesetContainer = blobClient.GetContainerReference(ChangesetContainerName);
changesetContainer.CreateIfNotExist();

var aggregateContainer = blobClient.GetContainerReference(AggregateContainerName);
aggregateContainer.CreateIfNotExist();

Now that we’ve set up the infrastructure, let’s tackle the scenario of storing a changeset. The flow is pretty simple. First we try to store the changeset in the changeset container as a blob.

public class ChangesetDocument {
      public const long InitialVersion = 0;

      //Exposing internals to make the sample easier
      public Guid ChangesetId { get; set; }
      public Guid? ParentChangesetId { get; set; }
      public Guid AggregateId { get; set; }
      public long AggregateVersion { get; set; }
      public string AggregateETag { get; set; }

      public byte[] Content { get; set; }
}

//Assuming there's a changeset document we want to store,
//going by the variable name 'document'.

var changesetBlob = changesetContainer.GetBlobReference(document.ChangesetId.ToString());
var changesetUploadOptions = new BlobRequestOptions {
      AccessCondition = AccessCondition.None,
      BlobListingDetails = BlobListingDetails.None,
      CopySourceAccessCondition = AccessCondition.None,
      DeleteSnapshotsOption = DeleteSnapshotsOption.None,
      RetryPolicy = RetryPolicies.NoRetry(),
      Timeout = TimeSpan.FromSeconds(90),
      UseFlatBlobListing = false
};
changesetBlob.UploadByteArray(document.Content, changesetUploadOptions);

const string AggregateIdMetaName = "aggregateid";
const string AggregateVersionMetaName = "aggregateversion";
const string ChangesetIdMetaName = "changesetid";
const string ParentChangesetIdMetaName = "parentchangesetid";

//Set the meta-data of the changeset
//Notice how this doesn't need to be transactional
changesetBlob.Metadata[AggregateIdMetaName] = document.AggregateId.ToString();
changesetBlob.Metadata[AggregateVersionMetaName] = document.AggregateVersion.ToString();
changesetBlob.Metadata[ChangesetIdMetaName] = document.ChangesetId.ToString();
if(document.ParentChangesetId.HasValue)
      changesetBlob.Metadata[ParentChangesetIdMetaName] = document.ParentChangesetId.Value.ToString();
changesetBlob.SetMetadata();

If that goes well, we try to upsert the head of the aggregate to get it to point to this changeset. Below, we’re using the ETag of the aggregate head blob as a way of doing optimistic concurrency checking. It caters for both the initial (competing inserters) and update (competing updaters) concurrency.

public static class ExtensionsForChangeDocument {
      public static AccessCondition ToAccessCondition(this ChangesetDocument document) {
            if(document.AggregateVersion == ChangesetDocument.InitialVersion) {
                  return AccessCondition.IfNoneMatch("*");
            }
            return AccessCondition.IfMatch(document.AggregateETag);
      }
}

//Upsert the aggregate
var aggregateBlob = aggregateContainer.GetBlobReference(document.AggregateId.ToString());
var aggregateUploadOptions = new BlobRequestOptions {
      AccessCondition = document.ToAccessCondition(),
      BlobListingDetails = BlobListingDetails.None,
      CopySourceAccessCondition = AccessCondition.None,
      DeleteSnapshotsOption = DeleteSnapshotsOption.None,
      RetryPolicy = RetryPolicies.NoRetry(),
      Timeout = TimeSpan.FromSeconds(90),
      UseFlatBlobListing = false
};
aggregateBlob.UploadByteArray(document.ChangesetId.ToByteArray(), aggregateUploadOptions);
//Here's where we are breaking CQS if we'd like to cache the aggregate.
//This won't be a problem if we're re-reading the aggregate upon each behavior.
var eTag = aggregateBlob.Properties.ETag;

If the UploadByteArray operation throws a StorageClientException indicating that “The condition specified using HTTP conditional header(s) is not met.”, we know we ran into an optimistic concurrency conflict: another writer created or moved the aggregate head in the meantime. In such a case, it’s best to repeat the entire operation.
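Repeating the entire operation means reloading the aggregate, re-running the behavior and attempting both uploads again. A small generic helper for that could look like the sketch below. `OptimisticRetry` is a hypothetical name, and recognizing the “condition not met” failure is deliberately left to a caller-supplied predicate, since how you inspect the StorageClientException depends on the SDK version.

```csharp
using System;

public static class OptimisticRetry {
  // Runs the whole read-decide-write operation, repeating it when a concurrency conflict is detected.
  // isConcurrencyConflict is supplied by the caller, e.g. a check for the "condition not met" storage error.
  public static T Execute<T>(Func<T> operation, Func<Exception, bool> isConcurrencyConflict, int maxAttempts) {
    if (operation == null) throw new ArgumentNullException(nameof(operation));
    if (isConcurrencyConflict == null) throw new ArgumentNullException(nameof(isConcurrencyConflict));
    if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

    for (var attempt = 1; ; attempt++) {
      try {
        return operation();
      }
      catch (Exception ex) when (attempt < maxAttempts && isConcurrencyConflict(ex)) {
        // Someone else moved the aggregate head: loop around, reload and try again.
      }
    }
  }
}
```

Capping the attempts keeps a hot aggregate from spinning forever; once the cap is hit, the conflict propagates to the caller.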
Now that we’ve dealt with writing, let’s take a look at reading. First we need to fetch the pointer to the last stored and approved changeset identifier.

var aggregateBlob = aggregateContainer.GetBlobReference(aggregateId.ToString());
var aggregateDownloadOptions = new BlobRequestOptions {
      AccessCondition = AccessCondition.None,
      BlobListingDetails = BlobListingDetails.None,
      CopySourceAccessCondition = AccessCondition.None,
      DeleteSnapshotsOption = DeleteSnapshotsOption.None,
      RetryPolicy = RetryPolicies.NoRetry(),
      Timeout = TimeSpan.FromSeconds(90),
      UseFlatBlobListing = false
};
var changesetId = new Guid?(new Guid(aggregateBlob.DownloadByteArray(aggregateDownloadOptions)));
var eTag = aggregateBlob.Properties.ETag;

Now that we’ve bootstrapped the reading process, we keep reading changesets until there are no more changesets to read. Each approved changeset contains metadata that points to the previous approved changeset. It’s the responsibility of the calling code to do something useful with the changesets read (e.g. deserialize the content and replay each embedded event into the corresponding aggregate).

while(changesetId.HasValue) {
      var changesetBlob = changesetContainer.GetBlobReference(changesetId.Value.ToString());
      var changesetDownloadOptions = new BlobRequestOptions {
            AccessCondition = AccessCondition.None,
            BlobListingDetails = BlobListingDetails.None,
            CopySourceAccessCondition = AccessCondition.None,
            DeleteSnapshotsOption = DeleteSnapshotsOption.None,
            RetryPolicy = RetryPolicies.NoRetry(),
            Timeout = TimeSpan.FromSeconds(90),
            UseFlatBlobListing = false
      };
      var content = changesetBlob.DownloadByteArray(changesetDownloadOptions);
      changesetBlob.FetchAttributes();
      var document = new ChangesetDocument {
            AggregateETag = eTag,
            AggregateId = new Guid(changesetBlob.Metadata[AggregateIdMetaName]),
            AggregateVersion = Convert.ToInt64(changesetBlob.Metadata[AggregateVersionMetaName]),
            ChangesetId = new Guid(changesetBlob.Metadata[ChangesetIdMetaName]),
            Content = content,
      };
      if (changesetBlob.Metadata[ParentChangesetIdMetaName] != null)
            document.ParentChangesetId = new Guid(changesetBlob.Metadata[ParentChangesetIdMetaName]);

      yield return document;

      changesetId = document.ParentChangesetId;
}

The only *weird* thing with this code is that I’m propagating the aggregate’s head ETag using the changesets. It’s a modeling issue I’ll have to revisit ;-) .
But that’s basically all there is to it. In reality, you’ll need a lot more metadata and error handling to make this a success. I should point out that the performance of this service, consumed on-premises, was better than what I experienced with AWS.

Conclusion

Using Windows Azure Storage Services is not so different from using Amazon Web Services in this case. However, this overall technique suffers from a few drawbacks. As mentioned before, upon concurrency conflicts you might be wasting some storage space. Another drawback is the fact that you need to read all changesets that make up the history of an aggregate (or event source, if you want to decouple it from DDD terminology) before being able to apply the first event. There are ways around this, such as storing all the changeset identifiers in the aggregate head if the total number of behaviors in an aggregate is low on average. You could even partition the aggregate head into multiple documents using nothing but its version number or count of applied behaviors to partition, but that’s the subject of a future exploration. I apologize if this post is somewhat of a copy-paste of its AWS counterpart, but given the goal and similarities, that was to be expected :-) .
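The first workaround, keeping every changeset identifier in the aggregate head, could be sketched as follows. The head’s content format (the changeset ids, oldest first, as raw 16-byte GUIDs) is an assumption; the upload itself would still go through the same ETag-conditioned `UploadByteArray` call shown earlier. Reading the head then yields every changeset id at once, so the changesets can be fetched in any order or in parallel.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical head format: all changeset ids of the aggregate, oldest first, 16 raw bytes each.
public static class AggregateHeadFormat {
  const int GuidSize = 16;

  // Produces the new head content; upload it with the same ETag condition as before.
  public static byte[] Append(byte[] head, Guid changesetId) {
    var current = head ?? Array.Empty<byte>();
    var result = new byte[current.Length + GuidSize];
    Buffer.BlockCopy(current, 0, result, 0, current.Length);
    Buffer.BlockCopy(changesetId.ToByteArray(), 0, result, current.Length, GuidSize);
    return result;
  }

  // Yields every changeset id at once, so changesets no longer have to be walked one by one.
  public static IReadOnlyList<Guid> Read(byte[] head) {
    if (head == null) throw new ArgumentNullException(nameof(head));
    if (head.Length % GuidSize != 0)
      throw new FormatException("The aggregate head is not a whole number of changeset identifiers.");
    var ids = new List<Guid>(head.Length / GuidSize);
    for (var offset = 0; offset < head.Length; offset += GuidSize) {
      var chunk = new byte[GuidSize];
      Buffer.BlockCopy(head, offset, chunk, 0, GuidSize);
      ids.Add(new Guid(chunk));
    }
    return ids;
  }
}
```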

Using Amazon Web Services to build a simple Event Store

My post on “Your EventStream is a linked list” might have been somewhat abstract (a more prosaic version can be found here). By way of testing my theory, I set out to apply it using Amazon’s Web Services (AWS) stack. I used AWS DynamoDB as the transactional medium that would handle the optimistic concurrency, and AWS S3 to store the changesets.

Disclaimer: This is just a proof of concept, a way to get started. It’s not production code … obviously :-) I’m assuming you know your way around AWS (how to set up an account, stuff like that) and how the .NET SDK works.

The first thing to do is to create the container for the changesets. In AWS S3 those things are called buckets. This is a one-time operation. You might want to make this conditional at the startup of your application/worker role/what-have-you. The AWS S3 API provides mechanisms to query for buckets, so you should be able to pull that off. On the other hand, when creating buckets using the REST API the HTTP PUT verb is used, and we all know that PUT is supposed to be idempotent. You might want to read up on the PUT Bucket API. One last thing: bucket names are unique across all of S3.

//Setting up the Amazon S3 bucket to store changesets in
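//Bucket names must be globally unique and, outside the US Standard region,
//DNS-compliant (lowercase, no underscores), hence the hyphens
const string ChangesetBucketName = "yourorganization-yourboundedcontextname-changesets";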

var s3Client = AWSClientFactory.CreateAmazonS3Client();

s3Client.PutBucket(
  new PutBucketRequest().
    WithBucketName(ChangesetBucketName).
    WithBucketRegion(S3Region.EU)); //Your region might vary

The same thing needs to happen in AWS DynamoDB. There, the containers are called tables. Again, this is a one-time, conditional operation. You might want to read up on the specifics of table naming and uniqueness. The throughput provisioning is not something you want to be static. Monitor the load on your system (they have notification events/alerts for that), both in terms of number of reads/writes and bytes used for storage, and use that information to tune the table throughput settings.

//Setting up the Amazon DynamoDB table to store aggregates in
const string AggregateTableName = "yourboundedcontextname_aggregates";
const string AggregateIdName = "aggregate-id";

//At the time of writing, DynamoDBClient didn't make it yet
//into the AWSClientFactory.
var dynamoDbClient = new AmazonDynamoDBClient();

dynamoDbClient.CreateTable(
  new CreateTableRequest().
    WithTableName(AggregateTableName).
    WithKeySchema(
      new KeySchema().
        WithHashKeyElement(
          new KeySchemaElement().
            WithAttributeName(AggregateIdName).
            WithAttributeType("S")
        ).
        WithRangeKeyElement(null)
    ).
    WithProvisionedThroughput(
      new ProvisionedThroughput().
        WithReadCapacityUnits(300).
        WithWriteCapacityUnits(500)
    )
  );

Now that we’ve set up the infrastructure, let’s tackle the scenario of storing a changeset. The flow is pretty simple. First we try to store the changeset in AWS S3 as an object in the configured bucket.

//An internal representation of the changeset
public class ChangesetDocument {
  public const long InitialValue = 0;

  //Exposing internal state here to
  //simplify the example.
  public Guid AggregateId { get; set; }
  public long AggregateVersion { get; set; }
  public Guid ChangesetId { get; set; }
  public Guid? ParentChangesetId { get; set; }

  public byte[] Content { get; set; }

  public Stream GetContentStream() {
    return new MemoryStream(Content, writable: false);
  }
}

//Assuming there's a changeset document we want to store,
//going by the variable name 'document'.

const string AggregateIdMetaName = "x-amz-meta-aggregate-id";
const string AggregateVersionMetaName = "x-amz-meta-aggregate-version";
const string ChangesetIdMetaName = "x-amz-meta-changeset-id";
const string ParentChangesetIdMetaName = "x-amz-meta-parent-changeset-id";

var putObjectRequest = new PutObjectRequest().
  WithBucketName(ChangesetBucketName).
  WithGenerateChecksum(true).
  WithKey(document.ChangesetId.ToString()).
  WithMetaData(ChangesetIdMetaName, document.ChangesetId.ToString()).
  WithMetaData(AggregateIdMetaName, document.AggregateId.ToString()).
  WithMetaData(AggregateVersionMetaName, Convert.ToString(document.AggregateVersion));

if (document.ParentChangesetId.HasValue) {
  putObjectRequest.WithMetaData(ParentChangesetIdMetaName, document.ParentChangesetId.Value.ToString());
}

putObjectRequest.WithInputStream(document.GetContentStream());

s3Client.PutObject(putObjectRequest);

If that goes well, we try to create an item in the configured AWS DynamoDB table. The expected values below are the AWS DynamoDB way of doing optimistic concurrency checking. They cater for both the initial (competing inserters) and update (competing updaters) concurrency.

In this example I’m using an incrementing version number to do that. Strictly speaking, I could be using the changeset identifier for that, but an incrementing version number is easier to relate to when you come from an ORM/database background.

const string AggregateVersionName = "aggregate-version";
const string ChangesetIdName = "changeset-id";

public static class ExtensionsForChangesetDocument {
  public static KeyValuePair<string, ExpectedAttributeValue>[] ToExpectedValues(this ChangesetDocument document) {
    var dictionary = new Dictionary<string, ExpectedAttributeValue>();
    if(document.AggregateVersion == ChangesetDocument.InitialValue) {
      //Make sure we're the first to create the aggregate
      dictionary.Add(AggregateIdName,
        new ExpectedAttributeValue().
          WithExists(false)
      );
    } else {
      //Make sure nobody changed the aggregate behind our back 
      dictionary.Add(AggregateIdName,
        new ExpectedAttributeValue().
          WithExists(true).
          WithValue(
            new AttributeValue().
              WithS(document.AggregateId.ToString()))
      );
      dictionary.Add(AggregateVersionName,
        new ExpectedAttributeValue().
          WithValue(
            new AttributeValue().
              WithN(Convert.ToString(document.AggregateVersion - 1)))
      );
    }
    return dictionary.ToArray();
  }

  public static KeyValuePair<string, AttributeValue>[] ToItemValues(this ChangesetDocument document) {
    //The relevant values to store in the item.
    var dictionary = new Dictionary<string, AttributeValue> {
      {AggregateIdName, new AttributeValue().WithS(document.AggregateId.ToString())},
      {AggregateVersionName, new AttributeValue().WithN(Convert.ToString(document.AggregateVersion))},
      {ChangesetIdName, new AttributeValue().WithS(document.ChangesetId.ToString())},
    };
    return dictionary.ToArray();
  }
}

dynamoDbClient.PutItem(
  new PutItemRequest().
    WithTableName(AggregateTableName).
    WithExpected(document.ToExpectedValues()).
    WithItem(document.ToItemValues())
  );

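If the PutItem operation throws an exception indicating “ConditionalCheckFailed”, we know someone else changed the aggregate concurrently (an optimistic concurrency conflict). In such a case, it’s best to repeat the entire operation: reload the aggregate, re-execute the command and store the new changeset.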
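To make the conditional write and the retry tangible without an AWS account, here’s a minimal in-memory sketch. AggregateHeadStore, ConcurrencyConflictException and Retry are hypothetical names: the store plays the part of the DynamoDB table and the exception stands in for “ConditionalCheckFailed”. The checks mirror ToExpectedValues: version 0 may only be written if the aggregate doesn’t exist yet, any later version only if version - 1 is what’s currently stored.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the "ConditionalCheckFailed" error.
public class ConcurrencyConflictException : Exception {
  public ConcurrencyConflictException(string message) : base(message) {}
}

// Hypothetical in-memory stand-in for the aggregates table:
// aggregate id -> (version, head changeset id).
public class AggregateHeadStore {
  readonly Dictionary<Guid, (long Version, Guid ChangesetId)> _items =
    new Dictionary<Guid, (long Version, Guid ChangesetId)>();
  readonly object _gate = new object();

  public void PutHead(Guid aggregateId, long version, Guid changesetId) {
    lock (_gate) {
      var exists = _items.TryGetValue(aggregateId, out var current);
      //Make sure we're the first to create the aggregate
      if (version == 0 && exists)
        throw new ConcurrencyConflictException("Aggregate already created.");
      //Make sure nobody changed the aggregate behind our back
      if (version > 0 && (!exists || current.Version != version - 1))
        throw new ConcurrencyConflictException("Aggregate changed behind our back.");
      _items[aggregateId] = (version, changesetId);
    }
  }

  public bool TryGetHead(Guid aggregateId, out long version, out Guid changesetId) {
    lock (_gate) {
      var found = _items.TryGetValue(aggregateId, out var current);
      version = current.Version;
      changesetId = current.ChangesetId;
      return found;
    }
  }
}

public static class Retry {
  // Re-runs the whole operation on conflict, up to maxAttempts times;
  // the exception filter lets the last conflict propagate to the caller.
  public static void OnConflict(Action operation, int maxAttempts = 3) {
    for (var attempt = 1; ; attempt++) {
      try { operation(); return; }
      catch (ConcurrencyConflictException) when (attempt < maxAttempts) { }
    }
  }
}
```

In a real handler, the action passed to Retry.OnConflict should reload the aggregate before re-executing the command, since whoever won the race changed the state you based your decision on.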

I’ve left duplicate command processing elimination as an exercise for the reader ;-)

The only thing left to do is showing the reverse operation, reading. First we need to fetch the pointer to the last stored and approved changeset identifier. I hope you do realize that a consistent read is not strictly necessary, since the likelihood of concurrency should be low, and a stale head will be caught by the conditional write anyway.

var getItemResponse = dynamoDbClient.GetItem(
  new GetItemRequest().
    WithTableName(AggregateTableName).
    WithKey(
      new Key().
        WithHashKeyElement(
          new AttributeValue().
            WithS(aggregateId.ToString())).
        WithRangeKeyElement(null)).
    WithConsistentRead(false).
    WithAttributesToGet(ChangesetIdName)
  );

Now that we’ve bootstrapped the reading process, we keep reading each changeset until there are no more changesets to read. Each approved changeset contains metadata that points to the previous approved changeset, so this walks the chain from the newest changeset back to the oldest. It’s the responsibility of the calling code to do something useful with the read changesets (e.g. put them back in chronological order, deserialize the content and replay each embedded event into the corresponding aggregate).

var changesetId = new Guid?(new Guid(getItemResponse.GetItemResult.Item[ChangesetIdName].S));
while(changesetId.HasValue) {
  var getObjectResponse = s3Client.GetObject(
    new GetObjectRequest().
      WithBucketName(ChangesetBucketName).
      WithKey(changesetId.Value.ToString()));
  var document = new ChangesetDocument {
    AggregateId = new Guid(getObjectResponse.Metadata[AggregateIdMetaName]),
    AggregateVersion = Convert.ToInt64(getObjectResponse.Metadata[AggregateVersionMetaName]),
    ChangesetId = new Guid(getObjectResponse.Metadata[ChangesetIdMetaName]),
    Content = getObjectResponse.ResponseStream.ToByteArray() //ToByteArray(): a stream-to-byte[] helper, not shown
  };
  var parentChangesetIdAsString = getObjectResponse.Metadata[ParentChangesetIdMetaName];
  if(parentChangesetIdAsString != null) {
    document.ParentChangesetId = new Guid(parentChangesetIdAsString);
  }

  changesetId = document.ParentChangesetId;

  yield return document;
}
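Since the loop above yields changesets newest first, the caller has to turn them around before replaying. Here’s a minimal sketch of that, assuming an in-memory dictionary in place of S3; Changeset, ChangesetChain and the load delegate are hypothetical names, not part of any SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal changeset: identity, optional parent and its events.
public class Changeset {
  public Guid Id { get; }
  public Guid? ParentId { get; }
  public IReadOnlyList<object> Events { get; }

  public Changeset(Guid id, Guid? parentId, IReadOnlyList<object> events) {
    Id = id; ParentId = parentId; Events = events;
  }
}

public static class ChangesetChain {
  // Walks from the head back to the first changeset (newest to oldest),
  // resolving each identity through 'load' (e.g. an S3 object lookup).
  public static IEnumerable<Changeset> WalkFromHead(Guid headId, Func<Guid, Changeset> load) {
    Guid? next = headId;
    while (next.HasValue) {
      var changeset = load(next.Value);
      yield return changeset;
      next = changeset.ParentId;
    }
  }

  // Replay needs chronological order: reverse the walk, then flatten the events.
  public static IEnumerable<object> EventsInOrder(Guid headId, Func<Guid, Changeset> load) {
    return WalkFromHead(headId, load).Reverse().SelectMany(c => c.Events);
  }
}
```

Reversing means buffering the whole chain before the first event can be applied, which is one more reason to keep the number of changesets per aggregate low.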

And that’s basically all there is to it. In reality, you’ll need a lot more metadata and error handling to make this a success. I should point out that the performance of these services, consumed from on premises, was abysmal. I’m assuming (hoping) that if you consume them from an AWS EC2 instance, performance will be much better.

Conclusion

A clever reader will notice that this technique can easily be transposed to other document/data stores. The specifics will vary, but the theme will be the same. The reason for applying this technique mainly has to do with the inherent constraints of certain datastores. The transactional ones, especially in cloudy environments, are limited with regard to the number of bytes they can store per item, which makes them less desirable for storing payloads in (predicting payload size can be hard at times). There are easy ways to side-step concurrency problems, such as serializing all aggregate access to one worker/queue, but that’s another discussion.

Your EventStream is a linked list

A week ago I had an interesting twonversation with Jérémie Chassaing and Rinat Abdullin about event streams. I mentioned that, in the past, I had been toying with event streams as linked lists of changesets (to the aficionados of Jonathan Oliver’s EventStore, this is very much akin to what he calls Commits). As a way of documenting some of my thoughts on the subject, I’m putting up some schematics here.

Model of an event stream

Model

From the above picture you can deduce that an event stream is really a simple thing: a collection of changesets. A changeset itself is a collection of events that occurred (how you got there is not important at this point). Event streams and changesets both have a form of unique identity. Head marks the identity of the latest known changeset. A changeset is immutable. For the sake of simplicity I’ve omitted any form of headers/properties you can attach to an event stream, a changeset and/or an event.

Changesets as a LinkedList

EventStream - Model

Each changeset knows its “parent”, i.e. the changeset that it should be appended to, except for the very first changeset, which obviously does not have a “parent” (strictly speaking you could have an explicit terminator “parent”). Chronologically, changeset 1 came before changeset 2, changeset 2 came before changeset 3, and so on.

Looking at the write side of a system that uses event streams for storage, there are two main scenarios:

  1. Writing a changeset of a given event stream: concerns here are duplicate changeset elimination & detecting conflicts, besides the act of actually writing.
  2. Reading the entire event stream: the main concern here is reading all changesets of the event stream as fast as we can, in order.

I’m well aware I’ve omitted other concerns such as automatic event upgrading, event dispatching and snapshotting, which, frankly, are distractions at this point.

Reading

Changesets As Files

Suppose each changeset is, say, a file on disk: how would I know where to start reading? There are various options, really. The picture above illustrates one option where, using “<streamid>.txt” as a convention, the Stream Head File is loaded to bootstrap “walking the chain”, by virtue of it pointing to the latest changeset document (represented as LatestChangesetRef) that makes up that stream. As each Changeset File is read, it provides a reference/pointer to the next Changeset File to read (represented by ParentRef). That reference is really the identity of the next changeset.

I hope I don’t need to explain why you need to keep those identifiers logical. Don’t make them a “physical” thing, like the path to the next changeset file; that would be really painful if you were ever to restructure the changeset files on disk. Instead, delegate the responsibility of translating/resolving a logical identity into its physical location to a separate component.
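Here’s a small sketch of walking the chain from the stream head file, under an assumed file layout: one head file per stream containing the latest changeset id, and one file per changeset whose first line is its ParentRef. FileChangesetReader and the layout are made up for illustration; the point is that ResolveChangesetPath is the only place that knows how a logical identity maps onto a physical path.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical on-disk layout, assumed for illustration only:
//   <root>/<streamid>.txt        -> one line: the LatestChangesetRef
//   <root>/changesets/<id>.txt   -> first line "ParentRef:<id or empty>",
//                                   remaining lines: one event per line
public class FileChangesetReader {
  readonly string _root;

  public FileChangesetReader(string root) { _root = root; }

  // The only place that knows how a logical identity maps to a physical path.
  string ResolveChangesetPath(Guid changesetId) =>
    Path.Combine(_root, "changesets", changesetId.ToString("N") + ".txt");

  public IEnumerable<string> ReadEvents(string streamId) {
    var headFile = Path.Combine(_root, streamId + ".txt");
    if (!File.Exists(headFile)) return Enumerable.Empty<string>();

    var changesets = new List<string[]>();
    Guid? next = Guid.Parse(File.ReadAllText(headFile).Trim());
    while (next.HasValue) {
      var lines = File.ReadAllLines(ResolveChangesetPath(next.Value));
      var parentRef = lines[0].Substring("ParentRef:".Length).Trim();
      next = parentRef.Length == 0 ? (Guid?)null : Guid.Parse(parentRef);
      changesets.Add(lines.Skip(1).ToArray());
    }
    // We walked newest to oldest; replay wants oldest first.
    changesets.Reverse();
    return changesets.SelectMany(events => events);
  }
}
```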

Other options for “where to start reading” could be:

  1. keeping the head of each stream in memory (which makes streams “sticky” and means dealing with recovery mechanisms).
  2. storing the head as a record in a database or blob storage with concurrency control.

Now, reading each changeset file could become a bit costly if they’re scattered all over the disk. There’s nothing stopping you from throwing all those changeset documents into one big file, even asynchronously. This is where immutability and resolving identities can really help you out: since a changeset never changes, it can be copied elsewhere and its identity simply resolved to the new location. It’s important to distinguish between what happens at the logical level and what happens at the physical level.

Alternative Changesets As Files

Yet another approach might be to keep an index file of all changesets (represented above by the Stream Index File) that make up the event stream, in an append-only fashion, thus relieving the changeset documents from having to know their parents.
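The index file variant could look roughly like this sketch (StreamIndexFile and the .index layout are hypothetical). Reading becomes a straight, ordered scan with no parent pointers to chase. Note that this sketch does no concurrency detection at all; in practice the index append is exactly where you would need it.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical layout: <root>/<streamid>.index holds one changeset id per line,
// oldest first, and is only ever appended to.
public class StreamIndexFile {
  readonly string _root;

  public StreamIndexFile(string root) { _root = root; }

  string IndexPath(string streamId) => Path.Combine(_root, streamId + ".index");

  // Appends the id as a new line (the file is created on first append).
  public void Append(string streamId, Guid changesetId) =>
    File.AppendAllLines(IndexPath(streamId), new[] { changesetId.ToString() });

  // Already in chronological order: no parent pointers, no reversal.
  public IReadOnlyList<Guid> ReadChangesetIds(string streamId) {
    var path = IndexPath(streamId);
    if (!File.Exists(path)) return Array.Empty<Guid>();
    return File.ReadAllLines(path)
      .Where(line => line.Length > 0)
      .Select(line => Guid.Parse(line))
      .ToList();
  }
}
```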

Writing

Basically, this operation can be split up into writing the changeset document and updating the head (or index) of the event stream. The advantage here is that storing the changeset document does not require any form of transaction. This allows you to choose from a broader range of data-stores, as there really isn’t a requirement beyond the guarantee that they will remember and serve what you asked them to store. Updating the head of the event stream does require you to at least be able to detect that concurrent writes are happening or have happened, depending on how you want to resolve conflicts. As such, there’s no need to store both of them in the same data-store. Also notice that the duration of the transaction is reduced by taking the changeset itself out of the equation.

When picking a changeset identity, you might be tempted to reuse the identifier of the message that caused the changes to happen (usually the Command’s message identifier). Don’t. Remember, retrying that same command might produce a different set of changes. How are you going to differentiate between rectifying a previous failure with a retry and some other thread trying to process the same message? It’s best to use identities/identifiers for just one purpose.
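The two-step write could be sketched as follows, with ConcurrentDictionary standing in for both stores (EventStreamWriter is a hypothetical name). Storing the changeset is a plain write; only moving the head uses a compare-and-swap (TryUpdate/TryAdd), and each attempt gets a fresh changeset identity rather than reusing the command’s message identifier.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical two-store writer: changesets go to a store without transactional
// guarantees; only the head pointer update needs conflict detection.
public class EventStreamWriter {
  // Stand-in for blob/file storage: it just remembers what it was asked to store.
  public ConcurrentDictionary<Guid, (Guid? ParentId, IReadOnlyList<string> Events)> Changesets { get; } =
    new ConcurrentDictionary<Guid, (Guid? ParentId, IReadOnlyList<string> Events)>();

  // Stand-in for a store with compare-and-swap: stream id -> head changeset id.
  public ConcurrentDictionary<string, Guid> Heads { get; } =
    new ConcurrentDictionary<string, Guid>();

  // expectedHead is the head read before executing the command (null for a new
  // stream). Returns false when someone else moved the head first.
  public bool TryAppend(string streamId, Guid? expectedHead, IReadOnlyList<string> events, out Guid changesetId) {
    // A fresh identity per attempt, never the command's message id.
    changesetId = Guid.NewGuid();

    // Step 1: store the changeset; no transaction needed.
    Changesets[changesetId] = (expectedHead, events);

    // Step 2: move the head, but only if it still points where we expect.
    return expectedHead.HasValue
      ? Heads.TryUpdate(streamId, changesetId, expectedHead.Value)
      : Heads.TryAdd(streamId, changesetId);
  }
}
```

When TryAppend returns false, the changeset it just wrote stays behind in Changesets without any head pointing to it.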

Model - Failure

What happens when you can’t update the head to your newly inserted changeset identity? You’ll be left with a dangling changeset that didn’t make it into “the circle of trust”. No harm done, except for wasting some storage. If the changesets were blobs in the cloud, it might be useful to have a special-purpose daemon to hunt these down and remove them (depending on how much storage costs versus the cost of building the daemon). In general you should optimize for having as few “concurrency conflicts” as possible (they’re bad for business).
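Such a daemon boils down to a reachability check: walk back from every head and treat any stored changeset you never visited as dangling. A minimal sketch, with DanglingChangesetFinder as a hypothetical name. A real daemon would also want a grace period, so it doesn’t remove a changeset whose head update is still in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DanglingChangesetFinder {
  // parentOf: every stored changeset id -> its parent id (null for the first one).
  // heads: the current head of every stream.
  // Returns the stored changesets that no head can reach.
  public static IReadOnlyCollection<Guid> Find(
    IReadOnlyDictionary<Guid, Guid?> parentOf, IEnumerable<Guid> heads) {
    var reachable = new HashSet<Guid>();
    foreach (var head in heads) {
      Guid? current = head;
      // HashSet.Add returns false once we reach a changeset an earlier walk
      // already marked, so shared history is only walked once.
      while (current.HasValue && reachable.Add(current.Value)) {
        current = parentOf.TryGetValue(current.Value, out var parent) ? parent : null;
      }
    }
    return parentOf.Keys.Where(id => !reachable.Contains(id)).ToList();
  }
}
```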

Conclusion

I know there are a lot of holes in this story. That’s intentional; I have a lot of unanswered questions myself. I’m especially interested in any comments that can point me to prior art outside the realm of source control systems. I have no intention of using this in production. It’s just a mental exercise.

Acknowledgements

Most of what I mention here exists in one shape or another. By now some of you will have noticed the resemblance to certain aspects of Git’s internal object model, although I’m inclined to say it’s closer to what Mercurial does. The convention-based file storage can be found in the NoDB implementation by Chris Nicola. Concepts such as commits and event streams (and much more) can be found in the EventStore project.

Message contract DSL

A long time ago, I read this post by Rinat Abdullin on teaching Microsoft Visual Studio your own language. Unfortunately, at the time I was struggling with more basic CQRS concepts, so it got pushed to the background somewhat. Today, however, after authoring the gazillionth command and event by hand, I knew something had to change. This was just too much tedious, manual labor.
Granted, it only takes 5 to 10 minutes at first to define a command or event. But as you progress, you start refactoring towards composable types, creating builders for those types, and before you know it, you’ve spent 45 minutes on datastructures. Not my idea of productivity.
As I reread Rinat’s article, I knew I had reached that point where automation needs to kick in (it just took me a little longer). I wasn’t particularly keen on using ANTLR and an AST to define my grammar, so I started looking for alternatives. The usual suspects passed by: xml (too verbose), json (better, but still too verbose), yaml (dismissed because of lack of familiarity – your mileage may vary). I continued hacking in notepad until I hit something that looked pretty much like the example below.

[MakeDogBarkCommand]
DogId=Guid
Times=Int32

[DogBarkedEvent]
Id=Guid
Version=Int64
NameOfDog=String (Called)
Times=Int32 (ForNumberOfTimes)

If you’ve been developing on Microsoft Windows systems for some time, this format should look familiar. Yeah, it’s the good old INI file format with conventions sprinkled all over it. Section names denote types such as events, commands, datastructures and enumerations. Key names denote properties of the type, while the values denote each property’s datatype. There’s no translation here: those are just native .NET types or your own (a big plus compared to all the type conversion between xml/json and the .NET type system). Conventions applied include: anything that ends with ‘Event’ is an event, anything that ends with ‘Command’ is a command, anything that ends with ‘Fragment’ is a fragment to be inserted at the location of reference, anything that has a number for each key value is an enumeration, and anything else is just a datastructure. Values with a ‘type (name)’ signature denote a property type and a custom name to be used as the builder method name (e.g. NameOfDog=String (Called) yields the Called(…) builder method).
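To give an idea of how little is needed to apply those conventions, here’s a rough sketch of a parser and classifier for this INI dialect. MessageContractParser, ContractType and ContractMember are hypothetical names; the sketch only classifies sections and splits ‘type (name)’ values, leaving fragment inlining and the actual code generation aside.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public enum ContractKind { Event, Command, Fragment, Enumeration, DataStructure }

// For enumerations, Type holds the member's number. BuilderMethodName
// falls back to the key when no "(Name)" is given.
public record ContractMember(string Name, string Type, string BuilderMethodName);

public record ContractType(string Name, ContractKind Kind, IReadOnlyList<ContractMember> Members);

public static class MessageContractParser {
  // "Type" or "Type (BuilderName)", e.g. "String (Called)" or "ProductSetItem[]".
  static readonly Regex ValuePattern =
    new Regex(@"^(?<type>[^\s(]+)\s*(?:\((?<builder>\w+)\))?$");

  public static List<ContractType> Parse(string ini) {
    var result = new List<ContractType>();
    string section = null;
    var members = new List<ContractMember>();

    void Flush() {
      if (section != null) result.Add(new ContractType(section, Classify(section, members), members));
    }

    foreach (var rawLine in ini.Split('\n')) {
      var line = rawLine.Trim();
      if (line.Length == 0) continue;
      if (line.StartsWith("[") && line.EndsWith("]")) {
        Flush();
        section = line.Substring(1, line.Length - 2);
        members = new List<ContractMember>();
        continue;
      }
      var separator = line.IndexOf('=');
      if (section == null || separator < 0) continue; // skip what we don't understand
      var key = line.Substring(0, separator).Trim();
      var match = ValuePattern.Match(line.Substring(separator + 1).Trim());
      if (!match.Success) continue;
      var builder = match.Groups["builder"].Success ? match.Groups["builder"].Value : key;
      members.Add(new ContractMember(key, match.Groups["type"].Value, builder));
    }
    Flush();
    return result;
  }

  static ContractKind Classify(string name, List<ContractMember> members) {
    if (name.EndsWith("Event")) return ContractKind.Event;
    if (name.EndsWith("Command")) return ContractKind.Command;
    if (name.EndsWith("Fragment")) return ContractKind.Fragment;
    if (members.Count > 0 && members.All(m => int.TryParse(m.Type, out _))) return ContractKind.Enumeration;
    return ContractKind.DataStructure;
  }
}
```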

If you’re using your events and commands directly as interop datastructures, this approach might not work for you.

The beauty of this technique lies in the fact that you can define a type and reuse it pretty much everywhere, thus composing commands and events from existing types. It doesn’t take a genius to figure out that you can go from the above message contract definition to the code below.

[Serializable] [ProtoContract]
public class MakeDogBarkCommand : ICommand {
  [ProtoMember(1)]
  public Guid DogId { get; private set; }
  [ProtoMember(2)]
  public Int32 Times { get; private set; }

  MakeDogBarkCommand() {}
  public MakeDogBarkCommand(Guid dogId, Int32 times) {
    DogId = dogId;
    Times = times;
  }
}

[Serializable] [ProtoContract]
public class DogBarkedEvent : IEvent {
  [ProtoMember(1)]
  public Guid Id { get; private set; }
  [ProtoMember(2)]
  public Int64 Version { get; private set; }
  [ProtoMember(3)]
  public String NameOfDog { get; private set; }
  [ProtoMember(4)]
  public Int32 Times { get; private set; }

  DogBarkedEvent() {}
  public DogBarkedEvent(Guid id, Int64 version, String nameOfDog, Int32 times) {
    Id = id;
    Version = version;
    NameOfDog = nameOfDog;
    Times = times;
  }
}

public class DogBarkedEventBuilder : IEventBuilder<DogBarkedEvent> {
  String _nameOfDog;
  Int32 _times;

  public DogBarkedEventBuilder Called(String value) {
    _nameOfDog = value;
    return this;
  }

  public DogBarkedEventBuilder ForNumberOfTimes(Int32 value) {
    _times = value;
    return this;
  }

  public DogBarkedEvent Build(Guid id, Int64 version) {
    return new DogBarkedEvent(id, version, _nameOfDog, _times);
  }
}

Below is a more advanced example demonstrating the use of arrays, enumerations and composing a type from fragments, other custom and native types.

[AddProductSetToCartCommand]
CartId=Guid
ProductSet=ProductSet

[ProductSet]
Id=Guid
Name=String
Items=ProductSetItem[]

[ProductIdNameCategoryFragment]
ProductId=Guid
ProductName=String
ProductCategory=ProductCategory

[ProductSetItem]
Id=Int32
Product=ProductIdNameCategoryFragment
Amount=Int32

[ProductCategory]
Toys=1
ComputerGames=2
DVDs=3
CDs=4

As a side-note, to make the transition to this DSL a little easier, I threw together a bit of reflection-based code over my existing events/commands to spit out a file in the INI format.

The thing that stood out the most for me is the terseness with which a message contract can be authored. Combined with conventions and old-fashioned code generation this can be a real time-saver.

EventBuilders

Some people have asked me to clarify what I’ve been saying over on twitter about using event builders. This post is an attempt at showing how I’ve been using them.

This is just one way of creating fluent builders for your events. If you search the net, you’ll find many more (start here).

Let’s dive in. I’m assuming there’s an event in place (called BeganTemplateRollout in this example) along with some minor infrastructure that tells me the class I’m dealing with is indeed an event.

//Could be your own, the one from your favourite messaging framework, 
//or none at all

public interface IMessage {} 
public interface IEvent : IMessage {}

public class BeganTemplateRolloutEvent : IEvent {
  public readonly Guid Id;
  public readonly long Version;
  public readonly Events.RolloutPeriod PeriodToRollout;

  public BeganTemplateRolloutEvent(Guid id, long version, Events.RolloutPeriod periodToRollout) {
    Id = id; Version = version; PeriodToRollout = periodToRollout;
  }
}

public class RolloutPeriod  { 
  //i.e. Events.RolloutPeriod, 
  //the event representation of a value object

  public readonly DateTime StartDate;
  public readonly DateTime EndDate;

  public RolloutPeriod(DateTime startDate, DateTime endDate) {
    StartDate = startDate; EndDate = endDate;
  }
}

So, given that event, what does its builder look like?

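//Covariant (out): TEvent only appears as a return type, and it lets an
//IEventBuilder<BeganTemplateRolloutEvent> be passed where an IEventBuilder<IEvent> is expected
public interface IEventBuilder<out TEvent>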
  where TEvent : IEvent
{
  TEvent Build(Guid id, long version); 
}

public class BeganTemplateRolloutEventBuilder : IEventBuilder<BeganTemplateRolloutEvent> {
  Events.RolloutPeriod _periodToRollout;

  public BeganTemplateRolloutEventBuilder ForPeriod(RolloutPeriodBuilder builder) { 
    //Notice the builder? Makes it more fluent, that's all.
    _periodToRollout = builder.Build();
    return this;
  }

  public BeganTemplateRolloutEvent Build(Guid id, long version) {
    return new BeganTemplateRolloutEvent(id, version, _periodToRollout);
  }
}

public class RolloutPeriodBuilder {
  DateTime _startsOn;
  DateTime _endsOn;

  public RolloutPeriodBuilder StartsOn(DateTime value) { _startsOn = value; return this; }
  public RolloutPeriodBuilder EndsOn(DateTime value) { _endsOn = value; return this; }

  public Events.RolloutPeriod Build() { return new Events.RolloutPeriod(_startsOn, _endsOn); }
}

public abstract class AggregateRootEntity {
  Guid _id;
  long _version;

  protected void ApplyEvent(IEventBuilder<IEvent> builder) {
    ApplyEvent(builder.Build(_id, ++_version));
  }

  private void ApplyEvent(IEvent @event) {
    /* You know what goes here ... */
  }
}

Why the explicit event builder interface? Basically this is a contract between the derived aggregate root entities and an AggregateRootEntity base class, where the latter provides event sourcing capabilities as found in Greg Young’s m-r. It allows the derived classes to concern themselves with the purely domain specific part of an event, while the base class can provide event identity (i.e. the aggregate identifier) and versioning (i.e. incrementing a version number upon each change, calculating a version hash, or …). The base class makes no assumption about how the information is stored inside the event, by virtue of that Build method.
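Note that for ApplyEvent(IEventBuilder<IEvent>) to accept a BeganTemplateRolloutEventBuilder, the builder interface has to be covariant in TEvent (declared with out, not in). Below is a stripped-down, compilable sketch of the contract using a hypothetical TemplateRenamedEvent instead of the rollout event: the derived class only supplies the domain specific part, the base class supplies identity and version.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent {}

// Covariant (out): an IEventBuilder<TemplateRenamedEvent> is usable
// wherever an IEventBuilder<IEvent> is expected.
public interface IEventBuilder<out TEvent> where TEvent : IEvent {
  TEvent Build(Guid id, long version);
}

public abstract class AggregateRootEntity {
  readonly List<IEvent> _changes = new List<IEvent>();

  protected AggregateRootEntity(Guid id) { Id = id; }

  public Guid Id { get; }
  public long Version { get; private set; }
  public IReadOnlyList<IEvent> UncommittedChanges => _changes;

  // The base class supplies identity and versioning; the builder the rest.
  protected void ApplyEvent(IEventBuilder<IEvent> builder) {
    Version++;
    _changes.Add(builder.Build(Id, Version));
  }
}

// Hypothetical event and builder, for illustration only.
public class TemplateRenamedEvent : IEvent {
  public TemplateRenamedEvent(Guid id, long version, string name) {
    Id = id; Version = version; Name = name;
  }
  public Guid Id { get; }
  public long Version { get; }
  public string Name { get; }
}

public class TemplateRenamedEventBuilder : IEventBuilder<TemplateRenamedEvent> {
  string _name;
  public TemplateRenamedEventBuilder To(string name) { _name = name; return this; }
  public TemplateRenamedEvent Build(Guid id, long version) =>
    new TemplateRenamedEvent(id, version, _name);
}

public class Template : AggregateRootEntity {
  public Template(Guid id) : base(id) {}
  public void Rename(string name) => ApplyEvent(new TemplateRenamedEventBuilder().To(name));
}
```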

Having event builders is great, but where can event builders be used in your codebase? The short answer is: pretty much everywhere you’d be using an event. Be it in a command handler test to assert the proper event was produced, inside an event handler/denormalizer test to properly compose an event to be handled, or inside an aggregate to build up events in a readable manner before turning them over to the ApplyEvent(…) method. Below is an example of its usage inside an aggregate.

public class Template : AggregateRootEntity {
  public void BeginRollout(Domain.RolloutPeriod periodToRollout) {
    Guard.Against(
      IsPeriodRolledOut(periodToRollout), 
      ErrorCode.PeriodAlreadyRolledOut);

    ApplyEvent(
      Build.BeganTemplateRollout.
        ForPeriod(periodToRollout) //Double dispatch
    );
  }

  public bool IsPeriodRolledOut(Domain.RolloutPeriod periodToRollout) {
    /* Do what is necessary */
    return false;
  }
  
  void ApplyEvent(BeganTemplateRolloutEvent @event) { 
    // change internal state here
    // if you care for it
  }
}

public class RolloutPeriod { 
  //i.e. Domain.RolloutPeriod

  private DateTime _startDate;
  private DateTime _endDate;

  internal void BuildEvent(BeganTemplateRolloutEventBuilder builder) {
    builder.ForPeriod(
      Build.RolloutPeriod.
        StartsOn(_startDate).
        EndsOn(_endDate));
  }
}

internal static class Build {
  public static BeganTemplateRolloutEventBuilder BeganTemplateRollout { 
    get { return new BeganTemplateRolloutEventBuilder(); } 
  }
  public static RolloutPeriodBuilder RolloutPeriod { 
    get { return new RolloutPeriodBuilder(); } 
  }
}

public static class EventBuilderExtensions {
  public static BeganTemplateRolloutEventBuilder ForPeriod(
    this BeganTemplateRolloutEventBuilder builder, 
    Domain.RolloutPeriod value) 
  {
    value.BuildEvent(builder); //Double dispatch magic
    return builder;
  }
}

Aggregates and their events

There is an interesting relationship between events and the aggregates that produce them, at least to those of us who are building infrastructure for the event sourced domain model.

On one hand we want events to be decoupled from infrastructure. The focus should be on tracking intent & business data, especially in those classes that model the domain. They trigger events in response to behavior and apply state changes, that’s it. On the other hand we want a changeset (i.e. a set of events produced by an aggregate, a.k.a. a commit) to provide enough meta data so the infrastructure knows what aggregate/event stream it belongs to and what revision the aggregate is/was at, in order to detect concurrent changes. Infrastructure might also be interested in the command that caused the changeset to happen (e.g. for idempotent behavior) or it might want to associate execution related data (performance, tracing) with the changeset. And that’s just for writing.

When it comes to reading, typically all the past changesets are read from an event store in order to restore the current state of an aggregate. Sometimes – to optimize aggregate read performance – we might add support for snapshots. The thing about reading is that we’re not really all that interested in the changesets themselves, but rather in the events they contain, across all those changesets. Again, some meta-data will need to stick to the aggregate, either directly embedded in the aggregate or using a map which tracks the meta-data associated with an aggregate. Why? Because at some point, you’ll want to save the changes you made to an aggregate.
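For the second option, .NET’s ConditionalWeakTable is one way to keep meta-data next to an aggregate without the aggregate knowing about it. A minimal sketch, where AggregateMetadata and AggregateMetadataMap are hypothetical names; the table compares keys by reference and doesn’t keep aggregates alive, so entries go away together with their aggregate.

```csharp
using System;
using System.Runtime.CompilerServices;

// Hypothetical meta-data the infrastructure needs when saving.
public class AggregateMetadata {
  public Guid StreamId { get; set; }
  public long ExpectedVersion { get; set; }
}

// Tracks meta-data alongside aggregates without the aggregates knowing about it.
public class AggregateMetadataMap {
  readonly ConditionalWeakTable<object, AggregateMetadata> _map =
    new ConditionalWeakTable<object, AggregateMetadata>();

  // Called by the repository right after restoring an aggregate from its history.
  public void Track(object aggregate, Guid streamId, long loadedVersion) {
    var metadata = _map.GetOrCreateValue(aggregate);
    metadata.StreamId = streamId;
    metadata.ExpectedVersion = loadedVersion;
  }

  // Called when saving, to find out which stream and version to write against.
  public bool TryGet(object aggregate, out AggregateMetadata metadata) =>
    _map.TryGetValue(aggregate, out metadata);
}
```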

Looking at event sourcing implementations you can see a lot of different ways of dealing with tracking meta-data:

In this example the “apply state change” methods are responsible for tracking the aggregate’s identifier and thus form the contract between aggregate and event. Mind you, only “aggregate construction” related events need to track this. Later on, while saving, the identifier (fetched by a “template” method) is used to tell the event storage service what aggregate/stream the events need to be associated with.

//Code courtesy of 
//https://github.com/gregoryyoung/m-r/blob/master/SimpleCQRS/Domain.cs
//in commit 5a7d7d0136e86c3d0cdd851cdf2d3de7d077f117

public abstract class AggregateRoot {
        public abstract Guid Id { get; }
        //Rest of members omitted for brevity
}

public class InventoryItem : AggregateRoot {
        private bool _activated;
        private Guid _id;

        private void Apply(InventoryItemCreated e) {
            _id = e.Id;
            _activated = true;
        }

        public override Guid Id {
            get { return _id; }
        }
        //Rest of members omitted for brevity
}

public interface IRepository<T> where T : AggregateRoot, new() {
        void Save(AggregateRoot aggregate, int expectedVersion);
        //Rest of members omitted for brevity
}

public class Repository<T> : IRepository<T> where T: AggregateRoot, new() {
        public void Save(AggregateRoot aggregate, int expectedVersion) {
            _storage.SaveEvents(aggregate.Id, aggregate.GetUncommittedChanges(), expectedVersion);
        }
        //Rest of members omitted for brevity
}

In the following example the API designers have gone a little further. They’ve created a contract between the aggregate and the event in the form of a specialized interface called ISourcedEvent. It has a method called ClaimEvent which is used to couple the event to the aggregate. Basically, the “apply state change” methods are relieved from tracking the identifier (and version) of the aggregate; the base class (and friends) takes care of that. Of course, there’s no free lunch: your events need to implement ISourcedEvent (or derive from the SourcedEvent base class) to make this work.

//Code courtesy of 
//https://github.com/ncqrs/ncqrs/blob/master/Framework/src/Ncqrs/Eventing/Sourcing/EventSource.cs
//https://github.com/ncqrs/ncqrs/blob/master/Framework/src/Ncqrs/Eventing/Sourcing/SourcedEventStream.cs
//https://github.com/ncqrs/ncqrs/blob/master/Framework/src/Ncqrs/Eventing/Sourcing/ISourcedEvent.cs
//in commit c3ca2490fbf9d1e6ab0411b32bb0589b187b23a8
public abstract class EventSource : IEventSource {
        private Guid _eventSourceId;

        private readonly SourcedEventStream _uncommittedEvents = new SourcedEventStream();

        public Guid EventSourceId   {
            get { return _eventSourceId; }
            protected set {
                Contract.Requires<InvalidOperationException>(Version == 0);

                _eventSourceId = value;
                _uncommittedEvents.EventSourceId = EventSourceId;
            }
        }

        public long Version {
            get {
                return InitialVersion + _uncommittedEvents.Count;
            }
        }
        private long _initialVersion;

        public long InitialVersion {
            get { return _initialVersion; }
            protected set {
                Contract.Requires<InvalidOperationException>(Version == InitialVersion);
                Contract.Requires<ArgumentOutOfRangeException>(value >= 0);

                _initialVersion = value;
                _uncommittedEvents.SequenceOffset = value;
            }
        }

        protected EventSource() {
            InitialVersion = 0;
            EventSourceId = NcqrsEnvironment.Get<IUniqueIdentifierGenerator>().GenerateNewId();
        }

        protected EventSource(Guid eventSourceId) {
            InitialVersion = 0;
            EventSourceId = eventSourceId;
        }

        public virtual void InitializeFromHistory(IEnumerable<ISourcedEvent> history) {
            //Omitted for brevity

            foreach (var historicalEvent in history) {
                if (InitialVersion == 0) {
                    EventSourceId = historicalEvent.EventSourceId;
                }

                ApplyEventFromHistory(historicalEvent);
                InitialVersion++; // TODO: Thought... couldn't we get this from the event?
            }
        }

        internal protected void ApplyEvent(ISourcedEvent evnt) {
            _uncommittedEvents.Append(evnt);

            //Omitted for brevity
        }

        private void ApplyEventFromHistory(ISourcedEvent evnt) {
            //Omitted for brevity
        }

        public void AcceptChanges() {
            long newInitialVersion = Version;

            _uncommittedEvents.Clear();

            InitialVersion = newInitialVersion;
        }

        //Rest of members omitted for brevity
}

public class SourcedEventStream : IEnumerable<ISourcedEvent> {
        public void Append(ISourcedEvent sourcedEvent) {
            ClaimEvent(sourcedEvent);

            _events.Add(sourcedEvent);
        }

        protected void ClaimEvent(ISourcedEvent evnt) {
            //Omitted for brevity

            var nextSequence = LastSequence + 1;
            evnt.ClaimEvent(EventSourceId, nextSequence);
        }
}

public interface ISourcedEvent : IEvent {
        Guid EventSourceId { get; }

        long EventSequence { get; }

        void InitializeFrom(StoredEvent stored);

        void ClaimEvent(Guid eventSourceId, long sequence);
}
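To see the claiming mechanism in isolation, here's a minimal, self-contained sketch of the same idea. The names (IClaimableEvent, ClaimableEvent, ClaimingEventStream, BeautyQueenElected) are hypothetical simplifications, not NCQRS's actual types, and the "claim only once" guard is an assumption of this sketch. What it tries to show is the division of labour: the concrete event type only carries event-data, while the stream stamps the event source identifier and the next sequence number onto each event as it gets appended.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for ISourcedEvent: the contract between stream and event.
public interface IClaimableEvent {
  Guid EventSourceId { get; }
  long EventSequence { get; }
  void ClaimEvent(Guid eventSourceId, long sequence);
}

// Hypothetical base class (compare SourcedEvent): holds the meta-data on behalf of derived events.
public abstract class ClaimableEvent : IClaimableEvent {
  private bool _claimed;

  public Guid EventSourceId { get; private set; }
  public long EventSequence { get; private set; }

  public void ClaimEvent(Guid eventSourceId, long sequence) {
    // Assumption of this sketch: an event can be claimed only once.
    if (_claimed) throw new InvalidOperationException("The event has already been claimed.");
    EventSourceId = eventSourceId;
    EventSequence = sequence;
    _claimed = true;
  }
}

// Hypothetical stream that stamps meta-data onto every event appended to it.
public class ClaimingEventStream : IEnumerable<IClaimableEvent> {
  private readonly List<IClaimableEvent> _events = new List<IClaimableEvent>();

  public ClaimingEventStream(Guid eventSourceId, long sequenceOffset) {
    EventSourceId = eventSourceId;
    SequenceOffset = sequenceOffset;
  }

  public Guid EventSourceId { get; }
  public long SequenceOffset { get; }

  // Sequence of the most recently appended event (or the offset when nothing was appended yet).
  public long LastSequence => SequenceOffset + _events.Count;

  public void Append(IClaimableEvent evnt) {
    // The stream, not the aggregate's apply method, decides the identifier and sequence.
    evnt.ClaimEvent(EventSourceId, LastSequence + 1);
    _events.Add(evnt);
  }

  public IEnumerator<IClaimableEvent> GetEnumerator() => _events.GetEnumerator();
  IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

// An event that carries event-data only; its meta-data comes from the base class.
public class BeautyQueenElected : ClaimableEvent {
  public BeautyQueenElected(string nameOfThePoorThing) { NameOfThePoorThing = nameOfThePoorThing; }
  public string NameOfThePoorThing { get; }
}
```

Note the price of this design: the event cannot be fully immutable, because its meta-data is only filled in after construction.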

There's nothing wrong with either of these approaches; you just have to be aware of how they work. Both deal with getting/setting meta-data in their own specific way. The aggregate identifier and, optionally, its version will, most of the time, appear both as meta-data and as event-data. The identifier will be used to identify aggregates, while the version will probably serve as a means to detect optimistic concurrency conflicts, both being submitted as part of a command later on.
Yet, isn't it strange that we never wonder why we're modelling it that way? I blame the "LoadFromHistory" method for taking in only a stream of events. This severely limits your options and forces you to "derive" both the aggregate identifier and version from the event stream. Why not make them explicit? Recently, while developing StreamyCore (a toy-around project of mine), I've come up with the following API:

public abstract class AggregateRootEntity : IInitializeAggregate, ITrackAggregateChanges {
  public const long InitialVersion = 0;
  private Guid _identifier;
  private long _baseVersion;
  private long _currentVersion;
  private List<IEvent> _events;

  protected AggregateRootEntity() { }

  //Useful for those who want to embed the metadata into their events
  protected Guid AggregateId { 
    get { return _identifier; } 
  }
  protected long AggregateVersion { 
    get { return _currentVersion; } 
  }

  protected void Initialize(Guid identifier) {
    //Used when you're creating an aggregate yourself
    _identifier = identifier;
    _baseVersion = InitialVersion;
    _currentVersion = InitialVersion;
    _events = new List<IEvent>();
  }

  protected void ApplyEvent(IEvent @event) {
    PlayEvent(@event);
    RecordEvent(@event);
    _currentVersion++;
  }

  private void PlayEvent(IEvent @event) {
    //Plays the event to get a state change
  }

  private void RecordEvent(IEvent @event) {
    //Records the event so it can be handed out as part of the change set
    _events.Add(@event);
  }

  void IInitializeAggregate.Initialize(IAggregateConstructionSet set) {
    //Used when you read an aggregate from the event store
    _identifier = set.AggregateIdentifier;
    _currentVersion = set.AggregateBaseVersion;
    _events = new List<IEvent>();
    foreach(var @event in set.Events) { 
      PlayEvent(@event); _currentVersion++; 
    }
    _baseVersion = _currentVersion;
  }

  bool ITrackAggregateChanges.HasChanges() { 
    return _baseVersion != _currentVersion; 
  }

  IAggregateChangeSet ITrackAggregateChanges.GetChanges() {
    //Used when you write an aggregate to the event store
    return new AggregateChangeSet(_identifier, GetType(), 
      _baseVersion, _currentVersion, _events.ToArray());
  }

  void ITrackAggregateChanges.AcceptChanges() {
    _baseVersion = _currentVersion;

    _events.Clear();
  }
}

public class BeautyPageant : AggregateRootEntity {
  private BeautyPageant() {}

  private BeautyPageant(Guid id, IEvent @event) {
    Initialize(id);
    ApplyEvent(@event);
  }

  public static BeautyPageant NewPageant(Guid id, string name, int yearInTheGregorianCalendar) {
    return new BeautyPageant(id,
      new NewBeautyPageantEvent(id, name, yearInTheGregorianCalendar));
  }

  public void ElectBeautyQueen(string nameOfThePoorThing) {
    ApplyEvent(new BeautyQueenElectedEvent(AggregateId, AggregateVersion + 1, nameOfThePoorThing));
  }

  private void Apply(NewBeautyPageantEvent @event) {  }

  private void Apply(BeautyQueenElectedEvent @event) {  }
}

The idea is to think of the identifier and version as totally separate concerns (i.e. separate from the events being applied) and to make them explicit in the infrastructure APIs that need to deal with them. At the same time you have to cater for the scenario where the end-user news up the aggregate him-/herself, which is what the protected Initialize(Guid) method is for. With this approach there's no requirement to track meta-data in the apply methods yourself, nor does it really force a base class/interface upon your events (even though I'm using one here, IEvent, albeit one without behavior). Generally, I think of it as a different approach, not a better one.
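To illustrate what "explicit in the infrastructure APIs" could look like on the other side of IInitializeAggregate and ITrackAggregateChanges, here's a toy in-memory event store sketch. ConstructionSet, ChangeSet, ConcurrencyException and InMemoryEventStore are hypothetical, trimmed-down names (the aggregate type is left out, for instance); they are not StreamyCore's actual types. Reading hands over the identifier and base version next to the events, and writing uses the base version to detect optimistic concurrency conflicts.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down counterpart of IAggregateConstructionSet (what reading hands out).
public sealed class ConstructionSet {
  public ConstructionSet(Guid aggregateIdentifier, long aggregateBaseVersion, object[] events) {
    AggregateIdentifier = aggregateIdentifier;
    AggregateBaseVersion = aggregateBaseVersion;
    Events = events;
  }
  public Guid AggregateIdentifier { get; }
  public long AggregateBaseVersion { get; }
  public object[] Events { get; }
}

// Hypothetical, trimmed-down counterpart of IAggregateChangeSet (what writing takes in).
public sealed class ChangeSet {
  public ChangeSet(Guid aggregateIdentifier, long aggregateBaseVersion, long aggregateVersion, object[] events) {
    AggregateIdentifier = aggregateIdentifier;
    AggregateBaseVersion = aggregateBaseVersion;
    AggregateVersion = aggregateVersion;
    Events = events;
  }
  public Guid AggregateIdentifier { get; }
  public long AggregateBaseVersion { get; }
  public long AggregateVersion { get; }
  public object[] Events { get; }
}

public class ConcurrencyException : Exception {
  public ConcurrencyException(string message) : base(message) { }
}

// Toy in-memory store: identifier and version travel next to the events, never inside them.
public class InMemoryEventStore {
  private readonly Dictionary<Guid, List<object>> _streams = new Dictionary<Guid, List<object>>();

  public ConstructionSet Read(Guid aggregateIdentifier) {
    List<object> stream;
    if (!_streams.TryGetValue(aggregateIdentifier, out stream))
      throw new KeyNotFoundException("Unknown aggregate " + aggregateIdentifier);
    // Nothing is derived from the events: identifier and base version are passed explicitly.
    // This store keeps no snapshots, so replay always starts at version 0.
    return new ConstructionSet(aggregateIdentifier, 0, stream.ToArray());
  }

  public void Write(ChangeSet changes) {
    if (changes.AggregateVersion != changes.AggregateBaseVersion + changes.Events.Length)
      throw new ArgumentException("Version does not match the number of events.", nameof(changes));

    List<object> stream;
    long storedVersion = _streams.TryGetValue(changes.AggregateIdentifier, out stream) ? stream.Count : 0;
    // Optimistic concurrency: nobody may have appended since the aggregate was read.
    if (storedVersion != changes.AggregateBaseVersion)
      throw new ConcurrencyException(
        "Expected version " + changes.AggregateBaseVersion + " but found " + storedVersion + ".");

    if (stream == null) {
      stream = new List<object>();
      _streams.Add(changes.AggregateIdentifier, stream);
    }
    stream.AddRange(changes.Events);
  }
}
```

In the listing above, IInitializeAggregate.Initialize would consume something like what Read returns, and ITrackAggregateChanges.GetChanges would produce something like what Write expects.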

Regardless of your personal preference, when designing your own API or reusing an existing one there are a couple of questions you should ask yourself:

  • Do I care about the coupling of my events to some base class or interface?
  • Do I consider my events to be immutable after construction?
  • Do I consider meta-data separate from event-data?
  • What kind of meta-data tracking do I want to do myself? Or do I want some base class to do that for me?
  • Who tracks the meta-data? The aggregate’s base class? An external map?
  • Am I comfortable with the “derivation” of meta-data from event-data?
  • How will a user of my aggregate base class initialize meta-data? Am I explicitly communicating how to do that?
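As one possible answer to the questions about immutability and about who tracks the meta-data, here's a sketch that keeps the event itself immutable and free of any base class, and puts the aggregate identifier and version in a separate envelope instead. EventEnvelope and Envelopes.Wrap are hypothetical names used for illustration only, and the sketch assumes your store persists that meta-data alongside each event. It is one option among several, not a recommendation over the approaches above.

```csharp
using System;
using System.Collections.Generic;

// Event-data only: immutable after construction, no base class, no knowledge of aggregates.
public sealed class BeautyQueenElected {
  public BeautyQueenElected(string nameOfThePoorThing) { NameOfThePoorThing = nameOfThePoorThing; }
  public string NameOfThePoorThing { get; }
}

// Hypothetical envelope: the meta-data sits next to the event instead of inside it.
public sealed class EventEnvelope {
  public EventEnvelope(Guid aggregateId, long version, object @event) {
    if (@event == null) throw new ArgumentNullException(nameof(@event));
    AggregateId = aggregateId;
    Version = version;
    Event = @event;
  }
  public Guid AggregateId { get; }
  public long Version { get; }
  public object Event { get; }
}

public static class Envelopes {
  // Hypothetical helper: numbers newly recorded events consecutively after the base version.
  public static List<EventEnvelope> Wrap(Guid aggregateId, long baseVersion, IEnumerable<object> events) {
    var envelopes = new List<EventEnvelope>();
    var version = baseVersion;
    foreach (var @event in events) {
      version++;
      envelopes.Add(new EventEnvelope(aggregateId, version, @event));
    }
    return envelopes;
  }
}
```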