Monti

Swift) RxSwift - Observable 본문

IOS/Swift

Swift) RxSwift - Observable

montt 2022. 10. 23. 21:47

Observable


Rx에서 옵저버는 Observable을 구독한다. Observable이 배출하는 하나 또는 연속된 항목에 옵저버는 반응하여 동시성 연산을 가능하게 한다.

onNext, onCompleted, 그리고 onError

  • onNext

Observable은 새로운 항목들을 배출할 때마다 이 메서드를 호출한다. 이 메서드는 Observable이 배출하는 항목을 파라미터로 전달 받는다.

  • onError

Observable은 기대하는 데이터가 생성되지 않았거나 다른 이유로 오류가 발생할 경우 오류를 알리기 위해 이 메서드를 호출한다. 이 메서드가 호출되면 onNex나 onCompleted는 더 이상 호출되지 않는다.
onError 메서드는 오류 정보를 저장하고 있는 객체를 파라미터로 전달 받는다.

  • onCompleted

오류가 발생하지 않았다면 Observable은 마지막 OnNext를 호출한 후 이 메서드를 호출한다.
Observable 계약에 명시된 조건에 따라, onNext는 0번 이상 호출 될 수 있으며 그 후에는 onCompleted 또는 onError 둘 중 하나를 마지막으로 호출한다. 단, 이 둘 모두를 호출하지는 않는다.

구독 해지

Rx 구현체 중에는 Subscriber 라는 특별한 옵저버 인터페이스가 있는데 이 인터페이스는 unsubscribe라는 메서드를 제공한다. 현재 구독 중인 Observable 중, 옵저버가 더 이상 구독을 원하지 않는 경우에는 이 메서드를 호출해서 구독을 해지할 수 있다. 만약 더 이상 관심있는 다른 옵저버가 존재하지 않는다면 Observable들은 새로운 항목들을 배출하지 않는다.

Rx Observable Doc

Observable

Operators


Rx에서는 다양한 연산자를 제공한다.

연산자 체인

거의 모든 연산자들은 Observable 상에서 동작하고 Observable을 리턴한다. 이 접근 방법은 연산자들을 연달아 호출 할 수 있는 연산자 체인을 제공한다. 연산자 체인에서 각각의 연산자는 이전 연산자가 리턴한 Observable을 기반으로 동작하며 동작에 따라 Observable을 변경한다.
특정 클래스가 제공하는 여러 메서드들을 호출하여 클래스의 항목들을 변경하는 빌더 패턴과 같은 것도 존재한다. 빌더 패턴 역시 연산자 체인과 유사하게 메서드 체인을 제공한다.

Rx Operator Doc

ReactiveX - Operators

Single


Single은 Observable의 한 형태이지만, Observable처럼 존재하지 않는 곳에서부터 무한대까지의 임의의 연속된 값들을 배출하는 것과는 달리, 항상 한 가지 값 또는 오류 알림 둘 중 하나만 배출한다.
이런 이유 때문에, Single을 구독할 때는 Observable을 구독할 때 사용하는 세 개의 메서드 (onNext, onError, 그리고 onCompleted) 대신 다음의 두 메서드만 사용할 수 있다.

  • OnSuccess

Single은 자신이 배출하는 하나의 값을 이 메서드를 통해 전달한다.

  • OnError

Single은 항목을 배출할 수 없을 때 이 메서드를 통해 Throwable 객체를 전달한다.

Rx Single Doc

Single

Subject


Subject는 옵저버나 Observable처럼 행동하는 Rx의 일부 구현체에서 사용 가능한 일종의 교각 혹은 프록시라고 볼 수 있는데, 그 이유는 Subject는 옵저버이기 때문에 하나 이상의 Observable을 구독할 수 있으며 동시에 Observable이기도 하기 때문에 항목들을 하나 하나 거치면서 재배출하고 관찰하며 새로운 항목들을 배출할 수도 있다.
하나의 Subject는 하나의 Observable을 구독하면서, Observable이 항목들을 배출시키도록 동작시킨다.

Subject 종류

Subject의 종류는 모두 4종류이며, 각각의 Subject는 특정 상황에 맞도록 설계되었다.

AysncSubject

AsyncSubject는 소스 Observable로부터 배출된 마지막 값을 배출하고 소스 Observable의 동작이 완료된 후에야 동작한다. (만약, 소스 Observable이 아무 값도 배출하지 않으면 AsyncSubject 역시 아무 값도 배출하지 않는다.)

또한, AsyncSubject 는 맨 마지막 값을 뒤 이어 오는 옵저버에 전달하는데, 만약 소스 Observable이 오류로 인해 종료될 경우 AsyncSubject는 아무 항목도 배출하지 않고 발생한 오류를 그대로 전달한다.

BehaviorSubject

옵저버가 BehaviorSubject를 구독하기 시작하면, 옵저버는 소스 Observable이 가장 최근에 발행한 항목(또는 아직 아무 값도 발행되지 않았다면 맨 처음 값이나 기본 값)의 발행을 시작하며 그 이후 소스 Observable(들)에 의해 발행된 항목들을 계속 발행한다.

만약, 소스 Observable이 오류 때문에 종료되면 BehaviorSubject는 아무런 항목도 배출하지 않고 소스 Observable에서 발생한 오류를 그대로 전달한다.

PublishSubject

PublishSubject는 구독 이후에 소스 Observable(들)이 배출한 항목들만 옵저버에게 배출한다.

주의할 점은, PublishSubject는 (이를 막지 않는 이상) 생성 시점에서 즉시 항목들을 배출하기 시작할 것이고 이런 특성 때문에 주제가 생성되는 시점과 옵저버가 이 주제를 구독하기 시작하는 그 사이에 배출되는 항목들을 잃어 버릴 수 있다는 단점이 있다. 따라서, 소스 Observable이 배출하는 모든 항목들의 배출을 보장해야 한다면 [Create](https://reactivex.io/documentation/ko/operators/create.html)을 사용해서 명시적으로 "차가운" Observable(항목들을 배출하기 전에 모든 옵저버가 구독을 시작했는지 체크한다)을 생성하거나, PublishSubject 대신 ReplaySubject를 사용해야 한다.

만약, 소스 Observable이 오류 때문에 종료되면 BehaviorSubject는 아무런 항목도 배출하지 않고 소스 Observable에서 발생한 오류를 그대로 전달한다.

ReplaySubject

ReplaySubject는 옵저버가 구독을 시작한 시점과 관계 없이 소스 Observable(들)이 배출한 모든 항목들을 모든 옵저버에게 배출한다.

ReplaySubject는 몇 개의 생성자 오버로드를 제공하는데 이를 통해, 재생 버퍼의 크기가 특정 이상으로 증가할 경우, 또는 처음 배출 이후 지정한 시간이 경과할 경우 오래된 항목들을 제거한다.

만약, ReplaySubject을 옵저버로 사용할 경우, 멀티 스레드 환경에서는 Observable 계약 위반과 주제에서 어느 항목 또는 알림을 먼저 재생해야 하는지 알 수 없는 모호함이 동시에 발생할 수 있기 때문에 (비순차적) 호출을 유발시키는 onNext(또는 그 외 on) 메서드를 사용하지 않도록 주의해야 한다.

ReactiveX - Subject

Scheduler


Observable 연산자 체인에 멀티스레딩을 적용하고 싶다면, 특정 스케줄러를 사용해서 연산자(또는 특정 Observable)를 실행하면 된다.
ReactiveX의 일부 Observable 연산자는 사용할 스케줄러를 파라미터로 전달 받기도 하는데, 이 연산자들은 자신이 처리할 연산의 일부 또는 전체를 전달된 스케줄러 내에서 실행한다.

기본적으로, Observable과 연산자 체인은 이처럼 스케줄러를 통해 동작하고 Subscribe 메서드가 호출되는 스레드를 사용해서 옵저버에게 알림을 보낸다. SubscribeOn 연산자는 다른 스케줄러를 지정해서 Observable이 처리해야 할 연산자들을 실행 시킨다. 그리고, ObserveOn 연산자는 Observable이 옵저버에게 알림을 보낼때 사용 할 스케줄러를 명시한다.

아래 그림이 보여주듯, SubscribeOn 연산자는 Observable이 연산을 위해 사용할 스레드를 지정하며, 연산자 체인 중 아무 곳에서 호출해도 문제되지 않는다. 하지만, ObserveOn 연산자는 연산자 체인 중 Observable이 사용할 스레드가 호출 체인 중 어느 시점에서 할당되는지에 따라 그 후에 호출되는 연산자는 영향을 받는다. 그렇기 때문에, 어쩌면 여러분은 특정 연산자를 별도의 스레드에서 실행 시키기 위해 연산자 체인 중 한 군데 이상에서ObserveOn을 호출하게 될 것이다.

https://reactivex.io/documentation/operators/images/schedulers.png

ReactiveX - Scheduler

Comments