Friday, March 11, 2016

TAP interleaving using the built-in WhenAny and a custom TCS combinator

Given a list of Tasks, interleaving means processing each Task's result as soon as that Task completes, so the list is handled as a set of data-independent Task workflows.

(1) await always blocks the enclosing loop when used with Task.Run, so there is no interleaving there.
(2) This changes in a Parallel loop: each await is local to its own thread, so it does not block all of the tasks.
(3) TCS Parallel is similar to WhenAny in terms of error rate and speed, so Parallel is not effective
   when the 7 URLs are repeated to 392. It seems interleaving does not optimize here, or there may be too much network contention.

    /// <summary>
    /// Demonstrates two ways of processing download tasks in completion order:
    /// a custom <see cref="TaskCompletionSource{TResult}"/> combinator
    /// (<see cref="TCSCombinator"/>) and the built-in <c>Task.WhenAny</c> loop
    /// (<see cref="WhenAny"/>).
    /// </summary>
    class InterLeaving
    {
        // How many times the 7 base URLs are repeated by GetListOfUrl.
        // 7 x 56 = 392 downloads, the figure quoted in the accompanying notes.
        // NOTE(review): the original Concat chain was elided in the source; confirm the count.
        private const int UrlRepeatCount = 56;

        /// <summary>
        /// Running count of downloads that returned non-empty content.
        /// Updated by both demos and never reset, so it accumulates across calls.
        /// </summary>
        public int _processingCount;

        /// <summary>
        /// Wraps <paramref name="tasks"/> in proxy tasks that complete in the order in
        /// which the input tasks finish, rather than in their original list order.
        /// </summary>
        /// <param name="tasks">The tasks to observe; enumerated once.</param>
        /// <returns>
        /// One proxy task per input task. The i-th proxy carries the result, exception
        /// or cancellation of the i-th input task to complete.
        /// </returns>
        static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
        {
            var inputTasks = tasks.ToList();
            var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
                           select new TaskCompletionSource<T>()).ToList();

            // Starts at -1 so that the first Interlocked.Increment yields slot 0.
            int nextTaskIndex = -1;
            foreach (var inputTask in inputTasks)
            {
                inputTask.ContinueWith(completed =>
                {
                    // Each completing task atomically claims the next free proxy slot.
                    var source = sources[Interlocked.Increment(ref nextTaskIndex)];
                    if (completed.IsFaulted)
                    {
                        source.TrySetException(completed.Exception.InnerExceptions);
                    }
                    else if (completed.IsCanceled)
                    {
                        source.TrySetCanceled();
                    }
                    else
                    {
                        source.TrySetResult(completed.Result);
                    }
                }, CancellationToken.None,
                   TaskContinuationOptions.ExecuteSynchronously,
                   TaskScheduler.Default);
            }
            return from source in sources
                   select source.Task;
        }

        /// <summary>
        /// Downloads every URL, processes the results in completion order through
        /// <see cref="Interleaved{T}"/> inside a parallel query, and prints the total.
        /// Blocks until every result has been processed.
        /// </summary>
        /// <exception cref="AggregateException">
        /// Thrown by <c>Task.WaitAll</c> if any processing task faults or is canceled.
        /// </exception>
        public void TCSCombinator()
        {
            var urls = GetListOfUrl();
            List<Task<string>> taskList = (from _ in urls select WebDownloadDataTask(_)).ToList();

            // TCS interleaved by running in parallel.
            // Select (not ForAll) is used so that each async lambda yields a Task we can
            // wait on; with ForAll the lambda becomes async void and the query returns
            // before any download has been processed.
            Task[] processing = Interleaved(taskList).AsParallel().Select(async t =>
            {
                // ConfigureAwait(false): the caller blocks in WaitAll below, so the
                // continuation must not try to resume on the caller's context.
                string s = await t.ConfigureAwait(false);
                Console.WriteLine("Interleaved in Parallel: " + s.Length);
                if (s != "")
                {
                    Interlocked.Increment(ref _processingCount);
                }
            }).ToArray();

            // Wait for all processing so the total below reflects every download.
            Task.WaitAll(processing);
            Console.WriteLine("TCS Combinator Totally {0} processed", _processingCount);
        }

        /// <summary>
        /// Downloads every URL and processes each result as soon as its task completes,
        /// using a <c>Task.WhenAny</c> loop, then prints the total.
        /// </summary>
        /// <returns>A task that completes once every download has been processed.</returns>
        public async Task WhenAnyAsync()
        {
            var urls = GetListOfUrl();
            List<Task<string>> taskList = (from _ in urls select WebDownloadDataTask(_)).ToList();

            // Sequential interleaving: each iteration awaits the next task to finish.
            while (taskList.Count > 0)
            {
                // Task.WhenAny returns a Task whose result is the task that completed.
                Task<string> t = await Task.WhenAny(taskList);
                taskList.Remove(t);

                // t is already complete; awaiting it unwraps the result (or rethrows).
                string s = await t;
                Console.WriteLine("WhenAny interleaving process data: " + s.Length);
                if (s != "")
                {
                    // Atomic, to match TCSCombinator which updates the same counter.
                    Interlocked.Increment(ref _processingCount);
                }
            }
            Console.WriteLine("WhenAny Totally {0} processed", _processingCount);
        }

        /// <summary>
        /// Fire-and-forget entry point kept for existing callers; it cannot be awaited.
        /// Prefer <see cref="WhenAnyAsync"/>, which returns a <see cref="Task"/>.
        /// </summary>
        public async void WhenAny()
        {
            await WhenAnyAsync();
        }

        /// <summary>
        /// Downloads the content at <paramref name="u"/> on a thread-pool thread.
        /// </summary>
        /// <param name="u">The URL to download.</param>
        /// <returns>
        /// A task yielding the downloaded text, or an empty string if the download
        /// failed. Failures are logged to the console and never fault the task.
        /// </returns>
        private Task<string> WebDownloadDataTask(string u)
        {
            return Task.Run(() =>
            {
                string data = "";
                try
                {
                    // WebClient is IDisposable; release it as soon as the download ends.
                    using (var wc = new WebClient())
                    {
                        data = wc.DownloadString(u);
                    }
                    Console.WriteLine("Done: " + u);
                }
                catch (Exception ex)
                {
                    // Best effort by design: report which URL failed and why, then
                    // fall through and return the empty string.
                    Console.WriteLine("Exception (" + u + "): " + ex.Message);
                }
                return data;
            });
        }

        /// <summary>
        /// Builds the list of URLs to download: 7 base URLs repeated
        /// <see cref="UrlRepeatCount"/> times.
        /// </summary>
        /// <returns>A new list containing the repeated URLs in order.</returns>
        List<string> GetListOfUrl()
        {
            var list = new List<string>
            {
                "http://www.cnn.com",
                "http://www.yahoo.com",
                "http://www.nfl.com",
                "http://www.nba.com",
                "http://www.cnbc.com",
                "http://www.msdn.com",
                "http://www.usatoday.com"
            };

            // Equivalent to chaining list.Concat(list) UrlRepeatCount - 1 times.
            return Enumerable.Repeat(list, UrlRepeatCount)
                             .SelectMany(batch => batch)
                             .ToList();
        }
    }

No comments:

Post a Comment