RxJava 1.x
RxJava扩展了观察者模式的语义,添加了两个新的操作接口:
- onCompleted() 通知观察者,Observable没有更多的数据了
- onError() 观察到有错误出现了
而onNext() 将被观察者生产的事件通知到观察者
RxJava 1.x的四种基本角色
- Observable
- Observer
- Subscriber
- Subject
其中,Observable和Subject是两个”生产”(被观察者)的实体,Observer和Subscriber是两个”消费”(观察者)的实体。其中Observable、Observer是两个基础角色。
Subscriber实现了Observer,并且还添加了一个onStart(),它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作。
Subject = Observable + Observer
Obsrvable的变种 —— Single
Observable还有一个变种,就是Single,它总是只发射一个事件,或者一个错误通知
订阅Single只需要两个方法:
- onSuccess - Single发射单个的值到这个方法
- onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法
了解就行了,用法与Observable差不多
热的、冷的 Observables
热的:Observable只要一创建,就会立即开始emit(发射)数据。后续的订阅它的观察者,可能从序列中间的某个位置开始接受数据(有一些数据错过了)
冷的:Observable创建后,一直在等待,直到有观察者订阅它才开始emit数据
Subject = Observable + Observer
Subject,既是一个Observable,也是一个Observer。 可以emit数据,可以subscribe 一个Observable或其它Subject。
RxJava 提供四种不同的 Subject:
- PublishSubject
- BehaviorSubject
- ReplaySubject
- AsyncSubject
- UnicastSubject
- SerializedSubject
###PublishSubject
是Subject的一个子类,它通过create()创建实例。它是一个”冷的”Observable。直到触发它的onNext(T t),才开始emit数据,并完成订阅。
可用于实现类似EventBus的RxBus。
###BehaviorSubject
是Subject的一个子类,它通过create()创建实例。首先会向它的订阅者发送截止订阅前的最后一条数据流,然后才正常发送订阅后的数据流。
|
|
ReplaySubject
缓存订阅的数据,重发给订阅它的观察者
|
|
AsyncSubject
仅在Observable完成之后,发送最后一条数据给观察者。
然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。
|
|
UnicastSubject
只允许有一个 Subscriber 订阅(内部subscribeActual(),用到了AtomicBoolean 判断,第二次无就抛出异常)
|
|
SerializedSubject
当我们使用普通的Subject,必须要注意不要在多线程情况下调用onNext方法,
而使用SerializedSubject封装原来的 Subject即可!!
内部使用了SerializedObserver。查看其doc,如果是多线程环境,即有多个线程发射通知时,它们将被按序列执行:
1.允许仅有一个线程,执行一个emit
2.如果另一线程,已经emit,将下一个添加到通知队列
3.在循环emitting时,不持有任何锁或阻塞任何线程
|
|
RxJava操作符
(most from http://frodoking.github.io/2015/09/08/reactivex/)
大多数操作符:操作一个Observable并且返回一个Observable。
这样允许开发人员以一个链式的方式一个接一个的执行操作符。
在可修改的链式中,每一个操作结果Observable都是来之于上一个操作。
这里有一些类似于构造器Builder模式,该模式描述了一个含有一系方法的特定类通过操作方法来操作具有相同功能的类的每一项。
这个模式也允许你以类似的方式去链式操作方法。在Builder模式中,操作方法出现的顺序在链式中可能不是那么重要, 但是在Observable中的操作符顺序就很重要。
Observable操作符链不会依赖于原来的Observable去操作原始的链,每一个在Observable上的正在操作的operator都是上一个操作立即产生的。
创建Observable – 创建新的Observable的操作符
create——从头创建一个Observable,当观察者订阅Observable时,它作为一个参数传入,并执行call()
defer——不立即创建Observable,直到observer触发订阅动作。此方法为每一个observer创建一个新的Observable
empty/never/error——为非常精确和有限的行为而创建Observable:空的,不emit数据/不emit数据,且永远不会结束/不emit数据,以onError()结束
from——迭代一个序列(集合或数组),一个一个的发射数据
interval——创建一个具有发出一个整数序列间隔为一个特定的时间间隔的Observable
just——按顺序emit后面跟的”1到9个”数据
range——创建一个Observable,发送一系列连续的整数
repeat——创建一个Observable,发送一个特定的项目或项目重复序列
timer——创建一个Observable,在一个给定的一段时间延迟后发送一个对象或者项目
转换Observables – 转换成另一个Observable的操作符
- buffer——定期收集从Observable中发出的数据到集合中,每次发射一组,而不是发送一个
- concatMap——与flatMap非常相似。但它会将展开的元素,一个个有序的连接起来
- flatMap——将一个Observable发送的数据或者项目转换到Observables中,适用于将 T 变换为 Observable,Observable表示一个序列集。发送的次序可能是交错的
- flatMapIterable——与flatMap类似,只是它会将数据转换成一个Iterable
- groupBy——拆分一个Observable成多个Observable组,并且每个组发送的数据会组成一个不同的发送数据组当然这些发送数据时来自于原始的Observable。这些分组都是通过划分key来实现
- map——转换一个Observable发送的每个数据或者项目映射到一个函数上
- scan——应用一个函数给一个Observable发送出来的每一条数据, 并且是按照顺序发送每个连续值(t1,t2, return R)
- switchMap——与flatMap类似,除了一点: 当源Observable发射一个新的数据项时,
如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项
- window——类似buffer,但发射的是一个Observable,而不是列表
- lift——针对事件项和事件序列的操作符。对于事件项的类型转换,主要在一个新的Subscriber中完成。
- compose——需要一个Observable.Transformer型的入参。该类型的call(),操作的是一个Observable,并返回另一个Observable。所以compose用于在内部组合一组变换的场景。
过滤Observables – 过滤被Observable发送的数据的操作符
- debounce——如果Observable在一个特定时间间隔过去后没有发送其他数据或者项目,那么它只发送最后那个
- distinct——该Observable不可以发送重复的数据
- distinctUntilChanged——发送”跟上一个数据不重复”的新值
- elementAt——只发送可观测序列中的某一项。index从0开始
- filter——一个Observable只发送通过来特定测试描述语的匹配项
- first——只发出第一项,或第一项符合条件的项
- ignoreElements——不发送任何数据,但是必须反馈它的中断通知: Observable的onCompleted和onError事件
- last——只发送最后一项
- sample——发出Observables周期时间间隔内最新的项
- throttleFirst——发出Observables周期时间间隔内的第一项
- throttleLast——发出Observables周期时间间隔内的最后一项
- skip——跳过发送前几项
- skipLast——跳过发送后几项
- take——仅仅发送前几项
- takeLast——仅仅发送后几项
合并Observables – 将多个Observables合并成单个的Observable的操作符
- combineLatest——当某一项数据由两个Observables发送时,通过一个特殊的函数来合并每一个Observable发送的项,并且最终发送数据是该函数的结果
- join——合并两个Observables发送的结果数据。其中两个Observable的结果遵循如下规则:
每当一个Observable在定义的数据窗口中发送一个数据都是依据另外一个Observable发送的数据。
- merge——通过合并多个Observables发送的结果数据将多个Observables合并成一个
- mergeDelayError——即使发生了error也不打断merge操作,当所有merge结束后,才发射onError()
- startWith——在Observable源开始发送数据项目之前,先发送一个指定的项目序列
- zip——它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
- zipWith——与zip类似,只是一个成员方法,必须由一个Observable来发起。
错误处理操作符
(most from http://www.jianshu.com/p/5bb3e55a14c4)
retry——如果一个源Observable发送一个onError通知,需要重新发射,以期望不发生错误。
有三个变体方法:retry() 若有错,一直重新发射,直到无错误; retry(count) 若有错,只重新发射count次,直到无错误;retry(new Func2(){…}) ,三个参数的意思是,正常接收到的值, 发生的异常, 在发生错误时是否要重新发射,重写函数返回值 false 不重新发射。
- retryWhen——retryWhen类似retry。如果发射一个error,会传递给其观察者,并交由retryWhen中的Func1来操作,Func1又由Func2组成。Func2的call函数的返回值决定订阅过程是否重复发生:如果发射的error,订阅会终止,如果发射的是数据项,则会重新订阅
- onErrorReturn——若源Observable发生了错误或异常,替代源Observable调用Observer的onError方法。onErrorReturn中那个Func1实现被调用并接受这个错误或异常作为参数,这个Func1实现的返回值将作为onErrorReturn返回的值
- onErrorResumeNext——源Observable遇到错误,这个onErrorResumeNext会把源Observable用一个新的Observable替掉,然后这个新的Observable如果没遇到什么问题就会释放item给Observer。你可以直接将一个Observable实例传入onErrorResumeNext作为这个新的Observable,也可以传给onErrorResumeNext一个Func1实现,这个Func1实现接受源Observable的错误作为参数,返回新的Observable
- onExceptionResumeNext——与onErrorResumeNext类似。只是onExceptionResumeNext是在发生了Exception时,才触发。如果发生的不是一个Exception,仍会触发Observer的onError方法
实用工具操作符
delay——按照一个特定量及时的将Observable发送的结果数据向前推移
do——注册一个事件去监听Observable生命周期
doOnSubscribe在Subscriber#onStart()时回调
doOnNext在Observer#onNext()时回调
doOnError在Observer#onError()时回调
doOnCompleted在Observer#onCompleted()时回调
doOnEach在Observer#onNext()、onError()、onCompleted()时都会回调
materialize/Dematerialize——代表发送出来的项目数据或者通知,或相反过程
observeOn——指定一个observer将会观察这个Observable的调度
serialize——强制Observable按次序发射数据并且要求功能是完好的
subscribe——操作可观测的排放和通知
subscribeOn——指定一个Observable在被订阅的时候应该使用的调度
timeInterval——转换一个Observable的发送项目到另一个项目,在这些发送项之间,此项目具有指示这些发送的时间开销功能
timeout——镜像源Observable,但如果某段时间过后没有任何通知发出将会发出一个错误通知
timestamp——给一个Observable发送的每一个项目附加一个时间戳
using——创建一个一次性的资源,这个资源就像Observable一样有相同的寿命
条件和布尔运算操作符 – 评估一个或者多个Observables或者被Observables发送的项目的操作符
- all——确定发出的所有项目满足某些标准
- amb——给定一组Iterable来源,只发射第一个Observable的数据
- contains——判断Observable是否包含一个特定的项
- defaultIfEmpty——发送项从Observable源,或者如果Observable源没有任何发送内容,那么将会发送一个默认的项
- sequenceEqual——确定两个Observables发出相同的序列条目
- skipUntil——丢弃Observable发出的项,直到第二个Observable发出一项
- skipWhile——丢弃Observable发出的项,直到指定的条件变成了false
- takeUntil——在第二个Observable发送一项或者终止之后,丢弃Observable发出的项
- takeWhile——在指定的条件变成了false之后,丢弃Observable发出的项
类型转换操作符
- to——将一个Observable转换到另一个对象或数据结构。 toXxx
- cast——传入其它类型Class,进行自动转换
可连接到Observable的操作符 – 指定Observables有更多精确控制订阅动态的操作符
- connect——定义一个可连接的Observable发送项目数据给它的订阅者
- publish——把一个普通的Observable转化为一个可连接的Observable(向下转换)
- replay——返回一个Connectable Observable 对象并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据。不过使用Replay操作符我们最好还是限定其缓存的大小,否则缓存的数据太多了可会占用很大的一块内存。对缓存的控制可以从空间和时间两个方面来实现,比如它的其它变体实现:replay(int bufferSize)、replay(int bufferSize, long time, TimeUnit unit) …
数学操作符
(For details, please see RxJavaMath and Mathematical-and-Aggregate-Operators)
主要使用MathObservable来操作数据
average——计算一个Observable发送所有结果的平均值,并且发射这个值
对应的变体有averageDouble、 averageFloat、 averageInteger、 averageLong
max——确定,发射最大值项
min——确定,发射最小值项
sum——计算Observable发射的所有数据的求和,并且发射这个求和结果
对应的变体有sumDouble、sumFloat、sumInteger、sumLong
以上方法,都有静态与非静态方法。静态方法要求传入一个Observable;非静态方法可通过from(Observable observable)返回一个MathObservable,来进行操作。
聚集操作符
(most from http://blog.csdn.net/jdsjlzx/article/details/51489793)
- concat——顺序连接多个Observables,并且严格按照发射顺序,前一个没有发射完,是不能发射后面的
- count/countLong——计算Observable源发出的项目数据数量,并发出这个值
- reduce——应用一个函数接收Observable发射的数据和函数的计算结果作为下次计算的参数,输出最后的结果。 跟scan操作符很类似,只是scan会输出每次计算的结果,而reduce只会输出最后的结果。
- collect——将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable
- toList——收集原始Observable发射的所有数据到一个列表,然后返回这个列表
- toSortedList——收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
- toMap——将序列数据转换为一个Map,Map的key是根据一个函数计算的
- toMultiMap——将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的
- toBlocking——转换成一个BlockingObservable。当满足条件的数据发射出来的时候才会返回一个BlockingObservable对象
#Schedulers 线程调度
在没有给定调度器(Scheduler)的情况下,Subscription将默认(产生事件与订阅)运行于调用线程上。
线程调度器(Scheduler)是将RxJava从同步观察者模式转到异步观察者模式的一个重要工具。
RxJava提供了5种主要的调度器:
- Scheduler Schedulers.io()
- Scheduler Schedulers.computation()
- Scheduler Schedulers.immediate()
- Scheduler Schedulers.newThread()
- Scheduler Schedulers.trampoline()
还有可用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form()
Schedulers.io()
内部创建一个rx.internal.schedulers.CachedThreadScheduler。底层实现是一个java中的ScheduledThreadPoolExecutor (extends ThreadPoolExecutorimplements ScheduledExecutorService)
|
|
corePoolSize=1, DEFAULT_KEEPALIVE_MILLIS=10L, DelayedWorkQueue是一个二叉树结构实现的BlockingQueue
整体还是一个无界(即容量特别大)的队列实现
例如,存储Bitmap到本地时,可以直接在Schedulers的io线程中执行任务:
|
|
Schedulers.computation()
内部是由 rx.internal.schedulers.EventLoopsScheduler 实现的。
这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认
调度器: buffer()、debounce()、delay()、interval()、sample()、skip()
Schedulers.immediate()
内部创建一个rx.internal.schedulers.ImmediateScheduler。 这个调度器允许你立即在当前线程执行指定的工作。
它是 timeout()、timeInterval() 及 timestamp() 方法默认的调度器
Schedulers.newThread()
内部创建一个rx.internal.schedulers.NewThreadScheduler。一底层跟Schedulers.io()一样是由java的ScheduledThreadPoolExecutor实现。
它为指定任务启动一个新的线程
Schedulers.trampoline()
内部创建一个rx.internal.schedulers.TrampolineScheduler。运行在当前线程。当有新任务时,并不会立即执行,而是将它加入队列PriorityBlockingQueue中,直到运行任务执行完成后,才从队列中按序取出一个继续执行。
它是repeat()和retry()默认的调度器
用于测试的调度器Schedulers.test()
(some from http://blog.csdn.net/siguoyi/article/details/51849964)
创建一个rx.schedulers.TestScheduler。这是一个公开的可访问的类。也可以直接使用无参构造方法,new出一个实例。
主要提供如下三个方法,来对调度器的时钟表现进行手动微调,这对依赖精确时间安排的任务的测试很有用处。
- advanceTimeBy(time,unit) 将调度器时时钟,前进一个指定时间。这是相对操作
- advanceTimeTo(time,unit) 将调度器时钟拨动到一个指定的时间。 这个是绝对操作
- triggerActions( ) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间
假定当前时间为0, 先advanceTimeBy(2, TimeUnit.SECONDS)再advanceTimeTo(2, TimeUnit.SECONDS),那么现在时间还是2。若反过来调用,那么现在时间就是4b了
自定义Scheduler—-Schedulers.form()
使用Schedulers.form(java.util.concurrent.Executor executor) ,来自定义Scheduler
subscribeOn()和observeOn()
subscribeOn()和observeOn() 是用来指定事件生产与订阅在哪个线程执行的。
- 默认没有定义observeOn、subscribeOn,即运行于当前线程
- subscribeOn 指定 订阅事件发生(OnSubscribe)的线程。若仅出现它,不出现observeOn, 还会影响其它所有事件
- observeOn 指定 在其之后的所有事件发生的线程,即使后面出现了 subscribeOn
- 若两者同时出现,subscribeOn 影响 observeOn 出现前的所有事件 及 OnSubscribe 事件
|
|
背压(Backpressure)
(most from https://zhuanlan.zhihu.com/p/24473022?refer=dreawer)
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。简而言之,背压是流速控制的一种策略。
若被观察者发送事件的速度太快,而观察者处理太慢,而且还没有做相应背压措施,可能抛出MissingBackpressureException
压力异常示例
|
|
上面的interval操作符,在1毫秒产生一个事件,速率过快,订阅者”消费”事件来不及处理,就会出现异常。
自带背压处理的操作符
用自带背压处理的操作符来处理压力。
过滤策略
使用之前讲的”过滤操作符”,就可以有效缓解压力。
比如,使用throttleFirst来获取一段周期时间内的首个事件。
|
|
缓存策略
缓存就是虽然被观察者发送事件速度很快,观察者处理不过来,但是可以选择先缓存一部分,然后慢慢读。
主要用到的是buffer操作符
|
|
按需拉取策略
就是需要”消费”多少个事件,自己告诉被观察者,最终实现了上游被观察者发送事件的速度的控制
主要使用request(long n)。这是一个protected方法
|
|
上面的代码中,其实可以去掉request相关代码,因 range –> observeOn,这一段过程本身就是响应式拉取数据。
observeOn这个操作符内部有一个缓冲区RxRingBuffer,其在Android环境下长度是16,它会告诉range最多发送16个事件,充满缓冲区即可
让不支持背压的Observable“支持”背压
对于不支持背压的Observable除了使用上述两类生硬的操作符之外,还有更好的选择:onBackpressureBuffer、onBackpressureDrop。
onBackpressurebuffer:把Observable发送出来的事件做缓存,当request方法被调用的时候,
给下层流发送一个item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常)。
onBackpressureDrop:将Observable发送的事件抛弃掉,直到Subscriber再次调用request(n)方法的时候,
就发送给它这之后的n个事件。
使用了这两种操作符,可以让原本不支持背压的Observable“支持”背压了。
|
|