Skip to content

Event Sourcing

The RIVORA Framework provides event sourcing primitives to persist domain events as the source of truth, enabling full audit trails and temporal queries.

Core Abstractions

IAggregateRoot

All event-sourced aggregates implement IAggregateRoot:

csharp
/// <summary>
/// Contract implemented by every event-sourced aggregate: it exposes its identity and
/// version, the events raised but not yet cleared, and a way to rebuild itself from history.
/// </summary>
public interface IAggregateRoot
{
    // --- Identity and versioning ---

    /// <summary>Unique identifier of the aggregate.</summary>
    Guid Id { get; }

    /// <summary>
    /// Version number of the aggregate.
    /// NOTE(review): presumably the number of events applied so far — confirm against the implementation.
    /// </summary>
    int Version { get; }

    // --- Pending events ---

    /// <summary>Events that have been raised and not yet removed by <see cref="ClearUncommittedEvents"/>.</summary>
    IReadOnlyList<IDomainEvent> UncommittedEvents { get; }

    /// <summary>Empties <see cref="UncommittedEvents"/>; intended to be called after the events were persisted.</summary>
    void ClearUncommittedEvents();

    // --- Rehydration ---

    /// <summary>Rebuilds the aggregate's state from a previously stored event sequence.</summary>
    /// <param name="history">The stored events of this aggregate (presumably in the order they were saved).</param>
    void LoadFromHistory(IEnumerable<IDomainEvent> history);
}

IEventStore

The event store persists and retrieves domain events:

csharp
/// <summary>
/// Persists and retrieves the domain events of aggregates, keyed by aggregate id.
/// </summary>
public interface IEventStore
{
    /// <summary>Returns all stored events of the given aggregate.</summary>
    /// <param name="aggregateId">Identifier of the aggregate whose events are requested.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task<IReadOnlyList<IDomainEvent>> GetEventsAsync(
        Guid aggregateId,
        CancellationToken ct = default);

    /// <summary>Returns the stored events of the given aggregate starting at a version.</summary>
    /// <param name="aggregateId">Identifier of the aggregate whose events are requested.</param>
    /// <param name="fromVersion">
    /// Version at which to start reading.
    /// NOTE(review): whether this bound is inclusive is not specified here — confirm against the implementation.
    /// </param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task<IReadOnlyList<IDomainEvent>> GetEventsAsync(
        Guid aggregateId,
        int fromVersion,
        CancellationToken ct = default);

    /// <summary>Stores new events for the given aggregate.</summary>
    /// <param name="aggregateId">Identifier of the aggregate the events belong to.</param>
    /// <param name="events">The events to store.</param>
    /// <param name="expectedVersion">
    /// Version the caller believes the aggregate currently has in the store.
    /// NOTE(review): presumably used as an optimistic concurrency check — confirm.
    /// </param>
    /// <param name="ct">Token used to cancel the operation.</param>
    Task SaveEventsAsync(
        Guid aggregateId,
        IEnumerable<IDomainEvent> events,
        int expectedVersion,
        CancellationToken ct = default);
}

InMemoryEventStore

A ready-to-use in-memory implementation for development and testing:

csharp
// Registers InMemoryEventStore as the IEventStore implementation with singleton lifetime,
// i.e. one shared instance for the whole application.
// NOTE(review): presumably required so that events saved in one request remain visible
// to later requests — confirm.
builder.Services.AddSingleton<IEventStore, InMemoryEventStore>();

Creating an Event-Sourced Aggregate

Step 1: Define domain events

csharp
/// <summary>Raised when a new order is created for a customer.</summary>
public record OrderCreated(
    Guid OrderId,
    Guid CustomerId,
    DateTime CreatedAt) : IDomainEvent;

/// <summary>Raised when a product line is added to an order.</summary>
public record OrderItemAdded(
    Guid OrderId,
    Guid ProductId,
    int Quantity,
    decimal UnitPrice) : IDomainEvent;

/// <summary>Raised when an order is submitted.</summary>
public record OrderSubmitted(
    Guid OrderId,
    DateTime SubmittedAt) : IDomainEvent;

/// <summary>Raised when an order is cancelled, together with the reason given.</summary>
public record OrderCancelled(
    Guid OrderId,
    string Reason,
    DateTime CancelledAt) : IDomainEvent;

Step 2: Implement the aggregate

csharp
/// <summary>
/// Event-sourced order aggregate. Public methods validate the request and raise a domain
/// event; the matching <c>Apply</c> overload then updates the state.
/// </summary>
public class Order : AggregateRoot
{
    /// <summary>Customer the order belongs to.</summary>
    public Guid CustomerId { get; private set; }

    /// <summary>Current lifecycle status of the order.</summary>
    public OrderStatus Status { get; private set; }

    /// <summary>Items added to the order so far.</summary>
    public List<OrderItem> Items { get; private set; } = new();

    /// <summary>Sum of quantity times unit price over all items.</summary>
    public decimal TotalAmount => Items.Sum(i => i.Quantity * i.UnitPrice);

    /// <summary>Creates a new order for the given customer by raising <see cref="OrderCreated"/>.</summary>
    /// <param name="customerId">Identifier of the customer placing the order.</param>
    public Order(Guid customerId)
    {
        RaiseEvent(new OrderCreated(Guid.NewGuid(), customerId, DateTime.UtcNow));
    }

    /// <summary>
    /// Creates an empty instance for rehydration; call <c>LoadFromHistory</c> afterwards.
    /// </summary>
    /// <remarks>
    /// This constructor is public because rehydrating code outside the class creates the
    /// aggregate with <c>new Order()</c>; a private constructor would not be accessible there.
    /// </remarks>
    public Order() { }

    /// <summary>Adds a product line to a draft order.</summary>
    /// <param name="productId">Identifier of the product.</param>
    /// <param name="quantity">Number of units; must be greater than zero.</param>
    /// <param name="unitPrice">Price per unit; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="quantity"/> is zero or negative, or <paramref name="unitPrice"/> is negative.
    /// </exception>
    /// <exception cref="InvalidOperationException">The order is not in draft status.</exception>
    public void AddItem(Guid productId, int quantity, decimal unitPrice)
    {
        // Reject values that would silently corrupt TotalAmount once the event is stored.
        if (quantity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(quantity), quantity, "Quantity must be greater than zero.");
        }

        if (unitPrice < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(unitPrice), unitPrice, "Unit price must not be negative.");
        }

        if (Status != OrderStatus.Draft)
        {
            throw new InvalidOperationException("Cannot add items to a non-draft order.");
        }

        RaiseEvent(new OrderItemAdded(Id, productId, quantity, unitPrice));
    }

    /// <summary>Submits a draft order that contains at least one item.</summary>
    /// <exception cref="InvalidOperationException">The order is not a draft, or it has no items.</exception>
    public void Submit()
    {
        if (Status != OrderStatus.Draft)
        {
            throw new InvalidOperationException("Only draft orders can be submitted.");
        }

        if (!Items.Any())
        {
            throw new InvalidOperationException("Cannot submit an empty order.");
        }

        RaiseEvent(new OrderSubmitted(Id, DateTime.UtcNow));
    }

    /// <summary>Cancels the order unless it is already cancelled.</summary>
    /// <param name="reason">Reason recorded in the <see cref="OrderCancelled"/> event.</param>
    /// <exception cref="InvalidOperationException">The order is already cancelled.</exception>
    public void Cancel(string reason)
    {
        if (Status == OrderStatus.Cancelled)
        {
            throw new InvalidOperationException("Order is already cancelled.");
        }

        RaiseEvent(new OrderCancelled(Id, reason, DateTime.UtcNow));
    }

    // Event handlers (called during rehydration and when events are raised).
    // They only assign state and never validate: an event that was stored has already happened.

    private void Apply(OrderCreated e)
    {
        Id = e.OrderId;
        CustomerId = e.CustomerId;
        Status = OrderStatus.Draft;
    }

    private void Apply(OrderItemAdded e)
    {
        Items.Add(new OrderItem(e.ProductId, e.Quantity, e.UnitPrice));
    }

    private void Apply(OrderSubmitted e)
    {
        Status = OrderStatus.Submitted;
    }

    private void Apply(OrderCancelled e)
    {
        Status = OrderStatus.Cancelled;
    }
}

Step 3: Use the aggregate with the event store

csharp
/// <summary>
/// Application service that creates, loads and updates <c>Order</c> aggregates
/// through an <see cref="IEventStore"/>.
/// </summary>
public class OrderService
{
    // Expected version passed to the store when saving the first events of a new aggregate.
    private const int NewAggregateVersion = 0;

    private readonly IEventStore _eventStore;

    /// <summary>Creates the service.</summary>
    /// <param name="eventStore">Store used to persist and read order events.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStore"/> is null.</exception>
    public OrderService(IEventStore eventStore)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
    }

    /// <summary>Creates a new order for the customer and persists its initial events.</summary>
    /// <param name="customerId">Identifier of the customer placing the order.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>The newly created order with its uncommitted events cleared.</returns>
    public async Task<Order> CreateOrderAsync(Guid customerId, CancellationToken ct)
    {
        var order = new Order(customerId);
        await _eventStore.SaveEventsAsync(order.Id, order.UncommittedEvents, NewAggregateVersion, ct);
        order.ClearUncommittedEvents();
        return order;
    }

    /// <summary>Loads an order by replaying its stored events.</summary>
    /// <param name="orderId">Identifier of the order to load.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>The rehydrated order.</returns>
    /// <exception cref="InvalidOperationException">No events are stored for <paramref name="orderId"/>.</exception>
    public async Task<Order> GetOrderAsync(Guid orderId, CancellationToken ct)
    {
        var events = await _eventStore.GetEventsAsync(orderId, ct);

        // Without any events there is nothing to rehydrate; returning an empty aggregate
        // would let callers append events to an order that was never created.
        if (events.Count == 0)
        {
            throw new InvalidOperationException($"Order {orderId} was not found.");
        }

        var order = new Order();
        order.LoadFromHistory(events);
        return order;
    }

    /// <summary>Adds an item to an existing order and persists the resulting events.</summary>
    /// <param name="orderId">Identifier of the order to change.</param>
    /// <param name="productId">Identifier of the product to add.</param>
    /// <param name="quantity">Number of units.</param>
    /// <param name="unitPrice">Price per unit.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task AddItemAsync(Guid orderId, Guid productId, int quantity, decimal unitPrice, CancellationToken ct)
    {
        var order = await GetOrderAsync(orderId, ct);

        // Capture the version as loaded from the store BEFORE raising new events, matching
        // CreateOrderAsync, which also passes the version that precedes its new events.
        // This stays correct whether or not raising an event advances Version.
        var expectedVersion = order.Version;

        order.AddItem(productId, quantity, unitPrice);
        await _eventStore.SaveEventsAsync(orderId, order.UncommittedEvents, expectedVersion, ct);
        order.ClearUncommittedEvents();
    }
}

Registration

csharp
// In Program.cs
// Configures event sourcing and selects the backing event store through the options callback.
// NOTE(review): presumably only one Use...Store() call should be active at a time — confirm.
builder.Services.AddRvrEventSourcing(options =>
{
    options.UseInMemoryStore();       // For development
    // options.UseSqlServerStore();   // For production: enable this line in place of the one above
});

Replaying Events

Replay an aggregate's stored events to rehydrate its current state (the same event stream can also be replayed to rebuild read models or projections):

csharp
// Fetch the complete stored event stream of the order...
var history = await _eventStore.GetEventsAsync(orderId);

// ...and replay it onto an empty aggregate.
var order = new Order();
order.LoadFromHistory(history);

// order is now fully rehydrated to its current state
var summary = $"Order {order.Id}: {order.Status}, {order.Items.Count} items, total: {order.TotalAmount}";
Console.WriteLine(summary);

Released under the MIT License.