Introduction to NServiceBus message processing at MSMQ endpoints

NServiceBus is similar to BizTalk. BizTalk is a message broker. NServiceBus is a message bus. It implements the bus architecture where backend systems have the infrastructure to process messages. Internally, NServiceBus uses MSMQ for receiving messages and RavenDB for scalability and transactions.

There are two types of message processing: Commands and Events. Command messages are sent to endpoints. Applications listening at the endpoint process the messages. We show a simple NServiceBus demo application which implements command processing at endpoints.

Overview of the Sample application

The Sample application has five projects:

  1. Console application which sends messages to an endpoint named queue1.
  2. Message Processor class library which processes the message in queue1 and moves it to queue2.
  3. Message Processor class library which processes the message in queue2 and moves it to queue3.
  4. Message Receiver class library which processes the message in queue3 and displays the message contents in the console.
  5. Class library which has the definition of messages used by all NServiceBus applications including the Console application.

Command Message Definition

NServiceBus.Interfaces class library has marker interfaces. IMessage is a marker interface representing a message. ICommand is another marker interface representing command messages. DummyMessage represents a command message. It implements the ICommand interface.

public class DummyMessage : ICommand
{
    public int Id { get; set; }
    public bool Processed1 { get; set; }
    public bool Processed2 { get; set; }
    public bool Received { get; set; }
}

The DummyMessage class is found in the NServiceBusDemo.Messages project.

Sending messages to an endpoint

To kickstart command processing at an endpoint, send command messages to the endpoint. Endpoint refers to a MSMQ queue. We create three private queues in MSMQ: queue1, queue2, queue3. Make all queues transactional. Ensure that appropriate security permissions is set to read messages from the queue and send messages to the queue.

NServiceBus allows applications to send messages to an endpoint using a sendOnly bus. The sendOnly bus does not require complicated configuration.

Configure.Serialization.Xml();
var bus = Configure.With()
    .DefaultBuilder()
    .UseTransport<Msmq>()
    .UnicastBus()
    .SendOnly();

var address = new Address(@"queue1", "localhost");
var message = new DummyMessage() { Id = 1 }; 
bus.Send(address, message);
Console.WriteLine("Sent message to queue 1");

Use the IBus interface to send messages to an endpoint. In the above code, a bus is configured with MSMQ Transport, Unicast Bus, XML serialization and marked as Send Only. NServiceBus uses an IoC container to build the dependent objects used by the Bus. The standard IoC container is Autofac.

The console application uses the above sendOnly bus. The console application is available in the NServiceBusDemo project.

Endpoint configuration

For processing messages, configure the endpoint. For command messages, use either Client or Server configuration.

For the sample application, Define three Endpoints: Message Processor 1, Message Processor 2 and Message Receiver. Message Processor 1 and Message Processor 2 listens to incoming messages in a queue. When the endpoint receives a message, it processes the message. And the processed message is sent to the next queue. The next Message processor picks it up for further processing.

Configure all the endpoints as servers.

[EndpointName("queue1")]
public class EndpointConfig : IConfigureThisEndpoint, 
                              AsA_Server, 
                              IWantCustomInitialization
{
    public void Init()
    {
        Configure.Serialization.Xml();
        var bus = Configure.With()
            .DefaultBuilder()
            .UseTransport<Msmq>()
            .DefineEndpointName("queue1")
            .PurgeOnStartup(false)
            .UnicastBus()
            .DisableTimeoutManager();
           
        Console.WriteLine("Bus configured");
    }
}

Specify the endpoint name in the class attribute. Endpoint name is queue1. This endpoint receives messages from the MSMQ queue queue1.

IConfigureThisEndpoint is a marker interface which indicates NServiceBus to configure the endpoint. The endpoint is configured as a server by inheriting from AsA_Server class.

To implement custom configuration, implement the IWantCustomInitialization interface. Implement the Init method of the interface. In the custom initialization, perform additional configurations. For example, we disable the Timeout Manager. This avoids additional RavenDB configurations.

Handler for processing Messages

Handler classes implement IHandleMessages<T> interface. Handler classes process messages using the Handle method.

public class Handler : IHandleMessages<DummyMessage>
{
    public void Handle(DummyMessage message)
    {
        message.Processed2 = true;
        Bus.Send(new Address("queue3", "localhost"), message);
        Console.WriteLine("Message sent to the next queue");
    }

    public IBus Bus { get; set; }
}

Handler classes are dependent on the Bus. Dependency injection injects Bus object into the handler. We mark the message as processed and send it to the next queue.

NServiceBus Host

NServiceBus applications are class libraries which require three packages from NuGet:

  1. NServiceBus Interfaces.
  2. NServiceBus.
  3. NServiceBus Host.

NServiceBus Host package does the following:

  1. Provides an app.config to allow you to configure endpoints via XML.
  2. A default endpoint configuration class using AsA_Server built-in configuration.
  3. Modify project properties so that the class libraries can be debugged using NServiceBusHost.exe.

NServiceBus applications are class libraries. But they can be run as a console application using NServiceBusHost.exe.

Running the Sample application

The sample application can be run using the following steps:

  1. Create three private queue – queue1, queue2, queue3 in MSMQ.
  2. Open Visual Studio in administrator mode.
  3. Run the NServiceBusDemo console application.
  4. Run the MessageProcessor1 library.
  5. Run the MessageProcessor2 library.
  6. Run the MessageReceiver library.

Running the console application creates a message in queue1. MessageProcessor1 application moves the message to queue2. MessageProcessor2 application moves it to queue3.  MessageReceiver receives the message from queue3.

Download the sample application: NServiceBusDemo

Related Posts

Leave a Reply

Your email address will not be published.