Building Responsive Systems with Observer Pattern
Imagine building a stock trading application where multiple dashboard widgets need to update whenever a stock price changes. You could poll the database every second, but that wastes resources. Or each widget could tightly couple to your data source, making the code rigid and hard to maintain.
The Observer pattern solves this by letting objects subscribe to notifications from a subject. When the subject's state changes, it automatically notifies all subscribers. This decouples components, letting them react to changes without knowing about each other. The subject doesn't care who's listening, and observers don't need references to the subject.
You'll learn three ways to implement the Observer pattern in .NET. We'll start with C# events and delegates, move to the formal IObservable interface, and finish with practical real-world examples. By the end, you'll know when to use each approach and how to avoid common pitfalls.
Using C# Events and Delegates
C# events provide the most straightforward implementation of the Observer pattern. An event is a special type of delegate that only the declaring class can invoke. Subscribers register with the += operator and unsubscribe with -=. This built-in language support makes events the default choice for most notification scenarios.
The standard pattern uses EventHandler delegates with custom EventArgs classes to pass data. This convention appears throughout the .NET framework and makes your code immediately familiar to other C# developers.
// Custom EventArgs to carry data
public class StockPriceChangedEventArgs : EventArgs
{
public string Symbol { get; set; }
public decimal OldPrice { get; set; }
public decimal NewPrice { get; set; }
public decimal ChangePercent { get; set; }
}
// Subject - the thing being observed
public class StockTicker
{
private readonly Dictionary _prices = new();
// Event declaration
public event EventHandler? PriceChanged;
public void UpdatePrice(string symbol, decimal newPrice)
{
decimal oldPrice = _prices.GetValueOrDefault(symbol, newPrice);
_prices[symbol] = newPrice;
if (oldPrice != newPrice)
{
// Raise the event
OnPriceChanged(new StockPriceChangedEventArgs
{
Symbol = symbol,
OldPrice = oldPrice,
NewPrice = newPrice,
ChangePercent = oldPrice > 0
? ((newPrice - oldPrice) / oldPrice) * 100
: 0
});
}
}
protected virtual void OnPriceChanged(StockPriceChangedEventArgs e)
{
PriceChanged?.Invoke(this, e);
}
}
// Observers - objects that react to changes
public class PriceAlert
{
private readonly decimal _threshold;
public PriceAlert(decimal threshold)
{
_threshold = threshold;
}
public void OnPriceChanged(object? sender, StockPriceChangedEventArgs e)
{
if (Math.Abs(e.ChangePercent) >= _threshold)
{
Console.WriteLine($"ALERT: {e.Symbol} changed by {e.ChangePercent:F2}%");
}
}
}
// Usage
var ticker = new StockTicker();
var alert = new PriceAlert(5.0m);
ticker.PriceChanged += alert.OnPriceChanged;
ticker.PriceChanged += (sender, e) =>
{
Console.WriteLine($"{e.Symbol}: ${e.OldPrice} -> ${e.NewPrice}");
};
ticker.UpdatePrice("AAPL", 150.00m);
ticker.UpdatePrice("AAPL", 158.50m);
// Output:
// AAPL: $150.00 -> $158.50
// ALERT: AAPL changed by 5.67%
The event-based approach keeps observers loosely coupled. The StockTicker doesn't know or care who subscribes. Observers can be added or removed at runtime without modifying the subject. The null-conditional operator prevents crashes when no subscribers exist.
Implementing IObservable and IObserver
.NET provides IObservable and IObserver interfaces that formalize the Observer pattern. These interfaces define a contract for push-based notifications and include built-in error handling and completion signals. While more complex than events, they offer better composability and work seamlessly with Reactive Extensions.
IObservable represents a data source that pushes values. IObserver represents a consumer that receives those values. The Subscribe method returns IDisposable, letting observers unsubscribe by disposing the subscription.
public class TemperatureReading
{
public DateTime Timestamp { get; set; }
public double Temperature { get; set; }
public string Location { get; set; }
}
// Observable subject
public class TemperatureSensor : IObservable
{
private readonly List> _observers = new();
public IDisposable Subscribe(IObserver observer)
{
if (!_observers.Contains(observer))
_observers.Add(observer);
return new Unsubscriber(_observers, observer);
}
public void RecordTemperature(double temp, string location)
{
var reading = new TemperatureReading
{
Timestamp = DateTime.UtcNow,
Temperature = temp,
Location = location
};
foreach (var observer in _observers.ToArray())
{
observer.OnNext(reading);
}
}
public void EndTransmission()
{
foreach (var observer in _observers.ToArray())
{
observer.OnCompleted();
}
_observers.Clear();
}
// Helper class for unsubscription
private class Unsubscriber : IDisposable
{
private readonly List> _observers;
private readonly IObserver _observer;
public Unsubscriber(
List> observers,
IObserver observer)
{
_observers = observers;
_observer = observer;
}
public void Dispose()
{
_observers.Remove(_observer);
}
}
}
// Observer implementation
public class TemperatureMonitor : IObserver
{
private IDisposable? _unsubscriber;
public void Subscribe(IObservable provider)
{
_unsubscriber = provider.Subscribe(this);
}
public void OnNext(TemperatureReading value)
{
Console.WriteLine($"{value.Location}: {value.Temperature}°C at {value.Timestamp:HH:mm:ss}");
if (value.Temperature > 30)
Console.WriteLine(" WARNING: High temperature!");
}
public void OnError(Exception error)
{
Console.WriteLine($"Error: {error.Message}");
}
public void OnCompleted()
{
Console.WriteLine("Monitoring ended.");
_unsubscriber?.Dispose();
}
}
// Usage
var sensor = new TemperatureSensor();
var monitor = new TemperatureMonitor();
monitor.Subscribe(sensor);
sensor.RecordTemperature(25.5, "Room 101");
sensor.RecordTemperature(32.1, "Server Room");
sensor.EndTransmission();
// Output:
// Room 101: 25.5°C at 14:23:15
// Server Room: 32.1°C at 14:23:16
// WARNING: High temperature!
// Monitoring ended.
The IObservable approach provides three notification types: OnNext for values, OnError for exceptions, and OnCompleted to signal the end of the sequence. This explicit lifecycle management helps observers handle cleanup properly and respond to errors gracefully.
Thread-Safe Event Handling
Events can cause threading issues when subscribers modify the subscription list during notification. If an observer unsubscribes while you're iterating through subscribers, you'll hit exceptions. The solution is copying the subscriber list before iteration or using thread-safe collections.
The null-conditional operator combined with Invoke creates a temporary copy of the delegate chain, making it safe against concurrent modifications. For more complex scenarios, consider using concurrent collections or synchronization primitives.
public class DataProcessedEventArgs : EventArgs
{
public int RecordsProcessed { get; set; }
public TimeSpan Duration { get; set; }
}
public class DataProcessor
{
private readonly object _lock = new();
private event EventHandler? _dataProcessed;
// Thread-safe event wrapper
public event EventHandler DataProcessed
{
add
{
lock (_lock)
{
_dataProcessed += value;
}
}
remove
{
lock (_lock)
{
_dataProcessed -= value;
}
}
}
public async Task ProcessDataAsync(int recordCount)
{
var startTime = DateTime.UtcNow;
// Simulate processing
await Task.Delay(100);
var args = new DataProcessedEventArgs
{
RecordsProcessed = recordCount,
Duration = DateTime.UtcNow - startTime
};
// Safe invocation - creates snapshot of delegate
EventHandler? handler;
lock (_lock)
{
handler = _dataProcessed;
}
handler?.Invoke(this, args);
}
}
// Usage with concurrent modifications
var processor = new DataProcessor();
void Handler1(object? s, DataProcessedEventArgs e) =>
Console.WriteLine($"Handler 1: {e.RecordsProcessed} records");
void Handler2(object? s, DataProcessedEventArgs e)
{
Console.WriteLine($"Handler 2: {e.RecordsProcessed} records");
// This unsubscribe during event is safe
processor.DataProcessed -= Handler2;
}
processor.DataProcessed += Handler1;
processor.DataProcessed += Handler2;
await processor.ProcessDataAsync(1000);
await processor.ProcessDataAsync(2000);
// Output:
// Handler 1: 1000 records
// Handler 2: 1000 records
// Handler 1: 2000 records
The lock statement protects the subscription list from concurrent access. Taking a snapshot of the delegate before invocation prevents issues when handlers modify subscriptions. This pattern works for most scenarios but adds overhead that might not be necessary for single-threaded applications.
Preventing Memory Leaks with Weak Events
Event subscriptions create strong references from the subject to observers. If you forget to unsubscribe, the subject keeps observers alive, preventing garbage collection. This causes memory leaks, especially when short-lived objects subscribe to long-lived subjects.
Weak event patterns solve this by using weak references that don't prevent garbage collection. The subject holds weak references to observers, allowing them to be collected when no other references exist. This trades some complexity for automatic memory management.
public class StatusChangedEventArgs : EventArgs
{
public string Status { get; set; }
}
public class ServiceMonitor
{
private readonly List>> _handlers = new();
public void Subscribe(EventHandler handler)
{
_handlers.Add(new WeakReference>(handler));
}
public void Unsubscribe(EventHandler handler)
{
_handlers.RemoveAll(wr =>
{
if (wr.TryGetTarget(out var target))
return target == handler;
return true;
});
}
public void UpdateStatus(string status)
{
var args = new StatusChangedEventArgs { Status = status };
_handlers.RemoveAll(wr =>
{
if (wr.TryGetTarget(out var handler))
{
handler(this, args);
return false;
}
return true; // Remove dead references
});
Console.WriteLine($"Active subscribers: {_handlers.Count}");
}
}
// Usage demonstrating automatic cleanup
var monitor = new ServiceMonitor();
void CreateAndSubscribe()
{
var handler = new EventHandler((s, e) =>
Console.WriteLine($"Status: {e.Status}"));
monitor.Subscribe(handler);
// handler goes out of scope here
}
CreateAndSubscribe();
monitor.UpdateStatus("Running");
GC.Collect();
GC.WaitForPendingFinalizers();
monitor.UpdateStatus("Stopped");
// Output:
// Status: Running
// Active subscribers: 1
// Active subscribers: 0
Weak references let the garbage collector reclaim observer objects even when they're subscribed. The subject automatically cleans up dead references during notifications. This pattern works well for long-lived subjects like application services that outlive their observers.
Try It Yourself
Here's a complete example combining the Observer pattern with async notifications. This demonstrates how to notify observers asynchronously while handling errors gracefully.
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
</Project>
public class OrderPlacedEventArgs : EventArgs
{
public int OrderId { get; set; }
public decimal Total { get; set; }
}
public class OrderService
{
public event EventHandler? OrderPlaced;
public async Task PlaceOrderAsync(int orderId, decimal total)
{
Console.WriteLine($"Processing order {orderId}...");
await Task.Delay(100);
var args = new OrderPlacedEventArgs { OrderId = orderId, Total = total };
// Notify all subscribers asynchronously
var handlers = OrderPlaced?.GetInvocationList();
if (handlers != null)
{
var tasks = handlers
.Cast>()
.Select(handler => Task.Run(() => handler(this, args)));
await Task.WhenAll(tasks);
}
Console.WriteLine($"Order {orderId} completed.\n");
}
}
var orderService = new OrderService();
orderService.OrderPlaced += async (s, e) =>
{
await Task.Delay(50);
Console.WriteLine($" Email sent for order {e.OrderId}");
};
orderService.OrderPlaced += async (s, e) =>
{
await Task.Delay(30);
Console.WriteLine($" Inventory updated for order {e.OrderId}");
};
await orderService.PlaceOrderAsync(1001, 99.99m);
await orderService.PlaceOrderAsync(1002, 149.50m);
// Output:
// Processing order 1001...
// Inventory updated for order 1001
// Email sent for order 1001
// Order 1001 completed.
This async pattern lets observers run concurrently without blocking each other. The subject waits for all notifications to complete before continuing. Run this with dotnet run to see how multiple observers handle the same event independently.
Testing and Validation
Testing Observer implementations requires verifying that observers receive correct notifications and can unsubscribe properly. Mock observers let you assert that events fire with expected data without complex test setups.
Create simple test observers that record invocations. Assert that the correct number of notifications occurred with the right arguments. Test edge cases like unsubscribing during notification or subscribing the same observer multiple times.
using Xunit;
public class ObserverTests
{
[Fact]
public void Observer_ReceivesNotification_WhenSubjectChanges()
{
// Arrange
var ticker = new StockTicker();
var receivedEvents = new List();
ticker.PriceChanged += (s, e) => receivedEvents.Add(e);
// Act
ticker.UpdatePrice("MSFT", 100.00m);
ticker.UpdatePrice("MSFT", 105.00m);
// Assert
Assert.Equal(2, receivedEvents.Count);
Assert.Equal("MSFT", receivedEvents[1].Symbol);
Assert.Equal(105.00m, receivedEvents[1].NewPrice);
}
[Fact]
public void Unsubscribe_StopsNotifications()
{
// Arrange
var ticker = new StockTicker();
var count = 0;
void Handler(object? s, StockPriceChangedEventArgs e) => count++;
ticker.PriceChanged += Handler;
// Act
ticker.UpdatePrice("GOOG", 150.00m);
ticker.PriceChanged -= Handler;
ticker.UpdatePrice("GOOG", 155.00m);
// Assert
Assert.Equal(1, count);
}
}
These tests verify the core Observer contract. The first test confirms observers receive notifications with correct data. The second test ensures unsubscription works properly. Add tests for error scenarios like null subscribers or exceptions in observer code.