namespace WhiteRabbit { using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; internal static class DataflowBlockHelpers { private static ExecutionDataflowBlockOptions ExecutionOptions { get; } = new ExecutionDataflowBlockOptions { BoundedCapacity = 100000, }; public static IPropagatorBlock Id() { return new TransformBlock(element => element, ExecutionOptions); } public static void WriteToTargetBlock(this IEnumerable enumerable, ITargetBlock target) { var block = new TransformBlock(line => line, ExecutionOptions); block.LinkForever(target); WriteToTargetBlockAsync(enumerable, block).Wait(); block.Complete(); } public static IPropagatorBlock PipeMany(this IPropagatorBlock source, Func> mapper) { return source.Pipe(new TransformManyBlock(mapper, ExecutionOptions)); } public static IPropagatorBlock Pipe(this IPropagatorBlock source, Func mapper) { return source.Pipe(new TransformBlock(mapper, ExecutionOptions)); } public static IPropagatorBlock Pipe(this IPropagatorBlock source, IPropagatorBlock target) { source.LinkForever(target); return DataflowBlock.Encapsulate(source, target); } public static ISourceBlock Pipe(this ISourceBlock source, Func mapper) { return source.Pipe(new TransformBlock(mapper, ExecutionOptions)); } public static ISourceBlock Pipe(this ISourceBlock source, IPropagatorBlock target) { source.LinkForever(target); return target; } public static Task LinkForever(this ISourceBlock source, Action action) { return source.LinkForever(new ActionBlock(action, ExecutionOptions)); } public static Task LinkForever(this ISourceBlock source, ITargetBlock target) { source.LinkTo(target); source.Completion.ContinueWith(t => { if (t.IsFaulted) { target.Fault(t.Exception); } else { target.Complete(); } }); return target.Completion; } private static async Task WriteToTargetBlockAsync(IEnumerable enumerable, ITargetBlock target) { foreach (var element in enumerable) { await target.SendAsync(element); } } } }