https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/
Concurrency in C# Cookbook, 2nd Edition
If you’re one of many developers still uncertain about concurrent and multithreaded development, this practical cookbook will change your mind. With more than 85 code-rich recipes in this updated second … - Selection from Concurrency in C# Cookbook, 2n
www.oreilly.com
Nuget Package: System.Reactive
- 이벤트 스트림을 데이터 스트림처럼 다룰 수 있다.
- Observable stream의 개념을 바탕으로 한다.
- Observable stream을 구독(subscribe)하면 정해지지 않은 수의 데이터 항목을 수신(OnNext)한 뒤에 하나의 오류(OnError) 또는 스트림 끝 알림(OnCompleted)으로 스트림이 끝난다.
- 끝나지 않는 observable stream도 있다.
실제 인터페이스 모습, System.Reactive 라이브러리에 모든 구현을 포함하고 있음.
interface IObserver<in T> { void OnNext(T item); void OnCompleted(); void OnError(Exception error); } interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); }
Example:
Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(ev => ev.Value % 2 == 0) .Select(ev => ev.Timestamp) .Subscribe( ts => Trace.WriteLine(ts), ex => Trace.WriteLine(ex)); IObservable<DateTimeOffset> timestamps = Observable .Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(ev => ev.Value % 2 == 0) .Select(ev => ev.Timestamp); timestamps.Subscribe( ts => Trace.WriteLine(ts), ex => Trace.WriteLine(ex));
- System.Reactive 구독 역시 리소스다.
- Subscribe 연산자는 구독을 나타내는 IDisposable 반환
- 항상 오류 처리 매개 변수를 함께 받아야 함
- 코드에서 observable stream의 수신을 완료하면 구독을 삭제해야 함
- 구독은 hot observable과 cold observable에서 다르게 동작
- hot observable: 언제든 발생할 수 있는 이벤트 스트림
이벤트가 발생할 때 구독이 없으면 해당 이벤트는 사라짐 - cold observable: 자동으로 발생하는 이벤트가 없는 경우
구독에 대응해서 이벤트를 순서대로 발생하기 시작
- hot observable: 언제든 발생할 수 있는 이벤트 스트림
Introduction to Rx
IntroToRx.com is the online resource for getting started with the Reactive Extensions to .Net. Originally starting life as a blog series, it has now flourished into an online book. You can read it online here via the website, or get a copy of the Kindle ed
introtorx.com
.NET 이벤트 변환
var progress = new Progress<int>(); IObservable<EventPattern<int>> progressReports = Observable.FromEventPattern<int>( handler => progress.ProgressChanged += handler, handler => progress.ProgressChanged -= handler); progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs)); var timer = new System.Timers.Timer(interval: 1000) { Enabled = true }; IObservable<EventPattern<ElapsedEventArgs>> ticks = Observable.FromEventPattern<ElaspedEventHandler, ElaspedEventArgs>( handler => (s, a) => handler(s, a), // EventHandler<ElapsedEventArgs to ElapsedEventArgs handler => timer.Elapsed += handler, handler => timer.Elapsed -= handler); ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime)); var timer = new System.Timers.Timer(interval: 1000) { Enabled = true }; IObservable<EventPattern<object>> ticks = Observable.FromEventPattern(timer, nameof(Timer.Elapsed)); ticks.Subscribe(data => Trace.WriteLine("OnNext: " + (data.EventArgs as ElapsedEventArgs).SignalTime));
컨텍스트로 알림 전달
각 OnNext 알림은 순차적으로 발생하지만 무조건 같은 스레드에서 발생하지 않는다.
private void Button_Click(object sender, RoutedEventArgs e) { SynchronizationContext uiContext = SynchronizationContext.Current; Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}"}; Observable.Interval(TimeSpan.FromSeconds(1)) .ObserveOn(uiContext) .Subscribe(x => Trace.WriteLine( $"Interval {x} on thread {Environment.CurrentManagedThreadId}")); }
UI 스레드를 벗어나는 용도
SynchronizationContext uiContext = SynchronizationContext.Current; Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>( handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler) .Select(evt => evt.EventArgs.GetPosition(this)) .ObserveOn(Scheduler.Default) .Select(position => { // 복잡한 계산 Trace.WriteLine($"Calculated result {rv} on thread {Environment.CurrentManagedThreadId}"); return rv; }) .ObserveOn(uiContext) .Subscribe(x => Trace.WriteLine( $"Result {x} on thread {Environment.CurrentManagedThreadId}"));
Window와 Buffer로 이벤트 데이터 그룹화
Buffer | Window | |
이벤트 전달 | 들어오는 이벤트의 그룹화가 끝나면 모든 이벤트를 이벤트 컬렉션으로 한 번에 전달 | 들어오는 이벤트를 논리적으로 그룹호하지만 도착하는 대로 전달 |
반환형식 | IObservable<IList<T>> | IObservable<IObservable<T>> |
'.NET > C#' 카테고리의 다른 글
Concurrency - TPL Dataflow (0) | 2023.08.16 |
---|---|
Concurrency - Parallel Programming (0) | 2023.08.16 |
Concurrency - Asynchronous Programming (0) | 2023.08.16 |
Concurrency (동시성) (0) | 2023.08.16 |
Marshaling: 복사 및 고정 (0) | 2021.10.15 |