QsMessaging is a .NET 8 library designed for sending and receiving messages between services or components of your application using RabbitMQ or Azure Service Bus. It supports horizontal scalability, allowing multiple instances of the same service to handle messages efficiently.
Available on NuGet for seamless integration:
Project website:
https://pavlo-0.github.io/QsMessaging/
A simple, scalable messaging solution for distributed systems.
Note: Azure Service Bus support is in an early, not fully tested or implemented state. The API is the same as for RabbitMQ, but some features may be missing or behave unexpectedly.
Install the package using the following command:
dotnet add package QsMessaging
Registering the library is simple. Add the following two lines of code to your Program.cs:
// Add QsMessaging (use the default configuration)...
builder.Services.AddQsMessaging(options => { });
...
await host.UseQsMessaging();
RabbitMQ (default transport)
localhostguestguest5672builder.Services.AddQsMessaging(options =>
{
options.RabbitMQ.Host = "my-rabbitmq-host";
options.RabbitMQ.UserName = "myuser";
options.RabbitMQ.Password = "mypassword";
options.RabbitMQ.Port = 5672;
});
Azure Service Bus support is available but is not fully tested or implemented. The public interface (
IQsMessaging) is identical to RabbitMQ — no code changes are needed in your handlers or senders.
Set options.Transport = QsMessagingTransport.AzureServiceBus and supply a connection string:
builder.Services.AddQsMessaging(options =>
{
options.Transport = QsMessagingTransport.AzureServiceBus;
options.AzureServiceBus.ConnectionString = "<your-connection-string>";
});
...
await host.UseQsMessaging();
Use your Azure Service Bus namespace connection string directly:
builder.Services.AddQsMessaging(options =>
{
options.Transport = QsMessagingTransport.AzureServiceBus;
options.AzureServiceBus.ConnectionString =
"Endpoint=sb://your-namespace.servicebus.windows.net/;" +
"SharedAccessKeyName=RootManageSharedAccessKey;" +
"SharedAccessKey=YOUR_KEY;";
});
The Azure Service Bus Emulator uses separate ports for AMQP (messaging) and the management API:
builder.Services.AddQsMessaging(options =>
{
options.Transport = QsMessagingTransport.AzureServiceBus;
options.AzureServiceBus.ConnectionString =
"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
options.AzureServiceBus.EmulatorAmqpPort = 5672;
options.AzureServiceBus.EmulatorManagementPort = 5300;
});
| Property | Default | Description |
|---|---|---|
ConnectionString |
(required) | Azure Service Bus connection string (cloud or emulator) |
EmulatorAmqpPort |
5672 |
AMQP port for the local emulator. Ignored for cloud namespaces. |
EmulatorManagementPort |
5300 |
Management/admin port for the local emulator. Ignored for cloud namespaces. |
AdministrationConnectionString |
null |
Optional separate connection string for admin operations. Falls back to ConnectionString when omitted. |
Define a message contract:
public class RegularMessageContract
{
public required string MyTextMessage { get; set; }
}
Inject IQsMessaging into your class:
public YourClass(IQsMessaging qsMessaging) {}
Then, use it to send a message:
await qsMessaging.SendMessageAsync(new RegularMessageContract { MyTextMessage = "My message." });
To handle the message, create a handler:
public class RegularMessageContractHandler : IQsMessageHandler<RegularMessageContract>
{
public Task<bool> Consumer(RegularMessageContract contractModel)
{
// Process the message here
return Task.FromResult(true);
}
}
All handlers discovered by QsMessaging are registered in DI as Transient. This means each message/request is handled by a fresh handler instance, and handlers have full support for constructor injection of your application services.
If your handler throws an exception, QsMessaging catches it and passes it to every registered IQsMessagingConsumerErrorHandler implementation.
Current behavior by transport:
If you need retry, dead-letter, alerting, or custom logging, implement IQsMessagingConsumerErrorHandler and handle the exception there.
Qs:{FullTypeName}:ex for exchanges and Qs:{FullTypeName}:permanent for durable queues. Azure Service Bus uses Qs-Queue-{FullTypeName} and Qs-Topic-{FullTypeName}. Long names are hashed.IQsMessageHandler<T>, instances compete on one shared queue, so one message is processed by one instance. For IQsEventHandler<T>, each instance gets its own temporary queue/subscription, so every instance receives the event.IQsMessagingConsumerErrorHandler, but the message is not automatically retried by the library.50000 ms. If no response arrives in time, the request fails with TimeoutException. Correlation ID is generated automatically per request as a new Guid string and copied to the response. Cancellation token is passed into transport operations, but timeout is the main response wait guard. Duplicate responses are not specially deduplicated by the library; late responses are ignored after the request is removed from the local store.public class MessagingErrorHandler : IQsMessagingConsumerErrorHandler
{
private readonly ILogger<MessagingErrorHandler> _logger;
public MessagingErrorHandler(ILogger<MessagingErrorHandler> logger)
{
_logger = logger;
}
public Task HandleErrorAsync(Exception exception, ErrorConsumerDetail detail)
{
_logger.LogError(
exception,
"Handler failed. Queue or entity: {QueueName}, Handler: {HandlerType}, Payload type: {PayloadType}",
detail.QueueName,
detail.HandlerType,
detail.GenericType);
// Add your own logic here:
// - save to database
// - send alert
// - push to dead-letter queue
// - trigger retry workflow
return Task.CompletedTask;
}
}
You can also use the Request/Response pattern to send a request and await a response. This is useful when you need to communicate between services and expect a response.
Define the request and response contracts:
public class MyRequestContract
{
public required string RequestMessage { get; set; }
}
public class MyResponseContract
{
public required string ResponseMessage { get; set; }
}
To send a request and await a response, use the RequestResponse<TRequest, TResponse>:
public class MyService
{
private readonly IQsMessaging _qsMessaging;
public MyService(IQsMessaging qsMessaging)
{
_qsMessaging = qsMessaging;
}
public async Task<MyResponseContract> SendRequestAsync(MyRequestContract request)
{
var response = await _qsMessaging.SendRequestResponseAsync<MyRequestContract, MyResponseContract>(request);
return response;
}
}
To handle requests, implement the IQsRequestResponseHandler<TRequest, TResponse> interface:
public class MyRequestHandler : IQsRequestResponseHandler<MyRequestContract, MyResponseContract>
{
public Task<MyResponseContract> Handle(MyRequestContract request)
{
// Process the request and create a response
return Task.FromResult(new MyResponseContract { ResponseMessage = "Response to: " + request.RequestMessage });
}
}
The examples below show how handlers can consume dependencies through constructor injection.
public interface IOrderProcessor
{
Task ProcessAsync(CreateOrderMessage message);
}
public class CreateOrderMessage
{
public required string OrderId { get; set; }
}
public class CreateOrderMessageHandler : IQsMessageHandler<CreateOrderMessage>
{
private readonly IOrderProcessor _orderProcessor;
private readonly ILogger<CreateOrderMessageHandler> _logger;
public CreateOrderMessageHandler(
IOrderProcessor orderProcessor,
ILogger<CreateOrderMessageHandler> logger)
{
_orderProcessor = orderProcessor;
_logger = logger;
}
public async Task<bool> Consumer(CreateOrderMessage contractModel)
{
_logger.LogInformation("Processing order {OrderId}", contractModel.OrderId);
await _orderProcessor.ProcessAsync(contractModel);
return true;
}
}
public interface IUserRepository
{
Task<UserDto?> GetByIdAsync(Guid id);
}
public class GetUserRequest
{
public Guid UserId { get; set; }
}
public class GetUserResponse
{
public string? Name { get; set; }
public bool Found { get; set; }
}
public class GetUserHandler : IQsRequestResponseHandler<GetUserRequest, GetUserResponse>
{
private readonly IUserRepository _userRepository;
public GetUserHandler(IUserRepository userRepository)
{
_userRepository = userRepository;
}
public async Task<GetUserResponse> Handle(GetUserRequest request)
{
var user = await _userRepository.GetByIdAsync(request.UserId);
return new GetUserResponse
{
Found = user is not null,
Name = user?.Name
};
}
}
For detailed documentation, visit the QsMessaging Wiki.
That’s all, folks!