Почему subscribeOn() не отправляет предыдущее значение из Behaviour subject?

Рейтинг: 2Ответов: 1Опубликовано: 08.01.2023

Изучаю нюансы работы методов subscribeOn() / observeOn() для Subjects. Не могу понять, почему в приведенном коде subscribeOn() запрещает отправку события str 0?

    val observer: Observer<String> = object : Observer<String> {
    override fun onComplete() {
        println("Consumption onComplete [thread] - ${Thread.currentThread().name}\"")
    }

    override fun onNext(item: String) {
        println("Consumption onNext $item [thread] - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        println("Consumption onError ${e.message} [thread] - ${Thread.currentThread().name}\"")
    }

    override fun onSubscribe(d: Disposable) {
        println("Consumption onSubscribe [thread] - ${Thread.currentThread().name}\"")
    }
}

val subject = BehaviorSubject.create<String>()

subject.onNext("str 0 ")

subject
    .doOnNext { println("doOnNext $it [thread] - ${Thread.currentThread().name}\"") }
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe(observer)

subject.onNext("str 1")

Thread.sleep(4000)

Вывод будет такой:

Consumption onSubscribe [thread] - main"

doOnNext str 1 [thread] - RxComputationThreadPool-1"

Consumption onNext str 1 [thread] - RxCachedThreadScheduler-1

Если закомментировать subscribeOn(), то событие str 0 веже придет при подписке:

Consumption onSubscribe [thread] - main"

doOnNext str 0  [thread] - main"

doOnNext str 1 [thread] - main"

Consumption onNext str 0  [thread] - RxCachedThreadScheduler-1

Consumption onNext str 1 [thread] - RxCachedThreadScheduler-1

Почему это происходит?

Ответы

▲ 1

Потому что из-за .subscribeOn(Schedulers.computation()) подписка на upstream планируется, собственно, на Schedulers.computation(). К тому времени как произойдёт подписка, строка subject.onNext("str 1") будет уже выполнена. Поэтому первым значением, которое увидит подписчик будет "str 1"

ObservableSubscribeOn::subscribeActual:

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    observer.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

Тут мы видим, как планируется задача подписки scheduler.scheduleDirect(new SubscribeTask(parent))

В SubscribeTask::run происходит подписка на source (upstream)

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        source.subscribe(parent);
    }
}