How to mark a TPL dataflow cycle to complete?

If the goal of your code is to traverse a directory structure with some degree of parallelism, I would suggest using Microsoft’s Reactive Framework (Rx) instead of TPL Dataflow. I think the code becomes much simpler.

Here’s how I would do it.

First define a recursive function to build the list of directories:

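using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
// Declared first and assigned second so the lambda can refer to itself.
Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)                           // emit the directory itself
        .Concat(di.GetDirectories()           // then its subdirectories
            .ToObservable()
            .SelectMany(di2 => recurse(di2))) // each expanded recursively
        .ObserveOn(Scheduler.Default);        // deliver results on the default scheduler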

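This walks the directory tree recursively. The ObserveOn(Scheduler.Default) call delivers the results on the default Rx scheduler (the thread pool), which introduces concurrency: downstream work, such as enumerating the files in each directory, runs on thread-pool threads.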

So calling recurse with a DirectoryInfo gives me an observable sequence containing that directory and all of its descendants.
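As a rough sketch of how that sequence can be consumed, the program below rewrites recurse as a named method and collects every directory it emits. The DirectoryWalker class, the temporary folder and the "a" and "b" subfolders are made up for illustration, and the code assumes the Rx package is referenced.

```csharp
using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class DirectoryWalker
{
    // Same shape as the recurse delegate, written as a named method.
    public static IObservable<DirectoryInfo> Recurse(DirectoryInfo di) =>
        Observable
            .Return(di)
            .Concat(di.GetDirectories()
                .ToObservable()
                .SelectMany(child => Recurse(child)))
            .ObserveOn(Scheduler.Default);

    static void Main()
    {
        // Hypothetical root: a fresh temp folder containing a\b.
        var root = Directory.CreateDirectory(
            Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()));
        root.CreateSubdirectory("a").CreateSubdirectory("b");

        // ToList() gathers every emitted directory into one list;
        // Wait() blocks the calling thread until the sequence completes.
        var all = Recurse(root).ToList().Wait();
        foreach (var d in all)
            Console.WriteLine(d.FullName);

        root.Delete(recursive: true);
    }
}
```

Blocking with Wait() is only for the demonstration. In the answer's approach the sequence is consumed through Subscribe instead.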

Now I can build a fairly straightforward query to get the results I want:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));
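To make the zxy step concrete, here is a small sketch that applies the same split, reverse and take logic to a single path string. The TilePath class and the sample path are hypothetical and only illustrate the transformation; no Rx is needed for this part.

```csharp
using System;
using System.IO;
using System.Linq;

static class TilePath
{
    // Mirrors the zxy and suffix steps of the query for one path string.
    public static string Suffix(string fullName)
    {
        int[] zxy = Enumerable.Reverse(fullName.Split('\\')) // last segment first
            .Take(3)                                          // file name, then its two parent folders
            .Reverse()                                        // back to z, x, y order
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray();
        return string.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2]);
    }

    static void Main()
    {
        // Hypothetical tile path, not taken from the question.
        Console.WriteLine(Suffix(@"C:\tiles\12\2180\1300.png")); // prints 12/2180/1300.png
    }
}
```

Note that int.Parse throws a FormatException if any of the last three segments is not numeric, so files outside a z/x/y layout would need to be filtered out first.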

Now I can run the query by subscribing to it:

query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });

I may have missed some details of your custom code, but if this is an approach you want to take, I’m sure you can fix any logical issues quite easily.

The observable completes on its own once it runs out of child directories and files, so there is no need to signal completion manually.
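If you need to react to that completion, one option is the Subscribe overload that also takes error and completion callbacks. The sketch below is an assumption about how you might wire it up: CompletionDemo, RunToCompletion and the three file names are invented stand-ins for the real query.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

static class CompletionDemo
{
    // Subscribes, blocks until the sequence ends, and reports whether
    // it ended with OnCompleted (true) or OnError (false).
    public static bool RunToCompletion(IObservable<string> query)
    {
        var completed = false;
        var done = new ManualResetEventSlim();

        query.Subscribe(
            name => Console.WriteLine("Done combining : {0}", name),
            ex => { Console.WriteLine("Failed: {0}", ex.Message); done.Set(); },
            () => { completed = true; done.Set(); });

        done.Wait(); // released by either the error or the completion callback
        return completed;
    }

    static void Main()
    {
        // Stand-in for the file query: any finite observable behaves the same way.
        var query = new[] { "0.png", "1.png", "2.png" }.ToObservable();
        Console.WriteLine(RunToCompletion(query) ? "All files processed" : "Stopped early");
    }
}
```

The completion callback fires exactly once, after the last item, which is the point where follow-up work that depends on every file having been processed can safely start.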

To add Rx to your project, look for “Rx-Main” in NuGet (newer releases are published as “System.Reactive”).
