RabbitMQ + .NET + Mass Transit
Notas sobre o uso do RabbitMQ com Mass Transit, usando C#
Filas
A estrutura mais simples dentro do RabbitMQ são as filas. Nessa estrutura há o envio de dados feito por um producer e o seu consumo é feito por um consumer.
Fila é uma estrutura de dados caracterizada pelo comportamento FIFO - first in, first out
Exchange
Subindo o nível das estruturas presentes na solução RabbitMQ temos as Exchanges que podem ser lidas como um distribuidor de mensagens.
Mas quando isso pode ser útil?! Situações onde uma mesma mensagem tem múltiplos consumidores diferentes. Exemplo: Restaurantes processam o pedido do cliente e inserem numa exchange (em outros provedores pode ser chamado de tópico), e então esse mesmo pedido será processado em uma fila na cozinha, para fazer criar os pratos, e também pode ser processada em uma fila de financeiro para levantar os lucros do dia.
Como é feito isso?! A exchange precisa de meios para rotear as mensagens, isso pode ser feito via interface, configurando a para enviar suas mensagens para filas específicas. Outra possibilidade é que para cada consumer será criado uma fila temporária onde o dado será consumido. Há também como usar headers para marcar a mensagem.
Mass Transit
Usando diretamente as Filas
Mesmo utilizando só filas, é criada uma exchange por padrão
Minhas observações
Achei mais difícil usar esta lib do que a disponibilizada pela própria RabbitMQ, para publicação dos dados
Gostei de usá-la para consumo, ficou mais agradável a leitura do código
Foi necessário utilizar o padrão da program + main para inicializar o serviço do consumer com a inversão do handler
Producer
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Common.Models;
using System.Data.Common;
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("default_user_dOjGazQi6DE0u23JWCu");
h.Password("25j4FKb0ESJQc4cYUJzJvbbmZNelOWnJ");
});
cfg.ConfigureEndpoints(context);
});
});
var serviceProvider = services.BuildServiceProvider();
var bus = serviceProvider.GetRequiredService<IBusControl>();
await bus.StartAsync();
System.Console.WriteLine("Start transmission...");
var idNumber = 100;
while (true)
{
var order = new Order(){
Id = idNumber.ToString(),
Message = $"Pedido qualquer de numero {idNumber}"
};
// var endpoint = await bus.GetSendEndpoint(new Uri("rabbitmq://localhost/order-service"));
//enviar para uma exchange
var endpoint = await bus.GetSendEndpoint(new Uri("queue:kitchen"));
await endpoint.Send(order);
Console.WriteLine($"Message {order.Id} sent...");
idNumber++;
Thread.Sleep(500);
}
Para enviar a uma fila específica é necessário utilizar dessa estrutura, onde passamos o nome da fila na criação do objeto
Uri("queue:nome_da_fila")
Para enviar a uma exchange devemos utilizar
Uri("rabbitmq://localhost/nome_da_exchange")
- Além disso será necessário configurar o bind para um fila específica via UI
Consumer
Program
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OrderHandler.Domain;
using OrderHandler.Domain.Interfaces;
using OrderHandler.Infrastructure;
internal class Program
{
private static void Main(string[] args)
{
var builder = Host.CreateDefaultBuilder(args);
builder.ConfigureServices((services) =>
{
services
.AddScoped<IOrderManager, OrderManager>()
.AddMassTransit(x =>
{
x.AddConsumer<Consumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("default_user_dOjGazQi6DE0u23JWCu");
h.Password("25j4FKb0ESJQc4cYUJzJvbbmZNelOWnJ");
});
cfg.ReceiveEndpoint("kitchen", ep =>
{
ep.ConfigureConsumer<Consumer>(context);
});
cfg.ConfigureEndpoints(context);
});
});
});
var host = builder.Build();
host.Run();
}
}
Consumer
using Common.Models;
using MassTransit;
using OrderHandler.Domain.Interfaces;
namespace OrderHandler.Infrastructure
{
public class Consumer : IConsumer<Order>
{
private readonly IOrderManager orderManager;
public Consumer(IOrderManager orderManager_)
{
orderManager = orderManager_;
}
public Task Consume(ConsumeContext<Order> context)
{
orderManager.PrintOrder(context.Message);
return Task.CompletedTask;
}
}
}
- Dessa forma toda mensagem lida é enviada para o consumer tratar
Usando as Exchanges
Minhas observações
Como foi dito, para que um exchange encaminhe os dados é necessário fazer o bind
Nosso consumer consegue configurar o bind quando é criado logo há uma ordem na subida de cada parte do sistema
- RabbitMQ => Consumer => Producer
o código é o mesmo, só vou mostrar as pontuais diferenças
Producer
var endpoint = await bus.GetSendEndpoint(new Uri("rabbitmq://localhost/order-service")); //exchange:name
await endpoint.Send(order);
Consumer
cfg.Message<Order>(x => x.SetEntityName("order-service")); //exchange name
cfg.ReceiveEndpoint("finance", ep => //queue name
{
ep.ConfigureConsumer<Consumer>(context);
});
Configurando TTL - Time To Live
Determina um tempo de vida para mensagem, testei usando milissegundo, segundos e horas. Podemos implementar para dias também.
await endpoint.Send(order, config =>
{
config.TimeToLive = TimeSpan.FromSeconds(1);
});