频道栏目
首页 > 程序开发 > 移动开发 > 其他 > 正文
一张图搞定-RxJava2的线程切换原理和内存泄露问题
2017-06-14 09:26:00      个评论    来源:kJ的专栏  
收藏   我要投稿

一张图搞定-RxJava2的线程切换原理和内存泄露问题分析

首先祭出我自己画的一张图

这张图显示的是RxJava2源码层面上的调用关系
下面通过一个例子来解释一下这张图

public class MainActivity extends Activity {
    private static final String TAG = "MainActivity";

    CompositeDisposable comDisposable = new CompositeDisposable();
    Bitmap bitmap;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
                emitter.onNext("hello");
            }
        })
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());
        Disposable disposable = observable.subscribe(new Consumer() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG,s);
            }
        });
        comDisposable.add(disposable);
        bitmap = BitmapFactory.decodeResource(getResources(),R.mipmap.aaa);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        //comDisposable.dispose();
    }
}

这个例子很简单,在io线程里发送一个字符串,然后在主线程中打印出来.这里面我们加了一个bitmap对象,这个对象里面保持了一个大图片,我们通过观察内存占用,
就可以分析此Activity是否内存泄露,没有被垃圾回收.
首先从Observable.create方法开始分析

public static  Observable create(ObservableOnSubscribe source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}

生成了一个ObservableCreate对象,此对象中传入了一个ObservableOnSubscribe对象,而这个ObservableOnSubscribe对象是我们实现的一个内部类

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
        emitter.onNext("hello");
    }
}

这个ObservableOnSubscribe的构造方法定义为

public ObservableCreate(ObservableOnSubscribe source) {
    this.source = source;
}

将ObservableOnSubscribe对象赋值给了source,在图中可以看到ObservableCreate里面的source的颜色和ObservableOnSubscribe的颜色相同都是黄色,这个
图中,我特意将相同的对象置为了相同的颜色,从ObservableCreate –> ObservableSubscribeOn –> ObservableObserveOn,从上到下,依次持有上层的对象,用的是装饰者模式,像极了Java的流对象.
ObservableSubscribeOn是执行subscribeOn方法生成的,代码如下

public final Observable subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}

ObservableObserveOn是执行observeOn方法生成的,代码如下

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}

也就是下面这段代码最终得到了一个ObservableObserveOn对象

Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
        emitter.onNext("hello");
    }
})
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread());

OK,这就是图的左半边的对象生成逻辑,也许你会问每一个对象里的source是干什么用的?,别着急接下来你就会明白了.
来看subscribe操作,代码如下

Disposable disposable = observable.subscribe(new Consumer() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        Log.i(TAG,s);
    }
});

这个subscribe方法中我们传了一个onNext(consumer对象)操作,进入subscribe源码里

public final Disposable subscribe(Consumer onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

其实这个subscribe里是可以传四个参数的分别是onNext,onError,onComplete,onSubscribe,我们这里只传了一个onNext,其他操作源码传入了默认操作.
接着向里面跟源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer onNext, Consumer onError,
        Action onComplete, Consumer onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

可以看到new了一个LambdaObserver对象,并传入了 onNext, onError, onComplete, onSubscribe.之后调用subscribe方法,
此subscribe方法是ObservableObserveOn对象的,最终会调用ObservableObserveOn的subscribeActual方法,代码如下:

@Override
protected void subscribeActual(Observer observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
    }
}

让我们结合图来分析一下,这个方法调用的是source.subscribe并传入一个ObserveOnObserver对象




从图中我们可以看到ObservableObserveOn对象里的source是绿色的,是一个ObservableSubscribeOn对象,调用其subscribe方法并传入一个ObserveOnObserver对象(图中浅绿色),这个ObserveOnOnserver对象中有一个actual对象,此对象为LambdaObserver(图中紫色),从上面的分析中我们知道LambdaObserver对象中保存着我们的定义的onNext操作.

注意,接下来比较重要,要出现线程切换了:
因为调用了ObservableSubscribeOn的subscribe方法,最终会调用subscribeActual方法,代码如下

@Override
public void subscribeActual(final Observer s) {
    final SubscribeOnObserver parent = new SubscribeOnObserver(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

我们在知道ObservableSubscribeOn对象时调用subscribeOn(Schedulers.io())方法是生成的对象,上面的scheduler对象就是Schedulers.io()生成的一个线程调度对象,此对象是维护这一个线程池,让操作在io线程池中执行(关于io线程池,大家可以自己百度:) ),也就是此方法 scheduler.scheduleDirect(new SubscribeTask(parent))会切换线程来执行SubscribeTask任务,此任务的定义为:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver parent;

    SubscribeTask(SubscribeOnObserver parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

其中parent就是一个SubscribeOnObserver对象,soure就是一个ObservableCreate对象,如图:




SubscribeOnObserver对象中也有一个actual对象(浅绿色),从图中可看到是从下面传上来的一个ObserveOnObserver对象.
执行source.subscribe(parent) (也就是ObservableCreate.subscribe(ObserveOnObserver))时,是在io线程中进行的,如图所示切换了线程.这里其实可以解释一个问题 subscribeOn如果执行多次为什么只有第一次有作用?,每一次subscribeOn都会切换线程,从图中我们可以看到,这个切换线程是倒着的从下往上,也就是一直切换到最后一个subscribeOn,其实就是我们代码中定义的第一个subscribeOn,也许你会问,从下往上是subscribeOn切换线程,那如果要调用我们自己定义的onNext,最终不是还要再从上往下调用回去,调用回去时还会切换线程么?别着急,一会就会讲到从上往下调用回去的逻辑,这里我可以先说一个概括 调用subscribe时的从下往上是subscribeOn切换线程,之后调用onNext传递数据时的从上往下是ObserveOn切换线程
接着往下,在io线程里调用ObservableCreate.subscribe(ObserveOnObserver),最终同样调用到subscribeActual方法,代码如下

@Override
protected void subscribeActual(Observer observer) {
    CreateEmitter parent = new CreateEmitter(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

同样的调用了source.subscribe方法,这里的source是ObservableOnSubscribe对象,parent是一个CreateEmitter对象,如下图:




从图中可以看到CreateEmitter中有一个observer对象(深蓝色),此对象是传上来的SubscribeOnObserver对象,Ok终于调到头了,最终调用ObservableOnSubscribe的subscribe方法,这个ObservableOnSubscribe就是我们在Activity中自己实现的一个匿名内部类:

Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
                emitter.onNext("hello");
            }
        })

豁然开朗有木有,终于到了数据产生的源头了,并且这个方法是在io线程里执行的,准确的说是第一个subscribeOn方法所指定的线程,这里我们只分析onNext方法,其他onError,onComplete分析方法是相同的.
??接着来分析一下onNext方法的传递.首先从我们的调用开始emitter.onNext(“hello”);,这里的emitter其实就是上面我们创建的CreateEmitter对象,调用其onNext()方法,代码如下:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

调用了observer的onNext方法,从我们的流程图中我们可以看到,observer其实是一个SubscribeOnObserver,也就是调用SubscribeOnObserver的onNext方法,代码如下:

@Override
       public void onNext(T t) {
           actual.onNext(t);
       }

调用actual的onNext方法,从流程图中我们可以看到,这个acutal是一个ObserveObObserver对象,接着来看ObserveObObserver的onNext方法:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

关键的来了,调用了schedule()方法,从名字看是要切换线程了,

void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);
           }
       }

       void drainNormal() {
           int missed = 1;

           final SimpleQueue q = queue;
           final Observer a = actual;

           for (;;) {
           boolean d = done;
                T v;
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                  ....
                }
                   a.onNext(v);
               }
              ......
           }
       }

schedule()方法调用worker.schedule(this);此方法其实就是在将线程切换到主线程(也就是我们ObserveOn(AndroidSchedulers.mainThread())所指定的线程),执行drainNormal方法。

点击复制链接 与好友分享!回本站首页
上一篇:ValueAnimator属性动画学习笔记
下一篇:基于Xposed修改微信运动步数
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 |

版权所有: 88bifa.com--致力于做实用的IT技术学习网站