Kotlin

[Kotlin] 코루틴 (Coroutine) 기초

꾸준함. 2024. 10. 2. 16:42

1. suspend 함수

  • 코틀린 코루틴에서 비동기적으로 실행할 수 있는 특수한 함수로
  • 코루틴 내에서 중단 가능(suspension) 한 작업을 처리할 수 있도록 설계됨
    • 실행 일시 중단 가능
    • 나중에 다시 실행 재개할 수 있는 특징 지님

 

  • 코루틴 내에서 네트워크 요청이나 파일 읽기/쓰기와 같이 특정 작업을 대기해야 할 때 CPU 리소스를 낭비하지 않고, 효율적으로 다른 작업 처리 가능

 

1.1 suspend 함수 특징

  • 코루틴 내에서만 실행 가능: suspend 함수는 오직 코루틴 내에서만 호출될 수 있으며 코루틴 범위 밖에서는 직접 호출 불가
  • 중단 및 재개 가능: 앞서 언급했다시피 suspend 함수는 중간에 실행을 멈추고, 나중에 다시 이어서 실행 가능
  • 비동기 작업 처리: suspend 함수는 일반 함수와 달리 비동기 작업을 처리할 수 있으며 이는 시간이 오래 걸리는 I/O 작업에 매우 적합

 

1.2 suspend 함수 vs 일반 함수

 

특징 suspend 함수 일반 함수
실행 방식 비동기
중단 및 재개 가능
동기
호출이 완료될 때까지 대기
실행 스코프 코루틴 내에서만 호출 가능 코루틴 외부에서 호출 가능
CPU 점유 작업이 완료될 때까지 중단하고 다른 작업 실행 가능 작업 중 CPU를 계속 점유
blocking 여부 호출 쓰레드를 블로킹하지 않음
non-blocking
호출 쓰레드를 블로킹
blocking

 

1.3 코틀린 코드를 자바 코드로 변환한 뒤 차이점 확인

  • Intellij에서 코틀린 코드를 bytecode로 변환한 뒤 Decompile을 통해 bytecode를 자바 코드로 변환 가능

 

1.3.1 코틀린 코드

  • 코틀린 코루틴과 Reactor의 Mono를 함께 사용하는 간단한 예제로 코틀린의 suspend 함수와 Reactor의 비동기 스트림 API를 결합하여 비동기적으로 데이터를 처리하는 방식

 

class SuspendExample {
suspend fun greet(who: String): String {
delay(100)
return getResult(who).awaitSingle()
}
private fun getResult(who: String): Mono<String> {
return Mono.just("Hello, $who!")
}
}
view raw .kt hosted with ❤ by GitHub

 

1.3.2 변환된 자바 코드

  • greet 메서드에 Continuation이라는 인자가 추가된 것을 확인 가능
  • ContinuationImpl에는 label과 result가 포함되었고
  • switch 문을 통해 label에 따라서 코드가 수행되는 것을 확인 가능

 

@Metadata(
mv = {1, 8, 0},
k = 1,
xi = 48,
d1 = {"\u0000\u0018\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\b\u0004\u0018\u00002\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\u0016\u0010\u0003\u001a\b\u0012\u0004\u0012\u00020\u00050\u00042\u0006\u0010\u0006\u001a\u00020\u0005H\u0002J\u0016\u0010\u0007\u001a\u00020\u00052\u0006\u0010\u0006\u001a\u00020\u0005H\u0086@¢\u0006\u0002\u0010\b¨\u0006\t"},
d2 = {"Lcom/grizz/wooman/coroutine/basic/SuspendExample;", "", "()V", "getResult", "Lreactor/core/publisher/Mono;", "", "who", "greet", "(Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "kotlin-coroutine"}
)
public final class SuspendExample {
@Nullable
public final Object greet(@NotNull String who, @NotNull Continuation $completion) {
Object $continuation;
label27: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label27;
}
}
$continuation = new ContinuationImpl($completion) {
Object L$0;
Object L$1;
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return SuspendExample.this.greet((String)null, (Continuation)this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch (((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = this;
((<undefinedtype>)$continuation).L$1 = who;
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(100L, (Continuation)$continuation) == var5) {
return var5;
}
break;
case 1:
who = (String)((<undefinedtype>)$continuation).L$1;
this = (SuspendExample)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
var10000 = $result;
return var10000;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
Mono var6 = this.getResult(who);
((<undefinedtype>)$continuation).L$0 = null;
((<undefinedtype>)$continuation).L$1 = null;
((<undefinedtype>)$continuation).label = 2;
var10000 = MonoKt.awaitSingle(var6, (Continuation)$continuation);
if (var10000 == var5) {
return var5;
} else {
return var10000;
}
}
private final Mono getResult(String who) {
Mono var2 = Mono.just("Hello, " + who + '!');
Intrinsics.checkNotNullExpressionValue(var2, "just(\"Hello, $who!\")");
return var2;
}
}
view raw .java hosted with ❤ by GitHub

 

 

1.3.3 코틀린 코드에서 자바 코드로 변환 결과

  • Continuation 인자 추가 및 Continuation 구현체 생성
  • switch문 추가
  • DelayKt.delay를 호출하며 Continuation 전달하고 종료
  • System.out.println 수행
  • Unit 반환

 

* 코틀린 컴파일러가 suspend 함수를 변환하는 과정을 파악하기 위해서는 FSM(Finite State Machine)Continuation Passing Style에 대한 이해 필요

 

2. Finite State Machine

  • 유한한 개수의 상태를 갖는 state machine으로 한 번에 오직 하나의 상태만을 가질 수 있음
  • 이벤트를 통해 하나의 상태에서 다른 상태로 전환 가능
  • 프로그래밍에서는 복잡한 제어 흐름을 관리하기 위해 FSM을 사용

 

https://www.lloydatkinson.net/posts/2022/modelling-workflows-with-finite-state-machines-in-dotnet/

 

2.1 FSM 구현 방법

  • label을 이용해서 when 문을 수행
  • 각각의 case에 작업을 수행하고 label을 변경
  • 재귀함수를 호출하며 다음 label을 전달하여 상태 변경
    • label을 직접 인자로 넘기는 대신 Shared라는 data class를 통해 전달
    • 추가로 Shared 내 result 인자에 계산된 결과를 저장하여 재귀함수에 전달

 

// 상태 및 결과를 공유하는 data class
data class Shared(
var label: Int = 0,
var result: Int = 0,
)
fun fsm(shared: Shared) {
when (shared.label) {
0 -> {
println("State 0: 작업 시작, 현재 result: ${shared.result}")
shared.result += 10
shared.label = 1
fsm(shared)
}
1 -> {
println("State 1: 작업 중, 현재 result: ${shared.result}")
shared.result *= 2
shared.label = 2
fsm(shared)
}
2 -> {
println("State 2: 추가 작업 수행, 현재 result: ${shared.result}")
shared.result -= 5
shared.label = 3
fsm(shared)
}
3 -> {
println("State 3: 작업 완료, 최종 result: ${shared.result}")
// 종료 상태이므로 더 이상 재귀 호출하지 않음
}
else -> {
println("알 수 없는 상태: ${shared.label}")
}
}
}
fun main() {
fsm(shared)
}
view raw .kt hosted with ❤ by GitHub

 

3. Continuation Passing Style

  • 함수의 실행이 끝난 후 무엇을 할지에 대한 정보를 Continuation이라는 추가 인자로 전달하는 스타일
  • 함수가 직접 결과를 반환하는 대신, 결과를 다음으로 처리할 함수에 넘겨주는 방식
    • 값을 반환하는 대신 Continuation을 실행

 

3.1 Continuation 인터페이스

  • 코틀린 코루틴에서 Continuation 인터페이스 제공
  • resumeWith를 구현하여 외부에서 해당 Continuation을 실행할 수 있는 엔드 포인트 제공
    • 이를 위해 CoroutineContext 포함

 

 

3.2 Continuation vs Callback

 

특징 Continuation Callback
사용 방식 suspend 함수에서 사용
코루틴의 중단과 재개에 쓰임
함수 호출 시 콜백 함수를 전달하여 작업 완료 시 호출
context 유지 함수 상태를 유지하고 중단된 지점에서 재개 가능 함수 상태 유지 X
완료되면 콜백 호출
가독성 비동기 코드를 동기식 코드처럼 간결하게 작성 가능 콜백 함수가 중첩되면 복잡도 올라감
비동기 코드 흐름 중단점에서 재개 작업 완료 시점에서 콜백 호출
예외 처리 try-catch로 처리 가능
코루틴에서 자체 지원
콜백 내부에서 에러 처리를 명시적으로 처리

 

3.3 FSM에 CPS를 적용한 예제

 

import kotlin.coroutines.*
data class State(val label: Int, val result: Int)
fun fsmCps(state: State, continuation: Continuation<State>) {
when (state.label) {
0 -> {
println("State 0: 작업 시작, 현재 result: ${state.result}")
continuation.resumeWith(Result.success(State(1, state.result + 10)))
}
1 -> {
println("State 1: 작업 중, 현재 result: ${state.result}")
continuation.resumeWith(Result.success(State(2, state.result * 2)))
}
2 -> {
println("State 2: 추가 작업 수행, 현재 result: ${state.result}")
continuation.resumeWith(Result.success(State(3, state.result - 5)))
}
3 -> {
println("State 3: 작업 완료, 최종 result: ${state.result}")
}
else -> {
println("알 수 없는 상태: ${state.label}")
}
}
}
suspend fun fsmCpsSuspend(state: State): State = suspendCoroutine { continuation ->
fsmCps(state, continuation)
}
suspend fun runFSM() {
var state = State(0, 0)
while (state.label != 3) {
state = fsmCpsSuspend(state) // suspend 함수로 상태 전환
}
}
fun main() = runBlocking {
runFSM()
}
view raw .kt hosted with ❤ by GitHub

 

4. 동기 코드를 코루틴으로 전환하는 과정 예시

 

4.1 동기 코드

  • 고객, 상품, 스토어, 그리고 주소 정보 조회 후 주문을 넣는 mock 코드

 

private val log = kLogger()
class OrderBlockingExample(
private val customerService: CustomerBlockingService,
private val productService: ProductBlockingService,
private val storeService: StoreBlockingService,
private val deliveryAddressService: DeliveryAddressBlockingService,
private val orderService: OrderBlockingService,
) {
fun execute(userId: Long, productIds: List<Long>): Order {
// 1. 고객 정보 조회
val customer = customerService.findCustomerById(userId)
// 2. 상품 정보 조회
val products = productService
.findAllProductsByIds(productIds)
// 3. 스토어 조회
val storeIds = products.map { it.storeId }
val stores = storeService.findStoresByIds(storeIds)
// 4. 주소 조회
val daIds = customer.deliveryAddressIds
val deliveryAddress = deliveryAddressService
.findDeliveryAddresses(daIds)
.first()
// 5. 주문 생성
val order = orderService.createOrder(
customer, products, deliveryAddress, stores
)
return order
}
}
fun main(args: Array<String>) {
val customerService = CustomerBlockingService()
val productService = ProductBlockingService()
val storeService = StoreBlockingService()
val deliveryAddressService = DeliveryAddressBlockingService()
val orderService = OrderBlockingService()
val example = OrderBlockingExample(
customerService = customerService,
productService = productService,
storeService = storeService,
deliveryAddressService = deliveryAddressService,
orderService = orderService,
)
val order = example.execute(1, listOf(1, 2, 3))
log.info("order: {}", order)
}
view raw .kt hosted with ❤ by GitHub

 

4.2 비동기 코드로 전환

  • CompletableFuture, Reactive Streams, 그리고 Flow 등을 이용하여 비동기 코드로 전환
  • 하지만 비동기 코드로 전환함에 따라 가독성이 떨어짐
    • 아도겐 코드와 같은 형태가 되어 코드 복잡도 올라감
    • 동기 코드에 비해 가독성이 떨어짐

https://x.com/shiftpsh/status/1156243329123622914

 

4.3 비동기 코드에 FSM 적용

  • data 클래스인 Shared 객체 정의
    • main에서 최초로 실행하는 경우 null이 전달되기 때문에 Shared 객체 생성
    • shared의 label 값에 따라 다른 case문을 실행하며 case 문 내에서는 label을 변경하고 이전에 실행됐던 결과를 shared 내부의 중간값에 저장
    • 각 case 문마다 cont.result에 값을 저장 후 재귀 호출


private val log = kLogger()
class OrderAsyncExampleUpgrade1(
private val customerService: CustomerFutureService,
private val productService: ProductRxjava3Service,
private val storeService: StoreMutinyService,
private val deliveryAddressService: DeliveryAddressPublisherService,
private val orderService: OrderReactorService,
) {
class Shared {
var result: Any? = null
var label = 0
// variables
lateinit var customer: Customer
lateinit var products: List<Product>
lateinit var stores: List<Store>
lateinit var deliveryAddress: DeliveryAddress
}
fun execute(userId: Long,
productIds: List<Long>,
shared: Shared? = null) {
val cont = shared ?: Shared()
when (cont.label) {
0 -> {
// 1. 고객 정보 조회
cont.label = 1
customerService.findCustomerFuture(userId)
.thenAccept { customer ->
cont.result = customer
execute(userId, productIds, cont)
}
}
1 -> {
// 2. 상품 정보 조회
cont.customer = cont.result as Customer
cont.label = 2
productService.findAllProductsFlowable(productIds)
.toList()
.subscribe { products ->
cont.result = products
execute(userId, productIds, cont)
}
}
2 -> {
// 3. 스토어 조회
cont.products = cont.result as List<Product>
cont.label = 3
val products = cont.products
val storeIds = products.map { it.storeId }
storeService.findStoresMutli(storeIds)
.collect().asList()
.subscribe()
.with { stores ->
cont.result = stores
execute(userId, productIds, cont)
}
}
3 -> {
// 4. 주소 조회
cont.stores = cont.result as List<Store>
cont.label = 4
val customer = cont.customer
val daIds = customer.deliveryAddressIds
deliveryAddressService.findDeliveryAddressesPublisher(daIds)
.subscribe(FirstFinder { deliveryAddress ->
cont.result = deliveryAddress
execute(userId, productIds, cont)
})
}
4 -> {
// 5. 주문 생성
cont.deliveryAddress = cont.result as DeliveryAddress
cont.label = 5
val customer = cont.customer
val products = cont.products
val deliveryAddress = cont.deliveryAddress
val stores = cont.stores
orderService.createOrderMono(
customer, products, deliveryAddress, stores,
).subscribe { order ->
cont.result = order
execute(userId, productIds, cont)
}
}
5 -> {
val order = cont.result as Order
log.info("order: $order")
}
}
}
}
fun main(args: Array<String>) {
val customerService = CustomerFutureService()
val productService = ProductRxjava3Service()
val storeService = StoreMutinyService()
val deliveryAddressService = DeliveryAddressPublisherService()
val orderService = OrderReactorService()
val example = OrderAsyncExampleUpgrade1(
customerService = customerService,
productService = productService,
storeService = storeService,
deliveryAddressService = deliveryAddressService,
orderService = orderService,
)
example.execute(1, listOf(1, 2, 3));
}
view raw .kt hosted with ❤ by GitHub

 

4.3.1 FSM만 적용했을 때의 문제점

  • cont.result에 값을 넣고 재귀 함수를 호출하는 코드가 반복됨
    • 재귀 함수를 직접 호출하기 때문에 코드를 외부로 분리하기 힘듦

 

  • main 함수에서는 Shared 객체를 생성하지 않기 때문에 결과를 출력하는 부분을 하드 코딩
    • 개선하기 위해서는 Continuation을 전달하는 형태로 변경해야 함

 

4.4 비동기 코드에 CPS 적용

  • Shared 객체를 CustomContinuation 객체로 변경
    • CustomContinuation은 main으로부터 Continuation을 받음
    • 중간값들 뿐만 아니라 매개변수들과 인스턴스까지 Continuation에 저장
    • 가장 마지막 state에서 complete를 호출하여 Completion 호출


private val log = kLogger()
class OrderAsyncExampleUpgrade2(
private val customerService: CustomerFutureService,
private val productService: ProductRxjava3Service,
private val storeService: StoreMutinyService,
private val deliveryAddressService: DeliveryAddressPublisherService,
private val orderService: OrderReactorService,
) {
private class CustomContinuation(
private val completion: Continuation<Any>,
) : Continuation<Any> {
var result: Any? = null
var label = 0
// arguments and instance
lateinit var that: OrderAsyncExampleUpgrade2
var userId by Delegates.notNull<Long>()
lateinit var productIds: List<Long>
// variables
lateinit var customer: Customer
lateinit var products: List<Product>
lateinit var stores: List<Store>
lateinit var deliveryAddress: DeliveryAddress
override val context: CoroutineContext
get() = completion.context
override fun resumeWith(result: Result<Any>) {
this.result = result.getOrThrow()
that.execute(0, emptyList(), this)
}
fun complete(value: Any) {
completion.resume(value)
}
}
fun execute(userId: Long,
productIds: List<Long>,
continuation: Continuation<Any>) {
val cont = if (continuation is CustomContinuation) {
continuation
} else {
CustomContinuation(continuation).apply {
that = this@OrderAsyncExampleUpgrade2
this.userId = userId
this.productIds = productIds
}
}
when (cont.label) {
0 -> {
// 1. 고객 정보 조회
cont.label = 1
customerService.findCustomerFuture(cont.userId)
.thenAccept(cont::resume)
}
1 -> {
// 2. 상품 정보 조회
cont.customer = cont.result as Customer
cont.label = 2
productService.findAllProductsFlowable(cont.productIds)
.toList()
.subscribe(cont::resume)
}
2 -> {
// 3. 스토어 조회
cont.products = cont.result as List<Product>
cont.label = 3
val products = cont.products
val storeIds = products.map { it.storeId }
storeService.findStoresMutli(storeIds)
.collect().asList()
.subscribe()
.with(cont::resume)
}
3 -> {
// 4. 주소 조회
cont.stores = cont.result as List<Store>
cont.label = 4
val customer = cont.customer
val daIds = customer.deliveryAddressIds
deliveryAddressService.findDeliveryAddressesPublisher(daIds)
.subscribe(FirstFinder(cont::resume))
}
4 -> {
// 5. 주문 생성
cont.deliveryAddress = cont.result as DeliveryAddress
cont.label = 5
val customer = cont.customer
val products = cont.products
val deliveryAddress = cont.deliveryAddress
val stores = cont.stores
orderService.createOrderMono(
customer, products, deliveryAddress, stores,
).subscribe(cont::resume)
}
5 -> {
val order = cont.result as Order
cont.complete(order)
}
}
}
}
fun main(args: Array<String>) {
val customerService = CustomerFutureService()
val productService = ProductRxjava3Service()
val storeService = StoreMutinyService()
val deliveryAddressService = DeliveryAddressPublisherService()
val orderService = OrderReactorService()
val example = OrderAsyncExampleUpgrade2(
customerService = customerService,
productService = productService,
storeService = storeService,
deliveryAddressService = deliveryAddressService,
orderService = orderService,
)
val cont = Continuation<Any>(EmptyCoroutineContext) {
log.info("result: $it")
}
example.execute(1, listOf(1, 2, 3), cont)
Thread.sleep(5000)
}
view raw .kt hosted with ❤ by GitHub

 

* 여전히 재귀 함수를 호출하는 코드가 반복됨

 

4.5 비동기 코드에서 재귀 함수 반복 호출 제거

  • subscribe, thenAccept 후 cont::resume을 넘기는 부분을 Continuation을 인자로 받는 확장 함수로 분리 가능
    • 분리할 경우 CompletableFuture, Flowable을 사용할 때는 더 이상 직접 subscribe, thenAccept 등을 사용하지 않더라도 확장 함수를 통해 재귀 함수 반복을 피할 수 있음


private val log = kLogger()
class OrderAsyncExampleUpgrade3(
private val customerService: CustomerFutureService,
private val productService: ProductRxjava3Service,
private val storeService: StoreMutinyService,
private val deliveryAddressService: DeliveryAddressPublisherService,
private val orderService: OrderReactorService,
) {
private class CustomContinuation(
private val completion: Continuation<Any>,
) : Continuation<Any> {
var result: Any? = null
var label = 0
// arguments and instance
lateinit var that: OrderAsyncExampleUpgrade3
var userId by Delegates.notNull<Long>()
lateinit var productIds: List<Long>
// variables
lateinit var customer: Customer
lateinit var products: List<Product>
lateinit var stores: List<Store>
lateinit var deliveryAddress: DeliveryAddress
override val context: CoroutineContext
get() = completion.context
override fun resumeWith(result: Result<Any>) {
this.result = result.getOrThrow()
that.execute(0, emptyList(), this)
}
fun complete(value: Any) {
completion.resume(value)
}
}
fun execute(userId: Long,
productIds: List<Long>,
continuation: Continuation<Any>) {
val cont = if (continuation is CustomContinuation) {
continuation
} else {
CustomContinuation(continuation).apply {
that = this@OrderAsyncExampleUpgrade3
this.userId = userId
this.productIds = productIds
}
}
when (cont.label) {
0 -> {
// 1. 고객 정보 조회
cont.label = 1
customerService.findCustomerFuture(cont.userId)
.await(cont)
}
1 -> {
// 2. 상품 정보 조회
cont.customer = cont.result as Customer
cont.label = 2
productService.findAllProductsFlowable(cont.productIds)
.toList().await(cont)
}
2 -> {
// 3. 스토어 조회
cont.products = cont.result as List<Product>
cont.label = 3
val products = cont.products
val storeIds = products.map { it.storeId }
storeService.findStoresMutli(storeIds)
.collect().asList().awaitSuspending(cont)
}
3 -> {
// 4. 주소 조회
cont.stores = cont.result as List<Store>
cont.label = 4
val customer = cont.customer
val daIds = customer.deliveryAddressIds
deliveryAddressService.findDeliveryAddressesPublisher(daIds)
.awaitFirst(cont)
}
4 -> {
// 5. 주문 생성
cont.deliveryAddress = cont.result as DeliveryAddress
cont.label = 5
val customer = cont.customer
val products = cont.products
val deliveryAddress = cont.deliveryAddress
val stores = cont.stores
orderService.createOrderMono(
customer, products, deliveryAddress, stores,
).awaitSingle(cont)
}
5 -> {
val order = cont.result as Order
cont.complete(order)
}
}
}
private fun <T> CompletableFuture<T>.await(
cont: Continuation<T>
) {
this.thenAccept(cont::resume)
}
private fun <T> Single<T>.await(cont: Continuation<T>)
where T : Any {
this.subscribe(cont::resume)
}
private fun <T> Uni<T>.awaitSuspending(cont: Continuation<T>) {
this.subscribe().with(cont::resume)
}
private fun <T> Publisher<T>.awaitFirst(
cont: Continuation<T>
) {
this.subscribe(FirstFinder(cont::resume))
}
private fun <T> Mono<T>.awaitSingle(
cont: Continuation<T>
) {
this.subscribe(cont::resume)
}
}
fun main(args: Array<String>) {
val customerService = CustomerFutureService()
val productService = ProductRxjava3Service()
val storeService = StoreMutinyService()
val deliveryAddressService = DeliveryAddressPublisherService()
val orderService = OrderReactorService()
val example = OrderAsyncExampleUpgrade3(
customerService = customerService,
productService = productService,
storeService = storeService,
deliveryAddressService = deliveryAddressService,
orderService = orderService,
)
val cont = Continuation<Any>(EmptyCoroutineContext) {
log.info("result: $it")
}
example.execute(1, listOf(1, 2, 3), cont)
Thread.sleep(5000)
}
view raw .kt hosted with ❤ by GitHub

 

4.6 코루틴 적용

  • 코틀린 컴파일러가 하는 일을 역순으로 수행함으로써 FSM, CPS이 적용된 비동기 코드를 코루틴 코드로 전환
  • 코루틴을 사용하면 비동기 영역에서 결과가 반환될 때까지 일시 중단하고 결과가 반환되면 재개 가능 (suspendable)
    • 이처럼 코드를 일시 중단하고 재개 가능한 단위를 코루틴이라고 지칭


private val log = kLogger()
class OrderCoroutineExample(
private val customerService: CustomerFutureService,
private val productService: ProductRxjava3Service,
private val storeService: StoreMutinyService,
private val deliveryAddressService: DeliveryAddressPublisherService,
private val orderService: OrderReactorService,
) {
suspend fun execute(userId: Long, productIds: List<Long>): Order {
// 1. 고객 정보 조회
val customer = customerService.findCustomerFuture(userId)
.await()
// 2. 상품 정보 조회
val products = productService
.findAllProductsFlowable(productIds)
.toList().await()
// 3. 스토어 조회
val storeIds = products.map { it.storeId }
val stores = storeService.findStoresMutli(storeIds)
.collect().asList().awaitSuspending()
// 4. 주소 조회
val daIds = customer.deliveryAddressIds
val deliveryAddress = deliveryAddressService
.findDeliveryAddressesPublisher(daIds)
.awaitFirst()
// 5. 주문 생성
val order = orderService.createOrderMono(
customer, products, deliveryAddress, stores,
).awaitSingle()
return order
}
}
suspend fun main(args: Array<String>) {
val customerService = CustomerFutureService()
val productService = ProductRxjava3Service()
val storeService = StoreMutinyService()
val deliveryAddressService = DeliveryAddressPublisherService()
val orderService = OrderReactorService()
val example = OrderCoroutineExample(
customerService = customerService,
productService = productService,
storeService = storeService,
deliveryAddressService = deliveryAddressService,
orderService = orderService,
)
example.execute(1, listOf(1, 2, 3));
}
view raw .kt hosted with ❤ by GitHub

 

부연 설명

  • 코틀린 컴파일러는 suspend가 붙은 함수에 Continuation 인자 추가
  • 다른 suspend 함수를 실행할 경우 소유하고 있는 Continuation 전달
    • 이러한 변환으로 인해 앞서 언급했다시피 suspend가 없는 함수는 전달할 Continuation이 없기 때문에 다른 suspend 함수 호출 불가

 

  • suspend 함수 내부를 when 문을 이용해서 FSM 상태로 변경
  • 각각의 state에서는 label을 변경하고 비동기 함수를 수행
  • 비동기 함수가 완료되면 continuation.resume을 수행하여 다시 복귀하지만 label이 변경되면서 다른 state로 전환
  • 마지막 state에 도달하면 completion.resume을 수행하고 종료

 

5. Spring WebFlux에서 코루틴 적용

  • 앞서 언급했다시피 suspend 함수는 suspend 함수나 코루틴 내부가 아니라면 실행 불가
  • 하지만 Spring WebFlux는 다음과 같은 케이스에 대해서도 suspend 함수를 호출할 수 있도록 지원
    • Controller 내부에서 suspend 함수를 호출해야 하는 케이스
    • 변경 불가능한 interface가 Mono나 CompletableFuture 등을 반환하고 suspend 함수가 아닌 케이스

 

5.1 Spring WebFlux suspend 함수 지원

  • Spring WebFlux는 suspend 함수 지원
  • Context1, MonoCoroutine, Dispatchers.Unconfined를 context로 갖고 reactor-http-nio-2 쓰레드에서 실행

 

reactor-http-nio-2 쓰레드

 

  • RequestMappingHandlerAdapter가 handlerMethod를 실행하고 handlerMethod로부터 invocableMethod를 획득하고 invoke를 통해 실행

RequestMappingHandlerAdapter.handle

 

  • 주어진 메서드가 suspend 함수인지 체크하고 suspend 함수가 맞다면 CoroutineUtils.invokeSuspendingFunction을 실행하고 아니라면 method.invoke 실행

 

InvocableHandlerMethod.invoke

 

  • invokeSuspendingFunction 내부에서 mono를 실행하며 Dispatchers.Unconfined를 전달
    • Flow를 반환한다면 mono.flatMapMany를 사용

 

invokeSuspendingFunction

 

5.2 외부 라이브러리에서 제공되는 인터페이스가 Mono를 반환하는 경우 suspend 함수 호출하는 방법

  • kotlin-coroutines-reactor에서 mono 함수를 제공
  • mono 함수를 이용해서 내부에서 suspend 함수 실행
  • mono 함수의 결과값은 Mono이기 때문에 그대로 반환

 

kotlin-coroutines-reactor > mono

 

  • monoInternal에서는 sink로부터 ReactorContext를 추출
  • 추출한 ReactorContext로 CoroutineContext를 생성
  • MonoCoroutine을 생성하고 시작

 

 

  • MonoCoroutine은 sink를 인자로 받고 Coroutine이 complete 되면 sink의 success를 호출
    • 반대로 cancel 된다면 sink의 error를 호출

 

 

5.3 CompletableFuture를 반환하는 함수에서 suspend 함수를 호출하는 방법

  • CoroutineScope를 생성하고 해당 스코프에서 Future를 실행하여 suspend 함수를 실행 후 결과를 CompletableFuture 형태로 반환

 

CoroutineScope.future

 

참고

패스트 캠퍼스 - Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지

반응형

'Kotlin' 카테고리의 다른 글

[Kotlin] CoroutineScope  (0) 2024.10.03
[Kotlin] CoroutineContext  (0) 2024.10.03
[Kotlin] 코루틴 (Coroutine) 개요  (0) 2024.10.02
[Kotlin] lateinit과 by lazy의 차이점  (2) 2022.07.13