Seoplee
개발의 섭리, Seoplee의 개발
Seoplee
  • 분류 전체보기 (54)
    • Android (26)
      • Architecture (12)
      • Compose (0)
      • Tips (11)
      • 트러블슈팅 (3)
    • IOS (1)
      • Tips (1)
    • Kotlin (1)
    • Coroutine (3)
      • Flow (3)
    • RxJava (12)
    • CI&CD (1)
    • WEB (8)
    • Network (1)
    • ETC (1)
    • (임시) (0)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Seoplee

개발의 섭리, Seoplee의 개발

RxJava

RxJava 05 - Processor/Subject

2022. 2. 16. 21:54

Processor/Subject

Processor/Subject란

Processor는 Reactive Streams에서 정의한 Publisher 인터페이스와 Subscriber 인터페이스를 모두 상속하는 인터페이스.
Processor 인터페이스는 소비자로서 데이터를 받고 이 데이터를 생산자로서 통지하는 역할을 한다.
Processor가 Publisher를 구독하면 Publisher가 통지하는 데이터를 받는 소비자가 되며, 동시에 이 Processor를 Subscriber가 구독하면, Subscriber에 데이터를 통지하는 생산자가 된다. 즉 Processor가 생산자와 소비자 사이에서 통지의 중개 역할을 한다.

Processor 기본 예제

fun main() {
    val processor = PublishProcessor.create<Int>() // Processor를 생성한다.

    processor.subscribe(object : Subscriber<Int?> { // Processor를 구독한다.
        override fun onSubscribe(subscription: Subscription) {
            subscription.request(Long.MAX_VALUE)
        }

        override fun onNext(data: Int?) {
            println(data)
        }

        override fun onError(error: Throwable) {
            System.err.println("에러: $error")
        }

        override fun onComplete() {
            println("완료")
        }
    })

    //Processor에 데이터를 통지한다.
    processor.onNext(1);
    processor.onNext(2);
    processor.onNext(3);
}

이 구현은 멀티 스레드 환경에서 Processor에 데이터를 전달한다면 데이터를 통지하는 onNext 메서드가 동시 호출 되므로 정상적인 통지가 불가능하므로, 통지 메서드 호출에 주의를 기울여야 한다.

RxJava에서는 이 Processor를 구현한 추상 클래스로 FlowableProcessor와, 배압기능이 없는 Subject클래스를 제공한다.

Processor/Subject의 종류

클래스 설명
Publish Processor/Subject가 데이터를 받은 시점에 데이터를 통지한다.
Behavior Processor/Subject가 마지막으로 받은 데이터를 캐시하고 구독 시점에 캐시한 데이터를 바로 통지한다. 그 이후부터는 데이터를 받은 시점에 통지한다.
Replay Processor/Subject가 받은 모든 데이터를 캐시하고 구독 시점에 캐시한 데이터를 바로 통지한다. 그 이후부터는 데이터를 받은 시점에 통지한다.
Async Processor/Subject가 완료 통지를 받았을 때 마지막으로 받은 데이터만 통지한다.
Unicast 하나의 소비자만 구독한다.

FlowablePRocess/Subject의 메서드

반환값 타입 메서드 설명
boolean hasComplete() 완료가 통지되면 true를 반환한다.
boolean hasThrowable() 에러가 통지되면 true를 반환한다.
Throwable getThrowable() 에러가 통지되면 해당 에러 객체를 반환하고 통지되지 않으면 null을 반환한다.
boolean hasSubscribers()/hasObservers() 구독중인 Subscriber/Observer가 있으면 true를 반환하고 완료나 에러 통지 이후에는 false를 반환한다.

toSerialized

하나의 Processor/Subject는 서로 다른 스레드에서 동시에 통지하는것을 허용하지 않는다.
RxJava에서는 SerializedProcessor/SerializedSubject라는 클래스를 제공하여 여러 스레드에서 통시에 통지 메서드를 호출해도 안전한 스레드 처리를 할 수 있게 해준다.

PublishProcessor/PublishSubject

이미 통지한 데이터를 캐시하지 않고 구독한 뒤에 받은 데이터만 통지한다.

fun main() {
    val processor = PublishProcessor.create<Int>()

    processor.subscribe(DebugSubscriber("No.1"))

    processor.onNext(1)
    processor.onNext(2)
    processor.onNext(3)

    println("Subscriber No.2 추가")
    processor.subscribe(DebugSubscriber("--- No.2"))

    processor.onNext(4)
    processor.onNext(5)

    processor.onComplete()

    println("Subscriber No.3 추가")
    processor.subscribe(DebugSubscriber("------ No.3"))
}

출력 결과

main: No.1: 1
main: No.1: 2
main: No.1: 3
Subscriber No.2 추가
main: No.1: 4
main: --- No.2: 4
main: No.1: 5
main: --- No.2: 5
main: No.1: 완료
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 완료

BehaviorProcessor/BehaviorSubject

마지막으로 통지한 데이터를 캐시하고 구독시 캐시된 데이터를 소비자에게 통지하는 Processor/Subject.

fun main() {
    val processor = BehaviorProcessor.create<Int>()

    processor.subscribe(DebugSubscriber("No.1"))

    processor.onNext(1)
    processor.onNext(2)
    processor.onNext(3)

    println("Subscriber No.2 추가")
    processor.subscribe(DebugSubscriber("--- No.2"))

    processor.onNext(4)
    processor.onNext(5)

    processor.onComplete()

    println("Subscriber No.3 추가")
    processor.subscribe(DebugSubscriber("------ No.3"))
}

출력 결과

main: No.1: 1
main: No.1: 2
main: No.1: 3
Subscriber No.2 추가
main: --- No.2: 3
main: No.1: 4
main: --- No.2: 4
main: No.1: 5
main: --- No.2: 5
main: No.1: 완료
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 완료

ReplayProcessor/ReplaySubject

통지한 데이터를 모두 또는 지정한 범위까지 캐시하고 구독 시점에 바로 캐시덴 되이터를 통지하는 Processor/Subject.
이미 완료한 후 구독하면 캐시된 모든 데이터와 완료/에러를 통지한다.

fun main() {
    val processor = ReplayProcessor.create<Int>()

    processor.subscribe(DebugSubscriber("No.1"))

    processor.onNext(1)
    processor.onNext(2)
    processor.onNext(3)

    println("Subscriber No.2 추가")
    processor.subscribe(DebugSubscriber("--- No.2"))

    processor.onNext(4)
    processor.onNext(5)

    processor.onComplete()

    println("Subscriber No.3 추가")
    processor.subscribe(DebugSubscriber("------ No.3"))
}

출력 결과

main: No.1: 1
main: No.1: 2
main: No.1: 3
Subscriber No.2 추가
main: --- No.2: 1
main: --- No.2: 2
main: --- No.2: 3
main: No.1: 4
main: --- No.2: 4
main: No.1: 5
main: --- No.2: 5
main: No.1: 완료
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 1
main: ------ No.3: 2
main: ------ No.3: 3
main: ------ No.3: 4
main: ------ No.3: 5
main: ------ No.3: 완료

AsyncProcessor/AsyncSubject

완료할 때까지 아무것도 통지하지 않다가 완료했을 때 마지막으로 통지한 데이터와 완료만 통지하는 Processor/Subject.

fun main() {
    val processor = AsyncProcessor.create<Int>()

    processor.subscribe(DebugSubscriber("No.1"))

    processor.onNext(1)
    processor.onNext(2)
    processor.onNext(3)

    println("Subscriber No.2 추가")
    processor.subscribe(DebugSubscriber("--- No.2"))

    processor.onNext(4)
    processor.onNext(5)

    processor.onComplete()

    println("Subscriber No.3 추가")
    processor.subscribe(DebugSubscriber("------ No.3"))
}

출력 결과

Subscriber No.2 추가
main: No.1: 5
main: No.1: 완료
main: --- No.2: 5
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 5
main: ------ No.3: 완료

UnicastProcessor/UnicastSubject

1개의 소비자만 구독할 수 있는 Processor/Subject.
다른 소비자가 구독하면 IllegalStateException 에러를 통지한다.

fun main() {
    val processor = UnicastProcessor.create<Int>()

    processor.onNext(1)
    processor.onNext(2)

    println("Subscriber No.1 추가")
    processor.subscribe(DebugSubscriber("No.1"))

    println("Subscriber No.2 추가")
    processor.subscribe(DebugSubscriber("--- No.1"))

    processor.onNext(3)

    processor.onComplete()
}

출력 결과

Subscriber No.1 추가
main: No.1: 1
main: No.1: 2
Subscriber No.2 추가
main: --- No.1: 에러= java.lang.IllegalStateException: This processor allows only a single Subscriber
main: No.1: 3
main: No.1: 완료
저작자표시 (새창열림)

'RxJava' 카테고리의 다른 글

RxJava 안드로이드 프로젝트에 적용하기 01 (with Sunflower)  (0) 2022.04.05
RxJava 06 - RxJava의 디버깅과 테스트  (0) 2022.02.16
RxJava 04 - Flowable과 Observable의 연산자 (part.04)  (0) 2022.01.25
RxJava 04 - Flowable과 Observable의 연산자 (part.03)  (0) 2022.01.18
RxJava 04 - Flowable과 Observable의 연산자 (part.02)  (0) 2022.01.18
    'RxJava' 카테고리의 다른 글
    • RxJava 안드로이드 프로젝트에 적용하기 01 (with Sunflower)
    • RxJava 06 - RxJava의 디버깅과 테스트
    • RxJava 04 - Flowable과 Observable의 연산자 (part.04)
    • RxJava 04 - Flowable과 Observable의 연산자 (part.03)
    Seoplee
    Seoplee
    개발공부를 하며 기록할만한 것들을 정리해놓은 블로그입니다.

    티스토리툴바