이번 포스팅에서는 sequence들을 모으고,
여러 sequence내의 데이터들을 병합하는 방법에 대해 알아보자.
startWith(_:)
subscriber가 Observable을 구독하고, 요소를 방출받을 때 초기값을 지정해 주는 연산자이다. Observable의 타입과 초기값의 타입이 같아야 한다.
let observable = Observable<String>.of("A", "B", "C")
observable
.startWith("alphabet will start to emit")
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
/* prints:
alphabet will start to emit
A
B
C
*/
concat(_:)
concat은 2개의 Observable Sequence를 연결시켜주는 연산자이다.
startWith의 경우에는 하나의 요소만 방출하는 Observable을 앞에 연결시켜주는 concat의 단순한 형태이다.
concat의 사용법은 다음과 같다.
func observable1() -> Observable<String> {
return Observable.create { emitter in
emitter.onNext("A")
emitter.onNext("B")
emitter.onNext("C")
emitter.onCompleted()
return Disposables.create()
}
}
let observable2 = Observable<String>.of("1", "2", "3")
// 2개의 concat의 결과는 같다!
Observable
.concat([observable1(), observable2])
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
observable1()
.concat(observable2)
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
/* prints:
A
B
C
1
2
3
*/
observable1()이 다 방출되고 completed이벤트가 방출되어야만, observable2가 방출을 시작한다.
즉, observable1()의 모든 데이터가 observable2의 데이터보다 먼저 방출된다.
func observable() -> Observable<String> {
return Observable.create { emitter in
emitter.onNext("A")
emitter.onNext("B")
emitter.onNext("C")
return Disposables.create()
}
}
let observable2 = Observable<String>.of("1", "2", "3")
observable()
.concat(observable2)
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
/* prints:
A
B
C
*/
이러한 특성 때문에 hot Observable의 경우 원하는 결과가 나오지 않을 수 있다.
let publishSubject1 = PublishSubject<String>()
let publishSubject2 = PublishSubject<String>()
publishSubject1
.concat(publishSubject2)
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
publishSubject1.onNext("A")
publishSubject1.onNext("B")
publishSubject1.onNext("C")
//publishSubject1.onCompleted() //주석 풀면 A B C 2 출력
publishSubject2.onNext("2")
/* prints:
A
B
C
*/
추가적으로 두 Observable의 데이터 타입이 같아야 사용 가능하다.
concatMap(_:)
concatMap은 concat과 flatMap을 합친거라고 생각하면 된다.
즉, flatMap과 비슷하지만, concat의 특징과 같이 순서를 보장해준다는 특징을 갖는다.
let alphabetPublish = PublishSubject<String>()
let koreanPublish = PublishSubject<String>()
let concatMapPublish = PublishSubject<PublishSubject<String>>()
concatMapPublish
.concatMap { $0 }
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
concatMapPublish.onNext(alphabetPublish)
concatMapPublish.onNext(koreanPublish)
alphabetPublish.onNext("A")
koreanPublish.onNext("가")
koreanPublish.onNext("나")
alphabetPublish.onNext("B")
alphabetPublish.onNext("C")
alphabetPublish.onCompleted()
koreanPublish.onNext("다")
/* prints:
A
B
C
다
*/
concat의 특성에 의해 alphabetPublish가 completed되지 않았기 때문에,
koreanPublish는 "가", "나"를 방출하지 못한다.
merge()
merge는 concat처럼 두 개의 Sequence를 하나로 합쳐주면서, 각 Sequence에서 요소의 방출 시점도 유지시켜 준다.
하지만, 하나의 Sequence에서 error 이벤트가 발생하는 경우 Merge Observable Sequence는 종료된다.
let alphabetPublish = PublishSubject<String>()
let koreanPublish = PublishSubject<String>()
Observable
.merge(alphabetPublish, koreanPublish)
.subscribe(
onNext: { print($0) }
)
.disposed(by: disposBag)
alphabetPublish.onNext("A")
koreanPublish.onNext("가")
koreanPublish.onNext("나")
alphabetPublish.onNext("B")
alphabetPublish.onNext("C")
koreanPublish.onNext("다")
koreanPublish.onCompleted()
alphabetPublish.onCompleted()
/* prints:
A
가
나
B
C
다
completed
*/
모든 Observable에서 completed이벤트가 발생해야만
Merge Observable Sequence도 completed된다.
아래와 같이도 merge가 가능하다.
let mergeObservable = Observable.of(alphabetPublish.asObservable(), koreanPublish.asObservable())
mergeObservable
.merge()
.subscribe(
onNext: { print($0) },
onCompleted: { print("completed") }
)
.disposed(by: disposBag)
merge(maxConcurrent:)를 통해 몇 개의 Observable을 merge할지 제한할 수 있다.
public func merge(maxConcurrent: Int)
-> Observable<Element.Element> {
MergeLimited(source: self.asObservable(), maxConcurrent: maxConcurrent)
}
이는, maxConcurrent에 도달하면, observable을 대기열에 넣고,
현재 Sequence 중 하나가 completed되면 구독을 시작한다.
combineLatest
combineLates연산자는 최대 8개까지의 Observable에서 방출되는 요소들 중 가장 최근 요소들을 결합하여 전달해주는 연산자이다.
해당 연산자는 여러 개로 오버로딩 되어 있다.
최대 8개의 Observable Sequence의 데이터를 결합하여 전달할 수 있는데,
데이터는 튜플 형태로 묶여 전달된다.
let observable1 = Observable<Int>.of(1,2,3)
let observable2 = Observable<Int>.of(4,5,6)
let observable3 = Observable<Int>.of(7,8,9)
let observable4 = Observable<Int>.of(10,11,12)
let observable5 = Observable<Int>.of(13,14,15)
let observable6 = Observable<Int>.of(16,17,18)
let observable7 = Observable<Int>.of(19,20,21)
let observable8 = Observable<Int>.of(22,23,24)
Observable
.combineLatest(observable1,observable2, observable3, observable4, observable5, observable6, observable7, observable8)
.subscribe(
onNext: { print($0) }
).disposed(by: disposBag)
/* Prints:
(1, 4, 7, 10, 13, 16, 19, 22)
(2, 4, 7, 10, 13, 16, 19, 22)
...
(3, 6, 9, 12, 15, 18, 21, 23)
(3, 6, 9, 12, 15, 18, 21, 24)
*/
또한, Collection Type으로 결합할 Observable Sequence를 전달할 수 있는데,
8개보다 더 많은 Observable Sequence를 결합할 수 있으며, 데이터는 Collection Type으로 방출된다.
Observable
.combineLatest([observable1,observable2, observable3, observable4, observable5, observable6, observable7, observable8, observable9, observable10])
.subscribe(
onNext: { print($0) }
).disposed(by: disposBag)
/* Prints:
[1, 4, 7, 10, 13, 16, 19, 22, 25, 28]
[2, 4, 7, 10, 13, 16, 19, 22, 25, 28]
...
[3, 6, 9, 12, 15, 18, 21, 24, 27, 29]
[3, 6, 9, 12, 15, 18, 21, 24, 27, 30]
*/
resultSelector라는 클로져를 통해 요소들이 결합되는 로직을 직접 구현할 수도 있다.
해당 경우에는 결합되는 Observable의 타입이 달라도 상관없다.
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
Observable
.combineLatest(subject1, subject2){
"\($0) + \($1) = \($0 + $1)"
}
.subscribe(
onNext: { print($0) }
).disposed(by: disposBag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onNext(3)
subject2.onNext(4)
/* Prints:
2 + 3 = 5
2 + 4 = 6
*/
combineLatest연산자는 로그인과 같이 n개의 요소들을 묶어 한번에 전달할 때, 유용하게 사용할 수 있다.
zip
zip연산자는 combineLatest와 같이 여러 Observable Sequence의 데이터를 결합해주는 연산자이지만,
새로운 데이터들의 튜플 조합으로 방출한다.
combineLatest와 같이, 8개의 Observable Sequence를 결합할 수 있으며,
Collection Type의 경우는 8개보다 더 많은 Sequence를 결합할 수 있다.
또한, resultSelector라는 클로저를 통해 결합되는 로직을 작성할 수 있다.
하지만, zip에서는 Observable의 타입이 같아야 결합이 가능하다.
Observable
.zip(subject1, subject2){
"\($0) + \($1) = \($0 + $1)"
}
.subscribe(
onNext: { print($0) }
).disposed(by: disposBag)
subject1.onNext(1)
subject1.onNext(2)
subject2.onNext(3)
subject2.onNext(4)
/* Prints:
1 + 3 = 4
2 + 4 = 6
*/
/* CombineLatest 결과:
2 + 3 = 5
2 + 4 = 6
*/
위와 같이 combineLatest는 최근의 element와 결합하는 반면,
zip은 다른 Sequence에서 값이 방출되기까지 기다리다가,
방출이 되면 묶어서 데이터를 전달한다.
withLastestFrom
withLatestFrom은 tigger가 되는 Observable에서 데이터가 방출되면,
target Observable의 데이터를 방출하는 연산자이다.
let trigger = PublishSubject<String>()
let target = PublishSubject<Int>()
trigger
.withLatestFrom(target)
.subscribe(
onNext: { print("\($0) 방출됨.") }
).disposed(by: disposBag)
target.onNext(1)
target.onNext(2)
trigger.onNext("방출")
trigger.onNext("방출")
target.onNext(3)
trigger.onNext("방출")
/* Prints:
2 방출됨.
2 방출됨.
3 방출됨.
*/
클로져를 통해 방출되는 데이터를 조작할 수 있다.
trigger
.withLatestFrom(target) { triggerElement, targetElement in
"\(triggerElement) \(targetElement)됨."
}
.subscribe(
onNext: { print($0) }
).disposed(by: disposBag)
/* Prints:
2 방출됨.
2 방출됨.
3 방출됨.
*/
해당 Observable은 trigger에서 completed이벤트가 발생하면 결합된 sequence가 종료되지만,
target Observable에서 completed이벤트가 발생하는 경우, 결합된 sequence가 종료되지 않는다.
sample
sample은 withLatestFrom처럼 trigger에서 방출될 때, target의 데이터를 방출하지만,
가장 최근의 데이터를 한 번만 전달한다.
withLatestFrom은 trigger에서 호출된 메서드인 반면,
sample은 target에서 호출되는 메서드이다.
target
.sample(trigger)
.subscribe(
onNext: { print("\($0) 방출됨.") },
onCompleted: ( {print("completed")} )
).disposed(by: disposBag)
target.onNext(1)
target.onNext(2)
trigger.onNext("방출")
trigger.onNext("방출")
target.onNext(3)
trigger.onNext("방출")
/* Prints:
2 방출됨.
3 방출됨.
*/
amb
amb는 ambiguous(모호한)의 약자로,
2개의 Sequence중 가장 먼저 데이터를 방출한 Sequence를 선택하고,
나머지 Sequence에 대해서는 구독을 중단한다.
amb는 둘 중 하나의 sequence를 선택하는 것이기 때문에,
두 Observable의 타입이 같아야 한다.
let one = PublishSubject<Int>()
let two = PublishSubject<Int>()
one
.amb(two)
.subscribe(
onNext: { print($0) }
).disposed(by: disposBag)
two.onNext(2)
one.onNext(1)
one.onNext(1)
one.onNext(1)
two.onNext(2)
/* Prints:
2
2
*/
switchLatest
Observable을 switch시키면서 가장 최근에 전달(switch)된 Observable의 요소를 전달하고,
이전에 구독했던 Observable은 구독을 취소한다.
Tranforming Operator에서 살펴보았던, flatMapLatest가 map + switchLatest이다.
따라서, switchLatest는 병합되는 로직을 구현할 수 있는 클로저를 제공하지 않는다.
let one = PublishSubject<Int>()
let two = PublishSubject<Int>()
let three = PublishSubject<Int>()
let source = PublishSubject<Observable<Int>>()
source
.switchLatest()
.subscribe(
onNext: { print($0) },
onCompleted: { print("completed") }
).disposed(by: disposBag)
print("--one 구독--")
source.onNext(one)
one.onNext(1)
one.onNext(1)
three.onNext(3)
print("--two 구독--")
source.onNext(two)
two.onNext(2)
one.onNext(1)
three.onNext(3)
two.onNext(2)
print("--three 구독--")
source.onNext(three)
one.onNext(1)
three.onNext(3)
print("--one 구독--")
source.onNext(one)
one.onNext(1)
three.onNext(3)
/* Prints:
--one 구독--
1
1
--two 구독--
2
2
--three 구독--
3
--one 구독--
1
*/
flatMapLatest와 동일하게,
현재 구독 중인 sequence에서 completed이벤트가 발생하고,
source를 completed 시켜야만, sequence가 종료된다.
print("--three 구독--")
source.onNext(three)
one.onNext(1)
three.onNext(3)
three.onCompleted() //구독중인 sequence 종료
source.onCompleted()
/* Prints:
--one 구독--
1
1
--two 구독--
2
2
--three 구독--
3
completed
--one 구독--
*/
reduce
Swift에서 제공되는 reduce와 Rx에서만 제공된다는 점만 제외하곤 동일하다.
아래 그림에서 보듯이, 데이터가 전달되고 completed이벤트가 발생해야만 데이터가 전달된다.
let number = PublishSubject<Int>()
number
.reduce(0, accumulator: +)
.subscribe(
onNext: { print($0) },
onCompleted: { print("completed") }
).disposed(by: disposBag)
number.onNext(1)
number.onNext(2)
number.onNext(3)
number.onNext(4)
number.onCompleted()
/* Prints:
10
completed
*/
scan
scan연산자는 reduce와 유사하지만,
completed이벤트에서 데이터를 방출하던 reduce와 달리,
데이터가 방출될 때마다 연산을 적용하여 전달한다.
number
.scan(0, accumulator: +)
.subscribe(
onNext: { print($0) },
onCompleted: { print("completed") }
).disposed(by: disposBag)
number.onNext(1)
number.onNext(2)
number.onNext(3)
number.onNext(4)
number.onCompleted()
/* Prints:
1
3
6
10
completed
*/
Reference
https://www.notion.so/Wallaby-RxSwift-72194669a39a4557baa69c672268af38
'iOS > RxSwift' 카테고리의 다른 글
[RxSwift] Relay vs. Signal vs. Driver (0) | 2023.07.25 |
---|---|
[RxSwift] Operator(4) - Share (0) | 2023.07.24 |
[RxSwift] Traits (0) | 2022.12.28 |
[RxSwift] Operator(2) - Transforming (2) | 2022.12.27 |
[RxSwift] Operator(1) - Filtering (1) | 2022.12.25 |