/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors:   Sean Kelly
 * Source:    $(DRUNTIMESRC core/sync/_rwmutex.d)
 */
/*          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module core.sync.rwmutex;
public import core.sync.exception;
import core.sync.condition;
import core.sync.mutex;
import core.memory;
version (Posix)
{
    import core.sys.posix.pthread;
}
////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////
/**
 * This class represents a mutex that allows any number of readers to enter,
 * but when a writer enters, all other readers and writers are blocked.
 *
 * Please note that this mutex is not recursive and is intended to guard access
 * to data only.  Also, no deadlock checking is in place because doing so would
 * require dynamic memory allocation, which would reduce performance by an
 * unacceptable amount.  As a result, any attempt to recursively acquire this
 * mutex may well deadlock the caller, particularly if a write lock is acquired
 * while holding a read lock, or vice-versa.  In practice, this should not be
 * an issue however, because it is uncommon to call deeply into unknown code
 * while holding a lock that simply protects data.
 */
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////
    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * Params:
     *  policy = The policy to use.
     *
     * Throws:
     *  SyncError on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
    {
        m_commonMutex = new Mutex;
        if ( !m_commonMutex )
            throw new SyncError( "Unable to initialize mutex" );
        m_readerQueue = new Condition( m_commonMutex );
        if ( !m_readerQueue )
            throw new SyncError( "Unable to initialize mutex" );
        m_writerQueue = new Condition( m_commonMutex );
        if ( !m_writerQueue )
            throw new SyncError( "Unable to initialize mutex" );
        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }
    /// ditto
    shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
    {
        m_commonMutex = new shared Mutex;
        if ( !m_commonMutex )
            throw new SyncError( "Unable to initialize mutex" );
        m_readerQueue = new shared Condition( m_commonMutex );
        if ( !m_readerQueue )
            throw new SyncError( "Unable to initialize mutex" );
        m_writerQueue = new shared Condition( m_commonMutex );
        if ( !m_writerQueue )
            throw new SyncError( "Unable to initialize mutex" );
        m_policy = policy;
        m_reader = new shared Reader;
        m_writer = new shared Writer;
    }
    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////
    /**
     * Gets the policy used by this mutex.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    @property Policy policy() @safe nothrow
    {
        return m_policy;
    }
    ///ditto
    @property Policy policy() shared @safe nothrow
    {
        return m_policy;
    }
    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////
    /**
     * Gets an object representing the reader lock for the associated mutex.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    @property Reader reader() @safe nothrow
    {
        return m_reader;
    }
    ///ditto
    @property shared(Reader) reader() shared @safe nothrow
    {
        return m_reader;
    }
    /**
     * Gets an object representing the writer lock for the associated mutex.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    @property Writer writer() @safe nothrow
    {
        return m_writer;
    }
    ///ditto
    @property shared(Writer) writer() shared @safe nothrow
    {
        return m_writer;
    }
    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////
    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this(this Q)() @trusted nothrow
            if (is(Q == Reader) || is(Q == shared Reader))
        {
            m_proxy.link = this;
            this.__monitor = cast(void*) &m_proxy;
        }
        /**
         * Acquires a read lock on the enclosing mutex.
         */
        @trusted void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;
                while ( shouldQueueReader )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }
        /// ditto
        @trusted void lock() shared
        {
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedReaders);
                scope(exit) --(cast()m_numQueuedReaders);
                while ( shouldQueueReader )
                    m_readerQueue.wait();
                ++(cast()m_numActiveReaders);
            }
        }
        /**
         * Releases a read lock on the enclosing mutex.
         */
        @trusted void unlock()
        {
            synchronized( m_commonMutex )
            {
                if ( --m_numActiveReaders < 1 )
                {
                    if ( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }
        /// ditto
        @trusted void unlock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( --(cast()m_numActiveReaders) < 1 )
                {
                    if ( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }
        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueReader )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }
        /// ditto
        @trusted bool tryLock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueReader )
                    return false;
                ++(cast()m_numActiveReaders);
                return true;
            }
        }
        /**
         * Attempts to acquire a read lock on the enclosing mutex. If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned. If not, the function blocks until either the lock can be
         * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
         * true if the lock was acquired and false if the function timed out.
         *
         * Params:
         *  timeout = maximum amount of time to wait for the lock
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock(Duration timeout)
        {
            synchronized( m_commonMutex )
            {
                if (!shouldQueueReader)
                {
                    ++m_numActiveReaders;
                    return true;
                }
                enum zero = Duration.zero();
                if (timeout <= zero)
                    return false;
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;
                enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
                const initialTime = MonoTime.currTime;
                m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
                while (shouldQueueReader)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++m_numActiveReaders;
                return true;
            }
        }
        /// ditto
        @trusted bool tryLock(Duration timeout) shared
        {
            const initialTime = MonoTime.currTime;
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedReaders);
                scope(exit) --(cast()m_numQueuedReaders);
                while (shouldQueueReader)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    // Avoid problems calling wait(Duration) with huge arguments.
                    enum maxWaitPerCall = dur!"hours"(24 * 365);
                    m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++(cast()m_numActiveReaders);
                return true;
            }
        }
    private:
        @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
            if (is(Q == Reader) || is(Q == shared Reader))
        {
            if ( m_numActiveWriters > 0 )
                return true;
            switch ( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;
            case Policy.PREFER_READERS:
            default:
                 break;
            }
        return false;
        }
        struct MonitorProxy
        {
            Object.Monitor link;
        }
        MonitorProxy    m_proxy;
    }
    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////
    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this(this Q)() @trusted nothrow
            if (is(Q == Writer) || is(Q == shared Writer))
        {
            m_proxy.link = this;
            this.__monitor = cast(void*) &m_proxy;
        }
        /**
         * Acquires a write lock on the enclosing mutex.
         */
        @trusted void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;
                while ( shouldQueueWriter )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }
        /// ditto
        @trusted void lock() shared
        {
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedWriters);
                scope(exit) --(cast()m_numQueuedWriters);
                while ( shouldQueueWriter )
                    m_writerQueue.wait();
                ++(cast()m_numActiveWriters);
            }
        }
        /**
         * Releases a write lock on the enclosing mutex.
         */
        @trusted void unlock()
        {
            synchronized( m_commonMutex )
            {
                if ( --m_numActiveWriters < 1 )
                {
                    switch ( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }
        /// ditto
        @trusted void unlock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( --(cast()m_numActiveWriters) < 1 )
                {
                    switch ( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }
        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueWriter )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }
        /// ditto
        @trusted bool tryLock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueWriter )
                    return false;
                ++(cast()m_numActiveWriters);
                return true;
            }
        }
        /**
         * Attempts to acquire a write lock on the enclosing mutex. If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned. If not, the function blocks until either the lock can be
         * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
         * true if the lock was acquired and false if the function timed out.
         *
         * Params:
         *  timeout = maximum amount of time to wait for the lock
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock(Duration timeout)
        {
            synchronized( m_commonMutex )
            {
                if (!shouldQueueWriter)
                {
                    ++m_numActiveWriters;
                    return true;
                }
                enum zero = Duration.zero();
                if (timeout <= zero)
                    return false;
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;
                enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
                const initialTime = MonoTime.currTime;
                m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
                while (shouldQueueWriter)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++m_numActiveWriters;
                return true;
            }
        }
        /// ditto
        @trusted bool tryLock(Duration timeout) shared
        {
            const initialTime = MonoTime.currTime;
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedWriters);
                scope(exit) --(cast()m_numQueuedWriters);
                while (shouldQueueWriter)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    // Avoid problems calling wait(Duration) with huge arguments.
                    enum maxWaitPerCall = dur!"hours"(24 * 365);
                    m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++(cast()m_numActiveWriters);
                return true;
            }
        }
    private:
        @property bool shouldQueueWriter(this Q)()
            if (is(Q == Writer) || is(Q == shared Writer))
        {
            if ( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;
            switch ( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;
            case Policy.PREFER_WRITERS:
            default:
                 break;
            }
        return false;
        }
        struct MonitorProxy
        {
            Object.Monitor link;
        }
        MonitorProxy    m_proxy;
    }
private:
    Policy      m_policy;
    Reader      m_reader;
    Writer      m_writer;
    Mutex       m_commonMutex;
    Condition   m_readerQueue;
    Condition   m_writerQueue;
    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
unittest
{
    import core.atomic, core.thread, core.sync.semaphore;
    static void runTest(ReadWriteMutex.Policy policy)
    {
        scope mutex = new ReadWriteMutex(policy);
        scope rdSemA = new Semaphore, rdSemB = new Semaphore,
              wrSemA = new Semaphore, wrSemB = new Semaphore;
        shared size_t numReaders, numWriters;
        void readerFn()
        {
            synchronized (mutex.reader)
            {
                atomicOp!"+="(numReaders, 1);
                rdSemA.notify();
                rdSemB.wait();
                atomicOp!"-="(numReaders, 1);
            }
        }
        void writerFn()
        {
            synchronized (mutex.writer)
            {
                atomicOp!"+="(numWriters, 1);
                wrSemA.notify();
                wrSemB.wait();
                atomicOp!"-="(numWriters, 1);
            }
        }
        void waitQueued(size_t queuedReaders, size_t queuedWriters)
        {
            for (;;)
            {
                synchronized (mutex.m_commonMutex)
                {
                    if (mutex.m_numQueuedReaders == queuedReaders &&
                        mutex.m_numQueuedWriters == queuedWriters)
                        break;
                }
                Thread.yield();
            }
        }
        scope group = new ThreadGroup;
        // 2 simultaneous readers
        group.create(&readerFn); group.create(&readerFn);
        rdSemA.wait(); rdSemA.wait();
        assert(numReaders == 2);
        rdSemB.notify(); rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0);
        foreach (t; group) group.remove(t);
        // 1 writer at a time
        group.create(&writerFn); group.create(&writerFn);
        wrSemA.wait();
        assert(!wrSemA.tryWait());
        assert(numWriters == 1);
        wrSemB.notify();
        wrSemA.wait();
        assert(numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numWriters == 0);
        foreach (t; group) group.remove(t);
        // reader and writer are mutually exclusive
        group.create(&readerFn);
        rdSemA.wait();
        group.create(&writerFn);
        waitQueued(0, 1);
        assert(!wrSemA.tryWait());
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        wrSemA.wait();
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
        // writer and reader are mutually exclusive
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        waitQueued(1, 0);
        assert(!rdSemA.tryWait());
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        rdSemA.wait();
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
        // policy determines whether queued reader or writers progress first
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        group.create(&writerFn);
        waitQueued(1, 1);
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        if (policy == ReadWriteMutex.Policy.PREFER_READERS)
        {
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
        }
        else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
        {
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
        }
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
    }
    runTest(ReadWriteMutex.Policy.PREFER_READERS);
    runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}
unittest
{
    import core.atomic, core.thread;
    __gshared ReadWriteMutex rwmutex;
    shared static bool threadTriedOnceToGetLock;
    shared static bool threadFinallyGotLock;
    rwmutex = new ReadWriteMutex();
    atomicFence;
    const maxTimeAllowedForTest = dur!"seconds"(20);
    // Test ReadWriteMutex.Reader.tryLock(Duration).
    {
        static void testReaderTryLock()
        {
            assert(!rwmutex.reader.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.reader.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.reader.unlock;
        }
        assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testReaderTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the writer lock held so otherThread's
        // first rwlock.reader.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.writer.unlock;
        // Soon after we release the writer lock otherThread's second
        // rwlock.reader.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
    threadTriedOnceToGetLock.atomicStore(false); // Reset.
    threadFinallyGotLock.atomicStore(false); // Reset.
    // Test ReadWriteMutex.Writer.tryLock(Duration).
    {
        static void testWriterTryLock()
        {
            assert(!rwmutex.writer.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.writer.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.writer.unlock;
        }
        assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testWriterTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the reader lock held so otherThread's
        // first rwlock.writer.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.reader.unlock;
        // Soon after we release the reader lock otherThread's second
        // rwlock.writer.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
}
unittest
{
    import core.atomic, core.thread, core.sync.semaphore;
    static void runTest(ReadWriteMutex.Policy policy)
    {
        shared scope mutex = new shared ReadWriteMutex(policy);
        scope rdSemA = new Semaphore, rdSemB = new Semaphore,
              wrSemA = new Semaphore, wrSemB = new Semaphore;
        shared size_t numReaders, numWriters;
        void readerFn()
        {
            synchronized (mutex.reader)
            {
                atomicOp!"+="(numReaders, 1);
                rdSemA.notify();
                rdSemB.wait();
                atomicOp!"-="(numReaders, 1);
            }
        }
        void writerFn()
        {
            synchronized (mutex.writer)
            {
                atomicOp!"+="(numWriters, 1);
                wrSemA.notify();
                wrSemB.wait();
                atomicOp!"-="(numWriters, 1);
            }
        }
        void waitQueued(size_t queuedReaders, size_t queuedWriters)
        {
            for (;;)
            {
                synchronized (mutex.m_commonMutex)
                {
                    if (mutex.m_numQueuedReaders == queuedReaders &&
                        mutex.m_numQueuedWriters == queuedWriters)
                        break;
                }
                Thread.yield();
            }
        }
        scope group = new ThreadGroup;
        // 2 simultaneous readers
        group.create(&readerFn); group.create(&readerFn);
        rdSemA.wait(); rdSemA.wait();
        assert(numReaders == 2);
        rdSemB.notify(); rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0);
        foreach (t; group) group.remove(t);
        // 1 writer at a time
        group.create(&writerFn); group.create(&writerFn);
        wrSemA.wait();
        assert(!wrSemA.tryWait());
        assert(numWriters == 1);
        wrSemB.notify();
        wrSemA.wait();
        assert(numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numWriters == 0);
        foreach (t; group) group.remove(t);
        // reader and writer are mutually exclusive
        group.create(&readerFn);
        rdSemA.wait();
        group.create(&writerFn);
        waitQueued(0, 1);
        assert(!wrSemA.tryWait());
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        wrSemA.wait();
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
        // writer and reader are mutually exclusive
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        waitQueued(1, 0);
        assert(!rdSemA.tryWait());
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        rdSemA.wait();
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
        // policy determines whether queued reader or writers progress first
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        group.create(&writerFn);
        waitQueued(1, 1);
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        if (policy == ReadWriteMutex.Policy.PREFER_READERS)
        {
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
        }
        else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
        {
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
        }
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
    }
    runTest(ReadWriteMutex.Policy.PREFER_READERS);
    runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}
unittest
{
    import core.atomic, core.thread;
    shared static ReadWriteMutex rwmutex;
    shared static bool threadTriedOnceToGetLock;
    shared static bool threadFinallyGotLock;
    rwmutex = new shared ReadWriteMutex();
    atomicFence;
    const maxTimeAllowedForTest = dur!"seconds"(20);
    // Test ReadWriteMutex.Reader.tryLock(Duration).
    {
        static void testReaderTryLock()
        {
            assert(!rwmutex.reader.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.reader.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.reader.unlock;
        }
        assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testReaderTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the writer lock held so otherThread's
        // first rwlock.reader.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.writer.unlock;
        // Soon after we release the writer lock otherThread's second
        // rwlock.reader.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
    threadTriedOnceToGetLock.atomicStore(false); // Reset.
    threadFinallyGotLock.atomicStore(false); // Reset.
    // Test ReadWriteMutex.Writer.tryLock(Duration).
    {
        static void testWriterTryLock()
        {
            assert(!rwmutex.writer.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.writer.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.writer.unlock;
        }
        assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testWriterTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the reader lock held so otherThread's
        // first rwlock.writer.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.reader.unlock;
        // Soon after we release the reader lock otherThread's second
        // rwlock.writer.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
}