728x90
- 채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있습니다.
- SendChannel 은 원소를 보내거나(또는 더하거나) 채널을 닫는 용도로 사용됩니다.
- ReceiveChannel 은 원소를 받을 때(또는 꺼낼때) 사용됩니다.
- 채널의 진입점을 제한하기 위해 ReceiveChannel 이나, SendChannel 중 하나만 노출시키는 것도 가능합니다.
- 원소를 보내고 받는 함수가 중단 함수인 것은 필수적인 특징입니다.
- receive: 호출했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때 까지 중단됩니다.
- send: 채널의 용량이 다 찼을 때 중단됩니다.
- *중단 함수가 아닌 함수로 보내거나 받아야 한다면 trySend, tryReceive 를 사용할 수 있다.
두 연산 모두 연산이 성공했는지 실패했는지에 대한 정보를 담고 있는 ChanncelResult 를 즉시 반환합니다.
용량이 제한적인 채널에서만 trySend 와 tryReceive 를 사용해야 하는데, (버퍼가 없음)랑데뷰 채널에서는 작동하지 않기 때문입니다.- 버퍼가 없다? -> 채널에 메시지를 임시로 저장할 수 있는 공간이 전혀 없다라는것을 의미.
// 227 페이지 - 불완전한 코드
suspend fun main1(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) { index ->
delay(1000)
println("Producing next one")
channel.send(index * 2)
}
}
launch {
repeat(5) {
val received = channel.receive()
println("Received: $received")
}
}
}
위와 같은 구현방식은 불완전하다. 수신자가 얼마나 많은 원소를 보내는지 알아야하고,
수신자가 이런 정보를 아는 경우는 별로 없기때문에
송신자가 보내는 만큼 수신자가 기다리는 방식을 선호함
// 228 페이지 - 개선된 코드
suspend fun main2(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) { index ->
delay(1000)
println("Producing next one")
channel.send(index * 2)
}
channel.close()
}
launch {
for (element in channel) {
println("Received element: $element")
}
// 또는
// channel.consumeEach { element ->
// println("Received element: $element")
// }
}
}
위와 같이 원소를 보내는 방식의 문제점은 (특히 예외가 발생했을 때) 채널을 닫는 걸 깜박하기 쉽다는 것
예외로 인해 코루틴이 원소를 보내는 걸 중단하면, 다른 코루틴은 원소를 영원히 기다려야 합니다.
ReciveChannel 을 반환하는 코루틴 빌더인 produce 함수를 사용하는 것이 좀 더 편리합니다.
// 229 페이지 - produce 사용으로 좀 더 개선된 코드
suspend fun main3(): Unit = coroutineScope {
val channel = produce {
repeat(5) { index ->
println("Producing next one")
delay(1000)
send(index * 2)
}
}
for (element in channel) {
println("Received element: $element")
}
}
- 채널타입: 설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있습니다.
- 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 Channel.UNLIMITED 로 설정된 채널로, send 가 중단되지 않습니다.
- 버퍼(Bufferd): 특정 용량 크기 또는 Channel.BUFFERED(기본값은 64이며 JVM 의 kotlinx.coroutines.defaultBuffer 를 설정하면 오버라이드 할 수 있습니다)로 설정된 채널
- 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDZVOUS(용량이 9입니다)인 채널로, 송신자와 수신자가 만날 때만 원소를 교환합니다.
- 융합(Conflated): 버퍼 크기가 1인 Channel.CONFLATED 를 가진 채널로 새로 운 원소가 이전 원소를 대체합니다.
- 무제한(Unlimited)
suspend fun main4(): Unit = coroutineScope {
val channel = produce(capacity = Channel.UNLIMITED) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println("Received element: $element")
delay(1000)
}
}
- 용량이 무제한이면 채널은 모든 원소를 받고 수신자가 하나씩 가져가게 합니다.
- 버퍼
suspend fun main5(): Unit = coroutineScope {
val channel = produce(capacity = 3) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println("Received element: $element")
delay(1000)
}
}
- 정해진 용량을 가지고 있다면 버퍼가 가득 찰 때까지 원소가 생성되고, 이후에 생성자는 수신자가 원소를 소비하기를 기다리기 시작합니다.
- 기본 또는 랑데뷰
suspend fun main6(): Unit = coroutineScope {
val channel = produce {
// 또는 produce(capacity = Channel.RENDEZVOUS)
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println("Received element: $element")
delay(1000)
}
}
- 기본(또는 Channel.RENDEZVOUS) 용량을 가진 채널의 경우 송신자는 항상 수신자를 기다립니다.
- 융햡(Channel.CONFLATED)
suspend fun main7(): Unit = coroutineScope {
val channel = produce(capacity = Channel.CONFLATED) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println("Received element: $element")
delay(1000)
}
}
- Channel.CONFLATED 용량을 사용하면
- 이전 원소를 더 이상 저장하지 않습니다.
- 새로운 원소가 이전 원소를 대체하며, 최근 원소만 받을 수 있게 되므로 먼저 보내진 원소가 유실됩니다.
- 버퍼 오퍼블로일 때
- 채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때(onBufferOverflow 파라미터)의 행동을 정의할 수 있다.
- SUSPEND(기본옵션): 버퍼가 가득 찼을 때, send 메서드가 중단됩니다.
- DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거됩니다.
- DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거됩니다.
- 전달되지 않은 원소 핸들러
- Channel 함수에서 반드시 알아야 할 또 다른 파라미터는 inUndeliveredElement 입니다.
- 원소가 어떠한 이유로 처리되지 않을 때 호출됩니다.
- 대부분 채널이 닫히거나 취소되었을음 의미하지만, send, receive, receiveOrNull, hasNext 가 에러를 던질 때 발생할 수도 있습니다.
- 주로 채널에서 보낸 자원을 닫을 때 사용합니다.
- 팬아웃(Fan-out)
- 여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있습니다.
- 하지만 원소를 적절하게 처리하려면 반드시 for 루프를 사용해야 합니다(consumeEach 는 여러개의 코루틴이 사용하기에는 안전하지 않습니다.)
- consumeEach는 채널의 원소를 하나씩 받아서 처리하는 함수이며, 이를 사용하는 코루틴은 채널을 독점합니다. 이 독점 사용으로 인해, consumeEach를 사용하는 코루틴이 실행되는 동안 다른 코루틴은 해당 채널에서 원소를 받을 수 없게 됩니다.
이러한 독점적 소비 방식 때문에, 팬아웃 상황(하나의 채널로부터 여러 코루틴이 원소를 받는 경우)에서 consumeEach를 사용하면 문제가 발생할 수 있습니다.
한 코루틴이 채널을 소비하고 있을 때 다른 코루틴이 동일한 채널에서 원소를 받으려고 하면, 채널 접근 충돌이 일어나거나 예외가 발생할 수 있습니다.
fun CoroutineScope.produceNumbers2() = produce {
repeat(10) {
delay(100)
send(it)
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
// 236 ~ 237 페이지 - 팬아웃
suspend fun main9(): Unit = coroutineScope {
val channel = produceNumbers2()
repeat(3) { id ->
delay(10)
launchProcessor(id, channel)
}
}
Processor #0 received 0
Processor #1 received 1
Processor #2 received 2
Processor #0 received 3
Processor #1 received 4
Processor #2 received 5
Processor #0 received 6
Processor #1 received 7
Processor #2 received 8
Processor #0 received 9
- 원소는 공평하게 배분되며, FIFO 큐를 가지고 있다.
- 팬인(Fan-in)
- 여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있습니다.
suspend fun sendString(
channel: SendChannel<String>,
text: String,
time: Long
) {
while (true) {
delay(time)
channel.send(text)
}
}
fun main10() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(50) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
- 파이프라인
- 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우를 파이프라인 이라고 부릅니다.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> =
produce {
for (num in numbers) {
send(num * num)
}
}
suspend fun main11(): Unit = coroutineScope {
val numbers = numbers()
val squares = square(numbers)
for (square in squares) {
println(square)
}
}
1
4
9
- 통신의 기본 형태로서의 채널
728x90
'공부 > 코틀린코루틴:딥다이브(마르친 모스카와)' 카테고리의 다른 글
29장 - 코루틴을 시작하는 것과 중단 함 숮 어떤 것이 나을까? (0) | 2024.03.16 |
---|---|
18장 - 핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.03.10 |
14장 - 공유 상태로 인한 문제 (0) | 2024.03.01 |
13장 - 코루틴 스코프 만들기 (0) | 2024.02.26 |
12장 - 디스패처 (0) | 2024.02.20 |
댓글