MassTransit
RCommon integrates with MassTransit to deliver events to consumers across service boundaries. The integration uses MassTransitEventHandler<TEvent> as a MassTransit IConsumer<TEvent> that delegates to the application's ISubscriber<TEvent> implementation. Application handler code has no dependency on MassTransit types.
Installation
dotnet add package RCommon.MassTransitConfiguration
Use WithEventHandling<MassTransitEventHandlingBuilder> on the RCommon builder. The builder inherits from MassTransit's ServiceCollectionBusConfigurator, so all standard MassTransit configuration APIs are available directly on it:
using RCommon;
using RCommon.MassTransit;
using RCommon.MassTransit.Producers;
builder.Services.AddRCommon()
.WithEventHandling<MassTransitEventHandlingBuilder>(mt =>
{
// Register the producer that publishes events to the broker
mt.AddProducer<PublishWithMassTransitEventProducer>();
// Register subscribers; each call also adds a MassTransit consumer
mt.AddSubscriber<OrderShipped, OrderShippedHandler>();
mt.AddSubscriber<OrderCancelled, OrderCancelledHandler>();
// Standard MassTransit transport configuration
mt.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.ConfigureEndpoints(ctx);
});
});
AddSubscriber<TEvent, TEventHandler> performs three registrations:
- Registers
ISubscriber<TEvent>in the DI container as transient. - Calls
AddConsumer<MassTransitEventHandler<TEvent>>so MassTransit creates a consumer for the endpoint. - Records the event-to-producer association in the
EventSubscriptionManagerso the router routes this event only to the MassTransit producer.
Defining an event
Events must implement ISerializableEvent and must have a parameterless constructor for broker deserialization:
using RCommon.Models.Events;
public class OrderShipped : ISyncEvent
{
public OrderShipped(Guid orderId, string trackingNumber)
{
OrderId = orderId;
TrackingNumber = trackingNumber;
}
public OrderShipped() { }
public Guid OrderId { get; }
public string TrackingNumber { get; } = string.Empty;
}
Implementing a subscriber
Implement ISubscriber<TEvent>. No MassTransit types appear in handler code:
using RCommon.EventHandling.Subscribers;
public class OrderShippedHandler : ISubscriber<OrderShipped>
{
private readonly ILogger<OrderShippedHandler> _logger;
public OrderShippedHandler(ILogger<OrderShippedHandler> logger)
{
_logger = logger;
}
public async Task HandleAsync(OrderShipped @event, CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Order {OrderId} shipped with tracking {TrackingNumber}",
@event.OrderId, @event.TrackingNumber);
await Task.CompletedTask;
}
}
Publishing events
Call IEventRouter.RouteEventsAsync after adding transactional events. The router forwards each event to PublishWithMassTransitEventProducer, which calls IBus.Publish:
public class ShippingService
{
private readonly IEventRouter _eventRouter;
public ShippingService(IEventRouter eventRouter)
{
_eventRouter = eventRouter;
}
public async Task ShipOrderAsync(Guid orderId, string trackingNumber, CancellationToken cancellationToken)
{
// ... update order state ...
_eventRouter.AddTransactionalEvent(new OrderShipped(orderId, trackingNumber));
await _eventRouter.RouteEventsAsync(cancellationToken);
}
}
Publish vs Send
Register SendWithMassTransitEventProducer instead of (or in addition to) PublishWithMassTransitEventProducer when you want point-to-point delivery to a single consumer endpoint:
mt.AddProducer<SendWithMassTransitEventProducer>();
SendWithMassTransitEventProducer calls IBus.Send internally. It is appropriate for command-style messages where only one consumer should process the event.
Transactional outbox
Pair MassTransit with the outbox pattern to guarantee at-least-once delivery. See Transactional Outbox.
How the consumer bridge works
MassTransitEventHandler<TEvent> implements both IMassTransitEventHandler<TEvent> and MassTransit's IConsumer<TEvent>. When MassTransit delivers a message, it calls Consume(ConsumeContext<TEvent>), which resolves ISubscriber<TEvent> from DI and calls HandleAsync. This keeps application handler code free of any MassTransit API surface.
// Framework code — you do not write this yourself
public class MassTransitEventHandler<TEvent> : IConsumer<TEvent>
where TEvent : class, ISerializableEvent
{
public async Task Consume(ConsumeContext<TEvent> context)
{
await _subscriber.HandleAsync(context.Message, context.CancellationToken);
}
}
API summary
| Type | Description |
|---|---|
MassTransitEventHandlingBuilder | Builder used with WithEventHandling<T>; inherits ServiceCollectionBusConfigurator |
IMassTransitEventHandlingBuilder | Interface for the MassTransit event handling builder |
PublishWithMassTransitEventProducer | IEventProducer that calls IBus.Publish (fan-out) |
SendWithMassTransitEventProducer | IEventProducer that calls IBus.Send (point-to-point) |
MassTransitEventHandler<TEvent> | Internal IConsumer<TEvent> that bridges MassTransit to ISubscriber<TEvent> |
IMassTransitEventHandler<TEvent> | Marker interface for MassTransit event handlers |