이번 포스팅에서는 sequence들을 모으고,

여러 sequence내의 데이터들을 병합하는 방법에 대해 알아보자.

 

 

startWith(_:)

subscriber가 Observable을 구독하고, 요소를 방출받을 때 초기값을 지정해 주는 연산자이다. Observable의 타입과 초기값의 타입이 같아야 한다.

let observable = Observable<String>.of("A", "B", "C")

observable
    .startWith("alphabet will start to emit")
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)

/* prints:
 alphabet will start to emit
 A
 B
 C
*/

 

 

concat(_:)

concat2개의 Observable Sequence를 연결시켜주는 연산자이다.

startWith의 경우에는 하나의 요소만 방출하는 Observable을 앞에 연결시켜주는 concat의 단순한 형태이다.

concat의 사용법은 다음과 같다.

func observable1() -> Observable<String> {
    return Observable.create { emitter in
        emitter.onNext("A")
        emitter.onNext("B")
        emitter.onNext("C")
        emitter.onCompleted()
        
        return Disposables.create()
    }
}

let observable2 = Observable<String>.of("1", "2", "3")
// 2개의 concat의 결과는 같다!
Observable
    .concat([observable1(), observable2])
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)

observable1()
    .concat(observable2)
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)
    
/* prints:
 A
 B
 C
 1
 2
 3
*/

 

observable1()이 다 방출되고 completed이벤트가 방출되어야만,  observable2가 방출을 시작한다.

즉, observable1()의 모든 데이터가 observable2의 데이터보다 먼저 방출된다.

func observable() -> Observable<String> {
    return Observable.create { emitter in
        emitter.onNext("A")
        emitter.onNext("B")
        emitter.onNext("C")
        return Disposables.create()
    }
}

let observable2 = Observable<String>.of("1", "2", "3")

observable()
    .concat(observable2)
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)
    
/* prints:
 A
 B
 C
*/

이러한 특성 때문에 hot Observable의 경우 원하는 결과가 나오지 않을 수 있다.

let publishSubject1 = PublishSubject<String>()
let publishSubject2 = PublishSubject<String>()

publishSubject1
    .concat(publishSubject2)
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)
publishSubject1.onNext("A")
publishSubject1.onNext("B")
publishSubject1.onNext("C")
//publishSubject1.onCompleted() //주석 풀면 A B C 2 출력

publishSubject2.onNext("2")

/* prints:
 A
 B
 C
*/

추가적으로 두 Observable의 데이터 타입이 같아야 사용 가능하다.

 

 

concatMap(_:)

concatMapconcatflatMap을 합친거라고 생각하면 된다.

즉, flatMap과 비슷하지만, concat의 특징과 같이 순서를 보장해준다는 특징을 갖는다.

let alphabetPublish = PublishSubject<String>()
let koreanPublish = PublishSubject<String>()

let concatMapPublish = PublishSubject<PublishSubject<String>>()

concatMapPublish
    .concatMap { $0 }
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)
concatMapPublish.onNext(alphabetPublish)
concatMapPublish.onNext(koreanPublish)


alphabetPublish.onNext("A")


koreanPublish.onNext("가")
koreanPublish.onNext("나")

alphabetPublish.onNext("B")
alphabetPublish.onNext("C")

alphabetPublish.onCompleted()
koreanPublish.onNext("다")

/* prints:
 A
 B
 C
 다
*/

concat의 특성에 의해 alphabetPublishcompleted되지 않았기 때문에, 

koreanPublish는 "가", "나"를 방출하지 못한다.

 

 

merge()

mergeconcat처럼 두 개의 Sequence를 하나로 합쳐주면서, 각 Sequence에서 요소의 방출 시점도 유지시켜 준다.

하지만, 하나의 Sequence에서 error 이벤트가 발생하는 경우 Merge Observable Sequence는 종료된다.

let alphabetPublish = PublishSubject<String>()
let koreanPublish = PublishSubject<String>()

Observable
    .merge(alphabetPublish, koreanPublish)
    .subscribe(
        onNext: { print($0) }
    )
    .disposed(by: disposBag)
    
alphabetPublish.onNext("A")

koreanPublish.onNext("가")
koreanPublish.onNext("나")

alphabetPublish.onNext("B")
alphabetPublish.onNext("C")

koreanPublish.onNext("다")

koreanPublish.onCompleted()
alphabetPublish.onCompleted()

/* prints:
 A
 가
 나
 B
 C
 다
 completed
*/

모든 Observable에서 completed이벤트가 발생해야만

Merge Observable Sequencecompleted된다.

 

아래와 같이도 merge가 가능하다.

let mergeObservable = Observable.of(alphabetPublish.asObservable(), koreanPublish.asObservable())

mergeObservable
    .merge()
    .subscribe(
        onNext: { print($0) },
        onCompleted: { print("completed") }
    )
    .disposed(by: disposBag)

 

merge(maxConcurrent:)를 통해 몇 개의 Observable을 merge할지 제한할 수 있다.

public func merge(maxConcurrent: Int)
    -> Observable<Element.Element> {
    MergeLimited(source: self.asObservable(), maxConcurrent: maxConcurrent)
}

이는, maxConcurrent에 도달하면, observable을 대기열에 넣고,

현재 Sequence 중 하나가 completed되면 구독을 시작한다.

 

 

combineLatest

combineLates연산자는 최대 8개까지의 Observable에서 방출되는 요소들 중 가장 최근 요소들을 결합하여 전달해주는 연산자이다.

해당 연산자는 여러 개로 오버로딩 되어 있다.

 

최대 8개의 Observable Sequence의 데이터를 결합하여 전달할 수 있는데,

데이터는 튜플 형태로 묶여 전달된다.

let observable1 = Observable<Int>.of(1,2,3)
let observable2 = Observable<Int>.of(4,5,6)
let observable3 = Observable<Int>.of(7,8,9)
let observable4 = Observable<Int>.of(10,11,12)
let observable5 = Observable<Int>.of(13,14,15)
let observable6 = Observable<Int>.of(16,17,18)
let observable7 = Observable<Int>.of(19,20,21)
let observable8 = Observable<Int>.of(22,23,24)

Observable
    .combineLatest(observable1,observable2, observable3, observable4, observable5, observable6, observable7, observable8)
    .subscribe(
        onNext: { print($0) }
    ).disposed(by: disposBag)
    
/* Prints:
 (1, 4, 7, 10, 13, 16, 19, 22)
 (2, 4, 7, 10, 13, 16, 19, 22)
 ...
 (3, 6, 9, 12, 15, 18, 21, 23)
 (3, 6, 9, 12, 15, 18, 21, 24)
*/

또한, Collection Type으로 결합할 Observable Sequence를 전달할 수 있는데, 

8개보다 더 많은 Observable Sequence를 결합할 수 있으며, 데이터는 Collection Type으로 방출된다. 

Observable
    .combineLatest([observable1,observable2, observable3, observable4, observable5, observable6, observable7, observable8, observable9, observable10])
    .subscribe(
        onNext: { print($0) }
    ).disposed(by: disposBag)
    
/* Prints:
 [1, 4, 7, 10, 13, 16, 19, 22, 25, 28]
 [2, 4, 7, 10, 13, 16, 19, 22, 25, 28]
 ...
 [3, 6, 9, 12, 15, 18, 21, 24, 27, 29]
 [3, 6, 9, 12, 15, 18, 21, 24, 27, 30]
*/

 

resultSelector라는 클로져를 통해 요소들이 결합되는 로직을 직접 구현할 수도 있다.

해당 경우에는 결합되는 Observable의 타입이 달라도 상관없다.

let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()

Observable
    .combineLatest(subject1, subject2){
        "\($0) + \($1) = \($0 + $1)"
    }
    .subscribe(
        onNext: { print($0) }
    ).disposed(by: disposBag)

subject1.onNext(1)
subject1.onNext(2)

subject2.onNext(3)
subject2.onNext(4)

/* Prints:
 2 + 3 = 5
 2 + 4 = 6
*/

combineLatest연산자는 로그인과 같이 n개의 요소들을 묶어 한번에 전달할 때, 유용하게 사용할 수 있다. 

 

 

zip

zip연산자는 combineLatest와 같이 여러 Observable Sequence의 데이터를 결합해주는 연산자이지만, 

새로운 데이터들의 튜플 조합으로 방출한다.

combineLatest와 같이, 8개의 Observable Sequence를 결합할 수 있으며, 

Collection Type의 경우는 8개보다 더 많은 Sequence를 결합할 수 있다. 

또한, resultSelector라는 클로저를 통해 결합되는 로직을 작성할 수 있다.

 

하지만, zip에서는 Observable의 타입이 같아야 결합이 가능하다. 

Observable
    .zip(subject1, subject2){
        "\($0) + \($1) = \($0 + $1)"
    }
    .subscribe(
        onNext: { print($0) }
    ).disposed(by: disposBag)

subject1.onNext(1)
subject1.onNext(2)

subject2.onNext(3)
subject2.onNext(4)

/* Prints:
 1 + 3 = 4
 2 + 4 = 6
*/

/* CombineLatest 결과:
 2 + 3 = 5
 2 + 4 = 6
*/

위와 같이 combineLatest는 최근의 element와 결합하는 반면, 

zip은 다른 Sequence에서 값이 방출되기까지 기다리다가,

방출이 되면 묶어서 데이터를 전달한다.

 

 

withLastestFrom

withLatestFromtigger가 되는 Observable에서 데이터가 방출되면, 

target Observable의 데이터를 방출하는 연산자이다.

let trigger = PublishSubject<String>()
let target = PublishSubject<Int>()

trigger
    .withLatestFrom(target)
    .subscribe(
        onNext: { print("\($0) 방출됨.") }
    ).disposed(by: disposBag)

target.onNext(1)
target.onNext(2)

trigger.onNext("방출")
trigger.onNext("방출")

target.onNext(3)
trigger.onNext("방출")

/* Prints:
 2 방출됨.
 2 방출됨.
 3 방출됨.
*/

클로져를 통해 방출되는 데이터를 조작할 수 있다. 

trigger
    .withLatestFrom(target) { triggerElement, targetElement in
        "\(triggerElement) \(targetElement)됨."
    }
    .subscribe(
        onNext: { print($0) }
    ).disposed(by: disposBag)

/* Prints:
 2 방출됨.
 2 방출됨.
 3 방출됨.
*/

 

해당 Observable은 trigger에서 completed이벤트가 발생하면 결합된 sequence가 종료되지만, 

target Observable에서 completed이벤트가 발생하는 경우, 결합된 sequence가 종료되지 않는다.

 

 

sample

samplewithLatestFrom처럼 trigger에서 방출될 때, target의 데이터를 방출하지만, 

가장 최근의 데이터를 한 번만 전달한다.

 

withLatestFrom은 trigger에서 호출된 메서드인 반면, 

sample target에서 호출되는 메서드이다. 

target
    .sample(trigger)
    .subscribe(
        onNext: { print("\($0) 방출됨.") },
        onCompleted: ( {print("completed")} )
    ).disposed(by: disposBag)

target.onNext(1)
target.onNext(2)

trigger.onNext("방출")
trigger.onNext("방출")

target.onNext(3)
trigger.onNext("방출")


/* Prints:
 2 방출됨.
 3 방출됨.
*/

 

 

amb

amb는 ambiguous(모호한)의 약자로, 

2개의 Sequence중 가장 먼저 데이터를 방출한 Sequence를 선택하고,

나머지 Sequence에 대해서는 구독을 중단한다.

amb는 둘 중 하나의 sequence를 선택하는 것이기 때문에, 

두 Observable의 타입이 같아야 한다.

let one = PublishSubject<Int>()
let two = PublishSubject<Int>()

one
    .amb(two)
    .subscribe(
        onNext: { print($0) }
    ).disposed(by: disposBag)

two.onNext(2)

one.onNext(1)
one.onNext(1)
one.onNext(1)

two.onNext(2)

/* Prints:
 2
 2
*/

 

 

switchLatest

Observable을 switch시키면서 가장 최근에 전달(switch)된 Observable의 요소를 전달하고,

이전에 구독했던 Observable은 구독을 취소한다.

Tranforming Operator에서 살펴보았던, flatMapLatestmap + switchLatest이다.

 

따라서, switchLatest는 병합되는 로직을 구현할 수 있는 클로저를 제공하지 않는다.

let one = PublishSubject<Int>()
let two = PublishSubject<Int>()
let three = PublishSubject<Int>()

let source = PublishSubject<Observable<Int>>()

source
    .switchLatest()
    .subscribe(
        onNext: { print($0) },
        onCompleted: { print("completed") }
    ).disposed(by: disposBag)

print("--one 구독--")
source.onNext(one)
one.onNext(1)
one.onNext(1)
three.onNext(3)

print("--two 구독--")
source.onNext(two)
two.onNext(2)
one.onNext(1)
three.onNext(3)
two.onNext(2)

print("--three 구독--")
source.onNext(three)
one.onNext(1)
three.onNext(3)

print("--one 구독--")
source.onNext(one)
one.onNext(1)
three.onNext(3)

/* Prints:
 --one 구독--
 1
 1
 --two 구독--
 2
 2
 --three 구독--
 3
 --one 구독--
 1
*/

flatMapLatest와 동일하게,

현재 구독 중인 sequence에서 completed이벤트가 발생하고, 

source를 completed 시켜야만, sequence가 종료된다.

print("--three 구독--")
source.onNext(three)
one.onNext(1)
three.onNext(3)

three.onCompleted() //구독중인 sequence 종료
source.onCompleted()

/* Prints:
 --one 구독--
 1
 1
 --two 구독--
 2
 2
 --three 구독--
 3
 completed
 --one 구독--
*/

 

 

reduce

Swift에서 제공되는 reduce와 Rx에서만 제공된다는 점만 제외하곤 동일하다. 

아래 그림에서 보듯이, 데이터가 전달되고 completed이벤트가 발생해야만 데이터가 전달된다.

let number = PublishSubject<Int>()

number
    .reduce(0, accumulator: +)
    .subscribe(
        onNext: { print($0) },
        onCompleted: { print("completed") }
    ).disposed(by: disposBag)

number.onNext(1)
number.onNext(2)
number.onNext(3)
number.onNext(4)
number.onCompleted()

/* Prints:
 10
 completed
*/

 

 

scan

scan연산자는 reduce와 유사하지만, 

completed이벤트에서 데이터를 방출하던 reduce와 달리, 

데이터가 방출될 때마다 연산을 적용하여 전달한다.

number
    .scan(0, accumulator: +)
    .subscribe(
        onNext: { print($0) },
        onCompleted: { print("completed") }
    ).disposed(by: disposBag)

number.onNext(1)
number.onNext(2)
number.onNext(3)
number.onNext(4)
number.onCompleted()

/* Prints:
 1
 3
 6
 10
 completed
*/

 

 

 

Reference

https://github.com/fimuxd/RxSwift/blob/master/Lectures/09_Combining%20Operators/Ch9.CombiningOperators.md

https://www.notion.so/Wallaby-RxSwift-72194669a39a4557baa69c672268af38

'iOS > RxSwift' 카테고리의 다른 글

[RxSwift] Relay vs. Signal vs. Driver  (0) 2023.07.25
[RxSwift] Operator(4) - Share  (0) 2023.07.24
[RxSwift] Traits  (0) 2022.12.28
[RxSwift] Operator(2) - Transforming  (2) 2022.12.27
[RxSwift] Operator(1) - Filtering  (1) 2022.12.25
복사했습니다!