Rxjava2 source code analysis

reference resources

As an opening remark, this article recommends: RxJava meditation
I'm not familiar with the source code characteristics of Rxjava. To be honest, I've seen it N times and barely read it( AAC The source code involved in the demo of basic rxjavasample (I don't want to read it all, but to be practical, I'll start with the demo and follow it. I personally think it's a good learning scheme). So I read the above Rxjava meditation record reluctantly, but I can't understand it. I'll read it again after I finish writing the source code principle. There must be new insights and new experiences. After a period of time, I believe I have new experiences. Maybe this is life. It won't be invariable and defeat the enemy with one move, but accumulate slowly.
Reading code and understanding life
Not paying attention to the process and being eager for success. Understanding this in three minutes and understanding that in ten minutes has brought very serious consequences. Coupled with the excessive exaggeration of characters or things by the media, some people (originally wanted to say a lot of people, but we don't dare to represent others indiscriminately. Anyway, I must be one. Some people mean that there are also such people in my family HA) are affected and fail to achieve the ideal goal and have negative emotions.
Human characteristics must be two sides: advantages and disadvantages, there can be no one side. Personally, if you are reading a story or character, don't look at it upside down, it will be very destructive to you. After all these years, I'm sorry!!! I've learned the truth, Amitabha (I'm a Buddhist apprentice. Why am I a Buddhist apprentice? Because I feel that I don't meet the standard, I only dare to become an apprentice).
If many people only tend to the results, then I believe those who care about enjoying the process will become very valuable, and things are rare and expensive!!! I hope I am the kind of person who loves to enjoy the process,. In principle, I will rest when I am tired and wait when I am bored, but I always have it in my heart. forever!!!

demo
Take a direct look at the code, get the username in the ViewModel, and then perform a series of operations. Next, let's directly analyze the source code (analysis is not enough, it can only be read through). What is the purpose of reading through the source code? At present, I have two purposes: first, I know why, otherwise I feel uncomfortable and personally love it more seriously; Second, I have a tacit impression of the code and the code design mode;

Here are several steps:
1.mDataSource.getUser(): get a Flowable;
2.map converts the obtained Flowable type through the map operator;
3.subscribeOn the above operation settings are completed in the IO thread;
4.observeOn is consumed in the main thread
5.subscribe settings consumer

UserActivity.java

private final CompositeDisposable mDisposable = new CompositeDisposable();
protected void onStart() {
        super.onStart();
      
        mDisposable.add(mViewModel.getUserName()
                .subscribeOn(Schedulers.io())//Upstream data acquisition in io thread,
                .observeOn(AndroidSchedulers.mainThread())//Consumers (downstream) consume in the main thread
                .subscribe(userName -> mUserName.setText(userName),
                        throwable -> Log.e(TAG, "Unable to get username", throwable)));
    }
    
@Override
    protected void onStop() {
        super.onStop();

        mDisposable.clear();
    }
UserViewModel.java

public Flowable<String> getUserName() {
        return mDataSource.getUser()
                // for every emission of the user, get the user name
                .map(user -> {
                    mUser = user;
                    return user.getUserName();
                });
    }

Source code summary

1.mDataSource.getUser(): get a Flowable
In fact, this is to obtain the User object in the dao layer

2.map converts the obtained Flowable type through the map operator

Rxjavaplugins in the map Onassembly only returns the FlowableMap object. The FlowableMap object completes the conversion. Its core code is in the subscribeActual method

Flowable.java
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
    }

3.subscribeOn the above operation settings are completed in the IO thread;

It means that all its upstream operations are completed in io. The following is the source code in Flowable. In the created FlowableSubscribeOn class (the class name translates that the Flowable object is subscribed on XX), the observer operation upstream of the subscription is completed through the thread (here is the IO thread). The core code is in the subscribeActual method

Flowable.java

public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }
    
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }

4.observeOn is consumed in the main thread

Indicates that the downstream Consumer (.User-defined Consumer object in the subscribe) is called in the main thread (see the name meaning: the observer is on XX). The subscribeActual method in the FlowableObserveOn class is the core code

Flowable.java

public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

5.subscribe settings consumer

Similarly, create a subscriber LambdaSubscriber, which consumes onNext and onError consumers

Flowable.java

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
    }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

Source code analysis

Rxjava adopts the observer mode. The following is my understanding:

If you look at the code, you will find, for example, mviewmodel In getusername() Map (our custom Function.apply method). The actual method is not called. It is actually called by the observer. The observer is LambdaSubscriber. In The observer created in the subscribe method and passed onNext and onError consumers (Consumer objects). The entry point is here:

1.subscribe method

The following source code: Subscribe creates LambdaSubscriber object (observer), and the subscribe method means to deliver to the observer. As can be seen from the following code, subscribeActual(z) is implemented here; Method, where Z is the LambdaSubscriber object, and subscribeActual traces back to the upstream observeOn(AndroidSchedulers.mainThread()).

This is also described above: observeOn(AndroidSchedulers.mainThread()) is used to implement the subscribeActual(LambdaSubscriber) method

Flowable.java

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
    }

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }
    
public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(z);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

2. According to the above statement The observeOn method is used to implement the above subscribeActual(LambdaSubscriber)


Let's look for the code and create a FlowableObserveOn object (it is also the object actually returned, that is, FlowableObserveOn.subscribe(onNext,onError))

public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

Therefore, we trace back to the subscribeActual method in FlowableObserveOn. From the above, we can see that the parameter is LambdaSubscriber

FlowableObserveOn.java

public void subscribeActual(Subscriber<? super T> s) {
        Worker worker = scheduler.createWorker();//The HandlerScheduler object is introduced in the following episode

	//s is LambdaSubscriber
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new ObserveOnConditionalSubscriber<T>(
                    (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
        } else {
            source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
        }
    }

Insert Android schedulers Start of mainthread() parameter
Here's the Android schedulers Mainthread() parameter

In fact, the code is not complex. The final return is the HandlerScheduler object, so observeOn passes the HandlerScheduler object parameters

AndroidSchedulers.java

 private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

Insert Android schedulers End of mainthread() parameter

FlowableObserveOn.subscribeActual called source Subscribe method

We can trace the source here and see that it comes from

subscribeActual(z) is implemented here. Z represents the observaonsubscriber object. subscribeActual is naturally the source object

public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(z);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

3. Therefore, the above subscribeActual (ObserveOnSubscriber) objects need to be in Found in subscribeOn

The FlowableSubscribeOn object is returned here, so the implementation of subscribeActual is implemented in this class

Flowable.java

public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }
    
 public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }

Insert schedulers IO () thread start
As shown below, what is returned here is an IoScheduler object

Schedulers.java

 static final Scheduler IO;
static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
  public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

Insert schedulers IO () thread end

Let's look at the subscribeActual implementation class in FlowableSubscribeOn

FlowableSubscribeOn.java

@Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();//IoScheduler object
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);

        w.schedule(sos);
    }

s. What does onsubscribe (SOS) implement: ObserveOnSubscriber (static class in FlowableObserveOn) onSubscribe(SubscribeOnSubscriber object), as shown below, we looked for the downstream object and found the previous subscribeActual (s), whose s represents LambdaSubscriber object

FlowableObserveOn.java->ObserveOnSubscriber.java

public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.upstream, s)) {
                this.upstream = s;

                ...

                queue = new SpscArrayQueue<T>(prefetch);

                downstream.onSubscribe(this);

                s.request(prefetch);
            }
        }

So downstream onSubscribe(this); Lambdasubscriber Onsubscribe, and then call onsubscribe accept(this);, Where did onsubscribe come from??? (digging, it's good that I'm so handsome and loved by everyone, otherwise I'm so dizzy... Well, I've actually been dizzy. This is deleted N times and edited N+1 times. There's no way. Open the golden finger, but Rxjava observer mode and responsive programming are really the first serious system understanding, so it's a little difficult to look at the code, but the most difficult thing is to understand and absorb its ideas.)

LambdaSubscriber.java

@Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.cancel();
                onError(ex);
            }
        }
    }

To get back to business, the onSubscribe object is found in the subscribe method. The last parameter, flowableinternalhelper, is found RequestMax. INSTANCE

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
    }

So onsubscribe accept(this); In fact, it is t.request(Long.MAX_VALUE); So the request method here calls lambdasubscriber request

public enum RequestMax implements Consumer<Subscription> {
        INSTANCE;
        @Override
        public void accept(Subscription t) throws Exception {
            t.request(Long.MAX_VALUE);
        }
    }

Get () here request(n); The final location is observeonsubscriber (inheriting baseobserveonsubscriber) Request, why?
SubscriptionHelper.setOnce(this, s) is assigned a value here, and S (observaonsubscriber object) is assigned to this. I understand this here, but I always feel so uncomfortable in my heart. I interrupted and found that I entered ObserveOnSubscriber Request method, so I need to make a mark here to understand the fusion again in the future

LambdaSubscriber.java
@Override
    public void request(long n) {
        get().request(n);
    }

@Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.cancel();
                onError(ex);
            }
        }
    }

OK, let's jump to observeonsubscriber (inheriting baseobserveonsubscriber) Request again, the final worker schedule(this)

@Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(requested, n);
                trySchedule();
            }
        }

 final void trySchedule() {
            if (getAndIncrement() != 0) {
                return;
            }
            worker.schedule(this);
        }

Insert FlowableObserveOn class subscribeActual method worker = scheduler createWorker(); Source start
First, the scheduler is the HandlerScheduler object, and the scheduler createWorker().

HandlerScheduler.java

public Worker createWorker() {
        return new HandlerWorker(handler);//Handler is the Handler(Looper.getMainLooper())
    }


Insert FlowableObserveOn class subscribeActual method worker = scheduler createWorker(); Source end

Finally, handlerscheduler is called Schedule method. There is code to understand. Execute the run thread through the handler

HandlerScheduler.java

@Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

The run thread that executes is baseobserveonsubscriber run; Then execute runAsync()

@Override
        public final void run() {
            if (outputFused) {//
                runBackfused();
            } else if (sourceMode == SYNC) {
                runSync();
            } else {
            //This method is implemented here
                runAsync();
            }
        }

This method is in observeonsubscriber Runasync, a.onNext(v) here;, a.onComplete() corresponds to the of LambdaSubscriber onNext(v), onComplete method, and then execute our custom consumer accept;

void runSync() {
            int missed = 1;

            final Subscriber<? super T> a = downstream;
            final SimpleQueue<T> q = queue;

            long e = produced;

            for (;;) {

                long r = requested.get();

                while (e != r) {
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        cancelled = true;
                        upstream.cancel();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }

                    if (cancelled) {
                        return;
                    }
                    if (v == null) {
                        cancelled = true;
                        a.onComplete();
                        worker.dispose();
                        return;
                    }

                    a.onNext(v);

                    e++;
                }

                if (cancelled) {
                    return;
                }

                if (q.isEmpty()) {
                    cancelled = true;
                    a.onComplete();
                    worker.dispose();
                    return;
                }

                int w = get();
                if (missed == w) {
                    produced = e;
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                } else {
                    missed = w;
                }
            }
        }

OK, the downstream onNext and onError have been called, but remember that the upstream apply method has not been called!!!

We're going back to flowablesubscribeon Subscribeactual all the above is s.onSubscribe(sos). Then let's look at w.schedule(sos),

FlowableSubscribeOn.java
@Override
public void subscribeActual(final Subscriber<? super T> s) {
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
    s.onSubscribe(sos);

    w.schedule(sos);
}

The scheduler here is IoScheduler, so the worker here returns EventLoopWorker and the parameter is pool You can see that the above parameter () is not used in the method of start(), which may be Compareandset (none, update), and then let's look at the schedule method

IoScheduler.java

@Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }

public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

@NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

The above schedule method calls threadworker scheduleActual(action, delayTime, unit, tasks),

Insert threadWorker start
Pool get();

IoScheduler.java

EventLoopWorker(CachedWorkerPool pool) {
        ...
            this.threadWorker = pool.get();
        }
ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }

The get() method creates a NewThreadWorker object, passes the threadFactory (where does this come from, you can find it by yourself, and give a construction method that prompts the IoScheduler with five parameters), and creates an ExecutorService

NewThreadWorker.java

public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

Insert threadWorker again

NewThreadWorker. scheduleActual (there is a scheduleActual method. It can be said that all core codes are basically implemented in this method). In the final analysis, we execute the run method, and then let's find out who the thread is?

NewThreadWorker.java

@NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
FlowableSubscribeOn.java->SubscribeOnSubscriber.java,It's done here src.subscribe(this)Method, here src corresponding source,source Who is it?

@Override
        public void run() {
            lazySet(Thread.currentThread());
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }

The subscription here is still flowable subscribe, where is subscribeActual(z) going? Z represents the SubscribeOnSubscriber object in FlowableSubscribeOn

Flowable.java
public final void subscribe(Subscriber<? super T> s) {
        if (s instanceof FlowableSubscriber) {//s is the FlowableSubscriber
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            subscribe(new StrictSubscriber<T>(s));
        }
    }
 public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(z);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

4. map operator of rxjava2

According to the source code, the implementation of subscribeActual this time is FlowableMap

Flowable.java
 public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
    }

There is a call to source Subscribe method. Here we will find source subscribeActual(z) will still be called in subscribe; So where does subscribeActual correspond?

 @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
        } else {//s is not a ConditionalSubscriber class
            source.subscribe(new MapSubscriber<T, U>(s, mapper));
        }
    }

In fact, we can stop the above subscribeActual (implemented by flowableflatmapmay), because the following code is completed in roomdata. We might as well re create a Flowable, here map is the continuation of our above


From the following source code, we can know that subscribeActual is implemented in FlowableCreate, and the parameter is new mapsubscriber < T, u > (s, mapper)

 public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }

Source. Is executed here The subscribe (emitter) operation is our customized subscribe

FlowableCreate.java
 public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }

emitter here is the ErrorAsyncEmitter object. Let's see what onnext does in it. His parent nooverflowbaseasynceemitter does the onnext method, and downstream is called here onNext(t); Method, who is downstream?

ErrorAsyncEmitter.java

@Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            if (get() != 0) {
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

We can find that the downstream corresponds to the parameter in the subscribeActual method of the FlowableCreate class. This parameter is the static class MapSubscriber object in the FlowableMap class, and then we call the onNext method. Here, we finally call the apply method in the map. It is impossible. We can only say that we have completed the closure of a demo. There are too many things we need to learn here. We can only know the source code first

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
            //This line of code, the key point, calls the apply method in our previous map
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //This line of code is also the key. We will talk about it in detail later. If you are interested, you can have a look
            downstream.onNext(v);
        }

Code summary:
I deleted, wrote and deleted the above code. I've probably done it back and forth for 3 or 4 times. I've also found some information on the Internet, but to be honest, if I don't read the source code myself, I'm very unhappy, or I can't understand the explanations of those great gods. I always have a feeling that I can only be meaningful and unspeakable.
Some people say that Rxjava implementation code is more concise or not necessarily brief. I feel that after reading the source code, I need more details, but who wrote the code is too powerful. There are many things I haven't digested yet. Anyway, I must digest, learn, imitate and study later

Summary of questions:
1.LambdaSubscriber. The explanation of the request part needs to be call ed here. We'll continue to look at it later
2. The room database compatibility is really strong. It also depends on the source code (I guess it's not easy)

Tags: Android

Posted by learningsql on Tue, 24 May 2022 08:49:03 +0300