将Observable转换为另一个对象或数据结构ReactiveX的很多语言特定实现都有一种操作符让你可以将Observable或者Observable发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到Observable终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的Observable。在某些ReactiveX实现中,还有一个操作符用于将Observable转换成阻塞式的。
创建一个只在Observable生命周期内存在的一次性资源Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。
给Observable发射的数据项附加一个时间戳RxJava中的实现为timestamp,它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped<T>的数据的Observable,每一项都包含数据的原始发射时间。timestamp默认在immediate调度器上执行,但是可以通过参数指定其它的调度器。Javadoc: [timestamp()](http://reactivex.io/RxJava/javadoc/rx/Observable.
对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。RxJava中的实现为timeout,但是有好几个变体。
将一个发射数据的Observable转换为发射那些数据发射时间间隔的ObservableTimeInterval操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。RxJava中的实现为timeInterval,这个操作符将原始Observable转换为另一个Obserervable,后者发射一个标志替换前者的数据项,这个标志表示前者的两个连续发射物之间流逝的时间长度。
指定Observable自身在哪个调度器上执行很多ReactiveX实现都使用调度器 "来管理多线程环境中Observable的转场。你可以使用SubscribeOn操作符指定Observable在一个特定的调度器上运转。ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。在某些实现中还有一个UnsubscribeOn操作符。
操作来自Observable的发射物和通知Subscribe操作符是连接观察者和Observable的胶水。一个观察者要想看到Observable发射的数据项,或者想要从Observable获取错误和完成通知,它首先必须使用这个操作符订阅那个Observable。
强制一个Observable连续调用并保证行为正确一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
指定一个观察者在哪个调度器上观察这个Observable很多ReactiveX实现都使用调度器 "来管理多线程环境中Observable的转场。你可以使用ObserveOn操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted, onError方法)。
Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。一个合法的有限的Obversable将调用它的观察者的onNext方法零次或多次,然后调用观察者的onCompleted或onError正好一次。Materialize操作符将这一系列调用,包括原来的onNext通知和终止通知onCompleted或onError都转换为一个Observable发射的数据序列。
注册一个动作作为原始Observable生命周期事件的一种占位符你可以注册回调,当Observable的某个事件发生时,Rx会在与Observable链关联的正常通知集合中调用它。Rx实现了多种操作符用于达到这个目的。RxJava实现了很多Do操作符的变体。doOnEachdoOnEach操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。
延迟一段指定的时间再发射来自Observable的发射物Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。RxJava的实现是 delay和delaySubscription。第一种delay接受一个定义时长的参数(包括数量和单位)。
这个页面列出了很多用于Observable的辅助操作符 — 将Observable转换成一个通知列表convert an Observable into a list of Notifications — 将上面的结果逆转回一个Observable — 给Observable发射的每个数据项添加一个时间戳 — 强制Observable按次序发射数据并且要求功能是完好的 — 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者 — 指定观察…
如果原始Observable遇到错误,重新订阅它期望它能正常终止Retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。Retry总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复,如上图所示。RxJava中的实现为retry和retryWhen。
从onError通知中恢复发射数据Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。在某些ReactiveX的实现中,有一个叫onErrorResumeNext的操作符,它的行为与Catch相似。RxJava将Catch实现为三个不同的操作符:onErrorReturn让Observable遇到错误时发射一个特殊的项并且正常终止。
很多操作符可用于对Observable发射的onError通知做出响应或者从错误中恢复,例如,你可以:吞掉这个错误,切换到一个备用的Observable继续发射数据吞掉这个错误然后发射默认值吞掉这个错误并立即尝试重启这个Observable吞掉这个错误,在一些回退间隔后重启这个Observable这是操作符列表: — 指示Observable在遇到错误时发射一个数据序列 — 指示Observable在遇到…
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。RxJava将这个操作符实现为zip和zipWith。
将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项Switch订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。
在数据序列的开头插入一条指定的项如果你想要一个Observable在发射数据之前先发射一个指定的数据序列,可以使用StartWith操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat操作符。)可接受一个Iterable或者多个Observable作为函数的参数。
合并多个Observables的发射物使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。
关注时代Java