Version: 2.4.1

MassTransit

RCommon integrates with MassTransit to deliver events to consumers across service boundaries. The integration uses MassTransitEventHandler<TEvent> as a MassTransit IConsumer<TEvent> that delegates to the application's ISubscriber<TEvent> implementation. Application handler code has no dependency on MassTransit types.

Installation

NuGet Package
dotnet add package RCommon.MassTransit

Configuration

Use WithEventHandling<MassTransitEventHandlingBuilder> on the RCommon builder. The builder inherits from MassTransit's ServiceCollectionBusConfigurator, so all standard MassTransit configuration APIs are available directly on it:

using RCommon;
using RCommon.MassTransit;
using RCommon.MassTransit.Producers;

builder.Services.AddRCommon()
    .WithEventHandling<MassTransitEventHandlingBuilder>(mt =>
    {
        // Register the producer that publishes events to the broker
        mt.AddProducer<PublishWithMassTransitEventProducer>();

        // Register subscribers; each call also adds a MassTransit consumer
        mt.AddSubscriber<OrderShipped, OrderShippedHandler>();
        mt.AddSubscriber<OrderCancelled, OrderCancelledHandler>();

        // Standard MassTransit transport configuration
        mt.UsingRabbitMq((ctx, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");
            cfg.ConfigureEndpoints(ctx);
        });
    });

AddSubscriber<TEvent, TEventHandler> performs three registrations:

  1. Registers ISubscriber<TEvent> in the DI container as transient.
  2. Calls AddConsumer<MassTransitEventHandler<TEvent>> so MassTransit creates a consumer for the endpoint.
  3. Records the event-to-producer association in the EventSubscriptionManager so the router routes this event only to the MassTransit producer.

Defining an event

Events must implement ISerializableEvent and must have a parameterless constructor for broker deserialization. Properties need a setter (init is enough) so the deserializer can populate them after calling that constructor:

using RCommon.Models.Events;

public class OrderShipped : ISerializableEvent
{
    public OrderShipped(Guid orderId, string trackingNumber)
    {
        OrderId = orderId;
        TrackingNumber = trackingNumber;
    }

    // Required for broker deserialization
    public OrderShipped() { }

    public Guid OrderId { get; init; }
    public string TrackingNumber { get; init; } = string.Empty;
}
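To see why the parameterless constructor and settable properties matter, the following minimal sketch round-trips an event through plain System.Text.Json. It is only an illustration: the serializer MassTransit actually uses depends on your bus configuration, and the ISerializableEvent interface is omitted here so the snippet stays self-contained. The tracking number is a made-up value.

```csharp
using System;
using System.Text.Json;

// Serialize an event, then rebuild it the way a consumer-side deserializer would.
var original = new OrderShipped(Guid.NewGuid(), "TRACK-123");
string json = JsonSerializer.Serialize(original);
OrderShipped restored = JsonSerializer.Deserialize<OrderShipped>(json)!;

Console.WriteLine(restored.OrderId == original.OrderId); // True
Console.WriteLine(restored.TrackingNumber);              // TRACK-123

// Stand-in for the event above (RCommon interface left out on purpose).
public class OrderShipped
{
    public OrderShipped(Guid orderId, string trackingNumber)
    {
        OrderId = orderId;
        TrackingNumber = trackingNumber;
    }

    // The deserializer picks this constructor, then assigns the init properties.
    public OrderShipped() { }

    public Guid OrderId { get; init; }
    public string TrackingNumber { get; init; } = string.Empty;
}
```

With get-only properties (no init or set), System.Text.Json skips them during deserialization, so the restored event would carry default values.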

Implementing a subscriber

Implement ISubscriber<TEvent>. No MassTransit types appear in handler code:

using Microsoft.Extensions.Logging;
using RCommon.EventHandling.Subscribers;

public class OrderShippedHandler : ISubscriber<OrderShipped>
{
    private readonly ILogger<OrderShippedHandler> _logger;

    public OrderShippedHandler(ILogger<OrderShippedHandler> logger)
    {
        _logger = logger;
    }

    public Task HandleAsync(OrderShipped @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Order {OrderId} shipped with tracking {TrackingNumber}",
            @event.OrderId, @event.TrackingNumber);

        // Nothing asynchronous to await here, so return a completed task
        return Task.CompletedTask;
    }
}
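Because the handler depends only on ISubscriber<TEvent> and ILogger, it can be exercised without a bus or broker. The sketch below assumes the OrderShipped and OrderShippedHandler types shown above are in scope; the class name OrderShippedHandlerSmokeTest and the tracking number are hypothetical, and NullLogger comes from Microsoft.Extensions.Logging.Abstractions.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;

public static class OrderShippedHandlerSmokeTest
{
    public static async Task RunAsync()
    {
        // NullLogger discards log output, so no logging setup is needed
        var handler = new OrderShippedHandler(NullLogger<OrderShippedHandler>.Instance);

        // Call the handler directly, exactly as the MassTransit bridge would
        await handler.HandleAsync(new OrderShipped(Guid.NewGuid(), "TRACK-123"));
    }
}
```

The same call shape works inside any test framework; swap NullLogger for a capturing logger if you want to assert on the logged message.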

Publishing events

Call IEventRouter.RouteEventsAsync after adding transactional events. The router forwards each event to PublishWithMassTransitEventProducer, which calls IBus.Publish:

public class ShippingService
{
    private readonly IEventRouter _eventRouter;

    public ShippingService(IEventRouter eventRouter)
    {
        _eventRouter = eventRouter;
    }

    public async Task ShipOrderAsync(Guid orderId, string trackingNumber, CancellationToken cancellationToken)
    {
        // ... update order state ...

        _eventRouter.AddTransactionalEvent(new OrderShipped(orderId, trackingNumber));
        await _eventRouter.RouteEventsAsync(cancellationToken);
    }
}

Publish vs Send

Register SendWithMassTransitEventProducer instead of (or in addition to) PublishWithMassTransitEventProducer when you want point-to-point delivery to a single consumer endpoint:

mt.AddProducer<SendWithMassTransitEventProducer>();

SendWithMassTransitEventProducer calls IBus.Send internally. It is appropriate for command-style messages where only one consumer should process the event.
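Unlike Publish, a Send needs a destination address. In MassTransit, the address-less Send extension on IBus looks up the destination through EndpointConvention and fails if no mapping exists for the message type. Assuming SendWithMassTransitEventProducer uses that overload (this page only states that it calls IBus.Send), a mapping along these lines would be registered once at startup. The queue name order-shipped is a hypothetical placeholder.

```csharp
using System;
using MassTransit;

// Assumption: the producer relies on MassTransit's convention-based Send.
// Map the event type to the queue that should receive it (name is illustrative).
EndpointConvention.Map<OrderShipped>(new Uri("queue:order-shipped"));
```

Check the RCommon source for the version you are using before relying on this; if the producer supplies its own destination address, no convention mapping is needed.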

Transactional outbox

Pair MassTransit with the outbox pattern to guarantee at-least-once delivery. See Transactional Outbox.

How the consumer bridge works

MassTransitEventHandler<TEvent> implements both IMassTransitEventHandler<TEvent> and MassTransit's IConsumer<TEvent>. When MassTransit delivers a message, it calls Consume(ConsumeContext<TEvent>), which resolves ISubscriber<TEvent> from DI and calls HandleAsync. This keeps application handler code free of any MassTransit API surface.

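// Framework code (simplified); you do not write this yourself
public class MassTransitEventHandler<TEvent> : IMassTransitEventHandler<TEvent>, IConsumer<TEvent>
    where TEvent : class, ISerializableEvent
{
    private readonly ISubscriber<TEvent> _subscriber;

    // The subscriber registered through AddSubscriber is resolved from DI
    public MassTransitEventHandler(ISubscriber<TEvent> subscriber)
    {
        _subscriber = subscriber;
    }

    public async Task Consume(ConsumeContext<TEvent> context)
    {
        // Unwrap the MassTransit envelope and hand the event to the application subscriber
        await _subscriber.HandleAsync(context.Message, context.CancellationToken);
    }
}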

API summary

| Type | Description |
| --- | --- |
| MassTransitEventHandlingBuilder | Builder used with WithEventHandling<T>; inherits ServiceCollectionBusConfigurator |
| IMassTransitEventHandlingBuilder | Interface for the MassTransit event handling builder |
| PublishWithMassTransitEventProducer | IEventProducer that calls IBus.Publish (fan-out) |
| SendWithMassTransitEventProducer | IEventProducer that calls IBus.Send (point-to-point) |
| MassTransitEventHandler<TEvent> | Internal IConsumer<TEvent> that bridges MassTransit to ISubscriber<TEvent> |
| IMassTransitEventHandler<TEvent> | Marker interface for MassTransit event handlers |