Today we are going to look at a quick tutorial on how to use MassTransit as a message bus for your ASP.NET Core 2.2 application. I have uploaded the sample project to GitHub if you want to follow along.
To begin, spin up a new API project (I am using Visual Studio 2019) and add the two NuGet packages listed below.
In your Startup.cs file
Spin up new project
To begin, spin up a new API project (I am using Visual Studio 2019) and add the two NuGet packages listed below.
<PackageReference Include="MassTransit.AspNetCore" Version="5.5.5" />
<PackageReference Include="MassTransit.RabbitMQ" Version="5.5.5" />
<PackageReference Include="MassTransit.RabbitMQ" Version="5.5.5" />
Startup.cs
In your Startup.cs file, register the consumer and configure MassTransit:
/// <summary>
/// Registers the order consumer, the MassTransit bus on the RabbitMQ transport,
/// the hosted service that starts and stops the bus, and MVC.
/// </summary>
/// <param name="services">The application's service collection.</param>
public void ConfigureServices(IServiceCollection services)
{
    // Register your message consumer
    services.AddScoped<OrderConsumer>();

    // Register MassTransit
    services.AddMassTransit(configurator =>
    {
        configurator.AddConsumer<OrderConsumer>();
        configurator.AddBus(serviceProvider => Bus.Factory.CreateUsingRabbitMq(busConfig =>
        {
            // Broker address and credentials for the local RabbitMQ instance.
            var brokerAddress = new Uri("rabbitmq://localhost");
            var rabbitHost = busConfig.Host(brokerAddress, credentials =>
            {
                credentials.Username("guest");
                credentials.Password("guest");
            });

            // Bind OrderConsumer to the "submit-order" receive endpoint.
            busConfig.ReceiveEndpoint(rabbitHost, "submit-order", endpoint =>
            {
                endpoint.PrefetchCount = 16;
                // NOTE(review): presumably 2 retries, 100 ms apart; confirm against the MassTransit retry docs.
                endpoint.UseMessageRetry(retry => retry.Interval(2, 100));
                endpoint.ConfigureConsumer<OrderConsumer>(serviceProvider);
            });
        }));
    });

    // Expose the bus control instance as IBus so it can be injected (e.g. into controllers).
    services.AddSingleton<IBus>(sp => sp.GetRequiredService<IBusControl>());

    // BusService ties the bus start/stop to the application's lifetime.
    services.AddSingleton<IHostedService, BusService>();

    services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
}
{
//Register your message consumer
services.AddScoped<OrderConsumer>();
// Register MassTransit
services.AddMassTransit(x =>
{
x.AddConsumer<OrderConsumer>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost"), hostConfigurator =>
{
hostConfigurator.Username("guest");
hostConfigurator.Password("guest");
});
cfg.ReceiveEndpoint(host, "submit-order", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.ConfigureConsumer<OrderConsumer>(provider);
});
}));
});
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
}
Notice that we registered an OrderConsumer, then added it as a MassTransit consumer and specified a receive endpoint for it. A consumer is a piece of code that is executed when a matching message is received. We then register IBus and a class called BusService, which implements IHostedService; the reason for this is that we want the bus to start when the application starts and to stop when the application stops.
OrderConsumer.cs
Below is our OrderConsumer class, which simply implements the IConsumer interface that is part of MassTransit.
/// <summary>
/// MassTransit consumer for <see cref="IAddOrder"/> messages.
/// </summary>
public class OrderConsumer : IConsumer<IAddOrder>
{
    /// <summary>
    /// Writes the order description of the received message to the console.
    /// </summary>
    /// <param name="context">Consume context carrying the <see cref="IAddOrder"/> message.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task Consume(ConsumeContext<IAddOrder> context)
    {
        IAddOrder order = context.Message;
        string line = $"Receive message value: {order.OrderDescription}";

        Console.WriteLine(line);
        return Task.CompletedTask;
    }
}
{
public Task Consume(ConsumeContext<IAddOrder> context)
{
Console.WriteLine($"Receive message value: {context.Message.OrderDescription}");
return Task.CompletedTask;
}
}
IAddOrder.cs
Before we go any further, below is our interface IAddOrder. At this stage it is worth stopping for a breather to point out that you do not have to build your message against an interface; rather, it is the recommended way according to MassTransit. You can view their recommendation here
/// <summary>
/// Message contract for adding an order.
/// </summary>
public interface IAddOrder
{
    /// <summary>
    /// Description of the order. The contract exposes it as read-only.
    /// </summary>
    string OrderDescription
    {
        get;
    }
}
{
string OrderDescription { get;}
}
BusService.cs
Now for the BusService. As noted above, its job is simply to start the bus when our application starts up and to stop it when our application terminates.
/// <summary>
/// Hosted service that forwards the host's start and stop calls to the injected
/// <see cref="IBusControl"/>.
/// </summary>
public class BusService : IHostedService
{
    private readonly IBusControl _bus;

    /// <summary>
    /// Creates the service around the bus control it will start and stop.
    /// </summary>
    /// <param name="busControl">The bus control to manage.</param>
    public BusService(IBusControl busControl)
    {
        _bus = busControl;
    }

    /// <summary>
    /// Starts the bus, passing the supplied token through.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the bus start call.</param>
    /// <returns>The task returned by the bus start call.</returns>
    public Task StartAsync(CancellationToken cancellationToken) =>
        _bus.StartAsync(cancellationToken);

    /// <summary>
    /// Stops the bus, passing the supplied token through.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the bus stop call.</param>
    /// <returns>The task returned by the bus stop call.</returns>
    public Task StopAsync(CancellationToken cancellationToken) =>
        _bus.StopAsync(cancellationToken);
}
{
private readonly IBusControl _busControl;
public BusService(IBusControl busControl)
{
_busControl = busControl;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return _busControl.StartAsync(cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _busControl.StopAsync(cancellationToken);
}
}
TestController
Now for how we bring all this together in our controller.
/// <summary>
/// Test controller that sends an <see cref="IAddOrder"/> message to the
/// "submit-order" queue each time it receives a GET request.
/// </summary>
[Route("api/[controller]")]
[ApiController]
public class TestController : ControllerBase
{
    private readonly IBus _bus;

    /// <summary>
    /// Creates the controller with the bus used to resolve send endpoints.
    /// </summary>
    /// <param name="bus">The MassTransit bus registered in Startup.cs.</param>
    public TestController(IBus bus)
    {
        _bus = bus;
    }

    /// <summary>
    /// Sends a sample order message to the "submit-order" queue.
    /// </summary>
    /// <returns>HTTP 200 once the message has been sent.</returns>
    [HttpGet]
    public async Task<IActionResult> Get()
    {
        // The address must match the host the bus was configured with in Startup.cs
        // (rabbitmq://localhost). Port 15672 is RabbitMQ's management HTTP port, not
        // the AMQP port, so it must not appear in the send-endpoint address.
        var endpoint = await _bus.GetSendEndpoint(new Uri("rabbitmq://localhost/submit-order"));

        // The anonymous object is sent as the IAddOrder message contract.
        await endpoint.Send<IAddOrder>(new { OrderDescription = "test order" });
        return Ok();
    }
}
/// <summary>
/// Test controller that sends an <see cref="IAddOrder"/> message to the
/// "submit-order" queue each time it receives a GET request.
/// </summary>
// [ApiController] requires attribute routing, so the route attribute is declared explicitly.
[Route("api/[controller]")]
[ApiController]
public class TestController : ControllerBase
{
    private readonly IBus _bus;

    /// <summary>
    /// Creates the controller with the bus used to resolve send endpoints.
    /// </summary>
    /// <param name="bus">The MassTransit bus registered in Startup.cs.</param>
    public TestController(IBus bus)
    {
        _bus = bus;
    }

    /// <summary>
    /// Sends a sample order message to the "submit-order" queue.
    /// </summary>
    /// <returns>HTTP 200 once the message has been sent.</returns>
    [HttpGet]
    public async Task<IActionResult> Get()
    {
        // The address must match the host the bus was configured with in Startup.cs
        // (rabbitmq://localhost). Port 15672 is RabbitMQ's management HTTP port, not
        // the AMQP port, so it must not appear in the send-endpoint address.
        var endpoint = await _bus.GetSendEndpoint(new Uri("rabbitmq://localhost/submit-order"));

        // The anonymous object is sent as the IAddOrder message contract.
        await endpoint.Send<IAddOrder>(new { OrderDescription = "test order" });
        return Ok();
    }
}
Comments
Post a Comment