플로우(flow)는 비동기적으로 계산해야 할 값의 스트림을 나타낸다. Flow 인터페이스 자체는 떠다니는 원소들을 모으는 역할을 하며, 플로우의 끝에 도달할 때까지 각 값을 처리하는 걸 의미한다.(Flow의 collect는 컬렉션의 forEach와 비슷하다.)
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
Flow의 유일한 멤버 함수는 collect다. 다른 함수들은 확장 함수로 정의되어 있다. iterator만 멤버 함수로 가지고 있는 Iterable 또는 Sequence와 비슷하다고 볼 수 있다.
public interface Iterable<out T> {
/**
* Returns an iterator over the elements of this object.
*/
public operator fun iterator(): Iterator<T>
}
public interface Sequence<out T> {
/**
* Returns an [Iterator] that returns the values from the sequence.
*
* Throws an exception if the sequence is constrained to be iterated once and `iterator` is invoked the second time.
*/
public operator fun iterator(): Iterator<T>
}
여러 개의 값을 반환하는 함수가 필요하다고 가정해보자. 한번에 모든 값을 만들 때는 List나 Set과 같은 컬렉션을 사용한다.
fun allUsers(): List<User> =
api.getAllUsers().map { it.toUser() }
명심해야 할 점은 List와 Set이 모든 원소의 계산이 완료된 컬렉션이라는 것이다. 값들을 계산하는 과정에 시간이 걸리기 때문에, 원소들이 채워질 때까지 모든값이 생성되길 기다려야 한다.
fun getList() : List<String> = List(3) {
Thread.sleep(1000)
"User$it"
}
fun main() {
val list = getList()
println("Function started")
list.forEach { println(it) }
}
(3초 후)
Function started
User0
User1
User2
원소를 하나씩 계산할 때는, 원소가 나오자마자 바로 얻을 수 있는 것이 낫다. Sequnece를 사용하는 것이 한 가지 방법이다.
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
Thread.sleep(1000)
yield("User$it")
}
}
fun main() {
val list = getSequence()
println("Function started")
list.forEach { println(it) }
}
Function started
(1초 후)
User0
(1초 후)
User1
(1초 후)
User2
시퀸스는 복잡한 결괏값을 계산하는 등의 CPU 집약적인 연산 또는 파일을 읽는 등의 블로킹 연산이 필요할 때마다 값을 계산하는 플로우를 나타내기에 적절하다. (forEach와 같은) 시퀸스의 최종 연산은 중단 함수가 아니기 때문에, 시퀸스 빌더 내부에 중단점이 있다면 값을 기다리는 스레드가 블로킹된다. 따라서 sequence 빌더의 스코프에서는 SequenceScope의 리시버에서 호출되는 함수(yield와 yieldAll) 외에 다른 중단 함수를 사용할 수 없다.
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
// Restricted suspending functions can only invoke member or
// extension suspending functions on their restricted coroutine scope
delay(1000) // 컴파일 에러 발생
yield("User$it")
}
}
시퀸스를 잘못 사용하면 안 되기 때문에 중단 함수를 사용할 수 없다는 제약사항이 도입되었다. 위 예제는 컴파일이 되더라도 (forEach와 같은) 최종 연산이 코루틴을 중단시키는 대신 스레드를 블로킹하여 생각지도 못한 스레드 블로킹이 발생할 수 있다. 누군가 HTTP의 엔드포인트로부터 시퀸스를 사용해 페이징 기법으로 빈 페이지를 받을 때까지 모든 사용자의 리스트를 얻는 경우를 생각해보자. Sequence의 iterator가 중단 함수가 아니기 때문에, 시퀸스의 원소를 소비할 때 블로킹이 되는 것이 문제가 된다.
// 이렇게 구현하면 안 된다. 시퀸스 대신 플로우를 사용해야 한다.
fun allUsersSequence(
api: UserApi
): Sequence<User> = sequence {
var page = 0
do {
val users = api.takePage(page++) // 중단 함수이므로 컴파일 에러 발생
yieldAll(users)
} while (!users.isNullOrEmpty())
}
interface UserApi {
suspend fun takePage(pageNum: Int): List<User>
}
data class User(
val name: String,
)
시퀸스는 위와 같은 상황에서 사용하기에는 적합하지 않다. 데이터 소스의 개수가 많거나(또는 무한정이거나) 원소가 무거운 경우, 원소를 필요로 할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황에서 시퀸스가 정확히 들어맞다.
val fibonacci: Sequence<BigInteger> = sequence {
var first = 0.toBigInteger()
var second = 1.toBigInteger()
while (true) {
yield(first)
val temp = first
first += second
second = temp
}
}
fun countCharactersInFile(path: String) {
File(path).useLines { lines ->
lines.sumOf { it.length }
}
}
스레드 블로킹이 매우 위험하고 예기치 않은 상황을 유발할 수 있다. 다음 예제를 통해 확인해 보자. Sequence를 사용했기 때문에 forEach가 블로킹 연산이 된다. 따라서 같은 스레드에서 launch로 시작된 코루틴이 대기하게 되며, 하나의 코루틴이 다른 코루틴을 블로킹하게 된다.
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
Thread.sleep(1000)
yield("User$it")
}
}
suspend fun main() {
withContext(newSingleThreadContext("main")) {
launch {
repeat(3) {
delay(100)
println("Processing on continue")
}
}
val list = getSequence()
list.forEach { println(it) }
}
}
(1초 후)
User0
(1초 후)
User1
(1초 후)
User2
Processing on continue
(0.1초 후)
Processing on continue
(0.1초 후)
Processing on continue