chronos/transports/stream

    Dark Mode
Search:
Group by:
  Source   Edit

Types

ReadMessagePredicate = proc (data: openArray[byte]): tuple[consumed: int,
    done: bool] {....gcsafe, raises: [].}
  Source   Edit
SocketFlags {.pure.} = enum
  TcpNoDelay, ReuseAddr, ReusePort
  Source   Edit
StreamCallback = proc (server: StreamServer; client: StreamTransport) {.async.}
Connection callback that doesn't check for exceptions at compile time. server - StreamServer object. client - accepted client transport.   Source   Edit
StreamCallback2 = proc (server: StreamServer; client: StreamTransport) {.
    async: (...raises: []).}
New remote client connection callback. server - StreamServer object. client - accepted client transport.   Source   Edit
StreamServer = ref object of SocketServer
  function*: StreamCallback2
  init*: TransportInitCallback
StreamServer object   Source   Edit
StreamTransport = ref object of RootRef
  fd*: AsyncFD
  state: set[TransportState]
  reader: ReaderFuture
  buffer: BipBuffer
  error: ref TransportError
  queue: Deque[StreamVector]
  future: Future[void].Raising([])
  flags: set[TransportFlags]
  case kind*: TransportKind
  of TransportKind.Socket:
      domain: Domain
      local: TransportAddress
      remote: TransportAddress

  of TransportKind.Pipe:
      nil

  of TransportKind.File:
      nil

  
  Source   Edit
TransportFlags = enum
  None, WinServerPipe, WinNoPipeFlash, TcpNoDelay, V4Mapped
  Source   Edit
TransportInitCallback = proc (server: StreamServer; fd: AsyncFD): StreamTransport {.
    ...gcsafe, raises: [].}
Custom transport initialization procedure, which can allocate inherited StreamTransport object.   Source   Edit
TransportKind {.pure.} = enum
  Socket, Pipe, File
  Source   Edit

Consts

DefaultBacklogSize = 2147483647'i32
  Source   Edit
StreamServerTrackerName = "stream.server"
  Source   Edit
StreamTransportTrackerName = "stream.transport"
  Source   Edit

Procs

proc accept(server: StreamServer): InternalRaisesFuture[StreamTransport, (
    TransportUseClosedError, TransportTooManyError, TransportAbortedError,
    TransportOsError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
    raises: [], raises: [], tags: [RootEffect].}
  Source   Edit
proc atEof(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
Returns true if transp is at EOF.   Source   Edit
proc close(server: StreamServer) {....raises: [], tags: [].}

Release server resources.

Please note that release of resources is not completed immediately; to be sure all resources have been released, please use await server.join().

  Source   Edit
proc close(transp: StreamTransport) {....raises: [], tags: [].}

Closes and frees resources of transport transp.

Please note that release of resources is not completed immediately; to be sure all resources have been released, please use await transp.join().

  Source   Edit
proc closed(server: StreamServer): bool {....raises: [], tags: [].}
  Source   Edit
proc closed(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
Returns true if transport is in closed state.   Source   Edit
proc closeWait(server: StreamServer): InternalRaisesFuture[void, void] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
    tags: [RootEffect].}
Close server server and release all resources.   Source   Edit
proc closeWait(transp: StreamTransport): InternalRaisesFuture[void, void] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
    tags: [RootEffect].}
Closes and frees resources of transport transp.   Source   Edit
proc connect(address: TransportAddress; bufferSize = DefaultStreamBufferSize;
             child: StreamTransport = nil; flags: set[TransportFlags];
             localAddress = TransportAddress(); dualstack = DualStackType.Auto): InternalRaisesFuture[
    StreamTransport, (TransportError, CancelledError)] {.stackTrace: false,
    ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
  Source   Edit
proc connect(address: TransportAddress; bufferSize = DefaultStreamBufferSize;
             child: StreamTransport = nil; localAddress = TransportAddress();
             flags: set[SocketFlags] = {}; dualstack = DualStackType.Auto): InternalRaisesFuture[
    StreamTransport, (TransportError, CancelledError)] {.stackTrace: false,
    ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
  Source   Edit
proc consume(transp: StreamTransport): InternalRaisesFuture[int,
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [],
                                        tags: [RootEffect].}

Consume all bytes from transport transp and discard them.

Return number of bytes actually consumed and discarded.

  Source   Edit
proc consume(transp: StreamTransport; n: int): InternalRaisesFuture[int,
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [],
                                        tags: [RootEffect].}

Consume all bytes (n <= 0) or n bytes from transport transp and discard them.

Return number of bytes actually consumed and discarded.

  Source   Edit
proc createStreamServer(cbproc: StreamCallback2; port: Port;
                        host: Opt[IpAddress] = Opt.none(IpAddress);
                        flags: set[ServerFlags] = {};
                        sock: AsyncFD = asyncInvalidSocket;
                        backlog: int = DefaultBacklogSize;
                        bufferSize: int = DefaultStreamBufferSize;
                        child: StreamServer = nil;
                        init: TransportInitCallback = nil; udata: pointer = nil;
                        dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [], tags: [RootEffect].}
Create stream server which will be bound to:
  1. IPv6 address ::, if IPv6 is available
  2. IPv4 address 0.0.0.0, if IPv6 is not available.
  Source   Edit
proc createStreamServer(host: TransportAddress; cbproc: StreamCallback2;
                        flags: set[ServerFlags] = {};
                        sock: AsyncFD = asyncInvalidSocket;
                        backlog: int = DefaultBacklogSize;
                        bufferSize: int = DefaultStreamBufferSize;
                        child: StreamServer = nil;
                        init: TransportInitCallback = nil; udata: pointer = nil;
                        dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [], tags: [].}

Create new TCP stream server.

host - address to which server will be bound. flags - flags to apply to server socket. cbproc - callback function which will be called when a new client connection is established. sock - user-driven socket to use. backlog - number of outstanding connections in the socket's listen queue. bufferSize - size of internal buffer for transport. child - existing StreamServer object to initialize, can be used to initialize StreamServer inherited objects. udata - user-defined pointer.

  Source   Edit
proc createStreamServer(host: TransportAddress; cbproc: StreamCallback;
                        flags: set[ServerFlags] = {};
                        sock: AsyncFD = asyncInvalidSocket;
                        backlog: int = DefaultBacklogSize;
                        bufferSize: int = DefaultStreamBufferSize;
                        child: StreamServer = nil;
                        init: TransportInitCallback = nil; udata: pointer = nil;
                        dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}",
    raises: [], tags: [RootEffect].}
Deprecated: Callback must not raise exceptions, annotate with {.async: (raises: []).}
  Source   Edit
proc createStreamServer(host: TransportAddress; flags: set[ServerFlags] = {};
                        sock: AsyncFD = asyncInvalidSocket;
                        backlog: int = DefaultBacklogSize;
                        bufferSize: int = DefaultStreamBufferSize;
                        child: StreamServer = nil;
                        init: TransportInitCallback = nil; udata: pointer = nil;
                        dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [], tags: [RootEffect].}
  Source   Edit
proc createStreamServer(port: Port; host: Opt[IpAddress] = Opt.none(IpAddress);
                        flags: set[ServerFlags] = {};
                        sock: AsyncFD = asyncInvalidSocket;
                        backlog: int = DefaultBacklogSize;
                        bufferSize: int = DefaultStreamBufferSize;
                        child: StreamServer = nil;
                        init: TransportInitCallback = nil; udata: pointer = nil;
                        dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [], tags: [RootEffect].}
Create stream server which will be bound to:
  1. IPv6 address ::, if IPv6 is available
  2. IPv4 address 0.0.0.0, if IPv6 is not available.
  Source   Edit
proc createStreamServer[T](cbproc: StreamCallback2; port: Port;
                           host: Opt[IpAddress] = Opt.none(IpAddress);
                           flags: set[ServerFlags] = {}; udata: ref T;
                           sock: AsyncFD = asyncInvalidSocket;
                           backlog: int = DefaultBacklogSize;
                           bufferSize: int = DefaultStreamBufferSize;
                           child: StreamServer = nil;
                           init: TransportInitCallback = nil;
                           dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [].}
Create stream server which will be bound to:
  1. IPv6 address ::, if IPv6 is available
  2. IPv4 address 0.0.0.0, if IPv6 is not available.
  Source   Edit
proc createStreamServer[T](host: TransportAddress; cbproc: StreamCallback2;
                           flags: set[ServerFlags] = {}; udata: ref T;
                           sock: AsyncFD = asyncInvalidSocket;
                           backlog: int = DefaultBacklogSize;
                           bufferSize: int = DefaultStreamBufferSize;
                           child: StreamServer = nil;
                           init: TransportInitCallback = nil;
                           dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [].}
  Source   Edit
proc createStreamServer[T](host: TransportAddress; cbproc: StreamCallback;
                           flags: set[ServerFlags] = {}; udata: ref T;
                           sock: AsyncFD = asyncInvalidSocket;
                           backlog: int = DefaultBacklogSize;
                           bufferSize: int = DefaultStreamBufferSize;
                           child: StreamServer = nil;
                           init: TransportInitCallback = nil;
                           dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}",
    raises: [].}
Deprecated: Callback must not raise exceptions, annotate with {.async: (raises: []).}
  Source   Edit
proc createStreamServer[T](host: TransportAddress; flags: set[ServerFlags] = {};
                           udata: ref T; sock: AsyncFD = asyncInvalidSocket;
                           backlog: int = DefaultBacklogSize;
                           bufferSize: int = DefaultStreamBufferSize;
                           child: StreamServer = nil;
                           init: TransportInitCallback = nil;
                           dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [].}
  Source   Edit
proc createStreamServer[T](port: Port;
                           host: Opt[IpAddress] = Opt.none(IpAddress);
                           flags: set[ServerFlags] = {}; udata: ref T;
                           sock: AsyncFD = asyncInvalidSocket;
                           backlog: int = DefaultBacklogSize;
                           bufferSize: int = DefaultStreamBufferSize;
                           child: StreamServer = nil;
                           init: TransportInitCallback = nil;
                           dualstack = DualStackType.Auto): StreamServer {.
    ...raises: [TransportOsError], raises: [].}
Create stream server which will be bound to:
  1. IPv6 address ::, if IPv6 is available
  2. IPv4 address 0.0.0.0, if IPv6 is not available.
  Source   Edit
proc failed(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
Returns true if transport is in error state.   Source   Edit
proc finished(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
Returns true if transport is in finished (EOF) state.   Source   Edit
proc fromPipe(fd: AsyncFD; child: StreamTransport = nil;
              bufferSize = DefaultStreamBufferSize): StreamTransport {.
    ...raises: [TransportOsError], raises: [], tags: [].}

Create new transport object using pipe's file descriptor.

bufferSize is size of internal buffer for transport.

  Source   Edit
proc fromPipe2(fd: AsyncFD; child: StreamTransport = nil;
               bufferSize = DefaultStreamBufferSize): Result[StreamTransport,
    OSErrorCode] {....raises: [], tags: [].}

Create new transport object using pipe's file descriptor.

bufferSize is size of internal buffer for transport.

  Source   Edit
proc getUserData[T](server: StreamServer): T {.inline, ...raises: [].}
Obtain user data stored in server object.   Source   Edit
proc join(server: StreamServer): InternalRaisesFuture[void, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
    tags: [RootEffect].}
  Source   Edit
proc join(transp: StreamTransport): InternalRaisesFuture[void, (CancelledError,)] {.
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
    tags: [RootEffect].}
  Source   Edit
proc localAddress(server: StreamServer): TransportAddress {....raises: [], tags: [].}
Returns server bound local socket address.   Source   Edit
proc localAddress(transp: StreamTransport): TransportAddress {.
    ...raises: [TransportOsError], raises: [], tags: [].}
Returns transp local socket address.   Source   Edit
proc localAddress2(transp: StreamTransport): Result[TransportAddress,
    OSErrorCode] {....raises: [], tags: [].}
Returns transp local socket address.   Source   Edit
proc read(transp: StreamTransport): InternalRaisesFuture[seq[byte],
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [],
                                        tags: [RootEffect].}

Read all bytes from transport transp.

This procedure allocates buffer seq[byte] and returns it as result.

  Source   Edit
proc read(transp: StreamTransport; n: int): InternalRaisesFuture[seq[byte],
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [],
                                        tags: [RootEffect].}

Read all bytes (n <= 0) or exactly n bytes from transport transp.

This procedure allocates buffer seq[byte] and returns it as result.

  Source   Edit
proc readExactly(transp: StreamTransport; pbytes: pointer; nbytes: int): InternalRaisesFuture[
    void, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [RootEffect].}

Read exactly nbytes bytes from transport transp and store it to pbytes. pbytes must not be nil pointer and nbytes should be Natural.

If nbytes == 0 this operation will return immediately.

If EOF is received and nbytes is not yet read, the procedure will raise TransportIncompleteError, potentially with some bytes already written.

  Source   Edit
proc readLine(transp: StreamTransport; limit = 0; sep = "\r\n"): InternalRaisesFuture[
    string, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [RootEffect].}

Read one line from transport transp, where "line" is a sequence of bytes ending with sep (default is "\r\n").

If EOF is received, and sep was not found, the method will return the partial read bytes.

If the EOF was received and the internal buffer is empty, return an empty string.

If limit is more than 0, then read is limited to limit bytes.

  Source   Edit
proc readMessage(transp: StreamTransport; predicate: ReadMessagePredicate): InternalRaisesFuture[
    void, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [RootEffect].}

Read all bytes from transport transp until the predicate callback reports completion.

predicate callback should return tuple (consumed, done), where consumed is the number of bytes processed and done is a completion flag (true if readMessage() should stop reading data, or false if readMessage() should continue to read data from transport).

predicate callback must copy all the data from data array and return number of bytes it is going to consume. predicate callback will receive (zero-length) openArray, if transport is at EOF.

  Source   Edit
proc readOnce(transp: StreamTransport; pbytes: pointer; nbytes: int): InternalRaisesFuture[
    int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [RootEffect].}

Perform one read operation on transport transp.

If internal buffer is not empty, nbytes bytes will be transferred from internal buffer, otherwise it will wait until some bytes will be received.

  Source   Edit
proc readUntil(transp: StreamTransport; pbytes: pointer; nbytes: int;
               sep: seq[byte]): InternalRaisesFuture[int,
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [],
                                        tags: [RootEffect].}

Read data from the transport transp until separator sep is found.

On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.

If EOF is received, and sep was not found, procedure will raise TransportIncompleteError.

If nbytes bytes have been received and sep was not found, procedure will raise TransportLimitError.

Procedure returns actual number of bytes read.

  Source   Edit
proc remoteAddress(transp: StreamTransport): TransportAddress {.
    ...raises: [TransportOsError], raises: [], tags: [].}
Returns transp remote socket address.   Source   Edit
proc remoteAddress2(transp: StreamTransport): Result[TransportAddress,
    OSErrorCode] {....raises: [], tags: [].}
Returns transp remote socket address.   Source   Edit
proc running(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
Returns true if transport is still pending.   Source   Edit
proc shutdownWait(transp: StreamTransport): InternalRaisesFuture[void,
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [], tags: [].}
  Source   Edit
proc start(server: StreamServer) {....raises: [TransportOsError], raises: [],
                                   tags: [RootEffect].}
Starts server.   Source   Edit
proc start2(server: StreamServer): Result[void, OSErrorCode] {....raises: [],
    tags: [RootEffect].}
Starts server.   Source   Edit
proc stop(server: StreamServer) {....raises: [TransportOsError], raises: [],
                                  tags: [].}
Stops server.   Source   Edit
proc stop2(server: StreamServer): Result[void, OSErrorCode] {....raises: [],
    tags: [].}
Stops server.   Source   Edit
func toException(v: OSErrorCode): ref TransportOsError {....raises: [], tags: [].}
  Source   Edit
proc write(transp: StreamTransport; msg: string; msglen = -1): InternalRaisesFuture[
    int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [RootEffect].}
  Source   Edit
proc write(transp: StreamTransport; pbytes: pointer; nbytes: int): InternalRaisesFuture[
    int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [].}
  Source   Edit
proc write[T](transp: StreamTransport; msg: seq[T]; msglen = -1): InternalRaisesFuture[
    int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [].}
  Source   Edit
proc writeFile(transp: StreamTransport; handle: int; offset: uint = 0;
               size: int = 0): InternalRaisesFuture[int,
    (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe,
                                        raises: [], raises: [], tags: [].}
  Source   Edit

Templates

template toUnchecked(a: untyped): untyped
  Source   Edit