Почему subscribeOn() не отправляет предыдущее значение из Behaviour subject?
Изучаю нюансы работы методов subscribeOn()
/ observeOn()
для Subjects
. Не могу понять, почему в приведенном коде subscribeOn()
запрещает отправку события str 0
?
val observer: Observer<String> = object : Observer<String> {
override fun onComplete() {
println("Consumption onComplete [thread] - ${Thread.currentThread().name}\"")
}
override fun onNext(item: String) {
println("Consumption onNext $item [thread] - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
println("Consumption onError ${e.message} [thread] - ${Thread.currentThread().name}\"")
}
override fun onSubscribe(d: Disposable) {
println("Consumption onSubscribe [thread] - ${Thread.currentThread().name}\"")
}
}
val subject = BehaviorSubject.create<String>()
subject.onNext("str 0 ")
subject
.doOnNext { println("doOnNext $it [thread] - ${Thread.currentThread().name}\"") }
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe(observer)
subject.onNext("str 1")
Thread.sleep(4000)
Вывод будет такой:
Consumption onSubscribe [thread] - main"
doOnNext str 1 [thread] - RxComputationThreadPool-1"
Consumption onNext str 1 [thread] - RxCachedThreadScheduler-1
Если закомментировать subscribeOn()
, то событие str 0
веже придет при подписке:
Consumption onSubscribe [thread] - main"
doOnNext str 0 [thread] - main"
doOnNext str 1 [thread] - main"
Consumption onNext str 0 [thread] - RxCachedThreadScheduler-1
Consumption onNext str 1 [thread] - RxCachedThreadScheduler-1
Почему это происходит?