RxJava 04 - Flowable과 Observable의 연산자 (part.03)
통지 데이터를 제한하는 연산자
1. filter
받은 데이터가 조건에 맞는지 판정해 true인것만 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.filter{ data ->
data % 2L == 0L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 6
RxComputationThreadPool-1: 8
2. distinct
통지하려는 데이터가 이미 통지된 데이터와 같다면 통지하지 않는 연산자. 내부의 HashSet을 이용하여 판별한다.
distinct()
fun main() {
val flowable = Flowable.just("A","B","b","A","a","B","b")
.distinct()
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: B
main: b
main: a
main: 완료
distinct(keySelector)
데이터를 비교하고 통지하며, 통지하는 데이터는 원본 데이터이다.
fun main() {
val flowable = Flowable.just("A","B","b","A","a","B","b")
.distinct{
it.lowercase()
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: B
main: 완료
3. distinctUntilChanged
같은 데이터를 연속으로 받을 때 이 데이터를 제외하고 통지하는 연산자.
distinctUntilChanged()
fun main() {
val flowable = Flowable.just("A","a","a","A","a")
.distinctUntilChanged()
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: a
main: A
main: a
main: 완료
distinctUntilChanged(comparer)
fun main() {
val flowable = Flowable.just("1", "1.0", "0.1", "0.10", "1")
.distinctUntilChanged { data1, data2 ->
val convert1 = BigDecimal(data1)
val convert2 = BigDecimal(data2)
convert1.compareTo(convert2) == 0
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: 1
main: 0.1
main: 1
main: 완료
4. take
지정한 개수나 기간에 도달할 때 까지 받은 데이터를 통지하는 연산자.
5. takeUntil
인자로 지정한 조건이 될 때 까지 데이터를 통지하는 연산자.
takeUntil(stopPredicate)
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeUntil { data ->
data == 3L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 완료
takeUntil(other)
takeUntil 메서드의 인자 Flowable이 데이터를 통지할 때 까지 받은 데이터를 통지한다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeUntil(
Flowable.timer(1000L, TimeUnit.MILLISECONDS)
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
RxComputationThreadPool-1: 완료
경과 시간 | 0 | 300 | 600 | 900 | 1000 |
---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | ||
인자 Flowable | 0 | ||||
결과 통지 | 0 | 1 | 2 | 완료 |
6. takeWhile
인자로 지정한 조건이 true일 동안 받은 데이터를 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeWhile { data ->
data != 3L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료
7. takeLast
원본 생산자가 완료될 때 마지막 데이터부터 지정한 개수나 지정한 기간의 데이터만을 세어 통지하는 연산자.
takeLast(count)
fun main() {
val flowable = Flowable.interval(800L, TimeUnit.MILLISECONDS)
.take(5)
.takeLast(2)
flowable.subscribe(DebugSubscriber())
Thread.sleep(5000L)
}
출력 결과
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 완료
takeLast(count, time, unit)
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(10)
.takeLast(2, 1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 9
RxComputationThreadPool-1: 완료
경과 시간 | 0 | 300 | 600 | 900 | 1200 | 1500 | 1800 | 2100 | 2400 | 2700 | 3000 |
---|---|---|---|---|---|---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | |
지정 기간의 데이터 | 6 | 7 | 8 | 9 | |||||||
통지 대상의 데이터 | 8 | 9 |
완료 통지 전 1000밀리초동안 통지된 데이터 중 끝에서부터 2건을 통지한다.
8. skip
앞에서부터 지정한 만큼 데이터를 건너 뛴 후 나머지 데이터를 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skip(2)
flowable.subscribe(DebugSubscriber())
Thread.sleep(1500L)
}
출력 결과
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
(앞의 2개 0과 1을 통지하지 않는다)
9. skipUntil
인자로 지정한 생산자가 데이터를 통지할때까지 결과를 통지하지 않는다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipUntil (
Flowable.timer(1000L,TimeUnit.MILLISECONDS)
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-2: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-2: 5
경과 시간 | 0 | 300 | 600 | 900 | 1000 | 1200 | 1500 | 1800 |
---|---|---|---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | 3 | 4 | 5 | ||
인자 Flowable | 0 | |||||||
통지 | 3 | 4 | 5 |
10. skipWhile
인자로 지정한 조건이 true일 때 데이터를 통지하지 않는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipWhile{ data ->
data != 3L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 5
데이터가 3이 되기 전까지 skip하였다.
11. skipLast
통지 데이터 중에서 끝에서부터 지정한 범위만큼 통지하지 않는 연산자.
fun main() {
val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
.take(5)
.skipLast(2)
flowable.subscribe(DebugSubscriber())
Thread.sleep(6000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료
12. throttleFirst
데이터를 통지하고나면 지정 시간동안 통지되는ㄷ ㅔ이터는 파기하는 연산자로 처리가 완료될 때 까지 반복한다.
단기간에 들어오는 데이터가 모두 필요한 것이 아니라면 데이터를 쳐낼 수 있다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(10)
.throttleFirst(1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(6000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 완료
13. throttleLast/sample
인자로 받은 시간마다 가장 마지막에 통지되는 데이터만 통지하는 연산자.
throttleLast
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.throttleLast(1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
경과 시간 | 0 | 300 | 600 | 900 | 1000 | 1200 | 1500 | 1800 | 2000 | 2100 | 2400 | 2700 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | |||
결과 데이터 | 2 | 5 |
sample
인자 Flowable이 데이터를 통지하는 시점에 가장 마지막에 받은 데이터를 통지한다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.sample(
Flowable.interval(1000L, TimeUnit.MILLISECONDS)
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
14. throttleWithTomeout/debounce
원본 생산자에서 데이터를 받고나서 일정 기간 안에 다른 데이터를 받지 않으면 현재 받은 데이터를 통지하는 연산자.
지정 기간에 다음 데이터를 받으면 새로 데이터를 받은 시점부터 다시 지정 기간 안에 다음 데이터가 오는지 확인한다.
throttleWithTomeout
fun main() {
val flowable = Flowable.create(
{ emitter: FlowableEmitter<String?> ->
emitter.onNext("A")
Thread.sleep(1000L)
emitter.onNext("B")
Thread.sleep(300L)
emitter.onNext("C")
Thread.sleep(300L)
emitter.onNext("D")
Thread.sleep(1000L)
emitter.onNext("E")
Thread.sleep(100L)
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.throttleWithTimeout(500L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
}
출력 결과
RxComputationThreadPool-1: A
RxComputationThreadPool-1: D
main: E
main: 완료
데이터 A와 D를 받은 뒤 500밀리초동안 다음 데이터가 오지 않으므로 통지된다.
B와 C는 500밀리초 안에 다음 데이터가 오므로 통지되지 않는다.
마지막 E는 500밀리초를 기다리는 사이에 완료 통지를 받으므로 기다리지 않고 E와 완료를 통지받는다.
debounce
fun main() {
val flowable = Flowable.create(
{ emitter: FlowableEmitter<String?> ->
emitter.onNext("A")
Thread.sleep(1000L)
emitter.onNext("B")
Thread.sleep(300L)
emitter.onNext("C")
Thread.sleep(300L)
emitter.onNext("D")
Thread.sleep(1000L)
emitter.onNext("E")
Thread.sleep(100L)
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.debounce{
Flowable.timer(500L, TimeUnit.MILLISECONDS)
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
RxComputationThreadPool-1: A
RxComputationThreadPool-4: D
main: E
main: 완료
15. elementAt/elementAtOrError
지정한 위치의 데이터만을 통지하는 연산자.
통지할 데이터가 없을 때 처리 방식이 서로 다르다.
반환값은 Single 혹은 Maybe
fun main() {
val maybe = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.elementAt(3)
maybe.subscribe{ data ->
println(data)
}
Thread.sleep(1000L)
}
출력 결과
3
'RxJava' 카테고리의 다른 글
RxJava 05 - Processor/Subject (0) | 2022.02.16 |
---|---|
RxJava 04 - Flowable과 Observable의 연산자 (part.04) (0) | 2022.01.25 |
RxJava 04 - Flowable과 Observable의 연산자 (part.02) (0) | 2022.01.18 |
RxJava 04 - Flowable과 Observable의 연산자 (part.01) (0) | 2022.01.18 |
RxJava 03 - RxJava의 매커니즘 (0) | 2022.01.15 |