Seoplee
개발의 섭리, Seoplee의 개발
Seoplee
  • 분류 전체보기 (54)
    • Android (26)
      • Architecture (12)
      • Compose (0)
      • Tips (11)
      • 트러블슈팅 (3)
    • IOS (1)
      • Tips (1)
    • Kotlin (1)
    • Coroutine (3)
      • Flow (3)
    • RxJava (12)
    • CI&CD (1)
    • WEB (8)
    • Network (1)
    • ETC (1)
    • (임시) (0)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Seoplee

개발의 섭리, Seoplee의 개발

RxJava

RxJava 04 - Flowable과 Observable의 연산자 (part.04)

2022. 1. 25. 18:52

Flowable/Observable을 결합하는 연산자

1. merge계열 연산자

여러 Flowable/Observable에서 받은 데이터를 하나의 Flowable/Observable로 통지하는 연산자.
여러 생산자가 동시에 데이터를 통지하더라도 결과를 통지할때는 동기화 돼 순차적으로 통지된다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
    val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
        .map{ data ->
            data + 100L
        }

    val result = Flowable.merge(flowable, flowable2)

    result.subscribe(DebugSubscriber())

    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-2: 100
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-2: 101
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 완료

2. concat계열 연산자

여러개의 Flowable/Observable을 전달받아 하나의 Flowable/Observable로 결합한 후 순차적으로 실행하는 연산자.
결합 전의 통지를 순차로 실행하므로 완료되지 않는 생산자가 포함되면 다음 생산자는 실행되지 않는다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
    val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
        .map { data ->
            data + 100L
        }

    val result = Flowable.concat(flowable, flowable2)

    result.subscribe(DebugSubscriber())

    Thread.sleep(3000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-2: 100
RxComputationThreadPool-2: 101
RxComputationThreadPool-2: 완료

3. concatEager/concatArrayEager

concat 메서드와 달리 원래의 Flowable/Observable이 한꺼번에 실행되지만 통지를 순차적으로 하는 연산자.

fun main() {
    val flowable1 = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
    val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(5)
        .map { data ->
            data + 100L
        }

    val source = listOf(flowable1, flowable2)
    val result = Flowable.concatEager(source)
    result.subscribe(DebugSubscriber())
    Thread.sleep(3000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-1: 100
RxComputationThreadPool-1: 101
RxComputationThreadPool-1: 102
RxComputationThreadPool-2: 103
RxComputationThreadPool-2: 104
RxComputationThreadPool-2: 완료

4. startWith/startWithArray

인자의 데이터를 통지하고나서 자신의 데이터를 통지하는 연산자.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
    val other = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(2)
        .map { data ->
            data + 100L
        }

    val result = flowable.startWith(other)
    result.subscribe(DebugSubscriber())

    Thread.sleep(3000L)
}

출력 결과

RxComputationThreadPool-1: 100
RxComputationThreadPool-1: 101
RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
RxComputationThreadPool-2: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-2: 완료

5. zip/zipWith

인자로 전달된 여러개의 Flowable/Observable에서 데이터를 받아 데이터들이 모인 시점에 이 데이터들을 함수형 인터페이스에 전달하고, 이 함수형 인터페이스에서 새로 생성한 데이터를 결과로 통지하는 연산자.

새로운 데이터는 모두 생산자에서 동일한 순서의 데이터를 받아 생성한다. 따라서 인자로 전달받은 각 생산자의 통지 시점이 다르면 가장 느리게 처리한 생산자가 데이터를 처리한 시점에 새로운 데이터가 생성된다.

완료 통지 시점은 통지하는 데이터 개수가 가장 적은 인자 생산자에 맞춘다.

fun main() {
    val flowable1 = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
    val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(3)
        .map { data ->
            data + 100L
        }

    val result = Flowable.zip(flowable1, flowable2) { data1, data2 ->
        listOf(data1, data2)
    }

    result.subscribe(DebugSubscriber())

    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-2: [1, 101]
RxComputationThreadPool-2: [2, 102]
RxComputationThreadPool-2: 완료

6. combineLatest/combineLatestDelayError

인자로 받은 여러 Flowable/Observable이 데이터를 받는 시점에 각 Flowable/Observable이 마지막으로 통지한 데이터를 함수형 인터페이스에 전달하고 이 데이터를 받아 새로 데이터를 생성해 통지하는 연산자.

fun main() {
    val flowable1 = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(5)
    val flowable2 = Flowable.interval(500L, TimeUnit.MILLISECONDS).take(3)
        .map { data ->
            data + 100L
        }

    val result = Flowable.combineLatest(flowable1, flowable2) { data1, data2 ->
        listOf(data1, data2)
    }

    result.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-1: [1, 100]
RxComputationThreadPool-1: [2, 100]
RxComputationThreadPool-2: [2, 101]
RxComputationThreadPool-1: [3, 101]
RxComputationThreadPool-1: [4, 101]
RxComputationThreadPool-1: [4, 102]
RxComputationThreadPool-1: 완료

Flowable/Observable 상태를 통지하는 연산자

1. isEmpty

Flowable/Observable이 통지할 데이터가 있는지 판단하는 연산자.
원본 Flowable에서 데이터 없이 완료만 통지되면 true를, 데이터가 통지되면 false를 Single로 반환한다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).take(3)
        .filter { data ->
            data >= 3
        }
        .isEmpty

    flowable.subscribe{ data ->
        println(data)
    }

    Thread.sleep(4000L)
}

출력 결과

true

2. contains

인자의 데이터가 Flowable/Observable에 포함됐는지 판단하는 연산자.

원봉 Flowable/Observable이 지정 데이터를 포함하면 true를, 없거나 통지할 데이터가 없으면 false를 Single로 통지한다.

fun main() {
    val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).contains(3L)

    flowable.subscribe{ data ->
        println(data)
    }
    Thread.sleep(4000L)
}

출력 결과

true

원본 Flowable이 해당 데이터를 통지하기 전까지는 결과를 판단할 수 없기 때문에 시간이 걸린다.

3. all

Flowable/Observable이 통지하는 모든 데이터가 설정한 조건에 맞는지 판단하는 연산자.

fun main() {
    val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(3L)
        .all{ data ->
            data <5
        }
    flowable.subscribe{ data ->
        println(data)
    }
    Thread.sleep(4000L)
}

출력 결과

true

4. sequenceEqual

인자로 전달된 두개의 Flowable/Observable이 통지하는 데이터가 동일한지, 같은 순서로 같은 수를 통지하는지 판단하는 연산자.
통지 시점은 비교하지 않는다.

fun main() {
    val flowable1 = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(3)
    val flowable2 = Flowable.just(0L, 1L, 2L)

    val result = Flowable.sequenceEqual(flowable1, flowable2)

    result.subscribe{ data ->
        println(data)
    }
    Thread.sleep(4000L)
}

출력 결과

true

5. count

Flowable/Observable의 데이터 개수를 통지하는 연산자.
Long타입으로 Single을 반환한다.

Flowable/Observable 데이터를 집계하는 연산자

1. reduce/reduceWith

설정한 집계 방식으로 Flowable/Observable의 데이터를 계산하고 최종 결과를 통지하는 연산자.
초기값을 받으면 Single, 받지 않으면 Maybe를 반환한다.

fun main() {
    val flowable = Flowable.just(1, 10, 100, 1000, 10000)
        .reduce(0) { sum, data ->
            sum + data
        }

    flowable.subscribe { data ->
        println(data)
    }
}

출력 결과

11111

2. scan

원본 Flowable/Observable이 통지한 데이터를 인자의 함수형 인터페이스를 사용해 집계하는 연산자.
계산할 때 마다 생성되는 결과값을 통지한다.
초기값이 있으면 초기값을 첫번째 데이터로 통지한다.

fun main() {
    val flowable = Flowable.just(1, 10, 100, 1000, 10000)
        .scan(0) { sum, data ->
            sum + data
        }

    flowable.subscribe { data ->
        println(data)
    }
}

출력 결과

0
1
11
111
1111
11111

유틸리티 연산자

3. repeat

원본 Flowable/Observable이 처리를 완료하면 데이터 통지를 처음부터 반복하는 연산자.
인자가 없으면 완료하지 않고 데이터를 반복 통지, 있으면 지정한 숫자만큼 반복 통지한다.

fun main() {
    val flowable = Flowable.just("A","B","C").repeat(2)

    flowable.subscribe(DebugSubscriber())
}

출력 결과

main: A
main: B
main: C
main: A
main: B
main: C
main: 완료

4. repeatUntil

지정한 조건이 될 때 까지 Flowable/Observable의 데이터 통지를 반복하는 연산자.

fun main() {

    val startTime = System.currentTimeMillis()

    val flowable = Flowable.interval(100L,TimeUnit.MILLISECONDS).take(3)
        .repeatUntil{
            println("called")
            System.currentTimeMillis() - startTime > 500L
        }
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(1000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
called
RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
called
RxComputationThreadPool-2: 완료

5. repeatWhen

지정한 시점까지 원본 Flowable/Observable의 통지를 반복하는 연산자.
반복 여부는 repeatWhen 메서드의 인자인 함수형 인터페이스에서 판단한다.

fun main() {
    val flowable = Flowable.just(1,2,3).repeatWhen { hanlder ->
        hanlder.delay(100L, TimeUnit.MILLISECONDS).take(2).doOnNext {
            println("emit: $it")
        }.doOnComplete {
            println("complete")
        }.map {
            val time = System.currentTimeMillis()
            "$time ms: $it"
        }
    }
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(5000L)
}

출력결과

main: 1
main: 2
main: 3
emit: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
emit: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
complete
RxComputationThreadPool-1: 완료

완료 통지를 알리는 Flowable을 repeatWhen의 메서드의 인자로 받아 반복을 제어하는 Flowable을 생성하는 함수형 인터페이스를 구현한다.

6. delay

Flowable/Observable에서 받은 데이터를 설정한 기간만큼 지연해 통지하는 연산자.
지연 기간은 직접 시간을 지정하거나, 통지 지연 시점을 함수형 인터페이스로 설정할 수 있다.

fun main() {
    println("처리 시작 :  ${System.currentTimeMillis()}")

    val flowable = Flowable.create({ emitter: FlowableEmitter<String> ->
        println("구독 시작 : ${System.currentTimeMillis()}")

        emitter.onNext("A")
        emitter.onNext("B")
        emitter.onNext("C")

        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
        .delay(2000L, TimeUnit.MILLISECONDS)
        .doOnNext { println("통지 시각 : ${System.currentTimeMillis()}") }

    flowable.subscribe(DebugSubscriber())
    Thread.sleep(3000L)
}

출력 결과

처리 시작 : 1643102620593
구독 시작 : 1643102620735
통지 시각 : 1643102622750
RxComputationThreadPool-1: A
통지 시각 : 1643102622750
RxComputationThreadPool-1: B
통지 시각 : 1643102622751
RxComputationThreadPool-1: C
RxComputationThreadPool-1: 완료

처리시작 시간과 통지시각의 차이가 약 2000밀리초인것을 보면 데이터가 delay메서드로 설정한 시간만큼 지연돼 통지된 것을 볼 수 있다.
또한 처리 시작과 구독 시작은 시간 차가 거의 없으므로 구독 자체가 지연되는것은 아닌것을 볼 수 있다.

7. delaySubscription

Flowable/Observable의 처리 시작을 지연하는 연산자.
예를들어 Cold한 생산자는 subscribe를 실행하면 바로 처리를 시작하지만, delaySubscription을 사용하면 설정한 기간을 기다린 후에 처리를 시작한다.

fun main() {
    println("처리 시작 : ${System.currentTimeMillis()}")

    val flowable = Flowable.create({ emitter: FlowableEmitter<String> ->
        println("구독 시작 : ${System.currentTimeMillis()}")

        emitter.onNext("A")
        emitter.onNext("B")
        emitter.onNext("C")

        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
        .delaySubscription(2000L, TimeUnit.MILLISECONDS)

    flowable.subscribe(DebugSubscriber())
    Thread.sleep(3000L)
}

출력 결과

처리 시작 : 1643102817522
구독 시작 : 1643102819683
RxComputationThreadPool-1: A
RxComputationThreadPool-1: B
RxComputationThreadPool-1: C
RxComputationThreadPool-1: 완료

처리시작과 구독시작의 시간차이가 약 2000밀리초인것을 보면 지정한 시간만큼 처리를 늦게 시작한것을 알 수 있다.

8. timeout

데이터를 통지할 때 설정된 시간을 넘기면 에러를 통지하거나 대체 Flowable/Observable의 데이터를 통지하는 연산자.
인자로 대체하는 생산자가 없으면 에러를 통지하며 이때 에러 객체는 TimeoutException이다.

fun main() {
    println("처리 시작 : ${System.currentTimeMillis()}")

    val flowable = Flowable.create({ emitter: FlowableEmitter<Int> ->
        println("구독 시작 : ${System.currentTimeMillis()}")

        emitter.onNext(1)
        emitter.onNext(2)
        try {
            Thread.sleep(1200L)
        } catch (e: InterruptedException) {
            emitter.onError(e)
            return@create
        }
        emitter.onNext(3)
        emitter.onComplete()

    }, BackpressureStrategy.BUFFER)
        .timeout(1000L, TimeUnit.MILLISECONDS)

    flowable.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

처리 시작 : 1643103077418
구독 시작 : 1643103077552
main: 1
main: 2
RxComputationThreadPool-1: 에러= java.util.concurrent.TimeoutException: The source did not signal an event for 1000 milliseconds and has been terminated.

설정한 타임아웃 값 (1000밀리초)이 지나면서 TimeoutException 에러가 발생하였다.

저작자표시 (새창열림)

'RxJava' 카테고리의 다른 글

RxJava 06 - RxJava의 디버깅과 테스트  (0) 2022.02.16
RxJava 05 - Processor/Subject  (0) 2022.02.16
RxJava 04 - Flowable과 Observable의 연산자 (part.03)  (0) 2022.01.18
RxJava 04 - Flowable과 Observable의 연산자 (part.02)  (0) 2022.01.18
RxJava 04 - Flowable과 Observable의 연산자 (part.01)  (0) 2022.01.18
    'RxJava' 카테고리의 다른 글
    • RxJava 06 - RxJava의 디버깅과 테스트
    • RxJava 05 - Processor/Subject
    • RxJava 04 - Flowable과 Observable의 연산자 (part.03)
    • RxJava 04 - Flowable과 Observable의 연산자 (part.02)
    Seoplee
    Seoplee
    개발공부를 하며 기록할만한 것들을 정리해놓은 블로그입니다.

    티스토리툴바