https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

데이터 병렬

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}

IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
    return values.AsParallel()
        .Select(value => IsPrime(value));
}

 

작업 병렬(병렬 호출)

void ProcessArray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)
    );
}

void ProcessPartialArray(double[] array, int begin, int end)
{
    // CPU 집약적인 처리
}

작업 병렬에서는 특히 closure 안에 캡처한 변수에 주의해야 한다.

값이 아닌 참조를 캡처하므로 명확하지 않은 공유가 일어날 수도 있다.

try
{
    Parallel.Invoke(
        () => { throw new Exception(); },
        () => { throw new Exception(); });
}
catch (AggregateException ex)
{
    ex.Handle(exception =>
    {
        Trace.WriteLine(exception);
        return true; // 처리함
    });
}

루푸 중지

void InvertMatrices(IEnumerable<Matrix> matrices)
{
    Pallel.ForEach(matrices, (matrix, state) =>
    {
        if (matrix.InInvertible)
            matrix.Invert();
        else
            state.Stop();
    });
}

중지는 루프의 내부에서 일어나며 취소는 루푸의 외부에서 일어난다.

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
{
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));
}

공유 상태를 보호하는 잠금(lock)의 사용법 예

int InvertMatrices(IEnumerable<Matrix> matrices)
{
    object mutex = new object();
    int nonInvertibleCount = 0;
    Parallel.ForEach(matrices, matrix =>
    {
        if (matrix.IsInvertible)
            matrix.Invert();
        else
        {
            lock (mutex)
            {
                ++nonInvertibleCount;
            }
        }
    });
    return nonInvertibleCount;
}

PLINQ는 Parallel과 거의 똑같은 기능을 제공한다. 차이점:

  • PLINQ는 컴퓨터의 모든 코어를 사용할 수 있다고 가정
  • Prallel은 CPU의 상황에 따라 동적으로 대응

병렬 집계

int ParallelSum1(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;    
    Parallel.ForEach(source: values,
        localInit: () => 0,
        body: (item, state, localValue) => localValue + item,
        localFinally: localValue =>
        {
            lock (mutex)
                result += localValue;
        });
    return result;
}

int ParallelSum2(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

int ParallelSum3(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item
    );
}

병렬 호출

void DoAction20Times(Action action, CancellationToken token)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(
        new ParallelOptions( { CancellationToken = token },
        actions);
}

 

TPL의 핵심은 Task 형식이다.

Parallel 클래스오 PLINQ는 강력한 Task 형식을 편리하게 쓸 수 있게 감싼 wrapper일 뿐

동적 병렬 처리가 필요하다면 Task 형식을 직접 사용하는 것이 가장 쉽다.

 

이진 트리의 각 노드에 비용이 많이 드는 처리를 수행해야 하는 예

void Traverse(Node current)
{
    DoExpensiveActionOnNode(current);
    
    if (current.Left is not null)
        ProcessNode(current.Left);
    if (current.Right is not null)
        ProcessNode(current.Right);
}

Task ProcessNode(Node node,
    TaskCreationOptions options = TaskCreationOptions.AttachedToParent)
{
    return Task.Factory.StartNew(
        () => Traverse(node),
        CancellationToken.None,
        options,
        TaskScheduler.Default);
}

void ProcessTree(Node root)
{
    var task = ProcessNode(root, TaskCreationOptions.None);
    task.Wait();
}

연속 작업(Continuation)

Task task = Task.Factory.StartNew(
    () => Thread.Sleep(TimeSpan.FromSeconds(2)),
    CancellationToken.None,
    TaskCreationOptions.None,
    TaskScheduler.Default);

Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task is done"),
    CancelltationToken.None,
    TaskContinuationOptions.None,
    TaskScheduler.Default);

PLINQ

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().Select(value => value * 2);
}

IEnumerable<int> MultiplyBy2Ordered(IEnumerable<int> values)
{
    return values.AsParallel()
        .AsOrdered()
        .Select(value => value * 2);
}

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

 

'.NET > C#' 카테고리의 다른 글

Concurrency - TPL Dataflow  (0) 2023.08.16
Concurrency - Reactive Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

+ Recent posts