RxJava Chapter 03 - RxJava의 매커니즘
RxJava와 디자인 패턴
RxJava는 옵저버 패턴, 이터레이터 패턴 두가지에 영향을 받아서 작성되었음.
Observer 패턴
관찰 대상 객체의 상태 변화가 발생하면 관찰하는 객체가 변화에 따른 작업을 처리하는 디자인 패턴
옵저버 패턴의 클래스
클래스 | 상세 |
---|---|
Subject | 관찰 대상을 나타내는 클래스. 이 클래스에 Observer를 추가,삭제할 수 있으며 상태 변화 발생시 통지할 수 있다. |
Observer | 변화가 발생했을 때 이를 처리하는 메서드를 가진 인터페이스 |
ConcreteSubject | Subject 상속 클래스로, 실제 변화가 이 클래스에 나타나며 변화가 발생했을 때 통지메서드를 통해 Observer에 통지한다. |
ConcreteObserver | Observer의 구현 클래스로, 통지가 발생했을 때 처리할 내용을 구현한다. |
Subject 메서드
메서드 | 상세 |
---|---|
addObserver(Observer) | 변화를 관찰하는 Observer를 등록 |
deleteObserver(Observer) | 등록된 Observer를 리스트에서 제거 |
notifyObservers() | 변화가 발생하면 Observer에 통지 |
Observer 메서드
메서드 | 상세 |
---|---|
update | 변화가 발생했다는 통지를 받으면 수행. 실제 처리 내용은 Observer인터페이스 구현체에 구현 |
RxJava의 생산자-소비자의 관계는 옵저버 패턴이 적용돼었다. (Subejct 생산자, Observer 소비자)
이터레이터 패턴
RxJava의 실제 구현은 이터레이터 패턴과는 다른 구조를 지니고 있으나, 데이터집합체에서 데이터를 꺼내어 쓴다는 점에서 유사한 개념을 지닌다.
이터레이터 패턴의 클래스
클래스 | 상세 |
---|---|
Aggregate | 데이터를 담고 있는 집합체 인터페이스 |
Iterator | 데이터를 순서대로 받는 인터페이스 |
ConcreteAggregate | 실제 데이터를 넣어두는 데이터 집합체 구현 클래스 |
ConcreteIterator | 실제 데이터를 받는 이터레이터 구현 클래스. |
Aggregate 메서드
메서드 | 상세 |
---|---|
iterator | Iterator를 생성한다 |
Iterator 메서드
메서드 | 상세 |
---|---|
hasNext | next 메서드로 받을 수 있는 데이터가 남아있으면 true 반환 |
이터레이터 기본 사용 예제
fun main() {
val list: List<String> = listOf("a", "b", "c")
val iterator = list.iterator()
while (iterator.hasNext()) {
val value = iterator.next()
println(value)
}
}
출력결과
a
b
c
Flowable 사용 예제
fun main() {
val list: List<String> = listOf("a", "b", "c")
val flowable = Flowable.fromIterable(list)
flowable.subscribe{ println(it) }
}
비동기 처리
RxJava에서는 비동기 처리에 필요한 API를 제공하며, 용도별로 적절히 스레드를 관리하는 클래스를 제공해 직접 스레드를 관리하지 않아도 된다.
RxJava에서는 데이터를 통지하는 측이 데이터를 처리하는측의 속도에 영향을 받는다.
데이터를 받는 측이 무거운 처리 작업을 하는 예제
fun main() {
Flowable.interval(1000L, TimeUnit.MILLISECONDS)
.doOnNext {
println("emit: ${System.currentTimeMillis()} 밀리 초 $it")
}
.subscribe {
Thread.sleep(2000L) // 무거운 처리 작업 가정
}
Thread.sleep(6000L)
}
출력 결과
emit: 1641868079563 밀리 초 0
emit: 1641868081569 밀리 초 1
emit: 1641868083580 밀리 초 2
이처럼 데이터를 처리하는 속도의 영향을 받아 약 2000밀리초마다 데이터를 통지하는것을 확인할 수 있다.
RxJava가 제공하는 interval 메서드처럼 시간을 다루는 메서드가 생성한 Flowable/Observable은 데이터를 받는 측의 속도가 어느정도 느려도 데이터를 처리하는 속도보다 느리지 않으면 내부에서 시간 간격을 조정해 적절하게 통지한다.
interval 메서드로 생선한 Flowable 예제
fun main() {
Flowable.interval(1000L, TimeUnit.MILLISECONDS)
.doOnNext {
println("emit: ${System.currentTimeMillis()} 밀리 초 $it")
}
.subscribe {
Thread.sleep(500L) // 무거운 처리 작업 가정
}
Thread.sleep(3000L)
}
출력결과
emit: 1641868383416 밀리 초 0
emit: 1641868384427 밀리 초 1
emit: 1641868385415 밀리 초 2
데이터 통지 가격이 약 1000밀리초인것을 보아 데이터 받는 측의 500밀리초 지연에 영향을 받지 않는다.
단 이는 interval등이 제공하는 기능일 뿐, 개발자가 직접 create로 통지처리를 구현할때에는 받는 측의 처리 속도에 영향을 받지 않게 해야 한다.
또한 Flowable/Observable을 생성하는 메서드에 따라 스레드가 달라지며, just,from등 이미 생성된 데이터를 통지하는 경우 메인스레드에서, timer나 interval 같이 시간과 관련된 처리 작업을 하는 경우 다른 스레드에서 (비동기적으로) 동작한다.
RxJava는 데이터를 통지하는 측과 받는 측의 처리 작업이 논리적으로 분리돼어 있어, 처리 작업을 각각 서로 다른 스레드에서 실행할 수 있게 한다.
RxJava에서 비동기 처리를 구현할 때에는 생산자가 스레드를 모두 관리해야하며, 이를 위해 subscribeOn, observeOn 메서드를 제공한다. 또한 스레드 종류를 관리하는 스케쥴러를 지정한다.
스케줄러
RxJava에서 제공하는 스레드를 관리하는 클래스.
메서드 | 상세 |
---|---|
computation | 연산처리를 할 때 사용하는 스케줄러. 논리 프로세서의 수와 같은 수 만큼 스레드를 캐시한다. i/o 작업에는 사용할 수 없다. |
io | I/O 작업을 처리할때 사용하는 스케줄러. 스레드 풀에서 스레드를 가져오며 필요에 따라 새로운 스레드 생성 |
single | 싱글 스레드에서 작업을 할 때 사용하는 스케줄러 |
newThread | 매번 새로운 스레드를 생성하는 스케줄러. |
from(Executor executor) | 지정한 Executor가 생성한 스레드에서 작업을 수행하는 스케줄러 |
trampoline | 현재 스레드의 큐에 처리 작업을 넣는 스케줄러 |
RxJava는 스케줄러를 별도 생성하지않고, interval,timer등의 메서드를 사용하지 않으면 기본스레드에서 처리 작업을 수행한다.
그러므로 데이터 통지와 받은 데이터 처리를 어떤 스케줄러에서 실행할 것인지 설정할 때는 적절한 스케줄러를 subscribeOn, observeOn 메서드로 지정해야 한다.
subscribeOn 메서드
생산자의 처리 작업을 어떤 스케줄러에서 실행할지 결정하는 메서드. 최초 1회만 설정할 수 있다.
스케줄러 설정 예시
fun main() {
Flowable.just(1, 2, 3, 4, 5) // Flowable 설정
.subscribeOn(Schedulers.computation()) // RxComputationThreadPool
.subscribeOn(Schedulers.io()) // RxCachedThreadScheduler
.subscribeOn(Schedulers.single()) // RxSingleScheduler
.subscribe {
val threadName = Thread.currentThread().name
println("$threadName: $it")
}
Thread.sleep(500)
}
출력 결과
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 5
observeOn 메서드
소비자의 처리 작업을 어떤 스케줄러에서 실행할지 결정하는 메서드. observeOn메서드가 데이터를 받는 측의 스케줄러를 지정하므로, 연산자마자 다른 스케줄러를 지정할 수 있다.
또한 observeOn의 세번째 인자는 배압을 적용할 때, 버퍼에 담긴 통지 대기 데이터에서 인자 크기만큼 데이터를 꺼낸다.
observeOn 메서드로 bufferSize를 지정하는 예제
fun main() {
val flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS).onBackpressureDrop() // BackpressureMode.DROP을 설정했을 때와 마찬가지로 동작한다
flowable // 비동기로 데이터를 받게 하고, 버퍼 크기를 2로 설정한다
.observeOn(Schedulers.computation(), false, 2)
.subscribe(object : ResourceSubscriber<Long?>() {
// 데이터를 받을 때의 처리
override fun onNext(item: Long?) {
try {
Thread.sleep(1000L)
} catch (e: InterruptedException) {
e.printStackTrace()
exitProcess(1)
}
val threadName = Thread.currentThread().name
println("$threadName: $item")
}
override fun onComplete() {
println("완료")
}
override fun onError(error: Throwable) {
println("에러=$error")
}
})
Thread.sleep(7000L)
}
출력 결과
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 7
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 14
RxComputationThreadPool-1: 15
2건씩 데이터를 통지하고 있다.
연산자 내에서 생성되는 비동기 Flowable/Observable
RxJava의 메서드 중에는 연산자 내부에서 Flowable/Observable를 생성하고, 시작 한 뒤 데이터를 통지하는 메서드가 존재한다.
이 때 생성한 Flowable/Observable를 별도의 스레드에서 실행하면 데이터를 받아 생성한 Flowable/Observable이 시작 될 때 까지는 flatMap 메서드가 데이터를 받은 순서대로 실행되지만, 일단 Flowable/Observable이 시작되면, 그 뒤로는 각자 다른 스레드에서 작업을 수행한다.
즉 사용하는 메서드에 따라 여러 Flowable/Observable를 서로 다른 스레드에서 동시에 실행한다.
flatMap 메서드
데이터를 받으면 새로운 Flowable/Observable를 생성하고 실행하여 여기에서 통지되는 데이터를 메서드의 결과물로 통지하는 연산자.
flatMap 예제
fun main() {
val flowable = Flowable.just("A", "B", "C") // flatMap으로 생성한다.
.flatMap {
Flowable.just(it).delay(1000L, TimeUnit.MILLISECONDS)
}
flowable.subscribe {
val threadName = Thread.currentThread().name
println("$threadName: $it")
}
Thread.sleep(2000L)
}
- flatMap 메서드에서 받은 데이터로 Flowable을 생성하고, 생성한 Flowable이 통지하는 데이터를 결과로 통지한다.
- delay 메서드로 Flowable이 1000밀리초 늦게 데이터를 통지하게 한다.
출력 결과
RxComputationThreadPool-3: C
RxComputationThreadPool-1: A
RxComputationThreadPool-1: B
통지된 데이터 순서와 데이터를 받은 순서가 다르다.
그러므로 실행 순서와 상관없이 처리 성능이 중요할 때에는 flatMap 메서드를 사용할 수 있다.
concatMap 메서드
받은 데이터로 메서드 내부에 Flowable/Observable을 생성하고 하나씩 순서대로 실행해 통지하는 연산자.
concatMap 예제
fun main() {
val flowable = Flowable.just("A", "B", "C") // concatMap으로 생성한다.
.concatMap {
Flowable.just(it).delay(1000L, TimeUnit.MILLISECONDS)
}
flowable.subscribe {
val threadName = Thread.currentThread().name
val time: String = LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"))
println("$threadName: data=$it, time=$time")
}
Thread.sleep(4000L)
}
- 받은 데이터로 concatMap 메서드에서 Flowable을 생성하고 이 Flowable을 시작해 통지되는 데이터를 결과로 통지한다.
- delay 메서드로 Flowable이 데이터를 1000밀리초 늦게 통지하게 한다.
출력 결과
RxComputationThreadPool-1: data=A, time=05.486
RxComputationThreadPool-2: data=B, time=06.502
RxComputationThreadPool-3: data=C, time=07.506
원본 데이터를 통지 순서대로 통지하고, 간격은 약 1000밀리초인것을 확인할 수 있다.
따라서 데이터 성능보다는 순서가 중요할 때 concatMap 메서드를 사용할 수 있다.
concatMapEager 메서드
데이터를 받으면 새로운 Flowable/Observable을 생성하고 즉시 실행해 원본 데이터 순서대로 통지한다.
이 때 Flowable/Observable이 서로 다른 스레드에서 실행된다면 생성한 Flowable/Observable은 flatMap 처럼 동시에 실행된다.
하지만 결과는 concatMap처럼 원본 순서대로 통지된다.
concatMapEager 예제
fun main() {
val flowable = Flowable.just("A", "B", "C") // concatMapEager로 생성한다.
.concatMapEager {
Flowable.just(it).delay(1000L, TimeUnit.MILLISECONDS)
}
flowable.subscribe {
val threadName = Thread.currentThread().name
val time =
LocalTime.now().format(DateTimeFormatter.ofPattern("ss.SSS"))
println("$threadName: data=$it, time=$time")
}
Thread.sleep(2000L)
}
출력 결과
RxComputationThreadPool-3: data=A, time=16.804
RxComputationThreadPool-3: data=B, time=16.811
RxComputationThreadPool-3: data=C, time=16.811
원본 데이터 순서대로 데이터를 통지하며, 거의 동시에 데이터가 통지된다. 데이터 순서와 성능 모두가 중요하다면 concatMapEager를 사용할 수 있지만, 통지 전까지 데이터를 버퍼에 쌓아야 하므로 대량의 데이터가 버퍼에 쌓여 메모리 부족이 일어날 수 있다.
다른 스레드 간 공유되는 객체 문제
생산자와 소비자 사이가 아닌 외부에서도 공유되는 객체를 다룰 때에는 순차성을 잃을 수 있다.
각 생산자와 소비자 사이에서 처리 작업을 순차적으로 실행하지만, 두 소비자가 동시에 공유객체에 접근할 때는 순차적으로 접근하지 않는다.
이에 대한 대책의 하나로 RxJava는 여러개의 Flowable/Observable을 한 Flowable/Observable로 결합하여 새로운 Flowable을 생성하는 merge 메서드를 제공한다.
merge 메서드로 결합하는 예제
fun main() {
val counter = Counter()
val source1 = Flowable.range(1, 10000)
.subscribeOn(Schedulers.computation()) // 다른 스레드에서 처리 작업을 하게 한다
.observeOn(Schedulers.computation())
// Counter의 increment 메서드를 10,000번 호출한다(다른 스레드에서 동시에 실행)
val source2 = Flowable.range(1, 10000)
.subscribeOn(Schedulers.computation()) // 다른 스레드에서 처리 작업을 하게 한다
.observeOn(Schedulers.computation())
Flowable.merge(source1, source2) // merge를 이용하여 합쳐서 구독한다
.subscribe(
{ counter.increment() }, // 데이터를 받을 때의 처리
{ error -> println("에러=$error") } // 에러를 받을 때의 처리
)
{ println("counter.get()=" + counter.get()) } // 완료 통지를 받을 때의 처리
Thread.sleep(1000L)
}
출력 결과
counter.get()=20000
에러 처리
RxJava는 다음과 같이 세가지의 에러처리를 제공한다
- 소비자에게 에러 통지하기
에러를 통지해 에러에 대응하는 매커니즘. 명시적으로 통지 기능을 구현하지 않아도 소비자에게 발생한 에러를 통지한다. subscrbie 메서드로 구독할 때에는 별도의 에러 처리를 하지 않으면 stackTrace만 출력하므로 주의해야 한다. - 처리 작업 재시도
순간적인 네트워크 중단과 같이 재실행으로 정상적인 처리가 가능한 경우, retry 메서드를 사용하여 재시도할 수 있다. 재시도 횟수를 제한하여 사용하는것이 좋다. - 대체 데이터 통지
에러 발생시 대체 데이터를 통지하여 에러가 아니라 완료 통지를 통해 정상적으로 종료하는 방법. 상황에 따라 대체데이터를 통지하거나, 에러를 통지할 수 있다. onError, onException등의 메서드가 존재한다.
리소스 관리
DB 커넥션이나, 파일과 같은 리소스를 얻어 처리 작업을 진행한 뒤 리소스를 해제해야 한다. RxJava는 Flowable/Observable이 구독될 때 리소스를 얻고, 완료 및 에러 통지시 리소스를 해체하여, 리소스의 lifeCycle을 Flowable/Observable의 lifeCycler과 맞출 수 있다.
using 메서드
리소스의 lifeCycle에 맞춘 Flowable/Observable을 생성할 수 있다.
FlowableEmitter/ObservableEmitter
Flowable/Observable의 create 메서드 내부에서 사용하며 Cancellable, Disposable 메서드를 통해 리소스를 해제할 수 있다.
Cancellable 인터페이스 작성 예제
interface Cancellable {
fun cancel()
}
Cancellable 예제
emitter.setCancellable { resource.close() }
메서드가 하나만 존재하므로 인터페이스를 작성하지 않고 바로 구현할 수 있다.
Disposable 인터페이스 작성 예제
interface Disposable {
fun dispose() // 파기시 처리
fun isDisposed(): Boolean // 파기됐으면 true 반환
}
setDisposable 메서드로 해당 Disposable을 설정하여 완료, 에러 통지 혹은 구독 해제시 dispose 메서드를 실행할 수 있다.
배압
Flowable과 데이터를 받는 측이 서로 다른 스레드에서 처리할 때 데이터를 통지하는 속도가 데이터를 받는 측의 속도보다 빠를 때, 데이터가 점점 쌓여 메모리 부족이 일어날 수 있어 배압설정을 통해 대량 데이터가 처리 대기 상태가 되는것을 피할 수 있게 한다.
배압의 처리 흐름
- 데이터를 받는 측이 지정한 개수만큼한 데이터를 통지하게끔 Flowable에 요청한다.
- Flowable은 요청받은 개수만큼만 데이터를 통지한다.
- Flowable이 데이터를 통지한 후 통지를 멈춘다. 그러나 RxJava에서는 데이터 통지만 하지 않을 뿐 데이터 생산을 계속한다.
- 데이터를 받는 측이 마지막 데이터를 처리하면 다시 데이터를 요청한다.
- Flowable은 요청한만큼 데이터를 다시 통지하기 시작한다.
이처럼 모든 데이터 통지가 끝날 때까지 생산자는 요청받은만큼 데이터를 통지하고, 소비자는 요청한 만큼 데이터를 처리하고 다시 요청하는 작업을 반복한다. 이 때 Flowable이 통지를 기다리는 데이터를 다루는 방법에 관하여 BackpressureStrategy에 설정할 수 있다.
BackpressureStrategy 종류
- BUFFER: 모든 데이터를 버퍼에 쌓는다
- DROP: 통지 대기 데이터는 모두 파기한다.
- LATEST: 마지막으로 통지한 데이터만 버퍼에 쌓는다.
- ERROR: 지정한 수 만큼 버퍼에 쌓고 넘으면 에러처리한다.
- NONE: 특정 처리 작업을 실행하지 않는다.
Request 메서드
통지할 데이터 개수를 요청하는 메서드
observeOn 메서드와 배압
Flowable이 사용하는 스레드와 데이터 처리 스레드가 다르면 observeOn 메서드로 스케줄러를 설정하는데, 이 때 Flowable에 데이터를 요청하여 사용하며, bufferSize를 지정하여 요청할 수 있다. (기본값 128)
이때 Subscriber는 원본 Flowable이 아니라 지정한 bufferSize를 쌓아 놓은 새로운 Flowable에 데이터를 요청한다.
MissingBackpressureException
Flowable이 버퍼에 쌓아둔 데이터가 너무 많으면 MissingBackpressureException 에러를 통지한다.
'RxJava' 카테고리의 다른 글
RxJava 04 - Flowable과 Observable의 연산자 (part.02) (0) | 2022.01.18 |
---|---|
RxJava 04 - Flowable과 Observable의 연산자 (part.01) (0) | 2022.01.18 |
RxJava 02 - RxJava를 사용하는 데 필요한 배경 지식 (0) | 2022.01.13 |
RxJava 01 - (번외) Hot Observable 예제 in Android (0) | 2022.01.04 |
RxJava 01 - RxJava란 무엇인가? (0) | 2021.12.28 |