Representing multiple values
|
/** Returns the fixed values 1, 2 and 3 as an eagerly built list. */
fun simple(): List<Int> {
    return listOf(1, 2, 3)
}
/** Prints every value returned by [simple], one per line. */
fun main() {
    for (value in simple()) {
        println(value)
    }
}
|
// Transliterated form of the list example: `simple` yields the literal [1, 2, 3].
func simple(): List<Int> = [1, 2, 3]
// Entry point: prints every element produced by simple().
func main() {
simple().forEach { value -> print(value) }
}
|
Sequences
|
/**
 * Lazily produces the values 1, 2 and 3 as a [Sequence].
 *
 * Every value is preceded by a blocking `Thread.sleep(100)` that stands in for a computation.
 */
fun simple(): Sequence<Int> = sequence { // sequence builder
    (1..3).forEach { number ->
        Thread.sleep(100) // simulate the cost of computing the value
        yield(number) // hand the value to the consumer
    }
}
/** Iterates the sequence from [simple] and prints each value as it is produced. */
fun main() {
    for (value in simple()) {
        println(value)
    }
}
|
// Transliterated sequence example: yields 1, 2, 3, calling Thread.sleep(100) before each value.
func simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
// Entry point: prints each value produced by simple().
func main() {
simple().forEach { value -> print(value) }
}
|
Suspending functions
|
/** Calls `delay(1000)` as a stand-in for asynchronous work, then returns the list 1, 2, 3. */
suspend fun simple(): List<Int> {
    delay(1000) // placeholder for an asynchronous operation
    val values = listOf(1, 2, 3)
    return values
}

/** Calls [simple] inside `runBlocking` and prints each returned value. */
fun main() = runBlocking<Unit> {
    for (value in simple()) {
        println(value)
    }
}
|
// Transliterated suspending example: calls delay(1000), then returns the literal [1, 2, 3].
suspend func simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return [1, 2, 3]
}
// Entry point: runs inside runBlocking and prints each element returned by simple().
func main() = runBlocking<Unit> {
simple().forEach { value -> print(value) }
}
|
Flows
|
/** Builds a flow that emits 1, 2 and 3, calling `delay(100)` before each emission. */
fun simple(): Flow<Int> = flow { // flow builder
    (1..3).forEach { number ->
        delay(100) // stand-in for useful asynchronous work
        emit(number) // publish the next value
    }
}

/** Collects [simple] while a separately launched coroutine prints progress messages. */
fun main() = runBlocking<Unit> {
    // Concurrent coroutine used to check whether the main thread is blocked
    launch {
        (1..3).forEach { k ->
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow and print every value
    simple().collect { println(it) }
}
|
// Transliterated flow example: emits 1, 2, 3 with delay(100) before each emission.
func simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
// Entry point: launches a coroutine that prints three progress messages, then collects simple().
func main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
print("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> print(value) }
}
|
Flows are cold
|
/** Builds a flow that prints "Flow started" and then emits 1, 2 and 3 with `delay(100)` before each. */
fun simple(): Flow<Int> = flow {
    println("Flow started")
    (1..3).forEach { number ->
        delay(100)
        emit(number)
    }
}

/** Creates the flow once and collects it twice, printing a marker before every step. */
fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val numbers = simple()
    println("Calling collect...")
    numbers.collect { println(it) }
    println("Calling collect again...")
    numbers.collect { println(it) }
}
|
// Transliterated "cold flow" example: the builder body prints a start marker, then emits 1, 2, 3.
func simple(): Flow<Int> = flow {
print("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
// Entry point: obtains the flow once and collects it twice, printing a marker before each step.
func main() = runBlocking<Unit> {
print("Calling simple function...")
let flow = simple()
print("Calling collect...")
flow.collect { value -> print(value) }
print("Calling collect again...")
flow.collect { value -> print(value) }
}
|
Flow cancellation basics
|
/** Builds a flow that, for each of 1, 2 and 3, calls `delay(100)`, prints "Emitting …" and emits the value. */
fun simple(): Flow<Int> = flow {
    (1..3).forEach { i ->
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

/** Collects [simple] under a 250 ms `withTimeoutOrNull`, then prints "Done". */
fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // give up after 250 ms
        simple().collect { println(it) }
    }
    println("Done")
}
|
// Transliterated cancellation example: delays, prints "Emitting …" and emits each of 1, 2, 3.
func simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
print("Emitting $i")
emit(i)
}
}
// Entry point: collects simple() inside withTimeoutOrNull(250), then prints "Done".
func main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> print(value) }
}
print("Done")
}
|
Flow builders
|
// Turn the integer range 1..3 into a flow and print every element
(1..3).asFlow().collect { println(it) }
|
// Convert the integer range 1..3 to a flow and print each collected value
(1..3).asFlow().collect { value -> print(value) }
|
Intermediate flow operators
|
/**
 * Calls `delay(1000)` to imitate long-running asynchronous work, then builds the response text.
 *
 * @param request number identifying the request
 * @return the string `"response <request>"`
 */
suspend fun performRequest(request: Int): String {
    delay(1000) // stand-in for slow asynchronous work
    return "response $request"
}

/** Maps the requests 1..3 through [performRequest] and prints each response. */
fun main() = runBlocking<Unit> {
    val responses = (1..3).asFlow() // a flow of requests
        .map { performRequest(it) }
    responses.collect { println(it) }
}
|
// Transliterated request helper: calls delay(1000), then returns "response <request>".
// NOTE(review): this signature uses `-> String`, while the other transliterated snippets
// keep the `: Type` return syntax — confirm which form is intended.
suspend func performRequest(request: Int) -> String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
// Entry point: maps requests 1..3 through performRequest and prints each response.
func main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> print(response) }
}
|
Transform operator
|
// For every request, emit a progress message followed by the actual response, then print both.
val requests = (1..3).asFlow() // a flow of requests
val responses = requests.transform { request ->
    emit("Making request $request")
    emit(performRequest(request))
}
responses.collect { println(it) }
|
// Transliterated transform example: each request emits a progress string and then the response.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> print(response) }
|
Size-limiting operators
|
/**
 * Builds a flow that emits 1 and 2, prints a message, then emits 3.
 *
 * The body is wrapped in `try`/`finally`, so "Finally in numbers" is printed when the block exits.
 */
fun numbers(): Flow<Int> = flow {
    try {
        for (early in 1..2) {
            emit(early)
        }
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

/** Prints only the first two values of [numbers]. */
fun main() = runBlocking<Unit> {
    val firstTwo = numbers().take(2) // keep only the first two values
    firstTwo.collect { println(it) }
}
|
// Transliterated size-limiting example: emits 1 and 2, prints a message, emits 3;
// the finally block prints "Finally in numbers" when the try block exits.
func numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
print("This line will not execute")
emit(3)
} finally {
print("Finally in numbers")
}
}
// Entry point: collects and prints the values of numbers() after applying take(2).
func main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> print(value) }
}
|
Terminal flow operators
|
// Square the numbers 1..5, add the squares with the terminal `reduce` operator, print the total.
val squares = (1..5).asFlow().map { n -> n * n } // squares of numbers from 1 to 5
val sum = squares.reduce { acc, square -> acc + square } // sum them (terminal operator)
println(sum)
|
// Transliterated terminal-operator example: squares 1..5, reduces by addition, prints the result.
// NOTE(review): the map lambda uses the `$0` shorthand rather than `it` — confirm intended.
let sum = (1..5).asFlow()
.map { $0 * $0 } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
print(sum)
|
Flows are sequential
|
// Each stage prints the value it receives: filter keeps even numbers, map turns them into strings.
val evens = (1..5).asFlow().filter {
    println("Filter $it")
    it % 2 == 0
}
val labelled = evens.map {
    println("Map $it")
    "string $it"
}
labelled.collect {
    println("Collect $it")
}
|
// Transliterated sequential-processing example: filter, map and collect each print their input.
// NOTE(review): the filter predicate uses `$0` while every string template still refers to
// `it` — the two forms look inconsistent; confirm which one is intended.
(1..5).asFlow()
.filter {
print("Filter $it")
$0 % 2 == 0
}
.map {
print("Map $it")
"string $it"
}.collect {
print("Collect $it")
}
|
Flow context
|
// Collect inside withContext so the collector body runs in the specified context
withContext(context) {
    simple().collect { println(it) }
}
|
// Transliterated context example: collects simple() inside withContext(context).
withContext(context) {
simple().collect { value ->
print(value) // run in the specified context
}
}
|
/** Builds a flow that logs "Started simple flow" and then emits 1, 2 and 3. */
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    (1..3).forEach { emit(it) }
}

/** Collects [simple] and logs each collected value. */
fun main() = runBlocking<Unit> {
    val numbers = simple()
    numbers.collect { value -> log("Collected $value") }
}
|
// Transliterated logging example: logs a start message, then emits 1, 2, 3.
func simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
// Entry point: collects simple() and logs each value.
func main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
|
Wrong emission withContext
|
/**
 * Deliberate counter-example: emits 1, 2 and 3 from inside `withContext(Dispatchers.Default)`.
 *
 * The original comment labels this the WRONG way to change context in a flow builder; it is
 * kept exactly as an illustration and must not be "fixed" here.
 */
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        (1..3).forEach { number ->
            Thread.sleep(100) // stand-in for CPU-consuming computation
            emit(number) // emit next value
        }
    }
}

/** Collects [simple] and prints each value. */
fun main() = runBlocking<Unit> {
    simple().collect { println(it) }
}
|
// Transliterated counter-example: emits from inside withContext(Dispatchers.Default),
// which the inline comment labels as the WRONG way to change context.
func simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
// Entry point: collects simple() and prints each value.
func main() = runBlocking<Unit> {
simple().collect { value -> print(value) }
}
|
flowOn operator
|
/**
 * Builds a flow that sleeps, logs and emits each of 1, 2 and 3, then applies
 * `flowOn(Dispatchers.Default)`, which the original comment names the RIGHT way to change
 * context for CPU-consuming code in a flow builder.
 */
fun simple(): Flow<Int> {
    val upstream = flow<Int> {
        (1..3).forEach { i ->
            Thread.sleep(100) // stand-in for CPU-consuming computation
            log("Emitting $i")
            emit(i) // emit next value
        }
    }
    return upstream.flowOn(Dispatchers.Default)
}

/** Collects [simple] and logs each collected value. */
fun main() = runBlocking<Unit> {
    val numbers = simple()
    numbers.collect { value -> log("Collected $value") }
}
|
// Transliterated flowOn example: sleeps, logs and emits each of 1, 2, 3, with
// flowOn(Dispatchers.Default) applied to the builder.
func simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
// Entry point: collects simple() and logs each value.
func main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
|
Buffering
|
/** Builds a flow that emits 1, 2 and 3, calling `delay(100)` before each emission. */
fun simple(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(100) // stand-in for a 100 ms asynchronous wait
        emit(number) // emit next value
    }
}

/** Times how long collecting [simple] takes when every value is processed for 300 ms. */
fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect {
            delay(300) // stand-in for 300 ms of processing
            println(it)
        }
    }
    println("Collected in $time ms")
}
|
// Transliterated buffering baseline: emits 1, 2, 3 with delay(100) before each emission.
func simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
// Entry point: measures the time taken to collect simple() with delay(300) per value.
func main() = runBlocking<Unit> {
let time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
print(value)
}
}
print("Collected in $time ms")
}
|
// Same measurement with buffer() inserted between the emitter and the collector.
val time = measureTimeMillis {
    val buffered = simple().buffer() // buffer emissions, don't wait
    buffered.collect {
        delay(300) // stand-in for 300 ms of processing
        println(it)
    }
}
println("Collected in $time ms")
|
// Transliterated buffer example: times collection of simple() with buffer() applied.
let time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
print(value)
}
}
print("Collected in $time ms")
|
Conflation
|
// Same measurement with conflate() inserted between the emitter and the collector.
val time = measureTimeMillis {
    val conflated = simple().conflate() // conflate emissions, don't process each one
    conflated.collect {
        delay(300) // stand-in for 300 ms of processing
        println(it)
    }
}
println("Collected in $time ms")
|
// Transliterated conflation example: times collection of simple() with conflate() applied.
let time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
print(value)
}
}
print("Collected in $time ms")
|
Processing the latest value
|
// Same measurement using collectLatest, whose block prints before and after the 300 ms delay.
val time = measureTimeMillis {
    val numbers = simple()
    numbers.collectLatest { value -> // cancel & restart on the latest value
        println("Collecting $value")
        delay(300) // stand-in for 300 ms of processing
        println("Done $value")
    }
}
println("Collected in $time ms")
|
// Transliterated collectLatest example: prints "Collecting …" and "Done …" around delay(300).
let time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
print("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
print("Done $value")
}
}
print("Collected in $time ms")
|
Zip
|
// Pair each number with the string at the same position and print the combined text.
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
val zipped = nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
zipped.collect { line -> println(line) } // collect and print
|
// Transliterated zip example: combines nums and strs pairwise into "<a> -> <b>" and prints them.
let nums = (1..3).asFlow() // numbers 1..3
let strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { print(it) } // collect and print
|
Combine
|
// Baseline for the combine comparison: the two delayed flows are joined with "zip" here.
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
val zipped = nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
zipped.collect { value -> // collect and print with the elapsed time
    println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
// Transliterated baseline for the combine comparison: joins the two delayed flows with "zip".
let nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
let strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
let startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
print("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
// Same two delayed flows as the zip baseline, joined with "combine" instead.
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
val combined = nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
combined.collect { value -> // collect and print with the elapsed time
    println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
// Transliterated combine example: joins the two delayed flows with "combine" and prints timings.
let nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
let strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
let startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
print("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
Flattening flows
|
/**
 * Builds a flow that emits `"<i>: First"`, calls `delay(500)`, then emits `"<i>: Second"`.
 *
 * @param i number placed at the start of both emitted strings
 */
fun requestFlow(i: Int): Flow<String> {
    return flow {
        emit("$i: First")
        delay(500) // wait 500 ms
        emit("$i: Second")
    }
}
|
// Transliterated helper: emits "<i>: First", calls delay(500), then emits "<i>: Second".
func requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
|
// Mapping each number to requestFlow(...) yields a flow whose elements are themselves flows
(1..3).asFlow().map { number -> requestFlow(number) }
|
// Transliterated form: maps each of 1..3 to requestFlow(it); nothing is collected here
(1..3).asFlow().map { requestFlow(it) }
|
flatMapConcat
|
// Flatten the per-number request flows with flatMapConcat and print each value with its timing.
val startTime = System.currentTimeMillis() // remember the start time
val ticks = (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
val flattened = ticks.flatMapConcat { requestFlow(it) }
flattened.collect { value -> // collect and print
    println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
// Transliterated flatMapConcat example: flattens requestFlow(it) per number and prints timings.
let startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
print("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
flatMapMerge
|
// Flatten the per-number request flows with flatMapMerge and print each value with its timing.
val startTime = System.currentTimeMillis() // remember the start time
val ticks = (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
val flattened = ticks.flatMapMerge { requestFlow(it) }
flattened.collect { value -> // collect and print
    println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
// Transliterated flatMapMerge example: flattens requestFlow(it) per number and prints timings.
let startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
print("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
flatMapLatest
|
// Flatten the per-number request flows with flatMapLatest and print each value with its timing.
val startTime = System.currentTimeMillis() // remember the start time
val ticks = (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
val flattened = ticks.flatMapLatest { requestFlow(it) }
flattened.collect { value -> // collect and print
    println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
// Transliterated flatMapLatest example: flattens requestFlow(it) per number and prints timings.
let startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
print("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
|
Collector try and catch
|
/** Builds a flow that prints "Emitting …" and emits each of 1, 2 and 3. */
fun simple(): Flow<Int> = flow {
    (1..3).forEach { i ->
        println("Emitting $i")
        emit(i) // emit next value
    }
}

/**
 * Collects [simple] inside `try`/`catch`: the collector prints each value and then runs
 * `check(value <= 1)`; any `Throwable` reaching the catch block is printed as "Caught …".
 */
fun main() = runBlocking<Unit> {
    try {
        val numbers = simple()
        numbers.collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
|
// Transliterated try/catch example: prints "Emitting …" and emits each of 1, 2, 3.
func simple(): Flow<Int> = flow {
for (i in 1..3) {
print("Emitting $i")
emit(i) // emit next value
}
}
// Entry point: the collector prints each value, then runs check(value <= 1);
// the surrounding catch prints "Caught …".
func main() = runBlocking<Unit> {
try {
simple().collect { value ->
print(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
print("Caught $e")
}
}
|
Everything is caught
|
/**
 * Builds a flow of strings: the upstream prints "Emitting …" and emits 1, 2 and 3, and the
 * `map` stage runs `check(value <= 1)` before converting each number to `"string <value>"`.
 */
fun simple(): Flow<String> {
    val numbers = flow<Int> {
        (1..3).forEach { i ->
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    return numbers.map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }
}

/** Collects [simple] inside `try`/`catch`, printing each value or the caught `Throwable`. */
fun main() = runBlocking<Unit> {
    try {
        simple().collect { println(it) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
|
// Transliterated example: the upstream emits 1, 2, 3; the map stage runs check(value <= 1)
// before producing "string <value>".
func simple(): Flow<String> =
flow {
for (i in 1..3) {
print("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
// Entry point: collects simple() inside try/catch and prints values or "Caught …".
func main() = runBlocking<Unit> {
try {
simple().collect { value -> print(value) }
} catch (e: Throwable) {
print("Caught $e")
}
}
|
Exception transparency
|
// Turn an upstream exception into an emitted "Caught …" string, then print everything collected.
val guarded = simple().catch { e -> emit("Caught $e") } // emit on exception
guarded.collect { println(it) }
|
// Transliterated catch example: the catch block emits "Caught …", and collect prints each value.
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> print(value) }
|
Transparent catch
|
/** Builds a flow that prints "Emitting …" and emits each of 1, 2 and 3. */
fun simple(): Flow<Int> = flow {
    (1..3).forEach { i ->
        println("Emitting $i")
        emit(i)
    }
}

/**
 * Applies `catch` before `collect`; the collector itself runs `check(value <= 1)`.
 * The original comment notes that this `catch` does not catch downstream exceptions.
 */
fun main() = runBlocking<Unit> {
    val guarded = simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
    guarded.collect { value ->
        check(value <= 1) { "Collected $value" }
        println(value)
    }
}
|
// Transliterated transparent-catch example: prints "Emitting …" and emits each of 1, 2, 3.
func simple(): Flow<Int> = flow {
for (i in 1..3) {
print("Emitting $i")
emit(i)
}
}
// Entry point: catch is applied before collect, and the check runs inside the collector.
func main() = runBlocking<Unit> {
simple()
.catch { e -> print("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
print(value)
}
}
|
Catching declaratively
|
// Move the collector body into onEach so it sits upstream of catch, then collect with no arguments.
val checked = simple().onEach { value ->
    check(value <= 1) { "Collected $value" }
    println(value)
}
val guarded = checked.catch { e -> println("Caught $e") }
guarded.collect()
|
// Transliterated declarative-catch example: the check and print run in onEach, ahead of catch;
// collect() is called without a lambda.
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
print(value)
}
.catch { e -> print("Caught $e") }
.collect()
|
Imperative finally block
|
/** Returns the range 1..3 converted to a flow. */
fun simple(): Flow<Int> {
    return (1..3).asFlow()
}

/** Collects and prints [simple], printing "Done" from a `finally` block afterwards. */
fun main() = runBlocking<Unit> {
    try {
        simple().collect { println(it) }
    } finally {
        println("Done")
    }
}
|
// Transliterated finally example: simple() is the range 1..3 converted to a flow.
func simple(): Flow<Int> = (1..3).asFlow()
// Entry point: collects and prints the flow, with "Done" printed from the finally block.
func main() = runBlocking<Unit> {
try {
simple().collect { value -> print(value) }
} finally {
print("Done")
}
}
|
Declarative handling
|
// Declarative counterpart of try/finally: onCompletion prints "Done", collect prints each value.
val finishing = simple().onCompletion { println("Done") }
finishing.collect { println(it) }
|
// Transliterated onCompletion example: onCompletion prints "Done"; collect prints each value.
simple()
.onCompletion { print("Done") }
.collect { value -> print(value) }
|
/** Builds a flow that emits 1 and then throws a [RuntimeException]. */
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

/**
 * Chains `onCompletion` (prints a message when its cause is non-null), `catch` (prints
 * "Caught exception") and `collect` (prints each value).
 */
fun main() = runBlocking<Unit> {
    val observed = simple().onCompletion { cause ->
        if (cause != null) {
            println("Flow completed exceptionally")
        }
    }
    val guarded = observed.catch { _ -> println("Caught exception") }
    guarded.collect { println(it) }
}
|
// Transliterated example: emits 1 and then throws RuntimeException.
func simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
// Entry point: onCompletion prints when cause is non-null, catch prints "Caught exception",
// and collect prints each value.
func main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) print("Flow completed exceptionally") }
.catch { cause -> print("Caught exception") }
.collect { value -> print(value) }
}
|
Successful completion
|
/** Returns the range 1..3 converted to a flow. */
fun simple(): Flow<Int> {
    return (1..3).asFlow()
}

/**
 * Applies `onCompletion`, which prints the cause it receives, and then collects with a
 * collector that runs `check(value <= 1)` before printing each value.
 */
fun main() = runBlocking<Unit> {
    val observed = simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
    observed.collect { value ->
        check(value <= 1) { "Collected $value" }
        println(value)
    }
}
|
// Transliterated completion example: simple() is the range 1..3 converted to a flow.
func simple(): Flow<Int> = (1..3).asFlow()
// Entry point: onCompletion prints the cause it receives; the collector runs check(value <= 1)
// before printing each value.
func main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> print("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
print(value)
}
}
|
Launching flow
|
/** Imitates a flow of events: the numbers 1..3 with `delay(100)` applied to each element. */
fun events(): Flow<Int> {
    val numbers = (1..3).asFlow()
    return numbers.onEach { delay(100) }
}

/** Prints every event via `onEach`, collects the flow, then prints "Done". */
fun main() = runBlocking<Unit> {
    val printed = events().onEach { event -> println("Event: $event") }
    printed.collect() // <--- Collecting the flow waits
    println("Done")
}
|
// Imitate a flow of events: the range 1..3 as a flow with delay(100) applied per element
func events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
// Entry point: prints each event in onEach, collects the flow, then prints "Done".
func main() = runBlocking<Unit> {
events()
.onEach { event -> print("Event: $event") }
.collect() // <--- Collecting the flow waits
print("Done")
}
|
/** Prints every event via `onEach`, starts the flow with `launchIn(this)`, then prints "Done". */
fun main() = runBlocking<Unit> {
    val printed = events().onEach { event -> println("Event: $event") }
    printed.launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
|
// Transliterated launchIn example: prints each event in onEach, starts the flow with
// launchIn(this), then prints "Done".
func main() = runBlocking<Unit> {
events()
.onEach { event -> print("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
print("Done")
}
|
Flow cancellation checks
|
/** Builds a flow that prints "Emitting …" and emits each of 1 through 5. */
fun foo(): Flow<Int> = flow {
    (1..5).forEach { i ->
        println("Emitting $i")
        emit(i)
    }
}

/** Collects [foo], calling `cancel()` when the value 3 arrives and printing every value received. */
fun main() = runBlocking<Unit> {
    val numbers = foo()
    numbers.collect { value ->
        if (value == 3) {
            cancel()
        }
        println(value)
    }
}
|
// Transliterated cancellation-check example: prints "Emitting …" and emits each of 1 through 5.
func foo(): Flow<Int> = flow {
for (i in 1..5) {
print("Emitting $i")
emit(i)
}
}
// Entry point: calls cancel() when the collected value equals 3 and prints every value received.
func main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
print(value)
}
}
|
/** Collects the range 1..5 as a flow, calling `cancel()` at the value 3 and printing every value received. */
fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow()
    numbers.collect { value ->
        if (value == 3) {
            cancel()
        }
        println(value)
    }
}
|
// Transliterated example: collects (1..5).asFlow(), calling cancel() at 3 and printing each value.
func main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
print(value)
}
}
|
Making busy flow cancellable
|
/**
 * Collects the range 1..5 as a flow with `cancellable()` applied, calling `cancel()` at the
 * value 3 and printing every value received.
 */
fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow().cancellable()
    numbers.collect { value ->
        if (value == 3) {
            cancel()
        }
        println(value)
    }
}
|
// Transliterated example: same collection as the plain asFlow() case, with cancellable() applied.
func main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
print(value)
}
}
|