RxJava 04 - Flowable과 Observable의 연산자 (part.02)
RxJava Chapter 04 - Flowable과 Observable의 연산자 (part.02)
통지 데이터를 변환하는 연산자
1. map
원본 생산자에서 통지하는 데이터를 변환한 뒤 변환된 데이터를 통지하는 연산자. 한개의 데이터로 여러 데이터를 생성해 통지하거나 통지를 건너뛸 수는 없다. 반드시 null이 아닌 데이터 하나를 반환한다.
fun main() {
val flowable = Flowable.just("A","B","C").map {
it.lowercase()
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: a
main: b
main: c
main: 완료
2. flatMap
map과 달리 여러 데이터가 담긴 생산자를 반환하므로 데이터 한개로 여러 데이터를 통지할 수 있다.
빈 생산자를 반환해 특정 데이터를 통지하지 않거나, 에러 생산자를 반환해 에러를 통지할수도 있다.
flatMap(mapper)
fun main() {
val flowable = Flowable.just("A","B","C").flatMap {
if (it.isNullOrEmpty()) {
Flowable.empty()
}
else {
Flowable.just(it.lowercase())
}
}
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: a
main: b
main: c
main: 완료
flatMap(mapper, combiner)
fun main() {
val flowable = Flowable.just("A","B","C")
.flatMap({
Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3)
},{ sourceData, newData ->
"[$sourceData] $newData"
})
flowable.subscribe(DebugSubscriber())
Thread.sleep(1000L)
}
출력 결과
RxComputationThreadPool-1: [A] 0
RxComputationThreadPool-2: [B] 0
RxComputationThreadPool-2: [C] 0
RxComputationThreadPool-1: [A] 1
RxComputationThreadPool-1: [B] 1
RxComputationThreadPool-1: [C] 1
RxComputationThreadPool-1: [A] 2
RxComputationThreadPool-1: [B] 2
RxComputationThreadPool-1: [C] 2
RxComputationThreadPool-1: 완료
flatMap(onNextMapper, onErrorMapper, onCompleSupplier)
fun main() {
val original = Flowable.just(1,2,0,4,5)
.map { 10/it }
val flowable = original.flatMap({
// 일반 통지 데이터
Flowable.just(it)
}, { // 에러 발생시 데이터
Flowable.just(-1)
}, { // 완료 시 데이터
Flowable.just(100)
}
)
flowable.subscribe(DebugSubscriber())
}
출력 결과
main: 10
main: 5
main: -1
main: 완료
3. concatMap/concatMapDelayError
원본 데이터를 생산자로 반환해 이 변환한 생산자의 데이터를 통지하는 연산자.
flatMap 메서드와 다르게 데이터를 받은 순서대로 생산자를 생성하고 하나씩 실행한다. 여러 데이터를 계속해서 받아도 첫 데이터의 처리가 끝나야 다음 데이터의 생산자를 실행한다.
순서대로 처리하는것을 보증하나, 처리 성능에 영향을 줄 수 있다.
fun main() {
val flowable = Flowable.range(10,3)
.concatMap { sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map { data ->
val time = System.currentTimeMillis()
"$time ms: [$sourceData] $data"
}
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-1: 1642486999693 ms: [10] 0
RxComputationThreadPool-1: 1642487000191 ms: [10] 1
RxComputationThreadPool-2: 1642487000698 ms: [11] 0
RxComputationThreadPool-2: 1642487001196 ms: [11] 1
RxComputationThreadPool-3: 1642487001700 ms: [12] 0
RxComputationThreadPool-3: 1642487002200 ms: [12] 1
RxComputationThreadPool-3: 완료
4. concatMapEager/concatMapEagerDelayError
원본 데이터를 생산자로 변환하고 이 생산자의 데이터를 통지하는 연산자.
concatMap과 마찬가지로 데이터 한개로 여러개의 데이터를 생성해 받은 데이터 순서대로 통지할 수 있다.
그러나 concatMap과 달리 받은 데이터로 만든 생산자가 다른 스레드에서 실행될 때는 즉시 실행된다.
concatMapEager(mapper)
fun main() {
val flowable = Flowable.range(10, 3)
.concatMapEager { sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
.map { data ->
val time = System.currentTimeMillis()
"$time + ms: [$sourceData] $data"
}
}
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-2: 1642487289924 + ms: [10] 0
RxComputationThreadPool-3: 1642487290427 + ms: [10] 1
RxComputationThreadPool-1: 1642487289924 + ms: [11] 0
RxComputationThreadPool-1: 1642487290427 + ms: [11] 1
RxComputationThreadPool-1: 1642487289924 + ms: [12] 0
RxComputationThreadPool-1: 1642487290427 + ms: [12] 1
RxComputationThreadPool-1: 완료
데이터 통지를 원본 Flowable이 생성한 순서대로 하지만 시간을 보면 처리 자체는 바로 진행하였음을 볼 수 있다.
concatMapEagerDelay(mapper, tillTheEnd)
tillTheEnd 인자가 true면 모든 데이터를 통지하고 에러를 통지하고, false면 에러 발생 시점에 에러를 통지한다.
fun main() {
val flowable = Flowable.range(10, 3)
.concatMapEagerDelayError( { sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext { data ->
if(sourceData == 11 && data == 1L) {
throw Exception ("예외 발생")
}
}
.map { data ->
val time = System.currentTimeMillis()
"$time + ms: [$sourceData] $data"
}
}, true // or false
)
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
True일 때
RxComputationThreadPool-1: 1642487877490 + ms: [10] 0
RxComputationThreadPool-1: 1642487877981 + ms: [10] 1
RxComputationThreadPool-1: 1642487878481 + ms: [10] 2
RxComputationThreadPool-1: 1642487877490 + ms: [11] 0
RxComputationThreadPool-1: 1642487877490 + ms: [12] 0
RxComputationThreadPool-1: 1642487877997 + ms: [12] 1
RxComputationThreadPool-3: 1642487878496 + ms: [12] 2
RxComputationThreadPool-3: 에러= java.lang.Exception: 예외 발생
False일 때
RxComputationThreadPool-1: 1642487929729 + ms: [10] 0
RxComputationThreadPool-1: 1642487930231 + ms: [10] 1
RxComputationThreadPool-1: 1642487930726 + ms: [10] 2
RxComputationThreadPool-1: 에러= java.lang.Exception: 예외 발생
5. buffer
통지하려는 데이터를 매번 통지하는것이 아니라, 어느정도 모아서 리스트, 컬렉션에 담아서 통지하는 연산자.
buffer(count)
fun main() {
val flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS).take(10).buffer(3)
flowable.subscribe(DebugSubscriber())
Thread.sleep(3000L)
}
출력 결과
RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-1: [3, 4, 5]
RxComputationThreadPool-1: [6, 7, 8]
RxComputationThreadPool-1: [9]
RxComputationThreadPool-1: 완료
buffer(boundaryIndicatorSupplier)
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(7)
.buffer(
Callable<Publisher<Long>> {
Flowable.timer(1000L, TimeUnit.MILLISECONDS)
})
flowable.subscribe(DebugSubscriber())
Thread.sleep(4000L)
}
출력 결과
RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-3: [3, 4, 5]
RxComputationThreadPool-2: [6]
RxComputationThreadPool-2: 완료
데이터는 약 300밀리초 마다 통지하지만, 버퍼에서 꺼내는 작업은 1000밀리초 단위로 진행한다.
buffer(openIndicator/closingIndicator)
openIndicator는 여러 데이터를 통지하지만, closingIndicator는 데이터를 하나만 통지한다.
open에서 데이터를 통지하는 시점은 버퍼링 시작을 나타내지만, closing에서 데이터를 통지하는 시점은 버퍼링 종료를 나타내기 때문.
6. toList
통지되는 데이터를 모두 리스트에 담아 통지하는 연산자.
완료 통지를 받은 시점에 결과 리스트를 통지하므로 완료를 통지하지 않는 생산자에서는 사용할 수 없다.
데이터를 대량으로 통지하는 생산자에서는 버퍼에 쌓이는 데이터가 많아 메모리 부족이 일어날 수 있으므로 주의해야 한다. Single을 반환한다.
fun main() {
val flowable = Flowable.just("A","B","C").toList()
flowable.subscribe { data ->
println(data)
}
}
출력 결과
[A, B, C]
Single을 통지하기 때문에 데이터 통지만으로 처리가 끝나 별도의 완료 통지는 하지 않는다.
7. toMap
통지할 데이터를 key-value 쌍으로 map에 담아 통지하는 연산자.
완료 통지 시점에 결과 map을 통지하므로 완료를 통지하지 않는 생산자에서는 사용할 수 없다.
데이터를 대량으로 통지하는 생산자에서는 버퍼에 쌓이는 데이터가 많아 메모리 부족이 일어날 수 있으므로 주의해야 한다. Single을 반환한다.
toMap(keySelector)
받은 데이터로 map에서 사용할 키를 생성한다
fun main() {
val flowable = Flowable.just("1A","2B","3C","1D","2E").toMap {
it.substring(0,1)
}
flowable.subscribe{ data ->
println(data)
}
}
출력 결과
{1=1D, 2=2E, 3=3C}
1A와 2B는 중복되어 덮어씌워진것을 볼 수 있다.
toMap(keySelector, valueSelector)
받은 데이터로 map에 넣을 값을 생성한다. 통지받은 데이터 그대로가 아닌, 통지받은 데이터로 특정 변환 작업을 한 데이터를 map에 넣을때 사용한다.
fun main() {
val flowable = Flowable.just("1A","2B","3C","1D","2E")
.toMap(
{ data ->
data.substring(0,1)
},{ data ->
data.substring(1)
}
)
flowable.subscribe{ data ->
println(data)
}
}
출력 결과
{1=D, 2=E, 3=C}
8. toMultiMap
key-value를 담은 collection의 쌍으로 이루어진 map을 데이터로 통지하는 연산자.
받은 데이터를 기반으로 키를 작성하고 연관된 컬렉션에 데이터 값을 담은 Map을 작성한 후 원본 생산자가 완료를 통지하는 시점에 결과로 Map을 통지한다.
완료 통지 시점에 결과 map을 통지하므로 완료를 통지하지 않는 생산자에서는 사용할 수 없다.
데이터를 대량으로 통지하는 생산자에서는 버퍼에 쌓이는 데이터가 많아 메모리 부족이 일어날 수 있으므로 주의해야 한다. Single을 반환한다.
fun main() {
val flowable = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(5)
.toMultimap { data ->
if (data % 2L == 0L) {
"짝수"
} else {
"홀수"
}
}
flowable.subscribe{ data ->
println(data)
}
Thread.sleep(3000L)
}
출력 결과
{짝수=[0, 2, 4], 홀수=[1, 3]}
받은 데이터로 생성한 키를 바탕으로 컬렉션 객체에 데이터를 담고, Map을 데이터로 통지한다.