https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

Example

async Task DoSomethingAsync()
{
    int value = 1;
    
    // Context 저장
    // null ? SynchronizationContext : TaskScheduler
    // ASP.NET Core는 별도의 요청 컨텍스트가 아닌 threadpool context 사용
    await Task.Delay(TimeSpan.FromSeconds(1));
    
    value *= 2;
    
    await Task.Delay(TimeSpan.FromSeconds(1));
    
    Trace.WriteLine(value);
}
항상 코어 '라이브러리' 메서드 안에서 ConfigureAwait를 호출하고,
필요할 때만 다른 외부 '사용자 인터페이스' 메서드에서 컨택스트를 재개하는 것이 좋다.
async Task DoSomethingAsync()
{
    int value = 1;
    
    await Task.Delay(TimeSpan.FromSeconds(1))
        .ConfigureAwait(false);
    // Threadpool thread에서 실행을 재개한다.
    
    value *= 2;
    
    await Task.Delay(TimeSpan.FromSeconds(1))
        .ConfigureAwait(false);
    
    Trace.WriteLine(value);
}

 

ValueTask<T>

  • 메모리 내 캐시에서 결과를 읽을 수 있는 등 메모리 할당을 줄일 수 있는 형식

 

Task 인스턴스를 만드는 방법

  1. CPU가 실행해야 할 실제 코드를 나타내는 계산 작업은 Task.Run으로 생성
  2. 특정 스케줄러에서 실행해야 한다면 TaskFactory.StartNew로 생성
  3. 이벤트 기반 작업은 TaskCompletionSource<TResult>
    (대부분 I/O 작업은 TaskCompletionSource<TResult> 사용)

 

오류 처리

async Task TrySomethingAsync()
{
    // 예외는 Task에서 발생한다.
    var task = PossibleExceptionAsync();
    
    try
    {
        // 여기서 예외 발생
        await task;
    }
    catch (NotSupportedException ex)
    {
        // 이렇게 잡힌 예외는 자체적으로 적절한 스택 추적(Stack trace)을 보존하고 있으며
        // 따로 TargetInvocationException이나 AggregateException으로 쌓여 있지 않다.
        LogException(ex);
        throw;
    }
}

 

Deadlock

async Task WaitAsync()
{
    // 3. 현재 context 저장
    await Task.Delay(TimeSpan.FromSeconds(1));
    // ...
    // 3. 저장된 context 안에서 재개 시도
    //  Deadlock 메서드의 2. task.Wait()에서 차단된 thread
    //  context는 한 번에 하나의 thread만 허용하므로 재개할 수 없음
}

void Deadlock()
{
    // 1. 지연 시작
    var task = WaitAsync();
    
    // 2. 동기적으로 차단하고 async 메서드의 완료 대기
    task.Wait();
}

위의 코든느 UI 컨텍스트나 ASP.NET 클래식 컨텍스트에서 호출하면 교착 상태에 빠진다.

ConfigureAwait(false)로 해결

async Task WaitAsync()
{
    // 3. 현재 context 저장
    await Task.Delay(TimeSpan.FromSeconds(1))
        .ConfigureAwait(false);
    // ...
    // 3. Threadpool thread에서 재개
}

void Deadlock()
{
    // 1. 지연 시작
    var task = WaitAsync();
    
    // 2. 동기적으로 차단하고 async 메서드의 완료 대기
    task.Wait();
}

https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming/

 

Asynchronous programming in C#

An overview of the C# language support for asynchronous programming using async, await, Task, and Task

learn.microsoft.com

https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap

 

Task-based Asynchronous Pattern (TAP): Introduction and overview

Learn about the Task-based Asynchronous Pattern (TAP), and compare it to the legacy patterns: Asynchronous Programming Model (APM) and Event-based Asynchronous Pattern (EAP).

learn.microsoft.com

 

Exponential backoff

async Task<string> DownloadStringWithRetries(HttpClient client, string uri)
{
    TimeSpan nextDelay = TimeSpan.FromSeconds(1);
    for (int i = 0; i < 3; ++i)
    {
        try
        {
            return await client.GetStringAsync(uri);
        }
        catch { }
        
        await Task.Delay(nextDelay);
        nextDelay = nextDelay + nextDelay;
    }
    // 오류를 전파할 수 있게 마지막으로 한 번 더 시도
    return await client.GetStringAsync(uri);
}

 

Soft timeout

async Task<string> DownloadStringWithTimeout(HttpClient client, string uri)
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    Task<string> downloadTask = client.GetStringAsync(uri);
    Task timeoutTask = Task.Delay(Timeout.InfiniteTimeSpan, cts.Token);
    
    Task completedTask = await Task.WhenAny(downloadTask, timeoutTask);
    if (completedTask == timeoutTask)
    {
        // WARNING: downloadTask는 여전히 동작한다.
        return null;
    }
    return await downloadTask;
}

타임아웃이 지나면 실행을 중단해야 할 때

async Task IssueTimeoutAsync()
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    CancellationToken token = cts.Token;
    await Task.Delay(TimeSpan.FromSeconds(10), token);
}

async Task IssueTimeoutAsync()
{
    using var cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    cts.CancelAfter(TimeSpan.FromSeconds(3));
    await Task.Delay(TimeSpan.FromSeconds(10), token);
}

 

비동기 시그니처를 사용해서 동기 메서드 구현

interface IMyAsync
{
    Task<int> GetValueAsync(CancellationToken token);
    Task DoSomethingAsync();
}

class MySync : IMyAsync
{
    // 자주 사용하는 작업 결과라면 미리 만들어 놓고 쓴다.
    private static readonly Task<int> ZeroTask = Task.FromResult(0);

    public Task<int> GetValueAsync(CancellationToken token)
    {
        if (token.IsCancellationRequested)
            return Task.FromCanceled<int>(token);
        return Task.FromResult(10);
    }
    
    public Task<T> NotImplementedAsync()
    {
        return Task.FromException<T>(new NotImplementedException());
    }
    
    protected void DoSomethingSynchronously()
    {
    }
    
    public Task DoSomethingAsync()
    {
        try
        {
            DoSomethingSynchronously();
            return Task.CompletedTask;
        }
        catch (Exception ex)
        {
            return Task.FromException(ex);
        }
    }
}

 

진행 상황 보고

async Task MyMethodAsync(IProgress<double> progress = null)
{
    bool done = false;
    double percentComplete = 0;
    while (!done)
    {
        ...
        progress?.Report(percentComplete);
    }
}

async Task CallMyMethodAsync()
{
    var progress = new Progress<double>();
    progress.ProgressChanged += (sender, args) =>
    {
        ...
    };
    await MyMethodAsync(progress);
}

 

모든 작업의 완료 대기

Task task1 = Task.Delay(TimeSpan.FromSeconds(1));
Task task2 = Task.Delay(TimeSpan.FromSeconds(2));
Task task3 = Task.Delay(TimeSpan.FromSeconds(1));

await Task.WhenAll(task1, task2, task3);


Task<int> task1 = Task.FromResult(1);
Task<int> task2 = Task.FromResult(3);
Task<int> task3 = Task.FromResult(5);

int[] results = await Task.WhenAll(task1, task2, task3);
// results = [1, 3, 5]

Example:

async Task<string> DownloadAllAsync(HttpClient client,
    IEnumerable<string> urls)
{
    var downloads = urls.Select(url => client.GetStringAsync(url));
    // 아직 실제로 시작한 작업은 없다.
    
    // 동시에 모든 URL에서 다운로드 시작
    Task<string>[] downloadTasks = downloads.ToArray();
    
    // 모든 다운로드 완료를 비동기적으로 대기
    string[] htmlPages = await Task.WhenAll(downloadTasks);
    
    return string.Concat(htmlPages);
}

작업 중 하나가 예외를 일으키면 Task.WhenAll은 작업과 함께 해당 예외를 반환하며 실패한다.

여러 작업이 예외를 일으키면 모든 예외를 Task.WhenAll이 반환하는 Task에 넣는다.

하지만 작업이 대기 상태면 예외 중 하나만 일으킨다.

async Task ThrowNotImplementedExceptionAsync()
{
    throw new NotImplementedException();
}

async Task ThrowInvalidOperationExceptionAsync()
{
    throw new InvalidOperationException();
}

async Task ObserveOneExceptionAsync()
{
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();
    
    try
    {
        await Task.WhenAll(task1, task2);
    }
    catch (Excpeiton ex)
    {
        // ex는 NotImplementedException or InvalidOperationException
        Trace.WriteLine(ex);
    }
}

async Task ObserveAllExcpetionAsync()
{
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();

    Task allTasks = Task.WhenAll(task1, task2);
    try
    {
        await allTasks;
    }
    catch
    {
        AggregateException allExceptions = allTasks.Excpetion;
        ...
    }
}

 

작업이 완료될 때마다 처리

async Task<int> DelayAndReturnAsync(int value)
{
    await Task.Delay(TimeSpan.FromSeconds(value));
    return value;
}

async Task AwaitAndProcessAsync(Task<int> task)
{
    int rv = await task;
    Trace.WriteLine(rv);
}

async Task ProcessTasksAsync(int flag)
{
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new Task[] { taskA, taskB, taskC };
    
    Task[] processingTasks;
    
    if (flag == 1)
    {
        IEnumerable<Task> taskQuery =
            from t in tasks select AwaitAndProcessAsync(t);
        processingTasks = taskQuery.ToArray();

        await Task.WhenAll(processingTasks);
    }
    else if (flag == 2)
    {
        processingTasks = tasks.Select(async t =>
        {
            var rv = await t;
            Trace.WriteLine(rv);
        }).ToArray();

        await Task.WhenAll(processingTasks);
    }
    else if (flag == 3)
    {
        foreach (Task<int> task in tasks.OrderByCompletion())
        {
            int rv = await task;
            Trace.WriteLine(rv);
        }
    }
}

 

async void 메서드의 예외 처리

sealed class MyAsyncCommand : ICommand
{
    async void ICommand.Execute(object parameter)
    {
        await Execute(parameter);
    }
    
    public async Task Execute(object parameter)
    {
        ; // 비동기 작업 구현
    }
    
    ; // CanExecute 등 구현
}

 

ValueTask 생성/사용

  • 반환할 수 있는 동기적 결과가 있고 비동기 동작이 드문 상황에서 반환 형식으로 사용
  • 프로파일링을 통해 애플리케이션의 성능 향상을 확인할 수 있을 때만 고려해야 함
  • ValueTask를 반환하는 DisposeAsync 메서드가 있는 IAsyncDisposable을 구현할 때
private Task<int> SlowMethodAsync();

public ValueTask<int> MethodAsync()
{
    if (CanBehaveSynchronously)
        return new ValueTask<int>(1);
    
    return new ValueTask<int>(SlowMethodAsync());
}

async Task ConsumingMethodAsync()
{
    ValueTask<int> valueTask = MethodAsync();
    ; // 기타 동시성 작업
    int value = await valueTask;
    ;
}
ValueTask는 딱 한 번만 대기할 수 있다.
더 복잡한 작업을 하려면 AsTask를 호출해서 ValueTask<T>를 Task<T>로 변환해야 한다.
ValueTask에서 동기적으로 결과를 얻으려면 ValueTask를 완료한 뒤에 한 번만 할 수 있다.
async Task ConsumingTaskAsync()
{
    Task<int> task = MethodAsync().AsTask();
    ; // 기타 동시성 작업
    int value = await task;
    // Task<T>는 await로 여러 번 대기해도 완벽하게 안전하다.
    int anotherValue = await task;
}

async Task ConsumingTaskAsync()
{
    Task<int> task1 = MethodAsync().AsTask();
    Task<int> task2 = MethodAsync().AsTask();
    int[] results = await Task.WhenAll(task1, task2);
}

 

Asynchronous Stream

async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
{
    const int limit = 10;
    for (int offset = 0; true; offset += limit)
    {
        string result = await client.GetStringAsync(
            $"https://example.com/api/values?offset={offset}&limit={limit}");
        string[] valuesOnThisPage = result.Split('\n');

        // 현재 페이지의 결과 전달
        foreach (string value in valuesOnThisPage)
            yield return value;

        // 마지막 페이지면 끝
        if (valuesOnThisPage.Length != limit)
            break;
   }

   public async Task ProcessValuesAsync(HttpClient client)
   {
       await foreach (string value in GetValuesAsync(client))
       {
           Console.WriteLine(value);
       }
   }

   public async Task ProcessValuesAsync(HttpClient client)
   {
       await foreach (string value in GetValuesAsync(client).ConfigureAwait(false))
       {
           await Task.Delay(100).ConfigureAwait(false); // 비동기 작업
           Console.WriteLine(value);
       }
   }

 

비동기 스트림과 LINQ 함께 사용

IEnumerable<T>에는 LINQ to Objects가 있고 IObservable<T>에는 LINQ to Events가 있다.

IAsyncEnumerable<T>도 System.Linq.Async NuGet Package를 통해 LINQ 지원

IAsyncEnumerable<int> values = SlowRange().WhereAwait(
    async value =>
    {
        // 요소의 포함 여부를 결정할 비동기 작업 수행
        await Task.Delay(10);
        return value % 2 == 0;
    })
    .Where(value => value % 4 == 0); // 결과는 비동기 스트림

await foreach (int result in values)
    Console.WriteLine(result);


// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange()
{
    for (int i = 0; i < 10; ++i)
    {
        await Task.Delay(i * 100);
        yield return i;
    }
}

Async 접미사는 값을 추출하거나 계산을 수행한 뒤에 비동기 시퀀스가 아닌 비동기 스칼라 값을 반환하는 연산자에만 붙는다.

int count = await SlowRange().CountAsync(
    value => value % 2 == 0);

// 조건자가 비동기적일 땐 AwaitAsync 접미사가 붙는 연산자를 사용
int count = await SlowRange().CountAwaitAsync(
    async value =>
    {
        await Task.Delay(10);
        return value % 2 == 0;
    });

비동기 스트림 취소

using var cts = new CancellationTokenSource(500);
CancellationToken token = ct.Token;

await foreach (int result in SlowRange(token))
{
    Console.WriteLine(result);
}

// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; ++i)
    {
        await Task.Delay(i * 100, token);
        yield return i;
    }
}

비동기 스트림의 열거에 CancellationToken을 추가할 수 있는 WithCancellation 확장 메서드 지원

async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
    using var cts = new CancellationTokenSource(500);
    CancellationToken token = cts.Token;
    await foreach (int result in items.WithCancellation(token))
    {
        Console.WriteLine(result);
    }
}

await ConsumeSequence(SlowRange());

 

'.NET > C#' 카테고리의 다른 글

Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15
Array Marshaling  (0) 2021.10.15

+ Recent posts