Flowable/Observable을 결합하는 연산자
1. merge계열 연산자
여러 Flowable/Observable에서 받은 데이터를 하나의 Flowable/Observable로 통지하는 연산자.
여러 생산자가 동시에 데이터를 통지하더라도 결과를 통지할때는 동기화 돼 순차적으로 통지된다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map{ data ->
data + 100L
}
val result = Flowable.merge(flowable, flowable2)
result.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-2: 100
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-2: 101
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 완료2. concat계열 연산자
여러개의 Flowable/Observable을 전달받아 하나의 Flowable/Observable로 결합한 후 순차적으로 실행하는 연산자.
결합 전의 통지를 순차로 실행하므로 완료되지 않는 생산자가 포함되면 다음 생산자는 실행되지 않는다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map { data ->
data + 100L
}
val result = Flowable.concat(flowable, flowable2)
result.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-2: 100
RxComputationThreadPool-2: 101
RxComputationThreadPool-2: 완료3. concatEager/concatArrayEager
concat 메서드와 달리 원래의 Flowable/Observable이 한꺼번에 실행되지만 통지를 순차적으로 하는 연산자.
fun main() {
val flowable1 = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(5)
.map { data ->
data + 100L
}
val source = listOf(flowable1, flowable2)
val result = Flowable.concatEager(source)
result.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-1: 100
RxComputationThreadPool-1: 101
RxComputationThreadPool-1: 102
RxComputationThreadPool-2: 103
RxComputationThreadPool-2: 104
RxComputationThreadPool-2: 완료4. startWith/startWithArray
인자의 데이터를 통지하고나서 자신의 데이터를 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
val other = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map { data ->
data + 100L
}
val result = flowable.startWith(other)
result.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 100
RxComputationThreadPool-1: 101
RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
RxComputationThreadPool-2: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-2: 완료5. zip/zipWith
인자로 전달된 여러개의 Flowable/Observable에서 데이터를 받아 데이터들이 모인 시점에 이 데이터들을 함수형 인터페이스에 전달하고, 이 함수형 인터페이스에서 새로 생성한 데이터를 결과로 통지하는 연산자.
새로운 데이터는 모두 생산자에서 동일한 순서의 데이터를 받아 생성한다. 따라서 인자로 전달받은 각 생산자의 통지 시점이 다르면 가장 느리게 처리한 생산자가 데이터를 처리한 시점에 새로운 데이터가 생성된다.
완료 통지 시점은 통지하는 데이터 개수가 가장 적은 인자 생산자에 맞춘다.
fun main() {
val flowable1 = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.map { data ->
data + 100L
}
val result = Flowable.zip(flowable1, flowable2) { data1, data2 ->
listOf(data1, data2)
}
result.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-2: [1, 101]
RxComputationThreadPool-2: [2, 102]
RxComputationThreadPool-2: 완료6. combineLatest/combineLatestDelayError
인자로 받은 여러 Flowable/Observable이 데이터를 받는 시점에 각 Flowable/Observable이 마지막으로 통지한 데이터를 함수형 인터페이스에 전달하고 이 데이터를 받아 새로 데이터를 생성해 통지하는 연산자.
fun main() {
val flowable1 = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.map { data ->
data + 100L
}
val result = Flowable.combineLatest(flowable1, flowable2) { data1, data2 ->
listOf(data1, data2)
}
result.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-1: [1, 100]
RxComputationThreadPool-1: [2, 100]
RxComputationThreadPool-2: [2, 101]
RxComputationThreadPool-1: [3, 101]
RxComputationThreadPool-1: [4, 101]
RxComputationThreadPool-1: [4, 102]
RxComputationThreadPool-1: 완료Flowable/Observable 상태를 통지하는 연산자
1. isEmpty
Flowable/Observable이 통지할 데이터가 있는지 판단하는 연산자.
원본 Flowable에서 데이터 없이 완료만 통지되면 true를, 데이터가 통지되면 false를 Single로 반환한다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(3)
.filter { data ->
data >= 3
}
.isEmpty
flowable.subscribe{ data ->
println(data)
}
Thread.sleep(4000L)
}
출력 결과
true2. contains
인자의 데이터가 Flowable/Observable에 포함됐는지 판단하는 연산자.
원봉 Flowable/Observable이 지정 데이터를 포함하면 true를, 없거나 통지할 데이터가 없으면 false를 Single로 통지한다.
fun main() {
val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).contains(3L)
flowable.subscribe{ data ->
println(data)
}
Thread.sleep(4000L)
}
출력 결과
true원본 Flowable이 해당 데이터를 통지하기 전까지는 결과를 판단할 수 없기 때문에 시간이 걸린다.
3. all
Flowable/Observable이 통지하는 모든 데이터가 설정한 조건에 맞는지 판단하는 연산자.
fun main() {
val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(3L)
.all{ data ->
data <5
}
flowable.subscribe{ data ->
println(data)
}
Thread.sleep(4000L)
}
출력 결과
true4. sequenceEqual
인자로 전달된 두개의 Flowable/Observable이 통지하는 데이터가 동일한지, 같은 순서로 같은 수를 통지하는지 판단하는 연산자.
통지 시점은 비교하지 않는다.
fun main() {
val flowable1 = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(3)
val flowable2 = Flowable.just(0L, 1L, 2L)
val result = Flowable.sequenceEqual(flowable1, flowable2)
result.subscribe{ data ->
println(data)
}
Thread.sleep(4000L)
}
출력 결과
true5. count
Flowable/Observable의 데이터 개수를 통지하는 연산자.
Long타입으로 Single을 반환한다.
Flowable/Observable 데이터를 집계하는 연산자
1. reduce/reduceWith
설정한 집계 방식으로 Flowable/Observable의 데이터를 계산하고 최종 결과를 통지하는 연산자.
초기값을 받으면 Single, 받지 않으면 Maybe를 반환한다.
fun main() {
val flowable = Flowable.just(1, 10, 100, 1000, 10000)
.reduce(0) { sum, data ->
sum + data
}
flowable.subscribe { data ->
println(data)
}
}
출력 결과
111112. scan
원본 Flowable/Observable이 통지한 데이터를 인자의 함수형 인터페이스를 사용해 집계하는 연산자.
계산할 때 마다 생성되는 결과값을 통지한다.
초기값이 있으면 초기값을 첫번째 데이터로 통지한다.
fun main() {
val flowable = Flowable.just(1, 10, 100, 1000, 10000)
.scan(0) { sum, data ->
sum + data
}
flowable.subscribe { data ->
println(data)
}
}
출력 결과
0
1
11
111
1111
11111유틸리티 연산자
3. repeat
원본 Flowable/Observable이 처리를 완료하면 데이터 통지를 처음부터 반복하는 연산자.
인자가 없으면 완료하지 않고 데이터를 반복 통지, 있으면 지정한 숫자만큼 반복 통지한다.
fun main() {
val flowable = Flowable.just("A","B","C").repeat(2)
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: B
main: C
main: A
main: B
main: C
main: 완료4. repeatUntil
지정한 조건이 될 때 까지 Flowable/Observable의 데이터 통지를 반복하는 연산자.
fun main() {
val startTime = System.currentTimeMillis()
val flowable = Flowable.interval(100L,TimeUnit.MILLISECONDS).take(3)
.repeatUntil{
println("called")
System.currentTimeMillis() - startTime > 500L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(1000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
called
RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
called
RxComputationThreadPool-2: 완료5. repeatWhen
지정한 시점까지 원본 Flowable/Observable의 통지를 반복하는 연산자.
반복 여부는 repeatWhen 메서드의 인자인 함수형 인터페이스에서 판단한다.
fun main() {
val flowable = Flowable.just(1,2,3).repeatWhen { hanlder ->
hanlder.delay(100L, TimeUnit.MILLISECONDS).take(2).doOnNext {
println("emit: $it")
}.doOnComplete {
println("complete")
}.map {
val time = System.currentTimeMillis()
"$time ms: $it"
}
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(5000L)
}
출력결과
main: 1
main: 2
main: 3
emit: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
emit: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
complete
RxComputationThreadPool-1: 완료완료 통지를 알리는 Flowable을 repeatWhen의 메서드의 인자로 받아 반복을 제어하는 Flowable을 생성하는 함수형 인터페이스를 구현한다.
6. delay
Flowable/Observable에서 받은 데이터를 설정한 기간만큼 지연해 통지하는 연산자.
지연 기간은 직접 시간을 지정하거나, 통지 지연 시점을 함수형 인터페이스로 설정할 수 있다.
fun main() {
println("처리 시작 : ${System.currentTimeMillis()}")
val flowable = Flowable.create({ emitter: FlowableEmitter<String> ->
println("구독 시작 : ${System.currentTimeMillis()}")
emitter.onNext("A")
emitter.onNext("B")
emitter.onNext("C")
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.delay(2000L, TimeUnit.MILLISECONDS)
.doOnNext { println("통지 시각 : ${System.currentTimeMillis()}") }
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
처리 시작 : 1643102620593
구독 시작 : 1643102620735
통지 시각 : 1643102622750
RxComputationThreadPool-1: A
통지 시각 : 1643102622750
RxComputationThreadPool-1: B
통지 시각 : 1643102622751
RxComputationThreadPool-1: C
RxComputationThreadPool-1: 완료처리시작 시간과 통지시각의 차이가 약 2000밀리초인것을 보면 데이터가 delay메서드로 설정한 시간만큼 지연돼 통지된 것을 볼 수 있다.
또한 처리 시작과 구독 시작은 시간 차가 거의 없으므로 구독 자체가 지연되는것은 아닌것을 볼 수 있다.
7. delaySubscription
Flowable/Observable의 처리 시작을 지연하는 연산자.
예를들어 Cold한 생산자는 subscribe를 실행하면 바로 처리를 시작하지만, delaySubscription을 사용하면 설정한 기간을 기다린 후에 처리를 시작한다.
fun main() {
println("처리 시작 : ${System.currentTimeMillis()}")
val flowable = Flowable.create({ emitter: FlowableEmitter<String> ->
println("구독 시작 : ${System.currentTimeMillis()}")
emitter.onNext("A")
emitter.onNext("B")
emitter.onNext("C")
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.delaySubscription(2000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
처리 시작 : 1643102817522
구독 시작 : 1643102819683
RxComputationThreadPool-1: A
RxComputationThreadPool-1: B
RxComputationThreadPool-1: C
RxComputationThreadPool-1: 완료처리시작과 구독시작의 시간차이가 약 2000밀리초인것을 보면 지정한 시간만큼 처리를 늦게 시작한것을 알 수 있다.
8. timeout
데이터를 통지할 때 설정된 시간을 넘기면 에러를 통지하거나 대체 Flowable/Observable의 데이터를 통지하는 연산자.
인자로 대체하는 생산자가 없으면 에러를 통지하며 이때 에러 객체는 TimeoutException이다.
fun main() {
println("처리 시작 : ${System.currentTimeMillis()}")
val flowable = Flowable.create({ emitter: FlowableEmitter<Int> ->
println("구독 시작 : ${System.currentTimeMillis()}")
emitter.onNext(1)
emitter.onNext(2)
try {
Thread.sleep(1200L)
} catch (e: InterruptedException) {
emitter.onError(e)
return@create
}
emitter.onNext(3)
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.timeout(1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
처리 시작 : 1643103077418
구독 시작 : 1643103077552
main: 1
main: 2
RxComputationThreadPool-1: 에러= java.util.concurrent.TimeoutException: The source did not signal an event for 1000 milliseconds and has been terminated.설정한 타임아웃 값 (1000밀리초)이 지나면서 TimeoutException 에러가 발생하였다.
'RxJava' 카테고리의 다른 글
| RxJava 06 - RxJava의 디버깅과 테스트 (0) | 2022.02.16 |
|---|---|
| RxJava 05 - Processor/Subject (0) | 2022.02.16 |
| RxJava 04 - Flowable과 Observable의 연산자 (part.03) (0) | 2022.01.18 |
| RxJava 04 - Flowable과 Observable의 연산자 (part.02) (0) | 2022.01.18 |
| RxJava 04 - Flowable과 Observable의 연산자 (part.01) (0) | 2022.01.18 |