摘要:在 RabbitMQ.Client 7.0.0 版本中,IModel 已被重命名(该更名自 7.0.0-alpha2 版本起生效),现在应该使用 IChannel 替代 IModel;IChannel 不再提供 CreateBasicProperties 方法……
在 RabbitMQ.Client 7.0.0 版本中,IModel 已被重命名(该更名自 7.0.0-alpha2 版本起生效),现在应该使用 IChannel 替代 IModel。IChannel 不再提供 CreateBasicProperties 方法,需要直接使用 BasicProperties 类来创建消息属性。
下面的示例通过依赖注入(DI)来管理 RabbitMQ 客户端的生命周期。
首先,你需要安装RabbitMQ的.NET客户端库。这可以通过NuGet包管理器来完成:
Install-Package RabbitMQ.Client
在你的项目中,添加用于绑定RabbitMQ连接配置的选项类:
public class RabbitMQOptions{
/// <summary>Broker host name; copied to <c>ConnectionFactory.HostName</c> when the connection is registered.</summary>
public string HostName { get; set; }
/// <summary>Broker port; copied to <c>ConnectionFactory.Port</c> when the connection is registered.</summary>
public int Port { get; set; }
/// <summary>User name; copied to <c>ConnectionFactory.UserName</c> when the connection is registered.</summary>
public string UserName { get; set; }
/// <summary>Password; copied to <c>ConnectionFactory.Password</c> when the connection is registered.</summary>
public string Password { get; set; }
}
在Startup.cs或程序启动时的配置方法中,注册RabbitMQ服务:
// 绑定RabbitMQ配置
builder.Services.Configure<RabbitMQOptions>(builder.Configuration.GetSection("RabbitMQ"));

// Register a single shared RabbitMQ connection wrapper for the whole application.
// It is registered under IRabbitMQConnection because that is the type
// RabbitMQService asks for in its constructor.
builder.Services.AddSingleton<IRabbitMQConnection>(sp =>
{
    // Read the options bound from the "RabbitMQ" configuration section.
    var options = sp.GetRequiredService<IOptions<RabbitMQOptions>>().Value;
    var factory = new ConnectionFactory
    {
        HostName = options.HostName,
        Port = options.Port,
        UserName = options.UserName,
        Password = options.Password,
    };
    return new RabbitMQConnection(factory);
});
// Register RabbitMQService itself so it can be resolved from the container.
builder.Services.AddSingleton<RabbitMQService>();

/// <summary>
/// Owns a RabbitMQ connection and hands out channels created on it.
/// Disposing the instance releases the connection.
/// </summary>
public interface IRabbitMQConnection : IDisposable
{
    /// <summary>Creates a new channel on the underlying connection.</summary>
    /// <returns>A task that completes with the newly created channel.</returns>
    Task<IChannel> CreateChannel();
}
public class RabbitMQConnection : IRabbitMQConnection
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private bool _isDisposed;
/// <summary>
/// Stores the factory and opens the connection immediately.
/// </summary>
/// <param name="factory">Factory used to open the connection.</param>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
public RabbitMQConnection(ConnectionFactory factory)
{
    // ArgumentNullException takes the parameter name; ArgumentException(string)
    // would have treated it as the message text. It still derives from ArgumentException.
    _factory = factory ?? throw new ArgumentNullException(nameof(factory));

    // A constructor cannot await, so this blocks until the connection is open.
    // GetAwaiter().GetResult() rethrows the original exception instead of
    // wrapping it in AggregateException the way .Result does.
    _connection = _factory.CreateConnectionAsync().GetAwaiter().GetResult();
}
/// <summary>Creates a new channel on the shared connection.</summary>
/// <returns>A task that completes with the newly created channel.</returns>
/// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
public async Task<IChannel> CreateChannel()
{
    EnsureNotDisposed();
    return await _connection.CreateChannelAsync();
}
/// <summary>
/// Releases the connection and tells the GC that the finalizer no longer needs to run.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Releases the underlying connection. Repeated calls after the first are no-ops.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <c>Dispose()</c>; <c>false</c> when called from the finalizer.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (_isDisposed)
    {
        return;
    }

    if (disposing)
    {
        // _connection is a managed object, so it is only touched on the explicit
        // Dispose() path. On the finalizer path it may already have been finalized,
        // and it is null when the constructor threw before assigning it; calling
        // into it there would throw from the finalizer.
        _connection?.Dispose();
    }

    _isDisposed = true;
}
/// <summary>
/// Finalizer: runs the non-disposing cleanup path when <c>Dispose()</c> was never called.
/// </summary>
~RabbitMQConnection()
{
    Dispose(false);
}
/// <summary>Throws if this instance has already been disposed.</summary>
/// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
private void EnsureNotDisposed()
{
    if (!_isDisposed)
    {
        return;
    }

    throw new ObjectDisposedException(nameof(RabbitMQConnection));
}
}
在你的服务或消费者中,注入IRabbitMQConnection并使用它来创建信道(channel):
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;
public class RabbitMQService
{
private readonly IRabbitMQConnection _connection;
/// <summary>Creates the service on top of an injected connection wrapper.</summary>
/// <param name="connection">Connection wrapper used to create channels.</param>
/// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
public RabbitMQService(IRabbitMQConnection connection)
{
    // ArgumentNullException takes the parameter name; ArgumentException(string)
    // would have treated it as the message text. It still derives from ArgumentException.
    _connection = connection ?? throw new ArgumentNullException(nameof(connection));
}
/// <summary>
/// Serializes <paramref name="message"/> to JSON and publishes it as a persistent message.
/// </summary>
/// <param name="exchange">Exchange to publish to.</param>
/// <param name="routingKey">Routing key for the message.</param>
/// <param name="message">Object serialized with <c>JsonSerializer</c> and sent as UTF-8 bytes.</param>
/// <param name="mandatory">Forwarded to the publish call as its <c>mandatory</c> flag.</param>
/// <param name="cancellationToken">Token forwarded to the publish call.</param>
/// <remarks>
/// Failures, including cancellation, are written to the console and not rethrown,
/// so the returned task completes successfully even when publishing failed.
/// </remarks>
public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
{
    try
    {
        // The channel task must be awaited before use; the channel is closed
        // when this scope exits.
        await using var channel = await _connection.CreateChannel();

        var json = JsonSerializer.Serialize(message);
        Console.WriteLine("发送消息:" + json);
        var body = Encoding.UTF8.GetBytes(json);

        var properties = new RabbitMQ.Client.BasicProperties
        {
            Persistent = true // ask the broker to persist the message
        };

        // Await the publish so the channel is not disposed while the publish is
        // still in flight and so failures reach the catch blocks below.
        // Pass the caller's mandatory flag instead of a hard-coded false.
        await channel.BasicPublishAsync(exchange, routingKey, mandatory, properties, body, cancellationToken);
    }
    catch (OperationCanceledException ex)
    {
        // Deliberately not rethrown: publishing is best-effort here.
        Console.WriteLine($"Operation was canceled: {ex.Message}");
    }
    catch (Exception ex)
    {
        // Deliberately not rethrown: publishing is best-effort here.
        Console.WriteLine($"An error occurred: {ex.Message}");
    }
}
/// <summary>
/// Declares a durable queue, starts an asynchronous consumer on it, and keeps
/// consuming until <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <param name="queueName">Queue to declare and consume from.</param>
/// <param name="callback">
/// Invoked for every delivery with the consuming channel and a copy of the message body.
/// </param>
/// <param name="cancellationToken">Cancelling this token ends the wait and stops consuming.</param>
/// <returns>A task that does not complete normally; it ends when the token is cancelled.</returns>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
/// <exception cref="OperationCanceledException">The token was cancelled.</exception>
public async Task ReceiveAsync(string queueName, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
{
    if (callback is null)
    {
        throw new ArgumentNullException(nameof(callback));
    }

    // Close the channel when this method exits (for example on cancellation);
    // previously it was never released.
    await using var channel = await _connection.CreateChannel();

    await channel.QueueDeclareAsync(
        queue: queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null,
        cancellationToken: cancellationToken);

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (sender, ea) =>
    {
        // Copy the payload so the callback owns its own array.
        var body = ea.Body.ToArray();

        await callback(channel, body);

        // The consumer runs with autoAck: false and the callback never sees the
        // delivery tag, so the acknowledgement has to happen here. It is sent only
        // after the callback finished without throwing; a failed delivery stays
        // unacknowledged.
        await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
    };

    await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);

    // Keep the method (and therefore the channel) alive until cancellation.
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
}

消费

var app = builder.Build();
var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

// Start receiving from the "Test" queue. The returned task is kept in a variable
// and not awaited here, so start-up is not blocked by the consumer loop.
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{
    // Handle the received message here, for example:
    //string message = Encoding.UTF8.GetString(body);
    //Console.WriteLine($"收到消息 message: {message}");
    //// Acknowledge the message (note: this callback is not given the delivery tag)
    //await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);
}, cancellationToken);

生产端
来源:opendotnet
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!