article thumbnail image
Published 2022. 11. 27. 01:03

 

이번 포스팅에서는 RxSwift에서 핵심이라고 할 수 있는 Observable에 대해서 알아보자

 

Observable의 사전적 정의는 "관찰 가능한" 인데, 

저번 포스팅에서 말했듯이,

Observable은 관찰 가능한 시퀀스를 통해! 이벤트를 방출 한다.

 

이러한 이유 때문에, 

RxSwift에서는 이 Observable을 sequence, Observable Sequence라고도 부른다.

 

Observable은 이 이벤트를 방출할때, 비동기로 이벤트를 방출한다!

 

 

Observable 생명 주기 

우선 Observable은 3가지 종류의 이벤트를 방출 할 수 있는데, 

  • next
  • error
  • completed

 

실제 Observable의 이벤트를 보면

@frozen public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

이렇게 enum 케이스로 정리가 되어있다. 

 

next

Observable은 element들을 next이벤트를 통해 방출한다. 

또한 이는 onNext 메서드를 통해 방출한다.

 

completed

completed는 모든 element를 방출하였고, 

Sequence(Observable)가 정상 종료되었음을 알리는 이벤트이다.

이 이벤트는 onCompleted 메서드를 통해 방출한다.

 

error

Completed와 Seqence를 종료 시킨다는 점에서는 같지만, 

에러가 발생하여 종료되었음을 알리는 이벤트이다.

이는 onError 메서드를 통해 방출한다.

 

func downLoadJson(_ url: String) -> Observable<String> {
    return Observable.create { emitter in
        let jsonURL = URL(string: url)!
        
        let task = URLSession.shared.dataTask(with: jsonURL) { data, _, err in
            if err != nil {
                emitter.onError(err!)  // error 이벤트 방출 -> sequence 종료
            }
            if let emitData = data {
                let strData = String(data: emitData, encoding: .utf8)!
                emitter.onNext(strData) // next 이벤트 방출 -> sequence 유지
            }
            emitter.onCompleted() // completed 이벤트 방출 -> sequence 종료
        }
        task.resume()
        
        return Disposables.create() {
            task.cancel()
        }
    }
}

 

 

 

Subscribe

방금 살펴 보았듯이 Observable이 이벤트를 방출한 이벤트를 받고 싶으면,

이를 관찰하기 위해 Subscriber(Observer)는 이를 subscribe 해야 한다.

 

subscribe 할 수 있는 방법에는 여러 가지가 있는데 그중 2가지에 대해서 알아보자

 

첫 번째는 아래와 같이 처리하는 방법이다.

let json = downLoadJson(TEST_URL)
    .subscribe(onNext: { _ in print("next Event")},
               onError: {_ in print("err Event")},
               onCompleted: {print("completed Event")},
               onDisposed: {print("disposed!")})

여기서 보면 onNext는 next이벤트,

onError는 error이벤트, 

onCompleted는 completed이벤트를 처리한다. 

여기까지는 Observable이 방출하는 이벤트 타입과 일치하지만!

 

onDisposed라는  생소한 것이 보이는데,

이는 Observable의 stream이 완전히 종료되고 리소스가 해제되면, 해당 클로저가 호출되는 것일 뿐

Observable이 방출하는 이벤트는 아니다.

 

 

두 번째 방법은, event로 처리하는 방법인데, 

let observable = Observable.of(1, 2, 3)

observable.subscribe{ event in
    print(event)
}

/* Prints:
 next(1)
 next(2)
 next(3)
 completed
 */
}

 

event는 Observable에서 방출하는 이벤트 타입이므로 

next로 전달된 이벤트를 받고 싶다면! 

observable.subscribe{ event in
    switch event {
    case .next(let data):  //next 이벤트에 대한 처리
        print(data)
    case .completed:       //completed 이벤트에 대한 처리
        print("completed")
    case .error(let err):  //error 이벤트에 대한 처리
        print(err)
    }
}

여기서 dispose에 대한 이벤트가 없는 이유는 아까도 말했듯이, 

dispose는 Observable을 통해 방출되는 이벤트 타입이 아니기 때문이다!

 

만약 next 이벤트만 처리하고 싶은 경우

observable.subscribe(onNext: {print($0)})

이렇게 표현이 가능하다! 

 

 

순환 참조 문제

특정 이벤트를 처리할 경우 클로져를 통해서 처리가 된다. 

Memory Leak을 다루었던 포스팅에서 언급했듯이,

Swift에서 클로져는 기본적으로 강한 참조를 기반으로 한다. 

그렇기 때문에 메모리 누수가 발생할 수 있는데, 

 

 

weak 키워드를 통해 해결할 수 있지만, 

observable.subscribe(onNext: { [weak self] _ in
        self?.someFunc()
})

 

subsribe의 with 옵션으로도 해결이 가능하다.

observable.subscribe(with: self, onNext: { strongSelf, _ in
    strongSelf.someFunc()
})

 

 

Disposable

실제 subscribe 할 때 보면, 

이렇게 return type이 Disposable 객체인 것을 볼 수 있다. 

Disposable의 사전적 정의는 "버릴 수 있는"인데, 

즉 버릴 수 있는 상태가 된다.

 

해당 Disposable은 dispose()라는 메서드가 있는데, 

이 메서드를 통해 Observable과 Subscriber 사이의 stream을 끊어준다

observable.subscribe(onNext: {print($0)})
    .dispose()

이런 식으로 사용이 가능하다! 

 

하지만, onDisposed()를 다룰 때 보았듯이

Observable이 completed나, error 이벤트가 방출이 되면, 

stream이 끊기게 되는데

해당 코드에선, .dispose()는 사실상 필요가 없는 코드이다.

(이미 stream이 끊긴 상태에서 .dispose()로 한 번 더 끊어 준 셈이 된다.)

 

 

그렇다면, 언제 필요할까? 

예를 들어 1초에 한 번씩 총  "A" 를 출력하는 AViewController가 있을 때, 

다른 SubViewController로 이동하고 AViewController를 해제할 경우,

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)

let disposable = observable.subscribe(
    onNext: {_ in print("A")},
    onCompleted: {print("completed")},
    onDisposed:{ print("dispose!")})
    

/* Prints:
 A
 A
 A
 viewDeinit
 A
 A
 ...
 */

해당 Observable과 subscriber의 stream은 메모리에서 해제되지 않는다.

이는 메모리 누수 문제로 이어지게 된다. 

 

view가 사라지는 시점에서

disposable.dispose()

이런 식으로 해제 해주여 한다. 

 

하지만, 해당 Observable을 구독한 Subscriber가 하나가 아닌 여러 개일 경우,

코드가 복잡해지는데, 

 

이는 disposBag로 해결이 가능하다. 

 

DisposBag

disposBag은 여러 disposable 객체들을 (Observable subscribe 된 상태!)

한 번에 가방처럼 관리한다.

 

var disposBag = DisposeBag()

let disposable = observable.subscribe(
    onNext: {_ in print("A")},
    onCompleted: {print("completed")},
    onDisposed:{ print("dispose!")})
    .disposed(by: disposBag)		//disposBag에 넣어줌!

 

이런 식으로 선언해주게 되면, 

 

/* Prints:
 A
 A
 A
 viewDeinit
 dispose!
 */

해당 AViewController 가 메모리에서 해제되는 시점에,

disposBag도 메모리에서 해제되게 되므로, 안에 담겨있던 subscriber의 stream은 다 끊기게 된다!

 

 

이는 원하는 시점에 해제도 가능한데, 

disposBag = nil

nil을 통해서 원하는 시점에 해제가 가능하다. 해당 경우에는 disposBag를 optional로 선언해주어야 한다.

 

Reference

https://reactivex.io/documentation/observable.html

https://www.notion.so/Wallaby-RxSwift-72194669a39a4557baa69c672268af38

'iOS > RxSwift' 카테고리의 다른 글

[RxSwift] Operator(2) - Transforming  (2) 2022.12.27
[RxSwift] Operator(1) - Filtering  (1) 2022.12.25
[RxSwift] Subjects  (0) 2022.12.01
[RxSwift] Observable(2) - Creating Observable  (0) 2022.11.28
[RxSwift] RxSwift란?  (0) 2022.11.25
복사했습니다!