Seoplee
개발의 섭리, Seoplee의 개발
Seoplee
  • 분류 전체보기 (54)
    • Android (26)
      • Architecture (12)
      • Compose (0)
      • Tips (11)
      • 트러블슈팅 (3)
    • IOS (1)
      • Tips (1)
    • Kotlin (1)
    • Coroutine (3)
      • Flow (3)
    • RxJava (12)
    • CI&CD (1)
    • WEB (8)
    • Network (1)
    • ETC (1)
    • (임시) (0)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Seoplee

개발의 섭리, Seoplee의 개발

RxJava

RxJava 04 - Flowable과 Observable의 연산자 (part.03)

2022. 1. 18. 18:48

RxJava 04 - Flowable과 Observable의 연산자 (part.03)


통지 데이터를 제한하는 연산자

1. filter

받은 데이터가 조건에 맞는지 판정해 true인것만 통지하는 연산자.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .filter{ data ->
            data % 2L == 0L
        }

    flowable.subscribe(DebugSubscriber())
    Thread.sleep(3000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 6
RxComputationThreadPool-1: 8

2. distinct

통지하려는 데이터가 이미 통지된 데이터와 같다면 통지하지 않는 연산자. 내부의 HashSet을 이용하여 판별한다.

distinct()

fun main() {
    val flowable = Flowable.just("A","B","b","A","a","B","b")
        .distinct()

    flowable.subscribe(DebugSubscriber())
}

출력 결과

main: A
main: B
main: b
main: a
main: 완료

distinct(keySelector)

데이터를 비교하고 통지하며, 통지하는 데이터는 원본 데이터이다.

fun main() {
    val flowable = Flowable.just("A","B","b","A","a","B","b")
        .distinct{
            it.lowercase()
        }

    flowable.subscribe(DebugSubscriber())
}

출력 결과

main: A
main: B
main: 완료

3. distinctUntilChanged

같은 데이터를 연속으로 받을 때 이 데이터를 제외하고 통지하는 연산자.

distinctUntilChanged()

fun main() {
    val flowable = Flowable.just("A","a","a","A","a")
        .distinctUntilChanged()

    flowable.subscribe(DebugSubscriber())
}

출력 결과

main: A
main: a
main: A
main: a
main: 완료

distinctUntilChanged(comparer)

fun main() {
    val flowable = Flowable.just("1", "1.0", "0.1", "0.10", "1")
        .distinctUntilChanged { data1, data2 ->
            val convert1 = BigDecimal(data1)
            val convert2 = BigDecimal(data2)
            convert1.compareTo(convert2) == 0
        }
    flowable.subscribe(DebugSubscriber())
}

출력 결과

main: 1
main: 0.1
main: 1
main: 완료

4. take

지정한 개수나 기간에 도달할 때 까지 받은 데이터를 통지하는 연산자.

5. takeUntil

인자로 지정한 조건이 될 때 까지 데이터를 통지하는 연산자.

takeUntil(stopPredicate)

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .takeUntil { data ->
            data == 3L
        }
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 완료

takeUntil(other)

takeUntil 메서드의 인자 Flowable이 데이터를 통지할 때 까지 받은 데이터를 통지한다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .takeUntil(
            Flowable.timer(1000L, TimeUnit.MILLISECONDS)
        )
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
RxComputationThreadPool-1: 완료
경과 시간 0 300 600 900 1000
원본 Flowable 0 1 2
인자 Flowable 0
결과 통지 0 1 2 완료

6. takeWhile

인자로 지정한 조건이 true일 동안 받은 데이터를 통지하는 연산자.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .takeWhile { data ->
            data != 3L
        }
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료

7. takeLast

원본 생산자가 완료될 때 마지막 데이터부터 지정한 개수나 지정한 기간의 데이터만을 세어 통지하는 연산자.

takeLast(count)

fun main() {
    val flowable = Flowable.interval(800L, TimeUnit.MILLISECONDS)
        .take(5)
        .takeLast(2)
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(5000L)
}

출력 결과

RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 완료

takeLast(count, time, unit)

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .take(10)
        .takeLast(2, 1000L, TimeUnit.MILLISECONDS)
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(4000L)
}

출력 결과

RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 9
RxComputationThreadPool-1: 완료
경과 시간 0 300 600 900 1200 1500 1800 2100 2400 2700 3000
원본 Flowable 0 1 2 3 4 5 6 7 8 9
지정 기간의 데이터 6 7 8 9
통지 대상의 데이터 8 9

완료 통지 전 1000밀리초동안 통지된 데이터 중 끝에서부터 2건을 통지한다.

8. skip

앞에서부터 지정한 만큼 데이터를 건너 뛴 후 나머지 데이터를 통지하는 연산자.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .skip(2)
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(1500L)
}

출력 결과

RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4

(앞의 2개 0과 1을 통지하지 않는다)

9. skipUntil

인자로 지정한 생산자가 데이터를 통지할때까지 결과를 통지하지 않는다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .skipUntil (
            Flowable.timer(1000L,TimeUnit.MILLISECONDS)
            )
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-2: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-2: 5
경과 시간 0 300 600 900 1000 1200 1500 1800
원본 Flowable 0 1 2 3 4 5
인자 Flowable 0
통지 3 4 5

10. skipWhile

인자로 지정한 조건이 true일 때 데이터를 통지하지 않는 연산자.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .skipWhile{ data ->
            data != 3L
        }
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(2000L)
}

출력 결과

RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 5

데이터가 3이 되기 전까지 skip하였다.

11. skipLast

통지 데이터 중에서 끝에서부터 지정한 범위만큼 통지하지 않는 연산자.

fun main() {
    val flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
        .take(5)
        .skipLast(2)
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(6000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료

12. throttleFirst

데이터를 통지하고나면 지정 시간동안 통지되는ㄷ ㅔ이터는 파기하는 연산자로 처리가 완료될 때 까지 반복한다.
단기간에 들어오는 데이터가 모두 필요한 것이 아니라면 데이터를 쳐낼 수 있다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .take(10)
        .throttleFirst(1000L, TimeUnit.MILLISECONDS)
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(6000L)
}

출력 결과

RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 완료

13. throttleLast/sample

인자로 받은 시간마다 가장 마지막에 통지되는 데이터만 통지하는 연산자.

throttleLast

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .take(9)
        .throttleLast(1000L, TimeUnit.MILLISECONDS)
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(3000L)
}

출력 결과

RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
경과 시간 0 300 600 900 1000 1200 1500 1800 2000 2100 2400 2700
원본 Flowable 0 1 2 3 4 5 6 7 8
결과 데이터 2 5

sample

인자 Flowable이 데이터를 통지하는 시점에 가장 마지막에 받은 데이터를 통지한다.

fun main() {
    val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .take(9)
        .sample(
            Flowable.interval(1000L, TimeUnit.MILLISECONDS)
        )
    flowable.subscribe(DebugSubscriber())
    Thread.sleep(3000L)
}

출력 결과

RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료

14. throttleWithTomeout/debounce

원본 생산자에서 데이터를 받고나서 일정 기간 안에 다른 데이터를 받지 않으면 현재 받은 데이터를 통지하는 연산자.
지정 기간에 다음 데이터를 받으면 새로 데이터를 받은 시점부터 다시 지정 기간 안에 다음 데이터가 오는지 확인한다.

1

throttleWithTomeout

fun main() {
    val flowable = Flowable.create(
            { emitter: FlowableEmitter<String?> ->
                emitter.onNext("A")
                Thread.sleep(1000L)
                emitter.onNext("B")
                Thread.sleep(300L)
                emitter.onNext("C")
                Thread.sleep(300L)
                emitter.onNext("D")
                Thread.sleep(1000L)
                emitter.onNext("E")
                Thread.sleep(100L)

                emitter.onComplete()
            }, BackpressureStrategy.BUFFER)
            .throttleWithTimeout(500L, TimeUnit.MILLISECONDS)

    flowable.subscribe(DebugSubscriber())
}

출력 결과

RxComputationThreadPool-1: A
RxComputationThreadPool-1: D
main: E
main: 완료

데이터 A와 D를 받은 뒤 500밀리초동안 다음 데이터가 오지 않으므로 통지된다.
B와 C는 500밀리초 안에 다음 데이터가 오므로 통지되지 않는다.
마지막 E는 500밀리초를 기다리는 사이에 완료 통지를 받으므로 기다리지 않고 E와 완료를 통지받는다.

debounce

fun main() {
    val flowable = Flowable.create(
            { emitter: FlowableEmitter<String?> ->
                emitter.onNext("A")
                Thread.sleep(1000L)
                emitter.onNext("B")
                Thread.sleep(300L)
                emitter.onNext("C")
                Thread.sleep(300L)
                emitter.onNext("D")
                Thread.sleep(1000L)
                emitter.onNext("E")
                Thread.sleep(100L)

                emitter.onComplete()
            }, BackpressureStrategy.BUFFER)
            .debounce{
                Flowable.timer(500L, TimeUnit.MILLISECONDS)
            }

    flowable.subscribe(DebugSubscriber())
}

출력 결과

RxComputationThreadPool-1: A
RxComputationThreadPool-4: D
main: E
main: 완료

15. elementAt/elementAtOrError

지정한 위치의 데이터만을 통지하는 연산자.
통지할 데이터가 없을 때 처리 방식이 서로 다르다.
반환값은 Single 혹은 Maybe

fun main() {
    val maybe = Flowable.interval(100L, TimeUnit.MILLISECONDS)
        .elementAt(3)

    maybe.subscribe{ data ->
        println(data)
    }
    Thread.sleep(1000L)
}

출력 결과

3
저작자표시 (새창열림)

'RxJava' 카테고리의 다른 글

RxJava 05 - Processor/Subject  (0) 2022.02.16
RxJava 04 - Flowable과 Observable의 연산자 (part.04)  (0) 2022.01.25
RxJava 04 - Flowable과 Observable의 연산자 (part.02)  (0) 2022.01.18
RxJava 04 - Flowable과 Observable의 연산자 (part.01)  (0) 2022.01.18
RxJava 03 - RxJava의 매커니즘  (0) 2022.01.15
    'RxJava' 카테고리의 다른 글
    • RxJava 05 - Processor/Subject
    • RxJava 04 - Flowable과 Observable의 연산자 (part.04)
    • RxJava 04 - Flowable과 Observable의 연산자 (part.02)
    • RxJava 04 - Flowable과 Observable의 연산자 (part.01)
    Seoplee
    Seoplee
    개발공부를 하며 기록할만한 것들을 정리해놓은 블로그입니다.

    티스토리툴바