Types
ReadMessagePredicate = proc (data: openArray[byte]): tuple[consumed: int, done: bool] {....gcsafe, raises: [].}
- Source Edit
SocketFlags {.pure.} = enum TcpNoDelay, ReuseAddr, ReusePort
- Source Edit
StreamCallback = proc (server: StreamServer; client: StreamTransport) {.async.}
- Connection callback that doesn't check for exceptions at compile time. server - StreamServer object. client - accepted client transport. Source Edit
StreamCallback2 = proc (server: StreamServer; client: StreamTransport) {. async: (...raises: []).}
- New remote client connection callback. server - StreamServer object. client - accepted client transport. Source Edit
StreamServer = ref object of SocketServer function*: StreamCallback2 init*: TransportInitCallback
- StreamServer object Source Edit
StreamTransport = ref object of RootRef fd*: AsyncFD state: set[TransportState] reader: ReaderFuture buffer: BipBuffer error: ref TransportError queue: Deque[StreamVector] future: Future[void].Raising([]) flags: set[TransportFlags] case kind*: TransportKind of TransportKind.Socket: domain: Domain local: TransportAddress remote: TransportAddress of TransportKind.Pipe: nil of TransportKind.File: nil
- Source Edit
TransportFlags = enum None, WinServerPipe, WinNoPipeFlash, TcpNoDelay, V4Mapped
- Source Edit
TransportInitCallback = proc (server: StreamServer; fd: AsyncFD): StreamTransport {. ...gcsafe, raises: [].}
- Custom transport initialization procedure, which can allocate inherited StreamTransport object. Source Edit
TransportKind {.pure.} = enum Socket, Pipe, File
- Source Edit
Consts
DefaultBacklogSize = 2147483647'i32
- Source Edit
StreamServerTrackerName = "stream.server"
- Source Edit
StreamTransportTrackerName = "stream.transport"
- Source Edit
Procs
proc accept(server: StreamServer): InternalRaisesFuture[StreamTransport, ( TransportUseClosedError, TransportTooManyError, TransportAbortedError, TransportOsError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Source Edit
proc atEof(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
- Returns true if transp is at EOF. Source Edit
proc close(server: StreamServer) {....raises: [], tags: [].}
-
Release server resources.
Please note that the release of resources is not completed immediately; to be sure all resources have been released, use await server.join().
Source Edit proc close(transp: StreamTransport) {....raises: [], tags: [].}
-
Closes and frees resources of transport transp.
Please note that the release of resources is not completed immediately; to be sure all resources have been released, use await transp.join().
Source Edit proc closed(server: StreamServer): bool {....raises: [], tags: [].}
- Source Edit
proc closed(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
- Returns true if transport is in closed state. Source Edit
proc closeWait(server: StreamServer): InternalRaisesFuture[void, void] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Close server server and release all resources. Source Edit
proc closeWait(transp: StreamTransport): InternalRaisesFuture[void, void] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Closes and frees resources of transport transp. Source Edit
proc connect(address: TransportAddress; bufferSize = DefaultStreamBufferSize; child: StreamTransport = nil; flags: set[TransportFlags]; localAddress = TransportAddress(); dualstack = DualStackType.Auto): InternalRaisesFuture[ StreamTransport, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
- Source Edit
proc connect(address: TransportAddress; bufferSize = DefaultStreamBufferSize; child: StreamTransport = nil; localAddress = TransportAddress(); flags: set[SocketFlags] = {}; dualstack = DualStackType.Auto): InternalRaisesFuture[ StreamTransport, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
- Source Edit
proc consume(transp: StreamTransport): InternalRaisesFuture[int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Consume all bytes from transport transp and discard them.
Return number of bytes actually consumed and discarded.
Source Edit proc consume(transp: StreamTransport; n: int): InternalRaisesFuture[int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Consume all bytes (n <= 0) or n bytes from transport transp and discard them.
Return number of bytes actually consumed and discarded.
Source Edit proc createStreamServer(cbproc: StreamCallback2; port: Port; host: Opt[IpAddress] = Opt.none(IpAddress); flags: set[ServerFlags] = {}; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; udata: pointer = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [], tags: [RootEffect].}
-
Create stream server which will be bound to:
- IPv6 address ::, if IPv6 is available
- IPv4 address 0.0.0.0, if IPv6 is not available.
proc createStreamServer(host: TransportAddress; cbproc: StreamCallback2; flags: set[ServerFlags] = {}; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; udata: pointer = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [], tags: [].}
-
Create new TCP stream server.
host - address to which server will be bound. flags - flags to apply to server socket. cbproc - callback function which will be called when a new client connection is established. sock - user-driven socket to use. backlog - number of outstanding connections in the socket's listen queue. bufferSize - size of internal buffer for transport. child - existing StreamServer object to initialize; can be used to initialize StreamServer inherited objects. udata - user-defined pointer.
Source Edit proc createStreamServer(host: TransportAddress; cbproc: StreamCallback; flags: set[ServerFlags] = {}; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; udata: pointer = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}", raises: [], tags: [RootEffect].}
- Source Edit
proc createStreamServer(host: TransportAddress; flags: set[ServerFlags] = {}; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; udata: pointer = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [], tags: [RootEffect].}
- Source Edit
proc createStreamServer(port: Port; host: Opt[IpAddress] = Opt.none(IpAddress); flags: set[ServerFlags] = {}; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; udata: pointer = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [], tags: [RootEffect].}
-
Create stream server which will be bound to:
- IPv6 address ::, if IPv6 is available
- IPv4 address 0.0.0.0, if IPv6 is not available.
proc createStreamServer[T](cbproc: StreamCallback2; port: Port; host: Opt[IpAddress] = Opt.none(IpAddress); flags: set[ServerFlags] = {}; udata: ref T; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [].}
-
Create stream server which will be bound to:
- IPv6 address ::, if IPv6 is available
- IPv4 address 0.0.0.0, if IPv6 is not available.
proc createStreamServer[T](host: TransportAddress; cbproc: StreamCallback2; flags: set[ServerFlags] = {}; udata: ref T; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [].}
- Source Edit
proc createStreamServer[T](host: TransportAddress; cbproc: StreamCallback; flags: set[ServerFlags] = {}; udata: ref T; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}", raises: [].}
- Source Edit
proc createStreamServer[T](host: TransportAddress; flags: set[ServerFlags] = {}; udata: ref T; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [].}
- Source Edit
proc createStreamServer[T](port: Port; host: Opt[IpAddress] = Opt.none(IpAddress); flags: set[ServerFlags] = {}; udata: ref T; sock: AsyncFD = asyncInvalidSocket; backlog: int = DefaultBacklogSize; bufferSize: int = DefaultStreamBufferSize; child: StreamServer = nil; init: TransportInitCallback = nil; dualstack = DualStackType.Auto): StreamServer {. ...raises: [TransportOsError], raises: [].}
-
Create stream server which will be bound to:
- IPv6 address ::, if IPv6 is available
- IPv4 address 0.0.0.0, if IPv6 is not available.
proc failed(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
- Returns true if transport is in error state. Source Edit
proc finished(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
- Returns true if transport is in finished (EOF) state. Source Edit
proc fromPipe(fd: AsyncFD; child: StreamTransport = nil; bufferSize = DefaultStreamBufferSize): StreamTransport {. ...raises: [TransportOsError], raises: [], tags: [].}
-
Create new transport object using pipe's file descriptor.
bufferSize is size of internal buffer for transport.
Source Edit proc fromPipe2(fd: AsyncFD; child: StreamTransport = nil; bufferSize = DefaultStreamBufferSize): Result[StreamTransport, OSErrorCode] {....raises: [], tags: [].}
-
Create new transport object using pipe's file descriptor.
bufferSize is size of internal buffer for transport.
Source Edit proc getUserData[T](server: StreamServer): T {.inline, ...raises: [].}
- Obtain user data stored in server object. Source Edit
proc join(server: StreamServer): InternalRaisesFuture[void, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Source Edit
proc join(transp: StreamTransport): InternalRaisesFuture[void, (CancelledError,)] {. stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Source Edit
proc localAddress(server: StreamServer): TransportAddress {....raises: [], tags: [].}
- Returns server bound local socket address. Source Edit
proc localAddress(transp: StreamTransport): TransportAddress {. ...raises: [TransportOsError], raises: [], tags: [].}
- Returns transp local socket address. Source Edit
proc localAddress2(transp: StreamTransport): Result[TransportAddress, OSErrorCode] {....raises: [], tags: [].}
- Returns transp local socket address. Source Edit
proc read(transp: StreamTransport): InternalRaisesFuture[seq[byte], (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Read all bytes from transport transp.
This procedure allocates buffer seq[byte] and return it as result.
Source Edit proc read(transp: StreamTransport; n: int): InternalRaisesFuture[seq[byte], (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Read all bytes (n <= 0) or exactly n bytes from transport transp.
This procedure allocates buffer seq[byte] and return it as result.
Source Edit proc readExactly(transp: StreamTransport; pbytes: pointer; nbytes: int): InternalRaisesFuture[ void, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Read exactly nbytes bytes from transport transp and store it to pbytes. pbytes must not be nil pointer and nbytes should be Natural.
If nbytes == 0 this operation will return immediately.
If EOF is received and nbytes is not yet read, the procedure will raise TransportIncompleteError, potentially with some bytes already written.
Source Edit proc readLine(transp: StreamTransport; limit = 0; sep = "\r\n"): InternalRaisesFuture[ string, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Read one line from transport transp, where "line" is a sequence of bytes ending with sep (default is "\r\n").
If EOF is received, and sep was not found, the method will return the partial read bytes.
If the EOF was received and the internal buffer is empty, return an empty string.
If limit is more than 0, then the read is limited to limit bytes.
Source Edit proc readMessage(transp: StreamTransport; predicate: ReadMessagePredicate): InternalRaisesFuture[ void, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Read all bytes from transport transp until the predicate callback reports completion.
predicate callback should return tuple (consumed, done), where consumed is the number of bytes processed and done is a completion flag (true if readMessage() should stop reading data, or false if readMessage() should continue to read data from transport).
predicate callback must copy all the data from data array and return number of bytes it is going to consume. predicate callback will receive (zero-length) openArray, if transport is at EOF.
Source Edit proc readOnce(transp: StreamTransport; pbytes: pointer; nbytes: int): InternalRaisesFuture[ int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Perform one read operation on transport transp.
If internal buffer is not empty, nbytes bytes will be transferred from internal buffer, otherwise it will wait until some bytes are received.
Source Edit proc readUntil(transp: StreamTransport; pbytes: pointer; nbytes: int; sep: seq[byte]): InternalRaisesFuture[int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
Read data from the transport transp until separator sep is found.
On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.
If EOF is received, and sep was not found, procedure will raise TransportIncompleteError.
If nbytes bytes have been received and sep was not found, procedure will raise TransportLimitError.
Procedure returns actual number of bytes read.
Source Edit proc remoteAddress(transp: StreamTransport): TransportAddress {. ...raises: [TransportOsError], raises: [], tags: [].}
- Returns transp remote socket address. Source Edit
proc remoteAddress2(transp: StreamTransport): Result[TransportAddress, OSErrorCode] {....raises: [], tags: [].}
- Returns transp remote socket address. Source Edit
proc running(transp: StreamTransport): bool {.inline, ...raises: [], tags: [].}
- Returns true if transport is still pending. Source Edit
proc shutdownWait(transp: StreamTransport): InternalRaisesFuture[void, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
- Source Edit
proc start(server: StreamServer) {....raises: [TransportOsError], raises: [], tags: [RootEffect].}
- Starts server. Source Edit
proc start2(server: StreamServer): Result[void, OSErrorCode] {....raises: [], tags: [RootEffect].}
- Starts server. Source Edit
proc stop(server: StreamServer) {....raises: [TransportOsError], raises: [], tags: [].}
- Stops server. Source Edit
proc stop2(server: StreamServer): Result[void, OSErrorCode] {....raises: [], tags: [].}
- Stops server. Source Edit
func toException(v: OSErrorCode): ref TransportOsError {....raises: [], tags: [].}
- Source Edit
proc write(transp: StreamTransport; msg: string; msglen = -1): InternalRaisesFuture[ int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Source Edit
proc write(transp: StreamTransport; pbytes: pointer; nbytes: int): InternalRaisesFuture[ int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
- Source Edit
proc write[T](transp: StreamTransport; msg: seq[T]; msglen = -1): InternalRaisesFuture[ int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [].}
- Source Edit
proc writeFile(transp: StreamTransport; handle: int; offset: uint = 0; size: int = 0): InternalRaisesFuture[int, (TransportError, CancelledError)] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [].}
- Source Edit
Templates
template toUnchecked(a: untyped): untyped
- Source Edit