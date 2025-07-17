 Skip to content
Cloudflare Docs

Use ReadableStream with Durable Object and Workers

Stream ReadableStream from Durable Objects.

This example demonstrates:

  • A Worker receives a request, and forwards it to a Durable Object my-id.
  • The Durable Object streams an incrementing number every second, until it receives AbortSignal.
  • The Worker reads and logs the values from the stream.
  • The Worker then cancels the stream after 5 values.
import { DurableObject } from "cloudflare:workers";


// Send incremented counter value every second
async function* dataSource(signal) {
  let counter = 0;
  while (!signal.aborted) {
    yield counter++;
    await new Promise((resolve) => setTimeout(resolve, 1_000));
  }


  console.log("Data source cancelled");
}


export class MyDurableObject extends DurableObject {
  async fetch(request) {
    const abortController = new AbortController();


    const stream = new ReadableStream({
      async start(controller) {
        if (request.signal.aborted) {
          controller.close();
          abortController.abort();
          return;
        }


        for await (const value of dataSource(abortController.signal)) {
          controller.enqueue(new TextEncoder().encode(String(value)));
        }
      },
      cancel() {
        console.log("Stream cancelled");
        abortController.abort();
      },
    });


    const headers = new Headers({
      "Content-Type": "application/octet-stream",
    });


    return new Response(stream, { headers });
  }
}


export default {
  async fetch(request, env, ctx) {
    const id = env.MY_DURABLE_OBJECT.idFromName("my-id");
    const stub = env.MY_DURABLE_OBJECT.get(id);
    const response = await stub.fetch(request, { ...request });
    if (!response.ok || !response.body) {
      return new Response("Invalid response", { status: 500 });
    }


    const reader = response.body
      .pipeThrough(new TextDecoderStream())
      .getReader();


    let data = [];
    let i = 0;
    while (true) {
      // Cancel the stream after 5 messages
      if (i > 5) {
        reader.cancel();
        break;
      }
      const { value, done } = await reader.read();


      if (value) {
        console.log(`Got value ${value}`);
        data = [...data, value];
      }


      if (done) {
        break;
      }
      i++;
    }


    return Response.json(data);
  },
};