chronos/asyncsync

    Dark Mode
Search:
Group by:
  Source   Edit

This module implements some core synchronization primitives.

Types

AsyncEvent = ref object of RootRef
  flag: bool
  waiters: seq[Future[void].Raising([CancelledError])]

A primitive event object.

An event manages a flag that can be set to true with the fire() procedure and reset to false with the clear() procedure. The wait() coroutine blocks until the flag is true.

If more than one coroutine is blocked in wait() waiting for the event to be signaled, then when the event is fired all of those coroutines proceed, in the order in which they entered the waiting state.

  Source   Edit
AsyncEventQueue[T] = ref object of RootObj
  readers: seq[EventQueueReader]
  queue: Deque[T]
  counter: uint64
  limit: int
  offset: int
  Source   Edit
AsyncLock = ref object of RootRef
  locked: bool
  acquired: bool
  waiters: seq[Future[void].Raising([CancelledError])]

A primitive lock is a synchronization primitive that is not owned by a particular coroutine when locked. A primitive lock is in one of two states, locked or unlocked.

When more than one coroutine is blocked in acquire() waiting for the state to turn to unlocked, only one coroutine proceeds when a release() call resets the state to unlocked; the first coroutine that blocked in acquire() is the one that proceeds.

  Source   Edit
AsyncLockError = object of AsyncError
  
AsyncLock is either locked or unlocked.   Source   Edit
AsyncQueue[T] = ref object of RootRef
  getters: seq[Future[void].Raising([CancelledError])]
  putters: seq[Future[void].Raising([CancelledError])]
  queue: Deque[T]
  maxsize: int

A queue, useful for coordinating producer and consumer coroutines.

If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then "await put()" will block when the queue reaches maxsize, until an item is removed by "await get()".

  Source   Edit
AsyncQueueEmptyError = object of AsyncError
  
AsyncQueue is empty.   Source   Edit
AsyncQueueFullError = object of AsyncError
  
AsyncQueue is full.   Source   Edit
EventQueueKey = distinct uint64
  Source   Edit
EventQueueReader = object
  key: EventQueueKey
  offset: int
  waiter: Future[void].Raising([CancelledError])
  overflow: bool
  Source   Edit

Procs

proc `$`[T](aq: AsyncQueue[T]): string {....raises: [].}
Turn an async queue aq into its string representation.   Source   Edit
proc `[]=`[T](aq: AsyncQueue[T]; i: BackwardsIndex; item: T) {.inline,
    ...raises: [].}
Change the i-th element of aq.   Source   Edit
proc `[]=`[T](aq: AsyncQueue[T]; i: Natural; item: T) {.inline, ...raises: [].}
Change the i-th element of aq.   Source   Edit
proc `[]`[T](aq: AsyncQueue[T]; i: BackwardsIndex): T {.inline, ...raises: [].}
Access the i-th element of aq by order from first to last. aq[0] is the first element, aq[^1] is the last element.   Source   Edit
proc `[]`[T](aq: AsyncQueue[T]; i: Natural): T {.inline, ...raises: [].}
Access the i-th element of aq by order from first to last. aq[0] is the first element, aq[^1] is the last element.   Source   Edit
proc acquire(lock: AsyncLock): InternalRaisesFuture[void, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
    tags: [RootEffect].}

Acquire a lock lock.

This procedure blocks until the lock lock is unlocked, then sets it to locked and returns.

  Source   Edit
proc addFirst[T](aq: AsyncQueue[T]; item: T): InternalRaisesFuture[void,
    (CancelledError,)] {.stackTrace: false, ...raises: [], gcsafe, raises: [],
                         raises: [].}
Put an item to the beginning of the queue aq. If the queue is full, wait until a free slot is available before adding item.   Source   Edit
proc addFirstNoWait[T](aq: AsyncQueue[T]; item: T) {.
    ...raises: [AsyncQueueFullError], raises: [].}

Put an item item to the beginning of the queue aq immediately.

If the queue aq is full, an AsyncQueueFullError exception is raised.

  Source   Edit
proc addLast[T](aq: AsyncQueue[T]; item: T): InternalRaisesFuture[void,
    (CancelledError,)] {.stackTrace: false, ...raises: [], gcsafe, raises: [],
                         raises: [].}
Put an item to the end of the queue aq. If the queue is full, wait until a free slot is available before adding item.   Source   Edit
proc addLastNoWait[T](aq: AsyncQueue[T]; item: T) {.
    ...raises: [AsyncQueueFullError], raises: [].}

Put an item item at the end of the queue aq immediately.

If the queue aq is full, an AsyncQueueFullError exception is raised.

  Source   Edit
proc clear(event: AsyncEvent) {....raises: [], tags: [].}
Reset the internal flag of event to false. Subsequently, tasks calling wait() will block until fire() is called to set the internal flag to true again.   Source   Edit
proc clear[T](aq: AsyncQueue[T]) {.inline, ...raises: [].}
Clears all elements of queue aq.   Source   Edit
proc close(ab: AsyncEventQueue) {....raises: [], raises: [].}
  Source   Edit
proc closeWait(ab: AsyncEventQueue): InternalRaisesFuture[void, void] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
  Source   Edit
proc contains[T](aq: AsyncQueue[T]; item: T): bool {.inline, ...raises: [].}
Return true if item is in aq or false if not found. Usually used via the in operator.   Source   Edit
proc emit[T](ab: AsyncEventQueue[T]; data: T) {....raises: [].}
  Source   Edit
proc empty[T](aq: AsyncQueue[T]): bool {.inline, ...raises: [].}
Return true if the queue is empty, false otherwise.   Source   Edit
proc fire(event: AsyncEvent) {....raises: [], tags: [].}
Set the internal flag of event to true. All tasks waiting for it to become true are awakened. Tasks that call wait() once the flag is true will not block at all.   Source   Edit
proc full[T](aq: AsyncQueue[T]): bool {.inline, ...raises: [].}

Return true if there are maxsize items in the queue.

Note: If the aq was initialized with maxsize = 0 (default), then full() is never true.

  Source   Edit
proc get[T](aq: AsyncQueue[T]): InternalRaisesFuture[T, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
  Source   Edit
proc getNoWait[T](aq: AsyncQueue[T]): T {....raises: [AsyncQueueEmptyError],
    raises: [].}
Alias of popFirstNoWait().   Source   Edit
proc isSet(event: AsyncEvent): bool {....raises: [], tags: [].}
Return true if and only if the internal flag of event is true.   Source   Edit
proc len(ab: AsyncEventQueue): int {....raises: [], raises: [].}
  Source   Edit
proc len[T](aq: AsyncQueue[T]): int {.inline, ...raises: [].}
Return the number of elements in aq.   Source   Edit
proc locked(lock: AsyncLock): bool {....raises: [], tags: [].}
Return true if the lock lock is acquired, false otherwise.   Source   Edit
proc newAsyncEvent(): AsyncEvent {....raises: [], tags: [].}

Creates a new asynchronous event AsyncEvent.

An event manages a flag that can be set to true with the fire() procedure and reset to false with the clear() procedure. The wait() procedure blocks until the flag is true. The flag is initially false.

  Source   Edit
proc newAsyncEventQueue[T](limitSize = 0): AsyncEventQueue[T] {....raises: [],
    raises: [].}

Creates a new AsyncEventQueue with a maximum size of limitSize (the default is 0, which means that there is no limit).

When the number of emitted events exceeds limitSize, the emit() procedure will discard new events, and consumers that have more than limitSize pending events will get an AsyncEventQueueFullError error.

  Source   Edit
proc newAsyncLock(): AsyncLock {....raises: [], tags: [].}

Creates new asynchronous lock AsyncLock.

Lock is created in the unlocked state. When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another coroutine changes it to unlocked.

The release() procedure changes the state to unlocked and returns immediately.

  Source   Edit
proc newAsyncQueue[T](maxsize: int = 0): AsyncQueue[T] {....raises: [].}
Creates a new asynchronous queue AsyncQueue.   Source   Edit
proc popFirst[T](aq: AsyncQueue[T]): InternalRaisesFuture[T, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
Remove and return an item from the beginning of the queue aq. If the queue is empty, wait until an item is available.   Source   Edit
proc popFirstNoWait[T](aq: AsyncQueue[T]): T {....raises: [AsyncQueueEmptyError],
    raises: [].}

Get an item from the beginning of the queue aq immediately.

If the queue aq is empty, an AsyncQueueEmptyError exception is raised.

  Source   Edit
proc popLast[T](aq: AsyncQueue[T]): InternalRaisesFuture[T, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
Remove and return an item from the end of the queue aq. If the queue is empty, wait until an item is available.   Source   Edit
proc popLastNoWait[T](aq: AsyncQueue[T]): T {....raises: [AsyncQueueEmptyError],
    raises: [].}

Get an item from the end of the queue aq immediately.

If the queue aq is empty, an AsyncQueueEmptyError exception is raised.

  Source   Edit
proc put[T](aq: AsyncQueue[T]; item: T): InternalRaisesFuture[void,
    (CancelledError,)] {.stackTrace: false, ...raises: [], gcsafe, raises: [],
                         raises: [].}
  Source   Edit
proc putNoWait[T](aq: AsyncQueue[T]; item: T) {....raises: [AsyncQueueFullError],
    raises: [].}
Alias of addLastNoWait().   Source   Edit
proc register(ab: AsyncEventQueue): EventQueueKey {....raises: [], raises: [].}
  Source   Edit
proc release(lock: AsyncLock) {....raises: [AsyncLockError], raises: [], tags: [].}

Release a lock lock.

When the lock is locked, reset it to unlocked, and return. If any other coroutines are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed.

  Source   Edit
proc size[T](aq: AsyncQueue[T]): int {.inline, ...raises: [].}
Return the maximum number of elements in aq.   Source   Edit
proc unregister(ab: AsyncEventQueue; key: EventQueueKey) {....raises: [],
    raises: [].}
  Source   Edit
proc wait(event: AsyncEvent): InternalRaisesFuture[void, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
  Source   Edit
proc waitEvents[T](ab: AsyncEventQueue[T]; key: EventQueueKey; eventsCount = -1): InternalRaisesFuture[
    seq[T], (AsyncEventQueueFullError, CancelledError)] {.stackTrace: false,
    ...raises: [], gcsafe, raises: [], raises: [].}
Wait for events   Source   Edit

Iterators

iterator items[T](aq: AsyncQueue[T]): T {.inline, ...raises: [].}
Yield every element of aq.   Source   Edit
iterator mitems[T](aq: AsyncQueue[T]): var T {.inline, ...raises: [].}
Yield every element of aq.   Source   Edit
iterator pairs[T](aq: AsyncQueue[T]): tuple[key: int, val: T] {.inline,
    ...raises: [].}
Yield every (position, value) of aq.   Source   Edit

Templates

template readerOverflow(ab: AsyncEventQueue; reader: EventQueueReader): bool
  Source   Edit

Exports

setThreadDispatcher, closeSocket, ENOSR, EHOSTUNREACH, EHOSTDOWN, ECONNRESET, EDQUOT, fail, ECANCELED, EMLINK, FuturePendingError, milliseconds, <=, +=, $, asyncTimer, internalRaiseIfError, getSrcLocation, FutureError, fromNow, weeks, ESPIPE, withTimeout, LocationKind, id, <=, FutureCompletedError, ENOEXEC, ESHUTDOWN, fail, EREMOTEIO, toString, EXFULL, waitFor, complete, ==, oneValue, internalInitFutureBase, EPROTONOSUPPORT, Nanosecond, epochNanoSeconds, cancelAndWait, addTimer, Moment, ECOMM, EPROTOTYPE, AsyncExceptionError, milliseconds, getTrackerCounter, ENOENT, millis, removeTimer, FutureFlag, ENOTCONN, EBUSY, init, ENOTUNIQ, days, +=, await, or, ZeroDuration, EKEYREVOKED, AsyncTimeoutError, EUCLEAN, Second, EEXIST, ENOLINK, newInternalRaisesFuture, idleAsync, ==, internalRaiseIfError, SomeIntegerI64, ENOMEM, ENOKEY, +, removeWriter2, ENOMSG, [], trackCounter, EKEYEXPIRED, Week, isCounterLeaked, awaitne, EAFNOSUPPORT, EWOULDBLOCK, EREMOTE, Finished, untrackCounter, AsyncCallback, EHWPOISON, epochSeconds, newDispatcher, EPERM, microseconds, trackerCounters, >=, weeks, Finished, cancelCallback=, ETOOMANYREFS, $, EISCONN, callSoon, ESOCKTNOSUPPORT, setGlobalDispatcher, EXDEV, EBADF, cancelSoon, allFinished, TrackerBase, hours, InternalAsyncCallback, EBFONT, ENETDOWN, EACCES, ELOOP, InfiniteDuration, ETIMEDOUT, >=, EINVAL, value, EBADFD, Future, completed, PDispatcher, ESRCH, EL2NSYNC, EFAULT, low, ESTRPIPE, -=, or, ENOTSOCK, getAsyncTimestamp, await, Hour, EIDRM, removeReader2, secs, getThreadDispatcher, futureContinue, ENOANO, EADV, CallbackFunc, ENFILE, FutureBase, waitFor, high, Microsecond, ENOPKG, ELIBBAD, callSoon, EOWNERDEAD, flags, ERANGE, done, ENONET, completed, callSoon, cancelSoon, EMSGSIZE, EALREADY, EBADMSG, EILSEQ, ENOPROTOOPT, SrcLoc, allFutures, -, complete, FutureState, EBADR, nanos, location, FutureDefect, EDOTDOT, ENOBUFS, EKEYREJECTED, <, Minute, error, ==, one, CancelledError, nanoseconds, EISDIR, EOVERFLOW, FutureSeq, cancelAndWait, micros, 
all, -, ESRMNT, ENOTEMPTY, EPROTO, TimerCallback, microseconds, error, ENOTRECOVERABLE, wait, callIdle, join, hours, AsyncError, EBADRQC, ENODEV, tryCancel, addTimer, ECHRNG, ENOTDIR, EUNATCH, cancelAndSchedule, failed, nanoseconds, ERESTART, *, ETXTBSY, getTracker, newFutureStr, EMFILE, LocFinishIndex, withTimeout, addTimer, race, EUSERS, ENOTBLK, ENOTTY, EISNAM, ELIBACC, ENAVAIL, init, race, internalCallTick, EDESTADDRREQ, +, cancelSoon, completed, minutes, cancel, FutureFlags, getGlobalDispatcher, ELIBEXEC, stepsAsync, micros, wait, EINPROGRESS, EPFNOSUPPORT, E2BIG, -=, runForever, ENOMEDIUM, secs, value, read, waitFor, EL3HLT, unregisterAndCloseFd, EBADSLT, InternalRaisesFuture, -, <, ENETRESET, fastEpochTime, ENXIO, isZero, EPIPE, allFutures, Day, wait, now, read, asyncSpawn, removeCallback, EAGAIN, raiseOsDefect, trackerCounterKeys, EADDRNOTAVAIL, read, removeCallback, TrackerCounter, async, addCallback, init, async, EREMCHG, finished, addCallback, seconds, noCancel, low, AsyncFD, ECHILD, Raising, init, ECONNABORTED, state, wait, >, callIdle, removeTimer, LocCreateIndex, addReader2, ETIME, ENOLCK, EDEADLK, asyncCheck, Duration, ELNRNG, EDOM, ENOCSI, millis, EBADE, EMEDIUMTYPE, readError, sleepAsync, isInfinite, asyncDiscard, callback=, ENETUNREACH, InternalFutureBase, callIdle, ENOSYS, ELIBSCN, EIO, EROFS, seconds, ELIBMAX, div, init, EL2HLT, EOPNOTSUPP, FutureStr, poll, oneIndex, clearTimer, register2, LocCompleteIndex, days, ENOSTR, EADDRINUSE, one, sleepAsync, ENAMETOOLONG, EMULTIHOP, ESTALE, race, ENODATA, EFBIG, +, minutes, ERFKILL, cancelled, $, toException, join, EDEADLOCK, allFutures, MaxEventsCount, wait, read, raiseAsDefect, ENOSPC, Millisecond, >, setTimer, cancelSoon, unregister2, failed, waitFor, newFutureSeq, addTracker, callback=, orImpl, ECONNREFUSED, high, ENOTNAM, nanos, addWriter2, removeTimer, internalCallTick, EINTR, and, closeHandle, EL3RST, readError, *, newFuture, internalCallTick