RabbitMQ + .NET + Mass Transit

RabbitMQ + .NET + Mass Transit

Notas sobre o uso do RabbitMQ com Mass Transit, usando C#

Filas

A estrutura mais simples dentro do RabbitMQ são as filas. Nessa estrutura há o envio de dados feito por um producer e o seu consumo é feito por um consumer.

Fila é uma estrutura de dados caracterizada pelo comportamento FIFO - first in, first out

Exchange

Subindo o nível das estruturas presentes na solução RabbitMQ temos as Exchanges que podem ser lidas como um distribuidor de mensagens.

Mas quando isso pode ser útil?! Situações onde uma mesma mensagem tem múltiplos consumidores diferentes. Exemplo: Restaurantes processam o pedido do cliente e inserem numa exchange (em outros provedores pode ser chamado de tópico), e então esse mesmo pedido será processado em uma fila na cozinha, para fazer criar os pratos, e também pode ser processada em uma fila de financeiro para levantar os lucros do dia.

Como é feito isso?! A exchange precisa de meios para rotear as mensagens, isso pode ser feito via interface, configurando a para enviar suas mensagens para filas específicas. Outra possibilidade é que para cada consumer será criado uma fila temporária onde o dado será consumido. Há também como usar headers para marcar a mensagem.

Mass Transit

Usando diretamente as Filas

Mesmo utilizando só filas, é criada uma exchange por padrão

Minhas observações

  • Achei mais difícil usar esta lib do que a disponibilizada pela própria RabbitMQ, para publicação dos dados

  • Gostei de usá-la para consumo, ficou mais agradável a leitura do código

  • Foi necessário utilizar o padrão da program + main para inicializar o serviço do consumer com a inversão do handler

Producer

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Common.Models;
using System.Data.Common;

var services = new ServiceCollection();
services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("default_user_dOjGazQi6DE0u23JWCu");
            h.Password("25j4FKb0ESJQc4cYUJzJvbbmZNelOWnJ");
        });
        cfg.ConfigureEndpoints(context);
    });
}); 

var serviceProvider = services.BuildServiceProvider();
var bus = serviceProvider.GetRequiredService<IBusControl>();
await bus.StartAsync();

System.Console.WriteLine("Start transmission...");

var idNumber = 100;

while (true)
{
    var order = new Order(){
        Id = idNumber.ToString(),
        Message = $"Pedido qualquer de numero {idNumber}"
 };

    // var endpoint = await bus.GetSendEndpoint(new Uri("rabbitmq://localhost/order-service")); 
    //enviar para uma exchange

    var endpoint = await bus.GetSendEndpoint(new Uri("queue:kitchen"));

    await endpoint.Send(order);
    Console.WriteLine($"Message {order.Id} sent...");

    idNumber++;
    Thread.Sleep(500);
}
  • Para enviar a uma fila específica é necessário utilizar dessa estrutura, onde passamos o nome da fila na criação do objeto Uri("queue:nome_da_fila")

  • Para enviar a uma exchange devemos utilizar Uri("rabbitmq://localhost/nome_da_exchange")

    • Além disso será necessário configurar o bind para um fila específica via UI

Consumer

Program

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OrderHandler.Domain;
using OrderHandler.Domain.Interfaces;
using OrderHandler.Infrastructure;

internal class Program
{
    private static void Main(string[] args)
    {        
        var builder = Host.CreateDefaultBuilder(args);  

        builder.ConfigureServices((services) =>
        {
            services
            .AddScoped<IOrderManager, OrderManager>()
            .AddMassTransit(x =>
            {
                x.AddConsumer<Consumer>();
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host(new Uri("rabbitmq://localhost/"), h =>
                    {
                        h.Username("default_user_dOjGazQi6DE0u23JWCu");
                        h.Password("25j4FKb0ESJQc4cYUJzJvbbmZNelOWnJ");
                    });

                    cfg.ReceiveEndpoint("kitchen", ep =>
                    {
                        ep.ConfigureConsumer<Consumer>(context);
                    });

                    cfg.ConfigureEndpoints(context);
                });
            });
        });

        var host = builder.Build();

        host.Run();
    }
}

Consumer

using Common.Models;
using MassTransit;
using OrderHandler.Domain.Interfaces;
namespace OrderHandler.Infrastructure
{
    public class Consumer : IConsumer<Order>
    {
        private readonly IOrderManager orderManager;

        public Consumer(IOrderManager orderManager_)
        {
            orderManager = orderManager_;
        }

        public Task Consume(ConsumeContext<Order> context)
        {
            orderManager.PrintOrder(context.Message);
            return Task.CompletedTask;
        }
    }
}
  • Dessa forma toda mensagem lida é enviada para o consumer tratar

Usando as Exchanges

Minhas observações

  • Como foi dito, para que um exchange encaminhe os dados é necessário fazer o bind

  • Nosso consumer consegue configurar o bind quando é criado logo há uma ordem na subida de cada parte do sistema

    • RabbitMQ => Consumer => Producer

o código é o mesmo, só vou mostrar as pontuais diferenças

Producer

var endpoint = await bus.GetSendEndpoint(new Uri("rabbitmq://localhost/order-service")); //exchange:name
await endpoint.Send(order);

Consumer

cfg.Message<Order>(x => x.SetEntityName("order-service")); //exchange name
cfg.ReceiveEndpoint("finance", ep => //queue name
{
    ep.ConfigureConsumer<Consumer>(context);
});

Configurando TTL - Time To Live

Determina um tempo de vida para mensagem, testei usando milissegundo, segundos e horas. Podemos implementar para dias também.

await endpoint.Send(order, config =>
    {
        config.TimeToLive = TimeSpan.FromSeconds(1);
    });