Loading...
墨滴

掀乱书页的风

2021/10/16  阅读:28  主题:兰青

Rxjava2上车指南

背景

Xnip20200509_114702.png 目前Rxjava已经发布了3.0.3版本了,但是我们项目中依然使用的是1.3.8的旧版本,2018年3月31日已经不再更新了,那么我们是升级到2.x还是3.x呢?

1.x 2.x 3.x
最后支持时间 2018.3.31 2021.2.28 最长
对比1.x改动范围 过时 很大 很大
对比1.x性能 提升 提升
RxBinding是否支持 支持
  1. RxJava2.x 版本对比1.x版本是一次重大升级,最大的修改是对背压的支持,可以使用 Flowable 方便的配置背压策略
  2. RxJava3.x 版本对比2.x版本没有做出很大的更改,相比1.x版本改动还是很大的
  3. 性能:https://github.com/akarnokd/akarnokd-misc/issues/2

由于RxBinding仅支持2.x版本,并且作者明确表示不想同时支持3.x版本,所以我们计划将58app中的Rxjava升级到2.x版本

背压(Backpressure)

先来复习一下Rxjava的观察者模式 观察者.png

RxJava是一个观察者模式的架构,当这个架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这就会出现两种情况:

  • 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件,(好比观察者在等米下锅,程序等待,这没有问题)。

  • 被观察者产生事件的速度很快,而观察者处理很慢。如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。(好比被观察者生产的大米没人吃,堆积最后就会烂掉)。

这个时候我们就要用到背压了,什么是背压呢?

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略

简单说,背压是流速控制的一种策略 需要强调两点:

  1. 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  2. 背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。

背压图示 Xnip20200509_161455.png

常用观察者模式

在RxJava1.X中,同样是Observable,有的不支持背压策略,导致某些情况下,显得特别麻烦,出了问题也很难排查,2.x中 Observable 拆分成了新的 Observable 和 Flowable

  • Observable(被观察者)/Observer(观察者)
  • Flowable(被观察者)/Subscriber(观察者)
1cfc1779f2a93ad2f7c8.png
1cfc1779f2a93ad2f7c8.png

Action相关

命名方式改变 Rx1.0-----------Rx2.0

Action0 更改为 Action Aciton1 更改为 Consumer Action2 更改为 BiConsumer ActionN 更改为 Consumer Action3 - Action9 移除

Function相关

命名方式改变 Rx1.0-----------Rx2.0

Func 更改为 Function Func2 更改为 BiFunction FuncN 更改为 Function Func3 - Func9 更改为 Function3 - Function9 和RxJava1.0相比,他们都增加了throws Exception,在这些方法做某些操作就不需要try-catch

2.x开发带来的影响

这里只介绍常用的一些模块

  1. 两对观察者订阅关系是一一对应的,不要混了 Flowable/Subscriber
public interface Subscriber {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

onSubscribe(Subscription s)传入的参数s就肩负着取消订阅的功能,当然,他也可以用于请求上游的数据。

Observable/Observer

public interface Observer {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

在Observer接口中,onSubscribe(Disposable d)方法传入的Disposable也是用于取消订阅,基本功能是差不多的,只不过命名不一致,大家知道就好

subscribe()方法的返回类型为void了,在1.X中,这个方法一般会返回一个Subscription对象,用于取消订阅。现在,这个功能的对象已经被放到观察者Observer或者subscriber的内部实现方法中了

补充一点:subscribe有多个重载方法,基础的是observer,subscriber对象,然后依然可以订阅Action、Consumer等对象的

  1. 收紧了create()权限 首先先说一点,观察者不再接收null作为数据源了 在RxJava 1.x 最明显的问题就是由于 create 的太过开放,导致其被开发者滥用,而不是学习使用提供的操作符。并且用户对 RxJava 不够了解,导致各种各样的问题,如背压、异常处理等,因此新的Observable和Flowable create() 采用了保守的设计,让用户实现 FlowableOnSubscribe/ObservableOnSubscribe 接口,并且只有一个create方法,而且create方法的参数都是接口
        Flowable.create(new FlowableOnSubscribe() {
            @Override
            public void subscribe(FlowableEmitter e) throws Exception {
                e.onNext(https://wos.58cdn.com.cn/IjGfEdCbIlr/ishare/8eadeebe-255e-4b28-97d3-4efe2884ef8fXnip2020-05-12_16-33-16.png);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //需要指定背压策略
        , BackpressureStrategy.BUFFER);
  1. Disposed取消订阅 1.x中Subscription来统一取消订阅,在RxJava2.0中,由于subscribe()方法现在返回void了,那么我们如何来取消订阅呢? 2.x中Subscription不再有订阅subcribe和unSubcribe的概念 旧的Subscription在这里因为名字被占,而被重新命名成了Disposable。 RxJava2.0中,Flowable提供了subscribeWith这个方法可以返回当前订阅的观察者,并且通过ResourceSubscriber DisposableSubscriber等观察者来提供 Disposable的接口。
  • DefaultSubscriber:通过实现Subscriber接口,可以通过调用request(long n)方法请求或者cancel()方法取消订阅(同步请求)
public abstract class DefaultSubscriber implements Subscriber
  • DisposableSubscriber:通过实现Desposable异步删除。
public abstract class DisposableSubscriber implements Subscriber, Disposable
  • ResourceSubscriber:允许异步取消其订阅相关资源,节省内存而且是线程安全
public abstract class ResourceSubscriber implements Subscriber, Disposable
  • SafeSubscriber:包装另一个订阅者,并确保所有onXXX方法遵守协议(序列化要求访问除外)
public final class SafeSubscriber implements Subscriber, Subscription
  • SerializedSubscriber:序列化访问另一个订阅者的onNext,onError和onComplete方法。
public final class SerializedSubscriber implements Subscriber, Subscription

同理,Observable也提供了类似的subscribeWith这个方法可以返回当前订阅的观察者,并且通过ResourceObserver DisposableObserver等观察者来提供 Disposable的接口。并且可以从外面取消 dispose(),默认Subscriber是无法从外面取消订阅的 举个栗子

ResourceSubscriber subscriber = new ResourceSubscriber() {  
    @Override
    public void onStart() {
        request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
};

Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);

subscriber.dispose();

如果是调用的是参数为Consumer和Action的subscribe方法,也是会返回Disposable对象的

public final Disposable subscribe(Consumer onNext)
public final Disposable subscribe(Consumer onNext, Consumer onError)
public final Disposable subscribe(Consumer onNext, Consumer onError, Action onComplete)
......

1.x和2.x共存

如何升级的呢,并没有找到一种可以删掉1.x的依赖,同时又能保证代码不报错的方法,最终我们的代码里是两套共存的状态

Rxjava事件发射的原理

在阅读源码之前,我们需要对RxJava的大体概念进行简单的梳理

  • 发射器:Emitter,发射数据的对象
  • 被观察者:Observable,被观察的对象
  • 观察者:Observer,观察的对象
  • 被观察者被订阅时:ObservableOnSubscribe,被订阅时的回调,同时创建出发射器
  • 释放者:Disposable,释放RxJava的对象

RxJava的分析三步骤

  • 创建:被观察者创建的过程
  • 订阅:被观察者订阅观察者的过程
  • 发射:发射器发射的过程
  1. 手写一个最简单的rxjava事件流程
Observable.create(object : ObservableOnSubscribe{
            override fun subscribe(emitter: ObservableEmitter) {
                emitter.onNext("Hello RxJava");
                emitter.onError(Exception());
                emitter.onNext("Hello RxJava");
            }
        }).subscribe(object : Observer{
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                println("onNext=$t");
            }

            override fun onError(e: Throwable) {
                println("onError");

            }

        })
    }

输出结果:在输出onError后,就不会继续收到新的事件流,表示事件已经被释放了

//发射器接口
public interface Emitter {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

//观察者接口
public interface Observer {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
    
//被观察者被订阅时
public interface ObservableOnSubscribe {
    void subscribe(@NonNull ObservableEmitter emitter) throws Exception;
}
  1. Observable.create 创建的过程只是将传递进来的参数交给新的ObservableCreate进行管理
public static  Observable create(ObservableOnSubscribe source) {
    ObjectHelper.requireNonNull(source"source is null");//判空
    return RxJavaPlugins.onAssembly(new ObservableCreate(source));//返回自身
}

3.ObservableCreate继承自Observable

public final class ObservableCreate extends Observable {
    final ObservableOnSubscribe source;

    public ObservableCreate(ObservableOnSubscribe source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer observer) {
        //1、创建发射器
        CreateEmitter parent = new CreateEmitter(observer);
        //2、回调observer的onSubscribe
        observer.onSubscribe(parent);

        try {
            //3、回调ObservableOnSubscribe的subscribe
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter
    extends AtomicReference
    implements ObservableEmitter, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer observer;

        CreateEmitter(Observer observer) {
            this.observer = observer;
        }

        .........
    }
}

  1. Observable.subscribe 由于Observable.create返回当前ObservableCreate,所以在subscribe的时候,走的是ObservableCreate的subscribeActual方法
public final void subscribe(Observer observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");//判空
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);//返回自身

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);//回调ObservableCreate的subscribeActual
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}

RxBinding的使用

RxBinding是什么?它是一组开源库,可将Android中各类UI控件的动作事件转换为RxJava中的数据流。也就是说使用RxBinding,我们就可以以RxJava的形式来处理UI事件。RxBinding中主要包含RxView、RxTextView、RxAdapterView、RxCompoundButton等等,这里只介绍常用的一些场景

点击事件防抖动

点击某个按钮,正常应该启动一个页面,但是手机比较卡,没有立即启动,用户就点了好几下,结果等手机回过神来的时候,就会启动好几个一样的页面。

RxView.clicks(btn)
    .debounce(500, TimeUnit.MILLISECONDS)
    .observerOn(AndroidSchedulers.mainThread())
    .subscribe(o -> {
        // handle clicks
    })

防抖动还有一个应用场景就是点赞和取消点赞

debounce 操作符产生一个新的 Observable, 这个 Observable 只发射原 Observable 中时间间隔小于指定阈值的最大子序列的最后一个元素。

获取验证码倒计时

btn_code.clicks()
            .throttleFirst(5, TimeUnit.SECONDS)  //5s内只响应第一次点击
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                btn_code.isEnabled = false
            }
            .subscribe {
                Observable.interval(1, TimeUnit.SECONDS) //每隔1s发一个事件流
                    .observeOn(AndroidSchedulers.mainThread())
                    .take(5) //取5次
                    .subscribe(object : Observer{
                        override fun onComplete() {
                            btn_code.text = "获取验证码"
                            btn_code.isEnabled = true
                        }

                        override fun onSubscribe(d: Disposable) {
                        }

                        override fun onNext(t: Long) {
                            btn_code.text = "剩余" + (5 - t) + "秒"
                        }

                        override fun onError(e: Throwable) {
                        }


                    })
            }

这个例子是用kotlin写的,RxBinding对kotlin的支持是扩展方法的形式,而没有用RxView调用

扩展方法: Kotlin 能够扩展一个类的新功能而无需继承该类或者使用像装饰者这样的设计模式。通过叫做 扩展 的特殊声明完成。 你可以为一个你不能修改的、来自第三方库中的类编写一个新的方法。 这个新增的方法就像那个原始类本来就有的方法一样,可以用普通的方法调用。 这种机制称为 扩展方法 。

kotlin转为java使用不同的类名: 可以使用 @JvmName 注解修改生成的Java类的类名,如果想让多个文件使用相同的java类名,可以在所有相关文件中使用 @JvmMultifileClass 注解

合并监听

     Observable userNameObservable = RxTextView.textChanges(userNameEt);
    Observable userPwdObservable = RxTextView.textChanges(userPasswordEt);
    // 相当于合并
    Observable.combineLatest(userNameObservable, userPwdObservable,
            new BiFunction() {
                @Override
                public Object apply(CharSequence userName, CharSequence userPwd) throws Exception {
                    Toast.makeText(MainActivity.this, "33333", Toast.LENGTH_LONG).show();
                    // 设置按钮是否可用(或者改变背景颜色)
                    clearContent.setEnabled(!TextUtils.isEmpty(userName) && !TextUtils.isEmpty(userPwd));
                    return null;
                }
            });

combineLatest 操作符用来将多个 Observable 发射的数据组装起来然后在发射

搜索提示

我们平时使用的搜索框中,常常是当用户输入一部分内容后,下方就会显示对应的搜索提示,当在搜索框输入“a”关键词后,下方自动刷新和关键词相关的结果

image.png
image.png

那么在做这个功能的时候,我们要考虑以下几个问题

  • 防止用户输入过快,触发过多网络请求,需要对输入事件做一下防抖动。
  • 用户在输入关键词过程中可能触发多次请求,那么,如果后一次请求的结果先返回,前一次请求的结果后返回,这种情况应该保证界面展示的是后一次请求的结果。
  • 用户在输入关键词过程中可能触发多次请求,那么,如果后一次请求的结果返回时,前一次请求的结果尚未返回的情况下,就应该取消前一次请求。
RxTextView.textChanges(InputEditText)
    .debounce(300, TimeUnit.MILLISECONDS)
    .switchMap(text ->SearchApi.getSearchSuggest(mCityId, mCateId, text.toString()))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(results -> {
        // 处理返回结果
    });

switchMap 这个操作符与 flatMap 操作符类似,但是区别是如果原 Observable 中的两个元素,通过 switchMap 操作符都转为 Observable 之后,如果后一个元素对应的 Observable 发射元素时,前一个元素对应的 Observable 尚未发射完所有元素,那么前一个元素对应的 Observable 会被自动取消订阅,尚未发射完的元素也不会体现在 switchMap 操作符调用后产生的新的 Observable 发射的元素中

源码分析

先看下textChanges, 是TextView的扩展方法,没什么好解释的,然后会返回TextViewTextObservable这个Observable对象

fun TextView.textChanges(): InitialValueObservable {
  return TextViewTextChangesObservable(this)
}

接着跟到TextViewTextChangesObservable里面看看

private class TextViewTextChangesObservable(
  private val view: TextView
) : InitialValueObservable() {

  override fun subscribeListener(observer: Observer) {
    val listener = Listener(view, observer)
    observer.onSubscribe(listener)
    view.addTextChangedListener(listener)
  }
 }

先看下subscribeListener这个方法在哪里调用, 在父类InitialValueObservable中的subscribeActual方法中调用

override fun subscribeActual(observer: Observer) {
    subscribeListener(observer)
    observer.onNext(initialValue)
  }

subscribeActual这个方法上面说过,是在Observable被subscribe的时候进行调用的。再看下这个方法里面做了什么 第一行代码new一个Listener

private class Listener(
    private val view: TextView,
    private val observer: Observer
  ) : MainThreadDisposable(), TextWatcher

继承MainThreadDisposable ,这个是在dispose的时候会回调onDispose()方法,这里可以解除监听;Listener还实现了TextWatcher接口,主要看下这个方法

override fun onTextChanged(s: CharSequence, start: Int, before: Int, count: Int) {
    if (!isDisposed) {
       observer.onNext(s)
    }
}

其实就是对系统接口方法的封装,在文本发送变化的时候调用observer.onNext(s);,这个observer就是我们在Observable.subscribe(observer)使用的时候传入的,这样就保证了接收到文本的数据

第二行代码observer.onSubscribe(listener);这个其实就是提供一个Disposable,供解除用,在Listener中实现了这个方法,在解除监听的时候调用

override fun onDispose() {
      view.removeTextChangedListener(this)
    }

第三行代码view.addTextChangedListener(listener);其中view在我们这个例子中就是EditText,给这个EditText注册系统的监听事件,前面已经说了Listener还实现了TextWatcher接口

其实就是RxTextView封装了一个Observable,这样就可以使用RxJava的各种操作符了,然后注册系统原生的响应事件,在事件发生时通过observer.onNext(s);发送数据给observer,这个observer就是我们自己实现也是最关心的,回调的函数。

源码还是要多看看的,就算看不懂,也能感觉到自己很菜

掀乱书页的风

2021/10/16  阅读:28  主题:兰青

作者介绍

掀乱书页的风