Kotlin Flow 操作符

https://mp.weixin.qq.com/s/SA2_DsSzXmWtLHf_6so7VA

Kotlin Flow 如此受欢迎大部分归功于其丰富、简洁的操作符,巧妙使用Flow操作符可以大大简化我们的程序结构,提升可读性与可维护性

最简单的Flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun test0() {
runBlocking {
//构造flow
val flow = flow {
//上游
emit("hello world ${Thread.currentThread()}")
}
//收集flow
flow.collect {
//下游
Log.e("", "collect:$it ${Thread.currentThread()}")
}
}
}

如上包含了两种操作符:构造操作符flow与末端操作符collect。

总结来说,flow调用流程简化为:两个操作符+两个闭包+emit函数

  1. collect操作符触发调用,执行了flow的闭包
  2. flow闭包里调用emit函数,执行了collect闭包

Flow返回集合

1
2
3
4
5
6
7
8
9
10
fun test01() {
runBlocking {
val result = mutableListOf<String>()
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.toList(result)
Log.e("", "result == $result")
}
}

Flow变换操作符

在Flow流到下游之前,对数据进行处理,处理完成后再发射出去。可以使用transform 操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    fun test02() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.transform {
emit("$it man")
}.collect {
// println("$it")
Log.e("", "it == $it")
}
}
}

transform还需要自己发射数据,有点麻烦,map可解君忧

1
2
3
4
5
6
7
8
9
10
11
12
fun test03() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.map {
"$it 1"
}.collect {
Log.e("", "it == $it")
}
}
}

map内部封装了transform。

过滤操作符

对上流的数据进行某种条件的筛选过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun test04() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
emit("fish")
}.filter {
//包含hello字符串才继续往下发送
it.contains("hello")
}.collect {
Log.e("", "it == $it")
}
}
}

Flow里如何切换协程与线程

在主线程执行collect操作符,在flow闭包里执行耗时操作
需要flow闭包里的代码在子线程执行
使用flowOn操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    fun test05() {
runBlocking {
flow {
//上游
Log.e("", "emit ${Thread.currentThread().name}")
emit("hello world")
}.flowOn(Dispatchers.IO)//flowOn 之前的操作符在新协程里执行
.collect {
// Log.e("", "it == $it")
Log.e("", "collect ${Thread.currentThread().name}")
}
}
}

15:07:48.599 E emit DefaultDispatcher-worker-1
15:07:48.600 E collect main

flow闭包(上游),collect闭包(下游)分别执行在不同的协程以及不同的线程里

Flow处理背压

上游发射数据速度高于下游,如何提升发射效率?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun test06() {
runBlocking {
val time = measureTimeMillis {
flow {
//上游
Log.e("", "emit ${Thread.currentThread().name}")
emit("hello world")
delay(1000)
emit("hello world2")
}.collect {
delay(2000)
Log.e("", "$it")
Log.e("", "collect ${Thread.currentThread().name}")
}
}
Log.e("", "use time:$time")
}
}

使用buffer操作符解决背压问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun test06() {
runBlocking {
val time = measureTimeMillis {
flow {
//上游
Log.e("", "emit ${Thread.currentThread().name}")
emit("hello world")
delay(1000)
emit("hello world2")
}.buffer().collect {
delay(2000)
Log.e("", "$it")
Log.e("", "collect ${Thread.currentThread().name}")
}
}
Log.e("", "use time:$time")
}
}

buffer原理简单来说:

构造了新的协程执行flow闭包,上游数据会发送到Channel 缓冲区里,发送完成继续发送下一条。
collect操作符监听缓冲区是否有数据,若有则收集成功。
原理是基于ChannelFlow。

上游覆盖旧数据

上游生产速度很快,下游消费速度慢,我们只关心最新数据,旧的数据没价值可以丢掉。使用conflate操作符处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun test07() {
runBlocking {
flow {
//上游
repeat(5) {
emit("emit $it")
delay(100)
}
}.conflate().collect {
Log.e("", "conflate")
delay(1000)
Log.e("", "$it")
}
}
}

中间产生的数据由于下游没有来得及消费,被上游新的数据冲刷掉了

相当于使用了buffer操作符,该buffer只能容纳一个数据,新来的数据将会覆盖旧的数据。
原理是基于ChannelFlow

Flow变换取最新值

在使用transform处理数据的时候,若是它处理比较慢,当有新的值过来后就取消未处理好的值。

使用transformLatest操作符处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun test08() {
runBlocking {
flow {
//上游,协程1
repeat(5) {
emit("emit $it")
}
Log.e("","emit ${Thread.currentThread()}")
}.transformLatest {
//协程2
delay(1000)
emit("$it fish")
}.collect {
Log.e("","collect ${Thread.currentThread()}")
Log.e("","$it")
}
}
}

map也有类似的操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun test09() {
runBlocking {
flow {
//上游
repeat(5) {
emit("emit $it")
}
println("emit ${Thread.currentThread()}")
}.mapLatest {
delay(200)
"$it fish"
}.collect {
println("collect ${Thread.currentThread()}")
println("$it")
}
}
}

收集最新的数据

监听下载进度,UI展示最新进度
此种场景下,我们只是关注最新的进度,没必要频繁刷新UI,因此使用Flow实现时上游发射太快了可以忽略旧的数据
使用collectLatest操作符实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun test014() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
repeat(10000) {
emit(it + 1)
}
}
flow1.collectLatest {
delay(20)
Log.e("","collect progress $it")
}
}
Log.e("","use time:$time")
}
}

展平流

flatMapConcat
依照顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun test010() {
runBlocking {
val flow1 = flow {
emit("stuInfo")
}
flow1.flatMapConcat {
//flow2
flow {
emit("$it teachInfo")
}
}.collect {
Log.e("","collect $it")
}
}
}

将两个Flow的数据拍平了输出
flatMapConcat 并没有涉及到多协程,使用了装饰者模式。
先将Flow2使用map进行变换,而后将Flow1、Flow2数据发射出来。
Concat顾名思义,将两个Flow连接起来。

flatMapMerge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun test012() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
println("emit ${Thread.currentThread()}")
emit("stuInfo 1")
emit("stuInfo 2")
emit("stuInfo 3")
}
flow1.flatMapMerge(4) {
//flow2
flow {
println("flatMapMerge ${Thread.currentThread()}")
emit("$it teachInfo")
delay(1000)
}
}.collect {
println("collect ${Thread.currentThread()}")
println("collect $it")
}
}
println("use time:$time")
}
}

flatMapMerge由于是并发执行,整体速度比flatMapConcat快了很多。
flatMapMerge可以指定并发的数量,当指定flatMapMerge(0)时,flatMapMerge退化为flatMapConcat

flatMapLatest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun test013() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
// println("emit ${Thread.currentThread()}")
emit("stuInfo 1")
emit("stuInfo 2")
emit("stuInfo 3")
}
flow1.flatMapLatest {
//flow2
flow {
// println("flatMapLatest ${Thread.currentThread()}")
delay(1000)
emit("$it teachInfo")
}
}.collect {
// println("collect ${Thread.currentThread()}")
println("collect $it")
}
}
println("use time:$time")
}
}

组合流

combine

短的一方会等待长的一方结束后才结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun test015() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
emit("stuSex 1")
emit("stuSex 2")
emit("stuSex 3")
}
val flow2 = flow {
emit("stuSubject")
}
flow1.combine(flow2) {
sex, subject->"$sex-->$subject"
}.collect {
Log.e("",it)
}
}
Log.e("","use time:$time")
}
}

flow1的每个emit和flow2的emit关联起来了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun test016() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
emit("a")
emit("b")
emit("c")
emit("d")
}
val flow2 = flow {
emit("1")
emit("2")
}
flow1.combine(flow2) {
sex, subject->"$sex-->$subject"
}.collect {
println(it)
}
}
println("use time:$time")
}
}
//打印结果
a-->1
b-->2
c-->2
d-->2
use time:45

zip

只要某个Flow获取结束了就取消Flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun test017() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
emit("a")
emit("b")
emit("c")
emit("d")
}
val flow2 = flow {
emit("1")
emit("2")
}
flow1.zip(flow2) {
sex, subject->"$sex-->$subject"
}.collect {
println(it)
}
}
println("use time:$time")
}
}
//打印结果
a-->1
b-->2
use time:71

可以看出flow2先结束了,并且flow1没发送完成。
zip的特点:
短的Flow结束,另一个Flow也结束