WPF中文网

ReactiveUI框架关于Rx 高频操作符速查表(WPF/POS 收银场景专用)

所有示例基于 System.Reactive + ReactiveUI,配合 WhenActivated + Disposables 自动释放订阅,杜绝内存泄漏。

前置统一模板(所有订阅标准写法)

csharp

运行

private readonly CompositeDisposable _disposables = new();

public void OnActivated()
{
    this.WhenActivated(d =>
    {
        // 所有Rx订阅末尾 .DisposeWith(d);
    });
}

1. 防抖、节流(防止重复点击、重复扫码)

1.1 Throttle 节流:N 毫秒内只取最后一次(扫码最常用)

场景:扫码枪连续快速扫同一个码,只处理最后一次

csharp

运行

// 300ms内多次扫码,只执行最后一次
MessageBus.Current.Listen<BarCodeMsg>()
    .Throttle(TimeSpan.FromMilliseconds(300))
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(ProcessBarCode)
    .DisposeWith(_disposables);

1.2 Debounce 防抖:停止触发 N 毫秒后执行(搜索框)

场景:商品搜索框边输入边请求接口,停止输入 300ms 再查询

csharp

运行

this.WhenAnyValue(x => x.SearchKey)
    .Debounce(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .Subscribe(SearchGoods)
    .DisposeWith(_disposables);

1.3 DistinctUntilChanged 去重:相同值跳过

同一个条码短时间重复扫描直接忽略,避免重复调用 API

csharp

运行

MessageBus.Current.Listen<BarCodeMsg>()
    .DistinctUntilChanged(x => x.BarCode)
    .Subscribe(ProcessBarCode)
    .DisposeWith(_disposables);

2. 线程调度(解决 WPF 跨线程 UI 报错)

SubscribeOn:在哪个线程执行数据源(网络请求、硬件读取放后台)

ObserveOn:在哪个线程接收结果更新 UI(必须切主线程)

csharp

运行

Observable.FromAsync(() => _apiService.GetGoodsList())
    .SubscribeOn(RxApp.TaskpoolScheduler) // 后台线程请求接口
    .ObserveOn(RxApp.MainThreadScheduler) // 切UI线程绑定数据
    .Subscribe(list => GoodsList = list)
    .DisposeWith(_disposables);

3. 数据转换 Select

把上游数据加工成需要的格式,类似 LINQ Select

csharp

运行

MessageBus.Current.Listen<StockChangeMsg>()
    .Select(x => x.GoodsId) // 只提取商品ID
    .Subscribe(RefreshSingleGoods)
    .DisposeWith(_disposables);

4. 过滤 Where:只处理符合条件的数据

只处理上架商品,过滤下架商品扫码

csharp

运行

MessageBus.Current.Listen<BarCodeMsg>()
    .Where(x => x.IsOnSale)
    .Subscribe(ProcessBarCode)
    .DisposeWith(_disposables);

5. 多数据流合并

5.1 Merge 合并多个相同类型流(多个硬件事件统一处理)

扫码事件 + 手动输入商品编码事件统一走同一个处理方法

csharp

运行

var scanStream = MessageBus.Current.Listen<BarCodeMsg>();
var inputStream = this.WhenAnyValue(x => x.InputCode).Select(c => new BarCodeMsg { BarCode = c });

scanStream.Merge(inputStream)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Subscribe(ProcessBarCode)
    .DisposeWith(_disposables);

5.2 CombineLatest 多流都有值才触发(多条件筛选)

商品分类 + 价格区间两个筛选条件任意一个变化就刷新列表

csharp

运行

var categoryStream = this.WhenAnyValue(x => x.SelectedCategoryId);
var priceStream = this.WhenAnyValue(x => x.SelectedPriceRange);

categoryStream.CombineLatest(priceStream)
    .Subscribe(tuple => QueryGoods(tuple.Item1, tuple.Item2))
    .DisposeWith(_disposables);

6. 异常处理 Catch 防止单个请求崩溃整个程序

接口异常自动捕获,弹窗提示不闪退

csharp

运行

Observable.FromAsync(() => _saleApi.CreateOrder(orderDto))
    .Catch<long, Exception>(ex =>
    {
        MessageBox.Show($"收银失败:{ex.Message}");
        return Observable.Empty<long>(); // 异常返回空流,终止本次订阅
    })
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(orderId => ShowOrderSuccess(orderId))
    .DisposeWith(_disposables);

7. 超时 Timeout:防止接口卡死 POS

请求 10 秒没返回直接判定超时失败

csharp

运行

Observable.FromAsync(() => _api.GetGoodsByBarCode(code))
    .Timeout(TimeSpan.FromSeconds(10))
    .Catch<GoodsDto, TimeoutException>(_ =>
    {
        MessageBox.Show("接口请求超时,请检查网络");
        return Observable.Empty<GoodsDto>();
    })
    .Subscribe(goods => AddToCart(goods))
    .DisposeWith(_disposables);

8. 重复重试 Retry:网络抖动自动重试

网络偶尔闪断,最多重试 2 次再报错

csharp

运行

Observable.FromAsync(() => _api.UploadOfflineOrder())
    .Retry(2)
    .Catch<HttpResponseMessage, Exception>(ex =>
    {
        MessageBox.Show("同步失败,请稍后手动同步");
        return Observable.Empty<HttpResponseMessage>();
    })
    .Subscribe()
    .DisposeWith(_disposables);

9. Take 只取前 N 次、TakeUntil 条件终止

Take (1):只执行一次(弹窗确认、一次性弹窗)

csharp

运行

OkButton.Events().Click
    .Take(1)
    .Subscribe(_ => CloseWindow())
    .DisposeWith(_disposables);

TakeUntil:当另一个事件触发就终止当前流

关闭窗口时,停止所有库存自动轮询

csharp

运行

Observable.Interval(TimeSpan.FromSeconds(10))
    .TakeUntil(this.WhenAnyValue(x => x.IsClosed))
    .Subscribe(_ => RefreshStock())
    .DisposeWith(_disposables);

10. 定时轮询 Interval(库存心跳、设备在线检测)

每 10 秒请求一次接口,刷新设备在线状态

csharp

运行

Observable.Interval(TimeSpan.FromSeconds(10))
    .Select(_ => Observable.FromAsync(() => _deviceApi.CheckPosStatus()))
    .Switch()
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(UpdateDeviceStatus)
    .DisposeWith(_disposables);

Switch 切换最新请求(轮询必备)

上一次接口没返回,下一次轮询来了直接舍弃上一次请求,防止数据错乱。

二、收银系统高频组合经典场景模板

场景 1:POS 扫码终极安全处理(防抖 + 去重 + 过滤 + 线程 + 异常)

csharp

运行

MessageBus.Current.Listen<BarCodeMsg>()
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged(x => x.BarCode)
    .Where(x => !string.IsNullOrEmpty(x.BarCode))
    .Select(x => Observable.FromAsync(() => _goodsApi.GetByBarCode(x.BarCode)))
    .Switch()
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .ObserveOn(RxApp.MainThreadScheduler)
    .Catch<GoodsDto, Exception>(ex =>
    {
        MessageBox.Show($"查询商品异常:{ex.Message}");
        return Observable.Empty<GoodsDto>();
    })
    .Subscribe(AddShoppingCart)
    .DisposeWith(_disposables);

场景 2:按钮防重复提交(下单、结账)

csharp

运行

CheckoutCommand = ReactiveCommand.CreateFromTask(async () =>
{
    return await _saleApi.CreateOrder(CurrentOrder);
});

CheckoutCommand.ThrownExceptions
    .Subscribe(ex => MessageBox.Show($"结账失败:{ex.Message}"))
    .DisposeWith(_disposables);

三、避坑要点

  1. 所有订阅必须绑定 Disposables,页面销毁自动释放,解决 Prism 事件聚合器最大痛点:内存泄漏;
  2. 耗时网络、硬件 IO 必须 SubscribeOn 放到线程池,避免 UI 卡顿;
  3. 更新 UI 一定要 ObserveOn(RxApp.MainThreadScheduler)
  4. 高频事件优先用 Throttle + DistinctUntilChanged
  5. 网络请求必须加 Catch + Timeout,防止 POS 卡死崩溃。

copyright @重庆教主 WPF中文网 联系站长:(QQ)23611316 (微信)movieclip (QQ群).NET小白课堂:864486030 | 本文由WPF中文网原创发布,谢绝转载 渝ICP备2023009518号-1