This module implements some core synchronization primitives.
Types
AsyncEvent = ref object of RootRef flag: bool waiters: seq[Future[void].Raising([CancelledError])]
-
A primitive event object.
An event manages a flag that can be set to true with the fire() procedure and reset to false with the clear() procedure. The wait() coroutine blocks until the flag is false.
If more than one coroutine blocked in wait() waiting for event state to be signaled, when event get fired, then all coroutines continue proceeds in order, they have entered waiting state.
Source Edit AsyncEventQueue[T] = ref object of RootObj readers: seq[EventQueueReader] queue: Deque[T] counter: uint64 limit: int offset: int
- Source Edit
AsyncEventQueueFullError = object of AsyncError
- Source Edit
AsyncLock = ref object of RootRef locked: bool acquired: bool waiters: seq[Future[void].Raising([CancelledError])]
-
A primitive lock is a synchronization primitive that is not owned by a particular coroutine when locked. A primitive lock is in one of two states, locked or unlocked.
When more than one coroutine is blocked in acquire() waiting for the state to turn to unlocked, only one coroutine proceeds when a release() call resets the state to unlocked; first coroutine which is blocked in acquire() is being processed.
Source Edit AsyncLockError = object of AsyncError
- AsyncLock is either locked or unlocked. Source Edit
AsyncQueue[T] = ref object of RootRef getters: seq[Future[void].Raising([CancelledError])] putters: seq[Future[void].Raising([CancelledError])] queue: Deque[T] maxsize: int
-
A queue, useful for coordinating producer and consumer coroutines.
If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then "await put()" will block when the queue reaches maxsize, until an item is removed by "await get()".
Source Edit AsyncQueueEmptyError = object of AsyncError
- AsyncQueue is empty. Source Edit
AsyncQueueFullError = object of AsyncError
- AsyncQueue is full. Source Edit
EventQueueKey = distinct uint64
- Source Edit
EventQueueReader = object key: EventQueueKey offset: int waiter: Future[void].Raising([CancelledError]) overflow: bool
- Source Edit
Procs
proc `$`[T](aq: AsyncQueue[T]): string {....raises: [].}
- Turn an async queue aq into its string representation. Source Edit
proc `[]=`[T](aq: AsyncQueue[T]; i: BackwardsIndex; item: T) {.inline, ...raises: [].}
- Change the i-th element of aq. Source Edit
proc `[]=`[T](aq: AsyncQueue[T]; i: Natural; item: T) {.inline, ...raises: [].}
- Change the i-th element of aq. Source Edit
proc `[]`[T](aq: AsyncQueue[T]; i: BackwardsIndex): T {.inline, ...raises: [].}
- Access the i-th element of aq by order from first to last. aq[0] is the first element, aq[^1] is the last element. Source Edit
proc `[]`[T](aq: AsyncQueue[T]; i: Natural): T {.inline, ...raises: [].}
- Access the i-th element of aq by order from first to last. aq[0] is the first element, aq[^1] is the last element. Source Edit
proc acquire(lock: AsyncLock): InternalRaisesFuture[void, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Acquire a lock lock.
This procedure blocks until the lock lock is unlocked, then sets it to locked and returns.
Source Edit proc addFirst[T](aq: AsyncQueue[T]; item: T): InternalRaisesFuture[void, (CancelledError,)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Put an item to the beginning of the queue aq. If the queue is full, wait until a free slot is available before adding item. Source Edit
proc addFirstNoWait[T](aq: AsyncQueue[T]; item: T) {. ...raises: [AsyncQueueFullError], raises: [].}
-
Put an item item to the beginning of the queue aq immediately.
If queue aq is full, then AsyncQueueFullError exception raised.
Source Edit proc addLast[T](aq: AsyncQueue[T]; item: T): InternalRaisesFuture[void, (CancelledError,)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Put an item to the end of the queue aq. If the queue is full, wait until a free slot is available before adding item. Source Edit
proc addLastNoWait[T](aq: AsyncQueue[T]; item: T) {. ...raises: [AsyncQueueFullError], raises: [].}
-
Put an item item at the end of the queue aq immediately.
If queue aq is full, then AsyncQueueFullError exception raised.
Source Edit proc clear(event: AsyncEvent) {....raises: [], tags: [].}
- Reset the internal flag of event to false. Subsequently, tasks calling wait() will block until fire() is called to set the internal flag to true again. Source Edit
proc clear[T](aq: AsyncQueue[T]) {.inline, ...raises: [].}
- Clears all elements of queue aq. Source Edit
proc close(ab: AsyncEventQueue) {....raises: [], raises: [].}
- Source Edit
proc closeWait(ab: AsyncEventQueue): InternalRaisesFuture[void, void] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Source Edit
proc contains[T](aq: AsyncQueue[T]; item: T): bool {.inline, ...raises: [].}
- Return true if item is in aq or false if not found. Usually used via the in operator. Source Edit
proc emit[T](ab: AsyncEventQueue[T]; data: T) {....raises: [].}
- Source Edit
proc empty[T](aq: AsyncQueue[T]): bool {.inline, ...raises: [].}
- Return true if the queue is empty, false otherwise. Source Edit
proc fire(event: AsyncEvent) {....raises: [], tags: [].}
- Set the internal flag of event to true. All tasks waiting for it to become true are awakened. Task that call wait() once the flag is true will not block at all. Source Edit
proc full[T](aq: AsyncQueue[T]): bool {.inline, ...raises: [].}
-
Return true if there are maxsize items in the queue.
Note: If the aq was initialized with maxsize = 0 (default), then full() is never true.
Source Edit proc get[T](aq: AsyncQueue[T]): InternalRaisesFuture[T, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Source Edit
proc getNoWait[T](aq: AsyncQueue[T]): T {....raises: [AsyncQueueEmptyError], raises: [].}
- Alias of popFirstNoWait(). Source Edit
proc isSet(event: AsyncEvent): bool {....raises: [], tags: [].}
- Return true if and only if the internal flag of event is true. Source Edit
proc len(ab: AsyncEventQueue): int {....raises: [], raises: [].}
- Source Edit
proc len[T](aq: AsyncQueue[T]): int {.inline, ...raises: [].}
- Return the number of elements in aq. Source Edit
proc locked(lock: AsyncLock): bool {....raises: [], tags: [].}
- Return true if the lock lock is acquired, false otherwise. Source Edit
proc newAsyncEvent(): AsyncEvent {....raises: [], tags: [].}
-
Creates new asyncronous event AsyncEvent.
An event manages a flag that can be set to true with the fire() procedure and reset to false with the clear() procedure. The wait() procedure blocks until the flag is true. The flag is initially false.
Source Edit proc newAsyncEventQueue[T](limitSize = 0): AsyncEventQueue[T] {....raises: [], raises: [].}
-
Creates new AsyncEventBus maximum size of limitSize (default is 0 which means that there no limits).
When number of events emitted exceeds limitSize - emit() procedure will discard new events, consumers which has number of pending events more than limitSize will get AsyncEventQueueFullError error.
Source Edit proc newAsyncLock(): AsyncLock {....raises: [], tags: [].}
-
Creates new asynchronous lock AsyncLock.
Lock is created in the unlocked state. When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another coroutine changes it to unlocked.
The release() procedure changes the state to unlocked and returns immediately.
Source Edit proc newAsyncQueue[T](maxsize: int = 0): AsyncQueue[T] {....raises: [].}
- Creates a new asynchronous queue AsyncQueue. Source Edit
proc popFirst[T](aq: AsyncQueue[T]): InternalRaisesFuture[T, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Remove and return an item from the beginning of the queue aq. If the queue is empty, wait until an item is available. Source Edit
proc popFirstNoWait[T](aq: AsyncQueue[T]): T {....raises: [AsyncQueueEmptyError], raises: [].}
-
Get an item from the beginning of the queue aq immediately.
If queue aq is empty, then AsyncQueueEmptyError exception raised.
Source Edit proc popLast[T](aq: AsyncQueue[T]): InternalRaisesFuture[T, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Remove and return an item from the end of the queue aq. If the queue is empty, wait until an item is available. Source Edit
proc popLastNoWait[T](aq: AsyncQueue[T]): T {....raises: [AsyncQueueEmptyError], raises: [].}
-
Get an item from the end of the queue aq immediately.
If queue aq is empty, then AsyncQueueEmptyError exception raised.
Source Edit proc put[T](aq: AsyncQueue[T]; item: T): InternalRaisesFuture[void, (CancelledError,)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Source Edit
proc putNoWait[T](aq: AsyncQueue[T]; item: T) {....raises: [AsyncQueueFullError], raises: [].}
- Alias of addLastNoWait(). Source Edit
proc register(ab: AsyncEventQueue): EventQueueKey {....raises: [], raises: [].}
- Source Edit
proc release(lock: AsyncLock) {....raises: [AsyncLockError], raises: [], tags: [].}
-
Release a lock lock.
When the lock is locked, reset it to unlocked, and return. If any other coroutines are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed.
Source Edit proc size[T](aq: AsyncQueue[T]): int {.inline, ...raises: [].}
- Return the maximum number of elements in aq. Source Edit
proc unregister(ab: AsyncEventQueue; key: EventQueueKey) {....raises: [], raises: [].}
- Source Edit
proc wait(event: AsyncEvent): InternalRaisesFuture[void, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
- Source Edit
proc waitEvents[T](ab: AsyncEventQueue[T]; key: EventQueueKey; eventsCount = -1): InternalRaisesFuture[ seq[T], (AsyncEventQueueFullError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Wait for events Source Edit
Iterators
iterator items[T](aq: AsyncQueue[T]): T {.inline, ...raises: [].}
- Yield every element of aq. Source Edit
iterator mitems[T](aq: AsyncQueue[T]): var T {.inline, ...raises: [].}
- Yield every element of aq. Source Edit
iterator pairs[T](aq: AsyncQueue[T]): tuple[key: int, val: T] {.inline, ...raises: [].}
- Yield every (position, value) of aq. Source Edit
Templates
template readerOverflow(ab: AsyncEventQueue; reader: EventQueueReader): bool
- Source Edit
Exports
-
setThreadDispatcher, closeSocket, ENOSR, EHOSTUNREACH, EHOSTDOWN, ECONNRESET, EDQUOT, fail, ECANCELED, EMLINK, FuturePendingError, milliseconds, <=, +=, $, asyncTimer, internalRaiseIfError, getSrcLocation, FutureError, fromNow, weeks, ESPIPE, withTimeout, LocationKind, id, <=, FutureCompletedError, ENOEXEC, ESHUTDOWN, fail, EREMOTEIO, toString, EXFULL, waitFor, complete, ==, oneValue, internalInitFutureBase, EPROTONOSUPPORT, Nanosecond, epochNanoSeconds, cancelAndWait, addTimer, Moment, ECOMM, EPROTOTYPE, AsyncExceptionError, milliseconds, getTrackerCounter, ENOENT, millis, removeTimer, FutureFlag, ENOTCONN, EBUSY, init, ENOTUNIQ, days, +=, await, or, ZeroDuration, EKEYREVOKED, AsyncTimeoutError, EUCLEAN, Second, EEXIST, ENOLINK, newInternalRaisesFuture, idleAsync, ==, internalRaiseIfError, SomeIntegerI64, ENOMEM, ENOKEY, +, removeWriter2, ENOMSG, [], trackCounter, EKEYEXPIRED, Week, isCounterLeaked, awaitne, EAFNOSUPPORT, EWOULDBLOCK, EREMOTE, Finished, untrackCounter, AsyncCallback, EHWPOISON, epochSeconds, newDispatcher, EPERM, microseconds, trackerCounters, >=, weeks, Finished, cancelCallback=, ETOOMANYREFS, $, EISCONN, callSoon, ESOCKTNOSUPPORT, setGlobalDispatcher, EXDEV, EBADF, cancelSoon, allFinished, TrackerBase, hours, InternalAsyncCallback, EBFONT, ENETDOWN, EACCES, ELOOP, InfiniteDuration, ETIMEDOUT, >=, EINVAL, value, EBADFD, Future, completed, PDispatcher, ESRCH, EL2NSYNC, EFAULT, low, ESTRPIPE, -=, or, ENOTSOCK, getAsyncTimestamp, await, Hour, EIDRM, removeReader2, secs, getThreadDispatcher, futureContinue, ENOANO, EADV, CallbackFunc, ENFILE, FutureBase, waitFor, high, Microsecond, ENOPKG, ELIBBAD, callSoon, EOWNERDEAD, flags, ERANGE, done, ENONET, completed, callSoon, cancelSoon, EMSGSIZE, EALREADY, EBADMSG, EILSEQ, ENOPROTOOPT, SrcLoc, allFutures, -, complete, FutureState, EBADR, nanos, location, FutureDefect, EDOTDOT, ENOBUFS, EKEYREJECTED, <, Minute, error, ==, one, CancelledError, nanoseconds, EISDIR, EOVERFLOW, FutureSeq, cancelAndWait, micros, all, -, ESRMNT, ENOTEMPTY, EPROTO, TimerCallback, microseconds, error, ENOTRECOVERABLE, wait, callIdle, join, hours, AsyncError, EBADRQC, ENODEV, tryCancel, addTimer, ECHRNG, ENOTDIR, EUNATCH, cancelAndSchedule, failed, nanoseconds, ERESTART, *, ETXTBSY, getTracker, newFutureStr, EMFILE, LocFinishIndex, withTimeout, addTimer, race, EUSERS, ENOTBLK, ENOTTY, EISNAM, ELIBACC, ENAVAIL, init, race, internalCallTick, EDESTADDRREQ, +, cancelSoon, completed, minutes, cancel, FutureFlags, getGlobalDispatcher, ELIBEXEC, stepsAsync, micros, wait, EINPROGRESS, EPFNOSUPPORT, E2BIG, -=, runForever, ENOMEDIUM, secs, value, read, waitFor, EL3HLT, unregisterAndCloseFd, EBADSLT, InternalRaisesFuture, -, <, ENETRESET, fastEpochTime, ENXIO, isZero, EPIPE, allFutures, Day, wait, now, read, asyncSpawn, removeCallback, EAGAIN, raiseOsDefect, trackerCounterKeys, EADDRNOTAVAIL, read, removeCallback, TrackerCounter, async, addCallback, init, async, EREMCHG, finished, addCallback, seconds, noCancel, low, AsyncFD, ECHILD, Raising, init, ECONNABORTED, state, wait, >, callIdle, removeTimer, LocCreateIndex, addReader2, ETIME, ENOLCK, EDEADLK, asyncCheck, Duration, ELNRNG, EDOM, ENOCSI, millis, EBADE, EMEDIUMTYPE, readError, sleepAsync, isInfinite, asyncDiscard, callback=, ENETUNREACH, InternalFutureBase, callIdle, ENOSYS, ELIBSCN, EIO, EROFS, seconds, ELIBMAX, div, init, EL2HLT, EOPNOTSUPP, FutureStr, poll, oneIndex, clearTimer, register2, LocCompleteIndex, days, ENOSTR, EADDRINUSE, one, sleepAsync, ENAMETOOLONG, EMULTIHOP, ESTALE, race, ENODATA, EFBIG, +, minutes, ERFKILL, cancelled, $, toException, join, EDEADLOCK, allFutures, MaxEventsCount, wait, read, raiseAsDefect, ENOSPC, Millisecond, >, setTimer, cancelSoon, unregister2, failed, waitFor, newFutureSeq, addTracker, callback=, orImpl, ECONNREFUSED, high, ENOTNAM, nanos, addWriter2, removeTimer, internalCallTick, EINTR, and, closeHandle, EL3RST, readError, *, newFuture, internalCallTick