Java

[RxJava] RxJava 에서 Observable 와 Flowable 선택하여 사용하는 기준

Question영 2019. 12. 3. 14:21
반응형

Observable 와 Flowable 선택하여 사용하시는 기준에 대해 개인적으로 이해한 부분을 정리하여 포스팅하도록 하겠습니다.

 

 

시스템 환경

운영 체제 : macOS Mojave
Java 1.8.0_201

junit 4.12

 

 

'배압' 이란?

 

국어사전에는 증기 원동기 또는 내연 기관에서 뿜어져 나오는 증기나 가스의 압력. 압력이 높으면 기관의 효율이 떨어진다. 라고 정의하고 있습니다.

 

프로그램 입장에서 해석해보자면 요청하는 작업이 많으면 압력이 높아지는 것으로 표현할수 있겠네요.

 

Flowable 은 RxJava 의 작업의 배압에 관련된 함수들을 포함하고 있으며 2.x 버전에서 도입되었습니다.

 

개인적으로 이해한 개념을 적어보자면

 

'한정된 자원 환경에서 처리 요청을 받아 정해진 로직을 수행 을 문제 없이 수행하기 위한 기능을 사용하기 위한 클래스'

 

라고 볼수 있습니다.

 

데이터를 처리하는데 이슈가 없으면 기본적으로 Observable 사용해라

공식 사이트 에서 정의하고 있는 사항을 간략하게 요약 해보았습니다.

 

ReactiveX/RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. - ReactiveX/RxJava

github.com

 

  • 1000개 미만 처리 작업
  • OOME(Out of Memory Error) 에러 가능성 낮은 작업
  • GUI 이벤트(마우스, 터치) 작업
  • Java 스트림 API 를 사용하지 않는 경우

 

위의 사항들은 기본적으로 권장하는 최소한의 단위로 보통 RxJava 를 이용하여 로직을 구성할때 Observable 을 이용하시면 됩니다.

 

데이터가 많거나 시간이 지연되는 등의 예외처리가 필요한 사항에서 먼저 Observable 의 sample(), throttle(), debounce() 등의 기능을

 

이용해보거나 Flowable 을 사용하면 됩니다.

 

통신, DB, 대량의 데이터 처리에는 Flowable 사용하자

 

공식 사이트 에서 정의하고 있는 사항을 간략하게 요약 해보았습니다.

 

ReactiveX/RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. - ReactiveX/RxJava

github.com

 

  • 10,000개 이상 처리 작업
  • 디스크에서 파일을 읽어드릴경우
  • JDBC 를 이용한 DB 쿼리 결과를 가져오는 작업
  • 네트워크 I/O
  • 다수 혹은 논 블로킹 방식의 데이터 API 를 요청하는 작업

 

위의 내용들을 종합해 봤을때 개인적으로 Flowable 은

 

어떤 다수의 데이터를 처리하기 위한 비동기적 작업을 진행하려고 할때 작업양제한된 실행 환경에서 발생이 예상되는

 

예외 사항을 처리를 하기 위해 사용하는 것으로 정리해 볼수 있었습니다.

 

Flowable 의 배압 관련 기능

 

onBackPressureBuffer()

  • 배압 문제 발생 시 별도의 버퍼에 저장

  • Flowable은 기본적으로 128개의 버퍼가 있음.

  • Parameter

    • long capacity

      버퍼 갯수

    • Action onOverflow

      요청 작업양이 설정한 버퍼 허용치를 넘었을때 실행 동작 정의 (리스너 함수)

    • BackpressureOverflowStrategy overflowStrategy

      설정한 버퍼가 가득 찼을때 정의된 3가지의 실행 동작 중 하나 설정

      • DROP_OLDEST : 오버 플로우 발생시 버퍼의 가장 오래된 데이터를 제거 설정
      • DROP_LATEST : 오버 플로우 발생시 버퍼의 가장 최근 데이터를 제거 설정
      • ERROR : 오버 플로우 발생시 예외를 던지고 스트림 중지

 

onBackPressureDrop()

  • 배압 문제 발생 시 해당 데이터 무시.

 

onBackPressureLatest()

  • 배압 문제 발생 시 해당 데이터 무시.
  • 최신 데이터만 유지.

 

기본 예시

PublishSubject<Integer> subject = PublishSubject.create();

subject.observeOn(Schedulers.computation())
  .subscribe(data -> {
    Thread.sleep(100);
    System.out.println(data);
  }, error -> System.out.println(error.getMessage()));

for (int i = 0; i < 50000000; i++) {
  subject.onNext(i);
}

subject.onComplete();

 

for 문의 실행시간은 13 ~14 초 걸리지만

 

뜨거운 Observable 의 PublishSubject 의 구독 진행시 sleep 0.1 초 진행하기 때문에 결과 값은 다음과 같이 나타납니다.

 

// 결과
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14

Process finished with exit code 0

 

onBackPressureBuffer 예시

Flowable.range(1, 50_000_000)
                .onBackpressureBuffer(128, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)
                .observeOn(Schedulers.computation())
                .subscribe(data -> {
                    Thread.sleep(100);
                    System.out.println(data);
                }, error -> System.out.println(error.getMessage()));

// 결과 =========================================================================================
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Process finished with exit code 0

 

앞의 subject 예시보다 조금 더 빠르고 많이 처리 진행되었으나 배합 문제가 되고 있는 상황입니다.

 

눈에 띄는 것은 subject 와 달리 오버플로우가 발생했을때 처리를 진행할 수 있다는 게 특징입니다.

 

onBackpressureDrop 예시

 Flowable.range(1, 50_000_000)
                .onBackpressureDrop()
                .observeOn(Schedulers.computation())
                .subscribe(data -> {
                    Thread.sleep(100);
                    System.out.println(data);
                }, error -> System.out.println(error.getMessage()));

        try {
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

// 결과 =========================================================================================
1
2
3
...
128

Process finished with exit code 0

 

Thread.sleep(20000) 을 해준 이유는 버퍼에 담긴 128개 외에 데이터 처리 작업이 무시가 되는 상황을 보기 위함입니다.

 

onBackpressureLatest 예시

Flowable.range(1, 50_000_000)
                .onBackpressureLatest()
                .observeOn(Schedulers.computation())
                .subscribe(data -> {
                    Thread.sleep(100);
                    System.out.println(data);
                }, error -> System.out.println(error.getMessage()));

        try {
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

// 결과 =========================================================================================
1
2
3
...
128
50000000

Process finished with exit code 0

 

이전 onBackpressureDrop 예시와 다른 점은 128 버퍼에 담긴 작업에 대한 처리와 별도로 마지막 최신 데이터 작업이 처리된 부분입니다.

 


참고 자료

반응형

'Java' 카테고리의 다른 글

[Java/Error]reached end of file while parsing  (0) 2019.12.13
[Java/Error] Unhandled exception: Java.lang.exception  (0) 2019.12.10