https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

TPL(Task Parallel Library) Dataflow

Nuget Package: System.Threading.Tasks.Dataflow

try
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    });
    
    var substractBlock = new TransformBlock<int, int>(item => item - 2);
    
    multiplyBlock.LinkTo(substractBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    
    multiplyBlock.Post(1);
    substractBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    AggregateException agex = ex.Flatten();
    Trace.WriteLine(ex.InnerException);
}

'.NET > C#' 카테고리의 다른 글

Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

Nuget Package: System.Reactive

 

  • 이벤트 스트림을 데이터 스트림처럼 다룰 수 있다.
  • Observable stream의 개념을 바탕으로 한다.
  • Observable stream을 구독(subscribe)하면 정해지지 않은 수의 데이터 항목을 수신(OnNext)한 뒤에 하나의 오류(OnError) 또는 스트림 끝 알림(OnCompleted)으로 스트림이 끝난다.
  • 끝나지 않는 observable stream도 있다.

실제 인터페이스 모습, System.Reactive 라이브러리에 모든 구현을 포함하고 있음.

interface IObserver<in T>
{
    void OnNext(T item);
    void OnCompleted();
    void OnError(Exception error);
}

interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

Example:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(ev => ev.Value % 2 == 0)
    .Select(ev => ev.Timestamp)
    .Subscribe(
        ts => Trace.WriteLine(ts),
        ex => Trace.WriteLine(ex));

IObservable<DateTimeOffset> timestamps = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(ev => ev.Value % 2 == 0)
    .Select(ev => ev.Timestamp);
timestamps.Subscribe(
    ts => Trace.WriteLine(ts),
    ex => Trace.WriteLine(ex));
  • System.Reactive 구독 역시 리소스다.
  • Subscribe 연산자는 구독을 나타내는 IDisposable 반환
    • 항상 오류 처리 매개 변수를 함께 받아야 함
  • 코드에서 observable stream의 수신을 완료하면 구독을 삭제해야 함
  • 구독은 hot observable과 cold observable에서 다르게 동작
    • hot observable: 언제든 발생할 수 있는 이벤트 스트림
      이벤트가 발생할 때 구독이 없으면 해당 이벤트는 사라짐
    • cold observable: 자동으로 발생하는 이벤트가 없는 경우
      구독에 대응해서 이벤트를 순서대로 발생하기 시작

http://www.introtorx.com 

 

Introduction to Rx

IntroToRx.com is the online resource for getting started with the Reactive Extensions to .Net. Originally starting life as a blog series, it has now flourished into an online book. You can read it online here via the website, or get a copy of the Kindle ed

introtorx.com

 

.NET 이벤트 변환

var progress = new Progress<int>();
IObservable<EventPattern<int>> progressReports =
    Observable.FromEventPattern<int>(
        handler => progress.ProgressChanged += handler,
        handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));


var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks =
    Observable.FromEventPattern<ElaspedEventHandler, ElaspedEventArgs>(
        handler => (s, a) => handler(s, a),	// EventHandler<ElapsedEventArgs to ElapsedEventArgs
        handler => timer.Elapsed += handler,
        handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));


var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<object>> ticks =
    Observable.FromEventPattern(timer, nameof(Timer.Elapsed));
ticks.Subscribe(data => Trace.WriteLine("OnNext: "
    + (data.EventArgs as ElapsedEventArgs).SignalTime));

 

컨텍스트로 알림 전달

각 OnNext 알림은 순차적으로 발생하지만 무조건 같은 스레드에서 발생하지 않는다.

private void Button_Click(object sender, RoutedEventArgs e)
{
    SynchronizationContext uiContext = SynchronizationContext.Current;
    Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}"};
    Observable.Interval(TimeSpan.FromSeconds(1))
        .ObserveOn(uiContext)
        .Subscribe(x => Trace.WriteLine(
            $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}

UI 스레드를 벗어나는 용도

SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
    handler => (s, a) => handler(s, a),
    handler => MouseMove += handler,
    handler => MouseMove -= handler)
    .Select(evt => evt.EventArgs.GetPosition(this))
    .ObserveOn(Scheduler.Default)
    .Select(position =>
    {
        // 복잡한 계산
        Trace.WriteLine($"Calculated result {rv} on thread {Environment.CurrentManagedThreadId}");
        return rv;
    })
    .ObserveOn(uiContext)
    .Subscribe(x => Trace.WriteLine(
        $"Result {x} on thread {Environment.CurrentManagedThreadId}"));

 

Window와 Buffer로 이벤트 데이터 그룹화

  Buffer Window
이벤트 전달 들어오는 이벤트의 그룹화가 끝나면 모든 이벤트를 이벤트 컬렉션으로 한 번에 전달 들어오는 이벤트를 논리적으로 그룹호하지만 도착하는 대로 전달
반환형식 IObservable<IList<T>> IObservable<IObservable<T>>

 

'.NET > C#' 카테고리의 다른 글

Concurrency - TPL Dataflow  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

데이터 병렬

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}

IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
    return values.AsParallel()
        .Select(value => IsPrime(value));
}

 

작업 병렬(병렬 호출)

void ProcessArray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)
    );
}

void ProcessPartialArray(double[] array, int begin, int end)
{
    // CPU 집약적인 처리
}

작업 병렬에서는 특히 closure 안에 캡처한 변수에 주의해야 한다.

값이 아닌 참조를 캡처하므로 명확하지 않은 공유가 일어날 수도 있다.

try
{
    Parallel.Invoke(
        () => { throw new Exception(); },
        () => { throw new Exception(); });
}
catch (AggregateException ex)
{
    ex.Handle(exception =>
    {
        Trace.WriteLine(exception);
        return true; // 처리함
    });
}

루푸 중지

void InvertMatrices(IEnumerable<Matrix> matrices)
{
    Pallel.ForEach(matrices, (matrix, state) =>
    {
        if (matrix.InInvertible)
            matrix.Invert();
        else
            state.Stop();
    });
}

중지는 루프의 내부에서 일어나며 취소는 루푸의 외부에서 일어난다.

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
{
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));
}

공유 상태를 보호하는 잠금(lock)의 사용법 예

int InvertMatrices(IEnumerable<Matrix> matrices)
{
    object mutex = new object();
    int nonInvertibleCount = 0;
    Parallel.ForEach(matrices, matrix =>
    {
        if (matrix.IsInvertible)
            matrix.Invert();
        else
        {
            lock (mutex)
            {
                ++nonInvertibleCount;
            }
        }
    });
    return nonInvertibleCount;
}

PLINQ는 Parallel과 거의 똑같은 기능을 제공한다. 차이점:

  • PLINQ는 컴퓨터의 모든 코어를 사용할 수 있다고 가정
  • Prallel은 CPU의 상황에 따라 동적으로 대응

병렬 집계

int ParallelSum1(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;    
    Parallel.ForEach(source: values,
        localInit: () => 0,
        body: (item, state, localValue) => localValue + item,
        localFinally: localValue =>
        {
            lock (mutex)
                result += localValue;
        });
    return result;
}

int ParallelSum2(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

int ParallelSum3(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item
    );
}

병렬 호출

void DoAction20Times(Action action, CancellationToken token)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(
        new ParallelOptions( { CancellationToken = token },
        actions);
}

 

TPL의 핵심은 Task 형식이다.

Parallel 클래스오 PLINQ는 강력한 Task 형식을 편리하게 쓸 수 있게 감싼 wrapper일 뿐

동적 병렬 처리가 필요하다면 Task 형식을 직접 사용하는 것이 가장 쉽다.

 

이진 트리의 각 노드에 비용이 많이 드는 처리를 수행해야 하는 예

void Traverse(Node current)
{
    DoExpensiveActionOnNode(current);
    
    if (current.Left is not null)
        ProcessNode(current.Left);
    if (current.Right is not null)
        ProcessNode(current.Right);
}

Task ProcessNode(Node node,
    TaskCreationOptions options = TaskCreationOptions.AttachedToParent)
{
    return Task.Factory.StartNew(
        () => Traverse(node),
        CancellationToken.None,
        options,
        TaskScheduler.Default);
}

void ProcessTree(Node root)
{
    var task = ProcessNode(root, TaskCreationOptions.None);
    task.Wait();
}

연속 작업(Continuation)

Task task = Task.Factory.StartNew(
    () => Thread.Sleep(TimeSpan.FromSeconds(2)),
    CancellationToken.None,
    TaskCreationOptions.None,
    TaskScheduler.Default);

Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task is done"),
    CancelltationToken.None,
    TaskContinuationOptions.None,
    TaskScheduler.Default);

PLINQ

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().Select(value => value * 2);
}

IEnumerable<int> MultiplyBy2Ordered(IEnumerable<int> values)
{
    return values.AsParallel()
        .AsOrdered()
        .Select(value => value * 2);
}

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

 

'.NET > C#' 카테고리의 다른 글

Concurrency - TPL Dataflow  (0) 2023.08.16
Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

Example

async Task DoSomethingAsync()
{
    int value = 1;
    
    // Context 저장
    // null ? SynchronizationContext : TaskScheduler
    // ASP.NET Core는 별도의 요청 컨텍스트가 아닌 threadpool context 사용
    await Task.Delay(TimeSpan.FromSeconds(1));
    
    value *= 2;
    
    await Task.Delay(TimeSpan.FromSeconds(1));
    
    Trace.WriteLine(value);
}
항상 코어 '라이브러리' 메서드 안에서 ConfigureAwait를 호출하고,
필요할 때만 다른 외부 '사용자 인터페이스' 메서드에서 컨택스트를 재개하는 것이 좋다.
async Task DoSomethingAsync()
{
    int value = 1;
    
    await Task.Delay(TimeSpan.FromSeconds(1))
        .ConfigureAwait(false);
    // Threadpool thread에서 실행을 재개한다.
    
    value *= 2;
    
    await Task.Delay(TimeSpan.FromSeconds(1))
        .ConfigureAwait(false);
    
    Trace.WriteLine(value);
}

 

ValueTask<T>

  • 메모리 내 캐시에서 결과를 읽을 수 있는 등 메모리 할당을 줄일 수 있는 형식

 

Task 인스턴스를 만드는 방법

  1. CPU가 실행해야 할 실제 코드를 나타내는 계산 작업은 Task.Run으로 생성
  2. 특정 스케줄러에서 실행해야 한다면 TaskFactory.StartNew로 생성
  3. 이벤트 기반 작업은 TaskCompletionSource<TResult>
    (대부분 I/O 작업은 TaskCompletionSource<TResult> 사용)

 

오류 처리

async Task TrySomethingAsync()
{
    // 예외는 Task에서 발생한다.
    var task = PossibleExceptionAsync();
    
    try
    {
        // 여기서 예외 발생
        await task;
    }
    catch (NotSupportedException ex)
    {
        // 이렇게 잡힌 예외는 자체적으로 적절한 스택 추적(Stack trace)을 보존하고 있으며
        // 따로 TargetInvocationException이나 AggregateException으로 쌓여 있지 않다.
        LogException(ex);
        throw;
    }
}

 

Deadlock

async Task WaitAsync()
{
    // 3. 현재 context 저장
    await Task.Delay(TimeSpan.FromSeconds(1));
    // ...
    // 3. 저장된 context 안에서 재개 시도
    //  Deadlock 메서드의 2. task.Wait()에서 차단된 thread
    //  context는 한 번에 하나의 thread만 허용하므로 재개할 수 없음
}

void Deadlock()
{
    // 1. 지연 시작
    var task = WaitAsync();
    
    // 2. 동기적으로 차단하고 async 메서드의 완료 대기
    task.Wait();
}

위의 코든느 UI 컨텍스트나 ASP.NET 클래식 컨텍스트에서 호출하면 교착 상태에 빠진다.

ConfigureAwait(false)로 해결

async Task WaitAsync()
{
    // 3. 현재 context 저장
    await Task.Delay(TimeSpan.FromSeconds(1))
        .ConfigureAwait(false);
    // ...
    // 3. Threadpool thread에서 재개
}

void Deadlock()
{
    // 1. 지연 시작
    var task = WaitAsync();
    
    // 2. 동기적으로 차단하고 async 메서드의 완료 대기
    task.Wait();
}

https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming/

 

Asynchronous programming in C#

An overview of the C# language support for asynchronous programming using async, await, Task, and Task

learn.microsoft.com

https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap

 

Task-based Asynchronous Pattern (TAP): Introduction and overview

Learn about the Task-based Asynchronous Pattern (TAP), and compare it to the legacy patterns: Asynchronous Programming Model (APM) and Event-based Asynchronous Pattern (EAP).

learn.microsoft.com

 

Exponential backoff

async Task<string> DownloadStringWithRetries(HttpClient client, string uri)
{
    TimeSpan nextDelay = TimeSpan.FromSeconds(1);
    for (int i = 0; i < 3; ++i)
    {
        try
        {
            return await client.GetStringAsync(uri);
        }
        catch { }
        
        await Task.Delay(nextDelay);
        nextDelay = nextDelay + nextDelay;
    }
    // 오류를 전파할 수 있게 마지막으로 한 번 더 시도
    return await client.GetStringAsync(uri);
}

 

Soft timeout

async Task<string> DownloadStringWithTimeout(HttpClient client, string uri)
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    Task<string> downloadTask = client.GetStringAsync(uri);
    Task timeoutTask = Task.Delay(Timeout.InfiniteTimeSpan, cts.Token);
    
    Task completedTask = await Task.WhenAny(downloadTask, timeoutTask);
    if (completedTask == timeoutTask)
    {
        // WARNING: downloadTask는 여전히 동작한다.
        return null;
    }
    return await downloadTask;
}

타임아웃이 지나면 실행을 중단해야 할 때

async Task IssueTimeoutAsync()
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    CancellationToken token = cts.Token;
    await Task.Delay(TimeSpan.FromSeconds(10), token);
}

async Task IssueTimeoutAsync()
{
    using var cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    cts.CancelAfter(TimeSpan.FromSeconds(3));
    await Task.Delay(TimeSpan.FromSeconds(10), token);
}

 

비동기 시그니처를 사용해서 동기 메서드 구현

interface IMyAsync
{
    Task<int> GetValueAsync(CancellationToken token);
    Task DoSomethingAsync();
}

class MySync : IMyAsync
{
    // 자주 사용하는 작업 결과라면 미리 만들어 놓고 쓴다.
    private static readonly Task<int> ZeroTask = Task.FromResult(0);

    public Task<int> GetValueAsync(CancellationToken token)
    {
        if (token.IsCancellationRequested)
            return Task.FromCanceled<int>(token);
        return Task.FromResult(10);
    }
    
    public Task<T> NotImplementedAsync()
    {
        return Task.FromException<T>(new NotImplementedException());
    }
    
    protected void DoSomethingSynchronously()
    {
    }
    
    public Task DoSomethingAsync()
    {
        try
        {
            DoSomethingSynchronously();
            return Task.CompletedTask;
        }
        catch (Exception ex)
        {
            return Task.FromException(ex);
        }
    }
}

 

진행 상황 보고

async Task MyMethodAsync(IProgress<double> progress = null)
{
    bool done = false;
    double percentComplete = 0;
    while (!done)
    {
        ...
        progress?.Report(percentComplete);
    }
}

async Task CallMyMethodAsync()
{
    var progress = new Progress<double>();
    progress.ProgressChanged += (sender, args) =>
    {
        ...
    };
    await MyMethodAsync(progress);
}

 

모든 작업의 완료 대기

Task task1 = Task.Delay(TimeSpan.FromSeconds(1));
Task task2 = Task.Delay(TimeSpan.FromSeconds(2));
Task task3 = Task.Delay(TimeSpan.FromSeconds(1));

await Task.WhenAll(task1, task2, task3);


Task<int> task1 = Task.FromResult(1);
Task<int> task2 = Task.FromResult(3);
Task<int> task3 = Task.FromResult(5);

int[] results = await Task.WhenAll(task1, task2, task3);
// results = [1, 3, 5]

Example:

async Task<string> DownloadAllAsync(HttpClient client,
    IEnumerable<string> urls)
{
    var downloads = urls.Select(url => client.GetStringAsync(url));
    // 아직 실제로 시작한 작업은 없다.
    
    // 동시에 모든 URL에서 다운로드 시작
    Task<string>[] downloadTasks = downloads.ToArray();
    
    // 모든 다운로드 완료를 비동기적으로 대기
    string[] htmlPages = await Task.WhenAll(downloadTasks);
    
    return string.Concat(htmlPages);
}

작업 중 하나가 예외를 일으키면 Task.WhenAll은 작업과 함께 해당 예외를 반환하며 실패한다.

여러 작업이 예외를 일으키면 모든 예외를 Task.WhenAll이 반환하는 Task에 넣는다.

하지만 작업이 대기 상태면 예외 중 하나만 일으킨다.

async Task ThrowNotImplementedExceptionAsync()
{
    throw new NotImplementedException();
}

async Task ThrowInvalidOperationExceptionAsync()
{
    throw new InvalidOperationException();
}

async Task ObserveOneExceptionAsync()
{
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();
    
    try
    {
        await Task.WhenAll(task1, task2);
    }
    catch (Excpeiton ex)
    {
        // ex는 NotImplementedException or InvalidOperationException
        Trace.WriteLine(ex);
    }
}

async Task ObserveAllExcpetionAsync()
{
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();

    Task allTasks = Task.WhenAll(task1, task2);
    try
    {
        await allTasks;
    }
    catch
    {
        AggregateException allExceptions = allTasks.Excpetion;
        ...
    }
}

 

작업이 완료될 때마다 처리

async Task<int> DelayAndReturnAsync(int value)
{
    await Task.Delay(TimeSpan.FromSeconds(value));
    return value;
}

async Task AwaitAndProcessAsync(Task<int> task)
{
    int rv = await task;
    Trace.WriteLine(rv);
}

async Task ProcessTasksAsync(int flag)
{
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new Task[] { taskA, taskB, taskC };
    
    Task[] processingTasks;
    
    if (flag == 1)
    {
        IEnumerable<Task> taskQuery =
            from t in tasks select AwaitAndProcessAsync(t);
        processingTasks = taskQuery.ToArray();

        await Task.WhenAll(processingTasks);
    }
    else if (flag == 2)
    {
        processingTasks = tasks.Select(async t =>
        {
            var rv = await t;
            Trace.WriteLine(rv);
        }).ToArray();

        await Task.WhenAll(processingTasks);
    }
    else if (flag == 3)
    {
        foreach (Task<int> task in tasks.OrderByCompletion())
        {
            int rv = await task;
            Trace.WriteLine(rv);
        }
    }
}

 

async void 메서드의 예외 처리

sealed class MyAsyncCommand : ICommand
{
    async void ICommand.Execute(object parameter)
    {
        await Execute(parameter);
    }
    
    public async Task Execute(object parameter)
    {
        ; // 비동기 작업 구현
    }
    
    ; // CanExecute 등 구현
}

 

ValueTask 생성/사용

  • 반환할 수 있는 동기적 결과가 있고 비동기 동작이 드문 상황에서 반환 형식으로 사용
  • 프로파일링을 통해 애플리케이션의 성능 향상을 확인할 수 있을 때만 고려해야 함
  • ValueTask를 반환하는 DisposeAsync 메서드가 있는 IAsyncDisposable을 구현할 때
private Task<int> SlowMethodAsync();

public ValueTask<int> MethodAsync()
{
    if (CanBehaveSynchronously)
        return new ValueTask<int>(1);
    
    return new ValueTask<int>(SlowMethodAsync());
}

async Task ConsumingMethodAsync()
{
    ValueTask<int> valueTask = MethodAsync();
    ; // 기타 동시성 작업
    int value = await valueTask;
    ;
}
ValueTask는 딱 한 번만 대기할 수 있다.
더 복잡한 작업을 하려면 AsTask를 호출해서 ValueTask<T>를 Task<T>로 변환해야 한다.
ValueTask에서 동기적으로 결과를 얻으려면 ValueTask를 완료한 뒤에 한 번만 할 수 있다.
async Task ConsumingTaskAsync()
{
    Task<int> task = MethodAsync().AsTask();
    ; // 기타 동시성 작업
    int value = await task;
    // Task<T>는 await로 여러 번 대기해도 완벽하게 안전하다.
    int anotherValue = await task;
}

async Task ConsumingTaskAsync()
{
    Task<int> task1 = MethodAsync().AsTask();
    Task<int> task2 = MethodAsync().AsTask();
    int[] results = await Task.WhenAll(task1, task2);
}

 

Asynchronous Stream

async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
{
    const int limit = 10;
    for (int offset = 0; true; offset += limit)
    {
        string result = await client.GetStringAsync(
            $"https://example.com/api/values?offset={offset}&limit={limit}");
        string[] valuesOnThisPage = result.Split('\n');

        // 현재 페이지의 결과 전달
        foreach (string value in valuesOnThisPage)
            yield return value;

        // 마지막 페이지면 끝
        if (valuesOnThisPage.Length != limit)
            break;
   }

   public async Task ProcessValuesAsync(HttpClient client)
   {
       await foreach (string value in GetValuesAsync(client))
       {
           Console.WriteLine(value);
       }
   }

   public async Task ProcessValuesAsync(HttpClient client)
   {
       await foreach (string value in GetValuesAsync(client).ConfigureAwait(false))
       {
           await Task.Delay(100).ConfigureAwait(false); // 비동기 작업
           Console.WriteLine(value);
       }
   }

 

비동기 스트림과 LINQ 함께 사용

IEnumerable<T>에는 LINQ to Objects가 있고 IObservable<T>에는 LINQ to Events가 있다.

IAsyncEnumerable<T>도 System.Linq.Async NuGet Package를 통해 LINQ 지원

IAsyncEnumerable<int> values = SlowRange().WhereAwait(
    async value =>
    {
        // 요소의 포함 여부를 결정할 비동기 작업 수행
        await Task.Delay(10);
        return value % 2 == 0;
    })
    .Where(value => value % 4 == 0); // 결과는 비동기 스트림

await foreach (int result in values)
    Console.WriteLine(result);


// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange()
{
    for (int i = 0; i < 10; ++i)
    {
        await Task.Delay(i * 100);
        yield return i;
    }
}

Async 접미사는 값을 추출하거나 계산을 수행한 뒤에 비동기 시퀀스가 아닌 비동기 스칼라 값을 반환하는 연산자에만 붙는다.

int count = await SlowRange().CountAsync(
    value => value % 2 == 0);

// 조건자가 비동기적일 땐 AwaitAsync 접미사가 붙는 연산자를 사용
int count = await SlowRange().CountAwaitAsync(
    async value =>
    {
        await Task.Delay(10);
        return value % 2 == 0;
    });

비동기 스트림 취소

using var cts = new CancellationTokenSource(500);
CancellationToken token = ct.Token;

await foreach (int result in SlowRange(token))
{
    Console.WriteLine(result);
}

// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; ++i)
    {
        await Task.Delay(i * 100, token);
        yield return i;
    }
}

비동기 스트림의 열거에 CancellationToken을 추가할 수 있는 WithCancellation 확장 메서드 지원

async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
    using var cts = new CancellationTokenSource(500);
    CancellationToken token = cts.Token;
    await foreach (int result in items.WithCancellation(token))
    {
        Console.WriteLine(result);
    }
}

await ConsumeSequence(SlowRange());

 

'.NET > C#' 카테고리의 다른 글

Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15
Array Marshaling  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

Concurrency

  • 한 번에 두 가지 이상의 작업 수행

Multithreading

  • 다수의 실행 스레드를 사용하는 동시성의 한 형태

Parallel Processing

  • 많은 작업을 여러 스레드에 나눠서 동시에 수행
  • 멀티스레딩을 사용해서 멀티 코어 프로세서를 최대한 활용하는 방법

Asynchronous Programming

  • 불필요한 스레드의 사용을 피하려고 future나 callback을 사용하는 동시성의 한 형태

Reactive Programing

  • 애플리케이션이 이벤트에 대응하게 하는 선언형 프로그래밍 방식
  • 비동기 연산이 아닌 비동기 이벤트 기반

 

 

'.NET > C#' 카테고리의 다른 글

Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15
Array Marshaling  (0) 2021.10.15
Comparisons and Sorts  (0) 2021.10.15

+ Recent posts