Kotlin in Action - 17장 플로우 연산자 June 29, 2025
📖 17.1 플로우 연산자로 플로우 조작
시퀀스와 마찬가지로 플로우도 중간 연산자와 최종 연산자를 구분
중간 연산자는 코드를 실행하지 않고 변경된 플로우를 반환하며, 최종 연산자는 컬렉션, 개별 원소, 계산된 값을 반환하거나 아무 값도 반환하지 않으면서 플로우를 수집하고 실제 코드를 실행한다.
📖 17.2 중간 연산자는 업스트림 플로우에 적용되고 다운스트림 플로우를 반환한다
중간 연산자는 플로우에 적용돼 새로운 플로우를 반환
업스트림 플로우
다운스트림 플로우
중간 연산자가 반환하는 플로우
또 다른 연산자의 업스트림 플로우로 작용할 수 있음.
시퀀스와 마찬가지로 중간 연산자가 호출되더라도 플로우 코드가 실제로 실행되지는 않는다.
fun main() {
val names = flow {
emit("Jo")
emit("May")
emit("Sue")
}
val uppercasedNames = names.map {
it.uppercase()
}
runBlocking {
uppercasedNames.collect { println("$it ") }
}
}
map 함수는 업스트림 플로우를 받아 원소를 변환한 후 다운스트림 플로우에 원소를 배출 할 수 있다.
fun main() {
val names = flow {
emit("Jo")
emit("May")
emit("Sue")
}
val upperAndLowercasedNames = names.transform {
emit(it.uppercase())
emit(it.lowercase())
}
runBlocking {
upperAndLowercasedNames.collect { println("$it ") }
}
}
하나 이상의 원소를 배출하고 싶을 때는 transform 함수 사용
flowOf 함수로 줄임표현 사용 가능
🔖 17.2.2 take나 관련 연산자는 플로우를 취소할 수 있다
takeWhile 같은 함수들을 플로우에서도 똑같이 쓸 수 있다.
이런 연산자를 사용하면 연산자가 지정한 조건이 더 이상 유효하지 않을 때 업스트림 플로우가 취소되며, 더 이상 원소가 배출되지 않는다.
fun main() {
val temps = getTemperatures()
temps
.take(5)
.collect {
log(it)
}
}
take 함수는 수집자와 관련된 코루틴 스코프를 취소하는 방식 외에 플로우 수집을 제어된 방식으로 취소하는 또 다른 방법
🔖 17.2.3 플로우의 각 단계 후킹: onStart, onEach, onCompletion, onEmpty
fun main() {
val temps = getTemperatures()
temps
.take(5)
.onCompletion { cause ->
if (cause != null) {
println("An error occurred! $cause")
} else {
println("Completed!")
}
}
.collect {
println(it)
}
}
원소 수집 후 종료되는 것을 확인하기 위해 onCompletion 연산자를 사용할 수 있다.
플로우가 정상 종료되거나, 취소되거나, 예외로 종료된 후에 호출되는 람다를 지정할 수 있게 해준다.
fun main() {
flow
.onEmpty {
println("Nothing - emitting default value!")
emit(0)
}
.onStart {
println("Starting")
}
.onEach {
println("On $it")
}
.onCompletion {
println("Done!")
}
.collect()
}
onCompletion은 플로우 생명주기의 특정 단계에서 작업을 수행할 수 있는 중간 연산자
onStart는 플로우의 수집이 시작될 때 첫 번째 배출이 일어나기 전에 실행
onEach는 업스트림 플로우에서 배출된 각 원소에 대해 작업을 수행한 후 다운스트림 플로우에 전달
onEmpty는 로직을 추가로 수행하거나 기본값을 제공
🔖 17.2.4 다운스트림 연산자와 수집자를 위한 원소 버퍼링: buffer 연산자
fun getAllUserIds(): Flow<Int> {
return flow {
repeat(3) {
delay(200.milliseconds)
log("Emitting!")
emit(it)
}
}
}
suspend fun getProfileFromNetwork(id: Int): String {
delay(2.seconds)
return "Profile[$id]"
}
fun main() {
val ids = getAllUserIds()
runBlocking {
ids
.map { getProfileFromNetwork(it) }
.collect { log("Got $it") }
}
}
느린 데이터베이스에 접근해 사용자 식별자의 플로우를 얻어오는 과정
값 생산자는 수집자가 이전 원소를 처리할 때까지 작업을 중단
0 [main @coroutine#1] Emitting!
2028 [main @coroutine#1] Got Profile[0]
2234 [main @coroutine#1] Emitting!
4240 [main @coroutine#1] Got Profile[1]
4447 [main @coroutine#1] Emitting!
6453 [main @coroutine#1] Got Profile[2]
ID의 배출과 프로필 요청이 뒤섞여 있다.
원소가 배출되면 다운스트림 플로우가 해당 원소를 처리할 때까지 생산자 코드는 계속되지 않는다.
fun main() {
val ids = getAllUserIds()
runBlocking {
ids
.buffer(3)
.map { getProfileFromNetwork(it) }
.collect { log("Got $it") }
}
}
0 [main @coroutine#2] Emitting!
213 [main @coroutine#2] Emitting!
414 [main @coroutine#2] Emitting!
2030 [main @coroutine#1] Got Profile[0]
4040 [main @coroutine#1] Got Profile[1]
6046 [main @coroutine#1] Got Profile[2]
buffer 연산자는 버퍼를 추가해서 다운스트림 플로우가 이미 배출된 원소를 처리하느라 바쁜 동안에도 업스트림 플로우가 원소를 배출할 수 있게 해준다.
버퍼를 추가했을 때 실행 시간이 줄어들었다.
onBufferOverflow 파라미터를 통해 버퍼 용량이 초과될 때 어떤 일이 발생할지 지정할 수 있다.
생산자 대기(SUSPEND)
오래된 값 버리기(DROP_OLDEST)
추가 중인 마지막 값을 버릴지(DROP_LATEST)
🔖 17.2.5 중간값을 버리는 연산자: conflate 연산자
fun main() {
runBlocking {
val temps = getTemperatures()
temps
.onEach {
log("Read $it from sensor")
}
.conflate()
.collect {
log("Collected $it")
delay(1.seconds)
}
}
}
142709 [main @coroutine#1] Collected 6
142864 [main @coroutine#2] Read 28 from sensor
143370 [main @coroutine#2] Read 24 from sensor
143711 [main @coroutine#1] Collected 24
143876 [main @coroutine#2] Read -2 from sensor
144382 [main @coroutine#2] Read 1 from sensor
144715 [main @coroutine#1] Collected 1
144888 [main @coroutine#2] Read 1 from sensor
145391 [main @coroutine#2] Read 29 from sensor
145719 [main @coroutine#1] Collected 29
145893 [main @coroutine#2] Read 9 from sensor
146396 [main @coroutine#2] Read 1 from sensor
146725 [main @coroutine#1] Collected 1
값 생산자가 방해받지 않고 작업을 계속할 수 있게 하는 또 다른 방법은 수집자가 바쁜동안 배출된 항목을 그냥 버리는 것
다운스트림의 수집자에서는 중간 원소가 버려진다.
conflate를 쓰면 업스트림 플로우의 실행을 다운스트림 연산자의 실행과 분리할 수 있다.
느린 수집자가 플로우에서 최신 원소만 처리하게 함으로써 성능을 유지할 수 있다.
🔖 17.2.6 일정 시간 동안 값을 필터링하는 연산자: debounce 연산자
val searchQuery = flow {
emit("K")
delay(100.milliseconds)
emit("Ko")
delay(200.milliseconds)
emit("Kotl")
delay(500.milliseconds)
emit("Kotlin")
}
사용자가 검색 질의 문자열을 타이핑하는 것을 시뮬레이션
일정시간동안 입력이 없으면 검색 결과 표시
fun main() = runBlocking {
searchQuery
.debounce(250.milliseconds)
.collect {
log("Searching for $it")
}
}
0 [main @coroutine#1] Searching for Kotl
247 [main @coroutine#1] Searching for Kotlin
debounce 연산자는 업스트림에서 원소가 배출되지 않은 상태로 정해진 타임아웃 시간이 지나야만 항목을 다운스트림 플로우로 배출
🔖 17.2.7 플로우가 실행되는 코루틴 콘텍스트를 바꾸기: flowOn 연산자
fun main() {
runBlocking {
flowOf(1)
.onEach { log("A") }
.flowOn(Dispatchers.Default)
.onEach { log("B") }
.flowOn(Dispatchers.IO)
.onEach { log("C") }
.collect()
}
}
0 [DefaultDispatcher-worker-3 @coroutine#3] A
24 [DefaultDispatcher-worker-1 @coroutine#2] B
26 [main @coroutine#1] C
flowOn 연산자는 처리 파이프라인의 일부를 다른 디스패처나 다른 코루틴 콘텍스트에서 힐행할 수 있다.
flowOn 연산자는 업스트림 플로우의 디스패처에만 영향을 미친다.
📖 17.3 커스텀 중간 연산자 만들기
일반적으로 중간 연산자는 동시에 수집자와 생산자의 역할을 한다.
fun Flow<Double>.averageOfLast(n: Int): Flow<Double> =
flow {
val numbers = mutableListOf<Double>()
collect {
if (numbers.size > n) {
numbers.removeFirst()
}
numbers.add(it)
emit(numbers.average())
}
}
fun main() = runBlocking {
flowOf(1.0, 2.0, 30.0, 121.0)
.averageOfLast(3)
.collect {
print("$it ")
}
}
내부적으로 발생한 숫자의 리스트를 유지하면서 만난 값들의 평균을 배출할 수 있다.
최적화는 밖에 드러나지 않고 코드의 행동방식에는 영향을 미치지 않는다.
📖 17.4 최종 연산자는 업스트림 플로우를 실행하고 값을 계산한다
최종 연산자는 단일 값이나 값의 컬렉션을 계산하거나, 플로우의 실행을 촉발시켜 지정된 연산과 부수 효과를 수행한다.
fun main() = runBlocking {
getTemperatures()
.onEach {
log(it)
}
.collect()
}
collect는 플로우의 각 원소에 대해 실행할 람다를 지정할 수 있는 유용한 지름길을 제공
최종 연산자는 업스트림 플로우의 실행을 담당하기 때문에 항상 일시 중단 함수다.
fun main() = runBlocking {
getTemperatures()
.first()
}
first나 firstOrNull 같은 최종연산자는 원소를 받은 다음에 업스트림 플로우를 취소할 수 있다.
🔖 17.4.1 프레임워크는 커스텀 연산자를 제공한다
@Composable fun TemperatureDisplay(temps: Flow<Int>) {
val temperature = temps.collectAsState(null)
Box {
temperature.value?.let {
Text("The current temperature is $it!")
}
}
}
텍스트를 상자 안에 넣는 간단한 코드
플로우는 코루틴 기반 툴킷에 강력한 추가 기능을 제공
코틀린 생태계의 어떤 프레임워크들은 플로우와 직접적인 통합을 제공하며, 커스텀 연산자와 변환 함수도 노출