.NET 响应式编程 System.Reactive 系列文章(三):Subscribe 和 IDisposable 的深入理解

B站影视 2025-01-10 08:42 2

摘要:在前两篇文章中,我们详细介绍了和IObserver的核心概念及交互流程。但在实际使用System.Reactive时,一个常见的误区是认为数据流一旦订阅,就不需要额外管理。这种认知是危险的,因为

# 引言:为什么理解 Subscribe 和 IDisposable 很重要?

在前两篇文章中,我们详细介绍了和IObserver的核心概念及交互流程。但在实际使用System.Reactive时,一个常见的误区是认为数据流一旦订阅,就不需要额外管理。这种认知是危险的,因为Observable 的订阅可能是无限的,如果不管理好订阅的生命周期,很容易导致内存泄漏资源浪费。在 Rx 中,方法返回一个IDisposable接口对象,用于手动取消订阅和释放资源。另外,System.Reactive还提供了不返回IDisposable的Subscribe重载,这些重载方法通过CancellationToken管理订阅的生命周期。在本篇文章中,我们将深入探讨Subscribe 和 IDisposable的原理、这些特殊重载的设计原因,以及在实际使用中的应用场景。Subscribe是连接IObservable和IObserver的桥梁。当你调用Subscribe方法时:1.2 为什么 Subscribe 返回 IDisposable?普通的重载 返回一个IDisposable对象,允许你通过调用Dispose方法取消订阅。这是管理数据流生命周期的核心机制之一。System.Reactive提供了一些特殊的Subscribe重载方法,它们不返回IDisposable,而是依赖于来控制订阅的生命周期。这些方法设计的目的是为了提供一种外部取消订阅的机制,让你无需手动管理Dispose的调用。2.1 方法签名以下是其中一个不返回的public static void Subscribe(
this IObservable source,
Action onNext,
Action onError,
Action onCompleted,
CancellationToken cancellationToken
);
这种重载方法的使用场景是:你希望通过来控制订阅的生命周期,而不是手动调用Dispose。2.2 示例代码:使用 CancellationToken 管理订阅示例:超时取消订阅using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
static void Main(string args)
{
IObservable observable=Observable.Interval(TimeSpan.FromSeconds(1 ));

CancellationTokenSource cts=new;

// 使用 Subscribe 方法并传入 CancellationToken
observable.Subscribe(
onNext: static value=>Console.WriteLine($"Received: {value}"),
onError: static ex=>Console.WriteLine($"Error: {ex.Message}"),
onCompleted: static =>Console.WriteLine("Completed"),
token: cts.Token
);

// 模拟运行 5 秒后取消订阅
Console.WriteLine("Running for 5 seconds...");
Thread.Sleep(5000);
cts.Cancel;
Console.WriteLine("Subscription cancelled.");
}
}

输出结果:

Running for 5 seconds...
Received:0
Received:1
Received:2
Received:3
Subscription cancelled.
2.3 使用场景:什么时候使用 CancellationToken?

使用场景

推荐的 Subscribe 重载

需要手动取消订阅

返回 IDisposable的重载

使用外部控制(如用户交互、超时)控制订阅

带 CancellationToken的重载

典型场景:

异步任务取消

在异步任务中使用 CancellationToken取消订阅数据流,避免阻塞或内存泄漏。

超时控制

使用 CancellationTokenSource.CancelAfter设置超时取消订阅。

2.4 示例:设置超时取消订阅using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
static void Main(string args)
{
IObservable observable=Observable.Interval(TimeSpan.FromSeconds(1 ));

CancellationTokenSource cts=new;
cts.CancelAfter(TimeSpan.FromSeconds(3 ));// 设置 3 秒后自动取消订阅

observable.Subscribe(
onNext: static value=>Console.WriteLine($"Received: {value}"),
onError: static ex=>Console.WriteLine($"Error: {ex.Message}"),
onCompleted: static =>Console.WriteLine("Completed"),
token: cts.Token
);

Console.WriteLine("Running...");
Thread.Sleep(5000);
Console.WriteLine("Program ended.");
}
}

输出结果:

Running...
Received:0
Received:1
Received:2
Program ended.
使用方式特点适用场景Subscribe返回IDisposable

允许手动取消订阅

长时间订阅或频繁管理多个订阅

虽然使用可以简化订阅管理,但也有一些需要注意的地方:

是否支持手动取消订阅

✅ 支持

❌ 不支持

是否支持外部控制订阅生命周期

❌ 需要手动调用 Dispose

✅ 通过 CancellationToken控制

在本篇文章中,我们详细探讨了 Subscribe 和 IDisposable的内部机制,并重点介绍了 带的Subscribe重载:下一篇文章预告《.NET 响应式编程 System.Reactive 系列文章(四):操作符基础》

下一篇文章将介绍 System.Reactive的基础操作符,包括如何创建转换过滤

数据流。我们将通过实战示例,帮助你快速掌握 Rx 的操作符使用方法。敬请期待!

来源:opendotnet

相关推荐