시작하기 전에 아래에 있는 UserDownloader 클래스를 살펴보자. 이 클래스에서 아이디로 사용자를 받아오거나, 이전에 전송받은 모든 사용자를 얻을 수 있다. 이렇게 구현하면 어떤 문제가 있을까?

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    
    fun donloaded(): List<User> = users.toList()
    
    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser)
    }
}

<aside> 📢 위 예제에서는 방어적 복사로 toList를 사용했다. downloaded로 반환된 객체를 읽을 때와 변경 가능한 리스트에 원소를 추가할 때 발생할 수 있는 충돌을 피하기 위함이다. users를 읽기만 가능한 리스트(List<User>)와 읽고 쓰기가 가능한 프로퍼티(var)로 선언할 수도 있다. 방어적 복사를 하지 않아도 되고 downloaded 함수를 보호할 필요도 없지만, 컬렉션에 원소를 추가하는 작업의 효율이 떨어지게 된다. 저자는 var + immutable을 선호하지만, 실제 현업의 많은 프로젝트에서 변경 가능한 컬렉션을 사용하고 있으므로 val + mutable 방법을 예로 들었다.

</aside>

앞의 구현 방식은 동시 사용에 대한 대비가 되어 있지 않다. fetchUser 호출은 users를 변경한다. 이 경우 같은 시간에 해당 함수가 한 개의 스레드에서 시작할 경우에만 정상적으로 작동한다. 같은 시간에 두 개 이상의 스레드에서 함수가 호출될 수 있으므로 users는 공유 상태에 해당하며 보호될 필요가 있다. 동시에 리스트를 변경하면 충돌이 일어날 수 있기 때문이다. 아래 예제에서 충돌이 일어날 수 있는 경우를 확인할 수 있다.


data class User(
    val name: String,
)
interface NetworkService {
    suspend fun fetchUser(id: Int): User
}
class FakeNetworkService: NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()

    fun donloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser)
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    println(downloader.donloaded().size) // 917002, 964403, 949144
}

같은 객체와 상호작용하는 스레드가 많기 때문에 위 코드는 1,000,000보다 작은 숫자를 출력하거나 예외(ArrayIndexOutOfBoundsException)를 던지게 된다.

앞서 살펴본 문제는 공유 상태를 변경할 때 쉽게 만날 수 있다. 좀더 간단한 예를 들면 하나의 정수를 1씩 증가시키는 스레드가 여러 개 있는 경우가 있다. 여기서는 Dispatchers.Default를 사용하는 1,000개의 코루틴에서 1,000번의 연산을 호출하는 massiveRun을 사용한다. 모든 연산이 끝난 뒤 숫자는 1,000,000(1,000 * 1,000)이 되어야 한다. 하지만 동기화되지 않으면 충돌이 발생하므로 실제 결과는 이보다 더 작다.

var counter = 0

fun main() = runBlocking {
    massiveRun {
        counter++
    }
    println(counter) // 207600
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

결과가 1,000,000이 아니라는 걸 이해하기 위해 두 개의 스레드가 똑같은 시간에 같은 수를 1씩 증가시킨다고 가정해 보자. 시작값은 0이다. 첫 번째 스레드가 현재 값인 0을 받고 난 뒤 프로세서가 두 번째 스레드로 옮기기로 결정한다. 두 번째 스레드 또한 0을 받고 1로 증가시킨 뒤 변수에 저장한다. 첫 번째 스레드로 다시 옮긴다면 이전에 멈췄을 때 사용한 0을 1로 증가시키고 저장한다. 그 결과 변수는 2가 되어야 하지만 실제로는 1이 되어 버린다. 이 때문에 연산 일부가 반영되지 않는 결과가 일어난다.

동기화 블로킹

위와 같은 문제는 자바에서 사용되는 전통적인 도구인 synchronized 블록이나 동기화된 컬렉션을 사용해 해결할 수 있다.

var counter = 0

fun main() = runBlocking {
    val lock = Any()

    massiveRun {
        synchronized(lock) {
            counter++
        }
    }
    println(counter) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

이 방법은 작동하긴 하지만, 몇 가지 문제점이 있다. 가장 큰 문제점은 synchronized 블록 내부에서 중단 함수를 사용할 수 없다는 것이다. 두 번째는 syncrhonized 블록에서 코루틴이 자기 차례를 기다릴 때 스레드를 블로킹한다는 것이다. 디스패처의 원리를 생각해 보면 코투린이 스레드를 블로킹하는 건 지양해야 한다. 메인 스레드가 블로킹되면 어떻게 될까? 제한된 수의 스레드만 가지고 있다면 어떨까? 왜 스레드와 같은 자원을 낭비해야 할까? 이러한 방법 대신 코루틴에 특화된 방법을 사용해야 한다. 블로킹 없이 중단하거나 충돌을 회피하는 방법을 사용해야 한다. 지금까지 봤던 방식과는 다른, 코루틴에서 사용하는 방식을 보자.

원자성(atomic)

자바에는 간단한 경우에 사용할 수 있는 다른 방법이 있다. 자바는 다양한 원자값을 가지고 있다. 원자값을 활용한 연산은 빠르며 ‘스레드 안전’을 보장한다. 이러한 연산을 원자성 연산이라 한다. 원자성 연산은 락 없이 로우 레벨로 구현되어 효율적이고 사용하기가 쉽다. 사용할 수 있는 원자값의 종류는 다양하다. 여기서는 AtomicInteger를 사용하겠다.

private var counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.incrementAndGet()
    }
    println(counter.get()) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

원자값은 의도대로 완벽하게 작동하지만 사용성이 제한되기 떄문에 조심해서 다뤄야 한다. 하나의 연산에서 원자성을 가지고 있다고 해서 전체 연산에서 원자성이 보장되는 것은 아니기 때문이다.

private var counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.set(counter.get() + 1)
    }
    println(counter.get()) // 189420
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

UserDownloader를 안전하게 사용하기 위해서 읽기만 가능한 사용자 리스트를 AtomicReference로 래핑할 수도 있다. 충돌 없이 값을 갱신하기 위해서는 getAndUpdate라는 원자성 보장 함수를 사용한다.