需求是这样,有一个业务,它有很多的单据,且用户将单据均进行了维护,然后需要将这些单据一并提交回业务系统,但业务系统不能同时提交多个请求,它需要一定的间隔。
提交接口定义
@POST("sap/save")
Single<Response<String>> createTo(@Body Request<List<Order>> request, @Query("client") String client);
业务数据
List<Voucher> vouchers = _vouchers.getValue();
// 分组数据,然后进行提交
Map<String, List<MaterialVoucherQueryResponse>> groups = vouchers.parallelStream().collect(Collectors.groupingBy(MaterialVoucherQueryResponse::getTransferApply));
将业务数据转换为提交请求并串行提交
Flowable.fromIterable(groups.entrySet())
.concatMapSingle(entry -> FineClient.getInstance().createTo(buildRequest(entry.getValue()), "WMS")
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.onErrorResumeNext(Single::error))
.toList() // 收集所有的Response对象或异常到一个列表中
.subscribeOn(Schedulers.io()) // 在IO线程上执行操作
.observeOn(Schedulers.single()) // 在单一线程上处理最终的列表
.subscribe(new SingleObserver<>() {
@Override
public void onSubscribe(@NonNull Disposable disposable) {
addSubscription(disposable);
}
@Override
public void onSuccess(@NonNull List<Response<String>> responses) {
StringBuilder message = new StringBuilder();
if (CollectionUtils.isNotEmpty(responses)) {
_submitResponse.postValue(responses);
for (Response<String> result : responses) {
message.append(result.getMessage())
.append("\n");
}
postMessage(new ResultState<>(StateEnum.SAP, message.toString()));
}
setLoading(false);
}
@Override
public void onError(@NonNull Throwable e) {
setLoading(false);
postMessage(new ResultState<>(StateEnum.SAP, e.getMessage()));
}
});
文章评论