https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

TPL(Task Parallel Library) Dataflow

Nuget Package: System.Threading.Tasks.Dataflow

try
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var substractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(substractBlock,
new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(1);
substractBlock.Completion.Wait();
}
catch (AggregateException ex)
{
AggregateException agex = ex.Flatten();
Trace.WriteLine(ex.InnerException);
}

'.NET > C#' 카테고리의 다른 글

Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

Nuget Package: System.Reactive

 

  • 이벤트 스트림을 데이터 스트림처럼 다룰 수 있다.
  • Observable stream의 개념을 바탕으로 한다.
  • Observable stream을 구독(subscribe)하면 정해지지 않은 수의 데이터 항목을 수신(OnNext)한 뒤에 하나의 오류(OnError) 또는 스트림 끝 알림(OnCompleted)으로 스트림이 끝난다.
  • 끝나지 않는 observable stream도 있다.

실제 인터페이스 모습, System.Reactive 라이브러리에 모든 구현을 포함하고 있음.

interface IObserver<in T>
{
void OnNext(T item);
void OnCompleted();
void OnError(Exception error);
}
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}

Example:

Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(ev => ev.Value % 2 == 0)
.Select(ev => ev.Timestamp)
.Subscribe(
ts => Trace.WriteLine(ts),
ex => Trace.WriteLine(ex));
IObservable<DateTimeOffset> timestamps = Observable
.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(ev => ev.Value % 2 == 0)
.Select(ev => ev.Timestamp);
timestamps.Subscribe(
ts => Trace.WriteLine(ts),
ex => Trace.WriteLine(ex));
  • System.Reactive 구독 역시 리소스다.
  • Subscribe 연산자는 구독을 나타내는 IDisposable 반환
    • 항상 오류 처리 매개 변수를 함께 받아야 함
  • 코드에서 observable stream의 수신을 완료하면 구독을 삭제해야 함
  • 구독은 hot observable과 cold observable에서 다르게 동작
    • hot observable: 언제든 발생할 수 있는 이벤트 스트림
      이벤트가 발생할 때 구독이 없으면 해당 이벤트는 사라짐
    • cold observable: 자동으로 발생하는 이벤트가 없는 경우
      구독에 대응해서 이벤트를 순서대로 발생하기 시작

http://www.introtorx.com 

 

Introduction to Rx

IntroToRx.com is the online resource for getting started with the Reactive Extensions to .Net. Originally starting life as a blog series, it has now flourished into an online book. You can read it online here via the website, or get a copy of the Kindle ed

introtorx.com

 

.NET 이벤트 변환

var progress = new Progress<int>();
IObservable<EventPattern<int>> progressReports =
Observable.FromEventPattern<int>(
handler => progress.ProgressChanged += handler,
handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks =
Observable.FromEventPattern<ElaspedEventHandler, ElaspedEventArgs>(
handler => (s, a) => handler(s, a), // EventHandler<ElapsedEventArgs to ElapsedEventArgs
handler => timer.Elapsed += handler,
handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<object>> ticks =
Observable.FromEventPattern(timer, nameof(Timer.Elapsed));
ticks.Subscribe(data => Trace.WriteLine("OnNext: "
+ (data.EventArgs as ElapsedEventArgs).SignalTime));

 

컨텍스트로 알림 전달

각 OnNext 알림은 순차적으로 발생하지만 무조건 같은 스레드에서 발생하지 않는다.

private void Button_Click(object sender, RoutedEventArgs e)
{
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}"};
Observable.Interval(TimeSpan.FromSeconds(1))
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}

UI 스레드를 벗어나는 용도

SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(evt => evt.EventArgs.GetPosition(this))
.ObserveOn(Scheduler.Default)
.Select(position =>
{
// 복잡한 계산
Trace.WriteLine($"Calculated result {rv} on thread {Environment.CurrentManagedThreadId}");
return rv;
})
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Result {x} on thread {Environment.CurrentManagedThreadId}"));

 

Window와 Buffer로 이벤트 데이터 그룹화

  Buffer Window
이벤트 전달 들어오는 이벤트의 그룹화가 끝나면 모든 이벤트를 이벤트 컬렉션으로 한 번에 전달 들어오는 이벤트를 논리적으로 그룹호하지만 도착하는 대로 전달
반환형식 IObservable<IList<T>> IObservable<IObservable<T>>

 

'.NET > C#' 카테고리의 다른 글

Concurrency - TPL Dataflow  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

데이터 병렬

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}
IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
return values.AsParallel()
.Select(value => IsPrime(value));
}

 

작업 병렬(병렬 호출)

void ProcessArray(double[] array)
{
Parallel.Invoke(
() => ProcessPartialArray(array, 0, array.Length / 2),
() => ProcessPartialArray(array, array.Length / 2, array.Length)
);
}
void ProcessPartialArray(double[] array, int begin, int end)
{
// CPU 집약적인 처리
}

작업 병렬에서는 특히 closure 안에 캡처한 변수에 주의해야 한다.

값이 아닌 참조를 캡처하므로 명확하지 않은 공유가 일어날 수도 있다.

try
{
Parallel.Invoke(
() => { throw new Exception(); },
() => { throw new Exception(); });
}
catch (AggregateException ex)
{
ex.Handle(exception =>
{
Trace.WriteLine(exception);
return true; // 처리함
});
}

루푸 중지

void InvertMatrices(IEnumerable<Matrix> matrices)
{
Pallel.ForEach(matrices, (matrix, state) =>
{
if (matrix.InInvertible)
matrix.Invert();
else
state.Stop();
});
}

중지는 루프의 내부에서 일어나며 취소는 루푸의 외부에서 일어난다.

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
{
Parallel.ForEach(matrices,
new ParallelOptions { CancellationToken = token },
matrix => matrix.Rotate(degrees));
}

공유 상태를 보호하는 잠금(lock)의 사용법 예

int InvertMatrices(IEnumerable<Matrix> matrices)
{
object mutex = new object();
int nonInvertibleCount = 0;
Parallel.ForEach(matrices, matrix =>
{
if (matrix.IsInvertible)
matrix.Invert();
else
{
lock (mutex)
{
++nonInvertibleCount;
}
}
});
return nonInvertibleCount;
}

PLINQ는 Parallel과 거의 똑같은 기능을 제공한다. 차이점:

  • PLINQ는 컴퓨터의 모든 코어를 사용할 수 있다고 가정
  • Prallel은 CPU의 상황에 따라 동적으로 대응

병렬 집계

int ParallelSum1(IEnumerable<int> values)
{
object mutex = new object();
int result = 0;
Parallel.ForEach(source: values,
localInit: () => 0,
body: (item, state, localValue) => localValue + item,
localFinally: localValue =>
{
lock (mutex)
result += localValue;
});
return result;
}
int ParallelSum2(IEnumerable<int> values)
{
return values.AsParallel().Sum();
}
int ParallelSum3(IEnumerable<int> values)
{
return values.AsParallel().Aggregate(
seed: 0,
func: (sum, item) => sum + item
);
}

병렬 호출

void DoAction20Times(Action action, CancellationToken token)
{
Action[] actions = Enumerable.Repeat(action, 20).ToArray();
Parallel.Invoke(
new ParallelOptions( { CancellationToken = token },
actions);
}

 

TPL의 핵심은 Task 형식이다.

Parallel 클래스오 PLINQ는 강력한 Task 형식을 편리하게 쓸 수 있게 감싼 wrapper일 뿐

동적 병렬 처리가 필요하다면 Task 형식을 직접 사용하는 것이 가장 쉽다.

 

이진 트리의 각 노드에 비용이 많이 드는 처리를 수행해야 하는 예

void Traverse(Node current)
{
DoExpensiveActionOnNode(current);
if (current.Left is not null)
ProcessNode(current.Left);
if (current.Right is not null)
ProcessNode(current.Right);
}
Task ProcessNode(Node node,
TaskCreationOptions options = TaskCreationOptions.AttachedToParent)
{
return Task.Factory.StartNew(
() => Traverse(node),
CancellationToken.None,
options,
TaskScheduler.Default);
}
void ProcessTree(Node root)
{
var task = ProcessNode(root, TaskCreationOptions.None);
task.Wait();
}

연속 작업(Continuation)

Task task = Task.Factory.StartNew(
() => Thread.Sleep(TimeSpan.FromSeconds(2)),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default);
Task continuation = task.ContinueWith(
t => Trace.WriteLine("Task is done"),
CancelltationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);

PLINQ

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
return values.AsParallel().Select(value => value * 2);
}
IEnumerable<int> MultiplyBy2Ordered(IEnumerable<int> values)
{
return values.AsParallel()
.AsOrdered()
.Select(value => value * 2);
}
int ParallelSum(IEnumerable<int> values)
{
return values.AsParallel().Sum();
}

 

'.NET > C#' 카테고리의 다른 글

Concurrency - TPL Dataflow  (0) 2023.08.16
Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

Example

async Task DoSomethingAsync()
{
int value = 1;
// Context 저장
// null ? SynchronizationContext : TaskScheduler
// ASP.NET Core는 별도의 요청 컨텍스트가 아닌 threadpool context 사용
await Task.Delay(TimeSpan.FromSeconds(1));
value *= 2;
await Task.Delay(TimeSpan.FromSeconds(1));
Trace.WriteLine(value);
}
항상 코어 '라이브러리' 메서드 안에서 ConfigureAwait를 호출하고,
필요할 때만 다른 외부 '사용자 인터페이스' 메서드에서 컨택스트를 재개하는 것이 좋다.
async Task DoSomethingAsync()
{
int value = 1;
await Task.Delay(TimeSpan.FromSeconds(1))
.ConfigureAwait(false);
// Threadpool thread에서 실행을 재개한다.
value *= 2;
await Task.Delay(TimeSpan.FromSeconds(1))
.ConfigureAwait(false);
Trace.WriteLine(value);
}

 

ValueTask<T>

  • 메모리 내 캐시에서 결과를 읽을 수 있는 등 메모리 할당을 줄일 수 있는 형식

 

Task 인스턴스를 만드는 방법

  1. CPU가 실행해야 할 실제 코드를 나타내는 계산 작업은 Task.Run으로 생성
  2. 특정 스케줄러에서 실행해야 한다면 TaskFactory.StartNew로 생성
  3. 이벤트 기반 작업은 TaskCompletionSource<TResult>
    (대부분 I/O 작업은 TaskCompletionSource<TResult> 사용)

 

오류 처리

async Task TrySomethingAsync()
{
// 예외는 Task에서 발생한다.
var task = PossibleExceptionAsync();
try
{
// 여기서 예외 발생
await task;
}
catch (NotSupportedException ex)
{
// 이렇게 잡힌 예외는 자체적으로 적절한 스택 추적(Stack trace)을 보존하고 있으며
// 따로 TargetInvocationException이나 AggregateException으로 쌓여 있지 않다.
LogException(ex);
throw;
}
}

 

Deadlock

async Task WaitAsync()
{
// 3. 현재 context 저장
await Task.Delay(TimeSpan.FromSeconds(1));
// ...
// 3. 저장된 context 안에서 재개 시도
// Deadlock 메서드의 2. task.Wait()에서 차단된 thread
// context는 한 번에 하나의 thread만 허용하므로 재개할 수 없음
}
void Deadlock()
{
// 1. 지연 시작
var task = WaitAsync();
// 2. 동기적으로 차단하고 async 메서드의 완료 대기
task.Wait();
}

위의 코든느 UI 컨텍스트나 ASP.NET 클래식 컨텍스트에서 호출하면 교착 상태에 빠진다.

ConfigureAwait(false)로 해결

async Task WaitAsync()
{
// 3. 현재 context 저장
await Task.Delay(TimeSpan.FromSeconds(1))
.ConfigureAwait(false);
// ...
// 3. Threadpool thread에서 재개
}
void Deadlock()
{
// 1. 지연 시작
var task = WaitAsync();
// 2. 동기적으로 차단하고 async 메서드의 완료 대기
task.Wait();
}

https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming/

 

Asynchronous programming in C#

An overview of the C# language support for asynchronous programming using async, await, Task, and Task

learn.microsoft.com

https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap

 

Task-based Asynchronous Pattern (TAP): Introduction and overview

Learn about the Task-based Asynchronous Pattern (TAP), and compare it to the legacy patterns: Asynchronous Programming Model (APM) and Event-based Asynchronous Pattern (EAP).

learn.microsoft.com

 

Exponential backoff

async Task<string> DownloadStringWithRetries(HttpClient client, string uri)
{
TimeSpan nextDelay = TimeSpan.FromSeconds(1);
for (int i = 0; i < 3; ++i)
{
try
{
return await client.GetStringAsync(uri);
}
catch { }
await Task.Delay(nextDelay);
nextDelay = nextDelay + nextDelay;
}
// 오류를 전파할 수 있게 마지막으로 한 번 더 시도
return await client.GetStringAsync(uri);
}

 

Soft timeout

async Task<string> DownloadStringWithTimeout(HttpClient client, string uri)
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
Task<string> downloadTask = client.GetStringAsync(uri);
Task timeoutTask = Task.Delay(Timeout.InfiniteTimeSpan, cts.Token);
Task completedTask = await Task.WhenAny(downloadTask, timeoutTask);
if (completedTask == timeoutTask)
{
// WARNING: downloadTask는 여전히 동작한다.
return null;
}
return await downloadTask;
}

타임아웃이 지나면 실행을 중단해야 할 때

async Task IssueTimeoutAsync()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
CancellationToken token = cts.Token;
await Task.Delay(TimeSpan.FromSeconds(10), token);
}
async Task IssueTimeoutAsync()
{
using var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter(TimeSpan.FromSeconds(3));
await Task.Delay(TimeSpan.FromSeconds(10), token);
}

 

비동기 시그니처를 사용해서 동기 메서드 구현

interface IMyAsync
{
Task<int> GetValueAsync(CancellationToken token);
Task DoSomethingAsync();
}
class MySync : IMyAsync
{
// 자주 사용하는 작업 결과라면 미리 만들어 놓고 쓴다.
private static readonly Task<int> ZeroTask = Task.FromResult(0);
public Task<int> GetValueAsync(CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromCanceled<int>(token);
return Task.FromResult(10);
}
public Task<T> NotImplementedAsync()
{
return Task.FromException<T>(new NotImplementedException());
}
protected void DoSomethingSynchronously()
{
}
public Task DoSomethingAsync()
{
try
{
DoSomethingSynchronously();
return Task.CompletedTask;
}
catch (Exception ex)
{
return Task.FromException(ex);
}
}
}

 

진행 상황 보고

async Task MyMethodAsync(IProgress<double> progress = null)
{
bool done = false;
double percentComplete = 0;
while (!done)
{
...
progress?.Report(percentComplete);
}
}
async Task CallMyMethodAsync()
{
var progress = new Progress<double>();
progress.ProgressChanged += (sender, args) =>
{
...
};
await MyMethodAsync(progress);
}

 

모든 작업의 완료 대기

Task task1 = Task.Delay(TimeSpan.FromSeconds(1));
Task task2 = Task.Delay(TimeSpan.FromSeconds(2));
Task task3 = Task.Delay(TimeSpan.FromSeconds(1));
await Task.WhenAll(task1, task2, task3);
Task<int> task1 = Task.FromResult(1);
Task<int> task2 = Task.FromResult(3);
Task<int> task3 = Task.FromResult(5);
int[] results = await Task.WhenAll(task1, task2, task3);
// results = [1, 3, 5]

Example:

async Task<string> DownloadAllAsync(HttpClient client,
IEnumerable<string> urls)
{
var downloads = urls.Select(url => client.GetStringAsync(url));
// 아직 실제로 시작한 작업은 없다.
// 동시에 모든 URL에서 다운로드 시작
Task<string>[] downloadTasks = downloads.ToArray();
// 모든 다운로드 완료를 비동기적으로 대기
string[] htmlPages = await Task.WhenAll(downloadTasks);
return string.Concat(htmlPages);
}

작업 중 하나가 예외를 일으키면 Task.WhenAll은 작업과 함께 해당 예외를 반환하며 실패한다.

여러 작업이 예외를 일으키면 모든 예외를 Task.WhenAll이 반환하는 Task에 넣는다.

하지만 작업이 대기 상태면 예외 중 하나만 일으킨다.

async Task ThrowNotImplementedExceptionAsync()
{
throw new NotImplementedException();
}
async Task ThrowInvalidOperationExceptionAsync()
{
throw new InvalidOperationException();
}
async Task ObserveOneExceptionAsync()
{
var task1 = ThrowNotImplementedExceptionAsync();
var task2 = ThrowInvalidOperationExceptionAsync();
try
{
await Task.WhenAll(task1, task2);
}
catch (Excpeiton ex)
{
// ex는 NotImplementedException or InvalidOperationException
Trace.WriteLine(ex);
}
}
async Task ObserveAllExcpetionAsync()
{
var task1 = ThrowNotImplementedExceptionAsync();
var task2 = ThrowInvalidOperationExceptionAsync();
Task allTasks = Task.WhenAll(task1, task2);
try
{
await allTasks;
}
catch
{
AggregateException allExceptions = allTasks.Excpetion;
...
}
}

 

작업이 완료될 때마다 처리

async Task<int> DelayAndReturnAsync(int value)
{
await Task.Delay(TimeSpan.FromSeconds(value));
return value;
}
async Task AwaitAndProcessAsync(Task<int> task)
{
int rv = await task;
Trace.WriteLine(rv);
}
async Task ProcessTasksAsync(int flag)
{
Task<int> taskA = DelayAndReturnAsync(2);
Task<int> taskB = DelayAndReturnAsync(3);
Task<int> taskC = DelayAndReturnAsync(1);
var tasks = new Task[] { taskA, taskB, taskC };
Task[] processingTasks;
if (flag == 1)
{
IEnumerable<Task> taskQuery =
from t in tasks select AwaitAndProcessAsync(t);
processingTasks = taskQuery.ToArray();
await Task.WhenAll(processingTasks);
}
else if (flag == 2)
{
processingTasks = tasks.Select(async t =>
{
var rv = await t;
Trace.WriteLine(rv);
}).ToArray();
await Task.WhenAll(processingTasks);
}
else if (flag == 3)
{
foreach (Task<int> task in tasks.OrderByCompletion())
{
int rv = await task;
Trace.WriteLine(rv);
}
}
}

 

async void 메서드의 예외 처리

sealed class MyAsyncCommand : ICommand
{
async void ICommand.Execute(object parameter)
{
await Execute(parameter);
}
public async Task Execute(object parameter)
{
; // 비동기 작업 구현
}
; // CanExecute 등 구현
}

 

ValueTask 생성/사용

  • 반환할 수 있는 동기적 결과가 있고 비동기 동작이 드문 상황에서 반환 형식으로 사용
  • 프로파일링을 통해 애플리케이션의 성능 향상을 확인할 수 있을 때만 고려해야 함
  • ValueTask를 반환하는 DisposeAsync 메서드가 있는 IAsyncDisposable을 구현할 때
private Task<int> SlowMethodAsync();
public ValueTask<int> MethodAsync()
{
if (CanBehaveSynchronously)
return new ValueTask<int>(1);
return new ValueTask<int>(SlowMethodAsync());
}
async Task ConsumingMethodAsync()
{
ValueTask<int> valueTask = MethodAsync();
; // 기타 동시성 작업
int value = await valueTask;
;
}
ValueTask는 딱 한 번만 대기할 수 있다.
더 복잡한 작업을 하려면 AsTask를 호출해서 ValueTask<T>를 Task<T>로 변환해야 한다.
ValueTask에서 동기적으로 결과를 얻으려면 ValueTask를 완료한 뒤에 한 번만 할 수 있다.
async Task ConsumingTaskAsync()
{
Task<int> task = MethodAsync().AsTask();
; // 기타 동시성 작업
int value = await task;
// Task<T>는 await로 여러 번 대기해도 완벽하게 안전하다.
int anotherValue = await task;
}
async Task ConsumingTaskAsync()
{
Task<int> task1 = MethodAsync().AsTask();
Task<int> task2 = MethodAsync().AsTask();
int[] results = await Task.WhenAll(task1, task2);
}

 

Asynchronous Stream

async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
{
const int limit = 10;
for (int offset = 0; true; offset += limit)
{
string result = await client.GetStringAsync(
$"https://example.com/api/values?offset={offset}&limit={limit}");
string[] valuesOnThisPage = result.Split('\n');
// 현재 페이지의 결과 전달
foreach (string value in valuesOnThisPage)
yield return value;
// 마지막 페이지면 끝
if (valuesOnThisPage.Length != limit)
break;
}
public async Task ProcessValuesAsync(HttpClient client)
{
await foreach (string value in GetValuesAsync(client))
{
Console.WriteLine(value);
}
}
public async Task ProcessValuesAsync(HttpClient client)
{
await foreach (string value in GetValuesAsync(client).ConfigureAwait(false))
{
await Task.Delay(100).ConfigureAwait(false); // 비동기 작업
Console.WriteLine(value);
}
}

 

비동기 스트림과 LINQ 함께 사용

IEnumerable<T>에는 LINQ to Objects가 있고 IObservable<T>에는 LINQ to Events가 있다.

IAsyncEnumerable<T>도 System.Linq.Async NuGet Package를 통해 LINQ 지원

IAsyncEnumerable<int> values = SlowRange().WhereAwait(
async value =>
{
// 요소의 포함 여부를 결정할 비동기 작업 수행
await Task.Delay(10);
return value % 2 == 0;
})
.Where(value => value % 4 == 0); // 결과는 비동기 스트림
await foreach (int result in values)
Console.WriteLine(result);
// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange()
{
for (int i = 0; i < 10; ++i)
{
await Task.Delay(i * 100);
yield return i;
}
}

Async 접미사는 값을 추출하거나 계산을 수행한 뒤에 비동기 시퀀스가 아닌 비동기 스칼라 값을 반환하는 연산자에만 붙는다.

int count = await SlowRange().CountAsync(
value => value % 2 == 0);
// 조건자가 비동기적일 땐 AwaitAsync 접미사가 붙는 연산자를 사용
int count = await SlowRange().CountAwaitAsync(
async value =>
{
await Task.Delay(10);
return value % 2 == 0;
});

비동기 스트림 취소

using var cts = new CancellationTokenSource(500);
CancellationToken token = ct.Token;
await foreach (int result in SlowRange(token))
{
Console.WriteLine(result);
}
// 진행에 따라 속도가 느려지는 시퀀스 생성
async IAsyncEnumerable<int> SlowRange(
[EnumeratorCancellation] CancellationToken token = default)
{
for (int i = 0; i < 10; ++i)
{
await Task.Delay(i * 100, token);
yield return i;
}
}

비동기 스트림의 열거에 CancellationToken을 추가할 수 있는 WithCancellation 확장 메서드 지원

async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
using var cts = new CancellationTokenSource(500);
CancellationToken token = cts.Token;
await foreach (int result in items.WithCancellation(token))
{
Console.WriteLine(result);
}
}
await ConsumeSequence(SlowRange());

 

'.NET > C#' 카테고리의 다른 글

Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15
Array Marshaling  (0) 2021.10.15

https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

Concurrency

  • 한 번에 두 가지 이상의 작업 수행

Multithreading

  • 다수의 실행 스레드를 사용하는 동시성의 한 형태

Parallel Processing

  • 많은 작업을 여러 스레드에 나눠서 동시에 수행
  • 멀티스레딩을 사용해서 멀티 코어 프로세서를 최대한 활용하는 방법

Asynchronous Programming

  • 불필요한 스레드의 사용을 피하려고 future나 callback을 사용하는 동시성의 한 형태

Reactive Programing

  • 애플리케이션이 이벤트에 대응하게 하는 선언형 프로그래밍 방식
  • 비동기 연산이 아닌 비동기 이벤트 기반

 

 

'.NET > C#' 카테고리의 다른 글

Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15
Array Marshaling  (0) 2021.10.15
Comparisons and Sorts  (0) 2021.10.15

+ Recent posts