The journeylism of @yreynhout

On CQRS, DDD(D), ES, …

Aggregates and their events

There is an interesting relationship between events and the aggregates that produce them, at least to those of us who are building infrastructure for the event sourced domain model.

On one hand we want events to be decoupled from infrastructure. The focus should be on tracking intent & business data, especially in those classes that model the domain. They trigger events in response to behavior and apply state changes, that’s it. On the other hand we want a changeset (i.e. a set of events produced by an aggregate – a.k.a. a commit) to provide enough meta-data so the infrastructure knows what aggregate/event stream it belongs to and what revision the aggregate is/was at, so that concurrent changes can be detected. Infrastructure might also be interested in the command that caused the changeset to happen (e.g. for idempotent behavior) or it might want to associate execution related data (performance, tracing) with the changeset. And that’s just for writing.
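
To make the write side a bit more tangible, here is a minimal sketch of what such a changeset might carry. The Changeset type and every member name on it are hypothetical (they are not taken from any of the libraries discussed in this post), and events are plain objects so they stay free of infrastructure concerns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical changeset: the meta-data lives here, not in the events themselves.
public sealed class Changeset {
    // Which aggregate/event stream the events belong to.
    public Guid AggregateId { get; }
    // Revision the aggregate was at before these events were produced.
    public long BaseVersion { get; }
    // Command that caused the changeset (useful for idempotent behavior).
    public Guid CommandId { get; }
    // The events themselves, untouched by infrastructure.
    public IReadOnlyList<object> Events { get; }

    public Changeset(Guid aggregateId, long baseVersion, Guid commandId, IEnumerable<object> events) {
        if (baseVersion < 0) throw new ArgumentOutOfRangeException(nameof(baseVersion));
        if (events == null) throw new ArgumentNullException(nameof(events));
        AggregateId = aggregateId;
        BaseVersion = baseVersion;
        CommandId = commandId;
        Events = new List<object>(events).AsReadOnly();
    }

    // Revision the aggregate is at once this changeset has been stored.
    public long ResultingVersion {
        get { return BaseVersion + Events.Count; }
    }
}
```

Execution related data (timings, tracing identifiers) could be added as further properties in the same way, without any of it leaking into the domain classes.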

When it comes to reading, typically all the past changesets are read from an event store in order to restore the current state of an aggregate. Sometimes – to optimize aggregate read performance – we might add support for snapshots. The thing about reading is that we’re not really all that interested in the changesets themselves, but rather in the events they contain, across all those changesets. Again, some meta-data will need to stick to the aggregate, either directly embedded in the aggregate or using a map which tracks the meta-data associated with an aggregate. Why? Because at some point, you’ll want to save the changes you made to an aggregate.
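
As an illustration of the “map” variant, the sketch below flattens the events of all changesets to restore an aggregate and keeps the meta-data in a side table instead of in the aggregate. StoredChangeset, AggregateMetadata and AggregateReader are made-up names, and the play delegate stands in for whatever mechanism routes an event to the right “apply state change” method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;

// Hypothetical changeset as read back from an event store.
public sealed class StoredChangeset {
    public Guid AggregateId;
    public object[] Events;
}

// Hypothetical meta-data kept outside of the aggregate.
public sealed class AggregateMetadata {
    public Guid Id;
    public long Version;
}

public static class AggregateReader {
    // Associates an aggregate instance with its meta-data without touching the aggregate class.
    private static readonly ConditionalWeakTable<object, AggregateMetadata> Tracked =
        new ConditionalWeakTable<object, AggregateMetadata>();

    public static T Restore<T>(Guid id, IEnumerable<StoredChangeset> changesets, Action<T, object> play)
        where T : class, new() {
        var aggregate = new T();
        long version = 0;
        // Only the events matter when restoring state, so flatten all changesets.
        foreach (var @event in changesets.SelectMany(changeset => changeset.Events)) {
            play(aggregate, @event);
            version++;
        }
        // Remember identifier and version so the aggregate can be saved later on.
        Tracked.Add(aggregate, new AggregateMetadata { Id = id, Version = version });
        return aggregate;
    }

    public static AggregateMetadata MetadataOf(object aggregate) {
        AggregateMetadata metadata;
        if (!Tracked.TryGetValue(aggregate, out metadata))
            throw new InvalidOperationException("Aggregate is not tracked.");
        return metadata;
    }
}
```

A ConditionalWeakTable is used here so the side table does not keep aggregates alive; a plain dictionary scoped to a unit of work would do just as well.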

Looking at event sourcing implementations you can see a lot of different ways of dealing with tracking meta-data:

In this example the “apply state change” methods are responsible for tracking the aggregate’s identifier and thus form the contract between aggregate and event. Mind you, only “aggregate construction” related events need to track this. Later on, while saving, the identifier (fetched by a “template” method) is used to tell the event storage service what aggregate/stream the events need to be associated with.

//Code courtesy of 
//https://github.com/gregoryyoung/m-r/blob/master/SimpleCQRS/Domain.cs
//in commit 5a7d7d0136e86c3d0cdd851cdf2d3de7d077f117

public abstract class AggregateRoot {
        public abstract Guid Id { get; }
        //Rest of members omitted for brevity
}

public class InventoryItem : AggregateRoot {
        private bool _activated;
        private Guid _id;

        private void Apply(InventoryItemCreated e) {
            _id = e.Id;
            _activated = true;
        }

        public override Guid Id {
            get { return _id; }
        }
        //Rest of members omitted for brevity
}

public interface IRepository<T> where T : AggregateRoot, new() {
        void Save(AggregateRoot aggregate, int expectedVersion);
        //Rest of members omitted for brevity
}

public class Repository<T> : IRepository<T> where T: AggregateRoot, new() {
        public void Save(AggregateRoot aggregate, int expectedVersion) {
            _storage.SaveEvents(aggregate.Id, aggregate.GetUncommittedChanges(), expectedVersion);
        }
        //Rest of members omitted for brevity
}
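
The SaveEvents call above is where the identifier and the expected version meet the event storage service. What follows is a rough in-memory sketch of what such a service might do with them. It is not the implementation from the repository linked above: InMemoryEventStorage and ConcurrencyException are names I picked for illustration, and I am assuming that the version of a stream simply equals the number of events stored in it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception signalling that somebody else changed the aggregate first.
public class ConcurrencyException : Exception { }

public class InMemoryEventStorage {
    private readonly Dictionary<Guid, List<object>> _streams =
        new Dictionary<Guid, List<object>>();

    public void SaveEvents(Guid aggregateId, IEnumerable<object> events, int expectedVersion) {
        List<object> stream;
        bool exists = _streams.TryGetValue(aggregateId, out stream);

        // Assumption: the version of a stream is the number of events stored in it.
        int actualVersion = exists ? stream.Count : 0;
        if (actualVersion != expectedVersion) throw new ConcurrencyException();

        if (!exists) {
            stream = new List<object>();
            _streams.Add(aggregateId, stream);
        }
        stream.AddRange(events);
    }

    public IReadOnlyList<object> GetEventsForAggregate(Guid aggregateId) {
        List<object> stream;
        return _streams.TryGetValue(aggregateId, out stream)
            ? stream.AsReadOnly()
            : new List<object>().AsReadOnly();
    }
}
```

Saving twice with the same expected version makes the second call throw, which is exactly the optimistic concurrency check the meta-data exists for.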

In the following example the API designers have gone a little further. They’ve created a contract between the aggregate and the event in the form of a specialized interface called ISourcedEvent. It has a method called ClaimEvent which is used to couple the event to the aggregate. Basically, the “apply state change” methods are relieved of tracking the identifier (and version) of the aggregate – the base class (and friends) takes care of that. Of course, there’s no free lunch: your events need to implement ISourcedEvent (or derive from the SourcedEvent base class) to make this work.

//Code courtesy of 
//https://github.com/ncqrs/ncqrs/blob/master/Framework/src/Ncqrs/Eventing/Sourcing/EventSource.cs
//https://github.com/ncqrs/ncqrs/blob/master/Framework/src/Ncqrs/Eventing/Sourcing/SourcedEventStream.cs
//https://github.com/ncqrs/ncqrs/blob/master/Framework/src/Ncqrs/Eventing/Sourcing/ISourcedEvent.cs
//in commit c3ca2490fbf9d1e6ab0411b32bb0589b187b23a8
public abstract class EventSource : IEventSource {
        private Guid _eventSourceId;

        private readonly SourcedEventStream _uncommittedEvents = new SourcedEventStream();

        public Guid EventSourceId   {
            get { return _eventSourceId; }
            protected set {
                Contract.Requires<InvalidOperationException>(Version == 0);

                _eventSourceId = value;
                _uncommittedEvents.EventSourceId = EventSourceId;
            }
        }

        public long Version {
            get {
                return InitialVersion + _uncommittedEvents.Count;
            }
        }
        private long _initialVersion;

        public long InitialVersion {
            get { return _initialVersion; }
            protected set {
                Contract.Requires<InvalidOperationException>(Version == InitialVersion);
                Contract.Requires<ArgumentOutOfRangeException>(value >= 0);

                _initialVersion = value;
                _uncommittedEvents.SequenceOffset = value;
            }
        }

        protected EventSource() {
            InitialVersion = 0;
            EventSourceId = NcqrsEnvironment.Get<IUniqueIdentifierGenerator>().GenerateNewId();
        }

        protected EventSource(Guid eventSourceId) {
            InitialVersion = 0;
            EventSourceId = eventSourceId;
        }

        public virtual void InitializeFromHistory(IEnumerable<ISourcedEvent> history) {
            //Omitted for brevity

            foreach (var historicalEvent in history) {
                if (InitialVersion == 0) {
                    EventSourceId = historicalEvent.EventSourceId;
                }

                ApplyEventFromHistory(historicalEvent);
                InitialVersion++; // TODO: Thought... couldn't we get this from the event?
            }
        }

        internal protected void ApplyEvent(ISourcedEvent evnt) {
            _uncommittedEvents.Append(evnt);

            //Omitted for brevity
        }

        private void ApplyEventFromHistory(ISourcedEvent evnt) {
            //Omitted for brevity
        }

        public void AcceptChanges() {
            long newInitialVersion = Version;

            _uncommittedEvents.Clear();

            InitialVersion = newInitialVersion;
        }

        //Rest of members omitted for brevity
}

public class SourcedEventStream : IEnumerable<ISourcedEvent> {
        public void Append(ISourcedEvent sourcedEvent) {
            ClaimEvent(sourcedEvent);

            _events.Add(sourcedEvent);
        }

        protected void ClaimEvent(ISourcedEvent evnt) {
            //Omitted for brevity

            var nextSequence = LastSequence + 1;
            evnt.ClaimEvent(EventSourceId, nextSequence);
        }
}

public interface ISourcedEvent : IEvent {
        Guid EventSourceId { get; }

        long EventSequence { get; }

        void InitializeFrom(StoredEvent stored);

        void ClaimEvent(Guid eventSourceId, long sequence);
}

There’s nothing wrong with either of these approaches. You just have to be aware of how they work. Both deal with getting/setting meta-data in their specific way. The aggregate identifier and, optionally, its version will – most of the time – appear both as meta-data and as event-data. The identifier will be used to identify aggregates, while the version will probably serve as a means of detecting concurrent changes (optimistic concurrency), both submitted as part of a command later on.

Yet, isn’t it strange we never wonder why we’re modelling it that way? I blame the “LoadFromHistory” method for only taking in a stream of events. This severely limits your options and forces you to “derive” both the aggregate identifier and version from the event stream. Why not make it explicit? Recently, while developing StreamyCore (a toy-around project of mine), I’ve come up with the following API:

public abstract class AggregateRootEntity : IInitializeAggregate, ITrackAggregateChanges {
  public const long InitialVersion = 0;
  private Guid _identifier;
  private long _baseVersion;
  private long _currentVersion;
  private List<IEvent> _events;

  protected AggregateRootEntity() { }

  //Useful for those who want to embed the metadata into their events
  protected Guid AggregateId { 
    get { return _identifier; } 
  }
  protected long AggregateVersion { 
    get { return _currentVersion; } 
  }

  protected void Initialize(Guid identifier) {
    //Used when you're creating an aggregate yourself
    _identifier = identifier;
    _baseVersion = InitialVersion;
    _currentVersion = InitialVersion;
    _events = new List<IEvent>();
  }

  protected void ApplyEvent(IEvent @event) {
    PlayEvent(@event);
    RecordEvent(@event);
    _currentVersion++;
  }

  private void PlayEvent(IEvent @event) {
    //Plays the event to get a state change
  }

  private void RecordEvent(IEvent @event) {
    //Records the event
  }

  void IInitializeAggregate.Initialize(IAggregateConstructionSet set) {
    //Used when you read an aggregate from the event store
    _identifier = set.AggregateIdentifier;
    _currentVersion = set.AggregateBaseVersion;
    _events = new List<IEvent>();
    foreach(var @event in set.Events) { 
      PlayEvent(@event); _currentVersion++; 
    }
    _baseVersion = _currentVersion;
  }

  bool ITrackAggregateChanges.HasChanges() { 
    return _baseVersion != _currentVersion; 
  }

  IAggregateChangeSet ITrackAggregateChanges.GetChanges() {
    //Used when you write an aggregate to the event store
    return new AggregateChangeSet(_identifier, GetType(), 
      _baseVersion, _currentVersion, _events.ToArray());
  }

  void ITrackAggregateChanges.AcceptChanges() {
    _baseVersion = _currentVersion;

    _events.Clear();
  }
}

public class BeautyPageant : AggregateRootEntity {
  private BeautyPageant() {}

  private BeautyPageant(Guid id, IEvent @event) {
    Initialize(id);
    ApplyEvent(@event);
  }

  public static BeautyPageant NewPageant(Guid id, string name, int yearInTheGregorianCalendar) {
    return new BeautyPageant(id,
      new NewBeautyPageantEvent(id, name, yearInTheGregorianCalendar));
  }

  public void ElectBeautyQueen(string nameOfThePoorThing) {
    ApplyEvent(new BeautyQueenElectedEvent(AggregateId, AggregateVersion + 1, nameOfThePoorThing));
  }

  private void Apply(NewBeautyPageantEvent @event) {  }

  private void Apply(BeautyQueenElectedEvent @event) {  }
}

The idea is to think of the identifier and version as totally separate concerns (i.e. separate from the events being applied) and make them explicit in the infrastructure APIs that need to deal with them. At the same time you have to cater for the scenario where the end-user news up the aggregate themselves. With this approach there’s no requirement to track meta-data in the apply methods yourself, nor does it really force a base class/interface upon your events (even though I’m using one here, IEvent, albeit one without behavior). Generally, I think of it as a different approach, not a better one.
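
To show where those explicit APIs would be consumed, here is a sketch of a repository built on top of them. The interface shapes are inferred from how the code above uses them, so the actual StreamyCore definitions may well differ. The member names on IAggregateChangeSet, the IEventStore abstraction and the factory delegate are all assumptions of mine.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }

// Shape inferred from the Initialize method above.
public interface IAggregateConstructionSet {
    Guid AggregateIdentifier { get; }
    long AggregateBaseVersion { get; }
    IEnumerable<IEvent> Events { get; }
}

// Hypothetical member names; the post only shows the constructor call.
public interface IAggregateChangeSet {
    Guid AggregateIdentifier { get; }
    long BaseVersion { get; }
    IEvent[] Events { get; }
}

public interface IInitializeAggregate {
    void Initialize(IAggregateConstructionSet set);
}

public interface ITrackAggregateChanges {
    bool HasChanges();
    IAggregateChangeSet GetChanges();
    void AcceptChanges();
}

// Hypothetical event store abstraction.
public interface IEventStore {
    IAggregateConstructionSet Read(Guid aggregateIdentifier);
    // Expected to reject the changes when BaseVersion is stale.
    void Append(IAggregateChangeSet changes);
}

public class Repository<T> where T : class, IInitializeAggregate, ITrackAggregateChanges {
    private readonly IEventStore _store;
    private readonly Func<T> _factory;

    // A factory is taken because aggregates tend to hide their constructors.
    public Repository(IEventStore store, Func<T> factory) {
        _store = store;
        _factory = factory;
    }

    public T Get(Guid identifier) {
        var aggregate = _factory();
        // Identifier and version arrive explicitly, next to the events.
        aggregate.Initialize(_store.Read(identifier));
        return aggregate;
    }

    public void Save(T aggregate) {
        if (!aggregate.HasChanges()) return;
        _store.Append(aggregate.GetChanges());
        // Only mark the changes as accepted once the store took them.
        aggregate.AcceptChanges();
    }
}
```

Note that the repository never inspects an event to find out which aggregate it is dealing with or what version it is at; that information travels in the construction set and the change set.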

Regardless of your personal preference, when designing your own API or reusing an existing one there are a couple of questions you should ask yourself:

  • Do I care about the coupling of my events to some base class or interface?
  • Do I consider my events to be immutable after construction?
  • Do I consider meta-data separate from event-data?
  • What kind of meta-data tracking do I want to do myself? Or do I want some base class to do that for me?
  • Who tracks the meta-data? The aggregate’s base class? An external map?
  • Am I comfortable with the “derivation” of meta-data from event-data?
  • How will a user of my aggregate base class initialize meta-data? Am I explicitly communicating how to do that?
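
One possible set of answers to these questions (events immutable, no base class, meta-data kept strictly separate) is to wrap plain events in an envelope when they leave the aggregate. The sketch below is only that, a sketch: BeautyQueenElected, EventEnvelope and Envelopes are hypothetical types, and numbering events from the aggregate’s base version is an assumption on my part.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: immutable after construction, no base class or interface.
public sealed class BeautyQueenElected {
    public string Name { get; }
    public BeautyQueenElected(string name) { Name = name; }
}

// Hypothetical envelope: carries the meta-data the infrastructure cares about.
public sealed class EventEnvelope {
    public Guid AggregateId { get; }
    public long Sequence { get; }
    public object Event { get; }

    public EventEnvelope(Guid aggregateId, long sequence, object @event) {
        if (@event == null) throw new ArgumentNullException(nameof(@event));
        AggregateId = aggregateId;
        Sequence = sequence;
        Event = @event;
    }
}

public static class Envelopes {
    // Wraps uncommitted events, numbering them from the aggregate's base version.
    public static List<EventEnvelope> Wrap(Guid aggregateId, long baseVersion, IEnumerable<object> events) {
        var result = new List<EventEnvelope>();
        long sequence = baseVersion;
        foreach (var @event in events) {
            sequence++;
            result.Add(new EventEnvelope(aggregateId, sequence, @event));
        }
        return result;
    }
}
```

The trade-off is that event handlers which need the identifier or sequence now have to receive the envelope rather than the bare event.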

4 responses to “Aggregates and their events”

  1. Szymon Pobiega February 13, 2011 at 21:09

    Great post. Take a look at new Ncqrs API inspired by Jonathan Oliver’s Event Store API: https://github.com/ncqrs/ncqrs/blob/feature-event-envelope/Framework/src/Ncqrs/Eventing/CommittedEventStream.cs. Now our ARs build themselves from this class and contained events are POCOs wrapped in an envelope.
