Scaling & Finishing Touches
Goal: Learn how to use semaphores to control concurrency.
Source code: chapter7/src/uptimemon.nim
Our app is almost ready to run in production and do regular background URI checks.
However, there's one issue we need to address before we can feed it tens of URIs and wrap it in a while true loop: we need to limit the number of simultaneous checks. If we don't, the app can run out of file descriptors or choke the DNS resolver with 20+ simultaneous requests.
Instead of simultaneously launching checks for all URIs in the list, we'll run them in batches of 5, i.e. no more than 5 checks will run at any given moment, keeping resource usage low and under control.
To achieve that, we'll use a semaphore, a special object that a function must acquire before it runs and must release after it's finished. Only a fixed number of functions can hold a semaphore at any given moment, and this is how it regulates concurrency.
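To make the counting behaviour concrete, here is a minimal, synchronous toy model of a semaphore. It is only a sketch: ToySemaphore, tryAcquire and this release are hypothetical names made up for the illustration and are not the Chronos API. Where the toy returns false, an asynchronous semaphore would make the caller wait for a free slot.

```nim
# A toy, synchronous model of a counting semaphore.
# ToySemaphore and its procs are hypothetical; they are not part of Chronos.
type ToySemaphore = object
  capacity: int # maximum number of simultaneous holders
  held: int     # number of holders right now

proc tryAcquire(s: var ToySemaphore): bool =
  ## Takes a slot if one is free; returns false when the semaphore is full.
  if s.held < s.capacity:
    inc s.held
    true
  else:
    false

proc release(s: var ToySemaphore) =
  ## Gives a slot back so that another caller can take it.
  doAssert s.held > 0, "release without a matching acquire"
  dec s.held

var sem = ToySemaphore(capacity: 2)

let first = sem.tryAcquire()  # slot 1 of 2 is taken
let second = sem.tryAcquire() # slot 2 of 2 is taken
let third = sem.tryAcquire()  # full: an async semaphore would wait here
sem.release()                 # one holder finishes...
let fourth = sem.tryAcquire() # ...so the freed slot can be taken again

echo [first, second, third, fourth] # prints [true, true, false, true]
```

With a capacity of 2, the third caller is turned away until somebody releases a slot. The real semaphore in our app works the same way with a capacity of 5, except that callers wait instead of failing.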
Here's the code with a semaphore and an infinite loop added:
import std/sequtils

import chronos/apps/http/httpclient

const
  maxConcurrency = 5
  ntfyTopic = "<YOUR_NTFY_TOPIC_NAME>"
  uris = @[
    "https://duckduckgo.com/?q=chronos", "https://mock.codes/403",
    "http://10.255.255.1", "https://html.spec.whatwg.org",
    "https://mock.codes/200", "https://github.com", "https://archive.org",
    "https://nim-lang.org", "https://w3.org", "https://free.technology",
    "https://codeberg.org", "https://nimble.directory", "https://status.app",
    "https://keycard.tech", "https://stackoverflow.com", "https://nimbus.team",
    "https://logos.co", "https://forum.nim-lang.org", "https://acid.info",
    "https://vac.dev", "https://expired.badssl.com", "http://10.255.255.2",
    "http://10.255.255.3",
  ]

proc sendAlert(
    session: HttpSessionRef, message: string, priority = 3
) {.async: (raises: [CancelledError]).} =
  let
    headers = {"Title": "Chronos Uptime Monitor", "Priority": $priority}
    body = message.stringToBytes()
    request = HttpClientRequestRef.new(
      session,
      "https://ntfy.sh/" & ntfyTopic,
      meth = MethodPost,
      headers = headers,
      body = body,
    ).valueOr:
      echo "[WRN] Failed to send alert: " & error
      return

  try:
    let response = await request.send().wait(5.seconds)
    await response.closeWait()
  except HttpError, FuturePendingError, AsyncTimeoutError:
    echo "[WRN] Failed to send alert: " & getCurrentExceptionMsg()
  finally:
    await request.closeWait()

proc findMarker(
    bodyReader: HttpBodyReader
): Future[bool] {.async: (raises: [AsyncStreamError, CancelledError]).} =
  const
    marker = "<html"
    readLimit = 10 * 1024

  var
    totalRead = 0
    sample = newString(len(marker) - 1)
    found = false

  proc findMarkerInSample(data: openArray[byte]): (int, bool) =
    if len(data) == 0:
      (0, false)
    else:
      sample = sample[^(len(marker) - 1) .. high(sample)]
      sample &= bytesToString(data)
      found = marker in sample
      totalRead += len(data)

      (len(data), found and totalRead <= readLimit)

  await bodyReader.readMessage(findMarkerInSample)

  found

proc check(
    session: HttpSessionRef, uri: string, semaphore: AsyncSemaphore
) {.async: (raises: [CancelledError]).} =
  await acquire(semaphore)

  defer:
    try:
      release(semaphore)
    except AsyncSemaphoreError:
      echo "Could not release a lock: " & getCurrentExceptionMsg()

  let
    request = HttpClientRequestRef.new(session, uri).valueOr:
      echo "[ERR] " & uri & ": " & error
      return
    response =
      try:
        await request.send().wait(5.seconds)
      except HttpError, AsyncTimeoutError:
        echo "[ERR] " & uri & ": " & getCurrentExceptionMsg()
        return
      finally:
        await request.closeWait()

  try:
    if response.status == 200:
      let
        bodyReader = response.getBodyReader()
        markerFound =
          try:
            await bodyReader.findMarker()
          finally:
            await bodyReader.closeWait()

      if markerFound:
        echo "[OK] " & uri
      else:
        let message = "[NOK] " & uri & ": Not valid HTML"
        echo message
        await session.sendAlert(message)
    else:
      let message = "[NOK] " & uri & ": " & $response.status
      echo message
      await session.sendAlert(message)
  except HttpError, AsyncStreamError:
    let message = "[ERR] " & uri & ": " & getCurrentExceptionMsg()
    echo message
    await session.sendAlert(message, 4)
  finally:
    await response.closeWait()

proc check(uris: seq[string]) {.async: (raises: []).} =
  let
    session = HttpSessionRef.new()
    semaphore = newAsyncSemaphore(maxConcurrency)

  try:
    while true:
      echo "Checking " & $len(uris) & " URIs:"

      let
        futures = uris.mapIt(session.check(it, semaphore))

      try:
        await allFutures(futures)
      except CancelledError:
        await cancelAndWait(futures)
        break

      echo "Done. Next check in 10 seconds."

      try:
        await sleepAsync(10.seconds)
      except CancelledError:
        break
  except CancelledError:
    discard
  finally:
    await session.closeWait()

when isMainModule:
  waitFor check(uris)
Let's see what changed.
const
  maxConcurrency = 5
We define a constant that determines the capacity of our semaphore.
  uris = @[
    "https://duckduckgo.com/?q=chronos", "https://mock.codes/403",
    "http://10.255.255.1", "https://html.spec.whatwg.org",
    "https://mock.codes/200", "https://github.com", "https://archive.org",
    "https://nim-lang.org", "https://w3.org", "https://free.technology",
    "https://codeberg.org", "https://nimble.directory", "https://status.app",
    "https://keycard.tech", "https://stackoverflow.com", "https://nimbus.team",
    "https://logos.co", "https://forum.nim-lang.org", "https://acid.info",
    "https://vac.dev", "https://expired.badssl.com", "http://10.255.255.2",
    "http://10.255.255.3",
  ]
We've added more URIs to the list to make the batching effect visible.
proc check(
    session: HttpSessionRef, uri: string, semaphore: AsyncSemaphore
) {.async: (raises: [CancelledError]).} =
  await acquire(semaphore)

  defer:
    try:
      release(semaphore)
    except AsyncSemaphoreError:
      echo "Could not release a lock: " & getCurrentExceptionMsg()
We've modified the check function for a single URI so that it accepts a semaphore (of type AsyncSemaphore), waits to acquire it, and releases it at the end (we use defer to postpone the release).
With this short addition, check waits instead of running while the semaphore is full, i.e. while maxConcurrency other checks are holding it.
Because releasing a semaphore can raise an AsyncSemaphoreError, and the release happens outside of the try blocks in the function body, we wrap the release call in its own try..except block to handle the error gracefully and prevent it from bubbling up.
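If defer is new to you, the following standalone sketch shows the behaviour we rely on: the deferred block runs when the proc is left, including on an early return. The proc and variable names are made up for the illustration, and strings stand in for the real acquire and release calls.

```nim
# Records what happened, in order, so that we can inspect it afterwards.
var events: seq[string]

proc work(failEarly: bool) =
  events.add "acquire"

  defer:
    # Runs when the proc's scope is left, whichever way that happens.
    events.add "release"

  if failEarly:
    events.add "early return"
    return

  events.add "work done"

work(failEarly = false)
work(failEarly = true)

echo events
# prints @["acquire", "work done", "release", "acquire", "early return", "release"]
```

This is why a single defer at the top of check is enough: the function has several return statements, and each of them still ends with the semaphore being released.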
proc check(uris: seq[string]) {.async: (raises: []).} =
  let
    session = HttpSessionRef.new()
    semaphore = newAsyncSemaphore(maxConcurrency)
In the check function for a URI sequence, we create a semaphore of the required capacity.
  try:
    while true:
Instead of a one-off launch, we do the checks in an infinite loop. We wrap the entire loop in a try..finally block to ensure the session is always closed when the program stops.
      echo "Checking " & $len(uris) & " URIs:"

      let
        futures = uris.mapIt(session.check(it, semaphore))

      try:
        await allFutures(futures)
      except CancelledError:
        await cancelAndWait(futures)
        break
We've added an echo to denote the start of each cycle.
Then we pass the semaphore to check for each URI using mapIt. We also add a try..except CancelledError block around allFutures to ensure that if the program is stopped (e.g. by pressing Ctrl+C), all pending requests are cancelled and cleaned up properly. Note that in this case, we break out of the loop to finish the execution gracefully.
      echo "Done. Next check in 10 seconds."

      try:
        await sleepAsync(10.seconds)
      except CancelledError:
        break
Finally, we print a message to mark the end of a cycle and wait 10 seconds before the next one. If the program is cancelled during the wait, we break out of the loop as well.
Even though we tell the program to wait 10 seconds before the next check cycle, in reality the wait will be slightly longer because the system needs some time to wake up and resume execution.
This is called drift. For an uptime monitor, this isn't critical, but there are cases where you would need to compensate for it.
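One common way to compensate for drift is to sleep until a precomputed deadline instead of sleeping for a fixed duration. The sketch below is not part of the app. runPeriodically is a hypothetical helper, and the code assumes that a plain import chronos gives access to Moment, Duration and sleepAsync.

```nim
import chronos

proc runPeriodically(
    interval: Duration, cycles: int
) {.async: (raises: [CancelledError]).} =
  # Deadlines are derived from the start time, not from the moment the
  # previous sleep happened to end.
  var deadline = Moment.now() + interval

  for cycle in 1 .. cycles:
    echo "cycle ", cycle # stand-in for the real checks

    let now = Moment.now()
    if deadline > now:
      # Sleep only for what is left until the deadline, so the time spent on
      # the checks and on late wake-ups is not added to the period.
      await sleepAsync(deadline - now)

    # A late wake-up in this cycle shortens the next sleep instead of
    # shifting every later cycle.
    deadline = deadline + interval

when isMainModule:
  waitFor runPeriodically(100.milliseconds, 3)
```

Each individual wake-up can still be a little late, but the lateness no longer accumulates from cycle to cycle. If a cycle takes longer than the interval, this sketch skips the sleep and starts the next cycle immediately.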
Run the program and you'll see an even flow of statuses in your terminal.