
Changelog

New updates and improvements at Cloudflare.

Pipelines
  1. Logpush has traditionally been great at delivering Cloudflare logs to a variety of destinations in JSON format. While JSON is flexible and easily readable, it can be inefficient to store and query at scale.

    With this release, you can now send your logs directly to Pipelines to ingest, transform, and store them in R2 as Parquet files or Apache Iceberg tables managed by R2 Data Catalog. This shrinks your data footprint and lets you query your logs efficiently with R2 SQL or any other query engine that supports Apache Iceberg or Parquet.

    Transform logs before storage

    Pipelines SQL runs on each log record in-flight, so you can reshape your data before it is written. For example, you can drop noisy fields, redact sensitive values, or derive new columns:

    INSERT INTO http_logs_sink
    SELECT
      ClientIP,
      EdgeResponseStatus,
      to_timestamp_micros(EdgeStartTimestamp) AS event_time,
      upper(ClientRequestMethod) AS method,
      sha256(ClientIP) AS hashed_ip
    FROM http_logs_stream
    WHERE EdgeResponseStatus >= 400;

    Pipelines SQL supports string functions, regex, hashing, JSON extraction, timestamp conversion, conditional expressions, and more. For the full list, refer to the Pipelines SQL reference.

    Get started

    To configure Pipelines as a Logpush destination, refer to Enable Cloudflare Pipelines.

  1. Cloudflare Pipelines ingests streaming data via Workers or HTTP endpoints, transforms it with SQL, and writes it to R2 as Apache Iceberg tables. Today we're shipping three improvements to help you understand why streaming events get dropped, catch data quality issues early, and set up Pipelines faster.

    Dropped event metrics

    When stream events don't match the expected schema, Pipelines accepts them during ingestion but drops them when attempting to deliver them to the sink. To help you identify the root cause of these issues, we are introducing a new dashboard and metrics that surface dropped events with detailed error messages.

    The Errors tab in the Cloudflare dashboard showing deserialization errors grouped by type with individual error details

    Dropped events can also be queried programmatically via the new pipelinesUserErrorsAdaptiveGroups GraphQL dataset. The dataset breaks down failures by specific error type (missing_field, type_mismatch, parse_failure, or null_value) so you can trace issues back to the source.

    query GetPipelineUserErrors(
      $accountTag: String!
      $pipelineId: String!
      $datetimeStart: Time!
      $datetimeEnd: Time!
    ) {
      viewer {
        accounts(filter: { accountTag: $accountTag }) {
          pipelinesUserErrorsAdaptiveGroups(
            limit: 100
            filter: {
              pipelineId: $pipelineId
              datetime_geq: $datetimeStart
              datetime_leq: $datetimeEnd
            }
            orderBy: [count_DESC]
          ) {
            count
            dimensions {
              errorFamily
              errorType
            }
          }
        }
      }
    }
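
    If you prefer to pull these metrics from a script or Worker rather than the dashboard, here is a minimal sketch of posting the query above to Cloudflare's GraphQL Analytics API endpoint. The token, account ID, pipeline ID, and time window are placeholders you supply; only the query itself comes from the example above.

    TypeScript
    // Minimal sketch: run the GetPipelineUserErrors query against
    // Cloudflare's GraphQL Analytics API.
    const GRAPHQL_ENDPOINT = "https://api.cloudflare.com/client/v4/graphql";

    async function getPipelineUserErrors(
      apiToken: string,   // API token with Analytics read access (placeholder)
      accountTag: string, // your account ID (placeholder)
      pipelineId: string, // the pipeline to inspect (placeholder)
      query: string,      // the GetPipelineUserErrors query above, as a string
    ) {
      const response = await fetch(GRAPHQL_ENDPOINT, {
        method: "POST",
        headers: {
          Authorization: `Bearer ${apiToken}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          query,
          variables: {
            accountTag,
            pipelineId,
            datetimeStart: "2025-01-01T00:00:00Z", // example window
            datetimeEnd: "2025-01-02T00:00:00Z",
          },
        }),
      });
      return response.json(); // standard GraphQL { data, errors } shape
    }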

    For the full list of dimensions, error types, and additional query examples, refer to User error metrics.

    Typed Pipelines bindings

    Sending data to a Pipeline from a Worker previously used a generic Pipeline<PipelineRecord> type, which meant schema mismatches (wrong field names, incorrect types) were only caught at runtime as dropped events.

    Running wrangler types now generates schema-specific TypeScript types for your Pipeline bindings. TypeScript catches missing required fields and incorrect field types at compile time, before your code is deployed.

    TypeScript
    declare namespace Cloudflare {
      type EcommerceStreamRecord = {
        user_id: string;
        event_type: string;
        product_id?: string;
        amount?: number;
      };
      interface Env {
        STREAM: import("cloudflare:pipelines").Pipeline<Cloudflare.EcommerceStreamRecord>;
      }
    }
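
    As a usage sketch, here is what a Worker handler sending one of these records might look like. It assumes the binding exposes a send() method that accepts an array of records; the field values are invented sample data.

    TypeScript
    // Illustrative only: forward a purchase event through the typed STREAM binding.
    // Assumes the Pipeline binding's send() accepts an array of records.
    export default {
      async fetch(request: Request, env: Cloudflare.Env): Promise<Response> {
        await env.STREAM.send([
          {
            user_id: "user_123",
            event_type: "purchase",
            product_id: "sku_456",
            amount: 29.99,
          },
        ]);
        // A record missing user_id, or with amount typed as a string,
        // now fails type checking instead of being dropped at delivery.
        return new Response("queued");
      },
    };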

    For more information, refer to Typed Pipeline bindings.

    Improved Pipelines setup

    Setting up a new Pipeline previously required multiple manual steps: creating an R2 bucket, enabling R2 Data Catalog, generating an API token, and configuring format, compression, and rolling policies individually.

    The wrangler pipelines setup command now offers a Simple setup mode that applies recommended defaults and automatically creates the R2 bucket and enables R2 Data Catalog if they do not already exist. Validation errors during setup prompt you to retry inline rather than restarting the entire process.

    For a full walkthrough, refer to the Getting started guide.

  1. Today, we're launching the new Cloudflare Pipelines: a streaming data platform that ingests events, transforms them with SQL, and writes to R2 as Apache Iceberg tables or Parquet files.

    Pipelines can receive events via HTTP endpoints or Worker bindings, transform them with SQL, and deliver to R2 with exactly-once guarantees. This makes it easy to build analytics-ready warehouses for server logs, mobile application events, IoT telemetry, or clickstream data without managing streaming infrastructure.

    For example, here's a pipeline that ingests clickstream events and filters out bot traffic while extracting domain information:

    INSERT INTO events_table
    SELECT
      user_id,
      lower(event) AS event_type,
      to_timestamp_micros(ts_us) AS event_time,
      regexp_match(url, '^https?://([^/]+)')[1] AS domain,
      url,
      referrer,
      user_agent
    FROM events_json
    WHERE event = 'page_view'
      AND NOT regexp_like(user_agent, '(?i)bot|spider');
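
    To make the input shape concrete, here is a sketch of posting one matching clickstream event to a stream's HTTP ingest endpoint. The endpoint URL is a placeholder for the one your pipeline gives you, and the field values are invented.

    TypeScript
    // Illustrative only: post one page_view event whose fields line up with
    // the SQL above (user_id, event, ts_us, url, referrer, user_agent).
    const STREAM_ENDPOINT = "https://<your-stream-http-endpoint>/"; // placeholder

    const events = [
      {
        user_id: "user_123",
        event: "page_view",
        ts_us: Date.now() * 1000, // microseconds, for to_timestamp_micros
        url: "https://shop.example.com/products/42",
        referrer: "https://www.example.com/",
        user_agent: "Mozilla/5.0",
      },
    ];

    await fetch(STREAM_ENDPOINT, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(events),
    });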

    Get started by creating a pipeline in the dashboard or running a single command in Wrangler:

    Terminal window
    npx wrangler pipelines setup

    Check out our getting started guide to learn how to create a pipeline that delivers events to an Iceberg table you can query with R2 SQL. Read more about today's announcement in our blog post.

  1. Cloudflare Pipelines is now available in beta to all users with a Workers Paid plan.

    Pipelines let you ingest high volumes of real-time data without managing the underlying infrastructure. A single pipeline can ingest up to 100 MB of data per second via HTTP or from a Worker. Ingested data is automatically batched, written to output files, and delivered to an R2 bucket in your account. You can use Pipelines to build a data lake of clickstream data or to store events from a Worker.

    Create your first pipeline with a single command:

    Create a pipeline
    $ npx wrangler@latest pipelines create my-clickstream-pipeline --r2-bucket my-bucket
    🌀 Authorizing R2 bucket "my-bucket"
    🌀 Creating pipeline named "my-clickstream-pipeline"
    Successfully created pipeline my-clickstream-pipeline
    Id: 0e00c5ff09b34d018152af98d06f5a1xvc
    Name: my-clickstream-pipeline
    Sources:
      HTTP:
        Endpoint: https://0e00c5ff09b34d018152af98d06f5a1xvc.pipelines.cloudflare.com/
        Authentication: off
        Format: JSON
      Worker:
        Format: JSON
    Destination:
      Type: R2
      Bucket: my-bucket
      Format: newline-delimited JSON
      Compression: GZIP
    Batch hints:
      Max bytes: 100 MB
      Max duration: 300 seconds
      Max records: 100,000
    🎉 You can now send data to your pipeline!
    Send data to your pipeline's HTTP endpoint:
      curl "https://0e00c5ff09b34d018152af98d06f5a1xvc.pipelines.cloudflare.com/" -d '[{ ...JSON_DATA... }]'
    To send data to your pipeline from a Worker, add the following configuration to your config file:
      {
        "pipelines": [
          {
            "pipeline": "my-clickstream-pipeline",
            "binding": "PIPELINE"
          }
        ]
      }
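
    With that binding in place, a Worker can forward events to the pipeline. Here is a minimal sketch, assuming the binding's send() method accepts an array of JSON-serializable records; the event fields are invented.

    TypeScript
    // Illustrative only: forward basic request metadata through the PIPELINE
    // binding configured above. The Env shape is assumed, not generated.
    interface Env {
      PIPELINE: { send(records: object[]): Promise<void> };
    }

    export default {
      async fetch(request: Request, env: Env): Promise<Response> {
        await env.PIPELINE.send([
          {
            url: request.url,
            method: request.method,
            received_at: new Date().toISOString(),
          },
        ]);
        return new Response("ok");
      },
    };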

    Head over to our getting started guide for an in-depth tutorial on building with Pipelines.