C# Streaming and Pipelines
Streaming Overview
Streaming processes data incrementally rather than loading everything into memory. This enables handling of large datasets and real-time data flows efficiently.
// Traditional - loads all data into memory
List<Record> records = await LoadAllRecordsAsync(); // Memory: O(n)
foreach (var record in records)
{
Process(record);
}
// Streaming - processes one at a time
await foreach (var record in StreamRecordsAsync()) // Memory: O(1)
{
Process(record);
}
IAsyncEnumerable
Async iteration for streaming data asynchronously.
Producing Async Streams
// yield return in async method
/// <summary>
/// Produces <paramref name="count"/> sequential integers (0..count-1), one every 100 ms.
/// </summary>
/// <param name="count">Number of values to yield.</param>
/// <param name="cancellationToken">
/// Stops the stream early; flowed in via <c>WithCancellation</c>. Added (with a default)
/// for consistency with the other streams in this file — callers are unaffected.
/// </param>
public async IAsyncEnumerable<int> GenerateNumbersAsync(
    int count,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    for (int i = 0; i < count; i++)
    {
        // Simulate async work; honoring the token lets consumers abort the delay promptly.
        await Task.Delay(100, cancellationToken);
        yield return i;
    }
}
// From database
// Streams customers straight off the data reader — each row is materialized and
// yielded one at a time, never buffered as a full list.
// NOTE(review): assumes ordinal 0 is Id (int) and ordinal 1 is Name (string) —
// confirm against the Customers schema; "SELECT *" makes this ordinal-fragile.
public async IAsyncEnumerable<Customer> GetCustomersAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// _connectionFactory is declared elsewhere in the enclosing type.
await using var connection = await _connectionFactory.CreateConnectionAsync();
await using var command = connection.CreateCommand();
command.CommandText = "SELECT * FROM Customers";
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
// The token also cancels between rows thanks to [EnumeratorCancellation].
while (await reader.ReadAsync(cancellationToken))
{
yield return new Customer
{
Id = reader.GetInt32(0),
Name = reader.GetString(1)
};
}
}
// Streaming lines from a file (fully async — no synchronous enumerable involved)
/// <summary>
/// Streams the lines of a text file one at a time without loading the whole file.
/// </summary>
/// <param name="path">Path of the file to read.</param>
/// <param name="cancellationToken">
/// Cancels the read; added with a default (backward compatible) for consistency
/// with the other streams in this file.
/// </param>
public async IAsyncEnumerable<string> ReadLinesAsync(
    string path,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    using var reader = new StreamReader(path);
    string? line;
    // ReadLineAsync(CancellationToken) requires .NET 7+ (already used later in this file).
    while ((line = await reader.ReadLineAsync(cancellationToken)) != null)
    {
        yield return line;
    }
}
Consuming Async Streams
// await foreach
await foreach (var number in GenerateNumbersAsync(10))
{
Console.WriteLine(number);
}
// With cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await foreach (var item in GetItemsAsync().WithCancellation(cts.Token))
{
Process(item);
}
// ConfigureAwait
await foreach (var item in GetItemsAsync().ConfigureAwait(false))
{
Process(item);
}
// Manual enumeration
var enumerator = GetItemsAsync().GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
Process(enumerator.Current);
}
}
finally
{
await enumerator.DisposeAsync();
}
LINQ for Async Streams
// System.Linq.Async package
using System.Linq;
var results = await GetRecordsAsync()
.Where(r => r.IsActive)
.Select(r => r.Name)
.Take(10)
.ToListAsync();
// First/Single
var first = await GetRecordsAsync().FirstOrDefaultAsync();
// Aggregate
var count = await GetRecordsAsync().CountAsync();
var sum = await GetRecordsAsync().SumAsync(r => r.Amount);
// Any/All
bool hasActive = await GetRecordsAsync().AnyAsync(r => r.IsActive);
Buffering and Batching
// Buffer into chunks
/// <summary>
/// Groups an async stream into arrays of at most <paramref name="batchSize"/> items;
/// the final batch may be shorter. Nothing is emitted for an empty source.
/// </summary>
/// <param name="source">Stream to batch.</param>
/// <param name="batchSize">Maximum items per emitted array.</param>
/// <param name="cancellationToken">Cancels enumeration of the source.</param>
public static async IAsyncEnumerable<T[]> BufferAsync<T>(
    this IAsyncEnumerable<T> source,
    int batchSize,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var pending = new List<T>(batchSize);
    await foreach (var element in source.WithCancellation(cancellationToken))
    {
        pending.Add(element);
        if (pending.Count < batchSize)
        {
            continue;
        }
        yield return pending.ToArray();
        pending.Clear();
    }
    // Flush whatever is left over as a short final batch.
    if (pending.Count != 0)
    {
        yield return pending.ToArray();
    }
}
// Usage
await foreach (var batch in GetRecordsAsync().BufferAsync(100))
{
await ProcessBatchAsync(batch); // Process 100 at a time
}
System.IO.Pipelines
High-performance I/O processing with minimal allocations.
Core Concepts
// Pipe: a channel between writer and reader
var pipe = new Pipe();
PipeWriter writer = pipe.Writer;
PipeReader reader = pipe.Reader;
// Writing
Memory<byte> buffer = writer.GetMemory(minimumSize: 512);
int bytesWritten = FillBuffer(buffer.Span);
writer.Advance(bytesWritten);
await writer.FlushAsync();
// Reading
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
ProcessBuffer(buffer);
reader.AdvanceTo(buffer.End); // Mark consumed
Pipeline Processing Pattern
/// <summary>
/// Splits a <see cref="Stream"/> into '\n'-delimited lines using System.IO.Pipelines,
/// running the fill (producer) and parse (consumer) halves concurrently.
/// </summary>
public class LineProcessor
{
    /// <summary>Reads <paramref name="input"/> to completion, invoking ProcessLine per line.</summary>
    public async Task ProcessAsync(Stream input)
    {
        var pipe = new Pipe();
        // Producer: copy raw bytes from the stream into the pipe.
        Task writing = FillPipeAsync(input, pipe.Writer);
        // Consumer: carve the pipe's bytes into lines.
        Task reading = ReadPipeAsync(pipe.Reader);
        await Task.WhenAll(reading, writing);
    }

    /// <summary>Pumps the stream into the pipe until EOF or the reader stops listening.</summary>
    private async Task FillPipeAsync(Stream stream, PipeWriter writer)
    {
        const int minimumBufferSize = 512;
        try
        {
            while (true)
            {
                Memory<byte> memory = writer.GetMemory(minimumBufferSize);
                int bytesRead = await stream.ReadAsync(memory);
                if (bytesRead == 0)
                    break; // EOF
                writer.Advance(bytesRead);
                FlushResult result = await writer.FlushAsync();
                if (result.IsCompleted)
                    break; // Reader completed; stop producing.
            }
            await writer.CompleteAsync();
        }
        catch (Exception ex)
        {
            // FIX: surface read failures to the reader instead of leaving it
            // awaiting a pipe that will never complete.
            await writer.CompleteAsync(ex);
            throw;
        }
    }

    /// <summary>Drains the pipe, emitting each complete line and any trailing partial line.</summary>
    private async Task ReadPipeAsync(PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;
            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                ProcessLine(line);
            }
            if (result.IsCompleted)
            {
                // FIX: bytes after the last '\n' were previously dropped silently;
                // treat them as a final, unterminated line.
                if (!buffer.IsEmpty)
                {
                    ProcessLine(buffer);
                }
                reader.AdvanceTo(buffer.End);
                break;
            }
            // Consumed up to buffer.Start; examined to buffer.End so the next
            // ReadAsync waits for MORE data rather than spinning on the same bytes.
            reader.AdvanceTo(buffer.Start, buffer.End);
        }
        await reader.CompleteAsync();
    }

    /// <summary>
    /// Slices the next line (excluding the '\n') off the front of <paramref name="buffer"/>.
    /// Returns false when no complete line is buffered yet.
    /// </summary>
    private bool TryReadLine(
        ref ReadOnlySequence<byte> buffer,
        out ReadOnlySequence<byte> line)
    {
        SequencePosition? position = buffer.PositionOf((byte)'\n');
        if (position == null)
        {
            line = default;
            return false;
        }
        line = buffer.Slice(0, position.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        return true;
    }
}
Working with ReadOnlySequence
// ReadOnlySequence can span multiple segments
ReadOnlySequence<byte> sequence = ...;
// Single segment (common case)
if (sequence.IsSingleSegment)
{
ProcessSpan(sequence.FirstSpan);
}
else
{
// Multi-segment: iterate or copy
foreach (ReadOnlyMemory<byte> segment in sequence)
{
ProcessMemory(segment);
}
}
// Copy to contiguous buffer when needed
byte[] array = sequence.ToArray();
// Parse from sequence
if (Utf8Parser.TryParse(sequence.FirstSpan, out int value, out int consumed))
{
// Use value
}
Channels
Thread-safe producer-consumer queues for async programming.
Basic Channel Usage
// Unbounded channel
var channel = Channel.CreateUnbounded<int>();
// Bounded channel (backpressure)
var boundedChannel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // Wait when full
SingleReader = true, // Optimization hint
SingleWriter = false
});
// Write
await channel.Writer.WriteAsync(42);
bool written = channel.Writer.TryWrite(42);
channel.Writer.Complete();
// Read
int value = await channel.Reader.ReadAsync();
bool read = channel.Reader.TryRead(out int item);
// Async enumeration
await foreach (var item in channel.Reader.ReadAllAsync())
{
Process(item);
}
Producer-Consumer Pattern
/// <summary>
/// Bounded producer/consumer queue over <see cref="Channel{T}"/>; a full channel
/// suspends the producer (bounded channels default to BoundedChannelFullMode.Wait).
/// </summary>
public class DataProcessor
{
    private readonly Channel<WorkItem> _channel;

    /// <param name="bufferSize">Maximum items queued before producers are suspended.</param>
    public DataProcessor(int bufferSize)
    {
        _channel = Channel.CreateBounded<WorkItem>(bufferSize);
    }

    /// <summary>
    /// Drains <paramref name="source"/> into the channel, then completes it.
    /// FIX: if the source faults, the channel is completed with that exception so
    /// consumers observe the failure from ReadAllAsync instead of waiting forever.
    /// </summary>
    public async Task ProduceAsync(IAsyncEnumerable<WorkItem> source)
    {
        try
        {
            await foreach (var item in source)
            {
                await _channel.Writer.WriteAsync(item);
            }
            _channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            _channel.Writer.Complete(ex);
            throw;
        }
    }

    /// <summary>Processes items until the channel completes or the token fires.</summary>
    public async Task ConsumeAsync(CancellationToken cancellationToken)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(cancellationToken))
        {
            await ProcessAsync(item);
        }
    }

    /// <summary>
    /// Runs <paramref name="count"/> concurrent consumers; ReadAllAsync is safe
    /// with multiple readers (each item goes to exactly one consumer).
    /// </summary>
    public Task StartConsumersAsync(int count, CancellationToken cancellationToken)
    {
        var tasks = Enumerable.Range(0, count)
            .Select(_ => ConsumeAsync(cancellationToken));
        return Task.WhenAll(tasks);
    }
}
// Usage
var processor = new DataProcessor(bufferSize: 100);
var producer = processor.ProduceAsync(GetItemsAsync());
var consumers = processor.StartConsumersAsync(4, cancellationToken);
await Task.WhenAll(producer, consumers);
Fan-Out Pattern
/// <summary>
/// Distributes items from <paramref name="source"/> across <paramref name="maxConcurrency"/>
/// concurrent workers via a bounded channel (capacity 2x workers to smooth bursts).
/// FIX: the producer now completes the channel even when the source faults, so
/// consumers cannot hang; the fault is rethrown by the awaited Task.WhenAll.
/// </summary>
/// <param name="source">Items to process.</param>
/// <param name="processor">Async callback invoked once per item (on worker threads).</param>
/// <param name="maxConcurrency">Number of concurrent workers.</param>
public async Task FanOutAsync<T>(
    IAsyncEnumerable<T> source,
    Func<T, Task> processor,
    int maxConcurrency)
{
    var channel = Channel.CreateBounded<T>(maxConcurrency * 2);
    // Producer
    var producer = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in source)
            {
                await channel.Writer.WriteAsync(item);
            }
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.Complete(ex);
            throw;
        }
    });
    // Consumers: each item is delivered to exactly one worker.
    var consumers = Enumerable.Range(0, maxConcurrency)
        .Select(_ => Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                await processor(item);
            }
        }));
    await Task.WhenAll(consumers.Append(producer));
}
Pipeline with Multiple Stages
/// <summary>
/// Single-stage async pipeline: transforms items on a background task and exposes
/// the results as an <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
public class Pipeline<TInput, TOutput>
{
    /// <summary>
    /// Starts transforming <paramref name="source"/> and returns the output stream.
    /// FIX: the fire-and-forget task previously swallowed exceptions and left the
    /// channel open forever on a fault; it now completes the channel with the
    /// exception so the consumer observes it from ReadAllAsync.
    /// (Signature unchanged; the method no longer needs to be async itself.)
    /// </summary>
    public Task<IAsyncEnumerable<TOutput>> ProcessAsync(
        IAsyncEnumerable<TInput> source,
        Func<TInput, Task<TOutput>> transform)
    {
        var channel = Channel.CreateUnbounded<TOutput>();
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var input in source)
                {
                    var output = await transform(input);
                    await channel.Writer.WriteAsync(output);
                }
                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                channel.Writer.Complete(ex);
            }
        });
        return Task.FromResult(channel.Reader.ReadAllAsync());
    }
}
// Multi-stage pipeline
// Three-stage pipeline (parse -> validate -> transform) over bounded channels;
// the stages run concurrently and the final stage is streamed to the caller.
// NOTE(review): if any stage faults WITHOUT completing its writer, stage3 never
// completes and the await foreach below hangs; the stage tasks are only awaited
// after the reader drains, so their exceptions surface late (or never). Confirm
// that ParseAsync/ValidateAsync/TransformAsync complete their writers on failure.
public async IAsyncEnumerable<ProcessedData> ProcessPipelineAsync(
IAsyncEnumerable<RawData> source)
{
// Stage 1: Parse
var stage1 = Channel.CreateBounded<ParsedData>(100);
// Stage 2: Validate
var stage2 = Channel.CreateBounded<ValidatedData>(100);
// Stage 3: Transform
var stage3 = Channel.CreateBounded<ProcessedData>(100);
// Run stages concurrently
var parseTask = ParseAsync(source, stage1.Writer);
var validateTask = ValidateAsync(stage1.Reader, stage2.Writer);
var transformTask = TransformAsync(stage2.Reader, stage3.Writer);
await foreach (var result in stage3.Reader.ReadAllAsync())
{
yield return result;
}
// Observes stage exceptions only after the output channel has completed.
await Task.WhenAll(parseTask, validateTask, transformTask);
}
Stream Processing
Processing Large Files
/// <summary>
/// Lazily parses a large log file, yielding only lines that parse as a LogEntry.
/// The file is opened for async, shared-read access and is never fully buffered.
/// </summary>
/// <param name="path">Path of the log file.</param>
/// <param name="cancellationToken">Cancels the read between lines.</param>
public async IAsyncEnumerable<LogEntry> ParseLargeLogFileAsync(
    string path,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await using var stream = new FileStream(
        path,
        FileMode.Open,
        FileAccess.Read,
        FileShare.Read,
        bufferSize: 4096,
        useAsync: true);
    using var reader = new StreamReader(stream);
    while (true)
    {
        var line = await reader.ReadLineAsync(cancellationToken);
        if (line is null)
        {
            yield break; // end of file
        }
        // Unparseable lines are skipped rather than failing the whole stream.
        if (TryParseLogEntry(line, out var entry))
        {
            yield return entry;
        }
    }
}
// Usage with filtering
var errors = ParseLargeLogFileAsync("large.log")
.Where(e => e.Level == LogLevel.Error)
.Take(100);
await foreach (var error in errors)
{
Console.WriteLine(error);
}
Network Streaming
/// <summary>
/// Streams '\n'-delimited messages from a network stream until the remote side closes.
/// </summary>
/// <remarks>
/// FIX: cancellation now surfaces as OperationCanceledException (thrown by ReadAsync,
/// which already receives the token) instead of being silently swallowed by a loop
/// condition that made the stream look like a clean end-of-data.
/// NOTE(review): bytes received after the last '\n' are discarded when the connection
/// closes — presumably an incomplete message; confirm against the wire protocol.
/// </remarks>
public async IAsyncEnumerable<Message> StreamMessagesAsync(
    Stream networkStream,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var buffer = new byte[4096];
    var messageBuffer = new List<byte>();
    while (true)
    {
        // Throws OperationCanceledException promptly when the token fires.
        int bytesRead = await networkStream.ReadAsync(
            buffer.AsMemory(), cancellationToken);
        if (bytesRead == 0)
            yield break; // Connection closed
        for (int i = 0; i < bytesRead; i++)
        {
            if (buffer[i] == '\n') // Message delimiter
            {
                var message = ParseMessage(messageBuffer.ToArray());
                yield return message;
                messageBuffer.Clear();
            }
            else
            {
                messageBuffer.Add(buffer[i]);
            }
        }
    }
}
JSON Streaming
// Stream JSON array elements
/// <summary>
/// Deserializes a JSON array from <paramref name="stream"/> element by element,
/// yielding each non-null item as soon as it is parsed.
/// </summary>
/// <param name="stream">Stream positioned at the start of a JSON array.</param>
/// <param name="cancellationToken">Cancels deserialization.</param>
public async IAsyncEnumerable<T> StreamJsonArrayAsync<T>(
    Stream stream,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var elements = JsonSerializer.DeserializeAsyncEnumerable<T>(
        stream,
        cancellationToken: cancellationToken);
    await foreach (var element in elements)
    {
        if (element == null)
        {
            continue; // skip JSON nulls, matching the original contract
        }
        yield return element;
    }
}
// Write streaming JSON
/// <summary>
/// Writes <paramref name="items"/> to <paramref name="stream"/> as a JSON array.
/// FIX: flushes after each element — Utf8JsonWriter otherwise buffers the entire
/// array in memory until disposal, defeating the purpose of streaming output.
/// </summary>
/// <param name="stream">Destination stream.</param>
/// <param name="items">Elements to serialize, consumed lazily.</param>
/// <param name="cancellationToken">Cancels enumeration and flushing.</param>
public async Task WriteJsonStreamAsync<T>(
    Stream stream,
    IAsyncEnumerable<T> items,
    CancellationToken cancellationToken = default)
{
    await using var writer = new Utf8JsonWriter(stream);
    writer.WriteStartArray();
    await foreach (var item in items.WithCancellation(cancellationToken))
    {
        JsonSerializer.Serialize(writer, item);
        await writer.FlushAsync(cancellationToken);
    }
    writer.WriteEndArray();
    // DisposeAsync flushes the closing bracket.
}
Backpressure Handling
Controlling the flow of data when the producer is faster than the consumer.
/// <summary>
/// Wraps a bounded channel so a fast producer is suspended (backpressure) once the
/// buffer reaches capacity, resuming as the consumer drains it.
/// FIX: removed the _highWaterMark/_lowWaterMark fields — they were computed but
/// never read (BoundedChannelFullMode.Wait already provides the throttling) — and
/// added Complete() so consumers can ever observe end-of-stream.
/// </summary>
public class BackpressureController<T>
{
    private readonly Channel<T> _channel;

    /// <param name="capacity">Maximum buffered items before producers are suspended.</param>
    public BackpressureController(int capacity)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    /// <summary>Adds an item; awaits (applies backpressure) while the channel is full.</summary>
    public async ValueTask ProduceAsync(T item, CancellationToken ct = default)
    {
        // Writer will await if channel is full
        await _channel.Writer.WriteAsync(item, ct);
    }

    /// <summary>Marks the stream finished so ConsumeAsync enumeration terminates.</summary>
    public void Complete() => _channel.Writer.Complete();

    /// <summary>Streams items as they arrive until <see cref="Complete"/> is called.</summary>
    public IAsyncEnumerable<T> ConsumeAsync() =>
        _channel.Reader.ReadAllAsync();
}
Combining Streams
// Merge multiple streams
/// <summary>
/// Interleaves multiple async streams into a single stream, in arrival order.
/// FIX: the completion task was assigned to a local and never awaited or observed,
/// so a faulting source left its exception lost and the merged stream appeared to
/// end cleanly. The channel is now completed with the aggregate fault, which
/// ReadAllAsync rethrows to the consumer.
/// </summary>
/// <param name="sources">Streams to merge; each is drained concurrently.</param>
public static async IAsyncEnumerable<T> MergeAsync<T>(
    params IAsyncEnumerable<T>[] sources)
{
    var channel = Channel.CreateUnbounded<T>();
    var pumps = sources.Select(async source =>
    {
        await foreach (var item in source)
        {
            await channel.Writer.WriteAsync(item);
        }
    }).ToList();
    // Complete the channel when every pump finishes; t.Exception is null on
    // success, so this is a clean completion in the happy path.
    _ = Task.WhenAll(pumps).ContinueWith(
        t => channel.Writer.Complete(t.Exception),
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        yield return item;
    }
}
// Zip two streams
/// <summary>
/// Pairs elements from two streams positionally, ending when the shorter one ends.
/// FIX: disposal is nested in separate finally blocks so the second enumerator is
/// disposed even when disposing the first one throws (the flat original leaked
/// enum2 in that case).
/// </summary>
public static async IAsyncEnumerable<(T1, T2)> ZipAsync<T1, T2>(
    IAsyncEnumerable<T1> first,
    IAsyncEnumerable<T2> second)
{
    var enum1 = first.GetAsyncEnumerator();
    try
    {
        var enum2 = second.GetAsyncEnumerator();
        try
        {
            // Short-circuit: when the first stream ends, the second is not advanced.
            while (await enum1.MoveNextAsync() && await enum2.MoveNextAsync())
            {
                yield return (enum1.Current, enum2.Current);
            }
        }
        finally
        {
            await enum2.DisposeAsync();
        }
    }
    finally
    {
        await enum1.DisposeAsync();
    }
}
Performance Patterns
Object Pooling with ArrayPool
/// <summary>
/// Reads a stream in pooled 4 KB buffers, yielding each chunk as a right-sized copy.
/// The pooled buffer is returned even if enumeration is abandoned, because the
/// finally block runs when the iterator is disposed.
/// </summary>
public async IAsyncEnumerable<byte[]> ProcessChunksAsync(Stream stream)
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(4096);
    try
    {
        while (true)
        {
            int read = await stream.ReadAsync(rented);
            if (read == 0)
            {
                yield break; // end of stream
            }
            // The rented buffer is reused on the next iteration, so hand the
            // caller a private, exactly-sized copy.
            yield return rented.AsSpan(0, read).ToArray();
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
Memory-Efficient Transformations
/// <summary>
/// Lazily maps each element of <paramref name="source"/> through <paramref name="transform"/>.
/// Nothing is buffered: exactly one input element is held at a time.
/// </summary>
public async IAsyncEnumerable<TOutput> TransformAsync<TInput, TOutput>(
    IAsyncEnumerable<TInput> source,
    Func<TInput, TOutput> transform)
{
    await foreach (var element in source)
    {
        var mapped = transform(element);
        yield return mapped;
    }
}
Version History
| Feature | Version | Significance |
|---|---|---|
| IAsyncEnumerable | C# 8.0 | Async iteration |
| System.IO.Pipelines | .NET Core 2.1 | High-perf I/O |
| System.Threading.Channels | .NET Core 2.1 | Async queues |
| System.Linq.Async | NuGet package | LINQ for async streams |
| await foreach | C# 8.0 | Async enumeration |
Key Takeaways
Use IAsyncEnumerable for async iteration: Stream data without loading everything into memory.
Channels for producer-consumer: Thread-safe, async-friendly queues with backpressure support.
Pipelines for high-throughput I/O: Zero-allocation processing with System.IO.Pipelines.
Bounded channels for backpressure: Prevent memory issues when producer outpaces consumer.
Yield return for lazy production: Data is generated only when consumed.
Combine with LINQ.Async: Apply familiar LINQ operations to async streams.
Found this guide helpful? Share it with your team:
Share on LinkedIn