RxJava의 디버깅과 테스트
디버깅과 테스트
생산자가 소비자에게 통지하는 데이터는 기본적으로 외부 접근이 불가능하여 완료/에러 통지를 받아야 비로소 어떤 데이터가 어떻게 바뀌었는지 외부에서 확인할 수 있기 때문에 디버깅/테스트 하는것이 어렵다.
더욱이 비동기 처리는 여러 처리가 동시에 실행되어 실행때마다 결과가 달라질 수 있기 때문에 디버깅/테스트시 고려해야 한다.
RxJava에서는 디버깅이나 테스트를 쉽게할 수 있는 여러 메서드와 클래스를 제공한다.
'do'로 시작하는 메서드
RxJava는 Flowable/Observable이 통지할 때나 통지한 이후에 특정한 부가 작용이 발생하는 메서드를 제공한다.
이런 메서드는 이름이 'doOn'이나 'doAfter'로 시작하는데 상당수는 통지 시에 처리할 내용을 정의한 함수형 인터페이스를 인자로 전달받는다.
이 때 해당 함수형 인터페스의 메서드에 반환값이 없어서 부가 작용이 발생한다.
doOnNext
데이터 통지시에 지정한 처리 작업을 실행하는 메서드.
Flowable/Observable이 데이터를 통지하는 시점에 인자로 전달받은 함수형 인터페이스르 실행한다.
fun main() {
val flowable = Flowable.range(1,5).doOnNext { data ->
println("--- 기존 데이터 $data")
}.filter { data ->
data % 2 == 0
}.doOnNext{ data ->
println("----- filter 적용 후 데이터 $data")
}.subscribe(DebugSubscriber())
}
출력 결과
--- 기존 데이터 1
--- 기존 데이터 2
----- filter 적용 후 데이터 2
main: 2
--- 기존 데이터 3
--- 기존 데이터 4
----- filter 적용 후 데이터 4
main: 4
--- 기존 데이터 5
main: 완료doOnComplete
완료 통지 시 지정한 처리 작업을 실행하는 메서드.
fun main() {
val flowable = Flowable.range(1,5).doOnComplete {
println("doOnComplete")
}.subscribe(DebugSubscriber())
}
출력 결과
main: 1
main: 2
main: 3
main: 4
main: 5
doOnComplete
main: 완료doOnError
에러를 통지하면 지정한 처리를 실행하는 메서드.
fun main() {
val flowable = Flowable.range(1,5)
.doOnError { error ->
println("기존 데이터: ${error.message}")
}.map { data ->
if (data == 3) throw Exception("예외 발생")
data
}.doOnError { error ->
println("--- map 적용 후: ${error.message}")
}.subscribe(DebugSubscriber())
}
출력 결과
main: 1
main: 2
--- map 적용 후: 예외 발생
main: 에러= java.lang.Exception: 예외 발생doOnSubscribe
구독 시작시 지정한 처리를 실행하는 메서드.
fun main() {
Flowable.range(1, 5)
.doOnSubscribe {
println("doOnSubscribe")
}.subscribe(object : Subscriber<Int?> {
override fun onSubscribe(subscription: Subscription) {
println("--- Subscriber: onSubscribe")
subscription.request(Long.MAX_VALUE)
}
override fun onNext(data: Int?) {
println("--- Subscriber: onNext: $data")
}
override fun onComplete() {
}
override fun onError(error: Throwable?) {
}
})
}
출력 결과
doOnSubscribe
--- Subscriber: onSubscribe
--- Subscriber: onNext: 1
--- Subscriber: onNext: 2
--- Subscriber: onNext: 3
--- Subscriber: onNext: 4
--- Subscriber: onNext: 5doOnRequest
Flowable이 데이터 개수를 요청받을 때 인자로 지정한 함수형 인터페이스의 처리를 실행하는 메서드.
fun main() {
Flowable.range(1, 3)
.doOnRequest { size -> println("기존 데이터: size = $size") }
.observeOn(Schedulers.computation())
.doOnRequest { size -> println("--- observeOn 적용 후: size = $size") }
.subscribe(object : Subscriber<Int?> {
private var subscription: Subscription? = null
override fun onSubscribe(subscription: Subscription) {
this.subscription = subscription
this.subscription!!.request(1)
}
override fun onNext(data: Int?) {
println(data)
subscription!!.request(1)
}
override fun onComplete() {
println("완료")
}
override fun onError(error: Throwable) {
println("에러: $error")
}
})
Thread.sleep(500L)
}
출력 결과
--- observeOn 적용 후: size = 1
기존 데이터: size = 128
1
--- observeOn 적용 후: size = 1
2
--- observeOn 적용 후: size = 1
3
--- observeOn 적용 후: size = 1
완료doOnCancel/doOnDispose
구독을 해지하면 지정한 처리를 실행하는 메서드.
fun main() {
Flowable.interval(100L, TimeUnit.MILLISECONDS)
.doOnCancel { println("doOnCancel") }
.subscribe(object : Subscriber<Long?> {
private var startTime: Long = 0
private var subscription: Subscription? = null
override fun onSubscribe(subscription: Subscription) {
startTime = System.currentTimeMillis()
this.subscription = subscription
this.subscription!!.request(Long.MAX_VALUE)
}
override fun onNext(data: Long?) {
if (System.currentTimeMillis() - startTime > 300L) {
println("구독 해지")
subscription!!.cancel()
return
}
println(data)
}
override fun onComplete() {
println("완료")
}
override fun onError(error: Throwable) {
println("에러: $error")
}
})
Thread.sleep(1000L)
}
출력 결과
0
1
구독 해지
doOnCancel'blocking'으로 시작하는 메서드
일반적으로 비동기 처리 결과를 테스트하려면 테스트가 실행되는 스레드에서 실행 결과를 받을때까지 대기할 수 있어야 한다. RxJava는 비동기 처리 결과를 현재 실행중인 스레드에서 받을 수 있는 메서드를 제공한다.
이러한 메서드들은 'blocking'으로 시작되며 다른 스레드에서 수행하는 Flowable/Observable의 통지를 이 메서드를 호출한 스레드에 반환한다.
따라서 테스트를 실행하는 스레드에서 비동기 통지의 결과값을 받을 수 있어 받은 결과값과 기댓값을 비교할 수 있다.
blockingFirst
메서드를 호출한 스레드에서 Flowable/Observable의 첫번쨰 통지 데이터를 받게하는 메서드.
호출한 스레드는 첫번째 통지 데이터를 받을때까지 대기하며 다음 처리를 진행하지 않는다.
subscribe 메서드를 호출해 처리를 시작하는 Flowable/Observable은 blockingFirst를 호출하면 처리를 시작하지만, ConnectableFlowable/ConnetableObservable처럼 subscribe 메서드를 호출해도 처리를 시작하지 않을 경우 blockingFirst를 호출한 시점에 처리가 멈추므로 주의하여야 한다.
blockingFirst 메서드가 결과를 반환하는 시점은 첫번째 데이터가 통지될 때이다. 통지할 데이터가 없으면 메서드의 인자 유무에 따라 수행하는 작업이 달라진다.
@Test
fun blockingFirstTest() {
val actual = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.blockingFirst()
assertEquals(actual, 0L)
}
blockingLast
메서드를 호출한 스레드에서 Flowable/Observable이 통지하는 마지막 데이터를 받게하는 메서드.
호출한 스레드는 마지막 통지 데이터를 얻을 때까지 다음 처리를 진행하지 않는다.
blockingLast 메서드가 결과를 반환하는 시점은 완료 통지 시점이라서 완료가 통지될 때 까지 다음 처리가 진행되지 않는다.
@Test
fun blockingLastTest() {
val actual = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(3)
.blockingLast()
assertEquals(actual, 2L)
}
blockingIterable
호출한 원본 스레드에서 Flowable/Observable이 통지하는 모든 데이터를 받는 Iterable을 얻게하는 메서드.
subscribe 메서드가 호출되면 처리를 시작하는 Flowable/Observable은 Iterable의 Iterator 메서드를 호출하는 시점부터 처리를 시작하므로 Flowable/Observable의 처리가 종료되지 않더라도 Iterable을 얻을 수 있다.
데이터를 받으려면 Iterable에서 Iterator를 얻어와 next를 호출한다. next를 호출할 때 데이터가 통지되지 않은 상태라면 기다린다. 반대로 통지된 데이터가 있더라도 next가 호출되지 않았으면 버퍼에 보관하고 대기한다. 따라서 장시간 데이터를 가져오지 않으면 버퍼를 초과할 수 있다.
데이터와 완료를 통지하는 Flowable/Observable에 사용해야 한다.
@Test
fun blockingIterableTest() {
val result = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(5)
.blockingIterable()
val iterator = result.iterator()
assertTrue(iterator.hasNext())
assertEquals(iterator.next(), 0L)
assertEquals(iterator.next(), 1L)
assertEquals(iterator.next(), 2L)
Thread.sleep(1000L)
assertEquals(iterator.next(), 3L)
assertEquals(iterator.next(), 4L)
assertFalse(iterator.hasNext())
}
blockingSubscribe
호출한 원본 스레드에서 소비자의 통지 데이터 처리를 실행할 수 있게 하는 메서드.
subscribe가 호출되면 처리를 시작하는 생산자는 이 blockingSubscribe 메서드를 호출해 처리를 시작한다.
blockingSubscribe 메서드의 인자로는 일반적으로 subscribe 메서드에 전달할 수 있는 Subscriber/Observer뿐 아니라 각 통지의 함수형 인터페이스들이 올 수 있다. 하지만 인자가 없는 blockingSubscribe 메서드는 일반적인 subscribe메서드와 달리 에러가 발생하면 에러를 다시 던지므로 주의해야 한다.
blockingSubscribe 메서드를 실행하면 모든 데이터의 통지 완료 처리가 끝나거나 에러가 발생해 에러 통지 처리가 끝날 때까지 호출한 스레드에서 이후 처리를 진행하지 않는다. 따라서 비동기적으로 생산자를 통지하고, 통지를 받은 소비자가 어떤 부가 작용을 처리할 때 이 부가 작용의 결과를 확인하는데에 blockingSubscribe 메서드를 사용한다.
@Test
fun blockingSubscribeTest() {
val flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
val counter = Counter()
flowable.blockingSubscribe(object : DisposableSubscriber<Long>() {
override fun onNext(t: Long?) {
counter.increment()
}
override fun onError(t: Throwable?) {
fail(t?.message)
}
override fun onComplete() {
}
})
assertEquals(counter.get(), 5)
}
private class Counter {
@Volatile
private var count = 0
fun increment() {
count++
}
fun get(): Int {
return count
}
}
TestSubscriber/TestObserver
테스트 용도로 사용하는 소비자 클래스.
TestSubscriber/TestObserver를 구독하면 생산자로부터 받은 통지를 확인할 수 있다.
통지와 관련된 테스트를 수행하는 assert 메서드, 지정된 시간 동안이나 이벤트가 발생할 때까지 대기하는 await메서드, 통지받은 데이터를 가져오는 메서드 등, 테스트 시에 필요한 메서드를 제공한다.
@Test
fun testSubscriberTest() {
val target = Flowable.interval(100L, TimeUnit.MILLISECONDS)
val testSubscriber = target.test()
testSubscriber.assertEmpty()
testSubscriber.await(150L, TimeUnit.MILLISECONDS)
testSubscriber.assertValue(0L)
testSubscriber.await(100L, TimeUnit.MILLISECONDS)
testSubscriber.assertValues(0L, 1L)
}
await 메서드
Flowable/Observable의 처리는 진행하게 하면서 자신의 처리는 지정한 시간동안 대기하게하는 메서드.
TestSubscribe/TestObserver 혹은 boolean등을 반환한다.
기타 메서드
assert와 values 외에도 다양한 메서드가 존재한다.
| 반환값 | 메서드 | 설명 |
|---|---|---|
| List |
values() | 받은 데이터를 리스트로 반환한다. |
| int | valueCount() | 받은 데이터의 건수를 반환한다. |
| boolean | isTerminated() | 완료 통지 또는 에러 통지를 받으면 true를 반환한다. |
| boolean | isCancelled()/isDisposed() | 구독이 해지되면 true를 반환한다. |
| Thread | lastThread() | 마지막 통지를 받았을 때의 스레드를 얻는다. |
| List |
errors() | 에러 통지 시 받은 에러 객체 리스트를 반환한다. |
| int | errorCount() | 에러 통지 시 받은 에러의 개수를 반환한다. |
| long | completions() | 완료 통지가 호출된 횟수를 반환한다. |
TestScheduler
지정한 시간에 진행할 처리를 실제로 시간을 쓰지 않고 테스트할 수 있는 스케줄러.
테스트할 때 실제로 실행하는 데 걸리는 시간 때문에 결과를 얻기 힘들 때 이 TestScheduler를 사용하여 즉시 테스트 결과를 얻을 수 있다.
하지만 interval 메서드나 timer메서드처럼 RxJava에서 시간과 관련된 데이터를 다룰때만 사용할 수 있고, 실제 처리의 속도를 단축시킬 수 있는것은 아니다.
주요 메서드
| 반환값 | 메서드 | 설명 |
|---|---|---|
| void | advanceTimeBy(long delayTime, TimeUnit unit) | 설정한 시간동안에만 처리를 수행하는 메서드 |
| void | advanceTimeTo(long delayTime, TimeUnit unit) | 설정한 시각까지 처리를 수행하는 메서드 |
| long | now(TimeUnit unit) | 스케줄러를 생성한 이후부터 현재까지 경과한 시간을 인자로 받은 시간 단위로 변환하는 메서드 |
TestScheduler 예제
@Test
fun testSchedulerTest() {
val start = System.currentTimeMillis()
val testScheduler = TestScheduler()
val flowable = Flowable.interval(500L, TimeUnit.MILLISECONDS, testScheduler)
val result = flowable.test()
println("data = ${result.values()}")
result.assertEmpty()
testScheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS)
println("data = ${result.values()}")
testScheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS)
println("data = ${result. values()}")
result.assertValues(0L, 1L)
testScheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS)
println("data = ${result.values()}")
result.assertValues(0L, 1L, 2L, 3L)
println("testScheduler#now= ${testScheduler.now(TimeUnit.MILLISECONDS)}")
val totalTime = System.currentTimeMillis() - start
println("테스트에 걸린 시간 = $totalTime")
}
테스트 결과
data = []
data = [0]
data = [0, 1]
data = [0, 1, 2, 3]
testScheduler#now= 2000
테스트에 걸린 시간 = 73'RxJava' 카테고리의 다른 글
| RxJava 안드로이드 프로젝트에 적용하기 02 (with Paging 3) (0) | 2022.04.05 |
|---|---|
| RxJava 안드로이드 프로젝트에 적용하기 01 (with Sunflower) (0) | 2022.04.05 |
| RxJava 05 - Processor/Subject (0) | 2022.02.16 |
| RxJava 04 - Flowable과 Observable의 연산자 (part.04) (0) | 2022.01.25 |
| RxJava 04 - Flowable과 Observable의 연산자 (part.03) (0) | 2022.01.18 |