Wednesday, March 30, 2016

Think threading in pseudo-code

Threading primitives and algorithms are better described in code.

PLINQ = AsParallel + With* options
/// <summary>
/// Builds a PLINQ query that filters the integers 3 .. 999,999,999 down to primes
/// by trial division.
/// </summary>
/// <remarks>
/// The query is only composed here; it is never enumerated inside this method, so no
/// primality work is actually performed by calling it. The inner range starts at 2 and
/// contains <c>(int) Math.Sqrt(n)</c> consecutive divisors.
/// </remarks>
static void p_linq_as_parallel()
{
    const int upperBound = 1000000000;

    // Outer source: every candidate from 3 up to (but excluding) upperBound.
    ParallelQuery<int> candidates = Enumerable.Range(3, upperBound - 3).AsParallel();

    // A candidate is kept when none of the trial divisors divides it evenly.
    ParallelQuery<int> primes = candidates.Where(
        n => !Enumerable.Range(2, (int) Math.Sqrt(n))
                        .AsParallel()
                        .Any(d => n % d == 0));
}

CountdownEvent rendezvous
// Shared countdown with an initial count of 2: one signal from each participant.
static CountdownEvent _ce= new CountdownEvent(2);

/// <summary>
/// Two-party rendezvous on <c>_ce</c>: a worker thread signals after a 3 second sleep,
/// the calling thread signals immediately, and each then waits for the count to reach zero.
/// </summary>
static void count_down_2()
{
    var lateArrival = new Thread(() =>
    {
        Thread.Sleep(3000);
        int workerId = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("signal/wait after 3 sec, Id={0}", workerId);
        _ce.Signal();
        _ce.Wait();
    });
    // The worker starts right away; the 3 second delay happens inside the worker itself.
    lateArrival.Start();

    int callerId = Thread.CurrentThread.ManagedThreadId;
    Console.WriteLine("immed signal but have to wait,Id={0}", callerId);
    _ce.Signal();
    // Blocks here until the worker has signalled as well.
    _ce.Wait();
    Console.WriteLine("all signaled so proceed");
}


Reader/Writer Lock = Monitor + upgrade + recursion policy

// ReaderWriterLockSlim is 2x slower than lock
// List guarded by _rw; touched by the reader and writer loops started below.
private static List<int> _shared_data = new List<int>();
private static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();

/// <summary>
/// Starts two reader threads followed by two writer threads, all operating on
/// <c>_shared_data</c> under <c>_rw</c>. Returns as soon as the threads are started.
/// </summary>
static void reader_writer()
{
    // multi-reader first, then multi-writer; start order matches the list order
    ThreadStart[] workers =
    {
        read_intermittent_continously,
        read_intermittent_continously,
        write_occationa_continously,
        write_occationa_continously,
    };

    foreach (ThreadStart worker in workers)
    {
        new Thread(worker).Start();
    }
}

/// <summary>
/// Reader loop: repeatedly takes the read lock on <c>_rw</c>, walks every element of
/// <c>_shared_data</c> (sleeping 10 ms per element) and releases the lock again.
/// Never returns.
/// </summary>
static void read_intermittent_continously()
{
    while (true)
    {
        _rw.EnterReadLock();
        try
        {
            foreach (var i in _shared_data) { Thread.Sleep(10); }
        }
        finally
        {
            // Release even if the body throws; a read lock left held would
            // block every writer waiting on _rw.
            _rw.ExitReadLock();
        }
    }
}

/// <summary>
/// Writer loop: every 300 ms picks a random integer, appends it to
/// <c>_shared_data</c> if it is not already present, then demonstrates a
/// read-then-update sequence under an upgradeable read lock. Never returns.
/// </summary>
static void write_occationa_continously()
{
    // One generator per writer thread instead of a fresh instance on every iteration.
    var random = new Random();

    while (true)
    {
        int i = random.Next();

        // The membership test reads the list, so it must run under the lock too.
        // Checking under an upgradeable lock and upgrading only when an insert is
        // needed keeps "check then add" atomic with respect to other writers.
        _rw.EnterUpgradeableReadLock();
        try
        {
            if (!_shared_data.Contains(i))
            {
                _rw.EnterWriteLock();
                try
                {
                    _shared_data.Add(i);
                }
                finally
                {
                    _rw.ExitWriteLock();
                }
            }
        }
        finally
        {
            _rw.ExitUpgradeableReadLock();
        }

        // need to read before upd=upgradeable
        _rw.EnterUpgradeableReadLock();
        try
        {
            // The list is non-empty here: the step above either added i or found it present.
            int ii = _shared_data[0];
            _rw.EnterWriteLock();
            try
            {
                // do some updates
            }
            finally
            {
                _rw.ExitWriteLock();
            }
        }
        finally
        {
            _rw.ExitUpgradeableReadLock();
        }

        Thread.Sleep(300);
    }
}

/// <summary>
/// Demonstrates re-entrant acquisition of a read lock: the same thread enters the
/// read lock twice and exits it twice on a lock created with
/// <c>LockRecursionPolicy.SupportsRecursion</c>.
/// </summary>
static void nested_lock()
{
    //var rw = new ReaderWriterLockSlim(); runtime exception, need policy
    // ReaderWriterLockSlim is IDisposable, so the local instance is disposed on exit.
    using (var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion))
    {
        rw.EnterReadLock();
        try
        {
            // Second entry by the same thread; allowed only because of the recursion policy.
            rw.EnterReadLock();
            rw.ExitReadLock();
        }
        finally
        {
            // Balance the outer entry even if the nested section throws.
            rw.ExitReadLock();
        }
    }
}


Barrier = rendezvous = a countdown that repeats
// Barrier with three participants.
static Barrier _barrier = new Barrier(3);

// Work item run by each participant: writes 0 through 4, calling
// SignalAndWait on the barrier after each number is written.
static Action _act = () =>
{
    int step = 0;
    while (step < 5)
    {
        Console.Write(step + " ");
        _barrier.SignalAndWait();
        step++;
    }
};

/// <summary>
/// Starts one thread per barrier participant, each running <c>_act</c>, so that the
/// threads advance through the loop in lock-step.
/// </summary>
static void setup_rendezvous()
{
    // output 0 0 0 1 1 1 2 2 2 3 3 3 4 4 4 so block till all participants get there
    // The thread count is taken from the barrier so the two cannot drift apart;
    // with fewer running threads than participants SignalAndWait would never release.
    for (int t = 0; t < _barrier.ParticipantCount; t++)
    {
        // Start() is required: constructing a Thread does not run it.
        new Thread(() => { _act(); }).Start();
    }
}




No comments:

Post a Comment