所有示例基于 System.Reactive + ReactiveUI,配合 WhenActivated + Disposables 自动释放订阅,杜绝内存泄漏。
前置统一模板(所有订阅标准写法)
csharp
运行
private readonly CompositeDisposable _disposables = new();
public void OnActivated()
{
this.WhenActivated(d =>
{
// 所有Rx订阅末尾 .DisposeWith(d);
});
}
1. 防抖、节流(防止重复点击、重复扫码)
1.1 Throttle 节流:N 毫秒内只取最后一次(扫码最常用)
场景:扫码枪连续快速扫同一个码,只处理最后一次
csharp
运行
// 300ms内多次扫码,只执行最后一次
MessageBus.Current.Listen<BarCodeMsg>()
.Throttle(TimeSpan.FromMilliseconds(300))
.ObserveOn(RxApp.MainThreadScheduler)
.Subscribe(ProcessBarCode)
.DisposeWith(_disposables);
1.2 Debounce 防抖:停止触发 N 毫秒后执行(搜索框)
场景:商品搜索框边输入边请求接口,停止输入 300ms 再查询
csharp
运行
this.WhenAnyValue(x => x.SearchKey)
.Debounce(TimeSpan.FromMilliseconds(300))
.DistinctUntilChanged()
.Subscribe(SearchGoods)
.DisposeWith(_disposables);
1.3 DistinctUntilChanged 去重:相同值跳过
同一个条码短时间重复扫描直接忽略,避免重复调用 API
csharp
运行
MessageBus.Current.Listen<BarCodeMsg>()
.DistinctUntilChanged(x => x.BarCode)
.Subscribe(ProcessBarCode)
.DisposeWith(_disposables);
2. 线程调度(解决 WPF 跨线程 UI 报错)
SubscribeOn:在哪个线程执行数据源(网络请求、硬件读取放后台)
ObserveOn:在哪个线程接收结果更新 UI(必须切主线程)
csharp
运行
Observable.FromAsync(() => _apiService.GetGoodsList())
.SubscribeOn(RxApp.TaskpoolScheduler) // 后台线程请求接口
.ObserveOn(RxApp.MainThreadScheduler) // 切UI线程绑定数据
.Subscribe(list => GoodsList = list)
.DisposeWith(_disposables);
3. 数据转换 Select
把上游数据加工成需要的格式,类似 LINQ Select
csharp
运行
MessageBus.Current.Listen<StockChangeMsg>()
.Select(x => x.GoodsId) // 只提取商品ID
.Subscribe(RefreshSingleGoods)
.DisposeWith(_disposables);
4. 过滤 Where:只处理符合条件的数据
只处理上架商品,过滤下架商品扫码
csharp
运行
MessageBus.Current.Listen<BarCodeMsg>()
.Where(x => x.IsOnSale)
.Subscribe(ProcessBarCode)
.DisposeWith(_disposables);
5. 多数据流合并
5.1 Merge 合并多个相同类型流(多个硬件事件统一处理)
扫码事件 + 手动输入商品编码事件统一走同一个处理方法
csharp
运行
var scanStream = MessageBus.Current.Listen<BarCodeMsg>();
var inputStream = this.WhenAnyValue(x => x.InputCode).Select(c => new BarCodeMsg { BarCode = c });
scanStream.Merge(inputStream)
.Throttle(TimeSpan.FromMilliseconds(300))
.Subscribe(ProcessBarCode)
.DisposeWith(_disposables);
5.2 CombineLatest 多流都有值才触发(多条件筛选)
商品分类 + 价格区间两个筛选条件任意一个变化就刷新列表
csharp
运行
var categoryStream = this.WhenAnyValue(x => x.SelectedCategoryId);
var priceStream = this.WhenAnyValue(x => x.SelectedPriceRange);
categoryStream.CombineLatest(priceStream)
.Subscribe(tuple => QueryGoods(tuple.Item1, tuple.Item2))
.DisposeWith(_disposables);
6. 异常处理 Catch 防止单个请求崩溃整个程序
接口异常自动捕获,弹窗提示不闪退
csharp
运行
Observable.FromAsync(() => _saleApi.CreateOrder(orderDto))
.Catch<long, Exception>(ex =>
{
MessageBox.Show($"收银失败:{ex.Message}");
return Observable.Empty<long>(); // 异常返回空流,终止本次订阅
})
.ObserveOn(RxApp.MainThreadScheduler)
.Subscribe(orderId => ShowOrderSuccess(orderId))
.DisposeWith(_disposables);
7. 超时 Timeout:防止接口卡死 POS
请求 10 秒没返回直接判定超时失败
csharp
运行
Observable.FromAsync(() => _api.GetGoodsByBarCode(code))
.Timeout(TimeSpan.FromSeconds(10))
.Catch<GoodsDto, TimeoutException>(_ =>
{
MessageBox.Show("接口请求超时,请检查网络");
return Observable.Empty<GoodsDto>();
})
.Subscribe(goods => AddToCart(goods))
.DisposeWith(_disposables);
8. 重复重试 Retry:网络抖动自动重试
网络偶尔闪断,最多重试 2 次再报错
csharp
运行
Observable.FromAsync(() => _api.UploadOfflineOrder())
.Retry(2)
.Catch<HttpResponseMessage, Exception>(ex =>
{
MessageBox.Show("同步失败,请稍后手动同步");
return Observable.Empty<HttpResponseMessage>();
})
.Subscribe()
.DisposeWith(_disposables);
9. Take 只取前 N 次、TakeUntil 条件终止
Take (1):只执行一次(弹窗确认、一次性弹窗)
csharp
运行
OkButton.Events().Click
.Take(1)
.Subscribe(_ => CloseWindow())
.DisposeWith(_disposables);
TakeUntil:当另一个事件触发就终止当前流
关闭窗口时,停止所有库存自动轮询
csharp
运行
Observable.Interval(TimeSpan.FromSeconds(10))
.TakeUntil(this.WhenAnyValue(x => x.IsClosed))
.Subscribe(_ => RefreshStock())
.DisposeWith(_disposables);
10. 定时轮询 Interval(库存心跳、设备在线检测)
每 10 秒请求一次接口,刷新设备在线状态
csharp
运行
Observable.Interval(TimeSpan.FromSeconds(10))
.Select(_ => Observable.FromAsync(() => _deviceApi.CheckPosStatus()))
.Switch()
.ObserveOn(RxApp.MainThreadScheduler)
.Subscribe(UpdateDeviceStatus)
.DisposeWith(_disposables);
Switch 切换最新请求(轮询必备)
上一次接口没返回,下一次轮询来了直接舍弃上一次请求,防止数据错乱。
二、收银系统高频组合经典场景模板
场景 1:POS 扫码终极安全处理(防抖 + 去重 + 过滤 + 线程 + 异常)
csharp
运行
MessageBus.Current.Listen<BarCodeMsg>()
.Throttle(TimeSpan.FromMilliseconds(300))
.DistinctUntilChanged(x => x.BarCode)
.Where(x => !string.IsNullOrEmpty(x.BarCode))
.Select(x => Observable.FromAsync(() => _goodsApi.GetByBarCode(x.BarCode)))
.Switch()
.SubscribeOn(RxApp.TaskpoolScheduler)
.ObserveOn(RxApp.MainThreadScheduler)
.Catch<GoodsDto, Exception>(ex =>
{
MessageBox.Show($"查询商品异常:{ex.Message}");
return Observable.Empty<GoodsDto>();
})
.Subscribe(AddShoppingCart)
.DisposeWith(_disposables);
场景 2:按钮防重复提交(下单、结账)
csharp
运行
CheckoutCommand = ReactiveCommand.CreateFromTask(async () =>
{
return await _saleApi.CreateOrder(CurrentOrder);
});
CheckoutCommand.ThrownExceptions
.Subscribe(ex => MessageBox.Show($"结账失败:{ex.Message}"))
.DisposeWith(_disposables);
三、避坑要点
- 所有订阅必须绑定
Disposables,页面销毁自动释放,解决 Prism 事件聚合器最大痛点:内存泄漏; - 耗时网络、硬件 IO 必须
SubscribeOn放到线程池,避免 UI 卡顿; - 更新 UI 一定要
ObserveOn(RxApp.MainThreadScheduler); - 高频事件优先用
Throttle + DistinctUntilChanged; - 网络请求必须加
Catch+Timeout,防止 POS 卡死崩溃。
若文章对您有帮助,可以激励一下我哦,祝您平安幸福!
| 微信 | 支付宝 |
|---|---|
![]() |
![]() |
WPF中文网

