01. of、just、from
of
: 将一系列元素转换为Observable
并发出;
just
: 只能发出一个元素;
from
: 将数组中一系列元素转换为Observable
并发出;
仅发出一个元素时,三种用法等价:
func test_of_from_just() { Observable.of(1) Observable.of(1, 2, 3, 4) Observable.just(1) Observable.from([1]) Observable.from([1, 2, 3, 4]) }
02. take相关
1. 通过take
操作符可以只发出头n
个元素,并且忽略掉后面的元素,直接结束序列;
func testTake() { let observable = Observable.of(1, 3, 5, 7) observable.take(2).subscribe(onNext: { (value) in Logger(value) }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 1 Log - 3 Log - onCompleted end
2. take(while: Element -> Bool)
操作符将Observable
直到某个元素的判定为false
,这个Observable
将立即终止;
func testTakeWhile() { let observable = Observable.of(7, 3, 2, 1) observable.take(while: { $0 > 2 }).subscribe(onNext: { (value) in Logger(value) }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 7 Log - 3 Log - onCompleted end
3. take(until: Element -> Bool)
操作符将镜像源Observable
,它同时观测第二个Observable
,一旦第二个Observable
发出一个元素或者产生一个终止事件,那个镜像的Observable
将立即终止;
func testTakeUntil() { let observable = Observable.of(1, 3, 5, 7) observable.take(until: { $0 > 3}).subscribe(onNext: { (value) in Logger(value) }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 1 Log - 3 Log - onCompleted end
03. skip相关
1. skip
操作符跳过Observable
中头n
个元素,只关注后面的元素;
func testSkip() { let observable = Observable.of(1, 3, 5, 7) observable.skip(2).subscribe(onNext: { (value) in Logger(value) }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 5 Log - 7 Log - onCompleted end
2. skip(while: Element -> Bool)
操作符可以让你忽略源Observable
中头几个元素,直到元素的判定为否后,它才镜像源Observable
;
func testSkipWhile() { let observable = Observable.of(7, 5, 1, 3) observable.skip(while: { $0 > 3 }).subscribe(onNext: { (value) in Logger(value) }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 1 Log - 3 Log - onCompleted end
3. skipUntil
操作符可以让你忽略源Observable
中头几个元素,直到另一个Observable
发出一个元素后,它才镜像源Observable
;
func testSkipUntil() { let source = PublishSubject<Int>() let refSource = PublishSubject<Int>() source.skip(until: refSource).subscribe(onNext: { (value) in Logger(value) }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) source.onNext(1) source.onNext(2) refSource.onNext(3) source.onNext(4) source.onNext(5) }
运行结果:
Log - 4 Log - 5
04. materialize
通常,一个Observable
将产生零个或者多个onNext
事件,然后产生一个onCompleted
或者onError
事件;
materialize
操作符将Observable
产生的这些事件全部转换成元素,然后发送出来;
func testMaterialize() { let observable = Observable.of(1, 3, 5, 7) observable.materialize().subscribe(onNext: { (event) in //正常情况下这里应该是一个值,使用materialize后这里都转换为了Event switch event { case .next(let value): Logger(value) break case .completed: Logger("completed") break default: Logger("defalut value") } }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 1 Log - 3 Log - 5 Log - 7 Log - completed Log - onCompleted end
05. withLatestFrom
1. withLatestFrom(_:)
当第一个Observable
发出一个元素时,就立即取出第二个Observable
中最新的元素(前提是第二个发出过元素),然后把第二个Observable
中最新的元素发送出去;
func testWithLatestFrom() { let source = PublishSubject<Int>() let refSource = PublishSubject<Int>() source.withLatestFrom(refSource).subscribe(onNext: { (value) in Logger("\(value)") }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) source.onNext(1) source.onNext(2) refSource.onNext(3) refSource.onNext(4) source.onNext(5) source.onNext(6) }
运行结果:
Log - 4 Log - 4
2. withLatestFrom(_:resultSelector:)
操作符当第一个Observable
发出一个元素时,就立即取出第二个Observable
中最新的元素(前提是第二个发出过元素),将第一个Observable
中最新的元素first
和第二个Observable
中最新的元素second
组合,然后把组合结果first+second
发送出去;
func testWithLatestFromResultSelector() { let source = PublishSubject<Int>() let refSource = PublishSubject<Int>() source.withLatestFrom(refSource) { (first, second) -> Int in return first + second }.subscribe(onNext: { (value) in Logger("\(value)") }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) source.onNext(1) source.onNext(2) refSource.onNext(3) refSource.onNext(4) source.onNext(5) source.onNext(6) }
运行结果:
Log - 9 Log - 10
06. timer
timer
操作符将创建一个Observable
,它在经过设定的一段时间后,产生唯一的一个元素
func testTimer() { let relay = Observable<Int>.timer(.seconds(1), scheduler: MainScheduler.instance) relay.subscribe(onNext: { (value) in Logger("\(value) - \(Thread.current)") }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 0 - <NSThread: 0x600002778980>{number = 1, name = main} Log - onCompleted end
07. interval
interval
(定时器)操作符将创建一个Observable
,它每隔一段设定的时间,发出一个索引数的元素,它将发出无数个元素;
func testInterval() { let relay = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) relay.subscribe(onNext: { (value) in Logger("\(value) - \(Thread.current)") }, onCompleted: { Logger("onCompleted end") }).disposed(by: disposeBag) }
运行结果:
Log - 0 - <NSThread: 0x600002138400>{number = 1, name = main} Log - 1 - <NSThread: 0x600002138400>{number = 1, name = main} Log - 2 - <NSThread: 0x600002138400>{number = 1, name = main} ...