/**
 * The event module provides a primitive for lightweight signaling of other threads
 * (emulating Windows events on Posix)
 *
 * Copyright: Copyright (c) 2019 D Language Foundation
 * License: Distributed under the
 *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
 *    (See accompanying file LICENSE)
 * Authors: Rainer Schuetze
 * Source:    $(DRUNTIMESRC core/sync/event.d)
 */
module core.sync.event;
version (Windows)
{
    import core.sys.windows.basetsd /+: HANDLE +/;
    import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
    import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
        WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
}
else version (Posix)
{
    import core.sys.posix.pthread;
    import core.sys.posix.sys.types;
    import core.sys.posix.time;
}
else
{
    static assert(false, "Platform not supported");
}
import core.time;
import core.internal.abort : abort;
/**
 * represents an event. Clients of an event are suspended while waiting
 * for the event to be "signaled".
 *
 * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
 * `CreateEvent` and `SetEvent` on Windows.
---
import core.sync.event, core.thread, std.file;
struct ProcessFile
{
    ThreadGroup group;
    Event event;
    void[] buffer;
    void doProcess()
    {
        event.wait();
        // process buffer
    }
    void process(string filename)
    {
        event.initialize(true, false);
        group = new ThreadGroup;
        for (int i = 0; i < 10; ++i)
            group.create(&doProcess);
        buffer = std.file.read(filename);
        event.set();
        group.joinAll();
        event.terminate();
    }
}
---
 */
struct Event
{
nothrow @nogc:
    /**
     * Creates an event object.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    this(bool manualReset, bool initialState)
    {
        initialize(manualReset, initialState);
    }
    /**
     * Initializes an event object. Does nothing if the event is already initialized.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    void initialize(bool manualReset, bool initialState)
    {
        version (Windows)
        {
            if (m_event)
                return;
            m_event = CreateEvent(null, manualReset, initialState, null);
            m_event || abort("Error: CreateEvent failed.");
        }
        else version (Posix)
        {
            if (m_initalized)
                return;
            pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
                abort("Error: pthread_mutex_init failed.");
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                pthread_condattr_t attr = void;
                pthread_condattr_init(&attr) == 0 ||
                    abort("Error: pthread_condattr_init failed.");
                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
                    abort("Error: pthread_condattr_setclock failed.");
                pthread_cond_init(&m_cond, &attr) == 0 ||
                    abort("Error: pthread_cond_init failed.");
                pthread_condattr_destroy(&attr) == 0 ||
                    abort("Error: pthread_condattr_destroy failed.");
            }
            else
            {
                pthread_cond_init(&m_cond, null) == 0 ||
                    abort("Error: pthread_cond_init failed.");
            }
            m_state = initialState;
            m_manualReset = manualReset;
            m_initalized = true;
        }
    }
    // copying not allowed, can produce resource leaks
    @disable this(this);
    @disable void opAssign(Event);
    ~this()
    {
        terminate();
    }
    /**
     * deinitialize event. Does nothing if the event is not initialized. There must not be
     * threads currently waiting for the event to be signaled.
    */
    void terminate()
    {
        version (Windows)
        {
            if (m_event)
                CloseHandle(m_event);
            m_event = null;
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_destroy(&m_mutex) == 0 ||
                    abort("Error: pthread_mutex_destroy failed.");
                pthread_cond_destroy(&m_cond) == 0 ||
                    abort("Error: pthread_cond_destroy failed.");
                m_initalized = false;
            }
        }
    }
    /// Set the event to "signaled", so that waiting clients are resumed
    void set()
    {
        version (Windows)
        {
            if (m_event)
                SetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = true;
                pthread_cond_broadcast(&m_cond);
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }
    /// Reset the event manually
    void reset()
    {
        version (Windows)
        {
            if (m_event)
                ResetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = false;
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }
    /**
     * Wait for the event to be signaled without timeout.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
     */
    bool wait()
    {
        version (Windows)
        {
            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            return wait(Duration.max);
        }
    }
    /**
     * Wait for the event to be signaled with timeout.
     *
     * Params:
     *  tmout = the maximum time to wait
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
     *  the event is uninitialized or another error occured
     */
    bool wait(Duration tmout)
    {
        version (Windows)
        {
            if (!m_event)
                return false;
            auto maxWaitMillis = dur!("msecs")(uint.max - 1);
            while (tmout > maxWaitMillis)
            {
                auto res = WaitForSingleObject(m_event, uint.max - 1);
                if (res != WAIT_TIMEOUT)
                    return res == WAIT_OBJECT_0;
                tmout -= maxWaitMillis;
            }
            auto ms = cast(uint)(tmout.total!"msecs");
            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            if (!m_initalized)
                return false;
            pthread_mutex_lock(&m_mutex);
            int result = 0;
            if (!m_state)
            {
                if (tmout == Duration.max)
                {
                    result = pthread_cond_wait(&m_cond, &m_mutex);
                }
                else
                {
                    import core.sync.config;
                    timespec t = void;
                    mktspec(t, tmout);
                    result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
                }
            }
            if (result == 0 && !m_manualReset)
                m_state = false;
            pthread_mutex_unlock(&m_mutex);
            return result == 0;
        }
    }
private:
    version (Windows)
    {
        HANDLE m_event;
    }
    else version (Posix)
    {
        pthread_mutex_t m_mutex;
        pthread_cond_t m_cond;
        bool m_initalized;
        bool m_state;
        bool m_manualReset;
    }
}
// Test single-thread (non-shared) use.
@nogc nothrow unittest
{
    // auto-reset, initial state false
    Event ev1 = Event(false, false);
    assert(!ev1.wait(1.dur!"msecs"));
    ev1.set();
    assert(ev1.wait());
    assert(!ev1.wait(1.dur!"msecs"));
    // manual-reset, initial state true
    Event ev2 = Event(true, true);
    assert(ev2.wait());
    assert(ev2.wait());
    ev2.reset();
    assert(!ev2.wait(1.dur!"msecs"));
}
unittest
{
    import core.thread, core.atomic;
    scope event      = new Event(true, false);
    int  numThreads = 10;
    shared int numRunning = 0;
    void testFn()
    {
        event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
        numRunning.atomicOp!"+="(1);
    }
    auto group = new ThreadGroup;
    for (int i = 0; i < numThreads; ++i)
        group.create(&testFn);
    auto start = MonoTime.currTime;
    assert(numRunning == 0);
    event.set();
    group.joinAll();
    assert(numRunning == numThreads);
    assert(MonoTime.currTime - start < 5.dur!"seconds");
}