Implementing Live Visitor Count using Cloudflare Workers & Durable Objects

Arnav Gosain

Arnav Gosain

As a fun experiment, I wanted to add a live visitor count to my blog. I’ve seen this done in a few different ways, most commonly in a server-full way, but I wanted to try implementing it as a serverless function. Unlike other serverless solutions, Cloudflare Workers support WebSockets & provide a neat interface for consistent storage via Durable Objects, so I decided to give it a shot.

PS: If you’re familiar with Cloudflare Workers, you can skip to Implementing the live visitors counter.

The Basics

Wrangler

Cloudflare provides a CLI tool called wrangler to control/deploy to any of their serverless offerings (Workers, D1, etc). Wranger can be installed using npm.

npm i -g wrangler

Scaffolding a Worker

To scaffold a new Worker, run wrangler generate and follow the prompts.

wrangler init my-worker

Wrangler will prompt you to create a package.json file, as well as whether you want to use TypeScript or not and if you want it to setup Jest/Vitest for testing. I recommend saying yes to all of these, as it will make your life easier later.

Wrangler will create the following files:

  • package.json - Self-explanatory.
  • wrangler.toml - Configuration file for the Wrangler
  • src/index.ts - The main entry point for the Worker
  • src/index.test.ts - Vitest test file for the Worker

Creating a basic WebSocket server with Cloudflare Workers

async function handleRequest(request: Request, env: Env) {
  const upgradeHeader = request.headers.get("Upgrade");

  // If the upgrade header is not set, or it's not set to "websocket", return 426
  if (!upgradeHeader || upgradeHeader !== "websocket") {
    return new Response("Expected Upgrade: websocket", { status: 426 });
  }

  const [client, server] = new WebSocketPair();

  server.addEventListener("message", (event) => {
    // Handle messages from the client
  });

  server.addEventListener("close", () => {
    // Handle the client disconnecting
  });

  server.accept();

  return new Response(null, {
    status: 101,
    webSocket: client,
  });
}

export default {
  fetch: handleRequest,
};

On the client side, you can connect to the Worker using the WebSocket constructor.

const socket = new WebSocket("wss://my-worker.example.com");

socket.addEventListener("open", () => {
  console.log("Connected to the Worker");
});

socket.addEventListener("message", () => {
  console.log("Connected to the Worker");
});

socket.addEventListener("close", () => {
  console.log("Disconnected from the Worker");
});

Making a Counter using Durable Objects

On the surface level, a Durable Object is just a class. But an instance of a Durable Object class cannot be created manually, as it needs to be persisted and used in multiple requests.

That’s where the Env object comes in. The Env object is passed to the request & the Durable Object constructor by the Workers runtime. It contains the special properties that let you access your Durable Object in the request handler.

For the runtime to know which Durable Object class to use, you need to register it in the wrangler.toml file.

[durable_objects]
bindings = [{ name = "counter", class_name = "Counter" }]

[[migrations]]
tag = "v1" # Should be unique for each entry
new_classes = ["Counter"]

After which it’s accessible in the Env object as a DurableObjectNamespace.

interface Env {
  counter: DurableObjectNamespace;
}

export class Counter {
  constructor(private state: DurableObjectState, private env: Env) {}

  async decrement() {
    const count = (await this.state.storage.get("count")) ?? 0;
    const newCount = count - 1;
    await this.state.storage.put("count", newCount);
    return newCount;
  }

  async increment() {
    const count = (await this.state.storage.get("count")) ?? 0;
    const newCount = count + 1;
    await this.state.storage.put("count", newCount);
    return newCount;
  }
}

Hooking up our Counter Durable Object with the request handler

A Durable Object class’s methods can’t be called directly as we don’t get access to the instance of the class itself. Instead, Workers runtime gives us to a DurableObjectStub, through which we can pass our request to the Durable Object by calling its fetch method.

So instead of calling counter.increment() directly, we can call await counter.fetch(server). After this, our persisted Durable Object instance will then handle our WebSocket connection.

interface Env {
  counter: DurableObjectNamespace;
}

export class Counter {
  // Code from the previous section
  // ...

  async fetch(request: Request) {
    const [client, server] = new WebSocketPair();

    server.addEventListener("message", async (event) => {
      // Messages are received/sent as strings, so we need to parse it into JSON
      // to use it as an object
      const action = JSON.parse(event.data);

      if (action.type === "increment") {
        const newCount = await this.increment();
        server.send(JSON.stringify({ type: "update/count", count: newCount }));
      } else if (action.type === "decrement") {
        const newCount = await this.decrement();

        server.send(JSON.stringify({ type: "update/count", count: newCount }));
      }
    });

    server.addEventListener("close", async () => {
      // When the client disconnects, we can delete all the data in Durable Object
      // Deleting all data automatically discards the Durable Object instance
      await this.state.storage.deleteAll();
    });

    server.accept();

    return new Response(null, {
      status: 101,
      webSocket: client,
    });
  }
}

async function handleRequest(request: Request, env: Env) {
  // Cloudflare provides the client IP in the 'CF-Connecting-IP' header
  const ip = request.headers.get("CF-Connecting-IP") as string;

  // We'll use the IP as the Durable Object's id
  const counterId = env.counter.idFromName(ip);
  // .get() will fetch the existing Durable Object instance, or create a new one if it doesn't exist
  const counter = env.counter.get(counterId);

  return await counter.fetch(request);
}

export default {
  fetch: handleRequest,
};

Implementing the live visitors counter

Now that we have the basics out of the way, let’s implement the live visitors counter.

So here’s the rundown of how it works:

  • Each client connects to the Worker via a WebSocket
  • When the connection is established, we’ll increment the count in our DurableObject by 1.
  • When the connection is closed, we’ll decrement the count in our DurableObject by 1.
  • Whenever the count changes, the new count will be broadcasted to all connected clients.
  • When all clients disconnect, we’ll delete all the data in the DurableObject, which will discard the DurableObject instance.

Keeping track of connections

Since our DurableObject’s fetch method is called for every WebSocket connection, we can store the WebSocket connection in a Set.

export class Counter {
  // Refer to the previous section for the full code
  // ...

  private conns = new Set<WebSocket>();

  async fetch(request: Request) {
    const upgradeHeader = request.headers.get("Upgrade");

    // If the upgrade header is not set, or it's not set to "websocket", return 426
    if (!upgradeHeader || upgradeHeader !== "websocket") {
      return new Response("Expected Upgrade: websocket", { status: 426 });
    }

    const [client, server] = new WebSocketPair();

    server.addEventListener("message", async (event) => {
      // ...
    });

    server.addEventListener("close", async () => {
      // Remove the session from the Set
      this.conns.delete(server);
    });

    server.accept();

    // Add the session to the Set
    this.conns.add(server);

    return new Response(null, {
      status: 101,
      webSocket: client,
    });
  }
}

Broadcasting a message to all connected clients

export class Counter {
  // Refer to the previous section for the full code
  // ...

  private conns = new Set<WebSocket>();

  async fetch(request: Request) {
    // ...
  }

  private broadcast(message: string) {
    for (const session of this.conns) {
      session.send(message);
    }
  }
}

Now we’ll modify the increment and decrement methods to broadcast the new count to all connected clients

export class Counter {
  // Refer to the previous section for the full code
  // ...

  private conns = new Set<WebSocket>();

  private broadcast(message: string) {
    for (const conn of this.conns) {
      conn.send(message);
    }
  }

  async increment() {
    const count = (await this.state.storage.get("count")) ?? 0;
    const newCount = count + 1;
    await this.state.storage.put("count", newCount);

    // Broadcast the new count to all connected clients
    this.broadcast(JSON.stringify({ type: "update/count", count: newCount }));

    return newCount;
  }

  async decrement() {
    const count = (await this.state.storage.get("count")) ?? 0;
    const newCount = count - 1;
    await this.state.storage.put("count", newCount);

    // Broadcast the new count to all connected clients
    this.broadcast(JSON.stringify({ type: "update/count", count: newCount }));

    return newCount;
  }

  async fetch(request: Request) {
    /* ...*/
  }
}

Discarding closed connections

export class Counter {
  // Refer to the previous section for the full code
  // ...

  private conns = new Set<WebSocket>();

  private broadcast(message: string) {
    for (const conn of this.conns) {
      // Check if the connection is still alive
      try {
        conn.send(message);
      } catch {
        // If the connection is closed, remove it from the Set
        this.conns.delete(session);
      }
    }
  }

  async increment() {
    /* ... */
  }
  async decrement() {
    /* ... */
  }
  async fetch(request: Request) {
    /* ... */
  }
}

Full code

interface Env {
  counter: DurableObjectNamespace;
}

export class Counter {
  private conns = new Set<WebSocket>();

  constructor(private state: DurableObjectState, private env: Env) {}

  private broadcast(message: string) {
    for (const conn of this.conns) {
      // Check if the connection is still alive
      try {
        conn.send(message);
      } catch {
        // If the connection is closed, remove it from the Set
        this.conns.delete(session);
      }
    }
  }

  async increment() {
    const count = (await this.state.storage.get("count")) ?? 0;
    const newCount = count + 1;
    await this.state.storage.put("count", newCount);

    // Broadcast the new count to all connected clients
    this.broadcast(JSON.stringify({ type: "update/count", count: newCount }));

    return newCount;
  }

  async decrement() {
    const count = (await this.state.storage.get("count")) ?? 0;
    const newCount = count - 1;
    await this.state.storage.put("count", newCount);

    // Broadcast the new count to all connected clients
    this.broadcast(JSON.stringify({ type: "update/count", count: newCount }));

    return newCount;
  }

  async fetch(request: Request) {
    const [client, server] = new WebSocketPair();

    server.addEventListener("message", async (event) => {
      // Messages are received/sent as strings, so we need to parse it into JSON
      // to use it as an object
      const action = JSON.parse(event.data);

      if (action.type === "increment") {
        const newCount = await this.increment();
        server.send(JSON.stringify({ type: "update/count", count: newCount }));
      } else if (action.type === "decrement") {
        const newCount = await this.decrement();

        server.send(JSON.stringify({ type: "update/count", count: newCount }));
      }
    });

    server.addEventListener("close", async () => {
      // Remove the session from the Set
      this.conns.delete(server);

      if (this.conns.size === 0) {
        // When the client disconnects, we can delete all the data in Durable Object
        // Deleting all data automatically discards the Durable Object instance
        await this.state.storage.deleteAll();
      }
    });

    server.accept();

    // Add the session to the Set
    this.conns.add(server);

    return new Response(null, {
      status: 101,
      webSocket: client,
    });
  }
}

async function handleRequest(request: Request, env: Env) {
  const upgradeHeader = request.headers.get("Upgrade");

  // If the upgrade header is not set, or it's not set to "websocket", return 426
  if (!upgradeHeader || upgradeHeader !== "websocket") {
    return new Response("Expected Upgrade: websocket", { status: 426 });
  }

  // Since we want all clients to connect to the same Durable Object instance, we'll use static string
  // instead of the client IP
  const counterId = env.counter.idFromName("arnavgosain.com");
  const counter = env.counter.get(counterId);

  return await counter.fetch(request);
}

export default {
  fetch: handleRequest,
};