Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Scaling & Finishing Touches

Goal: Learn how to use semaphores to control concurrency.

Source code: chapter7/src/uptimemon.nim

Our app is almost ready to run in production and do regular background URI checks.

However, there's one issue we need to address before we can feed it tens of URIs and wrap it in a while true: we need to limit the number of simultaneous checks. If we don't do that, our app can potentially run out of file descriptors or choke the DNS resolver with 20+ requests.

Instead of simultaneously launching checks for all URIs in the list, we'll run them in batches of 5, i.e. no more than 5 checks will run at any given moment, keeping resource usage low and under control.

To achieve that, we'll use a semaphore—a special object that a function must acquire to run and must release after it's finished. A semaphore can be acquired by a fixed number of functions at any moment, and this is how it regulates concurrency.

Here's the code with a semaphore and an infinite loop added:

import std/sequtils
import chronos/apps/http/httpclient

const
  # Upper bound on the number of URI checks that may run at the same time;
  # used as the size of the semaphore shared by all checks.
  maxConcurrency = 5
  # Name of the ntfy.sh topic that alerts are posted to. Replace the
  # placeholder with your own topic name.
  ntfyTopic = "<YOUR_NTFY_TOPIC_NAME>"
  # URIs that are checked on every monitoring cycle.
  uris = @[
    "https://duckduckgo.com/?q=chronos", "https://mock.codes/403",
    "http://10.255.255.1", "https://html.spec.whatwg.org",
    "https://mock.codes/200", "https://github.com", "https://archive.org",
    "https://nim-lang.org", "https://w3.org", "https://free.technology",
    "https://codeberg.org", "https://nimble.directory", "https://status.app",
    "https://keycard.tech", "https://stackoverflow.com", "https://nimbus.team",
    "https://logos.co", "https://forum.nim-lang.org", "https://acid.info",
    "https://vac.dev", "https://expired.badssl.com", "http://10.255.255.2",
    "http://10.255.255.3",
  ]

proc sendAlert(
    session: HttpSessionRef, message: string, priority = 3
) {.async: (raises: [CancelledError]).} =
  ## Posts `message` to the ntfy.sh topic named by `ntfyTopic`, tagged with
  ## the given `priority`.
  ##
  ## Delivery is best-effort: a failure to build or send the request is
  ## printed as a `[WRN]` line and is not propagated to the caller.

  template warn(reason: string) =
    # Single place for the warning line shared by both failure paths.
    echo "[WRN] Failed to send alert: " & reason

  let alertHeaders = {"Title": "Chronos Uptime Monitor", "Priority": $priority}
  let payload = message.stringToBytes()
  let alertRequest = HttpClientRequestRef.new(
    session,
    "https://ntfy.sh/" & ntfyTopic,
    meth = MethodPost,
    headers = alertHeaders,
    body = payload,
  ).valueOr:
    # The request could not be constructed; nothing to send or close.
    warn(error)
    return

  try:
    # Give up on the alert if no response arrives within 5 seconds.
    let reply = await alertRequest.send().wait(5.seconds)
    await reply.closeWait()
  except HttpError, FuturePendingError, AsyncTimeoutError:
    warn(getCurrentExceptionMsg())
  finally:
    # The request is closed on success and on failure alike.
    await alertRequest.closeWait()

proc findMarker(
    bodyReader: HttpBodyReader
): Future[bool] {.async: (raises: [AsyncStreamError, CancelledError]).} =
  ## Scans the response body for the `<html` marker and returns `true` if the
  ## marker was seen.
  ##
  ## Reading stops as soon as the marker is found, once at least `readLimit`
  ## bytes have been consumed, or when the stream ends, whichever comes
  ## first.
  const
    marker = "<html"
    readLimit = 10 * 1024

  var
    totalRead = 0
    # Holds the last `len(marker) - 1` characters seen so far, so that a
    # marker split across two chunks is still detected.
    sample = newString(len(marker) - 1)
    found = false

  proc findMarkerInSample(data: openArray[byte]): (int, bool) =
    ## Predicate for `readMessage`: returns the number of bytes consumed and
    ## a flag that is `true` when reading must stop.
    if len(data) == 0:
      # `readMessage` hands over an empty chunk at end of stream. There is
      # nothing left to read, so signal completion; returning `false` here
      # would leave the read without a stop condition.
      (0, true)
    else:
      # Keep only the tail of the previous sample, then append the new chunk.
      sample = sample[^(len(marker) - 1) .. high(sample)]
      sample &= bytesToString(data)
      found = marker in sample
      totalRead += len(data)
      # Stop on success or once the read budget is exhausted. Stopping right
      # after a hit also keeps `found` from being overwritten by a later
      # chunk.
      (len(data), found or totalRead >= readLimit)

  await bodyReader.readMessage(findMarkerInSample)
  found

proc check(
    session: HttpSessionRef, uri: string, semaphore: AsyncSemaphore
) {.async: (raises: [CancelledError]).} =
  ## Checks a single `uri` and prints the outcome.
  ##
  ## * `[OK]`: the response status is 200 and `findMarker` found its marker
  ##   in the body.
  ## * `[NOK]`: any other status, or a 200 response without the marker; an
  ##   alert is sent through `sendAlert`.
  ## * `[ERR]`: the request could not be created or sent (no alert), or an
  ##   error was raised while processing the response (alert with
  ##   priority 4).
  ##
  ## The proc waits for `semaphore` before doing any work and releases it
  ## when it exits.
  await acquire(semaphore)

  # Release the semaphore however the proc exits. A failure to release is
  # only reported, so it never escapes the declared `raises` list.
  defer:
    try:
      release(semaphore)
    except AsyncSemaphoreError:
      echo "Could not release a lock: " & getCurrentExceptionMsg()

  let
    request = HttpClientRequestRef.new(session, uri).valueOr:
      echo "[ERR] " & uri & ": " & error
      return
    # Send the request, giving up after 5 seconds. The request is closed
    # whether sending succeeds or fails.
    response =
      try:
        await request.send().wait(5.seconds)
      except HttpError, AsyncTimeoutError:
        echo "[ERR] " & uri & ": " & getCurrentExceptionMsg()
        return
      finally:
        await request.closeWait()

  try:
    if response.status == 200:
      let
        bodyReader = response.getBodyReader()
        # The body reader is closed as soon as the scan is over, including
        # when the scan raises.
        markerFound =
          try:
            await bodyReader.findMarker()
          finally:
            await bodyReader.closeWait()

      if markerFound:
        echo "[OK] " & uri
      else:
        let message = "[NOK] " & uri & ": Not valid HTML"
        echo message
        await session.sendAlert(message)
    else:
      let message = "[NOK] " & uri & ": " & $response.status
      echo message
      await session.sendAlert(message)
  except HttpError, AsyncStreamError:
    # Errors raised while reading the response are alerted with priority 4
    # instead of the default 3.
    let message = "[ERR] " & uri & ": " & getCurrentExceptionMsg()
    echo message
    await session.sendAlert(message, 4)
  finally:
    await response.closeWait()

proc check(uris: seq[string]) {.async: (raises: []).} =
  ## Checks all `uris` in an endless loop, pausing 10 seconds between cycles.
  ##
  ## One HTTP session and one semaphore of size `maxConcurrency` are shared
  ## by all checks. The loop ends when the proc is cancelled, and the session
  ## is closed on the way out.
  let
    session = HttpSessionRef.new()
    semaphore = newAsyncSemaphore(maxConcurrency)

  try:
    while true:
      echo "Checking " & $len(uris) & " URIs:"
      let
        # Start a check for every URI; each check acquires the shared
        # semaphore before doing its work.
        futures = uris.mapIt(session.check(it, semaphore))

      try:
        await allFutures(futures)
      except CancelledError:
        # Cancel the checks that are still running and wait for them to
        # finish before leaving the loop.
        await cancelAndWait(futures)
        break

      echo "Done. Next check in 10 seconds."
      try:
        await sleepAsync(10.seconds)
      except CancelledError:
        # Cancelled while idle between cycles: nothing to clean up here.
        break
  except CancelledError:
    discard
  finally:
    await session.closeWait()

when isMainModule:
  # Entry point: drive the monitoring loop over the configured URIs.
  waitFor check(uris)

Let's see what changed.

const
  maxConcurrency = 5

We define a constant that would determine the capacity of our semaphore.

uris = @[
  "https://duckduckgo.com/?q=chronos", "https://mock.codes/403",
  "http://10.255.255.1", "https://html.spec.whatwg.org",
  "https://mock.codes/200", "https://github.com", "https://archive.org",
  "https://nim-lang.org", "https://w3.org", "https://free.technology",
  "https://codeberg.org", "https://nimble.directory", "https://status.app",
  "https://keycard.tech", "https://stackoverflow.com", "https://nimbus.team",
  "https://logos.co", "https://forum.nim-lang.org", "https://acid.info",
  "https://vac.dev", "https://expired.badssl.com", "http://10.255.255.2",
  "http://10.255.255.3",
]

We've added more URIs to the list to make the batching effect visible.

proc check(
    session: HttpSessionRef, uri: string, semaphore: AsyncSemaphore
) {.async: (raises: [CancelledError]).} =
  await acquire(semaphore)

  defer:
    try:
      release(semaphore)
    except AsyncSemaphoreError:
      echo "Could not release a lock: " & getCurrentExceptionMsg()

We've modified the check function for a single URI so that it accepts a semaphore (of type AsyncSemaphore), waits to acquire it, and releases it at the end (we use defer to postpone the release).

With this short addition, we prevent check from running if the semaphore is full.

Because releasing a semaphore can raise an AsyncSemaphoreError and it would happen outside of our managed try block, we wrap the release call in its own try..except block to handle it gracefully and prevent it from bubbling up.

proc check(uris: seq[string]) {.async: (raises: []).} =
  let
    session = HttpSessionRef.new()
    semaphore = newAsyncSemaphore(maxConcurrency)

In the check function for a URI sequence, we create a semaphore of the required capacity.

try:
  while true:

Instead of a one-off launch, we do the checks in an infinite loop. We wrap the entire loop in a try..finally block to ensure the session is always closed when the program stops.

echo "Checking " & $len(uris) & " URIs:"
let
  futures = uris.mapIt(session.check(it, semaphore))

try:
  await allFutures(futures)
except CancelledError:
  await cancelAndWait(futures)
  break

Then we pass the semaphore to check for each URI using mapIt. We also add a try..except CancelledError block around allFutures to ensure that if the program is stopped (e.g. by pressing Ctrl+C), all pending requests are cancelled and cleaned up properly. Note that in this case, we break the loop to finish the execution gracefully.

We've added an echo to denote the start of each cycle.

echo "Done. Next check in 10 seconds."
try:
  await sleepAsync(10.seconds)
except CancelledError:
  break

Finally, we print a message to mark the end of a cycle and wait 10 seconds before the next one.

Note

Even though we set the program to wait for 10 seconds before the next check loop, in reality the waiting time will be longer because there is some delay for the system to wake up and resume execution.

This is called drift. For an uptime monitor, this isn't critical but there are cases where you would need to compensate for it.

Run the program and you'll see an even flow of statuses in your terminal.

Important

To stop the program, press Ctrl+C.