RxLifecycle source code analysis

Usage

Dependencies

implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.2'
implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.2'
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.2'

Calling code

    @Override
    public void login(String phone,
                      String password,
                      String deviceId,
                      String tag,
                      IView iView,
                      final ActivityEvent event,
                      int maxRetries,
                      int retryDelayMilliSecond,
                      ILoadingView iLoadingView,
                      OnNextListener<ResponseTsp> onNextListener) {

        ReqLoginParams reqLoginParams = new ReqLoginParams(phone, deviceId);
        reqLoginParams.setPassword(password);

        RxHttpUtils
                .createApi(AccountService.class)
                .login(reqLoginParams)
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.i(TAG, "doOnDispose");
                    }
                })
                .retryWhen(new RetryWithDelay(maxRetries, retryDelayMilliSecond))
//                .compose(RxLifecycleUtils.bindToLifecycle(iView))
                .compose(RxLifecycleUtils.bindUntilEvent(iView, event))
                .compose(Transformer.<String>switchSchedulers(iLoadingView))
                .subscribe(new CommonObserver<String>() {

                    @Override
                    protected String setTag() {
                        return tag;
                    }

                    @Override
                    protected void onError(String errorMsg) {
                        LogUtils.i(TAG, "onError");
                        NullUtils.onNext(onNextListener,
                                ResponseTsp.builder().setCode(ERROR_REQ).setMessage(errorMsg).build());
                    }

                    @Override
                    protected void onSuccess(String s) {
                        LogUtils.i(TAG, "onSuccess");
                        NullUtils.onNext(onNextListener,
                                ResponseTsp.builder().setCode(SUCCESS_REQ).setData(s).build());
                    }
                });
    }

As shown above, there are two ways to bind to the lifecycle: bindToLifecycle and bindUntilEvent. The call syntax here differs slightly from the stock API because it follows the approach used in the https://github.com/JessYanCoding/MVPArms project. The underlying principle is the same.

Does it work?

                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.i(TAG, "doOnDispose");
                    }
                })

Set the lifecycle event to STOP so that the request is cancelled at STOP. After triggering the request, send the app to the background. "doOnDispose" is printed, which shows that the binding works.

Principle

The principle is said to be quite simple. Let's walk through it.

bindToLifecycle and bindUntilEvent

bindToLifecycle and bindUntilEvent are the entry points where the call starts.

bindUntilEvent:

bindUntilEvent returns bind(takeUntilEvent(lifecycle, event)):

    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                                @Nonnull final R event) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(event, "event == null");
        return bind(takeUntilEvent(lifecycle, event));
    }

bindToLifecycle:

bindToLifecycle calls bindActivity or bindFragment, which in turn returns bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents)):

    @NonNull
    @CheckResult
    public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
        return bind(lifecycle, ACTIVITY_LIFECYCLE);
    }
    @NonNull
    @CheckResult
    public static <T> LifecycleTransformer<T> bindFragment(@NonNull final Observable<FragmentEvent> lifecycle) {
        return bind(lifecycle, FRAGMENT_LIFECYCLE);
    }
    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                      @Nonnull final Function<R, R> correspondingEvents) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(correspondingEvents, "correspondingEvents == null");
        return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
    }

So far, both entry points end up calling the single-argument bind method.

<T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle)

    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
        return new LifecycleTransformer<>(lifecycle);
    }

new LifecycleTransformer<>(lifecycle);

@ParametersAreNonnullByDefault
public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
                                                      FlowableTransformer<T, T>,
                                                      SingleTransformer<T, T>,
                                                      MaybeTransformer<T, T>,
                                                      CompletableTransformer
{
    final Observable<?> observable;

    LifecycleTransformer(Observable<?> observable) {
        checkNotNull(observable, "observable == null");
        this.observable = observable;
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(observable);
    }

    @Override
    public Publisher<T> apply(Flowable<T> upstream) {
        return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(observable.firstOrError());
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(observable.firstElement());
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if (o == null || getClass() != o.getClass()) { return false; }

        LifecycleTransformer<?> that = (LifecycleTransformer<?>) o;

        return observable.equals(that.observable);
    }

    @Override
    public int hashCode() {
        return observable.hashCode();
    }

    @Override
    public String toString() {
        return "LifecycleTransformer{" +
            "observable=" + observable +
            '}';
    }
}

You can see that the field final Observable<?> observable, passed in through the constructor, is used as the argument to takeUntil in ObservableSource<T> apply(Observable<T> upstream):

return upstream.takeUntil(observable);

The upstream data stream is piped through takeUntil. Once observable emits an item, upstream is disposed and no further upstream items reach the downstream. That is the whole truncation mechanism; the lifecycle check itself lives inside observable.

.takeUntil()

About takeUntil:

https://cn.rx.js.org/class/es6/Observable.js~Observable.html#instance-method-takeUntil

takeUntil subscribes to the source Observable and starts mirroring it. It also monitors a second Observable, the notifier that you provide. If the notifier emits a value or a complete notification, the output Observable stops mirroring the source Observable and completes.

The mirrored source Observable is the upstream above; the notifier is the observable we constructed above.
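To make the takeUntil semantics concrete, here is a minimal plain-Java sketch that does not use RxJava at all. It models a single interleaved timeline in which the made-up marker string "NOTIFY" stands for the notifier emitting its first item. The class name, method name and marker are assumptions chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of takeUntil (illustration only, not RxJava code).
final class TakeUntilModel {
    // Hypothetical marker: the moment the notifier emits its first item.
    static final String NOTIFY = "NOTIFY";

    // Returns the source items delivered downstream before the notifier fires.
    static List<String> takeUntil(List<String> timeline) {
        List<String> delivered = new ArrayList<>();
        for (String entry : timeline) {
            if (NOTIFY.equals(entry)) {
                break; // notifier fired: stop mirroring the source and complete
            }
            delivered.add(entry);
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> timeline = Arrays.asList("a", "b", NOTIFY, "c");
        System.out.println(takeUntil(timeline)); // prints [a, b]
    }
}
```

Item "c" arrives after the notifier has fired, so it never reaches the downstream. This is the behaviour RxLifecycle relies on to cut off a request.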

So how is the notifier implemented? bindToLifecycle and bindUntilEvent build it in different ways. Next, let's look at
Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) and
Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle, final Function<R, R> correspondingEvents).

takeUntilEvent and takeUntilCorrespondingEvent

takeUntilEvent

    private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
        return lifecycle.filter(new Predicate<R>() {
            @Override
            public boolean test(R lifecycleEvent) throws Exception {
                return lifecycleEvent.equals(event);
            }
        });
    }

To understand this code, you need to know what filter does:

filter is the filtering operator.

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 1. Send 5 events
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
            }

            // 2. Apply the filter() operator
        }).filter(new Predicate<Integer>() {
            // Filter the events emitted by the Observable according to the return value of test()
              // a. If true is returned, the event is passed on
              // b. If false is returned, the event is dropped (i.e. filtered out)
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 3;
                // This example = events with integer ≤ 3 are filtered
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Start using subscribe connect");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "The filtered events are:"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "yes Error Respond to events");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "yes Complete Respond to events");
            }
        });

Log output result:

Start using subscribe connect
The filtered events are:4
The filtered events are:5

You can see that filter drops the items that fail the predicate and passes on those that satisfy it: items for which test() returns false are filtered out, and items for which it returns true pass through unchanged.

Now look back at the implementation of takeUntilEvent.

The lifecycle Observable can emit the following events:

public enum ActivityEvent {
    CREATE,
    START,
    RESUME,
    PAUSE,
    STOP,
    DESTROY
}

The event parameter is the lifecycle event we pass in, for example STOP or DESTROY.

Suppose lifecycle emits CREATE while event is STOP: the filter predicate returns false, so the Observable returned by takeUntilEvent emits nothing and takeUntil does not cut off the flow from upstream to downstream.

When lifecycle emits STOP, the filter predicate returns true, the Observable returned by takeUntilEvent emits, and takeUntil cuts off the flow from upstream to downstream. That completes the RxLifecycle mechanism for this path.
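The two cases above can be sketched in plain Java with java.util.stream standing in for the RxJava filter operator. This is a model rather than the library's code: the enum is a local copy so the sketch compiles on its own, and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of takeUntilEvent (illustration only, not RxJava code).
final class TakeUntilEventModel {
    // Local copy of the enum so the sketch compiles on its own.
    enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }

    // What the notifier would emit: only lifecycle events equal to the target.
    static List<ActivityEvent> notifierEmissions(List<ActivityEvent> lifecycle, ActivityEvent until) {
        return lifecycle.stream()
                .filter(event -> event.equals(until))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ActivityEvent> beforeStop = Arrays.asList(
                ActivityEvent.CREATE, ActivityEvent.START, ActivityEvent.RESUME);
        List<ActivityEvent> throughStop = Arrays.asList(
                ActivityEvent.CREATE, ActivityEvent.START, ActivityEvent.RESUME,
                ActivityEvent.PAUSE, ActivityEvent.STOP);
        System.out.println(notifierEmissions(beforeStop, ActivityEvent.STOP));  // prints []
        System.out.println(notifierEmissions(throughStop, ActivityEvent.STOP)); // prints [STOP]
    }
}
```

As long as the result is empty, the notifier is silent and the request keeps running. The first element that appears is the signal on which takeUntil cuts the stream.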

The above is what happens when we call bindUntilEvent. There is also a way to bind without specifying a lifecycle event: bindToLifecycle. Next, let's look at the other half of the implementation, takeUntilCorrespondingEvent.

takeUntilCorrespondingEvent

    private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                       final Function<R, R> correspondingEvents) {
        return Observable.combineLatest(
            lifecycle.take(1).map(correspondingEvents),
            lifecycle.skip(1),
            new BiFunction<R, R, Boolean>() {
                @Override
                public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(bindUntilEvent);
                }
            })
            .onErrorReturn(Functions.RESUME_FUNCTION)
            .filter(Functions.SHOULD_COMPLETE);
    }

First we need to know what takeUntilCorrespondingEvent is for. takeUntilEvent uses its two parameters to decide whether the lifecycle has reached the end event (final R event): its Observable emits once that event arrives and stays silent otherwise. takeUntilCorrespondingEvent serves the same purpose, except that it works out the end event by itself.

take

Emits only the specified number of events from the start of the stream and then completes.

skip

Skips the specified number of events at the start of the stream.

For example, the upstream sends three events: 1, 2 and 3. With skip(1), the downstream receives only 2 and 3.
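As a rough analogy, java.util.stream offers limit(n) and skip(n), which behave like take(n) and skip(n) on a finite list. The sketch below uses those stream methods rather than RxJava, and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// java.util.stream analogy for take(n) and skip(n) (not RxJava code).
final class TakeSkipModel {
    // Analogue of take(n): keep only the first n items.
    static List<Integer> take(List<Integer> items, int n) {
        return items.stream().limit(n).collect(Collectors.toList());
    }

    // Analogue of skip(n): drop the first n items and keep the rest.
    static List<Integer> skip(List<Integer> items, int n) {
        return items.stream().skip(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> upstream = Arrays.asList(1, 2, 3);
        System.out.println(take(upstream, 1)); // prints [1]
        System.out.println(skip(upstream, 1)); // prints [2, 3]
    }
}
```

Applied to the same upstream, take(1) and skip(1) split it into "the first event" and "everything after it", which is how takeUntilCorrespondingEvent uses them.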

combineLatest

We can understand the combineLatest operator through an example, reproduced from: https://blog.csdn.net/pizza_lawson/article/details/45155225

    private void test() {

        Observable<String> observable_1 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("o1");
                emitter.onNext("o2");
                emitter.onNext("o3");
            }
        });

        Observable<String> observable_2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("o4");
                emitter.onNext("o5");
                emitter.onNext("o6");
            }
        });
        
        Observable.combineLatest(
                observable_1,
                observable_2,
                new BiFunction<String, String, String>() {
                    @Override
                    public String apply(String text_1, String text_2) throws Exception {
                        Log.e("combine --- >", "text_1 = " + text_1 + " | text_2 = " + text_2);
                        return text_1 + text_2;
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onError(Throwable e) {
                        Log.e("onError --- >", "onError");

                    }

                    @Override
                    public void onComplete() {
                        Log.e("onComplete --- >", "onCompleted");
                    }

                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String o) {
                        Log.e("onNext --- >", o);
                    }
                });
    }

Log output:

2020-11-20 17:37:46.867 27558-27558/com.yoshin.tspsdk E/combine --- >: text_1 = o3 | text_2 = o4
2020-11-20 17:37:46.868 27558-27558/com.yoshin.tspsdk E/onNext --- >: o3o4
2020-11-20 17:37:46.868 27558-27558/com.yoshin.tspsdk E/combine --- >: text_1 = o3 | text_2 = o5
2020-11-20 17:37:46.868 27558-27558/com.yoshin.tspsdk E/onNext --- >: o3o5
2020-11-20 17:37:46.868 27558-27558/com.yoshin.tspsdk E/combine --- >: text_1 = o3 | text_2 = o6
2020-11-20 17:37:46.868 27558-27558/com.yoshin.tspsdk E/onNext --- >: o3o6

We can see:
observable_1 contributes only its latest value (o3), because it emitted all of its items before observable_2 emitted anything;
observable_2 contributes each of its values;
new BiFunction<String, String, String>() is the combiner, which receives the latest values of observable_1 and observable_2 and computes the result.
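The log above can be reproduced with a small plain-Java model of combineLatest. It is a sketch under assumptions, not RxJava code: the timeline encoding ({sourceIndex, value} pairs), the class name and the method names are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of combineLatest for two sources (illustration only).
final class CombineLatestModel {
    // Each timeline entry is {sourceIndex, value}, with sourceIndex "0" or "1".
    static List<String> combineLatest(List<String[]> timeline) {
        String[] latest = new String[2]; // latest value seen per source
        List<String> combined = new ArrayList<>();
        for (String[] entry : timeline) {
            latest[Integer.parseInt(entry[0])] = entry[1];
            // Emit only once both sources have produced at least one value.
            if (latest[0] != null && latest[1] != null) {
                combined.add(latest[0] + latest[1]);
            }
        }
        return combined;
    }

    // Same order as the example: source 0 emits everything, then source 1 emits.
    static List<String> demo() {
        return combineLatest(Arrays.asList(
                new String[] {"0", "o1"}, new String[] {"0", "o2"}, new String[] {"0", "o3"},
                new String[] {"1", "o4"}, new String[] {"1", "o5"}, new String[] {"1", "o6"}));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [o3o4, o3o5, o3o6]
    }
}
```

Nothing is emitted for o1 and o2 because the second source has not produced a value yet. From o4 onwards, every new value is combined with the latest value of the other source.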

map

It is mainly used for data conversion, data preprocessing, and so on.

Example (adapted to RxJava 2 from https://www.jianshu.com/p/52cd2d514528):

        Observable.just(student1, student2, student2)
                // map converts each item. Type parameter 1: type before conversion; type parameter 2: type after conversion
                .map(new Function<Student, String>() {
                    @Override
                    public String apply(Student student) throws Exception {
                        // Get the name from the Student object and return it
                        return student.getName();
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String name) throws Exception {
                        nameList.add(name);
                    }
                });

onErrorReturn

Makes the Observable emit a special item when it encounters an error and then terminate normally.

The onErrorReturn method returns a new Observable that mirrors the original one. It intercepts the onError call of the original and does not pass the error to the observer. Instead, it creates a special item through the function passed as its parameter, emits it, and then calls the observer's onComplete method.
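The behaviour described above can be modelled in plain Java. This sketch is not RxJava code and simplifies the operator: it takes a fixed fallback value instead of a function of the error, each Supplier stands for one emission, and a thrown exception stands for onError. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of onErrorReturn (illustration only, not RxJava code).
final class OnErrorReturnModel {
    // Each Supplier models one emission; a thrown exception models onError.
    static List<String> onErrorReturn(List<Supplier<String>> source, String fallback) {
        List<String> received = new ArrayList<>();
        for (Supplier<String> emission : source) {
            try {
                received.add(emission.get());
            } catch (RuntimeException error) {
                received.add(fallback); // the error is replaced by one special item
                break;                  // then the stream terminates normally
            }
        }
        return received;
    }

    // Hypothetical source: emits "a", then fails, so "c" is never reached.
    static List<String> demo() {
        List<Supplier<String>> source = Arrays.<Supplier<String>>asList(
                () -> "a",
                () -> { throw new IllegalStateException("boom"); },
                () -> "c");
        return onErrorReturn(source, "fallback");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, fallback]
    }
}
```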

Now look back at the implementation of takeUntilCorrespondingEvent.

lifecycle.take(1).map(correspondingEvents)

First look at the parameter correspondingEvents: for an Activity it is ACTIVITY_LIFECYCLE; for a Fragment it is FRAGMENT_LIFECYCLE:

    private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
        new Function<ActivityEvent, ActivityEvent>() {
            @Override
            public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
                switch (lastEvent) {
                    case CREATE:
                        return ActivityEvent.DESTROY;
                    case START:
                        return ActivityEvent.STOP;
                    case RESUME:
                        return ActivityEvent.PAUSE;
                    case PAUSE:
                        return ActivityEvent.STOP;
                    case STOP:
                        return ActivityEvent.DESTROY;
                    case DESTROY:
                        throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
                    default:
                        throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
                }
            }
        };

    // Figures out which corresponding next lifecycle event in which to unsubscribe, for Fragments
    private static final Function<FragmentEvent, FragmentEvent> FRAGMENT_LIFECYCLE =
        new Function<FragmentEvent, FragmentEvent>() {
            @Override
            public FragmentEvent apply(FragmentEvent lastEvent) throws Exception {
                switch (lastEvent) {
                    case ATTACH:
                        return FragmentEvent.DETACH;
                    case CREATE:
                        return FragmentEvent.DESTROY;
                    case CREATE_VIEW:
                        return FragmentEvent.DESTROY_VIEW;
                    case START:
                        return FragmentEvent.STOP;
                    case RESUME:
                        return FragmentEvent.PAUSE;
                    case PAUSE:
                        return FragmentEvent.STOP;
                    case STOP:
                        return FragmentEvent.DESTROY_VIEW;
                    case DESTROY_VIEW:
                        return FragmentEvent.DESTROY;
                    case DESTROY:
                        return FragmentEvent.DETACH;
                    case DETACH:
                        throw new OutsideLifecycleException("Cannot bind to Fragment lifecycle when outside of it.");
                    default:
                        throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
                }
            }
        };

There are three possible outcomes:

  1. The ActivityEvent or FragmentEvent that corresponds to the current lifecycle event
    The value that is needed to implement the feature
  2. OutsideLifecycleException
    A lifecycle error that interrupts the stream immediately
  3. UnsupportedOperationException
    A binding error

That is what .map(correspondingEvents) produces. Combined with lifecycle.take(1), lifecycle.take(1).map(correspondingEvents) emits exactly one value, which then stays fixed. The normal path is outcome 1; outcomes 2 and 3 are exceptions and are handled separately.

In effect, this step dynamically works out the lifecycle event at which to stop. It is the counterpart of the explicitly specified final R event in takeUntilEvent.

lifecycle.skip(1)

Skips the first event, which is the one current at binding time, and passes on every later lifecycle event.

lifecycleEvent.equals(bindUntilEvent)
            new BiFunction<R, R, Boolean>() {
                @Override
                public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                    return lifecycleEvent.equals(bindUntilEvent);
                }
            })

This compares the fixed value emitted by lifecycle.take(1).map(correspondingEvents) with each value emitted by lifecycle.skip(1) as the lifecycle changes. If they are equal, the function returns true, which means the stream must be cut off.

onErrorReturn(Functions.RESUME_FUNCTION)
    static final Function<Throwable, Boolean> RESUME_FUNCTION = new Function<Throwable, Boolean>() {
        @Override
        public Boolean apply(Throwable throwable) throws Exception {
            if (throwable instanceof OutsideLifecycleException) {
                return true;
            }

            //noinspection ThrowableResultOfMethodCallIgnored
            Exceptions.propagate(throwable);
            return false;
        }
    };

If lifecycle.take(1).map(correspondingEvents) produces outcome 2, an OutsideLifecycleException, it is handled here: the function returns true, which also means the stream must be cut off. Any other exception is rethrown.

filter(Functions.SHOULD_COMPLETE)
    static final Predicate<Boolean> SHOULD_COMPLETE = new Predicate<Boolean>() {
        @Override
        public boolean test(Boolean shouldComplete) throws Exception {
            return shouldComplete;
        }
    };

The predicate simply returns the Boolean passed down from above, so only true values get through the filter.

That completes takeUntilCorrespondingEvent. It dynamically works out the lifecycle event at which the stream must be terminated. When the lifecycle reaches that event, the returned Observable emits true; until then it emits nothing.
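The whole pipeline for Activities can be condensed into a plain-Java model. This is a sketch under assumptions, not the library's code: it assumes the first element of the list is the event current at binding time, it uses null in place of OutsideLifecycleException, and the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of takeUntilCorrespondingEvent for Activities (illustration only).
final class CorrespondingEventModel {
    enum ActivityEvent { CREATE, START, RESUME, PAUSE, STOP, DESTROY }

    // Same mapping as ACTIVITY_LIFECYCLE; null stands in for OutsideLifecycleException.
    static ActivityEvent correspondingEvent(ActivityEvent bindEvent) {
        switch (bindEvent) {
            case CREATE: return ActivityEvent.DESTROY;
            case START:  return ActivityEvent.STOP;
            case RESUME: return ActivityEvent.PAUSE;
            case PAUSE:  return ActivityEvent.STOP;
            case STOP:   return ActivityEvent.DESTROY;
            default:     return null; // DESTROY: already outside the lifecycle
        }
    }

    // Index of the event at which the notifier emits true, or -1 if it never does.
    // events.get(0) is assumed to be the event current at binding time.
    static int terminationIndex(List<ActivityEvent> events) {
        ActivityEvent until = correspondingEvent(events.get(0)); // take(1).map(...)
        if (until == null) {
            return 0; // models onErrorReturn(RESUME_FUNCTION): true right away
        }
        for (int i = 1; i < events.size(); i++) {  // skip(1)
            if (events.get(i).equals(until)) {     // BiFunction, then filter
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<ActivityEvent> boundInResume = Arrays.asList(
                ActivityEvent.RESUME, ActivityEvent.PAUSE, ActivityEvent.STOP);
        System.out.println(terminationIndex(boundInResume)); // prints 1
    }
}
```

Binding while the Activity is in RESUME therefore terminates at the next PAUSE, and binding in CREATE terminates only at DESTROY.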

Summary

As we have seen, the feature is built mainly on takeUntil(), whether or not we pass in the lifecycle event at which to stop.

AutoDispose is a newer library intended to replace RxLifecycle. It is worth studying as well.

References

Android RxJava: comprehensive explanation of filter operators https://www.jianshu.com/p/c3a930a03855

The descriptions of the operators are reproduced directly from the link above. Thank you.

Solve RxJava memory leak (previous part): detailed explanation and principle analysis of RxLifecycle: https://www.jianshu.com/p/8311410de676

Add AutoDispose in Android architecture to solve RxJava memory leak: https://blog.csdn.net/mq2553299/article/details/79418068

Tags: rxjava

Posted by Darhazer on Sat, 07 May 2022 02:56:29 +0300