MassTransit
RCommon integrates with MassTransit to deliver events to consumers across service boundaries. The integration registers MassTransitEventHandler<TEvent> as a MassTransit IConsumer<TEvent> that delegates to the application's ISubscriber<TEvent>. Handler code has no dependency on MassTransit types.
Installation
dotnet add package RCommon.MassTransit
Configuration
Use WithEventHandling<MassTransitEventHandlingBuilder> on the RCommon builder. The builder inherits from MassTransit's ServiceCollectionBusConfigurator, so all standard MassTransit configuration APIs are available directly on it:
using RCommon;
using RCommon.MassTransit;
using RCommon.MassTransit.Producers;
builder.Services.AddRCommon()
    .WithEventHandling<MassTransitEventHandlingBuilder>(mt =>
    {
        // Register the producer that publishes events to the broker
        mt.AddProducer<PublishWithMassTransitEventProducer>();
        // Register subscribers; each call also adds a MassTransit consumer
        mt.AddSubscriber<OrderShipped, OrderShippedHandler>();
        mt.AddSubscriber<OrderCancelled, OrderCancelledHandler>();
        // Standard MassTransit transport configuration
        mt.UsingRabbitMq((ctx, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");
            cfg.ConfigureEndpoints(ctx);
        });
    });
AddSubscriber<TEvent, TEventHandler> performs three registrations in one call:
- Registers ISubscriber<TEvent> in the DI container as transient.
- Calls AddConsumer<MassTransitEventHandler<TEvent>> so MassTransit creates a consumer for the endpoint.
- Records the event-to-producer association in the EventSubscriptionManager so the router routes this event only to MassTransit producers.
Defining an event
Events must implement ISerializableEvent and must include a parameterless constructor for broker deserialization:
using RCommon.Models.Events;
public class OrderShipped : ISerializableEvent
{
    public OrderShipped(Guid orderId, string trackingNumber)
    {
        OrderId = orderId;
        TrackingNumber = trackingNumber;
    }
    // Required for MassTransit deserialization
    public OrderShipped() { }
    // init accessors let the deserializer populate the properties after the parameterless constructor runs
    public Guid OrderId { get; init; }
    public string TrackingNumber { get; init; } = string.Empty;
}
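As a rough check of the constructor requirement, the sketch below round-trips a stand-in event through System.Text.Json, the serializer MassTransit 8 uses by default. OrderShippedSketch and RoundTripSketch are hypothetical names, and the class omits ISerializableEvent so that it compiles without RCommon. It assumes the properties expose init accessors, because System.Text.Json does not populate get-only properties when it constructs the object through the parameterless constructor.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for OrderShipped; ISerializableEvent is omitted
// so the sketch has no RCommon dependency.
public class OrderShippedSketch
{
    // Used by the serializer when rebuilding the event from JSON.
    public OrderShippedSketch() { }

    // Used by application code when raising the event.
    public OrderShippedSketch(Guid orderId, string trackingNumber)
    {
        OrderId = orderId;
        TrackingNumber = trackingNumber;
    }

    public Guid OrderId { get; init; }
    public string TrackingNumber { get; init; } = string.Empty;
}

public static class RoundTripSketch
{
    // Serializes the event to JSON and reads it back, roughly what happens
    // between a producer and a consumer on either side of a broker.
    public static OrderShippedSketch RoundTrip(OrderShippedSketch source)
    {
        string json = JsonSerializer.Serialize(source);
        return JsonSerializer.Deserialize<OrderShippedSketch>(json)
            ?? throw new InvalidOperationException("Deserialization returned null.");
    }
}
```

Passing an event built with the two-argument constructor to RoundTripSketch.RoundTrip should return a copy that carries the same OrderId and TrackingNumber.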
Implementing a subscriber (consumer)
Implement ISubscriber<TEvent>. No MassTransit types appear in handler code:
using Microsoft.Extensions.Logging;
using RCommon.EventHandling.Subscribers;
public class OrderShippedHandler : ISubscriber<OrderShipped>
{
    private readonly ILogger<OrderShippedHandler> _logger;
    public OrderShippedHandler(ILogger<OrderShippedHandler> logger)
    {
        _logger = logger;
    }
    public async Task HandleAsync(OrderShipped @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Order {OrderId} shipped with tracking {TrackingNumber}",
            @event.OrderId, @event.TrackingNumber);
        // Placeholder for real asynchronous work
        await Task.CompletedTask;
    }
}
Producing events
Queue events on IEventRouter and route them after your database work is complete. The router forwards each event to PublishWithMassTransitEventProducer, which calls IBus.Publish:
public class ShippingService
{
    private readonly IEventRouter _eventRouter;
    public ShippingService(IEventRouter eventRouter)
    {
        _eventRouter = eventRouter;
    }
    public async Task ShipOrderAsync(Guid orderId, string trackingNumber, CancellationToken cancellationToken)
    {
        // ... perform database work ...
        _eventRouter.AddTransactionalEvent(new OrderShipped(orderId, trackingNumber));
        await _eventRouter.RouteEventsAsync(cancellationToken);
    }
}
Publish vs Send
Two producers are available. Choose based on the delivery semantics you need:
| Producer | MassTransit call | Use when |
|---|---|---|
| PublishWithMassTransitEventProducer | IBus.Publish | Fan-out: all consumers subscribed to the type receive the message |
| SendWithMassTransitEventProducer | IBus.Send | Point-to-point: a single endpoint receives the message |
Register SendWithMassTransitEventProducer instead of (or alongside) PublishWithMassTransitEventProducer:
mt.AddProducer<SendWithMassTransitEventProducer>();
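To make the difference concrete, here is a minimal sketch of the two delivery styles written directly against MassTransit's IBus, outside of RCommon. ShipmentRequested, DeliverySemanticsSketch, and the queue name shipping-service are assumptions made for illustration. How SendWithMassTransitEventProducer resolves its destination endpoint is not described on this page, so the explicit address below should not be read as its actual behavior.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message type used only in this sketch.
public record ShipmentRequested(Guid OrderId);

public class DeliverySemanticsSketch
{
    private readonly IBus _bus;

    public DeliverySemanticsSketch(IBus bus) => _bus = bus;

    // Fan-out: every endpoint subscribed to ShipmentRequested receives a copy.
    public Task PublishAsync(Guid orderId, CancellationToken cancellationToken) =>
        _bus.Publish(new ShipmentRequested(orderId), cancellationToken);

    // Point-to-point: the message is delivered to one explicitly addressed queue.
    public async Task SendAsync(Guid orderId, CancellationToken cancellationToken)
    {
        // "queue:shipping-service" is an assumed destination, not one defined by RCommon.
        ISendEndpoint endpoint = await _bus.GetSendEndpoint(new Uri("queue:shipping-service"));
        await endpoint.Send(new ShipmentRequested(orderId), cancellationToken);
    }
}
```

Publish suits notifications that several services may react to, while Send suits commands that exactly one service should process.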
Transport configuration
The builder inherits all MassTransit bus configuration methods. Any transport supported by MassTransit can be used:
// RabbitMQ
mt.UsingRabbitMq((ctx, cfg) =>
{
    cfg.Host("amqp://guest:guest@localhost/");
    cfg.ConfigureEndpoints(ctx);
});
// Azure Service Bus
mt.UsingAzureServiceBus((ctx, cfg) =>
{
    cfg.Host("Endpoint=sb://...");
    cfg.ConfigureEndpoints(ctx);
});
// In-memory (for testing)
mt.UsingInMemory((ctx, cfg) =>
{
    cfg.ConfigureEndpoints(ctx);
});
Transactional outbox
Pair MassTransit with the outbox pattern for guaranteed at-least-once delivery. See Transactional Outbox.
How the consumer bridge works
MassTransitEventHandler<TEvent> implements both IMassTransitEventHandler<TEvent> and MassTransit's IConsumer<TEvent>. When MassTransit delivers a message it calls Consume(ConsumeContext<TEvent>), which resolves ISubscriber<TEvent> from DI and calls HandleAsync:
// Framework code — you do not write this yourself
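public class MassTransitEventHandler<TEvent> : IMassTransitEventHandler<TEvent>, IConsumer<TEvent>
    where TEvent : class, ISerializableEvent
{
    // _subscriber is the ISubscriber<TEvent> resolved from DI (field and constructor omitted here)
    public async Task Consume(ConsumeContext<TEvent> context)
    {
        await _subscriber.HandleAsync(context.Message, context.CancellationToken);
    }
}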
This keeps application handler code free of any MassTransit API surface.
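The same delegation can be tried out without any package references. The sketch below uses stand-in types whose names end in Sketch; they are hypothetical simplifications of ISubscriber<TEvent>, IConsumer<TEvent>, and ConsumeContext<TEvent>, not the real RCommon or MassTransit definitions, and RecordingSubscriber exists only to observe the call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the application-facing subscriber contract.
public interface ISubscriberSketch<TEvent>
{
    Task HandleAsync(TEvent @event, CancellationToken cancellationToken = default);
}

// Stand-in for the transport's message context.
public sealed record ConsumeContextSketch<TMessage>(TMessage Message, CancellationToken CancellationToken)
    where TMessage : class;

// Stand-in for the transport-facing consumer contract.
public interface IConsumerSketch<TMessage> where TMessage : class
{
    Task Consume(ConsumeContextSketch<TMessage> context);
}

// The bridge: implements the transport contract and forwards to the subscriber.
public sealed class BridgeSketch<TEvent> : IConsumerSketch<TEvent> where TEvent : class
{
    private readonly ISubscriberSketch<TEvent> _subscriber;

    public BridgeSketch(ISubscriberSketch<TEvent> subscriber) => _subscriber = subscriber;

    public Task Consume(ConsumeContextSketch<TEvent> context) =>
        _subscriber.HandleAsync(context.Message, context.CancellationToken);
}

// A subscriber that remembers the last event it handled.
public sealed class RecordingSubscriber : ISubscriberSketch<string>
{
    public string Last { get; private set; } = string.Empty;

    public Task HandleAsync(string @event, CancellationToken cancellationToken = default)
    {
        Last = @event;
        return Task.CompletedTask;
    }
}
```

Constructing a BridgeSketch<string> around a RecordingSubscriber and calling Consume with a context whose Message is "order-1" leaves Last set to "order-1". The subscriber never sees the context type, which is the property the real bridge is meant to preserve.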
API summary
| Type | Package | Description |
|---|---|---|
| MassTransitEventHandlingBuilder | RCommon.MassTransit | Builder used with WithEventHandling<T>; inherits ServiceCollectionBusConfigurator |
| IMassTransitEventHandlingBuilder | RCommon.MassTransit | Interface combining IEventHandlingBuilder and IBusRegistrationConfigurator |
| PublishWithMassTransitEventProducer | RCommon.MassTransit | IEventProducer that calls IBus.Publish (fan-out) |
| SendWithMassTransitEventProducer | RCommon.MassTransit | IEventProducer that calls IBus.Send (point-to-point) |
| MassTransitEventHandler<TEvent> | RCommon.MassTransit | Internal IConsumer<TEvent> that bridges MassTransit to ISubscriber<TEvent> |
| IMassTransitEventHandler<TEvent> | RCommon.MassTransit | Marker interface for MassTransit event handlers |
| AddSubscriber<TEvent, THandler> | RCommon.MassTransit | Extension on IMassTransitEventHandlingBuilder; registers handler, consumer, and routing |