Concurrency in C# Cookbook, 2nd Edition

TPL(Task Parallel Library) Dataflow

Nuget Package: System.Threading.Tasks.Dataflow

    var multiplyBlock = new TransformBlock<int, int>(item =>
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    var substractBlock = new TransformBlock<int, int>(item => item - 2);
        new DataflowLinkOptions { PropagateCompletion = true });
catch (AggregateException ex)
    AggregateException agex = ex.Flatten();

Concurrency in C# Cookbook, 2nd Edition

Nuget Package: System.Reactive


  • 이벤트 스트림을 데이터 스트림처럼 다룰 수 있다.
  • Observable stream의 개념을 바탕으로 한다.
  • Observable stream을 구독(subscribe)하면 정해지지 않은 수의 데이터 항목을 수신(OnNext)한 뒤에 하나의 오류(OnError) 또는 스트림 끝 알림(OnCompleted)으로 스트림이 끝난다.
  • 끝나지 않는 observable stream도 있다.

실제 인터페이스 모습, System.Reactive 라이브러리에 모든 구현을 포함하고 있음.

interface IObserver<in T>
    void OnNext(T item);
    void OnCompleted();
    void OnError(Exception error);

interface IObservable<out T>
    IDisposable Subscribe(IObserver<T> observer);


    .Where(ev => ev.Value % 2 == 0)
    .Select(ev => ev.Timestamp)
        ts => Trace.WriteLine(ts),
        ex => Trace.WriteLine(ex));

IObservable<DateTimeOffset> timestamps = Observable
    .Where(ev => ev.Value % 2 == 0)
    .Select(ev => ev.Timestamp);
    ts => Trace.WriteLine(ts),
    ex => Trace.WriteLine(ex));
  • System.Reactive 구독 역시 리소스다.
  • Subscribe 연산자는 구독을 나타내는 IDisposable 반환
    • 항상 오류 처리 매개 변수를 함께 받아야 함
  • 코드에서 observable stream의 수신을 완료하면 구독을 삭제해야 함
  • 구독은 hot observable과 cold observable에서 다르게 동작
    • hot observable: 언제든 발생할 수 있는 이벤트 스트림
      이벤트가 발생할 때 구독이 없으면 해당 이벤트는 사라짐
    • cold observable: 자동으로 발생하는 이벤트가 없는 경우
      구독에 대응해서 이벤트를 순서대로 발생하기 시작 


Introduction to Rx is the online resource for getting started with the Reactive Extensions to .Net. Originally starting life as a blog series, it has now flourished into an online book. You can read it online here via the website, or get a copy of the Kindle ed


.NET 이벤트 변환

var progress = new Progress<int>();
IObservable<EventPattern<int>> progressReports =
        handler => progress.ProgressChanged += handler,
        handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks =
    Observable.FromEventPattern<ElaspedEventHandler, ElaspedEventArgs>(
        handler => (s, a) => handler(s, a),	// EventHandler<ElapsedEventArgs to ElapsedEventArgs
        handler => timer.Elapsed += handler,
        handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<object>> ticks =
    Observable.FromEventPattern(timer, nameof(Timer.Elapsed));
ticks.Subscribe(data => Trace.WriteLine("OnNext: "
    + (data.EventArgs as ElapsedEventArgs).SignalTime));


컨텍스트로 알림 전달

각 OnNext 알림은 순차적으로 발생하지만 무조건 같은 스레드에서 발생하지 않는다.

private void Button_Click(object sender, RoutedEventArgs e)
    SynchronizationContext uiContext = SynchronizationContext.Current;
    Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}"};
        .Subscribe(x => Trace.WriteLine(
            $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));

UI 스레드를 벗어나는 용도

SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
    handler => (s, a) => handler(s, a),
    handler => MouseMove += handler,
    handler => MouseMove -= handler)
    .Select(evt => evt.EventArgs.GetPosition(this))
    .Select(position =>
        // 복잡한 계산
        Trace.WriteLine($"Calculated result {rv} on thread {Environment.CurrentManagedThreadId}");
        return rv;
    .Subscribe(x => Trace.WriteLine(
        $"Result {x} on thread {Environment.CurrentManagedThreadId}"));


Window와 Buffer로 이벤트 데이터 그룹화

  Buffer Window
이벤트 전달 들어오는 이벤트의 그룹화가 끝나면 모든 이벤트를 이벤트 컬렉션으로 한 번에 전달 들어오는 이벤트를 논리적으로 그룹호하지만 도착하는 대로 전달
반환형식 IObservable<IList<T>> IObservable<IObservable<T>>


Concurrency in C# Cookbook, 2nd Edition

데이터 병렬

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));

IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
    return values.AsParallel()
        .Select(value => IsPrime(value));


작업 병렬(병렬 호출)

void ProcessArray(double[] array)
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)

void ProcessPartialArray(double[] array, int begin, int end)
    // CPU 집약적인 처리

작업 병렬에서는 특히 closure 안에 캡처한 변수에 주의해야 한다.

값이 아닌 참조를 캡처하므로 명확하지 않은 공유가 일어날 수도 있다.

        () => { throw new Exception(); },
        () => { throw new Exception(); });
catch (AggregateException ex)
    ex.Handle(exception =>
        return true; // 처리함

루푸 중지

void InvertMatrices(IEnumerable<Matrix> matrices)
    Pallel.ForEach(matrices, (matrix, state) =>
        if (matrix.InInvertible)

중지는 루프의 내부에서 일어나며 취소는 루푸의 외부에서 일어난다.

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));

공유 상태를 보호하는 잠금(lock)의 사용법 예

int InvertMatrices(IEnumerable<Matrix> matrices)
    object mutex = new object();
    int nonInvertibleCount = 0;
    Parallel.ForEach(matrices, matrix =>
        if (matrix.IsInvertible)
            lock (mutex)
    return nonInvertibleCount;

PLINQ는 Parallel과 거의 똑같은 기능을 제공한다. 차이점:

  • PLINQ는 컴퓨터의 모든 코어를 사용할 수 있다고 가정
  • Prallel은 CPU의 상황에 따라 동적으로 대응

병렬 집계

int ParallelSum1(IEnumerable<int> values)
    object mutex = new object();
    int result = 0;    
    Parallel.ForEach(source: values,
        localInit: () => 0,
        body: (item, state, localValue) => localValue + item,
        localFinally: localValue =>
            lock (mutex)
                result += localValue;
    return result;

int ParallelSum2(IEnumerable<int> values)
    return values.AsParallel().Sum();

int ParallelSum3(IEnumerable<int> values)
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item

병렬 호출

void DoAction20Times(Action action, CancellationToken token)
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
        new ParallelOptions( { CancellationToken = token },


TPL의 핵심은 Task 형식이다.

Parallel 클래스오 PLINQ는 강력한 Task 형식을 편리하게 쓸 수 있게 감싼 wrapper일 뿐

동적 병렬 처리가 필요하다면 Task 형식을 직접 사용하는 것이 가장 쉽다.


이진 트리의 각 노드에 비용이 많이 드는 처리를 수행해야 하는 예

void Traverse(Node current)
    if (current.Left is not null)
    if (current.Right is not null)

Task ProcessNode(Node node,
    TaskCreationOptions options = TaskCreationOptions.AttachedToParent)
    return Task.Factory.StartNew(
        () => Traverse(node),

void ProcessTree(Node root)
    var task = ProcessNode(root, TaskCreationOptions.None);

연속 작업(Continuation)

Task task = Task.Factory.StartNew(
    () => Thread.Sleep(TimeSpan.FromSeconds(2)),

Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task is done"),


IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
    return values.AsParallel().Select(value => value * 2);

IEnumerable<int> MultiplyBy2Ordered(IEnumerable<int> values)
    return values.AsParallel()
        .Select(value => value * 2);

int ParallelSum(IEnumerable<int> values)
    return values.AsParallel().Sum();


Concurrency in C# Cookbook, 2nd Edition

async Task DoSomethingAsync()
    int value = 1;
    // Context 저장
    // null ? SynchronizationContext : TaskScheduler
    // ASP.NET Core는 별도의 요청 컨텍스트가 아닌 threadpool context 사용
    await Task.Delay(TimeSpan.FromSeconds(1));
    value *= 2;
    await Task.Delay(TimeSpan.FromSeconds(1));
항상 코어 '라이브러리' 메서드 안에서 ConfigureAwait를 호출하고,
필요할 때만 다른 외부 '사용자 인터페이스' 메서드에서 컨택스트를 재개하는 것이 좋다.
async Task DoSomethingAsync()
    int value = 1;
    await Task.Delay(TimeSpan.FromSeconds(1))
    // Threadpool thread에서 실행을 재개한다.
    value *= 2;
    await Task.Delay(TimeSpan.FromSeconds(1))



  • 메모리 내 캐시에서 결과를 읽을 수 있는 등 메모리 할당을 줄일 수 있는 형식


Task 인스턴스를 만드는 방법

  1. CPU가 실행해야 할 실제 코드를 나타내는 계산 작업은 Task.Run으로 생성
  2. 특정 스케줄러에서 실행해야 한다면 TaskFactory.StartNew로 생성
  3. 이벤트 기반 작업은 TaskCompletionSource<TResult>
    (대부분 I/O 작업은 TaskCompletionSource<TResult> 사용)


오류 처리

async Task TrySomethingAsync()
    // 예외는 Task에서 발생한다.
    var task = PossibleExceptionAsync();
        // 여기서 예외 발생
        await task;
    catch (NotSupportedException ex)
        // 이렇게 잡힌 예외는 자체적으로 적절한 스택 추적(Stack trace)을 보존하고 있으며
        // 따로 TargetInvocationException이나 AggregateException으로 쌓여 있지 않다.



async Task WaitAsync()
    // 3. 현재 context 저장
    await Task.Delay(TimeSpan.FromSeconds(1));
    // ...
    // 3. 저장된 context 안에서 재개 시도
    //  Deadlock 메서드의 2. task.Wait()에서 차단된 thread
    //  context는 한 번에 하나의 thread만 허용하므로 재개할 수 없음

void Deadlock()
    // 1. 지연 시작
    var task = WaitAsync();
    // 2. 동기적으로 차단하고 async 메서드의 완료 대기

위의 코든느 UI 컨텍스트나 ASP.NET 클래식 컨텍스트에서 호출하면 교착 상태에 빠진다.

ConfigureAwait(false)로 해결

async Task WaitAsync()
    // 3. 현재 context 저장
    await Task.Delay(TimeSpan.FromSeconds(1))
    // ...
    // 3. Threadpool thread에서 재개

void Deadlock()
    // 1. 지연 시작
    var task = WaitAsync();
    // 2. 동기적으로 차단하고 async 메서드의 완료 대기


Asynchronous programming in C#

An overview of the C# language support for asynchronous programming using async, await, Task, and Task


Task-based Asynchronous Pattern (TAP): Introduction and overview

Learn about the Task-based Asynchronous Pattern (TAP), and compare it to the legacy patterns: Asynchronous Programming Model (APM) and Event-based Asynchronous Pattern (EAP).


Exponential backoff

async Task<string> DownloadStringWithRetries(HttpClient client, string uri)
    TimeSpan nextDelay = TimeSpan.FromSeconds(1);
    for (int i = 0; i < 3; ++i)
            return await client.GetStringAsync(uri);
        catch { }
        await Task.Delay(nextDelay);
        nextDelay = nextDelay + nextDelay;
    // 오류를 전파할 수 있게 마지막으로 한 번 더 시도
    return await client.GetStringAsync(uri);


Soft timeout

async Task<string> DownloadStringWithTimeout(HttpClient client, string uri)
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    Task<string> downloadTask = client.GetStringAsync(uri);
    Task timeoutTask = Task.Delay(Timeout.InfiniteTimeSpan, cts.Token);
    Task completedTask = await Task.WhenAny(downloadTask, timeoutTask);
    if (completedTask == timeoutTask)
        // WARNING: downloadTask는 여전히 동작한다.
        return null;
    return await downloadTask;

타임아웃이 지나면 실행을 중단해야 할 때

async Task IssueTimeoutAsync()
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    CancellationToken token = cts.Token;
    await Task.Delay(TimeSpan.FromSeconds(10), token);

async Task IssueTimeoutAsync()
    using var cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    await Task.Delay(TimeSpan.FromSeconds(10), token);


비동기 시그니처를 사용해서 동기 메서드 구현

interface IMyAsync
    Task<int> GetValueAsync(CancellationToken token);
    Task DoSomethingAsync();

class MySync : IMyAsync
    // 자주 사용하는 작업 결과라면 미리 만들어 놓고 쓴다.
    private static readonly Task<int> ZeroTask = Task.FromResult(0);

    public Task<int> GetValueAsync(CancellationToken token)
        if (token.IsCancellationRequested)
            return Task.FromCanceled<int>(token);
        return Task.FromResult(10);
    public Task<T> NotImplementedAsync()
        return Task.FromException<T>(new NotImplementedException());
    protected void DoSomethingSynchronously()
    public Task DoSomethingAsync()
            return Task.CompletedTask;
        catch (Exception ex)
            return Task.FromException(ex);


진행 상황 보고

async Task MyMethodAsync(IProgress<double> progress = null)
    bool done = false;
    double percentComplete = 0;
    while (!done)

async Task CallMyMethodAsync()
    var progress = new Progress<double>();
    progress.ProgressChanged += (sender, args) =>
    await MyMethodAsync(progress);


모든 작업의 완료 대기

Task task1 = Task.Delay(TimeSpan.FromSeconds(1));
Task task2 = Task.Delay(TimeSpan.FromSeconds(2));
Task task3 = Task.Delay(TimeSpan.FromSeconds(1));

await Task.WhenAll(task1, task2, task3);

Task<int> task1 = Task.FromResult(1);
Task<int> task2 = Task.FromResult(3);
Task<int> task3 = Task.FromResult(5);

int[] results = await Task.WhenAll(task1, task2, task3);
// results = [1, 3, 5]


async Task<string> DownloadAllAsync(HttpClient client,
    IEnumerable<string> urls)
    var downloads = urls.Select(url => client.GetStringAsync(url));
    // 아직 실제로 시작한 작업은 없다.
    // 동시에 모든 URL에서 다운로드 시작
    Task<string>[] downloadTasks = downloads.ToArray();
    // 모든 다운로드 완료를 비동기적으로 대기
    string[] htmlPages = await Task.WhenAll(downloadTasks);
    return string.Concat(htmlPages);

작업 중 하나가 예외를 일으키면 Task.WhenAll은 작업과 함께 해당 예외를 반환하며 실패한다.

여러 작업이 예외를 일으키면 모든 예외를 Task.WhenAll이 반환하는 Task에 넣는다.

하지만 작업이 대기 상태면 예외 중 하나만 일으킨다.

async Task ThrowNotImplementedExceptionAsync()
    throw new NotImplementedException();

async Task ThrowInvalidOperationExceptionAsync()
    throw new InvalidOperationException();

async Task ObserveOneExceptionAsync()
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();
        await Task.WhenAll(task1, task2);
    catch (Excpeiton ex)
        // ex는 NotImplementedException or InvalidOperationException

async Task ObserveAllExcpetionAsync()
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();

    Task allTasks = Task.WhenAll(task1, task2);
        await allTasks;
        AggregateException allExceptions = allTasks.Excpetion;


작업이 완료될 때마다 처리

async Task<int> DelayAndReturnAsync(int value)
    await Task.Delay(TimeSpan.FromSeconds(value));
    return value;

async Task AwaitAndProcessAsync(Task<int> task)
    int rv = await task;

async Task ProcessTasksAsync(int flag)
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new Task[] { taskA, taskB, taskC };
    Task[] processingTasks;
    if (flag == 1)
        IEnumerable<Task> taskQuery =
            from t in tasks select AwaitAndProcessAsync(t);
        processingTasks = taskQuery.ToArray();

        await Task.WhenAll(processingTasks);
    else if (flag == 2)
        processingTasks = tasks.Select(async t =>
            var rv = await t;

        await Task.WhenAll(processingTasks);
    else if (flag == 3)
        foreach (Task<int> task in tasks.OrderByCompletion())
            int rv = await task;


async void 메서드의 예외 처리

sealed class MyAsyncCommand : ICommand
    async void ICommand.Execute(object parameter)
        await Execute(parameter);
    public async Task Execute(object parameter)
        ; // 비동기 작업 구현
    ; // CanExecute 등 구현


ValueTask 생성/사용

  • 반환할 수 있는 동기적 결과가 있고 비동기 동작이 드문 상황에서 반환 형식으로 사용
  • 프로파일링을 통해 애플리케이션의 성능 향상을 확인할 수 있을 때만 고려해야 함
  • ValueTask를 반환하는 DisposeAsync 메서드가 있는 IAsyncDisposable을 구현할 때
private Task<int> SlowMethodAsync();

public ValueTask<int> MethodAsync()
    if (CanBehaveSynchronously)
        return new ValueTask<int>(1);
    return new ValueTask<int>(SlowMethodAsync());

async Task ConsumingMethodAsync()
    ValueTask<int> valueTask = MethodAsync();
    ; // 기타 동시성 작업
    int value = await valueTask;
ValueTask는 딱 한 번만 대기할 수 있다.
더 복잡한 작업을 하려면 AsTask를 호출해서 ValueTask<T>를 Task<T>로 변환해야 한다.
ValueTask에서 동기적으로 결과를 얻으려면 ValueTask를 완료한 뒤에 한 번만 할 수 있다.
async Task ConsumingTaskAsync()
    Task<int> task = MethodAsync().AsTask();
    ; // 기타 동시성 작업
    int value = await task;
    // Task<T>는 await로 여러 번 대기해도 완벽하게 안전하다.
    int anotherValue = await task;

async Task ConsumingTaskAsync()
    Task<int> task1 = MethodAsync().AsTask();
    Task<int> task2 = MethodAsync().AsTask();
    int[] results = await Task.WhenAll(task1, task2);


Asynchronous Stream

async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
    const int limit = 10;
    for (int offset = 0; true; offset += limit)
        string result = await client.GetStringAsync(
        string[] valuesOnThisPage = result.Split('\n');

        // 현재 페이지의 결과 전달
        foreach (string value in valuesOnThisPage)
            yield return value;

        // 마지막 페이지면 끝
        if (valuesOnThisPage.Length != limit)

   public async Task ProcessValuesAsync(HttpClient client)
       await foreach (string value in GetValuesAsync(client))

   public async Task ProcessValuesAsync(HttpClient client)
       await foreach (string value in GetValuesAsync(client).ConfigureAwait(false))
           await Task.Delay(100).ConfigureAwait(false); // 비동기 작업


비동기 스트림과 LINQ 함께 사용

IEnumerable<T>에는 LINQ to Objects가 있고 IObservable<T>에는 LINQ to Events가 있다.

IAsyncEnumerable<T>도 System.Linq.Async NuGet Package를 통해 LINQ 지원

IAsyncEnumerable<int> values = SlowRange().WhereAwait(
    async value =>
        // 요소의 포함 여부를 결정할 비동기 작업 수행
        await Task.Delay(10);
        return value % 2 == 0;
    .Where(value => value % 4 == 0); // 결과는 비동기 스트림

await foreach (int result in values)

// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange()
    for (int i = 0; i < 10; ++i)
        await Task.Delay(i * 100);
        yield return i;

Async 접미사는 값을 추출하거나 계산을 수행한 뒤에 비동기 시퀀스가 아닌 비동기 스칼라 값을 반환하는 연산자에만 붙는다.

int count = await SlowRange().CountAsync(
    value => value % 2 == 0);

// 조건자가 비동기적일 땐 AwaitAsync 접미사가 붙는 연산자를 사용
int count = await SlowRange().CountAwaitAsync(
    async value =>
        await Task.Delay(10);
        return value % 2 == 0;

비동기 스트림 취소

using var cts = new CancellationTokenSource(500);
CancellationToken token = ct.Token;

await foreach (int result in SlowRange(token))

// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange(
    [EnumeratorCancellation] CancellationToken token = default)
    for (int i = 0; i < 10; ++i)
        await Task.Delay(i * 100, token);
        yield return i;

비동기 스트림의 열거에 CancellationToken을 추가할 수 있는 WithCancellation 확장 메서드 지원

async Task ConsumeSequence(IAsyncEnumerable<int> items)
    using var cts = new CancellationTokenSource(500);
    CancellationToken token = cts.Token;
    await foreach (int result in items.WithCancellation(token))

await ConsumeSequence(SlowRange());


Concurrency in C# Cookbook, 2nd Edition

  • 한 번에 두 가지 이상의 작업 수행


  • 다수의 실행 스레드를 사용하는 동시성의 한 형태

Parallel Processing

  • 많은 작업을 여러 스레드에 나눠서 동시에 수행
  • 멀티스레딩을 사용해서 멀티 코어 프로세서를 최대한 활용하는 방법

Asynchronous Programming

  • 불필요한 스레드의 사용을 피하려고 future나 callback을 사용하는 동시성의 한 형태

Reactive Programing

  • 애플리케이션이 이벤트에 대응하게 하는 선언형 프로그래밍 방식
  • 비동기 연산이 아닌 비동기 이벤트 기반



