Processor/Subject
Processor/Subject란
Processor는 Reactive Streams에서 정의한 Publisher 인터페이스와 Subscriber 인터페이스를 모두 상속하는 인터페이스.
Processor 인터페이스는 소비자로서 데이터를 받고 이 데이터를 생산자로서 통지하는 역할을 한다.
Processor가 Publisher를 구독하면 Publisher가 통지하는 데이터를 받는 소비자가 되며, 동시에 이 Processor를 Subscriber가 구독하면, Subscriber에 데이터를 통지하는 생산자가 된다. 즉 Processor가 생산자와 소비자 사이에서 통지의 중개 역할을 한다.
Processor 기본 예제
fun main() {
val processor = PublishProcessor.create<Int>() // Processor를 생성한다.
processor.subscribe(object : Subscriber<Int?> { // Processor를 구독한다.
override fun onSubscribe(subscription: Subscription) {
subscription.request(Long.MAX_VALUE)
}
override fun onNext(data: Int?) {
println(data)
}
override fun onError(error: Throwable) {
System.err.println("에러: $error")
}
override fun onComplete() {
println("완료")
}
})
//Processor에 데이터를 통지한다.
processor.onNext(1);
processor.onNext(2);
processor.onNext(3);
}
이 구현은 멀티 스레드 환경에서 Processor에 데이터를 전달한다면 데이터를 통지하는 onNext 메서드가 동시 호출 되므로 정상적인 통지가 불가능하므로, 통지 메서드 호출에 주의를 기울여야 한다.
RxJava에서는 이 Processor를 구현한 추상 클래스로 FlowableProcessor와, 배압기능이 없는 Subject클래스를 제공한다.
Processor/Subject의 종류
클래스 | 설명 |
---|---|
Publish | Processor/Subject가 데이터를 받은 시점에 데이터를 통지한다. |
Behavior | Processor/Subject가 마지막으로 받은 데이터를 캐시하고 구독 시점에 캐시한 데이터를 바로 통지한다. 그 이후부터는 데이터를 받은 시점에 통지한다. |
Replay | Processor/Subject가 받은 모든 데이터를 캐시하고 구독 시점에 캐시한 데이터를 바로 통지한다. 그 이후부터는 데이터를 받은 시점에 통지한다. |
Async | Processor/Subject가 완료 통지를 받았을 때 마지막으로 받은 데이터만 통지한다. |
Unicast | 하나의 소비자만 구독한다. |
FlowablePRocess/Subject의 메서드
반환값 타입 | 메서드 | 설명 |
---|---|---|
boolean | hasComplete() | 완료가 통지되면 true를 반환한다. |
boolean | hasThrowable() | 에러가 통지되면 true를 반환한다. |
Throwable | getThrowable() | 에러가 통지되면 해당 에러 객체를 반환하고 통지되지 않으면 null을 반환한다. |
boolean | hasSubscribers()/hasObservers() | 구독중인 Subscriber/Observer가 있으면 true를 반환하고 완료나 에러 통지 이후에는 false를 반환한다. |
toSerialized
하나의 Processor/Subject는 서로 다른 스레드에서 동시에 통지하는것을 허용하지 않는다.
RxJava에서는 SerializedProcessor/SerializedSubject라는 클래스를 제공하여 여러 스레드에서 통시에 통지 메서드를 호출해도 안전한 스레드 처리를 할 수 있게 해준다.
PublishProcessor/PublishSubject
이미 통지한 데이터를 캐시하지 않고 구독한 뒤에 받은 데이터만 통지한다.
fun main() {
val processor = PublishProcessor.create<Int>()
processor.subscribe(DebugSubscriber("No.1"))
processor.onNext(1)
processor.onNext(2)
processor.onNext(3)
println("Subscriber No.2 추가")
processor.subscribe(DebugSubscriber("--- No.2"))
processor.onNext(4)
processor.onNext(5)
processor.onComplete()
println("Subscriber No.3 추가")
processor.subscribe(DebugSubscriber("------ No.3"))
}
출력 결과
main: No.1: 1
main: No.1: 2
main: No.1: 3
Subscriber No.2 추가
main: No.1: 4
main: --- No.2: 4
main: No.1: 5
main: --- No.2: 5
main: No.1: 완료
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 완료
BehaviorProcessor/BehaviorSubject
마지막으로 통지한 데이터를 캐시하고 구독시 캐시된 데이터를 소비자에게 통지하는 Processor/Subject.
fun main() {
val processor = BehaviorProcessor.create<Int>()
processor.subscribe(DebugSubscriber("No.1"))
processor.onNext(1)
processor.onNext(2)
processor.onNext(3)
println("Subscriber No.2 추가")
processor.subscribe(DebugSubscriber("--- No.2"))
processor.onNext(4)
processor.onNext(5)
processor.onComplete()
println("Subscriber No.3 추가")
processor.subscribe(DebugSubscriber("------ No.3"))
}
출력 결과
main: No.1: 1
main: No.1: 2
main: No.1: 3
Subscriber No.2 추가
main: --- No.2: 3
main: No.1: 4
main: --- No.2: 4
main: No.1: 5
main: --- No.2: 5
main: No.1: 완료
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 완료
ReplayProcessor/ReplaySubject
통지한 데이터를 모두 또는 지정한 범위까지 캐시하고 구독 시점에 바로 캐시덴 되이터를 통지하는 Processor/Subject.
이미 완료한 후 구독하면 캐시된 모든 데이터와 완료/에러를 통지한다.
fun main() {
val processor = ReplayProcessor.create<Int>()
processor.subscribe(DebugSubscriber("No.1"))
processor.onNext(1)
processor.onNext(2)
processor.onNext(3)
println("Subscriber No.2 추가")
processor.subscribe(DebugSubscriber("--- No.2"))
processor.onNext(4)
processor.onNext(5)
processor.onComplete()
println("Subscriber No.3 추가")
processor.subscribe(DebugSubscriber("------ No.3"))
}
출력 결과
main: No.1: 1
main: No.1: 2
main: No.1: 3
Subscriber No.2 추가
main: --- No.2: 1
main: --- No.2: 2
main: --- No.2: 3
main: No.1: 4
main: --- No.2: 4
main: No.1: 5
main: --- No.2: 5
main: No.1: 완료
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 1
main: ------ No.3: 2
main: ------ No.3: 3
main: ------ No.3: 4
main: ------ No.3: 5
main: ------ No.3: 완료
AsyncProcessor/AsyncSubject
완료할 때까지 아무것도 통지하지 않다가 완료했을 때 마지막으로 통지한 데이터와 완료만 통지하는 Processor/Subject.
fun main() {
val processor = AsyncProcessor.create<Int>()
processor.subscribe(DebugSubscriber("No.1"))
processor.onNext(1)
processor.onNext(2)
processor.onNext(3)
println("Subscriber No.2 추가")
processor.subscribe(DebugSubscriber("--- No.2"))
processor.onNext(4)
processor.onNext(5)
processor.onComplete()
println("Subscriber No.3 추가")
processor.subscribe(DebugSubscriber("------ No.3"))
}
출력 결과
Subscriber No.2 추가
main: No.1: 5
main: No.1: 완료
main: --- No.2: 5
main: --- No.2: 완료
Subscriber No.3 추가
main: ------ No.3: 5
main: ------ No.3: 완료
UnicastProcessor/UnicastSubject
1개의 소비자만 구독할 수 있는 Processor/Subject.
다른 소비자가 구독하면 IllegalStateException 에러를 통지한다.
fun main() {
val processor = UnicastProcessor.create<Int>()
processor.onNext(1)
processor.onNext(2)
println("Subscriber No.1 추가")
processor.subscribe(DebugSubscriber("No.1"))
println("Subscriber No.2 추가")
processor.subscribe(DebugSubscriber("--- No.1"))
processor.onNext(3)
processor.onComplete()
}
출력 결과
Subscriber No.1 추가
main: No.1: 1
main: No.1: 2
Subscriber No.2 추가
main: --- No.1: 에러= java.lang.IllegalStateException: This processor allows only a single Subscriber
main: No.1: 3
main: No.1: 완료
'RxJava' 카테고리의 다른 글
RxJava 안드로이드 프로젝트에 적용하기 01 (with Sunflower) (0) | 2022.04.05 |
---|---|
RxJava 06 - RxJava의 디버깅과 테스트 (0) | 2022.02.16 |
RxJava 04 - Flowable과 Observable의 연산자 (part.04) (0) | 2022.01.25 |
RxJava 04 - Flowable과 Observable의 연산자 (part.03) (0) | 2022.01.18 |
RxJava 04 - Flowable과 Observable의 연산자 (part.02) (0) | 2022.01.18 |