https://mp.weixin.qq.com/s/SA2_DsSzXmWtLHf_6so7VA
Kotlin Flow 如此受欢迎大部分归功于其丰富、简洁的操作符,巧妙使用Flow操作符可以大大简化我们的程序结构,提升可读性与可维护性
最简单的Flow
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fun test0() { runBlocking { val flow = flow { emit("hello world ${Thread.currentThread()}") } flow.collect { Log.e("", "collect:$it ${Thread.currentThread()}") } } }
|
如上包含了两种操作符:构造操作符flow与末端操作符collect。
总结来说,flow调用流程简化为:两个操作符+两个闭包+emit函数:
- collect操作符触发调用,执行了flow的闭包
- flow闭包里调用emit函数,执行了collect闭包
Flow返回集合
1 2 3 4 5 6 7 8 9 10
| fun test01() { runBlocking { val result = mutableListOf<String>() flow { emit("hello world ${Thread.currentThread()}") }.toList(result) Log.e("", "result == $result") } }
|
Flow变换操作符
在Flow流到下游之前,对数据进行处理,处理完成后再发射出去。可以使用transform 操作符
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fun test02() { runBlocking { flow { emit("hello world ${Thread.currentThread()}") }.transform { emit("$it man") }.collect {
Log.e("", "it == $it") } } }
|
transform还需要自己发射数据,有点麻烦,map可解君忧
1 2 3 4 5 6 7 8 9 10 11 12
| fun test03() { runBlocking { flow { emit("hello world ${Thread.currentThread()}") }.map { "$it 1" }.collect { Log.e("", "it == $it") } } }
|
map内部封装了transform。
过滤操作符
对上流的数据进行某种条件的筛选过滤
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fun test04() { runBlocking { flow { emit("hello world ${Thread.currentThread()}") emit("fish") }.filter { it.contains("hello") }.collect { Log.e("", "it == $it") } } }
|
Flow里如何切换协程与线程
在主线程执行collect操作符,在flow闭包里执行耗时操作
需要flow闭包里的代码在子线程执行
使用flowOn操作符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| fun test05() { runBlocking { flow { Log.e("", "emit ${Thread.currentThread().name}") emit("hello world") }.flowOn(Dispatchers.IO) .collect {
Log.e("", "collect ${Thread.currentThread().name}") } } }
15:07:48.599 E emit DefaultDispatcher-worker-1 15:07:48.600 E collect main
|
flow闭包(上游),collect闭包(下游)分别执行在不同的协程以及不同的线程里
Flow处理背压
上游发射数据速度高于下游,如何提升发射效率?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| fun test06() { runBlocking { val time = measureTimeMillis { flow { Log.e("", "emit ${Thread.currentThread().name}") emit("hello world") delay(1000) emit("hello world2") }.collect { delay(2000) Log.e("", "$it") Log.e("", "collect ${Thread.currentThread().name}") } } Log.e("", "use time:$time") } }
|
使用buffer操作符解决背压问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| fun test06() { runBlocking { val time = measureTimeMillis { flow { Log.e("", "emit ${Thread.currentThread().name}") emit("hello world") delay(1000) emit("hello world2") }.buffer().collect { delay(2000) Log.e("", "$it") Log.e("", "collect ${Thread.currentThread().name}") } } Log.e("", "use time:$time") } }
|
buffer原理简单来说:
构造了新的协程执行flow闭包,上游数据会发送到Channel 缓冲区里,发送完成继续发送下一条。
collect操作符监听缓冲区是否有数据,若有则收集成功。
原理是基于ChannelFlow。
上游覆盖旧数据
上游生产速度很快,下游消费速度慢,我们只关心最新数据,旧的数据没价值可以丢掉。使用conflate操作符处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| fun test07() { runBlocking { flow { repeat(5) { emit("emit $it") delay(100) } }.conflate().collect { Log.e("", "conflate") delay(1000) Log.e("", "$it") } } }
|
中间产生的数据由于下游没有来得及消费,被上游新的数据冲刷掉了
相当于使用了buffer操作符,该buffer只能容纳一个数据,新来的数据将会覆盖旧的数据。
原理是基于ChannelFlow
Flow变换取最新值
在使用transform处理数据的时候,若是它处理比较慢,当有新的值过来后就取消未处理好的值。
使用transformLatest操作符处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| fun test08() { runBlocking { flow { repeat(5) { emit("emit $it") } Log.e("","emit ${Thread.currentThread()}") }.transformLatest { delay(1000) emit("$it fish") }.collect { Log.e("","collect ${Thread.currentThread()}") Log.e("","$it") } } }
|
map也有类似的操作符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| fun test09() { runBlocking { flow { repeat(5) { emit("emit $it") } println("emit ${Thread.currentThread()}") }.mapLatest { delay(200) "$it fish" }.collect { println("collect ${Thread.currentThread()}") println("$it") } } }
|
收集最新的数据
监听下载进度,UI展示最新进度
此种场景下,我们只是关注最新的进度,没必要频繁刷新UI,因此使用Flow实现时上游发射太快了可以忽略旧的数据
使用collectLatest操作符实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| fun test014() { runBlocking { val time = measureTimeMillis { val flow1 = flow { repeat(10000) { emit(it + 1) } } flow1.collectLatest { delay(20) Log.e("","collect progress $it") } } Log.e("","use time:$time") } }
|
展平流
flatMapConcat
依照顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| fun test010() { runBlocking { val flow1 = flow { emit("stuInfo") } flow1.flatMapConcat { flow { emit("$it teachInfo") } }.collect { Log.e("","collect $it") } } }
|
将两个Flow的数据拍平了输出
flatMapConcat 并没有涉及到多协程,使用了装饰者模式。
先将Flow2使用map进行变换,而后将Flow1、Flow2数据发射出来。
Concat顾名思义,将两个Flow连接起来。
flatMapMerge
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| fun test012() { runBlocking { val time = measureTimeMillis { val flow1 = flow { println("emit ${Thread.currentThread()}") emit("stuInfo 1") emit("stuInfo 2") emit("stuInfo 3") } flow1.flatMapMerge(4) { flow { println("flatMapMerge ${Thread.currentThread()}") emit("$it teachInfo") delay(1000) } }.collect { println("collect ${Thread.currentThread()}") println("collect $it") } } println("use time:$time") } }
|
flatMapMerge由于是并发执行,整体速度比flatMapConcat快了很多。
flatMapMerge可以指定并发的数量,当指定flatMapMerge(0)时,flatMapMerge退化为flatMapConcat
flatMapLatest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| fun test013() { runBlocking { val time = measureTimeMillis { val flow1 = flow {
emit("stuInfo 1") emit("stuInfo 2") emit("stuInfo 3") } flow1.flatMapLatest { flow {
delay(1000) emit("$it teachInfo") } }.collect {
println("collect $it") } } println("use time:$time") } }
|
组合流
combine
短的一方会等待长的一方结束后才结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| fun test015() { runBlocking { val time = measureTimeMillis { val flow1 = flow { emit("stuSex 1") emit("stuSex 2") emit("stuSex 3") } val flow2 = flow { emit("stuSubject") } flow1.combine(flow2) { sex, subject->"$sex-->$subject" }.collect { Log.e("",it) } } Log.e("","use time:$time") } }
|
flow1的每个emit和flow2的emit关联起来了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| fun test016() { runBlocking { val time = measureTimeMillis { val flow1 = flow { emit("a") emit("b") emit("c") emit("d") } val flow2 = flow { emit("1") emit("2") } flow1.combine(flow2) { sex, subject->"$sex-->$subject" }.collect { println(it) } } println("use time:$time") } }
a-->1 b-->2 c-->2 d-->2 use time:45
|
zip
只要某个Flow获取结束了就取消Flow
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| fun test017() { runBlocking { val time = measureTimeMillis { val flow1 = flow { emit("a") emit("b") emit("c") emit("d") } val flow2 = flow { emit("1") emit("2") } flow1.zip(flow2) { sex, subject->"$sex-->$subject" }.collect { println(it) } } println("use time:$time") } }
a-->1 b-->2 use time:71
|
可以看出flow2先结束了,并且flow1没发送完成。
zip的特点:
短的Flow结束,另一个Flow也结束