博客
关于我
Rxjava2源码分析二
阅读量:354 次
发布时间:2019-03-04

本文共 15140 字,大约阅读时间需要 50 分钟。

我们继续上一节的内容,这一节的内容主要是线程的调度。

subscribeOn 给上面代码分配线程

示例代码里面是给下面的流分配IO线程,代码如下:

//完整版的被观察者        Observable.create(                //自定义的被观察者,String类型,                new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
e) throws Exception { e.onNext("A"); }}) //给下面的分配io线程,耗时读取异步 ObservableCreate.subscribeOn .subscribeOn(Schedulers.io()) //订阅方法 ObservableSubscribeOn.subscribe .subscribe( //观察者,上流往下流向的是String类型,所以最终接收的也是String类型 new Observer
() { @Override public void onSubscribe(Disposable d) { //一订阅就会被触发的方法 } @Override public void onNext(String s) { //正常执行的回调方法 } @Override public void onError(Throwable e) { //发生错误的回调方法 } @Override public void onComplete() { //执行完毕触发的方法 } });

我们先看下.subscribeOn(Schedulers.io())中Schedulers.io()具体是什么:

@NonNull    public static Scheduler io() {           return RxJavaPlugins.onIoScheduler(IO);    }

这个RxJavaPlugins.onIoScheduler又是一个全局hook,我们不管它,用不上,跟进去

public final class Schedulers {       @NonNull    static final Scheduler SINGLE;    @NonNull    static final Scheduler COMPUTATION;    @NonNull    static final Scheduler IO;

它是Schedulers 类里面的一个成员类,使用static和final修饰,静态代码块里面实例化

static {           SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());        IO = RxJavaPlugins.initIoScheduler(new IOTask());        TRAMPOLINE = TrampolineScheduler.instance();        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());    }

这里RxJavaPlugins.initIoScheduler又是一个hook,还是不理它,往下看:

static final class IOTask implements Callable
{ @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }

这是一个静态的内部类,是一个callable,有返回值的任务,返回:

static final class IoHolder {           static final Scheduler DEFAULT = new IoScheduler();    }

我们看一下: new IoScheduler()

public IoScheduler() {           this(WORKER_THREAD_FACTORY);    }

继续跟进:

public IoScheduler(ThreadFactory threadFactory) {           this.threadFactory = threadFactory;        this.pool = new AtomicReference
(NONE); start(); }

1、给工厂赋值,起个名字。

2、给pool赋值,它是一个原子操作类,
3、执行start();

先看一下NONE是个什么东东

static final CachedWorkerPool NONE;    static {           SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));        SHUTDOWN_THREAD_WORKER.dispose();        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,                Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);        EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);		//在这里,可以看出这个静态代码块都是在对当前类的成员变量进行一些初始化        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);        NONE.shutdown();    }

主要看NONE的初始化

static final class CachedWorkerPool implements Runnable {   		//任务或者线程的存活长度        private final long keepAliveTime;        //存放任务的队列,非阻塞队列        private final ConcurrentLinkedQueue
expiringWorkerQueue; //这个就是我们的disposable,类似一个set集合,可以add、remove等等, //通过定义一个全局的boolean来确认它是否 被阻断 final CompositeDisposable allWorkers; //线程池 private final ScheduledExecutorService evictorService; //包装我们任务的FutureTask private final Future
evictorTask; //线程工厂 private final ThreadFactory threadFactory; CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { //线程存活时间,最后设置是60秒 this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; //存放干活(线程池)的任务队列,ThreadWorker是线程池,也是干活的队列,为了复用。 this.expiringWorkerQueue = new ConcurrentLinkedQueue
(); //每一个ThreadWorker是一个单位,辅助中断任务。 this.allWorkers = new CompositeDisposable(); //定义自己的线程工厂 this.threadFactory = threadFactory; ScheduledExecutorService evictor = null; //接收任务返回值,可以看成是一个FutureTask。 //我看了实现过程要求,基本就跟FutureTask的一样,不是关键的东西,没再花时间去找。 Future
task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); } evictorService = evictor; evictorTask = task; }

我们稍微分析一下这个CachedWorkerPool 类,初始化进来的时候创建对应的成员变量和赋值,其中evictor 和 task 一开始是null的,因为unit 是null,那么肯定还会另外一种情况不是null,如果不为null,evictor 是一个线程池,核心线程是1,自己定义线程工厂名字,其他是默认值,我把具体线程池参数放出来,有兴趣的朋友可以看看:

public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory) {           super(corePoolSize, Integer.MAX_VALUE,              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,              new DelayedWorkQueue(), threadFactory);    }

task ,将我们自己this扔进了线程池,因为我们本身实现了runnable,具有被线程调度的功能,看下这个runnable的run方法的作用:

@Override        public void run() {               evictExpiredWorkers();        }
void evictExpiredWorkers() {               if (!expiringWorkerQueue.isEmpty()) {               //一个纳秒级的时间戳,在同一个操作系统内比较才有意义,根据系统CUP获取的                long currentTimestamp = now();                for (ThreadWorker threadWorker : expiringWorkerQueue) {                   //比较线程是否过期,如果过期则回收                    if (threadWorker.getExpirationTime() <= currentTimestamp) {                           if (expiringWorkerQueue.remove(threadWorker)) {                               allWorkers.remove(threadWorker);                        }                    } else {                           // Queue is ordered with the worker that will expire first in the beginning, so when we                        // find a non-expired worker we can stop evicting.                        break;                    }                }            }        }

什么时候就会被回收呢?我找到了这个

void release(ThreadWorker threadWorker) {               // Refresh expire time before putting worker back in pool            //+60s,所以一个线程超过60s就会被回收            threadWorker.setExpirationTime(now() + keepAliveTime);            expiringWorkerQueue.offer(threadWorker);        }

可能有些小伙伴看到这里就疑惑了,线程池不是可以设置定时回收吗?为什么还要在这里搞这个,因为这个不是干活的线程池,是一个辅助管理类,管理添加到真正干活的线程池里的线程。如果真正干活的线程池不干活了,赶紧回收。

回过头来我们看3、执行start();

@Override    public void start() {           CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);        if (!pool.compareAndSet(NONE, update)) {               update.shutdown();        }    }

再次实例化了一次,这个传入了unit 了,不为空,接着使用原子操作类进行替换,保证多个线程也只会执行成功一次,如果不成功,说明已经实例化过了,马上调用shutdown,回收线程。是调度自己。

回过头整体看一下,Schedulers.io()做了什么?

1、如果没初始化,则初始化。
2、如果初始化完毕,则调用一下shutdown线程。不工作则回收一下线程。
3、返回IoScheduler,这个类里面封装了一个线程池,这个线程池不是我们干活的线程池,是调度自己用的。定义了各种辅助类和方法,看它的成员变量就能猜七七八八了,它的类名:CachedWorkerPool ,缓存+工作池,run方法主要是回收线程。

主要还是明白它是怎么调度线程的,其他类型的线程的过程基本都是差不多,所以这里只要分析一种,其他就都明白了,我们来整体分析一下流程:

老方法,谁点出来的观察者,是:ObservableSubscribeOn.subscribe,不再讲一遍了,从上一篇的分析过程我们都只,谁点出来的直接找谁的subscribeActual实现类

@Override    public void subscribeActual(final Observer
s) { final SubscribeOnObserver
parent = new SubscribeOnObserver
(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }

先包裹一层,然后 s.onSubscribe(parent);立马调用观察者的onSubscribe方法,所以观察者的onSubscribe方法不参与线程调度,原来什么线程还是什么线程。

new SubscribeOnObserver(s):在Rxjava中最典型的就是封包和拆包,稍微看一下一个包裹大概什么样子,可以看到连接器Disposable

//extends AtomicReference
继承了一个Disposable操作类 //实现了Observer和Disposable,所以它即可以作为一个Observer也可以作为一个Disposable static final class SubscribeOnObserver
extends AtomicReference
implements Observer
, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer
actual; final AtomicReference
s; SubscribeOnObserver(Observer
actual) { //赋值持有观察者 this.actual = actual; //创建一个原子操作类Disposable this.s = new AtomicReference
(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(s); DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } }

这里持有了Observer又实现了Observer,典型的装饰模式,也非常简单就不多讲了。

(考虑到整体流程还没走通,这里暂时不适合分析disposable)

我们接着看关键代码,从里面开始分析

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
new SubscribeTask(parent):将下游的对象,这里是观察者传递进来

final class SubscribeTask implements Runnable {           private final SubscribeOnObserver
parent; SubscribeTask(SubscribeOnObserver
parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }

source.subscribe(parent);放在run方法里面,本身实现了runable,是一个任务,可以猜测这个任务等下会被扔给线程池,这个source就是上游的对象,subscribe是source的方法,跟我们的分析目标:observeOn 给上面代码分配线程 ,完全一致。但是这里有问题就是我们分析代码是从最里面开始分析的,但是执行可不是这样,是从外面开始的,因为我们开始往下传递是从自定义的被观察者中的发射器开始往下传递,这些可都是上游操作,所以结果就是整个链式调用除了观察者中的onSubscribe不参与线程调度之外,其他全都IO线程里面了。

又来到我们熟悉的代码环节:source.subscribe(parent); 将parent传递给上一层source,这里的source已经到了ObservableCreate,然后将被观察者包裹了一层,叫发射器,然后调用onNext往下拆包,这里跟我们上一节分析的已经一摸一样了。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

我们刚看了new SubscribeTask(parent),现在我们继续看scheduler.scheduleDirect();

@NonNull    public Disposable scheduleDirect(@NonNull Runnable run) {           return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);    }
@NonNull    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {       	//如果存在ThreadWorker,则拿出来使用,不存在则创建。        final Worker w = createWorker();		//hook包裹了一层,我们不用关注它        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);		//Diaposable包裹了一层,即使任务也是一个Dispoable,主要用来中断任务        DisposeTask task = new DisposeTask(decoratedRun, w);		//关键方法,使用线程池干活,直接去IOScheduler看        w.schedule(task, delay, unit);        return task;    }

createWorker是Scheduler的抽象方法,毫无疑问肯定是在IOScheduler里面实现

@NonNull    public abstract Worker createWorker();

按照设计思维来说,整个Scheduler是一个抽象类,定义了一系列需要用到的方法和需要拓展的抽象方法,五个类型的Schduler实现类必须去实现这些,我们用的是哪个实现类就去哪个实现类找。下面是IoScheduler

@NonNull    @Override    public Worker createWorker() {           return new EventLoopWorker(pool.get());    }

看一下pool:final AtomicReference《CachedWorkerPool》 pool; 就是我们一开始初始化的那个带有线程池的那个类。

ThreadWorker get() {               if (allWorkers.isDisposed()) {                   return SHUTDOWN_THREAD_WORKER;            }            while (!expiringWorkerQueue.isEmpty()) {                   ThreadWorker threadWorker = expiringWorkerQueue.poll();                if (threadWorker != null) {                       return threadWorker;                }            }            // No cached worker found, so create a new one.            ThreadWorker w = new ThreadWorker(threadFactory);            allWorkers.add(w);            return w;        }

从队列里面拿任务线程,如何有就拿出来,如果没有则创建一个新的,我们看一下ThreadWorker。

static final class ThreadWorker extends NewThreadWorker {           private long expirationTime;        ThreadWorker(ThreadFactory threadFactory) {               super(threadFactory);            this.expirationTime = 0L;        }        public long getExpirationTime() {               return expirationTime;        }        public void setExpirationTime(long expirationTime) {               this.expirationTime = expirationTime;        }    }

单纯从上面的代码是无法知道这个类是干什么地,继续看NewThreadWorker

public class NewThreadWorker extends Scheduler.Worker implements Disposable {       private final ScheduledExecutorService executor;    volatile boolean disposed;    public NewThreadWorker(ThreadFactory threadFactory) {           executor = SchedulerPoolFactory.create(threadFactory);    }

从名字上看它是创建了一个线程池,我们看: SchedulerPoolFactory.create(threadFactory);

public static ScheduledExecutorService create(ThreadFactory factory) {           final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);        if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {               ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;            POOLS.put(e, exec);        }        return exec;    }

ThreadWorker就是一个线程池,且是我们干活的线程池。

回过头来我们接着看: w.schedule(task, delay, unit); w也就是ThreadWorker

@NonNull        @Override        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {               if (tasks.isDisposed()) {                   // don't schedule, we are unsubscribed                return EmptyDisposable.INSTANCE;            }            return threadWorker.scheduleActual(action, delayTime, unit, tasks);        }

tasks又是一个用来辅助判断是否中断任务的类,我们直接看scheduleActual方法

@NonNull    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {       	//又hook了一层,不用管它        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);		//又包裹了一层,用来辅助中断流,心累啊!        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);        if (parent != null) {               if (!parent.add(sr)) {                   return sr;            }        }        Future
f; try { if (delayTime <= 0) { //如果没有延迟要求则直接提交,使用有返回值的提交方式submit,且是异步提交 f = executor.submit((Callable)sr); } else { //有延迟要求则按照要求提交传入延迟时间参数 f = executor.schedule((Callable)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }

整个流程到这里基本走完了,总结一下吧:

线程的封包拆包,装饰模式是我们上一篇的东西,这里就不再重复了,当使用了线程调度:observeOn 的时候,我们之前看到的代码:

@Override        public void run() {               source.subscribe(parent);        }

subscribeOn 后面的部分放在run方法,在线程里面执行了,而我们的上source是上游执行到这里的对象,跟run没有关系,我们的观察者的onSubscribe和线程调度没有一毛钱关系。

IOScheduler.io这个类,里面有一个类叫:CachedWorkerPool,我们可以叫它任务管理类,它实现了runnable,本身是一个任务,能够被线程池接收,内部又定义了一个线程池,run方法里面主要是回收任务的工作,然后又有好几个成员变量,比如创建一个队列存储真正干活的任务线程,创建一个接收任务返回的对象,创建了一个管理所有任务中断流的类等等。

转载地址:http://vker.baihongyu.com/

你可能感兴趣的文章
(C++11/14/17学习笔记):线程启动、结束,创建线程多法、join,detach
查看>>
leetcode 14 最长公共前缀
查看>>
做做Java
查看>>
map的find函数和count函数
查看>>
C++并发与多线程(一)
查看>>
计算机网络子网划分错题集
查看>>
java一些基本程序
查看>>
神经元与神经网络一之概述
查看>>
FANUC机器人R-30iB_R-30iB PLUS备件规格型号统计整理
查看>>
FANUC机器人的镜像备份操作及U盘格式化具体步骤
查看>>
vue-依赖-点击复制
查看>>
LeetCode 116填充每个节点的下一个右侧结点指针
查看>>
2021-4-28【PTA】【L2-1 包装机 (25 分)】
查看>>
Arduino mega2560+MPU6050利用加速度值控制舵机
查看>>
紫书——蛇形填数
查看>>
刷题计划1——poj1753
查看>>
蓝桥杯备战——刷题(2019)
查看>>
未定义的变量“py”或函数“py.command”
查看>>
A Guide to Node.js Logging
查看>>
webwxbatchgetcontact一个神奇的接口
查看>>