본문 바로가기
공부/코틀린코루틴:딥다이브(마르친 모스카와)

16장 - 채널

by 띵커베르 2024. 3. 2.
728x90
  • 채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있습니다.
  • SendChannel 은 원소를 보내거나(또는 더하거나) 채널을 닫는 용도로 사용됩니다.
  • ReceiveChannel 은 원소를 받을 때(또는 꺼낼때) 사용됩니다.
  • 채널의 진입점을 제한하기 위해 ReceiveChannel 이나, SendChannel 중 하나만 노출시키는 것도 가능합니다.
  • 원소를 보내고 받는 함수가 중단 함수인 것은 필수적인 특징입니다.
  • receive: 호출했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때 까지 중단됩니다.
  • send: 채널의 용량이 다 찼을 때 중단됩니다.
  • *중단 함수가 아닌 함수로 보내거나 받아야 한다면 trySend, tryReceive 를 사용할 수 있다.
    두 연산 모두 연산이 성공했는지 실패했는지에 대한 정보를 담고 있는 ChanncelResult 를 즉시 반환합니다.
    용량이 제한적인 채널에서만 trySend 와 tryReceive 를 사용해야 하는데, (버퍼가 없음)랑데뷰 채널에서는 작동하지 않기 때문입니다.
    • 버퍼가 없다? -> 채널에 메시지를 임시로 저장할 수 있는 공간이 전혀 없다라는것을 의미.
  •  
// 227 페이지 - 불완전한 코드
suspend fun main1(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000)
            println("Producing next one")
            channel.send(index * 2)
        }
    }

    launch {
        repeat(5) {
            val received = channel.receive()
            println("Received: $received")
        }
    }
}
위와 같은 구현방식은 불완전하다. 수신자가 얼마나 많은 원소를 보내는지 알아야하고, 
수신자가 이런 정보를 아는 경우는 별로 없기때문에
송신자가 보내는 만큼 수신자가 기다리는 방식을 선호함


// 228 페이지 - 개선된 코드
suspend fun main2(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000)
            println("Producing next one")
            channel.send(index * 2)
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println("Received element: $element")
        }

        // 또는
//        channel.consumeEach { element ->
//            println("Received element: $element")
//        }
    }
}
위와 같이 원소를 보내는 방식의 문제점은 (특히 예외가 발생했을 때) 채널을 닫는 걸 깜박하기 쉽다는 것
예외로 인해 코루틴이 원소를 보내는 걸 중단하면, 다른 코루틴은 원소를 영원히 기다려야 합니다.
ReciveChannel 을 반환하는 코루틴 빌더인 produce 함수를 사용하는 것이 좀 더 편리합니다.

// 229 페이지 - produce 사용으로 좀 더 개선된 코드
suspend fun main3(): Unit = coroutineScope {
    val channel = produce {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }

    for (element in channel) {
        println("Received element: $element")
    }

}

 

 

  • 채널타입: 설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있습니다.
    • 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 Channel.UNLIMITED 로 설정된 채널로, send 가 중단되지 않습니다.
    • 버퍼(Bufferd): 특정 용량 크기 또는 Channel.BUFFERED(기본값은 64이며 JVM 의 kotlinx.coroutines.defaultBuffer 를 설정하면 오버라이드 할 수 있습니다)로 설정된 채널
    • 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDZVOUS(용량이 9입니다)인 채널로, 송신자와 수신자가 만날 때만 원소를 교환합니다.
    • 융합(Conflated): 버퍼 크기가 1인 Channel.CONFLATED 를 가진 채널로 새로 운 원소가 이전 원소를 대체합니다.
  • 무제한(Unlimited)
suspend fun main4(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println("Received element: $element")
        delay(1000)
    }
}
  • 용량이 무제한이면 채널은 모든 원소를 받고 수신자가 하나씩 가져가게 합니다.

 

  • 버퍼
suspend fun main5(): Unit = coroutineScope {
    val channel = produce(capacity = 3) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println("Received element: $element")
        delay(1000)
    }
}
  • 정해진 용량을 가지고 있다면 버퍼가 가득 찰 때까지 원소가 생성되고, 이후에 생성자는 수신자가 원소를 소비하기를 기다리기 시작합니다.

 

  • 기본 또는 랑데뷰

suspend fun main6(): Unit = coroutineScope {
    val channel = produce {
        // 또는 produce(capacity = Channel.RENDEZVOUS)
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println("Received element: $element")
        delay(1000)
    }
}
  • 기본(또는 Channel.RENDEZVOUS) 용량을 가진 채널의 경우 송신자는 항상 수신자를 기다립니다.

 

  • 융햡(Channel.CONFLATED)
suspend fun main7(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println("Received element: $element")
        delay(1000)
    }
}
  • Channel.CONFLATED 용량을 사용하면
  • 이전 원소를 더 이상 저장하지 않습니다.
  • 새로운 원소가 이전 원소를 대체하며, 최근 원소만 받을 수 있게 되므로 먼저 보내진 원소가 유실됩니다.

 


  • 버퍼 오퍼블로일 때
    • 채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때(onBufferOverflow 파라미터)의 행동을 정의할 수 있다.
    • SUSPEND(기본옵션): 버퍼가 가득 찼을 때, send 메서드가 중단됩니다.
    • DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거됩니다.
    • DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거됩니다.

 

 

  • 전달되지 않은 원소 핸들러
    • Channel 함수에서 반드시 알아야 할 또 다른 파라미터는 inUndeliveredElement 입니다.
    • 원소가 어떠한 이유로 처리되지 않을 때 호출됩니다.
    • 대부분 채널이 닫히거나 취소되었을음 의미하지만, send, receive, receiveOrNull, hasNext 가 에러를 던질 때 발생할 수도 있습니다.
    • 주로 채널에서 보낸 자원을 닫을 때 사용합니다.

 

  • 팬아웃(Fan-out)
    • 여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있습니다.
    • 하지만 원소를 적절하게 처리하려면 반드시 for 루프를 사용해야 합니다(consumeEach 는 여러개의 코루틴이 사용하기에는 안전하지 않습니다.)
    • consumeEach는 채널의 원소를 하나씩 받아서 처리하는 함수이며, 이를 사용하는 코루틴은 채널을 독점합니다. 이 독점 사용으로 인해, consumeEach를 사용하는 코루틴이 실행되는 동안 다른 코루틴은 해당 채널에서 원소를 받을 수 없게 됩니다.
      이러한 독점적 소비 방식 때문에, 팬아웃 상황(하나의 채널로부터 여러 코루틴이 원소를 받는 경우)에서 consumeEach를 사용하면 문제가 발생할 수 있습니다.
      한 코루틴이 채널을 소비하고 있을 때 다른 코루틴이 동일한 채널에서 원소를 받으려고 하면, 채널 접근 충돌이 일어나거나 예외가 발생할 수 있습니다.
fun CoroutineScope.produceNumbers2() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

// 236 ~ 237 페이지 - 팬아웃
suspend fun main9(): Unit = coroutineScope {
    val channel = produceNumbers2()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}

Processor #0 received 0
Processor #1 received 1
Processor #2 received 2
Processor #0 received 3
Processor #1 received 4
Processor #2 received 5
Processor #0 received 6
Processor #1 received 7
Processor #2 received 8
Processor #0 received 9
  • 원소는 공평하게 배분되며, FIFO 큐를 가지고 있다.

 

  • 팬인(Fan-in)
    • 여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있습니다.
suspend fun sendString(
    channel: SendChannel<String>,
    text: String,
    time: Long
) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}

fun main10() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(50) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}

 

 

  • 파이프라인
    • 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우를 파이프라인 이라고 부릅니다.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
    produce {
        for (num in numbers) {
            send(num * num)
        }
    }


suspend fun main11(): Unit = coroutineScope {
    val numbers = numbers()
    val squares = square(numbers)
    for (square in squares) {
        println(square)
    }
}

1
4
9

 

 

 

  • 통신의 기본 형태로서의 채널
    •  
728x90

댓글