BottleH Blog

Kotlin in Action - 17장 플로우 연산자

    Tags

  • Kotlin
Kotlin in Action - 17장 플로우 연산자 thumbnail

📖 17.1 플로우 연산자로 플로우 조작

  • 시퀀스와 마찬가지로 플로우도 중간 연산자와 최종 연산자를 구분
  • 중간 연산자는 코드를 실행하지 않고 변경된 플로우를 반환하며, 최종 연산자는 컬렉션, 개별 원소, 계산된 값을 반환하거나 아무 값도 반환하지 않으면서 플로우를 수집하고 실제 코드를 실행한다.

📖 17.2 중간 연산자는 업스트림 플로우에 적용되고 다운스트림 플로우를 반환한다

  • 중간 연산자는 플로우에 적용돼 새로운 플로우를 반환
  • 업스트림 플로우
    • 연산자가 적용되는 플로우
  • 다운스트림 플로우
    • 중간 연산자가 반환하는 플로우
    • 또 다른 연산자의 업스트림 플로우로 작용할 수 있음.
  • 시퀀스와 마찬가지로 중간 연산자가 호출되더라도 플로우 코드가 실제로 실행되지는 않는다.
    • 콜드 상태

🔖 17.2.1 업스트림 원소별로 임의의 값을 배출: transform 함수

fun main() { val names = flow { emit("Jo") emit("May") emit("Sue") } val uppercasedNames = names.map { it.uppercase() } runBlocking { uppercasedNames.collect { println("$it ") } } }
  • map 함수는 업스트림 플로우를 받아 원소를 변환한 후 다운스트림 플로우에 원소를 배출 할 수 있다.
fun main() { val names = flow { emit("Jo") emit("May") emit("Sue") } val upperAndLowercasedNames = names.transform { emit(it.uppercase()) emit(it.lowercase()) } runBlocking { upperAndLowercasedNames.collect { println("$it ") } } }
  • 하나 이상의 원소를 배출하고 싶을 때는 transform 함수 사용
  • flowOf 함수로 줄임표현 사용 가능

🔖 17.2.2 take나 관련 연산자는 플로우를 취소할 수 있다

  • takeWhile 같은 함수들을 플로우에서도 똑같이 쓸 수 있다.
  • 이런 연산자를 사용하면 연산자가 지정한 조건이 더 이상 유효하지 않을 때 업스트림 플로우가 취소되며, 더 이상 원소가 배출되지 않는다.
fun main() { val temps = getTemperatures() temps .take(5) .collect { log(it) } }
  • take 함수는 수집자와 관련된 코루틴 스코프를 취소하는 방식 외에 플로우 수집을 제어된 방식으로 취소하는 또 다른 방법

🔖 17.2.3 플로우의 각 단계 후킹: onStart, onEach, onCompletion, onEmpty

fun main() { val temps = getTemperatures() temps .take(5) .onCompletion { cause -> if (cause != null) { println("An error occurred! $cause") } else { println("Completed!") } } .collect { println(it) } }
  • 원소 수집 후 종료되는 것을 확인하기 위해 onCompletion 연산자를 사용할 수 있다.
    • 플로우가 정상 종료되거나, 취소되거나, 예외로 종료된 후에 호출되는 람다를 지정할 수 있게 해준다.
fun main() { flow .onEmpty { println("Nothing - emitting default value!") emit(0) } .onStart { println("Starting") } .onEach { println("On $it") } .onCompletion { println("Done!") } .collect() }
  • onCompletion은 플로우 생명주기의 특정 단계에서 작업을 수행할 수 있는 중간 연산자
  • onStart는 플로우의 수집이 시작될 때 첫 번째 배출이 일어나기 전에 실행
  • onEach는 업스트림 플로우에서 배출된 각 원소에 대해 작업을 수행한 후 다운스트림 플로우에 전달
  • onEmpty는 로직을 추가로 수행하거나 기본값을 제공

🔖 17.2.4 다운스트림 연산자와 수집자를 위한 원소 버퍼링: buffer 연산자

fun getAllUserIds(): Flow<Int> { return flow { repeat(3) { delay(200.milliseconds) log("Emitting!") emit(it) } } } suspend fun getProfileFromNetwork(id: Int): String { delay(2.seconds) return "Profile[$id]" } fun main() { val ids = getAllUserIds() runBlocking { ids .map { getProfileFromNetwork(it) } .collect { log("Got $it") } } }
  • 느린 데이터베이스에 접근해 사용자 식별자의 플로우를 얻어오는 과정
  • 값 생산자는 수집자가 이전 원소를 처리할 때까지 작업을 중단
0 [main @coroutine#1] Emitting! 2028 [main @coroutine#1] Got Profile[0] 2234 [main @coroutine#1] Emitting! 4240 [main @coroutine#1] Got Profile[1] 4447 [main @coroutine#1] Emitting! 6453 [main @coroutine#1] Got Profile[2]
  • ID의 배출과 프로필 요청이 뒤섞여 있다.
  • 원소가 배출되면 다운스트림 플로우가 해당 원소를 처리할 때까지 생산자 코드는 계속되지 않는다.
fun main() { val ids = getAllUserIds() runBlocking { ids .buffer(3) .map { getProfileFromNetwork(it) } .collect { log("Got $it") } } } 0 [main @coroutine#2] Emitting! 213 [main @coroutine#2] Emitting! 414 [main @coroutine#2] Emitting! 2030 [main @coroutine#1] Got Profile[0] 4040 [main @coroutine#1] Got Profile[1] 6046 [main @coroutine#1] Got Profile[2]
  • buffer 연산자는 버퍼를 추가해서 다운스트림 플로우가 이미 배출된 원소를 처리하느라 바쁜 동안에도 업스트림 플로우가 원소를 배출할 수 있게 해준다.
  • 버퍼를 추가했을 때 실행 시간이 줄어들었다.
  • onBufferOverflow 파라미터를 통해 버퍼 용량이 초과될 때 어떤 일이 발생할지 지정할 수 있다.
    • 생산자 대기(SUSPEND)
    • 오래된 값 버리기(DROP_OLDEST)
    • 추가 중인 마지막 값을 버릴지(DROP_LATEST)

🔖 17.2.5 중간값을 버리는 연산자: conflate 연산자

fun main() { runBlocking { val temps = getTemperatures() temps .onEach { log("Read $it from sensor") } .conflate() .collect { log("Collected $it") delay(1.seconds) } } } 142709 [main @coroutine#1] Collected 6 142864 [main @coroutine#2] Read 28 from sensor 143370 [main @coroutine#2] Read 24 from sensor 143711 [main @coroutine#1] Collected 24 143876 [main @coroutine#2] Read -2 from sensor 144382 [main @coroutine#2] Read 1 from sensor 144715 [main @coroutine#1] Collected 1 144888 [main @coroutine#2] Read 1 from sensor 145391 [main @coroutine#2] Read 29 from sensor 145719 [main @coroutine#1] Collected 29 145893 [main @coroutine#2] Read 9 from sensor 146396 [main @coroutine#2] Read 1 from sensor 146725 [main @coroutine#1] Collected 1
  • 값 생산자가 방해받지 않고 작업을 계속할 수 있게 하는 또 다른 방법은 수집자가 바쁜동안 배출된 항목을 그냥 버리는 것
  • 다운스트림의 수집자에서는 중간 원소가 버려진다.
  • conflate를 쓰면 업스트림 플로우의 실행을 다운스트림 연산자의 실행과 분리할 수 있다.
  • 느린 수집자가 플로우에서 최신 원소만 처리하게 함으로써 성능을 유지할 수 있다.

🔖 17.2.6 일정 시간 동안 값을 필터링하는 연산자: debounce 연산자

val searchQuery = flow { emit("K") delay(100.milliseconds) emit("Ko") delay(200.milliseconds) emit("Kotl") delay(500.milliseconds) emit("Kotlin") }
  • 사용자가 검색 질의 문자열을 타이핑하는 것을 시뮬레이션
  • 일정시간동안 입력이 없으면 검색 결과 표시
fun main() = runBlocking { searchQuery .debounce(250.milliseconds) .collect { log("Searching for $it") } } 0 [main @coroutine#1] Searching for Kotl 247 [main @coroutine#1] Searching for Kotlin
  • debounce 연산자는 업스트림에서 원소가 배출되지 않은 상태로 정해진 타임아웃 시간이 지나야만 항목을 다운스트림 플로우로 배출

🔖 17.2.7 플로우가 실행되는 코루틴 콘텍스트를 바꾸기: flowOn 연산자

fun main() { runBlocking { flowOf(1) .onEach { log("A") } .flowOn(Dispatchers.Default) .onEach { log("B") } .flowOn(Dispatchers.IO) .onEach { log("C") } .collect() } } 0 [DefaultDispatcher-worker-3 @coroutine#3] A 24 [DefaultDispatcher-worker-1 @coroutine#2] B 26 [main @coroutine#1] C
  • flowOn 연산자는 처리 파이프라인의 일부를 다른 디스패처나 다른 코루틴 콘텍스트에서 힐행할 수 있다.
    • 코루틴 콘텍스트를 조정
  • flowOn 연산자는 업스트림 플로우의 디스패처에만 영향을 미친다.

📖 17.3 커스텀 중간 연산자 만들기

  • 일반적으로 중간 연산자는 동시에 수집자와 생산자의 역할을 한다.
fun Flow<Double>.averageOfLast(n: Int): Flow<Double> = flow { val numbers = mutableListOf<Double>() collect { if (numbers.size > n) { numbers.removeFirst() } numbers.add(it) emit(numbers.average()) } } fun main() = runBlocking { flowOf(1.0, 2.0, 30.0, 121.0) .averageOfLast(3) .collect { print("$it ") } }
  • 내부적으로 발생한 숫자의 리스트를 유지하면서 만난 값들의 평균을 배출할 수 있다.
  • 최적화는 밖에 드러나지 않고 코드의 행동방식에는 영향을 미치지 않는다.

📖 17.4 최종 연산자는 업스트림 플로우를 실행하고 값을 계산한다

  • 최종 연산자는 단일 값이나 값의 컬렉션을 계산하거나, 플로우의 실행을 촉발시켜 지정된 연산과 부수 효과를 수행한다.
fun main() = runBlocking { getTemperatures() .onEach { log(it) } .collect() }
  • collect는 플로우의 각 원소에 대해 실행할 람다를 지정할 수 있는 유용한 지름길을 제공
  • 최종 연산자는 업스트림 플로우의 실행을 담당하기 때문에 항상 일시 중단 함수다.
fun main() = runBlocking { getTemperatures() .first() }
  • firstfirstOrNull 같은 최종연산자는 원소를 받은 다음에 업스트림 플로우를 취소할 수 있다.

🔖 17.4.1 프레임워크는 커스텀 연산자를 제공한다

@Composable fun TemperatureDisplay(temps: Flow<Int>) { val temperature = temps.collectAsState(null) Box { temperature.value?.let { Text("The current temperature is $it!") } } }
  • 텍스트를 상자 안에 넣는 간단한 코드
  • 플로우는 코루틴 기반 툴킷에 강력한 추가 기능을 제공
  • 코틀린 생태계의 어떤 프레임워크들은 플로우와 직접적인 통합을 제공하며, 커스텀 연산자와 변환 함수도 노출
Written by@BottleH
Back-End Developer

GitHub