Intro摘要:这里我们创建了一个并且指定了AckTimeout为 1s 来方便测试,并向队列里添加了两条消息,然后再从队列中取出消息进行消费,输出结果如下:AckQueue前面两个分别取出了队列里的两条消息,第三次没有取到消息返回了 ,因为此时队列中没有消息,之后将没有 a
有些时候我们会希望从消息队列里消费消息时只有消费成功 Ack 之后才将消息从队列中移除,我们尝试来实现一个支持 Ack 的队列
Sample使用示例如下:
varackQueue =newAckQueue(newAckQueueOptions{
AckTimeout = TimeSpan.FromSeconds(1)
});
awaitackQueue.EnqueueAsync(newCounterEvent { Counter =1});
awaitnew2});
varevent1 =awaitackQueue.DequeueAsync;
ArgumentException.ThrowIf(event1);
Console.WriteLine(@$"event1: {event1.ToJson}");
varevent2 =await
ArgumentException.ThrowIf(event2);
Console.WriteLine(@$"event2: {event2.ToJson}");
awaitackQueue.AckMessageAsync(event2.Properties.EventId);
varevent3 =await
Console.WriteLine(@$"event3: {event3.ToJson}");
awaitTask.Delay(2000);
ackQueue.RequeueUnAckedMessages;
varevent4 =await
Console.WriteLine(@$"event4: {event4.ToJson}");
ArgumentException.ThrowIf(event4);
awaitackQueue.AckMessageAsync(event4.Properties.EventId);
这里我们创建了一个并且指定了AckTimeout为 1s 来方便测试,并向队列里添加了两条消息,然后再从队列中取出消息进行消费,输出结果如下:AckQueue前面两个分别取出了队列里的两条消息,第三次没有取到消息返回了 ,因为此时队列中没有消息,之后将没有 ack 的消息重新添加到队列里,所以此时队列里重新将第一条消息加入了队列,所以我们再次取消息的时候再次取到了第一条消息,从输出的EventId/EventAt可以看出来和第一个消息是一样的Implement一般地我们可能会基于ConcurrentQueue或者Channel来实现一个内存队列,但是如果直接使用,取出来之后如果出现异常导致消息没有消费成功就会导致消息丢失,我们可以将消息存起来放在一个缓冲区,当消息消费成功 Ack 的时候再从缓冲区中移除,为了比较方便地处理我们也可以设置一个时间定期把缓冲区的消息重新加入回队列中。
实现代码如下:
publicsealed class AckQueueOptions
{
publicTimeSpan AckTimeout {get;set; } = TimeSpan.FromMinutes(10);
public boolAutoRequeue {get;set; }
publicTimeSpan RequeuePeriod {get;set1);
}
public sealed class AckQueue:IDisposable
{
private readonlyAckQueueOptions _options;
private readonlyConcurrentQueue _queue =new;
private readonlyConcurrentDictionarystring, IEvent> _unAckedMessages =new;
private readonlyTimer? _timer;
publicAckQueue :this(new) { }
publicAckQueue(AckQueueOptions options)
{
_options = options;
if(options.AutoRequeue)
{
_timer =newTimer(_ => RequeueUnAckedMessages, , options.RequeuePeriod, options.RequeuePeriod);
}
}
publicTask EnqueueAsync(TEvent @event, EventProperties? properties = )
{
properties ??=newEventProperties;
if(string.IsOrEmpty(properties.EventId))
{
properties.EventId = Guid.NewGuid.ToString;
}
if(properties.EventAt ==default)
{
properties.EventAt = DateTimeOffset.Now;
}
varinternalEvent =newEventWrapper
{
Data = @event
Properties = properties
};
_queue.Enqueue(internalEvent);
returnTask.CompletedTask;
}
publicTask?> DequeueAsync
{
if(_queue.TryDequeue(outvareventWrapper))
{
_unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
returnTask.FromResult((IEvent?)eventWrapper);
}
returnTask.FromResult?>;
}
publicTaskAckMessageAsync(stringeventId)
{
_unAckedMessages.TryRemove(eventId,out_);
returnTask.CompletedTask;
}
publicvoidRequeueUnAckedMessages
{
foreach(varmessagein_unAckedMessages)
{
if(DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout)
{
if(_unAckedMessages.TryRemove(message.Key,outvareventWrapper)
&& eventWrapper != )
{
_queue.Enqueue(eventWrapper);
}
}
}
}
publicvoidDispose
{
_timer?.Dispose;
}
}
为了实现自动化地将没有及时 Ack 的消息重新加入队列重新消费,我们添加了一个 Timer 默认 disable,enable 的时候定期重新将未及时 ack 的消息重新加入队列
更多例子可以参考单元测试:https://github.com/WeihanLi/WeihanLi.Common/blob/1.0.74/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
More这个实现是基于内存来实现的,如果要实现比较可靠的消息队列,基于 Redis 来实现的话要怎么实现呢?
Referenceshttps://github.com/WeihanLi/WeihanLi.Common/blob/dev/samples/DotNetCoreSample/EventTest.cs#L52
来源:opendotnet
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!