Skip to main content
Version: 2.4.1

MassTransit

RCommon integrates with MassTransit to deliver events to consumers across service boundaries. The integration registers MassTransitEventHandler<TEvent> as a MassTransit IConsumer<TEvent> that delegates to the application's ISubscriber<TEvent>. Handler code has no dependency on MassTransit types.

Installation

NuGet Package
dotnet add package RCommon.MassTransit

Configuration

Use WithEventHandling<MassTransitEventHandlingBuilder> on the RCommon builder. The builder inherits from MassTransit's ServiceCollectionBusConfigurator, so all standard MassTransit configuration APIs are available directly on it:

using RCommon;
using RCommon.MassTransit;
using RCommon.MassTransit.Producers;

builder.Services.AddRCommon()
.WithEventHandling<MassTransitEventHandlingBuilder>(mt =>
{
// Register the producer that publishes events to the broker
mt.AddProducer<PublishWithMassTransitEventProducer>();

// Register subscribers; each call also adds a MassTransit consumer
mt.AddSubscriber<OrderShipped, OrderShippedHandler>();
mt.AddSubscriber<OrderCancelled, OrderCancelledHandler>();

// Standard MassTransit transport configuration
mt.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.ConfigureEndpoints(ctx);
});
});

AddSubscriber<TEvent, TEventHandler> performs three registrations in one call:

  1. Registers ISubscriber<TEvent> in the DI container as transient.
  2. Calls AddConsumer<MassTransitEventHandler<TEvent>> so MassTransit creates a consumer for the endpoint.
  3. Records the event-to-producer association in the EventSubscriptionManager so the router routes this event only to MassTransit producers.

Defining an event

Events must implement ISerializableEvent and must include a parameterless constructor for broker deserialization:

using RCommon.Models.Events;

public class OrderShipped : ISyncEvent
{
public OrderShipped(Guid orderId, string trackingNumber)
{
OrderId = orderId;
TrackingNumber = trackingNumber;
}

// Required for MassTransit deserialization
public OrderShipped() { }

public Guid OrderId { get; }
public string TrackingNumber { get; } = string.Empty;
}

Implementing a subscriber (consumer)

Implement ISubscriber<TEvent>. No MassTransit types appear in handler code:

using RCommon.EventHandling.Subscribers;

public class OrderShippedHandler : ISubscriber<OrderShipped>
{
private readonly ILogger<OrderShippedHandler> _logger;

public OrderShippedHandler(ILogger<OrderShippedHandler> logger)
{
_logger = logger;
}

public async Task HandleAsync(OrderShipped @event, CancellationToken cancellationToken = default)
{
_logger.LogInformation(
"Order {OrderId} shipped with tracking {TrackingNumber}",
@event.OrderId, @event.TrackingNumber);
await Task.CompletedTask;
}
}

Producing events

Queue events on IEventRouter and route them after your database work is complete. The router forwards each event to PublishWithMassTransitEventProducer, which calls IBus.Publish:

public class ShippingService
{
private readonly IEventRouter _eventRouter;

public ShippingService(IEventRouter eventRouter)
{
_eventRouter = eventRouter;
}

public async Task ShipOrderAsync(Guid orderId, string trackingNumber, CancellationToken cancellationToken)
{
// ... perform database work ...

_eventRouter.AddTransactionalEvent(new OrderShipped(orderId, trackingNumber));
await _eventRouter.RouteEventsAsync(cancellationToken);
}
}

Publish vs Send

Two producers are available. Choose based on the delivery semantics you need:

ProducerMassTransit callUse when
PublishWithMassTransitEventProducerIBus.PublishFan-out: all consumers subscribed to the type receive the message
SendWithMassTransitEventProducerIBus.SendPoint-to-point: a single endpoint receives the message

Register SendWithMassTransitEventProducer instead of (or alongside) PublishWithMassTransitEventProducer:

mt.AddProducer<SendWithMassTransitEventProducer>();

Transport configuration

The builder inherits all MassTransit bus configuration methods. Any transport supported by MassTransit can be used:

// RabbitMQ
mt.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host("amqp://guest:guest@localhost/");
cfg.ConfigureEndpoints(ctx);
});

// Azure Service Bus
mt.UsingAzureServiceBus((ctx, cfg) =>
{
cfg.Host("Endpoint=sb://...");
cfg.ConfigureEndpoints(ctx);
});

// In-memory (for testing)
mt.UsingInMemory((ctx, cfg) =>
{
cfg.ConfigureEndpoints(ctx);
});

Transactional outbox

Pair MassTransit with the outbox pattern for guaranteed at-least-once delivery. See Transactional Outbox.

How the consumer bridge works

MassTransitEventHandler<TEvent> implements both IMassTransitEventHandler<TEvent> and MassTransit's IConsumer<TEvent>. When MassTransit delivers a message it calls Consume(ConsumeContext<TEvent>), which resolves ISubscriber<TEvent> from DI and calls HandleAsync:

// Framework code — you do not write this yourself
public class MassTransitEventHandler<TEvent> : IConsumer<TEvent>
where TEvent : class, ISerializableEvent
{
public async Task Consume(ConsumeContext<TEvent> context)
{
await _subscriber.HandleAsync(context.Message, context.CancellationToken);
}
}

This keeps application handler code free of any MassTransit API surface.

API summary

TypePackageDescription
MassTransitEventHandlingBuilderRCommon.MassTransitBuilder used with WithEventHandling<T>; inherits ServiceCollectionBusConfigurator
IMassTransitEventHandlingBuilderRCommon.MassTransitInterface combining IEventHandlingBuilder and IBusRegistrationConfigurator
PublishWithMassTransitEventProducerRCommon.MassTransitIEventProducer that calls IBus.Publish (fan-out)
SendWithMassTransitEventProducerRCommon.MassTransitIEventProducer that calls IBus.Send (point-to-point)
MassTransitEventHandler<TEvent>RCommon.MassTransitInternal IConsumer<TEvent> that bridges MassTransit to ISubscriber<TEvent>
IMassTransitEventHandler<TEvent>RCommon.MassTransitMarker interface for MassTransit event handlers
AddSubscriber<TEvent, THandler>RCommon.MassTransitExtension on IMassTransitEventHandlingBuilder; registers handler, consumer, and routing
RCommonRCommon