RxJava Chapter 04 - Flowable과 Observable의 연산자
RxJava는 데이터 소스 역할을 하는 Flowable/Observable을 생성하는 메서드와 데이터를 변환하거나 선별하는 작업을 거쳐 새로운 Flowable/Observable을 생성하는 메서드를 제공한다.
Flowable/Observable을 생성하는 연산자
1. just
인자로 받은 데이터를 통지하는 Flowable/Observable을 생성하는 연산자.
인자는 최대 10개까지 시정할 수 있으며, 순서대로 통지한다. 모든 데이터를 통지하면 완료를 통지한다.
fun main() {
val flowable = Flowable.just("A","B","C")
flowable.subscribe(DebugSubscriber())
}
출력결과
main: A
main: B
main: C
main: 완료
2. fromAwrray/fromIterable
인자로 지정한 배열, Iterable에 담긴 객체등을 순서대로 통지하는 연산자.
모든 데이터를 통지하면 완료를 통지한다.
fun main() {
val flowable = Flowable.fromArray("A","B","C")
flowable.subscribe(DebugSubscriber())
}
3. fromCallable
인자로 지정한 java.util.concurrent.Callable 인터페이스에서 생성한 데이터를 통지하는 연산자.
데이터를 통지하면 완료를 통지하고, 통지하는 데이터는 호출될 때 마다 새로 생성된다.
fun main() {
val flowable = Flowable.fromCallable { System.currentTimeMillis() }
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: 1642480854348
main: 완료
4. range/rangeLong
지정한 시작 숫자부터 지정한 개수만큼 증가하는 Integer,Long 데이터를 통지하는 연산자.
fun main() {
val flowable = Flowable.range(10, 3)
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: 10
main: 11
main: 12
main: 완료
5. interval
지정한 통지 간격마다 0부터 시작하는 long 타입의 숫자 데이터를 통지하는 연산자.
별도 설정 없이는 Schedulers.computation()의 스케쥴러에서 실행된다.
최초 통지 데이터인 0을 통지하는 시점은 별도 설정이 없다면 지정한 시간만큼 시간이 지난 뒤에 통지한다.
완료를 통지할 수 없으므로 take 메서드 등으로 통지할 데이터 개수를 제한하여 사용한다.
fun main() {
val timeFormat = DateTimeFormatter.ofPattern("mm:ss.SSS");
val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
println("시작 시각: ${LocalTime.now().format(timeFormat)}")
flowable.subscribe{
val threadName = Thread.currentThread().name
val time = LocalTime.now().format(timeFormat)
println("$threadName : $time : data = $it")
}
Thread.sleep(5000L)
}
출력 결과
시작 시각: 00:47.144
RxComputationThreadPool-1 : 00:48.175 : data = 0
RxComputationThreadPool-1 : 00:49.185 : data = 1
RxComputationThreadPool-1 : 00:50.184 : data = 2
RxComputationThreadPool-1 : 00:51.182 : data = 3
RxComputationThreadPool-1 : 00:52.176 : data = 4
6. timer
호출 시점부터 지정 시간동안 대기한 후, long타입 숫자 0 하나만 통지하고 종료하는 연산자.
기본적으로 Schedulers.computation()의 스케쥴러에서 실행한다.
fun main() {
val timeFormat = DateTimeFormatter.ofPattern("mm:ss.SSS");
println("시작 시각: ${LocalTime.now().format(timeFormat)}")
val flowable = Flowable.timer(1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(
{ data ->
val threadName = Thread.currentThread().name
val time = LocalTime.now().format(timeFormat)
println("$threadName: $time: data=$data")
},
{ error ->
println("에러=$error")
}
)
{ println("완료") }
Thread.sleep(5000L)
}
출력 결과
시작 시각: 07:53.014
RxComputationThreadPool-1: 07:54.182: data=0
완료
7. defer
구독이 발생할 때 마다 함수형 인터페이스로 정의한 새로운 생산자를 생성하는 연산자.
통지하는 데이터는 새로 생성한 Flowable/Observable의 데이터.
선언한 시점의 데이터를 통지하는 것이 아니라, 호출 시점에 데이터 생성이 필요할 때 사용한다.
fun main() {
val flowable = Flowable.defer {
Flowable.just(LocalTime.now())
}
flowable.subscribe(DebugSubscriber("No. 1"))
Thread.sleep(2000L)
flowable.subscribe(DebugSubscriber("No. 2"))
}
출력 결과
main: No. 1: 14:17:14.132401400
main: No. 1: 완료
main: No. 2: 14:17:16.143880100
main: No. 2: 완료
8. empty
빈 생산자를 생성하는 연산자.
처리를 시작하면 바로 완료를 통지한다.
단독으로는 거의 사용하지 않고, flatMap 메서드의 통지 데이터가 null일때 이를 대신해 empty메서드에서 생성한 생산자로 이후 데이터를 통지 대상에서 제외하는 등의 작업을 할 수 있다.
9. error
에러만 통지하는 생산자를 생성하는 연산자.
처리를 시작하면 에러 객체와 함께 에러를 통지한다.
단독으로 거의 사용하지 않으며 flatMap 메서드를 처리하는 도중 에러를 통지하고 싶을 때 error메서드로 생성한 생산자를 반환해 명시적으로 에러를 통지하는 등의 용도로 사용할 수 있다.
10. never
아무것도 통지하는 생산자를 생성하는 연산자.
완료도 통지하지 않기 때문에 완료를 통지하는 empty와 구분된다.
통지 데이터를 변환하는 연산자
1. map
원본 생산자에서 통지하는 데이터를 변환한 뒤 변환된 데이터를 통지하는 연산자. 한개의 데이터로 여러 데이터를 생성해 통지하거나 통지를 건너뛸 수는 없다. 반드시 null이 아닌 데이터 하나를 반환한다.
fun main() {
val flowable = Flowable.just("A","B","C").map {
it.lowercase()
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: a
main: b
main: c
main: 완료
2. flatMap
map과 달리 여러 데이터가 담긴 생산자를 반환하므로 데이터 한개로 여러 데이터를 통지할 수 있다.
빈 생산자를 반환해 특정 데이터를 통지하지 않거나, 에러 생산자를 반환해 에러를 통지할수도 있다.
flatMap(mapper)
fun main() {
val flowable = Flowable.just("A","B","C").flatMap {
if (it.isNullOrEmpty()) {
Flowable.empty()
}
else {
Flowable.just(it.lowercase())
}
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: a
main: b
main: c
main: 완료
flatMap(mapper, combiner)
fun main() {
val flowable = Flowable.just("A","B","C")
.flatMap({
Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3)
},{ sourceData, newData ->
"[$sourceData] $newData"
})
flowable.subscribe(DebugSubscriber())
Thread.sleep(1000L)
}
출력 결과
RxComputationThreadPool-1: [A] 0
RxComputationThreadPool-2: [B] 0
RxComputationThreadPool-2: [C] 0
RxComputationThreadPool-1: [A] 1
RxComputationThreadPool-1: [B] 1
RxComputationThreadPool-1: [C] 1
RxComputationThreadPool-1: [A] 2
RxComputationThreadPool-1: [B] 2
RxComputationThreadPool-1: [C] 2
RxComputationThreadPool-1: 완료
flatMap(onNextMapper, onErrorMapper, onCompleSupplier)
fun main() {
val original = Flowable.just(1,2,0,4,5)
.map { 10/it }
val flowable = original.flatMap({
// 일반 통지 데이터
Flowable.just(it)
}, { // 에러 발생시 데이터
Flowable.just(-1)
}, { // 완료 시 데이터
Flowable.just(100)
}
)
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: 10
main: 5
main: -1
main: 완료
3. concatMap/concatMapDelayError
원본 데이터를 생산자로 반환해 이 변환한 생산자의 데이터를 통지하는 연산자.
flatMap 메서드와 다르게 데이터를 받은 순서대로 생산자를 생성하고 하나씩 실행한다. 여러 데이터를 계속해서 받아도 첫 데이터의 처리가 끝나야 다음 데이터의 생산자를 실행한다.
순서대로 처리하는것을 보증하나, 처리 성능에 영향을 줄 수 있다.
fun main() {
val flowable = Flowable.range(10,3)
.concatMap { sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map { data ->
val time = System.currentTimeMillis()
"$time ms: [$sourceData] $data"
}
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-1: 1642486999693 ms: [10] 0
RxComputationThreadPool-1: 1642487000191 ms: [10] 1
RxComputationThreadPool-2: 1642487000698 ms: [11] 0
RxComputationThreadPool-2: 1642487001196 ms: [11] 1
RxComputationThreadPool-3: 1642487001700 ms: [12] 0
RxComputationThreadPool-3: 1642487002200 ms: [12] 1
RxComputationThreadPool-3: 완료
4. concatMapEager/concatMapEagerDelayError
원본 데이터를 생산자로 변환하고 이 생산자의 데이터를 통지하는 연산자.
concatMap과 마찬가지로 데이터 한개로 여러개의 데이터를 생성해 받은 데이터 순서대로 통지할 수 있다.
그러나 concatMap과 달리 받은 데이터로 만든 생산자가 다른 스레드에서 실행될 때는 즉시 실행된다.
concatMapEager(mapper)
fun main() {
val flowable = Flowable.range(10, 3)
.concatMapEager { sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map { data ->
val time = System.currentTimeMillis()
"$time + ms: [$sourceData] $data"
}
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-2: 1642487289924 + ms: [10] 0
RxComputationThreadPool-3: 1642487290427 + ms: [10] 1
RxComputationThreadPool-1: 1642487289924 + ms: [11] 0
RxComputationThreadPool-1: 1642487290427 + ms: [11] 1
RxComputationThreadPool-1: 1642487289924 + ms: [12] 0
RxComputationThreadPool-1: 1642487290427 + ms: [12] 1
RxComputationThreadPool-1: 완료
데이터 통지를 원본 Flowable이 생성한 순서대로 하지만 시간을 보면 처리 자체는 바로 진행하였음을 볼 수 있다.
concatMapEagerDelay(mapper, tillTheEnd)
tillTheEnd 인자가 true면 모든 데이터를 통지하고 에러를 통지하고, false면 에러 발생 시점에 에러를 통지한다.
fun main() {
val flowable = Flowable.range(10, 3)
.concatMapEagerDelayError( { sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext { data ->
if(sourceData == 11 && data == 1L) {
throw Exception ("예외 발생")
}
}
.map { data ->
val time = System.currentTimeMillis()
"$time + ms: [$sourceData] $data"
}
}, true // or false
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
True일 때
RxComputationThreadPool-1: 1642487877490 + ms: [10] 0
RxComputationThreadPool-1: 1642487877981 + ms: [10] 1
RxComputationThreadPool-1: 1642487878481 + ms: [10] 2
RxComputationThreadPool-1: 1642487877490 + ms: [11] 0
RxComputationThreadPool-1: 1642487877490 + ms: [12] 0
RxComputationThreadPool-1: 1642487877997 + ms: [12] 1
RxComputationThreadPool-3: 1642487878496 + ms: [12] 2
RxComputationThreadPool-3: 에러= java.lang.Exception: 예외 발생
False일 때
RxComputationThreadPool-1: 1642487929729 + ms: [10] 0
RxComputationThreadPool-1: 1642487930231 + ms: [10] 1
RxComputationThreadPool-1: 1642487930726 + ms: [10] 2
RxComputationThreadPool-1: 에러= java.lang.Exception: 예외 발생
5. buffer
통지하려는 데이터를 매번 통지하는것이 아니라, 어느정도 모아서 리스트, 컬렉션에 담아서 통지하는 연산자.
buffer(count)
fun main() {
val flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS).take(10).buffer(3)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-1: [3, 4, 5]
RxComputationThreadPool-1: [6, 7, 8]
RxComputationThreadPool-1: [9]
RxComputationThreadPool-1: 완료
buffer(boundaryIndicatorSupplier)
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(7)
.buffer(
Callable<Publisher<Long>> {
Flowable.timer(1000L, TimeUnit.MILLISECONDS)
})
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-3: [3, 4, 5]
RxComputationThreadPool-2: [6]
RxComputationThreadPool-2: 완료
데이터는 약 300밀리초 마다 통지하지만, 버퍼에서 꺼내는 작업은 1000밀리초 단위로 진행한다.
buffer(openIndicator/closingIndicator)
openIndicator는 여러 데이터를 통지하지만, closingIndicator는 데이터를 하나만 통지한다.
open에서 데이터를 통지하는 시점은 버퍼링 시작을 나타내지만, closing에서 데이터를 통지하는 시점은 버퍼링 종료를 나타내기 때문.
6. toList
통지되는 데이터를 모두 리스트에 담아 통지하는 연산자.
완료 통지를 받은 시점에 결과 리스트를 통지하므로 완료를 통지하지 않는 생산자에서는 사용할 수 없다.
데이터를 대량으로 통지하는 생산자에서는 버퍼에 쌓이는 데이터가 많아 메모리 부족이 일어날 수 있으므로 주의해야 한다. Single을 반환한다.
fun main() {
val flowable = Flowable.just("A","B","C").toList()
flowable.subscribe { data ->
println(data)
}
}
출력 결과
[A, B, C]
Single을 통지하기 때문에 데이터 통지만으로 처리가 끝나 별도의 완료 통지는 하지 않는다.
7. toMap
통지할 데이터를 key-value 쌍으로 map에 담아 통지하는 연산자.
완료 통지 시점에 결과 map을 통지하므로 완료를 통지하지 않는 생산자에서는 사용할 수 없다.
데이터를 대량으로 통지하는 생산자에서는 버퍼에 쌓이는 데이터가 많아 메모리 부족이 일어날 수 있으므로 주의해야 한다. Single을 반환한다.
toMap(keySelector)
받은 데이터로 map에서 사용할 키를 생성한다
fun main() {
val flowable = Flowable.just("1A","2B","3C","1D","2E").toMap {
it.substring(0,1)
}
flowable.subscribe{ data ->
println(data)
}
}
출력 결과
{1=1D, 2=2E, 3=3C}
1A와 2B는 중복되어 덮어씌워진것을 볼 수 있다.
toMap(keySelector, valueSelector)
받은 데이터로 map에 넣을 값을 생성한다. 통지받은 데이터 그대로가 아닌, 통지받은 데이터로 특정 변환 작업을 한 데이터를 map에 넣을때 사용한다.
fun main() {
val flowable = Flowable.just("1A","2B","3C","1D","2E")
.toMap(
{ data ->
data.substring(0,1)
},{ data ->
data.substring(1)
}
)
flowable.subscribe{ data ->
println(data)
}
}
출력 결과
{1=D, 2=E, 3=C}
8. toMultiMap
key-value를 담은 collection의 쌍으로 이루어진 map을 데이터로 통지하는 연산자.
받은 데이터를 기반으로 키를 작성하고 연관된 컬렉션에 데이터 값을 담은 Map을 작성한 후 원본 생산자가 완료를 통지하는 시점에 결과로 Map을 통지한다.
완료 통지 시점에 결과 map을 통지하므로 완료를 통지하지 않는 생산자에서는 사용할 수 없다.
데이터를 대량으로 통지하는 생산자에서는 버퍼에 쌓이는 데이터가 많아 메모리 부족이 일어날 수 있으므로 주의해야 한다. Single을 반환한다.
fun main() {
val flowable = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(5)
.toMultimap { data ->
if (data % 2L == 0L) {
"짝수"
} else {
"홀수"
}
}
flowable.subscribe{ data ->
println(data)
}
Thread.sleep(3000L)
}
출력 결과
{짝수=[0, 2, 4], 홀수=[1, 3]}
받은 데이터로 생성한 키를 바탕으로 컬렉션 객체에 데이터를 담고, Map을 데이터로 통지한다.
통지 데이터를 제한하는 연산자
1. filter
받은 데이터가 조건에 맞는지 판정해 true인것만 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.filter{ data ->
data % 2L == 0L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 6
RxComputationThreadPool-1: 8
2. distinct
통지하려는 데이터가 이미 통지된 데이터와 같다면 통지하지 않는 연산자. 내부의 HashSet을 이용하여 판별한다.
distinct()
fun main() {
val flowable = Flowable.just("A","B","b","A","a","B","b")
.distinct()
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: B
main: b
main: a
main: 완료
distinct(keySelector)
데이터를 비교하고 통지하며, 통지하는 데이터는 원본 데이터이다.
fun main() {
val flowable = Flowable.just("A","B","b","A","a","B","b")
.distinct{
it.lowercase()
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: B
main: 완료
3. distinctUntilChanged
같은 데이터를 연속으로 받을 때 이 데이터를 제외하고 통지하는 연산자.
distinctUntilChanged()
fun main() {
val flowable = Flowable.just("A","a","a","A","a")
.distinctUntilChanged()
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: A
main: a
main: A
main: a
main: 완료
distinctUntilChanged(comparer)
fun main() {
val flowable = Flowable.just("1", "1.0", "0.1", "0.10", "1")
.distinctUntilChanged { data1, data2 ->
val convert1 = BigDecimal(data1)
val convert2 = BigDecimal(data2)
convert1.compareTo(convert2) == 0
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: 1
main: 0.1
main: 1
main: 완료
4. take
지정한 개수나 기간에 도달할 때 까지 받은 데이터를 통지하는 연산자.
5. takeUntil
인자로 지정한 조건이 될 때 까지 데이터를 통지하는 연산자.
takeUntil(stopPredicate)
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeUntil { data ->
data == 3L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 완료
takeUntil(other)
takeUntil 메서드의 인자 Flowable이 데이터를 통지할 때 까지 받은 데이터를 통지한다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeUntil(
Flowable.timer(1000L, TimeUnit.MILLISECONDS)
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
RxComputationThreadPool-1: 완료
경과 시간 | 0 | 300 | 600 | 900 | 1000 |
---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | ||
인자 Flowable | 0 | ||||
결과 통지 | 0 | 1 | 2 | 완료 |
6. takeWhile
인자로 지정한 조건이 true일 동안 받은 데이터를 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeWhile { data ->
data != 3L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료
7. takeLast
원본 생산자가 완료될 때 마지막 데이터부터 지정한 개수나 지정한 기간의 데이터만을 세어 통지하는 연산자.
takeLast(count)
fun main() {
val flowable = Flowable.interval(800L, TimeUnit.MILLISECONDS)
.take(5)
.takeLast(2)
flowable.subscribe(DebugSubscriber())
Thread.sleep(5000L)
}
출력 결과
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 완료
takeLast(count, time, unit)
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(10)
.takeLast(2, 1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 9
RxComputationThreadPool-1: 완료
경과 시간 | 0 | 300 | 600 | 900 | 1200 | 1500 | 1800 | 2100 | 2400 | 2700 | 3000 |
---|---|---|---|---|---|---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | |
지정 기간의 데이터 | 6 | 7 | 8 | 9 | |||||||
통지 대상의 데이터 | 8 | 9 |
완료 통지 전 1000밀리초동안 통지된 데이터 중 끝에서부터 2건을 통지한다.
8. skip
앞에서부터 지정한 만큼 데이터를 건너 뛴 후 나머지 데이터를 통지하는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skip(2)
flowable.subscribe(DebugSubscriber())
Thread.sleep(1500L)
}
출력 결과
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
(앞의 2개 0과 1을 통지하지 않는다)
9. skipUntil
인자로 지정한 생산자가 데이터를 통지할때까지 결과를 통지하지 않는다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipUntil (
Flowable.timer(1000L,TimeUnit.MILLISECONDS)
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-2: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-2: 5
경과 시간 | 0 | 300 | 600 | 900 | 1000 | 1200 | 1500 | 1800 |
---|---|---|---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | 3 | 4 | 5 | ||
인자 Flowable | 0 | |||||||
통지 | 3 | 4 | 5 |
10. skipWhile
인자로 지정한 조건이 true일 때 데이터를 통지하지 않는 연산자.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipWhile{ data ->
data != 3L
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 5
데이터가 3이 되기 전까지 skip하였다.
11. skipLast
통지 데이터 중에서 끝에서부터 지정한 범위만큼 통지하지 않는 연산자.
fun main() {
val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
.take(5)
.skipLast(2)
flowable.subscribe(DebugSubscriber())
Thread.sleep(6000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료
12. throttleFirst
데이터를 통지하고나면 지정 시간동안 통지되는ㄷ ㅔ이터는 파기하는 연산자로 처리가 완료될 때 까지 반복한다.
단기간에 들어오는 데이터가 모두 필요한 것이 아니라면 데이터를 쳐낼 수 있다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(10)
.throttleFirst(1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(6000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 완료
13. throttleLast/sample
인자로 받은 시간마다 가장 마지막에 통지되는 데이터만 통지하는 연산자.
throttleLast
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.throttleLast(1000L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
경과 시간 | 0 | 300 | 600 | 900 | 1000 | 1200 | 1500 | 1800 | 2000 | 2100 | 2400 | 2700 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
원본 Flowable | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | |||
결과 데이터 | 2 | 5 |
sample
인자 Flowable이 데이터를 통지하는 시점에 가장 마지막에 받은 데이터를 통지한다.
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.sample(
Flowable.interval(1000L, TimeUnit.MILLISECONDS)
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
14. throttleWithTomeout/debounce
원본 생산자에서 데이터를 받고나서 일정 기간 안에 다른 데이터를 받지 않으면 현재 받은 데이터를 통지하는 연산자.
지정 기간에 다음 데이터를 받으면 새로 데이터를 받은 시점부터 다시 지정 기간 안에 다음 데이터가 오는지 확인한다.
throttleWithTomeout
fun main() {
val flowable = Flowable.create(
{ emitter: FlowableEmitter<String?> ->
emitter.onNext("A")
Thread.sleep(1000L)
emitter.onNext("B")
Thread.sleep(300L)
emitter.onNext("C")
Thread.sleep(300L)
emitter.onNext("D")
Thread.sleep(1000L)
emitter.onNext("E")
Thread.sleep(100L)
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.throttleWithTimeout(500L, TimeUnit.MILLISECONDS)
flowable.subscribe(DebugSubscriber())
}
출력 결과
RxComputationThreadPool-1: A
RxComputationThreadPool-1: D
main: E
main: 완료
데이터 A와 D를 받은 뒤 500밀리초동안 다음 데이터가 오지 않으므로 통지된다.
B와 C는 500밀리초 안에 다음 데이터가 오므로 통지되지 않는다.
마지막 E는 500밀리초를 기다리는 사이에 완료 통지를 받으므로 기다리지 않고 E와 완료를 통지받는다.
debounce
fun main() {
val flowable = Flowable.create(
{ emitter: FlowableEmitter<String?> ->
emitter.onNext("A")
Thread.sleep(1000L)
emitter.onNext("B")
Thread.sleep(300L)
emitter.onNext("C")
Thread.sleep(300L)
emitter.onNext("D")
Thread.sleep(1000L)
emitter.onNext("E")
Thread.sleep(100L)
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
.debounce{
Flowable.timer(500L, TimeUnit.MILLISECONDS)
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
RxComputationThreadPool-1: A
RxComputationThreadPool-4: D
main: E
main: 완료
15. elementAt/elementAtOrError
지정한 위치의 데이터만을 통지하는 연산자.
통지할 데이터가 없을 때 처리 방식이 서로 다르다.
반환값은 Single 혹은 Maybe
fun main() {
val maybe = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.elementAt(3)
maybe.subscribe{ data ->
println(data)
}
Thread.sleep(1000L)
}
출력 결과
3
'RxJava' 카테고리의 다른 글
RxJava 04 - Flowable과 Observable의 연산자 (part.03) (0) | 2022.01.18 |
---|---|
RxJava 04 - Flowable과 Observable의 연산자 (part.02) (0) | 2022.01.18 |
RxJava 03 - RxJava의 매커니즘 (0) | 2022.01.15 |
RxJava 02 - RxJava를 사용하는 데 필요한 배경 지식 (0) | 2022.01.13 |
RxJava 01 - (번외) Hot Observable 예제 in Android (0) | 2022.01.04 |