https://www.oreilly.com/library/view/concurrency-in-c/9781492054498/
Nuget Package: System.Reactive
- 이벤트 스트림을 데이터 스트림처럼 다룰 수 있다.
- Observable stream의 개념을 바탕으로 한다.
- Observable stream을 구독(subscribe)하면 정해지지 않은 수의 데이터 항목을 수신(OnNext)한 뒤에 하나의 오류(OnError) 또는 스트림 끝 알림(OnCompleted)으로 스트림이 끝난다.
- 끝나지 않는 observable stream도 있다.
실제 인터페이스 모습, System.Reactive 라이브러리에 모든 구현을 포함하고 있음.
interface IObserver<in T>
{
void OnNext(T item);
void OnCompleted();
void OnError(Exception error);
}
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
Example:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(ev => ev.Value % 2 == 0)
.Select(ev => ev.Timestamp)
.Subscribe(
ts => Trace.WriteLine(ts),
ex => Trace.WriteLine(ex));
IObservable<DateTimeOffset> timestamps = Observable
.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(ev => ev.Value % 2 == 0)
.Select(ev => ev.Timestamp);
timestamps.Subscribe(
ts => Trace.WriteLine(ts),
ex => Trace.WriteLine(ex));
- System.Reactive 구독 역시 리소스다.
- Subscribe 연산자는 구독을 나타내는 IDisposable 반환
- 항상 오류 처리 매개 변수를 함께 받아야 함
- 코드에서 observable stream의 수신을 완료하면 구독을 삭제해야 함
- 구독은 hot observable과 cold observable에서 다르게 동작
- hot observable: 언제든 발생할 수 있는 이벤트 스트림
이벤트가 발생할 때 구독이 없으면 해당 이벤트는 사라짐 - cold observable: 자동으로 발생하는 이벤트가 없는 경우
구독에 대응해서 이벤트를 순서대로 발생하기 시작
- hot observable: 언제든 발생할 수 있는 이벤트 스트림
.NET 이벤트 변환
var progress = new Progress<int>();
IObservable<EventPattern<int>> progressReports =
Observable.FromEventPattern<int>(
handler => progress.ProgressChanged += handler,
handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks =
Observable.FromEventPattern<ElaspedEventHandler, ElaspedEventArgs>(
handler => (s, a) => handler(s, a), // EventHandler<ElapsedEventArgs to ElapsedEventArgs
handler => timer.Elapsed += handler,
handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<object>> ticks =
Observable.FromEventPattern(timer, nameof(Timer.Elapsed));
ticks.Subscribe(data => Trace.WriteLine("OnNext: "
+ (data.EventArgs as ElapsedEventArgs).SignalTime));
컨텍스트로 알림 전달
각 OnNext 알림은 순차적으로 발생하지만 무조건 같은 스레드에서 발생하지 않는다.
private void Button_Click(object sender, RoutedEventArgs e)
{
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}"};
Observable.Interval(TimeSpan.FromSeconds(1))
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}
UI 스레드를 벗어나는 용도
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(evt => evt.EventArgs.GetPosition(this))
.ObserveOn(Scheduler.Default)
.Select(position =>
{
// 복잡한 계산
Trace.WriteLine($"Calculated result {rv} on thread {Environment.CurrentManagedThreadId}");
return rv;
})
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Result {x} on thread {Environment.CurrentManagedThreadId}"));
Window와 Buffer로 이벤트 데이터 그룹화
Buffer | Window | |
이벤트 전달 | 들어오는 이벤트의 그룹화가 끝나면 모든 이벤트를 이벤트 컬렉션으로 한 번에 전달 | 들어오는 이벤트를 논리적으로 그룹호하지만 도착하는 대로 전달 |
반환형식 | IObservable<IList<T>> | IObservable<IObservable<T>> |
'.NET > C#' 카테고리의 다른 글
Concurrency - TPL Dataflow (0) | 2023.08.16 |
---|---|
Concurrency - Parallel Programming (0) | 2023.08.16 |
Concurrency - Asynchronous Programming (0) | 2023.08.16 |
Concurrency (동시성) (0) | 2023.08.16 |
Marshaling: 복사 및 고정 (0) | 2021.10.15 |