For a list of Task, Interleaving is to have a list of data independent Task workflow.
(1) await always block when doing Task.Run. So there are no interleaving here.
(2) This changed in a Parallel loop, since await is threadlocal so won't block all of the task.
(3) TCS Parallel is similar to WhenAny in term of error rate and speed. So Parallel is not effective
on repeated 7 url to 392. It seems interleaving does not optimize or could be too much network contention.
class InterLeaving
{
public int _processingCount;
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();
var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
select new TaskCompletionSource<T>()).ToList();
int nextTaskIndex = -1;
foreach (var inputTask in inputTasks)
{
inputTask.ContinueWith(completed =>
{
var source = sources[Interlocked.Increment(ref nextTaskIndex)];
if (completed.IsFaulted)
source.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled)
source.TrySetCanceled();
else
source.TrySetResult(completed.Result);
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
return from source in sources
select source.Task;
}
public void TCSCombinator()
{
var urls = GetListOfUrl();
List<Task<string>> taskList = (from _ in urls select WebDownloadDataTask(_)).ToList();
// TCS interleaved by running in parallel
Interleaved(taskList).AsParallel().ForAll(async t => // async lambda
{
string s = await t;
Console.WriteLine("Interleaved in Parallel: " + s.Length);
if(s!="") Interlocked.Increment(ref _processingCount);
});
//foreach( var t in Interleaved(taskList))
//{
// string s = await t; // await will block include foreach
// Console.WriteLine("Interleaved: "+s.Length);
//}
Console.WriteLine("TCS Combinator Totally {0} processed", _processingCount);
}
public async void WhenAny()
{
var urls = GetListOfUrl();
List<Task<string>> taskList = (from _ in urls select WebDownloadDataTask(_)).ToList();
// sequencial interleaving since await block while-loop
while (taskList.Count > 0)
{
// WhenAny return Task<CompletedTask>>
Task<string> t = await Task.WhenAny(taskList); // await trigger Task.Run then blocks while-loop
taskList.Remove(t);
string s = await t; // this is the same as t.Result, just to make sure to complete
Console.WriteLine("WhenAny interleaving process data: " + s.Length);
if (t.Result != "") _processingCount++;
}
Console.WriteLine("WhenAny Totally {0} processed", _processingCount);
}
private Task<string> WebDownloadDataTask(string u)
{
return Task<string>.Run(() =>
{
string data = "";
try
{
WebClient wc = new WebClient();
data = wc.DownloadString(u);
Console.WriteLine("Done: " + u);
}
catch
{
Console.WriteLine("Exception");
}
return data;
});
}
List<string> GetListOfUrl()
{
var list = new List<string>
{
"http://www.cnn.com",
"http://www.yahoo.com",
"http://www.nfl.com",
"http://www.nba.com",
"http://www.cnbc.com",
"http://www.msdn.com",
"http://www.usatoday.com"
};
return list.Concat(list).Concat(list).Concat(list).Concat(list).Concat(list).
...
Concat(list).Concat(list).Concat(list).Concat(list).Concat(list).ToList();
}
}
Friday, March 11, 2016
TAP interleaving using build-in WhenAny and Custom TCS Combinator
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment