개발 지식/Kotlin

[Coroutine] Composing Suspending Functions

에르주 2022. 9. 2. 01:17
반응형

이번에는 suspending 함수의 구성에 대해 정리하고자 한다.

 

1) 비동기함수의 순차적 실행

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }

    println("Completed in $time ms")
}
suspend fun doSomethingUsefulOne() : Int {
    println("doSomethingUsefulOne")
    delay(2000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    return 13
}


suspend fun doSomethingUsefulTwo() : Int {
    println("doSomethingUsefulTwo")
    delay(2000L)
    return 29
}

fun <T> println(msg: T) {
    kotlin.io.println("$msg [${Thread.currentThread().name}]")
}

 

  • doSomethingUsefulOne
  • doSomethingUsefulTwo

두개의 suspending 함수가 main 함수의 Blocking 안에서 순차적으로 작성시 

doSomethingUsefulOne -> doSomethingUsefulTwo

로 수행이 된다.

 

비동기 순차적 실행

 

2) 순차적 실행이 아닌 동시에 실행하는 법

 

위의 예시에는 doSomethingUsefulOne -> doSomethingUsefulTwo 순서로 실행되어 총 4초가 넘게 걸렸다면 이번에는 비동기 형태로 동시에 실행시키는 예시를 작성해보고자 한다.

Note that concurrency with coroutines is always explicit
: 코루틴을 동시적으로 실행하고 싶으면 항상 명시적이어야 한다.

 

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = async{doSomethingUsefulOne()}
        val two = async {doSomethingUsefulTwo()}
        // 첫번째 결과와 두번째 결과를 기다려서 수행
        println("The answer is ${one.await() + two.await()}")
    }

    println("Completed in $time ms")
}
suspend fun doSomethingUsefulOne() : Int {
    delay(1000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    return 13
}


suspend fun doSomethingUsefulTwo() : Int {
    delay(1000L)
    return 29
}

fun <T> println(msg: T) {
    kotlin.io.println("$msg [${Thread.currentThread().name}]")
}

 

async 동시 실행

async 적용하여 동시에 실행되어 1초가 걸리는 것을 확인 할 수 있다.

 

 

3) async를 했지만 코루틴의 await 함수를 적용 한다면? -> 순차적으로 실행된다.

 

// 2초 기다리게 하기 위해서는 await을 따로 준다.
fun main() = runBlocking {
    val time = measureTimeMillis {

        // 첫번째 콜하고 wait하고 두번째 호출
        val one = async{doSomethingUsefulOne()}
        val oneRes = one.await() // 비동기를 순차적으로 수행
        val two = async {doSomethingUsefulTwo()}
        // 첫번째 결과와 두번째 결과를 기다려서 수행
        println("The answer is ${oneRes + two.await()}")
    }

    println("Completed in $time ms")
}
suspend fun doSomethingUsefulOne() : Int {
    delay(1000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    return 13
}


suspend fun doSomethingUsefulTwo() : Int {
    delay(1000L)
    return 29
}
 val oneRes = one.await() // 비동기를 순차적으로 수행

await 함수를 통해 비동기를 순차적으로 수행시키게 한다.

 

async 순차적 실행

 

4-1) async 바로 실행시키지 않는 법 -> LAZY 속성 부여

async 함수에 LAZY 속성을 부여하면 바로 실행되는 것이 아닌 그 suspending function이 호출 될 때 비로소 해당 함수가 실행된다.

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        one.start()
        two.start()
        println("The answer is ${one.await() + two.await()}")
    }

    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne() : Int {
    delay(1000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    return 13
}


suspend fun doSomethingUsefulTwo() : Int {
    delay(1000L)
    return 29
}

fun <T> println(msg: T) {
    kotlin.io.println("$msg [${Thread.currentThread().name}]")
}

 

CoroutineStart.LAZY

 

4-2) 순차적으로 실행될수도..?

 

one.start()

two.start()

 

이 부분이 없을 경우 println 안의 await시 호출되기 때문에 각각 함수에서 발생하는 delay값의 합인 2초가 걸리게 된다.

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // start가 없을 경우에는 2초가 걸린다
        // await시 호출되기 때문
//        one.start()
//        two.start()
        println("The answer is ${one.await() + two.await()}")
    }

    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne() : Int {
    delay(1000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    return 13
}


suspend fun doSomethingUsefulTwo() : Int {
    delay(1000L)
    return 29
}

fun <T> println(msg: T) {
    kotlin.io.println("$msg [${Thread.currentThread().name}]")
}

 

2초 걸림

 

5) Async-style functions

4번 예시의 코루틴 빌더 (3,4 line) 부분을 하나의 함수로 만들어보자.

val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }

 

package coroutine.composingsuspendfun

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

/*
코루틴 빌더를 하나의 함수로 만드는 것은 suspend 함수가 아니다.
어디서나 호출될 수 있으며(코루틴 아닌 곳에서) 코틀린 코루틴에서 권장하지 않는 스타일이다
이 문제는 structured concurrency를 통해 해결 할 수 있다.
Async Style function
 */

fun main()  {

    try {
        val time = measureTimeMillis {
            val one = somethingUsefulOneAsync()
            val two = somethingUsefulTwoAsync()

            // exception 발생해도 코루틴 종료 안된다!
            // 왜냐하면 위의 코루틴은 글로벌 스코프에서 실행 되었기 때문에
            // main안에서 exception발생해도 영향 없음. -> 즉 서비스 죽지 않음음
            println("my exception")
            throw Exception("my exceptions")
            runBlocking {
                println("The answer is ${one.await() + two.await()}")
            }
        }

        println("Completed in $time ms")

    } catch (e: Exception) {

    }

    runBlocking {
        delay(10000L)
    }
}

fun somethingUsefulOneAsync() = GlobalScope.async {
    println("start, somethingUsefulOneAsync")
    val res = doSomethingUsefulOne()
    println("end, somethingUsefulOneAsync")
    println("oneAsync Exception")
    throw Exception("my exceptions")
    res
}


fun somethingUsefulTwoAsync() = GlobalScope.async {
    println("start, somethingUsefulTwoAsync")
    val res = doSomethingUsefulTwo()
    println("end, somethingUsefulTwoAsync")
    println("twoAsync Exception")
    res
}

suspend fun doSomethingUsefulOne() : Int {
    delay(3000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    return 13
}


suspend fun doSomethingUsefulTwo() : Int {
    delay(3000L)
    return 29
}

 

코루틴 빌더 부분을 아래와 같은 두개의 함수로 각각 생성하였다.

  • somethingUsefulOneAsync()
  • somethingUsefulTwoAsync()

두개의 함수가  GlobaslScope를 활용한 2개의 sync 코루틴 빌더를 생성하였다. 

하지만 두개의 함수는 suspending 함수가 아니며 어디든 사용할 수 있다. (main함수가 아니더라도 호출 가능!)

그 뜻은 exception이 발생하여도 종료되는 것이 아닌 백그라운드에서 계속 실행된다는 것이다. 이는 개발자가 에러를 찾을 수 없으며 이 함수는 다른 어딘 가에서 계속 로직이 수행될 수 있다. 

 

 

2개의 함수 결과값의 합을 구하는 것이 main 함수의 역할인데 하나의 async 함수만 exception 발생해도 정상적인 결과값을 구할 수 없다.

 

즉 하나의 함수가 종료되면 TwoAsync 또한 종료 되어야 하지만 다른 스코프 및 같은 runblocking안에 있지 않아 백그라운드에서 계속 실행되고 있는 것이다.

 

 

이것은 코루틴에서 권장하지 않는 스타일이며 이를 structured concurrency를 활용하여 해결할 수 있다.

 

delay 10000ms 동안 끝나지 않음

6) Structured concurrency with Async

 

그렇다면 위의 Async 함수를 structured concurrency로 해결해보자.

각각의 함수를 GlobalScope가 아닌 coroutineScope로 감쌌으며 즉 일반함수가 아니며 suspending 함수이다. (코루틴 안에서만 쓸수 있다.)

 

즉 Exception 발생 및 전파가 되며 스코프 안의 모든 코루틴이 종료된다.

 

fun main() = runBlocking {
    try {
        val time = measureTimeMillis {
            println("The answer is ${concurrentSum()}")
        }
        println("Completed in $time ms")
    } catch (e: Exception) {

    }

    runBlocking {
        delay(10000L)
    }
}

// 코루틴 스코프안에서 Exception이 터지기 때문 취소가 된다.
// 실행은 되었으나 끝까지 실행되지 않고 종료 된다 (exception이 발생하였기 때문에)
// 글로벌 스코프 형태로 함수를 감싸지말고 Structured concurrency 형태로 suspending 함수를 조합하고 코루틴을 구성하는게 맞다.
suspend fun concurrentSum(): Int = coroutineScope {

    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }

    delay(10)
    kotlin.io.println("Exception")
    throw Exception()
    one.await() + two.await()
}


suspend fun doSomethingUsefulOne(): Int {
    kotlin.io.println("start, doSomethingUsefulOne")
    delay(3000L) // 뭐 어떤 DB 호출이나 외부 통신에 걸리는 로직이 들어갈 것이다.
    kotlin.io.println("end, doSomethingUsefulOne")
    return 13
}


suspend fun doSomethingUsefulTwo(): Int {
    kotlin.io.println("start, doSomethingUsefulTwo")
    delay(3000L)
    kotlin.io.println("end, doSomethingUsefulTwo")
    return 29
}

 

 

end가 없다..,.

 

end. doSomething.. 부분이 찍히지 않는다는 것을 확인 할 수 있다. 즉 코루틴이 종료 되었다.

 

글로벌 스코프에서 런치하는 형태로 함수르 감싸서 만들지 않고 structured concurrency 형태로 suspending 함수를 조합해서 만드는 것을 권장한다는 것이다.

 

 

7) Cancellation propagated coroutines hierarchy

 

캔슬이 코루틴 내에 전파가 된다. 즉 Exception 및 timeout로 인하여 Cancel될 때 전파되며 전체 코루틴이 종료가 된다.

Structured concurrency에서는 Cancel이 전파가 된다.

suspend fun failedConcurrentSum(): Int = coroutineScope {

    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE)
            42
        } finally {
            println("First child was cancelled") // 취소 전파되어서 얘도 취소
        }

    }
    val two = async<Int> {
        println("Second child throws an Exception")
        throw ArithmeticException() // 두번째에서 Exception 발생
    }

    one.await() + two.await()

}


/*
exception이 발생해서 코루틴이 종료가 되면 캔슬된 것들이 hierarchy로 전파가 된다. -> 전체 코루틴이 종료가 된다.
Structured concurrency에서는 취소가 전파 된다.
 */
fun main() = runBlocking<Unit> {

    try {
        failedConcurrentSum()
    } catch (e: ArithmeticException) {
        println("Computation failed with ArithmeticException") // 얘도 취소가 전파되어서 종료
    }
}

fun <T> println(msg: T) {
    kotlin.io.println("$msg [${Thread.currentThread().name}]")
}



two 변수의 async 코루틴에서 exception 발생  println("Second child throws an Exception")

-> one에서 exception 발생하여 println("First child was cancelled") 실행
-> main 함수에서 exception을 캐치하여  println("Computation failed with ArithmeticException") 실행

취소 전파

실행중에 Exception이 발생되면 다 종료하고 리소스도 해제 하며 정상적인 상태로 종료된다.

 

결론

무거운 연산들을 코루틴 내에서 순차적으로 작성하게 되면 비동기도 순차적으로 실행이 된다.

순차적으로 시행되는 것을 async로 실행 (동시 실행)
option 파라미터로 lazy 주면 콜이 왔을 때 실행이 될 수 있게 한다.


suspend function들을 조립하고 프로그래밍할 때 Structured concurrency 형태로 구현한다.
스코프 안에서 계속 코루틴을 실행시킨다면 그런 코루틴들이 만약 잘못되었을 때 캔슬이 전파되면서 모든 코루틴이 종료된다.
별도로 GlobalScope로 하지 않고 Structured concurrency로 진행한다.
 

끝.

반응형