https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/

 

Concurrency in C# Cookbook, 2nd Edition

If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n

www.oreilly.com

 

Nuget Package: System.Reactive

 

  • 이벤트 스트림을 데이터 스트림처럼 다룰 수 있다.
  • Observable stream의 개념을 바탕으로 한다.
  • Observable stream을 구독(subscribe)하면 정해지지 않은 수의 데이터 항목을 수신(OnNext)한 뒤에 하나의 오류(OnError) 또는 스트림 끝 알림(OnCompleted)으로 스트림이 끝난다.
  • 끝나지 않는 observable stream도 있다.

실제 인터페이스 모습, System.Reactive 라이브러리에 모든 구현을 포함하고 있음.

interface IObserver<in T>
{
    void OnNext(T item);
    void OnCompleted();
    void OnError(Exception error);
}

interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

Example:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(ev => ev.Value % 2 == 0)
    .Select(ev => ev.Timestamp)
    .Subscribe(
        ts => Trace.WriteLine(ts),
        ex => Trace.WriteLine(ex));

IObservable<DateTimeOffset> timestamps = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(ev => ev.Value % 2 == 0)
    .Select(ev => ev.Timestamp);
timestamps.Subscribe(
    ts => Trace.WriteLine(ts),
    ex => Trace.WriteLine(ex));
  • System.Reactive 구독 역시 리소스다.
  • Subscribe 연산자는 구독을 나타내는 IDisposable 반환
    • 항상 오류 처리 매개 변수를 함께 받아야 함
  • 코드에서 observable stream의 수신을 완료하면 구독을 삭제해야 함
  • 구독은 hot observable과 cold observable에서 다르게 동작
    • hot observable: 언제든 발생할 수 있는 이벤트 스트림
      이벤트가 발생할 때 구독이 없으면 해당 이벤트는 사라짐
    • cold observable: 자동으로 발생하는 이벤트가 없는 경우
      구독에 대응해서 이벤트를 순서대로 발생하기 시작

http://www.introtorx.com 

 

Introduction to Rx

IntroToRx.com is the online resource for getting started with the Reactive Extensions to .Net. Originally starting life as a blog series, it has now flourished into an online book. You can read it online here via the website, or get a copy of the Kindle ed

introtorx.com

 

.NET 이벤트 변환

var progress = new Progress<int>();
IObservable<EventPattern<int>> progressReports =
    Observable.FromEventPattern<int>(
        handler => progress.ProgressChanged += handler,
        handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));


var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks =
    Observable.FromEventPattern<ElaspedEventHandler, ElaspedEventArgs>(
        handler => (s, a) => handler(s, a),	// EventHandler<ElapsedEventArgs to ElapsedEventArgs
        handler => timer.Elapsed += handler,
        handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));


var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<object>> ticks =
    Observable.FromEventPattern(timer, nameof(Timer.Elapsed));
ticks.Subscribe(data => Trace.WriteLine("OnNext: "
    + (data.EventArgs as ElapsedEventArgs).SignalTime));

 

컨텍스트로 알림 전달

각 OnNext 알림은 순차적으로 발생하지만 무조건 같은 스레드에서 발생하지 않는다.

private void Button_Click(object sender, RoutedEventArgs e)
{
    SynchronizationContext uiContext = SynchronizationContext.Current;
    Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}"};
    Observable.Interval(TimeSpan.FromSeconds(1))
        .ObserveOn(uiContext)
        .Subscribe(x => Trace.WriteLine(
            $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}

UI 스레드를 벗어나는 용도

SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
    handler => (s, a) => handler(s, a),
    handler => MouseMove += handler,
    handler => MouseMove -= handler)
    .Select(evt => evt.EventArgs.GetPosition(this))
    .ObserveOn(Scheduler.Default)
    .Select(position =>
    {
        // 복잡한 계산
        Trace.WriteLine($"Calculated result {rv} on thread {Environment.CurrentManagedThreadId}");
        return rv;
    })
    .ObserveOn(uiContext)
    .Subscribe(x => Trace.WriteLine(
        $"Result {x} on thread {Environment.CurrentManagedThreadId}"));

 

Window와 Buffer로 이벤트 데이터 그룹화

  Buffer Window
이벤트 전달 들어오는 이벤트의 그룹화가 끝나면 모든 이벤트를 이벤트 컬렉션으로 한 번에 전달 들어오는 이벤트를 논리적으로 그룹호하지만 도착하는 대로 전달
반환형식 IObservable<IList<T>> IObservable<IObservable<T>>

 

'.NET > C#' 카테고리의 다른 글

Concurrency - TPL Dataflow  (0) 2023.08.16
Concurrency - Parallel Programming  (0) 2023.08.16
Concurrency - Asynchronous Programming  (0) 2023.08.16
Concurrency (동시성)  (0) 2023.08.16
Marshaling: 복사 및 고정  (0) 2021.10.15

+ Recent posts