<page>
---
title: Pipelines · Cloudflare Pipelines Docs
description: Cloudflare Pipelines ingests events, transforms them with SQL, and
  delivers them to R2 as Iceberg tables or as Parquet and JSON files.
lastUpdated: 2026-03-02T15:59:53.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/
  md: https://developers.cloudflare.com/pipelines/index.md
---

Note

Pipelines is in **open beta**, and any developer with a [Workers Paid plan](https://developers.cloudflare.com/workers/platform/pricing/) can start using it. Currently, outside of standard R2 storage and operations, you will not be billed for your use of Pipelines.

Ingest, transform, and load streaming data into Apache Iceberg or Parquet in R2.

Available on Paid plans

Cloudflare Pipelines ingests events, transforms them with SQL, and delivers them to R2 as [Iceberg tables](https://developers.cloudflare.com/r2/data-catalog/) or as Parquet and JSON files.

Whether you're processing server logs, mobile application events, IoT telemetry, or clickstream data, Pipelines provides durable ingestion via HTTP endpoints or Worker bindings, SQL-based transformations, and exactly-once delivery to R2. This makes it easy to build analytics-ready data warehouses and lakehouses without managing streaming infrastructure.

Create your first pipeline by following the [getting started guide](https://developers.cloudflare.com/pipelines/getting-started) or running this [Wrangler](https://developers.cloudflare.com/workers/wrangler/) command:

```sh
npx wrangler pipelines setup
```

***

## Features

### Create your first pipeline

Build your first pipeline to ingest data via HTTP or Workers, apply SQL transformations, and deliver to R2 as Iceberg tables or Parquet files.

[Get started](https://developers.cloudflare.com/pipelines/getting-started/)

### Streams

Durable, buffered queues that receive events via HTTP endpoints or Worker bindings.

[Learn about Streams](https://developers.cloudflare.com/pipelines/streams/)

### Pipelines

Connect streams to sinks with SQL transformations that validate, filter, transform, and enrich your data at ingestion time.

[Learn about Pipelines](https://developers.cloudflare.com/pipelines/pipelines/)

### Sinks

Configure destinations for your data. Write Apache Iceberg tables to R2 Data Catalog or export as Parquet and JSON files.

[Learn about Sinks](https://developers.cloudflare.com/pipelines/sinks/)

***

## Related products

**[R2](https://developers.cloudflare.com/r2/)**

Cloudflare R2 Object Storage allows developers to store large amounts of unstructured data without the costly egress bandwidth fees associated with typical cloud storage services.

**[Workers](https://developers.cloudflare.com/workers/)**

Cloudflare Workers allows developers to build serverless applications and deploy instantly across the globe for exceptional performance, reliability, and scale.

***

## More resources

[Limits](https://developers.cloudflare.com/pipelines/platform/limits/)

Learn about pipelines limits.

[@CloudflareDev](https://x.com/cloudflaredev)

Follow @CloudflareDev on X to learn about product announcements and what is new in Cloudflare Workers.

[Developer Discord](https://discord.cloudflare.com)

Connect with the Workers community on Discord to ask questions, show what you are building, and discuss the platform with other developers.

</page>

<page>
---
title: Getting started · Cloudflare Pipelines Docs
description: Create your first pipeline to ingest streaming data and write to R2
  Data Catalog as an Apache Iceberg table.
lastUpdated: 2026-02-24T14:35:21.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/getting-started/
  md: https://developers.cloudflare.com/pipelines/getting-started/index.md
---

This guide will walk you through:

* Creating an [API token](https://developers.cloudflare.com/r2/api/tokens/) needed for pipelines to authenticate with your data catalog.
* Creating your first pipeline with a simple ecommerce schema that writes to an [Apache Iceberg](https://iceberg.apache.org/) table managed by R2 Data Catalog.
* Sending sample ecommerce data via HTTP endpoint.
* Validating data in your bucket and querying it with R2 SQL.

## Prerequisites

1. Sign up for a [Cloudflare account](https://dash.cloudflare.com/sign-up/workers-and-pages).
2. Install [`Node.js`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm).

Node.js version manager

Use a Node version manager like [Volta](https://volta.sh/) or [nvm](https://github.com/nvm-sh/nvm) to avoid permission issues and to change Node.js versions easily. [Wrangler](https://developers.cloudflare.com/workers/wrangler/install-and-update/), discussed later in this guide, requires Node.js version `16.17.0` or later.
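For example, a minimal sketch with nvm (this assumes nvm is already installed):

```bash
# Sketch: install and switch to the current LTS release of Node.js with nvm.
nvm install --lts
nvm use --lts
node --version
```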

## 1. Create an API token

Pipelines must authenticate to R2 Data Catalog with an [R2 API token](https://developers.cloudflare.com/r2/api/tokens/) that has catalog and R2 permissions.

1. In the Cloudflare dashboard, go to the **R2 object storage** page.

   [Go to **Overview**](https://dash.cloudflare.com/?to=/:account/r2/overview)

2. Select **Manage API tokens**.

3. Select **Create Account API token**.

4. Give your API token a name.

5. Under **Permissions**, select the **Admin Read & Write** permission.

6. Select **Create Account API Token**.

7. Note the **Token value**.

Note

This token also includes the R2 SQL Read permission, which allows you to query your data with R2 SQL.

## 2. Create your first pipeline

* Wrangler CLI

  First, create a schema file that defines your ecommerce data structure:

  **Create `schema.json`:**

  ```json
  {
    "fields": [
      {
        "name": "user_id",
        "type": "string",
        "required": true
      },
      {
        "name": "event_type",
        "type": "string",
        "required": true
      },
      {
        "name": "product_id",
        "type": "string",
        "required": false
      },
      {
        "name": "amount",
        "type": "float64",
        "required": false
      }
    ]
  }
  ```

  Use the interactive setup to create a pipeline that writes to R2 Data Catalog:

  ```bash
  npx wrangler pipelines setup
  ```

  Note

  The setup command automatically creates the [R2 bucket](https://developers.cloudflare.com/r2/buckets/) and enables [R2 Data Catalog](https://developers.cloudflare.com/r2/data-catalog/) if they do not already exist, so you do not need to create them beforehand.

  Follow the prompts:

  1. **Pipeline name**: Enter `ecommerce`

  2. **Stream configuration**:

     * Enable HTTP endpoint: `yes`
     * Require authentication: `no` (for simplicity)
     * Configure custom CORS origins: `no`
     * Schema definition: `Load from file`
     * Schema file path: `schema.json` (or your file path)

  3. **Sink configuration**:

     * Destination type: `Data Catalog (Iceberg)`
     * Setup mode: `Simple (recommended defaults)`
     * R2 bucket name: `pipelines-tutorial` (created automatically if it does not exist)
     * Table name: `ecommerce`
     * Catalog API token: Enter your token from step 1

  4. **Review**: Confirm the summary and select `Create resources`

  5. **SQL transformation**: Choose `Simple ingestion (SELECT * FROM stream)`

  Note

  If you make a mistake during setup (such as an invalid name or incorrect credentials), you will be prompted to retry rather than needing to restart the entire setup process.

  Advanced mode options

  If you select **Advanced** instead of **Simple** during sink configuration, you can customize the following additional options:

  * **Format**: Output file format (for example, Parquet)
  * **Compression**: Compression algorithm (for example, zstd)
  * **Rolling policy**: File size threshold (minimum 5 MB) and time interval (minimum 10 seconds) for creating new files
  * **Credentials**: Choose between automatic credential generation or manually entering R2 credentials
  * **Namespace**: Data Catalog namespace (defaults to `default`)

  After setup completes, the command outputs a configuration snippet for your Wrangler file, a Worker binding example with sample data, and a curl command for the HTTP endpoint. Note the HTTP endpoint URL and the `pipelines` configuration for use in the following steps.

  You can also pre-set the pipeline name using the `--name` flag:

  ```bash
  npx wrangler pipelines setup --name ecommerce
  ```

* Dashboard

  1. In the Cloudflare dashboard, go to **R2 object storage**.

     [Go to **Overview**](https://dash.cloudflare.com/?to=/:account/r2/overview)

  2. Select **Create bucket** and enter the bucket name: `pipelines-tutorial`.

  3. Select **Create bucket**.

  4. Select the bucket, switch to the **Settings** tab, scroll down to **R2 Data Catalog**, and select **Enable**.

  5. Once enabled, note the **Catalog URI** and **Warehouse name**.

  6. Go to **Pipelines** > **Pipelines**.

     [Go to **Pipelines**](https://dash.cloudflare.com/?to=/:account/pipelines/overview)

  7. Select **Create Pipeline**.

  8. **Connect to a Stream**:

     * Pipeline name: `ecommerce`
     * Enable HTTP endpoint for sending data: Enabled
     * HTTP authentication: Disabled (default)
     * Select **Next**

  9. **Define Input Schema**:

     * Select **JSON editor**

     * Copy in the schema:

       ```json
       {
         "fields": [
           {
             "name": "user_id",
             "type": "string",
             "required": true
           },
           {
             "name": "event_type",
             "type": "string",
             "required": true
           },
           {
             "name": "product_id",
             "type": "string",
             "required": false
           },
           {
             "name": "amount",
             "type": "float64",
             "required": false
           }
         ]
       }
       ```

     * Select **Next**

  10. **Define Sink**:

      * Select your R2 bucket: `pipelines-tutorial`
      * Storage type: **R2 Data Catalog**
      * Namespace: `default`
      * Table name: `ecommerce`
      * **Advanced Settings**: Change **Maximum Time Interval** to `10 seconds`
      * Select **Next**

  11. **Credentials**:

      * Disable **Automatically create an Account API token for your sink**
      * Enter **Catalog Token** from step 1
      * Select **Next**

  12. **Pipeline Definition**:

      * Leave the default SQL query:

        ```sql
        INSERT INTO ecommerce_sink SELECT * FROM ecommerce_stream;
        ```

      * Select **Create Pipeline**

  13. After pipeline creation, note the **Stream ID** for the next step.

## 3. Send sample data

Send ecommerce events to your pipeline's HTTP endpoint:

```bash
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[
    {
      "user_id": "user_12345",
      "event_type": "purchase",
      "product_id": "widget-001",
      "amount": 29.99
    },
    {
      "user_id": "user_67890",
      "event_type": "view_product",
      "product_id": "widget-002"
    },
    {
      "user_id": "user_12345",
      "event_type": "add_to_cart",
      "product_id": "widget-003",
      "amount": 15.50
    }
  ]'
```

Replace `{stream-id}` with the stream ID from your pipeline setup.
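If you enabled **Require authentication** for the stream's HTTP endpoint instead, include a Cloudflare API token with the request. A minimal sketch, assuming the token is exported as `CF_API_TOKEN` and sent as a Bearer token (refer to [Writing to streams](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/) for the exact requirements):

```bash
# Sketch: ingest with HTTP authentication enabled on the stream.
# $CF_API_TOKEN is a placeholder for a Cloudflare API token.
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Authorization: Bearer $CF_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '[{"user_id": "user_99999", "event_type": "view_product", "product_id": "widget-004"}]'
```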

## 4. Validate data in your bucket

1. In the Cloudflare dashboard, go to the **R2 object storage** page.

2. Select your bucket: `pipelines-tutorial`.

3. You should see Iceberg metadata files and data files created by your pipeline. If you are not seeing any files in your bucket, wait a couple of minutes and try again.

4. The data is organized in the Apache Iceberg format with metadata tracking table versions.

## 5. Query your data using R2 SQL

Set up your environment to use R2 SQL:

```bash
export WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_API_TOKEN
```

Or create a `.env` file with:

```txt
WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_API_TOKEN
```

Where `YOUR_API_TOKEN` is the token you created in step 1. For more information on setting environment variables, refer to [Wrangler system environment variables](https://developers.cloudflare.com/workers/wrangler/system-environment-variables/).

Query your data:

```bash
npx wrangler r2 sql query "YOUR_WAREHOUSE_NAME" "
SELECT
    user_id,
    event_type,
    product_id,
    amount
FROM default.ecommerce
WHERE event_type = 'purchase'
LIMIT 10"
```

Replace `YOUR_WAREHOUSE_NAME` with the warehouse name noted during pipeline setup. You can find it in the Cloudflare dashboard under **R2 object storage** > your bucket > **Settings** > **R2 Data Catalog**.

You can also query this table with any engine that supports Apache Iceberg. To learn more about connecting other engines to R2 Data Catalog, refer to [Connect to Iceberg engines](https://developers.cloudflare.com/r2/data-catalog/config-examples/).

## Learn more

[Streams](https://developers.cloudflare.com/pipelines/streams/): Learn about configuring streams for data ingestion.

[Pipelines](https://developers.cloudflare.com/pipelines/pipelines/): Understand SQL transformations and pipeline configuration.

[Sinks](https://developers.cloudflare.com/pipelines/sinks/): Configure data destinations and output formats.

</page>

<page>
---
title: Observability · Cloudflare Pipelines Docs
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: true
source_url:
  html: https://developers.cloudflare.com/pipelines/observability/
  md: https://developers.cloudflare.com/pipelines/observability/index.md
---

* [Metrics and analytics](https://developers.cloudflare.com/pipelines/observability/metrics/)

</page>

<page>
---
title: Pipelines · Cloudflare Pipelines Docs
description: Pipelines connect streams and sinks via SQL transformations, which
  can modify events before writing them to storage. This enables you to shift
  left, pushing validation, schematization, and processing to your ingestion
  layer to make your queries easy, fast, and correct.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/pipelines/
  md: https://developers.cloudflare.com/pipelines/pipelines/index.md
---

Pipelines connect [streams](https://developers.cloudflare.com/pipelines/streams/) and [sinks](https://developers.cloudflare.com/pipelines/sinks/) via SQL transformations, which can modify events before writing them to storage. This enables you to shift left, pushing validation, schematization, and processing to your ingestion layer to make your queries easy, fast, and correct.

Pipelines enable you to filter, transform, enrich, and restructure events in real time as data flows from streams to sinks.
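For example, a pipeline that keeps only purchase events and trims each record to the fields a downstream query needs can be expressed in a single SQL statement. A sketch using the [`pipelines create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-create) command, where `events_stream` and `purchases_sink` are placeholder resource names:

```sh
# Sketch: filter and reshape events at ingestion time.
# "events_stream" and "purchases_sink" are placeholder names for an
# existing stream and sink.
npx wrangler pipelines create purchases-only \
  --sql "INSERT INTO purchases_sink SELECT user_id, product_id, amount FROM events_stream WHERE event_type = 'purchase'"
```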

## Learn more

[Manage pipelines](https://developers.cloudflare.com/pipelines/pipelines/manage-pipelines/): Create, configure, and manage SQL transformations between streams and sinks.

</page>

<page>
---
title: Reference · Cloudflare Pipelines Docs
description: Reference documentation for Cloudflare Pipelines.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: true
source_url:
  html: https://developers.cloudflare.com/pipelines/reference/
  md: https://developers.cloudflare.com/pipelines/reference/index.md
---

[Pipelines](https://developers.cloudflare.com/pipelines/) reference documentation:

* [Legacy pipelines](https://developers.cloudflare.com/pipelines/reference/legacy-pipelines/)
* [Wrangler commands](https://developers.cloudflare.com/pipelines/reference/wrangler-commands/)

</page>

<page>
---
title: Platform · Cloudflare Pipelines Docs
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: true
source_url:
  html: https://developers.cloudflare.com/pipelines/platform/
  md: https://developers.cloudflare.com/pipelines/platform/index.md
---

* [Pricing](https://developers.cloudflare.com/pipelines/platform/pricing/)
* [Limits](https://developers.cloudflare.com/pipelines/platform/limits/)

</page>

<page>
---
title: Sinks · Cloudflare Pipelines Docs
description: Sinks define destinations for your data in Cloudflare Pipelines.
  They support writing to R2 Data Catalog as Apache Iceberg tables or to R2 as
  raw JSON or Parquet files.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sinks/
  md: https://developers.cloudflare.com/pipelines/sinks/index.md
---

Sinks define destinations for your data in Cloudflare Pipelines. They support writing to [R2 Data Catalog](https://developers.cloudflare.com/r2/data-catalog/) as Apache Iceberg tables or to [R2](https://developers.cloudflare.com/r2/) as raw JSON or Parquet files.

Sinks provide exactly-once delivery guarantees, ensuring events are never duplicated or dropped. They can be configured to write files frequently for low-latency ingestion or to write larger, less frequent files for better query performance.
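As a sketch of that tradeoff, an R2 sink tuned for larger, less frequent Parquet files might be created with the rolling policy flags documented in the [Wrangler commands reference](https://developers.cloudflare.com/pipelines/reference/wrangler-commands/) (`my-sink` and `my-bucket` are placeholder names):

```sh
# Sketch: an R2 sink that rolls files at 100 MB or every 300 seconds,
# favoring fewer, larger Parquet files for query performance.
# "my-sink" and "my-bucket" are placeholder names.
npx wrangler pipelines sinks create my-sink \
  --type r2 \
  --bucket my-bucket \
  --format parquet \
  --roll-size 100 \
  --roll-interval 300
```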

## Learn more

[Manage sinks](https://developers.cloudflare.com/pipelines/sinks/manage-sinks/): Create, configure, and delete sinks using Wrangler or the API.

[Available sinks](https://developers.cloudflare.com/pipelines/sinks/available-sinks/): Learn about supported sink destinations and their configuration options.

</page>

<page>
---
title: SQL reference · Cloudflare Pipelines Docs
description: Comprehensive reference for SQL syntax, data types, and functions
  supported in Pipelines.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: true
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/
  md: https://developers.cloudflare.com/pipelines/sql-reference/index.md
---

[Pipelines](https://developers.cloudflare.com/pipelines/) SQL reference documentation:

* [SQL data types](https://developers.cloudflare.com/pipelines/sql-reference/sql-data-types/)
* [SELECT statements](https://developers.cloudflare.com/pipelines/sql-reference/select-statements/)
* [Scalar functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/)

</page>

<page>
---
title: Streams · Cloudflare Pipelines Docs
description: Streams are durable, buffered queues that receive and store events
  for processing in Cloudflare Pipelines. They provide reliable data ingestion
  via HTTP endpoints and Worker bindings, ensuring no data loss even during
  downstream processing delays or failures.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/streams/
  md: https://developers.cloudflare.com/pipelines/streams/index.md
---

Streams are durable, buffered queues that receive and store events for processing in [Cloudflare Pipelines](https://developers.cloudflare.com/pipelines/). They provide reliable data ingestion via HTTP endpoints and Worker bindings, ensuring no data loss even during downstream processing delays or failures.

A single stream can be read by multiple pipelines, allowing you to route the same data to different destinations or apply different transformations. For example, you might send user events to both a real-time analytics pipeline and a data warehouse pipeline.
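As a sketch of that fan-out pattern, two pipelines can be created against the same stream with the [`pipelines create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-create) command (all resource names below are placeholders, and the referenced sinks must already exist):

```sh
# Sketch: fan out one stream ("events_stream") to two destinations.
# "analytics_sink" and "warehouse_sink" are placeholder sink names.
npx wrangler pipelines create analytics-pipeline \
  --sql "INSERT INTO analytics_sink SELECT * FROM events_stream"

npx wrangler pipelines create warehouse-pipeline \
  --sql "INSERT INTO warehouse_sink SELECT user_id, event_type FROM events_stream"
```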

Streams currently accept events in JSON format and support both structured events with defined schemas and unstructured JSON. When a schema is provided, streams will validate and enforce it for incoming events.
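For example, a stream with an enforced schema can be created from a schema file using the `--schema-file` flag documented in the [Wrangler commands reference](https://developers.cloudflare.com/pipelines/reference/wrangler-commands/) (a sketch; `my-stream` and `schema.json` are placeholders):

```sh
# Sketch: create a stream that validates incoming events against schema.json.
# "my-stream" is a placeholder name.
npx wrangler pipelines streams create my-stream \
  --schema-file schema.json
```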

## Learn more

[Manage streams](https://developers.cloudflare.com/pipelines/streams/manage-streams/): Create, configure, and delete streams using Wrangler or the API.

[Writing to streams](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/): Send events to streams via HTTP endpoints or Worker bindings.

</page>

<page>
---
title: Metrics and analytics · Cloudflare Pipelines Docs
description: Pipelines expose metrics which allow you to measure data ingested,
  processed, and delivered to sinks.
lastUpdated: 2026-02-24T23:13:58.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/observability/metrics/
  md: https://developers.cloudflare.com/pipelines/observability/metrics/index.md
---

Pipelines expose metrics which allow you to measure data ingested, processed, and delivered to sinks.

The metrics displayed in the [Cloudflare dashboard](https://dash.cloudflare.com/) are queried from Cloudflare's [GraphQL Analytics API](https://developers.cloudflare.com/analytics/graphql-api/). You can access the metrics [programmatically](#query-via-the-graphql-api) via GraphQL or HTTP client.

## Metrics

### Operator metrics

Pipelines export the following metrics within the `pipelinesOperatorAdaptiveGroups` dataset. These metrics track data read and processed by pipeline operators.

| Metric | GraphQL Field Name | Description |
| - | - | - |
| Bytes In | `bytesIn` | Total number of bytes read by the pipeline (filter by `streamId_neq: ""` to get data read from streams) |
| Records In | `recordsIn` | Total number of records read by the pipeline (filter by `streamId_neq: ""` to get data read from streams) |
| Decode Errors | `decodeErrors` | Number of messages that could not be deserialized according to the stream schema |

For a detailed breakdown of why events were dropped (including specific error types like `missing_field`, `type_mismatch`, `parse_failure`, and `null_value`), refer to [User error metrics](#user-error-metrics).

The `pipelinesOperatorAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:

* `pipelineId` - ID of the pipeline
* `streamId` - ID of the source stream
* `datetime` - Timestamp of the operation
* `date` - Timestamp of the operation, truncated to the start of a day
* `datetimeHour` - Timestamp of the operation, truncated to the start of an hour

### Sink metrics

Pipelines export the following metrics within the `pipelinesSinkAdaptiveGroups` dataset. These metrics track data delivery to sinks.

| Metric | GraphQL Field Name | Description |
| - | - | - |
| Bytes Written | `bytesWritten` | Total number of bytes written to the sink, after compression |
| Records Written | `recordsWritten` | Total number of records written to the sink |
| Files Written | `filesWritten` | Number of files written to the sink |
| Row Groups Written | `rowGroupsWritten` | Number of row groups written (for Parquet files) |
| Uncompressed Bytes Written | `uncompressedBytesWritten` | Total number of bytes written before compression |

The `pipelinesSinkAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:

* `pipelineId` - ID of the pipeline
* `sinkId` - ID of the destination sink
* `datetime` - Timestamp of the operation
* `date` - Timestamp of the operation, truncated to the start of a day
* `datetimeHour` - Timestamp of the operation, truncated to the start of an hour

### User error metrics

Pipelines track events that are dropped during processing due to deserialization errors. When a structured stream receives events that do not match its defined schema, those events are accepted during ingestion but dropped during processing. The `pipelinesUserErrorsAdaptiveGroups` dataset provides visibility into these dropped events, telling you which events were dropped and why. You can explore the full schema of this dataset using GraphQL [introspection](https://developers.cloudflare.com/analytics/graphql-api/features/discovery/introspection/).

| Metric | GraphQL Field Name | Description |
| - | - | - |
| Count | `count` | Number of events that failed validation |

The `pipelinesUserErrorsAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:

* `pipelineId` - ID of the pipeline
* `errorFamily` - Category of the error (for example, `deserialization`)
* `errorType` - Specific error type within the family
* `date` - Date of the error, truncated to start of day
* `datetime` - Timestamp of the error
* `datetimeHour` - Timestamp of the error, truncated to the start of an hour
* `datetimeMinute` - Timestamp of the error, truncated to the start of a minute

#### Known error types

| Error family | Error type | Description |
| - | - | - |
| `deserialization` | `missing_field` | A required field defined in the stream schema was not present in the event |
| `deserialization` | `type_mismatch` | A field value did not match the expected type in the schema (for example, string sent where number expected) |
| `deserialization` | `parse_failure` | The event could not be parsed as valid JSON, or a field value could not be parsed into the expected type |
| `deserialization` | `null_value` | A required field was present but had a null value |

Note

To prevent incorrect data from being ingested in the first place, consider using [typed pipeline bindings](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/#typed-pipeline-bindings) to catch schema violations at compile time.

## View metrics and errors in the dashboard

Per-pipeline analytics are available in the Cloudflare dashboard. To view current and historical metrics for a pipeline:

1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com) and select your account.
2. Go to **Pipelines** > **Pipelines**.
3. Select a pipeline.
4. Go to the **Metrics** tab to view its metrics or **Errors** tab to view dropped events.

You can optionally select a time window to query. This defaults to the last 24 hours.

## Query via the GraphQL API

You can programmatically query analytics for your pipelines via the [GraphQL Analytics API](https://developers.cloudflare.com/analytics/graphql-api/). This API queries the same datasets as the Cloudflare dashboard and supports GraphQL [introspection](https://developers.cloudflare.com/analytics/graphql-api/features/discovery/introspection/).

Pipelines GraphQL datasets require an `accountTag` filter with your Cloudflare account ID.
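For example, you can POST any of the queries below to the GraphQL Analytics API endpoint with curl. A minimal sketch, assuming `$CF_API_TOKEN` is an API token with permission to read account analytics and `<ACCOUNT_ID>` is your account ID (both placeholders):

```sh
# Sketch: run a Pipelines analytics query against the GraphQL Analytics API.
# $CF_API_TOKEN and <ACCOUNT_ID> are placeholders.
curl https://api.cloudflare.com/client/v4/graphql \
  -H "Authorization: Bearer $CF_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "query ($accountTag: String!) { viewer { accounts(filter: {accountTag: $accountTag}) { pipelinesOperatorAdaptiveGroups(limit: 10, filter: {datetime_geq: \"2026-02-01T00:00:00Z\", datetime_leq: \"2026-02-02T00:00:00Z\"}) { sum { bytesIn recordsIn decodeErrors } } } } }",
    "variables": { "accountTag": "<ACCOUNT_ID>" }
  }'
```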

### Measure operator metrics over a time period

This query returns the total bytes and records read by a pipeline from streams, along with any decode errors.

```graphql
query PipelineOperatorMetrics(
  $accountTag: String!
  $pipelineId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesOperatorAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          streamId_neq: ""
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          bytesIn
          recordsIn
          decodeErrors
        }
      }
    }
  }
}
```

[Run in GraphQL API Explorer](https://graphql.cloudflare.com/explorer?query=I4VwpgTgngBACgSwA5gDYIHZgPIogQwBcB7CAWTEIgQGMBnACgCgYYASfGm4kDQgFXwBzAFwwAylUxCAhC3ZJkaTGACSAEzGTqGWfLbqilBAFswk-BEJj+psHNYGjhOwFEMmmLbNyAlDABveQA3BDAAd0hA+VZObl5CRgAzBFRCSDEAmDiePkFRdhyE-JgAX38g1iqYRRR0LDpcSCJSAEFDJBdgsABxCB4kRhjqmHQTBGsYAEYABjmZ4eqUtIzokZHa5SwNMTZN+rV1RZG6KjB8Ew0AfSxgMQAie+Pqw3SXMyuhMDv2V+MzCxWZ5VP7vMBXVDfXagtweYGlY4VYF0EAmNbrKoAIyg6ToqgwwNYEDA3Ag6jxBIxIJJxHUYFcEH6EDo8OOCOq7LKTFKQA\&variables=N4IghgxhD2CuB2AXAKmA5iAXCAggYTwHkBVAOWQH0BJAERABoQAHASyYFMAbF+dqgEywgASgFEACgBl8oigHUqyABLU6jfmETtELALbsAyojAAnREIBMABgsA2ALRWAzPYCMr5K9uYArBcxOACwAWgwgGlo6+qLwgtjWdo4u7sgW-n4BISAAvkA)

### Measure sink delivery metrics

This query returns detailed metrics about data written to a specific sink, including file and compression statistics.

```graphql
query PipelineSinkMetrics(
  $accountTag: String!
  $pipelineId: String!
  $sinkId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesSinkAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          sinkId: $sinkId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          bytesWritten
          recordsWritten
          filesWritten
          rowGroupsWritten
          uncompressedBytesWritten
        }
      }
    }
  }
}
```

[Run in GraphQL API Explorer](https://graphql.cloudflare.com/explorer?query=I4VwpgTgngBACgSwA5gDYIHZgMqYNYCyYALhAgMYDOAFAFAwwAkAhueQPYgbEAqzA5gC4Y2Upn4BCekyTI0mMAEkAJsNFkMk6Y0r4VasZqkNGy5sRIIAtjmLMIxYT2thjTMxeIuAohlUxnGykAShgAb2kANwQwAHdIcOkGVg4uYhoAMwRUCwhhMJgUzm4+ISYitNKYAF9QiIYGmFkUdCxKXAw8AEEzJC9IsABxCE4kGiTGmHQrBEcYAEYABmXFicasnMh8tcnm+Sx9GTlWpWUdxt1Ow509M8nJj0sbAH1+MGBhU3Mn23tic4ajy8L1Q70+QJ8fgB1R2dQBlBAVkS90aACMoBZKAB1MjECwYAEMCBgDgQZTY3H4wkwDZgCmzKkohojWLDUb0vFgAlMhhcDhWJDEyiUMDKABCGLpOIZXOhOxhjQVNVo1SAA\&variables=N4IghgxhD2CuB2AXAKmA5iAXCAggYTwHkBVAOWQH0BJAERABoQAHASyYFMAbF+dqgEywgASgFEACgBl8oigHUqyABLU6jAM48A1gKFipM+YpW0GIfmETtELALbsAyojAAnREIBMABg8A2ALReAMz+AIyhyKG+mACsHpgALKEAWmYWVjb2ovCC2N5+gSHhyB7xcYkpIAC+QA)

### Query dropped event errors

This query returns a summary of events that were dropped due to schema validation failures, grouped by error type and ordered by frequency.

```graphql
query GetPipelineUserErrors(
  $accountTag: String!
  $pipelineId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesUserErrorsAdaptiveGroups(
        limit: 100
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
        orderBy: [count_DESC]
      ) {
        count
        dimensions {
          date
          errorFamily
          errorType
        }
      }
    }
  }
}
```

[Run in GraphQL API Explorer](https://graphql.cloudflare.com/explorer?query=I4VwpgTgngBA4mALgBQJYAcwBtUDswCqAzpAKIQQD2ERAFAFAwwAkAhgMbuUi6IAqrAOYAuGAGVEEPIICEjFugzY8YAJIATURKm5Z85utaIkqALZgJrCIlF8zYOUwNGT50rk0w75uQEoYAN7yAG6oYADukIHyTBxcPIh0AGaoWMYQogEwcdy8AiIsOQn5MAC+-kFMVTCKmDj4RMRkFNREAIKG6IiowWBwVCDodDHVMDimqDYwAIwADLMj1SlpkJmLo7XK+BqizJv1aurr1YbG3eYA+oJgwLunrhaIVojHVffnYBdYN3cuH+5HUZVUqvajqSAAISgogA2vFeBcACKkMQAYQAuusKq94S8gUx1PZcERUJRidF8W8XK8mJAqBAAGKsCZYKA0mB06h8KCYV4goH84HyEGlIA\&variables=N4IghgxhD2CuB2AXAKmA5iAXCAggYTwHkBVAOWQH0BJAERABoQAHASyYFMAbF+dqgEywgASgFEACgBl8oigHUqyABLU6jfmETtELALbsAyojAAnREIBMABgsA2ALRWAzPYCMr5K9uYArBcwALADsAFoMIBpaOvqi8ILY1naOLu7IFv5+gaEgAL5AA)

Example response:

```json
{
  "data": {
    "viewer": {
      "accounts": [
        {
          "pipelinesUserErrorsAdaptiveGroups": [
            {
              "count": 679,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "missing_field"
              }
            },
            {
              "count": 392,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "type_mismatch"
              }
            },
            {
              "count": 363,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "parse_failure"
              }
            },
            {
              "count": 44,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "null_value"
              }
            }
          ]
        }
      ]
    }
  },
  "errors": null
}
```

You can filter by a specific error type by adding `errorType` to the filter:

```graphql
pipelinesUserErrorsAdaptiveGroups(
  limit: 100
  filter: {
    pipelineId: $pipelineId
    datetime_geq: $datetimeStart
    datetime_leq: $datetimeEnd
    errorType: "type_mismatch"
  }
  orderBy: [count_DESC]
)
```

To query errors across all pipelines on an account, omit the `pipelineId` filter and include `pipelineId` in the dimensions:

```graphql
pipelinesUserErrorsAdaptiveGroups(
  limit: 100
  filter: {
    datetime_geq: $datetimeStart
    datetime_leq: $datetimeEnd
  }
  orderBy: [count_DESC]
) {
  count
  dimensions {
    pipelineId
    errorFamily
    errorType
  }
}
```

Note

In addition to `pipelinesUserErrorsAdaptiveGroups`, you can also query the `pipelinesUserErrorsAdaptive` dataset, which provides detailed error descriptions within the last 24 hours. Be aware that querying this dataset may return a large volume of data if your pipeline processes many events.

</page>

<page>
---
title: Manage pipelines · Cloudflare Pipelines Docs
description: Create, configure, and manage SQL transformations between streams and sinks
lastUpdated: 2025-11-17T14:08:01.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/pipelines/manage-pipelines/
  md: https://developers.cloudflare.com/pipelines/pipelines/manage-pipelines/index.md
---

Learn how to:

* Create pipelines with SQL transformations
* View pipeline configuration and SQL
* Delete pipelines when no longer needed

## Create a pipeline

Pipelines execute SQL statements that define how data flows from streams to sinks.

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.

   [Go to **Pipelines**](https://dash.cloudflare.com/?to=/:account/pipelines/overview)

2. Select **Create Pipeline** to launch the pipeline creation wizard.

3. Follow the wizard to configure your stream, sink, and SQL transformation.

### Wrangler CLI

To create a pipeline, run the [`pipelines create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-create) command:

```bash
npx wrangler pipelines create my-pipeline \
  --sql "INSERT INTO my_sink SELECT * FROM my_stream"
```

You can also provide SQL from a file:

```bash
npx wrangler pipelines create my-pipeline \
  --sql-file pipeline.sql
```

Alternatively, to use the interactive setup wizard that helps you configure a stream, sink, and pipeline, run the [`pipelines setup`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-setup) command:

```bash
npx wrangler pipelines setup
```

### SQL transformations

Pipelines support SQL statements for data transformation. For complete syntax, supported functions, and data types, see the [SQL reference](https://developers.cloudflare.com/pipelines/sql-reference/).

Common patterns include:

#### Basic data flow

Transfer all data from stream to sink:

```sql
INSERT INTO my_sink SELECT * FROM my_stream
```

#### Filtering events

Filter events based on conditions:

```sql
INSERT INTO my_sink
SELECT * FROM my_stream
WHERE event_type = 'purchase' AND amount > 100
```

#### Selecting specific fields

Choose only the fields you need:

```sql
INSERT INTO my_sink
SELECT user_id, event_type, timestamp, amount
FROM my_stream
```

#### Transforming data

Apply transformations to fields:

```sql
INSERT INTO my_sink
SELECT
  user_id,
  UPPER(event_type) as event_type,
  timestamp,
  amount * 1.1 as amount_with_tax
FROM my_stream
```

## View pipeline configuration

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.

2. Select a pipeline to view its SQL transformation, connected streams/sinks, and associated metrics.

### Wrangler CLI

To view a specific pipeline, run the [`pipelines get`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-get) command:

```bash
npx wrangler pipelines get <PIPELINE_ID>
```

To list all pipelines in your account, run the [`pipelines list`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-list) command:

```bash
npx wrangler pipelines list
```

## Delete a pipeline

Deleting a pipeline stops data flow from the connected stream to sink.

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.

2. Select the pipeline you want to delete.

3. In the **Settings** tab, select **Delete**.

### Wrangler CLI

To delete a pipeline, run the [`pipelines delete`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-delete) command:

```bash
npx wrangler pipelines delete <PIPELINE_ID>
```

Warning

Deleting a pipeline immediately stops data flow between the stream and sink.

## Limitations

Pipeline SQL cannot be modified after creation. To change the SQL transformation, you must delete and recreate the pipeline.
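A minimal sketch of that workflow using the documented commands (`my-pipeline`, `my_sink`, and `my_stream` are placeholders; note that data flow stops as soon as the pipeline is deleted):

```bash
# Sketch: change a pipeline's SQL by deleting and recreating it.
npx wrangler pipelines delete my-pipeline --force
npx wrangler pipelines create my-pipeline \
  --sql "INSERT INTO my_sink SELECT user_id, event_type, amount FROM my_stream WHERE amount > 0"
```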

</page>

<page>
---
title: Legacy pipelines · Cloudflare Pipelines Docs
description: Legacy pipelines, those created before September 25, 2025 via the
  legacy API, are on a deprecation path.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/reference/legacy-pipelines/
  md: https://developers.cloudflare.com/pipelines/reference/legacy-pipelines/index.md
---

Legacy pipelines (those created before September 25, 2025 via the legacy API) are on a deprecation path.

To check if your pipelines are legacy pipelines, view them in the dashboard under **Pipelines** > **Pipelines** or run the [`pipelines list`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-list) command in [Wrangler](https://developers.cloudflare.com/workers/wrangler/). Legacy pipelines are labeled "legacy" in both locations.

New pipelines offer SQL transformations, multiple output formats, and an improved architecture.

## Notable changes

* New pipelines support SQL transformations for data processing.
* New pipelines write to JSON, Parquet, and Apache Iceberg formats instead of JSON only.
* New pipelines separate streams, pipelines, and sinks into distinct resources.
* New pipelines support optional structured schemas with validation.
* New pipelines offer configurable rolling policies and customizable partitioning.

## Moving to new pipelines

Legacy pipelines will continue to work until Pipelines is Generally Available, but new features and improvements are only available in the new pipeline architecture. To migrate:

1. Create a new pipeline using the interactive setup:

   ```bash
   npx wrangler pipelines setup
   ```

2. Configure your new pipeline with the desired streams, SQL transformations, and sinks.

3. Update your applications to send data to the new stream endpoints.

4. Once verified, delete your legacy pipeline.

For detailed guidance, refer to the [getting started guide](https://developers.cloudflare.com/pipelines/getting-started/).

</page>

<page>
---
title: Wrangler commands · Cloudflare Pipelines Docs
description: Interactive setup for a complete pipeline
lastUpdated: 2025-11-13T15:25:17.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/reference/wrangler-commands/
  md: https://developers.cloudflare.com/pipelines/reference/wrangler-commands/index.md
---

## `pipelines setup`

Interactive setup for a complete pipeline

* npm

  ```sh
  npx wrangler pipelines setup
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines setup
  ```

* yarn

  ```sh
  yarn wrangler pipelines setup
  ```

- `--name` string

  Pipeline name

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines create`

Create a new pipeline

* npm

  ```sh
  npx wrangler pipelines create [PIPELINE]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines create [PIPELINE]
  ```

* yarn

  ```sh
  yarn wrangler pipelines create [PIPELINE]
  ```

- `[PIPELINE]` string required

  The name of the pipeline to create

- `--sql` string

  Inline SQL query for the pipeline

- `--sql-file` string

  Path to file containing SQL query for the pipeline

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines list`

List all pipelines

* npm

  ```sh
  npx wrangler pipelines list
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines list
  ```

* yarn

  ```sh
  yarn wrangler pipelines list
  ```

- `--page` number default: 1

  Page number for pagination

- `--per-page` number default: 20

  Number of pipelines per page

- `--json` boolean default: false

  Output in JSON format

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines get`

Get details about a specific pipeline

* npm

  ```sh
  npx wrangler pipelines get [PIPELINE]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines get [PIPELINE]
  ```

* yarn

  ```sh
  yarn wrangler pipelines get [PIPELINE]
  ```

- `[PIPELINE]` string required

  The ID of the pipeline to retrieve

- `--json` boolean default: false

  Output in JSON format

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines update`

Update a pipeline configuration (legacy pipelines only)

* npm

  ```sh
  npx wrangler pipelines update [PIPELINE]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines update [PIPELINE]
  ```

* yarn

  ```sh
  yarn wrangler pipelines update [PIPELINE]
  ```

- `[PIPELINE]` string required

  The name of the legacy pipeline to update

- `--source` array

  Space separated list of allowed sources. Options are 'http' or 'worker'

- `--require-http-auth` boolean

  Require Cloudflare API Token for HTTPS endpoint authentication

- `--cors-origins` array

  CORS origin allowlist for HTTP endpoint (use \* for any origin). Defaults to an empty array

- `--batch-max-mb` number

  Maximum batch size in megabytes before flushing. Defaults to 100 MB if unset. Minimum: 1, Maximum: 100

- `--batch-max-rows` number

  Maximum number of rows per batch before flushing. Defaults to 10,000,000 if unset. Minimum: 100, Maximum: 10,000,000

- `--batch-max-seconds` number

  Maximum age of batch in seconds before flushing. Defaults to 300 if unset. Minimum: 1, Maximum: 300

- `--r2-bucket` string

  Destination R2 bucket name

- `--r2-access-key-id` string

  R2 service Access Key ID for authentication. Leave empty for OAuth confirmation.

- `--r2-secret-access-key` string

  R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation.

- `--r2-prefix` string

  Prefix for storing files in the destination bucket. Default is no prefix

- `--compression` string

  Compression format for output files

- `--shard-count` number

  Number of shards for the pipeline. More shards handle higher request volume; fewer shards produce larger output files. Defaults to 2 if unset. Minimum: 1, Maximum: 15

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines delete`

Delete a pipeline

* npm

  ```sh
  npx wrangler pipelines delete [PIPELINE]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines delete [PIPELINE]
  ```

* yarn

  ```sh
  yarn wrangler pipelines delete [PIPELINE]
  ```

- `[PIPELINE]` string required

  The ID or name of the pipeline to delete

- `--force` boolean alias: --y default: false

  Skip confirmation

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines streams create`

Create a new stream

* npm

  ```sh
  npx wrangler pipelines streams create [STREAM]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines streams create [STREAM]
  ```

* yarn

  ```sh
  yarn wrangler pipelines streams create [STREAM]
  ```

- `[STREAM]` string required

  The name of the stream to create

- `--schema-file` string

  Path to JSON file containing stream schema

- `--http-enabled` boolean default: true

  Enable HTTP endpoint

- `--http-auth` boolean default: true

  Require authentication for HTTP endpoint

- `--cors-origin` string

  CORS origin

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines streams list`

List all streams

* npm

  ```sh
  npx wrangler pipelines streams list
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines streams list
  ```

* yarn

  ```sh
  yarn wrangler pipelines streams list
  ```

- `--page` number default: 1

  Page number for pagination

- `--per-page` number default: 20

  Number of streams per page

- `--pipeline-id` string

  Filter streams by pipeline ID

- `--json` boolean default: false

  Output in JSON format

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines streams get`

Get details about a specific stream

* npm

  ```sh
  npx wrangler pipelines streams get [STREAM]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines streams get [STREAM]
  ```

* yarn

  ```sh
  yarn wrangler pipelines streams get [STREAM]
  ```

- `[STREAM]` string required

  The ID of the stream to retrieve

- `--json` boolean default: false

  Output in JSON format

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines streams delete`

Delete a stream

* npm

  ```sh
  npx wrangler pipelines streams delete [STREAM]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines streams delete [STREAM]
  ```

* yarn

  ```sh
  yarn wrangler pipelines streams delete [STREAM]
  ```

- `[STREAM]` string required

  The ID of the stream to delete

- `--force` boolean alias: --y default: false

  Skip confirmation

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines sinks create`

Create a new sink

* npm

  ```sh
  npx wrangler pipelines sinks create [SINK]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines sinks create [SINK]
  ```

* yarn

  ```sh
  yarn wrangler pipelines sinks create [SINK]
  ```

- `[SINK]` string required

  The name of the sink to create

- `--type` string required

  The type of sink to create

- `--bucket` string required

  R2 bucket name

- `--format` string default: parquet

  Output format

- `--compression` string default: zstd

  Compression method (parquet only)

- `--target-row-group-size` string

  Target row group size for parquet format

- `--path` string

  The base prefix in your bucket where data will be written

- `--partitioning` string

  Time partition pattern (r2 sinks only)

- `--roll-size` number

  Roll file size in MB

- `--roll-interval` number default: 300

  Roll file interval in seconds

- `--access-key-id` string

  R2 access key ID (leave empty for R2 credentials to be automatically created)

- `--secret-access-key` string

  R2 secret access key (leave empty for R2 credentials to be automatically created)

- `--namespace` string

  Data catalog namespace (required for r2-data-catalog)

- `--table` string

  Table name within namespace (required for r2-data-catalog)

- `--catalog-token` string

  Authentication token for data catalog (required for r2-data-catalog)

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines sinks list`

List all sinks

* npm

  ```sh
  npx wrangler pipelines sinks list
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines sinks list
  ```

* yarn

  ```sh
  yarn wrangler pipelines sinks list
  ```

- `--page` number default: 1

  Page number for pagination

- `--per-page` number default: 20

  Number of sinks per page

- `--pipeline-id` string

  Filter sinks by pipeline ID

- `--json` boolean default: false

  Output in JSON format

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines sinks get`

Get details about a specific sink

* npm

  ```sh
  npx wrangler pipelines sinks get [SINK]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines sinks get [SINK]
  ```

* yarn

  ```sh
  yarn wrangler pipelines sinks get [SINK]
  ```

- `[SINK]` string required

  The ID of the sink to retrieve

- `--json` boolean default: false

  Output in JSON format

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

## `pipelines sinks delete`

Delete a sink

* npm

  ```sh
  npx wrangler pipelines sinks delete [SINK]
  ```

* pnpm

  ```sh
  pnpm wrangler pipelines sinks delete [SINK]
  ```

* yarn

  ```sh
  yarn wrangler pipelines sinks delete [SINK]
  ```

- `[SINK]` string required

  The ID of the sink to delete

- `--force` boolean alias: --y default: false

  Skip confirmation

Global flags

* `--v` boolean alias: --version

  Show version number

* `--cwd` string

  Run as if Wrangler was started in the specified directory instead of the current working directory

* `--config` string alias: --c

  Path to Wrangler configuration file

* `--env` string alias: --e

  Environment to use for operations, and for selecting .env and .dev.vars files

* `--env-file` string

  Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files

* `--experimental-provision` boolean aliases: --x-provision default: true

  Experimental: Enable automatic resource provisioning

* `--experimental-auto-create` boolean alias: --x-auto-create default: true

  Automatically provision draft bindings with new resources

</page>

<page>
---
title: Limits · Cloudflare Pipelines Docs
description: "While in open beta, the following limits are currently in effect:"
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/platform/limits/
  md: https://developers.cloudflare.com/pipelines/platform/limits/index.md
---

While in open beta, the following limits are currently in effect:

| Feature | Limit |
| - | - |
| Maximum streams per account | 20 |
| Maximum payload size per ingestion request | 1 MB |
| Maximum ingest rate per stream | 5 MB/s |
| Maximum sinks per account | 20 |
| Maximum pipelines per account | 20 |

Need a higher limit?

To request an adjustment to a limit, complete the [Limit Increase Request Form](https://forms.gle/ukpeZVLWLnKeixDu7). If the limit can be increased, Cloudflare will contact you with next steps.

</page>

<page>
---
title: Cloudflare Pipelines - Pricing · Cloudflare Pipelines Docs
description: Cloudflare Pipelines is in open beta and available to any developer
  with a Workers Paid plan.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/platform/pricing/
  md: https://developers.cloudflare.com/pipelines/platform/pricing/index.md
---

Cloudflare Pipelines is in open beta and available to any developer with a [Workers Paid plan](https://developers.cloudflare.com/workers/platform/pricing/).

We are not currently billing for Pipelines during open beta. However, you will be billed for standard [R2 storage and operations](https://developers.cloudflare.com/r2/pricing/) for data written by sinks to R2 buckets.

We plan to bill based on the volume of data processed by pipelines, transformed by pipelines, and delivered to sinks. We'll provide at least 30 days notice before we make any changes or start charging for Pipelines usage.

</page>

<page>
---
title: Available sinks · Cloudflare Pipelines Docs
description: Find detailed configuration options for each supported sink type.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: true
source_url:
  html: https://developers.cloudflare.com/pipelines/sinks/available-sinks/
  md: https://developers.cloudflare.com/pipelines/sinks/available-sinks/index.md
---

[Pipelines](https://developers.cloudflare.com/pipelines/) supports the following sink types:

* [R2](https://developers.cloudflare.com/pipelines/sinks/available-sinks/r2/)
* [R2 Data Catalog](https://developers.cloudflare.com/pipelines/sinks/available-sinks/r2-data-catalog/)

</page>

<page>
---
title: Manage sinks · Cloudflare Pipelines Docs
description: Create, configure, and manage sinks for data storage
lastUpdated: 2026-02-06T15:42:11.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sinks/manage-sinks/
  md: https://developers.cloudflare.com/pipelines/sinks/manage-sinks/index.md
---

Learn how to:

* Create and configure sinks for data storage
* View sink configuration
* Delete sinks when no longer needed

## Create a sink

Sinks are made available to pipelines as SQL tables using the sink name (for example, `INSERT INTO my_sink SELECT * FROM my_stream`).

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.

   [Go to **Pipelines**](https://dash.cloudflare.com/?to=/:account/pipelines/overview)

2. Select **Create Pipeline** to launch the pipeline creation wizard.

3. Complete the wizard to create your sink along with the associated stream and pipeline.

### Wrangler CLI

To create a sink, run the [`pipelines sinks create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-sinks-create) command:

```bash
npx wrangler pipelines sinks create <SINK_NAME> \
  --type r2 \
  --bucket my-bucket
```

For sink-specific configuration options, refer to [Available sinks](https://developers.cloudflare.com/pipelines/sinks/available-sinks/).

Alternatively, to use the interactive setup wizard that helps you configure a stream, sink, and pipeline, run the [`pipelines setup`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-setup) command:

```bash
npx wrangler pipelines setup
```

## View sink configuration

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Sinks**.

2. Select a sink to view its configuration.

### Wrangler CLI

To view a specific sink, run the [`pipelines sinks get`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-sinks-get) command:

```bash
npx wrangler pipelines sinks get <SINK_ID>
```

To list all sinks in your account, run the [`pipelines sinks list`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-sinks-list) command:

```bash
npx wrangler pipelines sinks list
```

## Delete a sink

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Sinks**.

2. Select the sink you want to delete.

3. In the **Settings** tab, navigate to **General**, and select **Delete**.

### Wrangler CLI

To delete a sink, run the [`pipelines sinks delete`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-sinks-delete) command:

```bash
npx wrangler pipelines sinks delete <SINK_ID>
```

Warning

Deleting a sink stops all data writes to that destination.

## Limitations

* Sinks cannot be modified after creation. To change sink configuration, you must delete and recreate the sink.
* The R2 Data Catalog sink does not currently support writing to R2 buckets in a different jurisdiction.

</page>

<page>
---
title: Scalar functions · Cloudflare Pipelines Docs
description: Scalar functions available in Cloudflare Pipelines SQL.
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: true
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/index.md
---

[Pipelines](https://developers.cloudflare.com/pipelines/) scalar functions:

* [Math functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/math/)
* [Conditional functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/conditional/)
* [String functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/string/)
* [Binary string functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/binary-string/)
* [Regex functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/regex/)
* [JSON functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/json/)
* [Time and date functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/time-and-date/)
* [Array functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/array/)
* [Struct functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/struct/)
* [Hashing functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/hashing/)
* [Other functions](https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/other/)

</page>

<page>
---
title: SELECT statements · Cloudflare Pipelines Docs
description: Query syntax for data transformation in Cloudflare Pipelines SQL
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/select-statements/
  md: https://developers.cloudflare.com/pipelines/sql-reference/select-statements/index.md
---

SELECT statements are used to transform data in Cloudflare Pipelines. The general form is:

```sql
[WITH with_query [, ...]]
SELECT select_expr [, ...]
FROM from_item
[WHERE condition]
```

## WITH clause

The WITH clause allows you to define named subqueries that can be referenced in the main query. This can improve query readability by breaking down complex transformations.

Syntax:

```sql
WITH query_name AS (subquery) [, ...]
```

Simple example:

```sql
WITH filtered_events AS
    (SELECT user_id, event_type, amount
        FROM user_events WHERE amount > 50)
SELECT user_id, amount * 1.1 as amount_with_tax
FROM filtered_events
WHERE event_type = 'purchase';
```

## SELECT clause

The SELECT clause is a comma-separated list of expressions, with optional aliases. Column names must be unique.

```sql
SELECT select_expr [, ...]
```

Examples:

```sql
-- Select specific columns
SELECT user_id, event_type, amount FROM events


-- Use expressions and aliases
SELECT
    user_id,
    amount * 1.1 as amount_with_tax,
    UPPER(event_type) as event_type_upper
FROM events


-- Select all columns
SELECT * FROM events
```

## FROM clause

The FROM clause specifies the data source for the query. It can be either a table name or a subquery. The table name can be either a stream name or a table created in the WITH clause.

```sql
FROM from_item
```

Tables can be given aliases:

```sql
SELECT e.user_id, e.amount
FROM user_events e
WHERE e.event_type = 'purchase'
```
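
Subqueries may also be used as the `from_item`. A minimal sketch, assuming the same `events` stream used in the examples above:

```sql
SELECT p.user_id, p.amount
FROM (
    SELECT user_id, amount
    FROM events
    WHERE event_type = 'purchase'
) p
```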

## WHERE clause

The WHERE clause filters data using boolean conditions. Predicates are applied to input rows.

```sql
WHERE condition
```

Examples:

```sql
-- Filter by field value
SELECT * FROM events WHERE event_type = 'purchase'


-- Multiple conditions
SELECT * FROM events
WHERE event_type = 'purchase' AND amount > 50


-- String operations
SELECT * FROM events
WHERE user_id LIKE 'user_%'


-- Null checks
SELECT * FROM events
WHERE description IS NOT NULL
```

## UNNEST operator

The UNNEST operator converts arrays into multiple rows. This is useful for processing list data types.

UNNEST restrictions:

* May only appear in the SELECT clause
* Only one array may be unnested per SELECT statement

Example:

```sql
SELECT
    UNNEST([1, 2, 3]) as numbers
FROM events;
```

This will produce:

```plaintext
+---------+
| numbers |
+---------+
|       1 |
|       2 |
|       3 |
+---------+
```

</page>

<page>
---
title: SQL data types · Cloudflare Pipelines Docs
description: Supported data types in Cloudflare Pipelines SQL
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/sql-data-types/
  md: https://developers.cloudflare.com/pipelines/sql-reference/sql-data-types/index.md
---

Cloudflare Pipelines supports a set of primitive and composite data types for SQL transformations. These types can be used in stream schemas and SQL literals with automatic type inference.

## Primitive types

| Pipelines | SQL Types | Example Literals |
| - | - | - |
| `bool` | `BOOLEAN` | `TRUE`, `FALSE` |
| `int32` | `INT`, `INTEGER` | `0`, `1`, `-2` |
| `int64` | `BIGINT` | `0`, `1`, `-2` |
| `float32` | `FLOAT`, `REAL` | `0.0`, `-2.4`, `1E-3` |
| `float64` | `DOUBLE` | `0.0`, `-2.4`, `1E-35` |
| `string` | `VARCHAR`, `CHAR`, `TEXT`, `STRING` | `"hello"`, `"world"` |
| `timestamp` | `TIMESTAMP` | `'2020-01-01'`, `'2023-05-17T22:16:00.648662+00:00'` |
| `binary` | `BYTEA` | `X'A123'` (hex) |
| `json` | `JSON` | `'{"event": "purchase", "amount": 29.99}'` |

## Composite types

In addition to primitive types, Pipelines SQL supports composite types for more complex data structures.

### List types

Lists group together zero or more elements of the same type. In stream schemas, lists are declared using the `list` type with an `items` field specifying the element type. In SQL, lists correspond to arrays and are declared by suffixing another type with `[]`, for example `INT[]`.

List values can be indexed using 1-indexed subscript notation (`v[1]` is the first element of `v`).

Lists can be constructed via `[]` literals:

```sql
SELECT [1, 2, 3] as numbers
```
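
For instance, a list literal can be constructed and then indexed (a minimal sketch, assuming an `events` stream as input):

```sql
SELECT numbers[1] AS first_number
FROM (SELECT [10, 20, 30] AS numbers FROM events) t
```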

Pipelines provides array functions for manipulating list values, and lists may be unnested using the `UNNEST` operator.

### Struct types

Structs combine related fields into a single value. In stream schemas, structs are declared using the `struct` type with a `fields` array. In SQL, structs can be created using the `struct` function.

Example creating a struct in SQL:

```sql
SELECT struct('user123', 'purchase', 29.99) as event_data FROM events
```

This creates a struct with fields `c0`, `c1`, `c2` containing the user ID, event type, and amount.

Struct fields can be accessed via `.` notation, for example `event_data.c0` for the user ID.
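
Putting both together, a minimal sketch (reusing the `events` stream from the example above):

```sql
SELECT event_data.c1 AS event_type
FROM (SELECT struct('user123', 'purchase', 29.99) AS event_data FROM events) t
```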

</page>

<page>
---
title: Manage streams · Cloudflare Pipelines Docs
description: Create, configure, and manage streams for data ingestion
lastUpdated: 2026-02-24T14:35:21.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/streams/manage-streams/
  md: https://developers.cloudflare.com/pipelines/streams/manage-streams/index.md
---

Learn how to:

* Create and configure streams for data ingestion
* View and update stream settings
* Delete streams when no longer needed

## Create a stream

Streams are made available to pipelines as SQL tables using the stream name (for example, `SELECT * FROM my_stream`).

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.

   [Go to **Pipelines**](https://dash.cloudflare.com/?to=/:account/pipelines/overview)

2. Select **Create Pipeline** to launch the pipeline creation wizard.

3. Complete the wizard to create your stream along with the associated sink and pipeline.

### Wrangler CLI

To create a stream, run the [`pipelines streams create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-streams-create) command:

```bash
npx wrangler pipelines streams create <STREAM_NAME>
```

Alternatively, to use the interactive setup wizard that helps you configure a stream, sink, and pipeline, run the [`pipelines setup`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-setup) command:

```bash
npx wrangler pipelines setup
```

### Schema configuration

Streams support two approaches for handling data:

* **Structured streams**: Define a schema with specific fields and data types. Events are validated against the schema.
* **Unstructured streams**: Accept any valid JSON without validation. These streams have a single `value` column containing the JSON data.

To create a structured stream, provide a schema file:

```bash
npx wrangler pipelines streams create my-stream --schema-file schema.json
```

Example schema file:

```json
{
  "fields": [
    {
      "name": "user_id",
      "type": "string",
      "required": true
    },
    {
      "name": "amount",
      "type": "float64",
      "required": false
    },
    {
      "name": "tags",
      "type": "list",
      "required": false,
      "items": {
        "type": "string"
      }
    },
    {
      "name": "metadata",
      "type": "struct",
      "required": false,
      "fields": [
        {
          "name": "source",
          "type": "string",
          "required": false
        },
        {
          "name": "priority",
          "type": "int32",
          "required": false
        }
      ]
    }
  ]
}
```
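
For reference, an event that conforms to this example schema might look like the following (values are illustrative):

```json
{
  "user_id": "user_123",
  "amount": 42.5,
  "tags": ["checkout", "mobile"],
  "metadata": {
    "source": "web",
    "priority": 1
  }
}
```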

**Supported data types:**

* `string` - Text values
* `int32`, `int64` - Integer numbers
* `float32`, `float64` - Floating-point numbers
* `bool` - Boolean true/false
* `timestamp` - RFC 3339 timestamps, or numeric values parsed as Unix seconds, milliseconds, or microseconds (depending on unit)
* `json` - JSON objects
* `binary` - Binary data (base64-encoded)
* `list` - Arrays of values
* `struct` - Nested objects with defined fields

Note

Events that do not match the defined schema are accepted during ingestion but will be dropped during processing. To monitor dropped events and understand why they were dropped, query the [user error metrics](https://developers.cloudflare.com/pipelines/observability/metrics/#user-error-metrics) via GraphQL. Schema modifications are not supported after stream creation.

## View stream configuration

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Streams**.

2. Select a stream to view its associated configuration.

### Wrangler CLI

To view a specific stream, run the [`pipelines streams get`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-streams-get) command:

```bash
npx wrangler pipelines streams get <STREAM_ID>
```

To list all streams in your account, run the [`pipelines streams list`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-streams-list) command:

```bash
npx wrangler pipelines streams list
```

## Update HTTP ingest settings

You can update certain HTTP ingest settings after stream creation. Schema modifications are not supported once a stream is created.

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Streams**.

2. Select the stream you want to update.

3. In the **Settings** tab, go to **HTTP Ingest**.

4. To turn on or turn off HTTP ingestion, select **Enable** or **Disable**.

5. To update authentication and CORS settings, select **Edit** and modify.

6. Save your changes.

Note

For details on configuring authentication tokens and making authenticated requests, refer to [Writing to streams](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/).

## Delete a stream

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Streams**.

2. Select the stream you want to delete.

3. In the **Settings** tab, go to **General**, and select **Delete**.

### Wrangler CLI

To delete a stream, run the [`pipelines streams delete`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-streams-delete) command:

```bash
npx wrangler pipelines streams delete <STREAM_ID>
```

Warning

Deleting a stream will permanently remove all buffered events that have not been processed and will delete any dependent pipelines. Ensure all data has been delivered to your sink before deletion.

</page>

<page>
---
title: Writing to streams · Cloudflare Pipelines Docs
description: Send data to streams via Worker bindings or HTTP endpoints
lastUpdated: 2026-02-24T14:35:21.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/streams/writing-to-streams/
  md: https://developers.cloudflare.com/pipelines/streams/writing-to-streams/index.md
---

Send events to streams using [Worker bindings](https://developers.cloudflare.com/workers/runtime-apis/bindings/) or HTTP endpoints for client-side applications and external systems.

## Send via Workers

Worker bindings provide a secure way to send data to streams from [Workers](https://developers.cloudflare.com/workers/) without managing API tokens or credentials.

### Configure pipeline binding

Add a pipeline binding to your Wrangler file that points to your stream:

* wrangler.jsonc

  ```jsonc
  {
    "pipelines": [
      {
        "pipeline": "<STREAM_ID>",
        "binding": "STREAM"
      }
    ]
  }
  ```

* wrangler.toml

  ```toml
  [[pipelines]]
  pipeline = "<STREAM_ID>"
  binding = "STREAM"
  ```

### Workers API

The pipeline binding exposes a method for sending data to your stream:

#### `send(records)`

Sends an array of JSON-serializable records to the stream. Returns a Promise that resolves when records are confirmed as ingested.

* JavaScript

  ```js
  export default {
    async fetch(request, env, ctx) {
      const events = await request.json();


      await env.STREAM.send(events);


      return new Response("Events sent");
    },
  };
  ```

* TypeScript

  ```ts
  export default {
    async fetch(request, env, ctx): Promise<Response> {
      const events = await request.json<Record<string, unknown>[]>();


      await env.STREAM.send(events);


      return new Response("Events sent");
    },
  } satisfies ExportedHandler<Env>;
  ```

### Typed pipeline bindings

When a stream has a defined schema, running `wrangler types` generates schema-specific TypeScript types for your pipeline bindings. Instead of the generic `Pipeline<PipelineRecord>`, your bindings get a named record type with full autocomplete and compile-time type checking. Refer to the [`wrangler types` documentation](https://developers.cloudflare.com/workers/wrangler/commands/#types) to learn more.

#### Generated types

After running `wrangler types`, the generated `worker-configuration.d.ts` file contains a named record type inside the `Cloudflare` namespace. The type name is derived from the stream name (not the binding name), converted to PascalCase with a `Record` suffix.

Below is an example of what generated types look like in `worker-configuration.d.ts` for a stream named `ecommerce_stream`:

```typescript
declare namespace Cloudflare {
  type EcommerceStreamRecord = {
    user_id: string;
    event_type: string;
    product_id?: string;
    amount?: number;
  };
  interface Env {
    STREAM: import("cloudflare:pipelines").Pipeline<Cloudflare.EcommerceStreamRecord>;
  }
}
```

#### Fallback behavior

`wrangler types` falls back to the generic `Pipeline<PipelineRecord>` type in the following scenarios:

* **Not authenticated**: Run `wrangler login` to enable typed pipeline bindings.
* **Stream not found**: The stream ID in your Wrangler configuration does not match an existing stream.
* **Unstructured stream**: The stream was created without a schema.

## Send via HTTP

Each stream provides an optional HTTP endpoint for ingesting data from external applications, browsers, or any system that can make HTTP requests.

### Endpoint format

HTTP endpoints follow this format:

```txt
https://{stream-id}.ingest.cloudflare.com
```

Find your stream's endpoint URL in the Cloudflare dashboard under **Pipelines** > **Streams** or using the Wrangler CLI:

```bash
npx wrangler pipelines streams get <STREAM_ID>
```

### Making requests

Send events as JSON arrays via POST requests:

```bash
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[
    {
      "user_id": "12345",
      "event_type": "purchase",
      "product_id": "widget-001",
      "amount": 29.99
    }
  ]'
```

### Authentication

When authentication is enabled for your stream, include the API token in the `Authorization` header:

```bash
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -d '[{"event": "test"}]'
```

The API token must have **Workers Pipeline Send** permission. To learn more, refer to the [Create API token](https://developers.cloudflare.com/fundamentals/api/get-started/create-token/) documentation.

## Schema validation

Streams handle validation differently based on their configuration:

* **Structured streams**: Events must match the defined schema fields and types.
* **Unstructured streams**: Accept any valid JSON structure. Data is stored in a single `value` column.

For structured streams, ensure your events match the schema definition. Invalid events will be accepted but dropped, so validate your data before sending to avoid dropped events. When using Worker bindings, run `wrangler types` to generate [typed pipeline bindings](#typed-pipeline-bindings) that catch schema violations at compile time. You can also query the [user error metrics](https://developers.cloudflare.com/pipelines/observability/metrics/#user-error-metrics) to monitor dropped events and diagnose schema validation issues.

</page>

<page>
---
title: R2 · Cloudflare Pipelines Docs
description: Write data as JSON or Parquet files to R2 object storage
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sinks/available-sinks/r2/
  md: https://developers.cloudflare.com/pipelines/sinks/available-sinks/r2/index.md
---

R2 sinks write processed data from pipelines as raw files to [R2 object storage](https://developers.cloudflare.com/r2/). They currently support writing data in JSON and Parquet formats.

To create an R2 sink, run the [`pipelines sinks create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-sinks-create) command and specify the sink type and target [bucket](https://developers.cloudflare.com/r2/buckets/):

```bash
npx wrangler pipelines sinks create my-sink \
  --type r2 \
  --bucket my-bucket
```

## Format options

R2 sinks support two output formats:

### JSON format

Write data as newline-delimited JSON files:

```bash
--format json
```

### Parquet format

Write data as Parquet files for better query performance and compression:

```bash
--format parquet --compression zstd
```

**Compression options for Parquet:**

* `zstd` (default) - Best compression ratio
* `snappy` - Fastest compression
* `gzip` - Good compression, widely supported
* `lz4` - Fast compression with reasonable ratio
* `uncompressed` - No compression

**Row group size:** [Row groups](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size in MB:

```bash
--target-row-group-size 256
```

## File organization

Files are written with UUID names within the partitioned directory structure. For example, with path `analytics` and default partitioning:

```plaintext
analytics/year=2025/month=09/day=18/002507a5-d449-48e8-a484-b1bea916102f.parquet
```

### Path

Set a base directory in your bucket where files will be written:

```bash
--path analytics/events
```

### Partitioning

R2 sinks automatically partition files by time using a configurable pattern. The default pattern is `year=%Y/month=%m/day=%d` (Hive-style partitioning).

```bash
--partitioning "year=%Y/month=%m/day=%d/hour=%H"
```

For available format specifiers, refer to [strftime documentation](https://docs.rs/chrono/latest/chrono/format/strftime/index.html).

## Batching and rolling policy

Control when files are written to R2. Configure based on your needs:

* **Lower values**: More frequent writes, smaller files, lower latency
* **Higher values**: Less frequent writes, larger files, better query performance

### Roll interval

Set how often files are written (default: 300 seconds):

```bash
--roll-interval 60  # Write files every 60 seconds
```

### Roll size

Set maximum file size in MB before creating a new file:

```bash
--roll-size 100  # Create new file after 100MB
```
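
Putting these options together, a sketch of a Parquet sink with hourly partitioning and custom batching (values are illustrative; tune them to your workload):

```bash
npx wrangler pipelines sinks create my-sink \
  --type r2 \
  --bucket my-bucket \
  --format parquet \
  --compression zstd \
  --path analytics/events \
  --partitioning "year=%Y/month=%m/day=%d/hour=%H" \
  --roll-interval 60 \
  --roll-size 100
```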

## Authentication

R2 sinks require API credentials (an Access Key ID and Secret Access Key) with [Object Read & Write permissions](https://developers.cloudflare.com/r2/api/tokens/#permissions) to write data to your bucket.

```bash
npx wrangler pipelines sinks create my-sink \
  --type r2 \
  --bucket my-bucket \
  --access-key-id YOUR_ACCESS_KEY_ID \
  --secret-access-key YOUR_SECRET_ACCESS_KEY
```

</page>

<page>
---
title: R2 Data Catalog · Cloudflare Pipelines Docs
description: Write data as Apache Iceberg tables to R2 Data Catalog
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sinks/available-sinks/r2-data-catalog/
  md: https://developers.cloudflare.com/pipelines/sinks/available-sinks/r2-data-catalog/index.md
---

R2 Data Catalog sinks write processed data from pipelines as [Apache Iceberg](https://iceberg.apache.org/) tables to [R2 Data Catalog](https://developers.cloudflare.com/r2/data-catalog/). Iceberg tables provide ACID transactions, schema evolution, and time travel capabilities for analytics workloads.

To create an R2 Data Catalog sink, run the [`pipelines sinks create`](https://developers.cloudflare.com/workers/wrangler/commands/#pipelines-sinks-create) command and specify the sink type, target bucket, namespace, and table name:

```bash
npx wrangler pipelines sinks create my-sink \
  --type r2-data-catalog \
  --bucket my-bucket \
  --namespace my_namespace \
  --table my_table \
  --catalog-token YOUR_CATALOG_TOKEN
```

The sink will create the specified namespace and table if they do not exist. Sinks cannot be created for existing Iceberg tables.

## Format

R2 Data Catalog sinks only support Parquet format. JSON format is not supported for Iceberg tables.

### Compression options

Configure Parquet compression for optimal storage and query performance:

```bash
--compression zstd
```

**Available compression options:**

* `zstd` (default) - Best compression ratio
* `snappy` - Fastest compression
* `gzip` - Good compression, widely supported
* `lz4` - Fast compression with reasonable ratio
* `uncompressed` - No compression

### Row group size

[Row groups](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size in MB:

```bash
--target-row-group-size 256
```

## Batching and rolling policy

Control when data is written to Iceberg tables. Configure based on your needs:

* **Lower values**: More frequent writes, smaller files, lower latency
* **Higher values**: Less frequent writes, larger files, better query performance

### Roll interval

Set how often files are written (default: 300 seconds):

```bash
--roll-interval 60  # Write files every 60 seconds
```

### Roll size

Set maximum file size in MB before creating a new file:

```bash
--roll-size 100  # Create new file after 100MB
```
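
As a combined sketch (illustrative values), a sink with explicit compression and a shorter roll interval might be created like this:

```bash
npx wrangler pipelines sinks create my-sink \
  --type r2-data-catalog \
  --bucket my-bucket \
  --namespace my_namespace \
  --table my_table \
  --catalog-token YOUR_CATALOG_TOKEN \
  --compression zstd \
  --roll-interval 60
```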

## Authentication

R2 Data Catalog sinks require an API token with [R2 Admin Read & Write permissions](https://developers.cloudflare.com/r2/data-catalog/manage-catalogs/#create-api-token-in-the-dashboard). This permission grants the sink access to both R2 Data Catalog and R2 storage.

```bash
--catalog-token YOUR_CATALOG_TOKEN
```

</page>

<page>
---
title: Array functions · Cloudflare Pipelines Docs
description: Scalar functions for manipulating arrays
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/array/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/array/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `array_append`

Appends an element to the end of an array.

```plaintext
array_append(array, element)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to append to the array.

**Example**

```plaintext
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4]                         |
+--------------------------------------+
```

**Aliases**

* array\_push\_back
* list\_append
* list\_push\_back

## `array_sort`

Sort array.

```plaintext
array_sort(array, desc, nulls_first)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **desc**: Whether to sort in descending order (`ASC` or `DESC`).
* **nulls\_first**: Whether to sort nulls first (`NULLS FIRST` or `NULLS LAST`).

**Example**

```plaintext
> select array_sort([3, 1, 2]);
+-----------------------------+
| array_sort(List([3,1,2]))   |
+-----------------------------+
| [1, 2, 3]                   |
+-----------------------------+
```

**Aliases**

* list\_sort

## `array_resize`

Resizes the list to contain `size` elements. New elements are initialized with `value`, or left empty if `value` is not set.

```plaintext
array_resize(array, size, value)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **size**: New size of given array.
* **value**: Value to initialize the new elements with; if not set, the new elements are empty.

**Example**

```plaintext
> select array_resize([1, 2, 3], 5, 0);
+-------------------------------------+
| array_resize(List([1,2,3],5,0))     |
+-------------------------------------+
| [1, 2, 3, 0, 0]                     |
+-------------------------------------+
```

**Aliases**

* list\_resize

## `array_cat`

*Alias of [array\_concat](#array_concat).*

## `array_concat`

Concatenates arrays.

```plaintext
array_concat(array[, ..., array_n])
```

**Arguments**

* **array**: Array expression to concatenate. Can be a constant, column, or function, and any combination of array operators.
* **array\_n**: Subsequent array column or literal array to concatenate.

**Example**

```plaintext
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                |
+---------------------------------------------------+
```

**Aliases**

* array\_cat
* list\_cat
* list\_concat

## `array_contains`

*Alias of [array\_has](#array_has).*

## `array_has`

Returns true if the array contains the element.

```plaintext
array_has(array, element)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Scalar or Array expression. Can be a constant, column, or function, and any combination of array operators.
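
**Example**

An illustrative query, with output rendered in the same result-table style as the other entries on this page:

```plaintext
> select array_has([1, 2, 3], 2);
+-----------------------------------+
| array_has(List([1,2,3]),Int64(2)) |
+-----------------------------------+
| true                              |
+-----------------------------------+
```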

**Aliases**

* list\_has

## `array_has_all`

Returns true if all elements of the sub-array exist in the array.

```plaintext
array_has_all(array, sub-array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **sub-array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
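
**Example**

For instance (illustrative output):

```plaintext
> select array_has_all([1, 2, 3, 4], [2, 3]);
+--------------------------------------------+
| array_has_all(List([1,2,3,4]),List([2,3])) |
+--------------------------------------------+
| true                                       |
+--------------------------------------------+
```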

**Aliases**

* list\_has\_all

## `array_has_any`

Returns true if any elements exist in both arrays.

```plaintext
array_has_any(array, sub-array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **sub-array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
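
**Example**

A minimal illustration:

```plaintext
> select array_has_any([1, 2, 3], [3, 4]);
+------------------------------------------+
| array_has_any(List([1,2,3]),List([3,4])) |
+------------------------------------------+
| true                                     |
+------------------------------------------+
```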

**Aliases**

* list\_has\_any

## `array_dims`

Returns an array of the array's dimensions.

```plaintext
array_dims(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_dims([[1, 2, 3], [4, 5, 6]]);
+---------------------------------+
| array_dims(List([1,2,3,4,5,6])) |
+---------------------------------+
| [2, 3]                          |
+---------------------------------+
```

**Aliases**

* list\_dims

## `array_distinct`

Returns distinct values from the array after removing duplicates.

```plaintext
array_distinct(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_distinct([1, 3, 2, 3, 1, 2, 4]);
+---------------------------------+
| array_distinct(List([1,2,3,4])) |
+---------------------------------+
| [1, 2, 3, 4]                    |
+---------------------------------+
```

**Aliases**

* list\_distinct

## `array_element`

Extracts the element with the index n from the array.

```plaintext
array_element(array, index)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **index**: Index to extract the element from the array.

**Example**

```plaintext
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3                                       |
+-----------------------------------------+
```

**Aliases**

* array\_extract
* list\_element
* list\_extract

## `array_extract`

*Alias of [array\_element](#array_element).*

## `array_fill`

Returns an array filled with copies of the given value.

DEPRECATED: use `array_repeat` instead!

```plaintext
array_fill(element, array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to copy to the array.

## `flatten`

Converts an array of arrays to a flat array.

* Applies to any depth of nested arrays
* Does not change arrays that are already flat

The flattened array contains all the elements from all source arrays.

```plaintext
flatten(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
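
**Example**

An illustrative query:

```plaintext
> select flatten([[1, 2], [3, 4], [5, 6]]);
+------------------------------------+
| flatten(List([[1,2],[3,4],[5,6]])) |
+------------------------------------+
| [1, 2, 3, 4, 5, 6]                 |
+------------------------------------+
```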

## `array_indexof`

*Alias of [array\_position](#array_position).*

## `array_intersect`

Returns an array of elements in the intersection of array1 and array2.

```plaintext
array_intersect(array1, array2)
```

**Arguments**

* **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_intersect([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_intersect([1, 2, 3, 4], [5, 6, 3, 4]);       |
+----------------------------------------------------+
| [3, 4]                                             |
+----------------------------------------------------+
> select array_intersect([1, 2, 3, 4], [5, 6, 7, 8]);
+----------------------------------------------------+
| array_intersect([1, 2, 3, 4], [5, 6, 7, 8]);       |
+----------------------------------------------------+
| []                                                 |
+----------------------------------------------------+
```

***

**Aliases**

* list\_intersect

## `array_join`

*Alias of [array\_to\_string](#array_to_string).*

## `array_length`

Returns the length of the array dimension.

```plaintext
array_length(array, dimension)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **dimension**: Array dimension.

**Example**

```plaintext
> select array_length([1, 2, 3, 4, 5]);
+---------------------------------+
| array_length(List([1,2,3,4,5])) |
+---------------------------------+
| 5                               |
+---------------------------------+
```

**Aliases**

* list\_length

## `array_ndims`

Returns the number of dimensions of the array.

```plaintext
array_ndims(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_ndims([[1, 2, 3], [4, 5, 6]]);
+----------------------------------+
| array_ndims(List([1,2,3,4,5,6])) |
+----------------------------------+
| 2                                |
+----------------------------------+
```

**Aliases**

* list\_ndims

## `array_prepend`

Prepends an element to the beginning of an array.

```plaintext
array_prepend(element, array)
```

**Arguments**

* **element**: Element to prepend to the array.
* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_prepend(1, [2, 3, 4]);
+---------------------------------------+
| array_prepend(Int64(1),List([2,3,4])) |
+---------------------------------------+
| [1, 2, 3, 4]                          |
+---------------------------------------+
```

**Aliases**

* array\_push\_front
* list\_prepend
* list\_push\_front

## `array_pop_front`

Returns the array without the first element.

```plaintext
array_pop_front(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_pop_front([1, 2, 3]);
+--------------------------------+
| array_pop_front(List([1,2,3])) |
+--------------------------------+
| [2, 3]                         |
+--------------------------------+
```

**Aliases**

* list\_pop\_front

## `array_pop_back`

Returns the array without the last element.

```plaintext
array_pop_back(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_pop_back([1, 2, 3]);
+-------------------------------+
| array_pop_back(List([1,2,3])) |
+-------------------------------+
| [1, 2]                        |
+-------------------------------+
```

**Aliases**

* list\_pop\_back

## `array_position`

Returns the position of the first occurrence of the specified element in the array.

```plaintext
array_position(array, element)
array_position(array, element, index)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to search for position in the array.
* **index**: Index at which to start searching.

**Example**

```plaintext
> select array_position([1, 2, 2, 3, 1, 4], 2);
+----------------------------------------------+
| array_position(List([1,2,2,3,1,4]),Int64(2)) |
+----------------------------------------------+
| 2                                            |
+----------------------------------------------+
```

**Aliases**

* array\_indexof
* list\_indexof
* list\_position

## `array_positions`

Searches for an element in the array and returns the positions of all occurrences.

```plaintext
array_positions(array, element)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to search for positions in the array.

**Example**

```plaintext
> select array_positions([1, 2, 2, 3, 1, 4], 2);
+-----------------------------------------------+
| array_positions(List([1,2,2,3,1,4]),Int64(2)) |
+-----------------------------------------------+
| [2, 3]                                        |
+-----------------------------------------------+
```

**Aliases**

* list\_positions

## `array_push_back`

*Alias of [array\_append](#array_append).*

## `array_push_front`

*Alias of [array\_prepend](#array_prepend).*

## `array_repeat`

Returns an array containing the element repeated `count` times.

```plaintext
array_repeat(element, count)
```

**Arguments**

* **element**: Element expression. Can be a constant, column, or function, and any combination of array operators.
* **count**: Number of times to repeat the element.

**Example**

```plaintext
> select array_repeat(1, 3);
+---------------------------------+
| array_repeat(Int64(1),Int64(3)) |
+---------------------------------+
| [1, 1, 1]                       |
+---------------------------------+
```

```plaintext
> select array_repeat([1, 2], 2);
+------------------------------------+
| array_repeat(List([1,2]),Int64(2)) |
+------------------------------------+
| [[1, 2], [1, 2]]                   |
+------------------------------------+
```

**Aliases**

* list\_repeat

## `array_remove`

Removes the first element from the array equal to the given value.

```plaintext
array_remove(array, element)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to be removed from the array.

**Example**

```plaintext
> select array_remove([1, 2, 2, 3, 2, 1, 4], 2);
+----------------------------------------------+
| array_remove(List([1,2,2,3,2,1,4]),Int64(2)) |
+----------------------------------------------+
| [1, 2, 3, 2, 1, 4]                           |
+----------------------------------------------+
```

**Aliases**

* list\_remove

## `array_remove_n`

Removes the first `max` elements from the array equal to the given value.

```plaintext
array_remove_n(array, element, max)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to be removed from the array.
* **max**: Number of first occurrences to remove.

**Example**

```plaintext
> select array_remove_n([1, 2, 2, 3, 2, 1, 4], 2, 2);
+---------------------------------------------------------+
| array_remove_n(List([1,2,2,3,2,1,4]),Int64(2),Int64(2)) |
+---------------------------------------------------------+
| [1, 3, 2, 1, 4]                                         |
+---------------------------------------------------------+
```

**Aliases**

* list\_remove\_n

## `array_remove_all`

Removes all elements from the array equal to the given value.

```plaintext
array_remove_all(array, element)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to be removed from the array.

**Example**

```plaintext
> select array_remove_all([1, 2, 2, 3, 2, 1, 4], 2);
+--------------------------------------------------+
| array_remove_all(List([1,2,2,3,2,1,4]),Int64(2)) |
+--------------------------------------------------+
| [1, 3, 1, 4]                                     |
+--------------------------------------------------+
```

**Aliases**

* list\_remove\_all

## `array_replace`

Replaces the first occurrence of the specified element with another specified element.

```plaintext
array_replace(array, from, to)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **from**: Initial element.
* **to**: Final element.

**Example**

```plaintext
> select array_replace([1, 2, 2, 3, 2, 1, 4], 2, 5);
+--------------------------------------------------------+
| array_replace(List([1,2,2,3,2,1,4]),Int64(2),Int64(5)) |
+--------------------------------------------------------+
| [1, 5, 2, 3, 2, 1, 4]                                  |
+--------------------------------------------------------+
```

**Aliases**

* list\_replace

## `array_replace_n`

Replaces the first `max` occurrences of the specified element with another specified element.

```plaintext
array_replace_n(array, from, to, max)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **from**: Initial element.
* **to**: Final element.
* **max**: Number of first occurrences to replace.

**Example**

```plaintext
> select array_replace_n([1, 2, 2, 3, 2, 1, 4], 2, 5, 2);
+-------------------------------------------------------------------+
| array_replace_n(List([1,2,2,3,2,1,4]),Int64(2),Int64(5),Int64(2)) |
+-------------------------------------------------------------------+
| [1, 5, 5, 3, 2, 1, 4]                                             |
+-------------------------------------------------------------------+
```

**Aliases**

* list\_replace\_n

## `array_replace_all`

Replaces all occurrences of the specified element with another specified element.

```plaintext
array_replace_all(array, from, to)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **from**: Initial element.
* **to**: Final element.

**Example**

```plaintext
> select array_replace_all([1, 2, 2, 3, 2, 1, 4], 2, 5);
+------------------------------------------------------------+
| array_replace_all(List([1,2,2,3,2,1,4]),Int64(2),Int64(5)) |
+------------------------------------------------------------+
| [1, 5, 5, 3, 5, 1, 4]                                      |
+------------------------------------------------------------+
```

**Aliases**

* list\_replace\_all

## `array_reverse`

Returns the array with the order of the elements reversed.

```plaintext
array_reverse(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_reverse([1, 2, 3, 4]);
+------------------------------------------------------------+
| array_reverse(List([1, 2, 3, 4]))                          |
+------------------------------------------------------------+
| [4, 3, 2, 1]                                               |
+------------------------------------------------------------+
```

**Aliases**

* list\_reverse

## `array_slice`

Returns a slice of the array based on 1-indexed start and end positions.

```plaintext
array_slice(array, begin, end[, stride])
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **begin**: Index of the first element. If negative, it counts backward from the end of the array.
* **end**: Index of the last element. If negative, it counts backward from the end of the array.
* **stride**: Stride of the array slice. The default is 1.

**Example**

```plaintext
> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
+--------------------------------------------------------+
| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
+--------------------------------------------------------+
| [3, 4, 5, 6]                                           |
+--------------------------------------------------------+
```

**Aliases**

* list\_slice

## `array_to_string`

Converts each element of the array to its text representation and joins them with the given delimiter.

```plaintext
array_to_string(array, delimiter)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **delimiter**: Array element separator.

**Example**

```plaintext
> select array_to_string([[1, 2, 3, 4], [5, 6, 7, 8]], ',');
+----------------------------------------------------+
| array_to_string(List([1,2,3,4,5,6,7,8]),Utf8(",")) |
+----------------------------------------------------+
| 1,2,3,4,5,6,7,8                                    |
+----------------------------------------------------+
```

**Aliases**

* array\_join
* list\_join
* list\_to\_string

## `array_union`

Returns an array of all elements from both arrays, without duplicates.

```plaintext
array_union(array1, array2)
```

**Arguments**

* **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_union([1, 2, 3, 4], [5, 6, 3, 4]);           |
+----------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                 |
+----------------------------------------------------+
> select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
+----------------------------------------------------+
| array_union([1, 2, 3, 4], [5, 6, 7, 8]);           |
+----------------------------------------------------+
| [1, 2, 3, 4, 5, 6, 7, 8]                           |
+----------------------------------------------------+
```

***

**Aliases**

* list\_union

## `array_except`

Returns an array of the elements that appear in the first array but not in the second.

```plaintext
array_except(array1, array2)
```

**Arguments**

* **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+-------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]); |
+-------------------------------------------+
| [1, 2]                                    |
+-------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+-------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]); |
+-------------------------------------------+
| [1, 2]                                    |
+-------------------------------------------+
```

***

**Aliases**

* list\_except

## `cardinality`

Returns the total number of elements in the array.

```plaintext
cardinality(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select cardinality([[1, 2, 3, 4], [5, 6, 7, 8]]);
+--------------------------------------+
| cardinality(List([1,2,3,4,5,6,7,8])) |
+--------------------------------------+
| 8                                    |
+--------------------------------------+
```

## `empty`

Returns 1 for an empty array or 0 for a non-empty array.

```plaintext
empty(array)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```plaintext
> select empty([1]);
+------------------+
| empty(List([1])) |
+------------------+
| 0                |
+------------------+
```

**Aliases**

* array\_empty
* list\_empty

## `generate_series`

Similar to the [`range`](#range) function, but it includes the upper bound.

```plaintext
generate_series(start, stop, step)
```

**Arguments**

* **start**: start of the range
* **stop**: end of the range (included)
* **step**: increase by step (cannot be 0)

**Example**

```plaintext
> select generate_series(1,3);
+------------------------------------+
| generate_series(Int64(1),Int64(3)) |
+------------------------------------+
| [1, 2, 3]                          |
+------------------------------------+
```

## `list_append`

*Alias of [array\_append](#array_append).*

## `list_cat`

*Alias of [array\_concat](#array_concat).*

## `list_concat`

*Alias of [array\_concat](#array_concat).*

## `list_dims`

*Alias of [array\_dims](#array_dims).*

## `list_distinct`

*Alias of [array\_distinct](#array_distinct).*

## `list_element`

*Alias of [array\_element](#array_element).*

## `list_empty`

*Alias of [empty](#empty).*

## `list_except`

*Alias of [array\_except](#array_except).*

## `list_extract`

*Alias of [array\_element](#array_element).*

## `list_has`

*Alias of [array\_has](#array_has).*

## `list_has_all`

*Alias of [array\_has\_all](#array_has_all).*

## `list_has_any`

*Alias of [array\_has\_any](#array_has_any).*

## `list_indexof`

*Alias of [array\_position](#array_position).*

## `list_intersect`

*Alias of [array\_intersect](#array_intersect).*

## `list_join`

*Alias of [array\_to\_string](#array_to_string).*

## `list_length`

*Alias of [array\_length](#array_length).*

## `list_ndims`

*Alias of [array\_ndims](#array_ndims).*

## `list_prepend`

*Alias of [array\_prepend](#array_prepend).*

## `list_pop_back`

*Alias of [array\_pop\_back](#array_pop_back).*

## `list_pop_front`

*Alias of [array\_pop\_front](#array_pop_front).*

## `list_position`

*Alias of [array\_position](#array_position).*

## `list_positions`

*Alias of [array\_positions](#array_positions).*

## `list_push_back`

*Alias of [array\_append](#array_append).*

## `list_push_front`

*Alias of [array\_prepend](#array_prepend).*

## `list_repeat`

*Alias of [array\_repeat](#array_repeat).*

## `list_resize`

*Alias of [array\_resize](#array_resize).*

## `list_remove`

*Alias of [array\_remove](#array_remove).*

## `list_remove_n`

*Alias of [array\_remove\_n](#array_remove_n).*

## `list_remove_all`

*Alias of [array\_remove\_all](#array_remove_all).*

## `list_replace`

*Alias of [array\_replace](#array_replace).*

## `list_replace_n`

*Alias of [array\_replace\_n](#array_replace_n).*

## `list_replace_all`

*Alias of [array\_replace\_all](#array_replace_all).*

## `list_reverse`

*Alias of [array\_reverse](#array_reverse).*

## `list_slice`

*Alias of [array\_slice](#array_slice).*

## `list_sort`

*Alias of [array\_sort](#array_sort).*

## `list_to_string`

*Alias of [array\_to\_string](#array_to_string).*

## `list_union`

*Alias of [array\_union](#array_union).*

## `array_empty`

*Alias of [empty](#empty).*

## `make_array`

Returns an Arrow array using the specified input expressions.

```plaintext
make_array(expression1[, ..., expression_n])
```

**Arguments**

* **expression\_n**: Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators.

**Example**

```plaintext
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5]                                          |
+----------------------------------------------------------+
```

**Aliases**

* make\_list

## `make_list`

*Alias of [make\_array](#make_array).*

## `string_to_array`

Splits a string into an array of substrings based on a delimiter. Any substrings matching the optional `null_str` argument are replaced with NULL. For example: `SELECT string_to_array('abc##def', '##')` or `SELECT string_to_array('abc def', ' ', 'def')`.

```plaintext
string_to_array(str, delimiter[, null_str])
```

**Arguments**

* **str**: String expression to split.
* **delimiter**: Delimiter string to split on.
* **null\_str**: Substring values to be replaced with `NULL`.

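**Example**

An illustrative sketch of the two calls above, with expected output shown as comments:

```sql
SELECT string_to_array('abc##def', '##');
-- [abc, def]
SELECT string_to_array('abc def', ' ', 'def');
-- [abc, NULL]
```
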
**Aliases**

* string\_to\_list

## `string_to_list`

*Alias of [string\_to\_array](#string_to_array).*

## `trim_array`

Removes the last n elements from the array.

DEPRECATED: use `array_slice` instead!

```plaintext
trim_array(array, n)
```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **n**: Number of elements to trim from the end of the array.

## `range`

Returns an Arrow array between start and stop with step. `SELECT range(2, 10, 3) -> [2, 5, 8]` or `SELECT range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);`

The range `start..end` contains all values with `start <= x < end`; it is empty if `start >= end`.

Step cannot be 0.

Note that when the range is numeric, it accepts (stop), (start, stop), and (start, stop, step) as parameters, but when the range is a date, all 3 parameters must be provided and non-NULL. For example,

```plaintext
SELECT range(3);
SELECT range(1,5);
SELECT range(1,5,1);
```

are allowed in number ranges

but in date ranges, only

```plaintext
SELECT range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);
```

is allowed, and

```plaintext
SELECT range(DATE '1992-09-01', DATE '1993-03-01', NULL);
SELECT range(NULL, DATE '1993-03-01', INTERVAL '1' MONTH);
SELECT range(DATE '1992-09-01', NULL, INTERVAL '1' MONTH);
```

are not allowed.

**Arguments**

* **start**: start of the range
* **end**: end of the range (not included)
* **step**: increase by step (cannot be 0)

**Aliases**

* generate\_series

</page>

<page>
---
title: Binary string functions · Cloudflare Pipelines Docs
description: Scalar functions for manipulating binary strings
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/binary-string/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/binary-string/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `encode`

Encode binary data into a textual representation.

```plaintext
encode(expression, format)
```

**Arguments**

* **expression**: Expression containing string or binary data

* **format**: Supported formats are: `base64`, `hex`

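**Example**

An illustrative sketch with string input (expected output shown as comments):

```sql
SELECT encode('hello', 'hex');
-- 68656c6c6f
SELECT encode('hello', 'base64');
-- aGVsbG8=
```
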
**Related functions**: [decode](#decode)

## `decode`

Decode binary data from textual representation in string.

```plaintext
decode(expression, format)
```

**Arguments**

* **expression**: Expression containing encoded string data

* **format**: Same arguments as [encode](#encode)

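**Example**

A minimal sketch reversing the `encode` example above; the result is binary data corresponding to the string 'hello':

```sql
SELECT decode('68656c6c6f', 'hex');
-- binary value of 'hello'
```
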
**Related functions**: [encode](#encode)

</page>

<page>
---
title: Conditional functions · Cloudflare Pipelines Docs
description: Scalar functions to implement conditional logic
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/conditional/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/conditional/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `coalesce`

Returns the first of its arguments that is not *null*. Returns *null* if all arguments are *null*. This function is often used to substitute a default value for *null* values.

```plaintext
coalesce(expression1[, ..., expression_n])
```

**Arguments**

* **expression1, expression\_n**: Expression to use if previous expressions are *null*. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary.

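**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT coalesce(NULL, NULL, 'default');
-- default
```
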
## `nullif`

Returns *null* if *expression1* equals *expression2*; otherwise it returns *expression1*. This can be used to perform the inverse operation of [`coalesce`](#coalesce).

```plaintext
nullif(expression1, expression2)
```

**Arguments**

* **expression1**: Expression to compare and return if equal to expression2. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression2**: Expression to compare to expression1. Can be a constant, column, or function, and any combination of arithmetic operators.

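**Example**

An illustrative sketch (expected output shown as comments):

```sql
SELECT nullif(5, 5);
-- NULL
SELECT nullif(5, 3);
-- 5
```
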
## `nvl`

Returns *expression2* if *expression1* is NULL; otherwise it returns *expression1*.

```plaintext
nvl(expression1, expression2)
```

**Arguments**

* **expression1**: return if expression1 not is NULL. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression2**: return if expression1 is NULL. Can be a constant, column, or function, and any combination of arithmetic operators.

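**Example**

An illustrative sketch (expected output shown as comments):

```sql
SELECT nvl(NULL, 'fallback');
-- fallback
SELECT nvl('value', 'fallback');
-- value
```
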
## `nvl2`

Returns *expression2* if *expression1* is not NULL; otherwise it returns *expression3*.

```plaintext
nvl2(expression1, expression2, expression3)
```

**Arguments**

* **expression1**: conditional expression. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression2**: return if expression1 is not NULL. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression3**: return if expression1 is NULL. Can be a constant, column, or function, and any combination of arithmetic operators.

## `ifnull`

*Alias of [nvl](#nvl).*

</page>

<page>
---
title: Hashing functions · Cloudflare Pipelines Docs
description: Scalar functions for hashing values
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/hashing/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/hashing/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `digest`

Computes the binary hash of an expression using the specified algorithm.

```plaintext
digest(expression, algorithm)
```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

* **algorithm**: String expression specifying algorithm to use. Must be one of:

  * md5
  * sha224
  * sha256
  * sha384
  * sha512
  * blake2s
  * blake2b
  * blake3

## `md5`

Computes an MD5 128-bit checksum for a string expression.

```plaintext
md5(expression)
```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

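**Example**

An illustrative sketch; the expected result is the well-known MD5 checksum of 'hello':

```sql
SELECT md5('hello');
-- 5d41402abc4b2a76b9719d911017c592
```
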
## `sha224`

Computes the SHA-224 hash of a binary string.

```plaintext
sha224(expression)
```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

## `sha256`

Computes the SHA-256 hash of a binary string.

```plaintext
sha256(expression)
```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

## `sha384`

Computes the SHA-384 hash of a binary string.

```plaintext
sha384(expression)
```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

## `sha512`

Computes the SHA-512 hash of a binary string.

```plaintext
sha512(expression)
```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

</page>

<page>
---
title: JSON functions · Cloudflare Pipelines Docs
description: Scalar functions for manipulating JSON
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/json/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/json/index.md
---

Cloudflare Pipelines provides two sets of JSON functions: the first based on PostgreSQL's SQL functions and syntax, and the second based on the [JSONPath](https://jsonpath.com/) standard.

## SQL functions

The SQL functions provide basic JSON parsing functions similar to those found in PostgreSQL.

### json\_contains

Returns `true` if the JSON string contains the specified key(s).

```sql
SELECT json_contains('{"a": 1, "b": 2, "c": 3}', 'a') FROM source;
true
```

Also available via the `?` operator:

```sql
SELECT '{"a": 1, "b": 2, "c": 3}' ? 'a' FROM source;
true
```

### json\_get

Retrieves the value from a JSON string by the specified path (keys). Returns the value as its native type (string, int, etc.).

```sql
SELECT json_get('{"a": {"b": 2}}', 'a', 'b') FROM source;
2
```

Also available via the `->` operator:

```sql
SELECT '{"a": {"b": 2}}'->'a'->'b' FROM source;
2
```

Various permutations of `json_get` functions are available for retrieving values as a specific type, or you can use SQL type annotations:

```sql
SELECT json_get('{"a": {"b": 2}}', 'a', 'b')::int FROM source;
2
```

### json\_get\_str

Retrieves a string value from a JSON string by the specified path. Returns an empty string if the value does not exist or is not a string.

```sql
SELECT json_get_str('{"a": {"b": "hello"}}', 'a', 'b') FROM source;
"hello"
```

### json\_get\_int

Retrieves an integer value from a JSON string by the specified path. Returns `0` if the value does not exist or is not an integer.

```sql
SELECT json_get_int('{"a": {"b": 42}}', 'a', 'b') FROM source;
42
```

### json\_get\_float

Retrieves a float value from a JSON string by the specified path. Returns `0.0` if the value does not exist or is not a float.

```sql
SELECT json_get_float('{"a": {"b": 3.14}}', 'a', 'b') FROM source;
3.14
```

### json\_get\_bool

Retrieves a boolean value from a JSON string by the specified path. Returns `false` if the value does not exist or is not a boolean.

```sql
SELECT json_get_bool('{"a": {"b": true}}', 'a', 'b') FROM source;
true
```

### json\_get\_json

Retrieves a nested JSON string from a JSON string by the specified path. The value is returned as raw JSON.

```sql
SELECT json_get_json('{"a": {"b": {"c": 1}}}', 'a', 'b') FROM source;
'{"c": 1}'
```

### json\_as\_text

Retrieves any value from a JSON string by the specified path and returns it as a string, regardless of the original type.

```sql
SELECT json_as_text('{"a": {"b": 42}}', 'a', 'b') FROM source;
"42"
```

Also available via the `->>` operator:

```sql
SELECT '{"a": {"b": 42}}'->>'a'->>'b' FROM source;
"42"
```

### json\_length

Returns the length of a JSON object or array at the specified path. Returns `0` if the path does not exist or is not an object/array.

```sql
SELECT json_length('{"a": [1, 2, 3]}', 'a') FROM source;
3
```

## JSONPath functions

The JSONPath functions provide basic JSON parsing using [JSONPath](https://goessner.net/articles/JsonPath/), an evolving standard for querying JSON objects.

### extract\_json

Returns the JSON elements in the first argument that match the JsonPath in the second argument. The returned value is an array of json strings.

```sql
SELECT extract_json('{"a": 1, "b": 2, "c": 3}', '$.a') FROM source;
['1']
```

### extract\_json\_string

Returns an unescaped String for the first item matching the JsonPath, if it is a string.

```sql
SELECT extract_json_string('{"a": "a", "b": 2, "c": 3}', '$.a') FROM source;
'a'
```

</page>

<page>
---
title: Math functions · Cloudflare Pipelines Docs
description: Scalar functions for mathematical operations
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/math/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/math/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `abs`

Returns the absolute value of a number.

```plaintext
abs(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `acos`

Returns the arc cosine or inverse cosine of a number.

```plaintext
acos(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `acosh`

Returns the area hyperbolic cosine or inverse hyperbolic cosine of a number.

```plaintext
acosh(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `asin`

Returns the arc sine or inverse sine of a number.

```plaintext
asin(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `asinh`

Returns the area hyperbolic sine or inverse hyperbolic sine of a number.

```plaintext
asinh(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `atan`

Returns the arc tangent or inverse tangent of a number.

```plaintext
atan(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `atanh`

Returns the area hyperbolic tangent or inverse hyperbolic tangent of a number.

```plaintext
atanh(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `atan2`

Returns the arc tangent or inverse tangent of `expression_y / expression_x`.

```plaintext
atan2(expression_y, expression_x)
```

**Arguments**

* **expression\_y**: First numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_x**: Second numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `cbrt`

Returns the cube root of a number.

```plaintext
cbrt(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `ceil`

Returns the nearest integer greater than or equal to a number.

```plaintext
ceil(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `cos`

Returns the cosine of a number.

```plaintext
cos(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `cosh`

Returns the hyperbolic cosine of a number.

```plaintext
cosh(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `degrees`

Converts radians to degrees.

```plaintext
degrees(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `exp`

Returns the base-e exponential of a number.

```plaintext
exp(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to use as the exponent. Can be a constant, column, or function, and any combination of arithmetic operators.

## `factorial`

Factorial. Returns 1 if value is less than 2.

```plaintext
factorial(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

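**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT factorial(5);
-- 120
```
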
## `floor`

Returns the nearest integer less than or equal to a number.

```plaintext
floor(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `gcd`

Returns the greatest common divisor of `expression_x` and `expression_y`. Returns 0 if both inputs are zero.

```plaintext
gcd(expression_x, expression_y)
```

**Arguments**

* **expression\_x**: First numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_y**: Second numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

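**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT gcd(12, 18);
-- 6
```
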
## `isnan`

Returns true if a given number is +NaN or -NaN, otherwise returns false.

```plaintext
isnan(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `iszero`

Returns true if a given number is +0.0 or -0.0, otherwise returns false.

```plaintext
iszero(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `lcm`

Returns the least common multiple of `expression_x` and `expression_y`. Returns 0 if either input is zero.

```plaintext
lcm(expression_x, expression_y)
```

**Arguments**

* **expression\_x**: First numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_y**: Second numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `ln`

Returns the natural logarithm of a number.

```plaintext
ln(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `log`

Returns the logarithm of a number in the specified base. If the base is omitted, the base-10 logarithm of the number is returned.

```plaintext
log(base, numeric_expression)
log(numeric_expression)
```

**Arguments**

* **base**: Base numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

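**Example**

An illustrative sketch of both forms (expected output shown as comments):

```sql
SELECT log(2, 8);
-- 3
SELECT log(100);
-- 2
```
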
## `log10`

Returns the base-10 logarithm of a number.

```plaintext
log10(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `log2`

Returns the base-2 logarithm of a number.

```plaintext
log2(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `nanvl`

Returns the first argument if it's not *NaN*. Returns the second argument otherwise.

```plaintext
nanvl(expression_x, expression_y)
```

**Arguments**

* **expression\_x**: Numeric expression to return if it's not *NaN*. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_y**: Numeric expression to return if the first expression is *NaN*. Can be a constant, column, or function, and any combination of arithmetic operators.

## `pi`

Returns an approximate value of π.

```plaintext
pi()
```

## `power`

Returns a base expression raised to the power of an exponent.

```plaintext
power(base, exponent)
```

**Arguments**

* **base**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **exponent**: Exponent numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

**Aliases**

* pow

## `pow`

*Alias of [power](#power).*

## `radians`

Converts degrees to radians.

```plaintext
radians(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `random`

Returns a random float value in the range \[0, 1). The random seed is unique to each row.

```plaintext
random()
```

## `round`

Rounds a number to the nearest integer.

```plaintext
round(numeric_expression[, decimal_places])
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **decimal\_places**: Optional. The number of decimal places to round to. Defaults to 0.

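**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT round(3.14159, 2);
-- 3.14
```
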
## `signum`

Returns the sign of a number. Negative numbers return `-1`. Zero and positive numbers return `1`.

```plaintext
signum(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `sin`

Returns the sine of a number.

```plaintext
sin(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `sinh`

Returns the hyperbolic sine of a number.

```plaintext
sinh(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `sqrt`

Returns the square root of a number.

```plaintext
sqrt(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `tan`

Returns the tangent of a number.

```plaintext
tan(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `tanh`

Returns the hyperbolic tangent of a number.

```plaintext
tanh(numeric_expression)
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `trunc`

Truncates a number to a whole number, or to the specified number of decimal places.

```plaintext
trunc(numeric_expression[, decimal_places])
```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

* **decimal\_places**: Optional. The number of decimal places to truncate to. Defaults to 0 (truncate to a whole number). If `decimal_places` is a positive integer, truncates digits to the right of the decimal point. If `decimal_places` is a negative integer, replaces digits to the left of the decimal point with `0`.

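**Example**

An illustrative sketch of positive and negative `decimal_places` (expected output shown as comments):

```sql
SELECT trunc(3.14159, 2);
-- 3.14
SELECT trunc(12345, -3);
-- 12000
```
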
</page>

<page>
---
title: Other functions · Cloudflare Pipelines Docs
description: Miscellaneous scalar functions
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/other/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/other/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `arrow_cast`

Casts a value to a specific Arrow data type:

```plaintext
arrow_cast(expression, datatype)
```

**Arguments**

* **expression**: Expression to cast. Can be a constant, column, or function, and any combination of arithmetic or string operators.
* **datatype**: [Arrow data type](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) name to cast to, as a string. The format is the same as that returned by [`arrow_typeof`](#arrow_typeof).

**Example**

```plaintext
> select arrow_cast(-5, 'Int8') as a,
  arrow_cast('foo', 'Dictionary(Int32, Utf8)') as b,
  arrow_cast('bar', 'LargeUtf8') as c,
  arrow_cast('2023-01-02T12:53:02', 'Timestamp(Microsecond, Some("+08:00"))') as d
  ;
+----+-----+-----+---------------------------+
| a  | b   | c   | d                         |
+----+-----+-----+---------------------------+
| -5 | foo | bar | 2023-01-02T12:53:02+08:00 |
+----+-----+-----+---------------------------+
1 row in set. Query took 0.001 seconds.
```

## `arrow_typeof`

Returns the name of the underlying [Arrow data type](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) of the expression:

```plaintext
arrow_typeof(expression)
```

**Arguments**

* **expression**: Expression to evaluate. Can be a constant, column, or function, and any combination of arithmetic or string operators.

**Example**

```plaintext
> select arrow_typeof('foo'), arrow_typeof(1);
+---------------------------+------------------------+
| arrow_typeof(Utf8("foo")) | arrow_typeof(Int64(1)) |
+---------------------------+------------------------+
| Utf8                      | Int64                  |
+---------------------------+------------------------+
1 row in set. Query took 0.001 seconds.
```

</page>

<page>
---
title: Regex functions · Cloudflare Pipelines Docs
description: Scalar functions for regular expressions
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/regex/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/regex/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

Cloudflare Pipelines uses a [PCRE-like](https://en.wikibooks.org/wiki/Regular_Expressions/Perl-Compatible_Regular_Expressions) regular expression [syntax](https://docs.rs/regex/latest/regex/#syntax) (minus support for several features including look-around and backreferences).

## `regexp_like`

Returns true if a [regular expression](https://docs.rs/regex/latest/regex/#syntax) has at least one match in a string, false otherwise.

```plaintext
regexp_like(str, regexp[, flags])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

* **regexp**: Regular expression to test against the string expression. Can be a constant, column, or function.

* **flags**: Optional regular expression flags that control the behavior of the regular expression. The following flags are supported:

  * **i**: case-insensitive: letters match both upper and lower case
  * **m**: multi-line mode: ^ and $ match begin/end of line
  * **s**: allow . to match \n
  * **R**: enables CRLF mode: when multi-line mode is enabled, \r\n is used
  * **U**: swap the meaning of x\* and x\*?

**Example**

```sql
select regexp_like('Köln', '[a-zA-Z]ö[a-zA-Z]{2}');
+--------------------------------------------------------+
| regexp_like(Utf8("Köln"),Utf8("[a-zA-Z]ö[a-zA-Z]{2}")) |
+--------------------------------------------------------+
| true                                                   |
+--------------------------------------------------------+
SELECT regexp_like('aBc', '(b|d)', 'i');
+--------------------------------------------------+
| regexp_like(Utf8("aBc"),Utf8("(b|d)"),Utf8("i")) |
+--------------------------------------------------+
| true                                             |
+--------------------------------------------------+
```

Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/regexp.rs)

## `regexp_match`

Returns a list of [regular expression](https://docs.rs/regex/latest/regex/#syntax) matches in a string.

```plaintext
regexp_match(str, regexp[, flags])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

* **regexp**: Regular expression to match against. Can be a constant, column, or function.

* **flags**: Optional regular expression flags that control the behavior of the regular expression. The following flags are supported:

  * **i**: case-insensitive: letters match both upper and lower case
  * **m**: multi-line mode: ^ and $ match begin/end of line
  * **s**: allow . to match \n
  * **R**: enables CRLF mode: when multi-line mode is enabled, \r\n is used
  * **U**: swap the meaning of x\* and x\*?

**Example**

```sql
select regexp_match('Köln', '[a-zA-Z]ö[a-zA-Z]{2}');
+---------------------------------------------------------+
| regexp_match(Utf8("Köln"),Utf8("[a-zA-Z]ö[a-zA-Z]{2}")) |
+---------------------------------------------------------+
| [Köln]                                                  |
+---------------------------------------------------------+
SELECT regexp_match('aBc', '(b|d)', 'i');
+---------------------------------------------------+
| regexp_match(Utf8("aBc"),Utf8("(b|d)"),Utf8("i")) |
+---------------------------------------------------+
| [B]                                               |
+---------------------------------------------------+
```

Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/regexp.rs)

## `regexp_replace`

Replaces substrings in a string that match a [regular expression](https://docs.rs/regex/latest/regex/#syntax).

```plaintext
regexp_replace(str, regexp, replacement[, flags])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

* **regexp**: Regular expression to match against. Can be a constant, column, or function.

* **replacement**: Replacement string expression. Can be a constant, column, or function, and any combination of string operators.

* **flags**: Optional regular expression flags that control the behavior of the regular expression. The following flags are supported:

  * **g**: (global) Search globally and don't return after the first match
  * **i**: case-insensitive: letters match both upper and lower case
  * **m**: multi-line mode: ^ and $ match begin/end of line
  * **s**: allow . to match \n
  * **R**: enables CRLF mode: when multi-line mode is enabled, \r\n is used
  * **U**: swap the meaning of x\* and x\*?

**Example**

```sql
SELECT regexp_replace('foobarbaz', 'b(..)', 'X\\1Y', 'g');
+------------------------------------------------------------------------+
| regexp_replace(Utf8("foobarbaz"),Utf8("b(..)"),Utf8("X\1Y"),Utf8("g")) |
+------------------------------------------------------------------------+
| fooXarYXazY                                                            |
+------------------------------------------------------------------------+
SELECT regexp_replace('aBc', '(b|d)', 'Ab\\1a', 'i');
+-------------------------------------------------------------------+
| regexp_replace(Utf8("aBc"),Utf8("(b|d)"),Utf8("Ab\1a"),Utf8("i")) |
+-------------------------------------------------------------------+
| aAbBac                                                            |
+-------------------------------------------------------------------+
```

Additional examples can be found [here](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/regexp.rs)

## `position`

Returns the position of `substr` in `origstr` (counting from 1). If `substr` does not appear in `origstr`, returns 0.

```plaintext
position(substr in origstr)
```

**Arguments**

* **substr**: The substring to search for.
* **origstr**: The string to search in.

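**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT position('fus' in 'datafusion');
-- 5
```
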
</page>

<page>
---
title: String functions · Cloudflare Pipelines Docs
description: Scalar functions for manipulating strings
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/string/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/string/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `ascii`

Returns the ASCII value of the first character in a string.

```plaintext
ascii(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [chr](#chr)

## `bit_length`

Returns the bit length of a string.

```plaintext
bit_length(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [length](#length), [octet\_length](#octet_length)

## `btrim`

Trims the specified trim string from the start and end of a string. If no trim string is provided, all whitespace is removed from the start and end of the input string.

```plaintext
btrim(str[, trim_str])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **trim\_str**: String expression to trim from the beginning and end of the input string. Can be a constant, column, or function, and any combination of arithmetic operators. *Default is whitespace characters.*

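**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT btrim('__datafusion__', '_');
-- datafusion
```
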
**Related functions**: [ltrim](#ltrim), [rtrim](#rtrim)

**Aliases**

* trim

## `char_length`

*Alias of [length](#length).*

## `character_length`

*Alias of [length](#length).*

## `concat`

Concatenates multiple strings together.

```plaintext
concat(str[, ..., str_n])
```

**Arguments**

* **str**: String expression to concatenate. Can be a constant, column, or function, and any combination of string operators.
* **str\_n**: Subsequent string column or literal string to concatenate.

**Related functions**: [concat\_ws](#concat_ws)

## `concat_ws`

Concatenates multiple strings together with a specified separator.

```plaintext
concat_ws(separator, str[, ..., str_n])
```

**Arguments**

* **separator**: Separator to insert between concatenated strings.
* **str**: String expression to concatenate. Can be a constant, column, or function, and any combination of string operators.
* **str\_n**: Subsequent string column or literal string to concatenate.

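**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT concat_ws('-', '2024', '04', '13');
-- 2024-04-13
```
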
**Related functions**: [concat](#concat)

## `chr`

Returns the character with the specified ASCII or Unicode code value.

```plaintext
chr(expression)
```

**Arguments**

* **expression**: Expression containing the ASCII or Unicode code value to operate on. Can be a constant, column, or function, and any combination of arithmetic or string operators.

**Related functions**: [ascii](#ascii)

## `ends_with`

Tests if a string ends with a substring.

```plaintext
ends_with(str, substr)
```

**Arguments**

* **str**: String expression to test. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring to test for.

## `initcap`

Capitalizes the first character in each word in the input string. Words are delimited by non-alphanumeric characters.

```plaintext
initcap(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [lower](#lower), [upper](#upper)

## `instr`

*Alias of [strpos](#strpos).*

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring expression to search for. Can be a constant, column, or function, and any combination of string operators.

## `left`

Returns a specified number of characters from the left side of a string.

```plaintext
left(str, n)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: Number of characters to return.

**Related functions**: [right](#right)

## `length`

Returns the number of characters in a string.

```plaintext
length(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Aliases**

* char\_length
* character\_length

**Related functions**: [bit\_length](#bit_length), [octet\_length](#octet_length)

## `lower`

Converts a string to lower-case.

```plaintext
lower(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [initcap](#initcap), [upper](#upper)

## `lpad`

Pads the left side of a string with another string to a specified string length.

```plaintext
lpad(str, n[, padding_str])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: String length to pad to.
* **padding\_str**: String expression to pad with. Can be a constant, column, or function, and any combination of string operators. *Default is a space.*

**Related functions**: [rpad](#rpad)

## `ltrim`

Trims the specified trim string from the beginning of a string. If no trim string is provided, all whitespace is removed from the start of the input string.

```plaintext
ltrim(str[, trim_str])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **trim\_str**: String expression to trim from the beginning of the input string. Can be a constant, column, or function, and any combination of arithmetic operators. *Default is whitespace characters.*

**Related functions**: [btrim](#btrim), [rtrim](#rtrim)

## `octet_length`

Returns the length of a string in bytes.

```plaintext
octet_length(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [bit\_length](#bit_length), [length](#length)

## `repeat`

Returns a string with an input string repeated a specified number of times.

```plaintext
repeat(str, n)
```

**Arguments**

* **str**: String expression to repeat. Can be a constant, column, or function, and any combination of string operators.
* **n**: Number of times to repeat the input string.

## `replace`

Replaces all occurrences of a specified substring in a string with a new substring.

```plaintext
replace(str, substr, replacement)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring expression to replace in the input string. Can be a constant, column, or function, and any combination of string operators.
* **replacement**: Replacement substring expression. Can be a constant, column, or function, and any combination of string operators.

## `reverse`

Reverses the character order of a string.

```plaintext
reverse(str)
```

**Arguments**

* **str**: String expression to reverse. Can be a constant, column, or function, and any combination of string operators.

## `right`

Returns a specified number of characters from the right side of a string.

```plaintext
right(str, n)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: Number of characters to return.

**Related functions**: [left](#left)

## `rpad`

Pads the right side of a string with another string to a specified string length.

```plaintext
rpad(str, n[, padding_str])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: String length to pad to.
* **padding\_str**: String expression to pad with. Can be a constant, column, or function, and any combination of string operators. *Default is a space.*

**Related functions**: [lpad](#lpad)

## `rtrim`

Trims the specified trim string from the end of a string. If no trim string is provided, all whitespace is removed from the end of the input string.

```plaintext
rtrim(str[, trim_str])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **trim\_str**: String expression to trim from the end of the input string. Can be a constant, column, or function, and any combination of arithmetic operators. *Default is whitespace characters.*

**Related functions**: [btrim](#btrim), [ltrim](#ltrim)

## `split_part`

Splits a string based on a specified delimiter and returns the substring in the specified position.

```plaintext
split_part(str, delimiter, pos)
```

**Arguments**

* **str**: String expression to split. Can be a constant, column, or function, and any combination of string operators.
* **delimiter**: String or character to split on.
* **pos**: Position of the part to return.

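**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT split_part('a,b,c', ',', 2);
-- b
```
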
## `starts_with`

Tests if a string starts with a substring.

```plaintext
starts_with(str, substr)
```

**Arguments**

* **str**: String expression to test. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring to test for.

## `strpos`

Returns the starting position of a specified substring in a string. Positions begin at 1. If the substring does not exist in the string, the function returns 0.

```plaintext
strpos(str, substr)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring expression to search for. Can be a constant, column, or function, and any combination of string operators.

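**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT strpos('datafusion', 'fus');
-- 5
```
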
**Aliases**

* instr

## `substr`

Extracts a substring of a specified number of characters from a specific starting position in a string.

```plaintext
substr(str, start_pos[, length])
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **start\_pos**: Character position to start the substring at. The first character in the string has a position of 1.
* **length**: Number of characters to extract. If not specified, returns the rest of the string after the start position.

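**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT substr('datafusion', 5, 3);
-- fus
```
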
## `translate`

Translates characters in a string to specified translation characters.

```plaintext
translate(str, chars, translation)
```

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **chars**: Characters to translate.
* **translation**: Translation characters. Translation characters replace only characters at the same position in the **chars** string.

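**Example**

An illustrative sketch following PostgreSQL semantics, where characters in **chars** with no counterpart in **translation** are removed:

```sql
SELECT translate('12345', '143', 'ax');
-- a2x5
```
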
## `to_hex`

Converts an integer to a hexadecimal string.

```plaintext
to_hex(int)
```

**Arguments**

* **int**: Integer expression to convert. Can be a constant, column, or function, and any combination of arithmetic operators.

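**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT to_hex(255);
-- ff
```
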
## `trim`

*Alias of [btrim](#btrim).*

## `upper`

Converts a string to upper-case.

```plaintext
upper(str)
```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [initcap](#initcap), [lower](#lower)

## `uuid`

Returns a UUID v4 string value that is unique per row.

```plaintext
uuid()
```

## `overlay`

Replaces a substring of a string with another string, starting at the specified position and spanning the specified count of characters. For example, `overlay('Txxxxas' placing 'hom' from 2 for 4) → Thomas`

```plaintext
overlay(str PLACING substr FROM pos [FOR count])
```

**Arguments**

* **str**: String expression to operate on.
* **substr**: The string that replaces part of str.
* **pos**: The start position of the replacement within str.
* **count**: The number of characters to replace, counting from the start position. If not specified, the length of substr is used instead.

## `levenshtein`

Returns the Levenshtein distance between the two given strings. For example, `levenshtein('kitten', 'sitting') = 3`

```plaintext
levenshtein(str1, str2)
```

**Arguments**

* **str1**: String expression to compute Levenshtein distance with str2.
* **str2**: String expression to compute Levenshtein distance with str1.

## `substr_index`

Returns the substring from str before count occurrences of the delimiter delim. If count is positive, everything to the left of the final delimiter (counting from the left) is returned. If count is negative, everything to the right of the final delimiter (counting from the right) is returned. For example, `substr_index('www.apache.org', '.', 1) = www`, `substr_index('www.apache.org', '.', -1) = org`

```plaintext
substr_index(str, delim, count)
```

**Arguments**

* **str**: String expression to operate on.
* **delim**: the string to find in str to split str.
* **count**: The number of times to search for the delimiter. Can be both a positive or negative number.

## `find_in_set`

Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings. For example, `find_in_set('b', 'a,b,c,d') = 2`

```plaintext
find_in_set(str, strlist)
```

**Arguments**

* **str**: String expression to find in strlist.
* **strlist**: A string list: a string composed of substrings separated by `,` characters.

</page>

<page>
---
title: Struct functions · Cloudflare Pipelines Docs
description: Scalar functions for manipulating structs
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/struct/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/struct/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `struct`

Returns an Arrow struct using the specified input expressions. Fields in the returned struct use the `cN` naming convention. For example: `c0`, `c1`, `c2`, etc.

```plaintext
struct(expression1[, ..., expression_n])
```

For example, this query converts two columns `a` and `b` to a single column with a struct type of fields `c0` and `c1`:

```plaintext
select * from t;
+---+---+
| a | b |
+---+---+
| 1 | 2 |
| 3 | 4 |
+---+---+


select struct(a, b) from t;
+-----------------+
| struct(t.a,t.b) |
+-----------------+
| {c0: 1, c1: 2}  |
| {c0: 3, c1: 4}  |
+-----------------+
```

#### Arguments

* **expression\_n**: Expression to include in the output struct. Can be a constant, column, or function, and any combination of arithmetic or string operators.

</page>

<page>
---
title: Time and date functions · Cloudflare Pipelines Docs
description: Scalar functions for handling times and dates
lastUpdated: 2025-09-25T04:07:16.000Z
chatbotDeprioritize: false
source_url:
  html: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/time-and-date/
  md: https://developers.cloudflare.com/pipelines/sql-reference/scalar-functions/time-and-date/index.md
---

*Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion](https://arrow.apache.org/datafusion/) (via [Arroyo](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference.*

## `date_bin`

Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.

For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.

```plaintext
date_bin(interval, expression, origin-timestamp)
```

**Arguments**

* **interval**: Bin interval.
* **expression**: Time expression to operate on. Can be a constant, column, or function.
* **origin-timestamp**: Optional. Starting point used to determine bin boundaries. If not specified, defaults to `1970-01-01T00:00:00Z` (the UNIX epoch in UTC).

The following intervals are supported:

* nanoseconds
* microseconds
* milliseconds
* seconds
* minutes
* hours
* days
* weeks
* months
* years
* century

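**Example**

A sketch of the 15 minute bin described above (expected output shown as a comment):

```sql
SELECT date_bin(INTERVAL '15 minutes', TIMESTAMP '2023-01-01 18:18:18', TIMESTAMP '1970-01-01 00:00:00');
-- 2023-01-01T18:15:00
```
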
## `date_trunc`

Truncates a timestamp value to a specified precision.

```plaintext
date_trunc(precision, expression)
```

**Arguments**

* **precision**: Time precision to truncate to. The following precisions are supported:

  * year / YEAR
  * quarter / QUARTER
  * month / MONTH
  * week / WEEK
  * day / DAY
  * hour / HOUR
  * minute / MINUTE
  * second / SECOND

* **expression**: Time expression to operate on. Can be a constant, column, or function.

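**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT date_trunc('hour', TIMESTAMP '2023-01-01 18:18:18');
-- 2023-01-01T18:00:00
```
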
**Aliases**

* datetrunc

## `datetrunc`

*Alias of [date\_trunc](#date_trunc).*

## `date_part`

Returns the specified part of the date as an integer.

```plaintext
date_part(part, expression)
```

**Arguments**

* **part**: Part of the date to return. The following date parts are supported:

  * year
  * quarter *(emits value in inclusive range \[1, 4] based on which quartile of the year the date is in)*
  * month
  * week *(week of the year)*
  * day *(day of the month)*
  * hour
  * minute
  * second
  * millisecond
  * microsecond
  * nanosecond
  * dow *(day of the week)*
  * doy *(day of the year)*
  * epoch *(seconds since Unix epoch)*

* **expression**: Time expression to operate on. Can be a constant, column, or function.

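**Example**

An illustrative sketch (expected output shown as a comment):

```sql
SELECT date_part('hour', TIMESTAMP '2023-01-01 18:18:18');
-- 18
```
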
**Aliases**

* datepart

## `datepart`

*Alias of [date\_part](#date_part).*

## `extract`

Returns a sub-field from a time value as an integer.

```plaintext
extract(field FROM source)
```

Equivalent to calling `date_part('field', source)`. For example, these are equivalent:

```sql
extract(day FROM '2024-04-13'::date)
date_part('day', '2024-04-13'::date)
```

See [date\_part](#date_part).

## `make_date`

Make a date from year/month/day component parts.

```plaintext
make_date(year, month, day)
```

**Arguments**

* **year**: Year to use when making the date. Can be a constant, column or function, and any combination of arithmetic operators.
* **month**: Month to use when making the date. Can be a constant, column or function, and any combination of arithmetic operators.
* **day**: Day to use when making the date. Can be a constant, column or function, and any combination of arithmetic operators.

**Example**

```plaintext
> select make_date(2023, 1, 31);
+-------------------------------------------+
| make_date(Int64(2023),Int64(1),Int64(31)) |
+-------------------------------------------+
| 2023-01-31                                |
+-------------------------------------------+
> select make_date('2023', '01', '31');
+-----------------------------------------------+
| make_date(Utf8("2023"),Utf8("01"),Utf8("31")) |
+-----------------------------------------------+
| 2023-01-31                                    |
+-----------------------------------------------+
```

## `to_char`

Returns a string representation of a date, time, timestamp or duration based on a [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html). Unlike the PostgreSQL equivalent of this function, numerical formatting is not supported.

```plaintext
to_char(expression, format)
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function that results in a date, time, timestamp or duration.
* **format**: A [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) string to use to convert the expression.

**Example**

```plaintext
> select to_char('2023-03-01'::date, '%d-%m-%Y');
+----------------------------------------------+
| to_char(Utf8("2023-03-01"),Utf8("%d-%m-%Y")) |
+----------------------------------------------+
| 01-03-2023                                   |
+----------------------------------------------+
```

**Aliases**

* date\_format

## `to_timestamp`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00Z`). Supports strings, integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings are provided. Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

Note: `to_timestamp` returns `Timestamp(Nanosecond)`. The supported range for integer input is between `-9223372037` and `9223372036`. Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` for the input outside of supported bounds.

```plaintext
to_timestamp(expression[, ..., format_n])
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```plaintext
> select to_timestamp('2023-01-31T09:26:56.123456789-05:00');
+-----------------------------------------------------------+
| to_timestamp(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
+-----------------------------------------------------------+
| 2023-01-31T14:26:56.123456789                             |
+-----------------------------------------------------------+
> select to_timestamp('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
+--------------------------------------------------------------------------------------------------------+
| to_timestamp(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
+--------------------------------------------------------------------------------------------------------+
| 2023-05-17T03:59:00.123456789                                                                          |
+--------------------------------------------------------------------------------------------------------+
```
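
These parsing functions are most useful inside a pipeline's transformation query. The following sketch is illustrative only: it assumes a stream named `events` with string columns `user_id` and `event_time`, and a sink named `events_sink`, none of which are defined in this reference.

```sql
-- Hypothetical pipeline query: `events`, `event_time`, `user_id`, and
-- `events_sink` are illustrative names only.
INSERT INTO events_sink
SELECT
  user_id,
  -- Try RFC3339 first ('%+'), then fall back to a custom format.
  to_timestamp(event_time, '%+', '%m-%d-%Y %H:%M:%S') AS event_ts
FROM events;
```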

## `to_timestamp_millis`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports string, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided. Integers and unsigned integers are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```plaintext
to_timestamp_millis(expression[, ..., format_n])
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```plaintext
> select to_timestamp_millis('2023-01-31T09:26:56.123456789-05:00');
+------------------------------------------------------------------+
| to_timestamp_millis(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
+------------------------------------------------------------------+
| 2023-01-31T14:26:56.123                                          |
+------------------------------------------------------------------+
> select to_timestamp_millis('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
+---------------------------------------------------------------------------------------------------------------+
| to_timestamp_millis(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
+---------------------------------------------------------------------------------------------------------------+
| 2023-05-17T03:59:00.123                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
```

## `to_timestamp_micros`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000Z`). Supports string, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided. Integers and unsigned integers are interpreted as microseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```plaintext
to_timestamp_micros(expression[, ..., format_n])
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```plaintext
> select to_timestamp_micros('2023-01-31T09:26:56.123456789-05:00');
+------------------------------------------------------------------+
| to_timestamp_micros(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
+------------------------------------------------------------------+
| 2023-01-31T14:26:56.123456                                       |
+------------------------------------------------------------------+
> select to_timestamp_micros('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
+---------------------------------------------------------------------------------------------------------------+
| to_timestamp_micros(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
+---------------------------------------------------------------------------------------------------------------+
| 2023-05-17T03:59:00.123456                                                                                    |
+---------------------------------------------------------------------------------------------------------------+
```

## `to_timestamp_nanos`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000000Z`). Supports string, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided. Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```plaintext
to_timestamp_nanos(expression[, ..., format_n])
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```plaintext
> select to_timestamp_nanos('2023-01-31T09:26:56.123456789-05:00');
+-----------------------------------------------------------------+
| to_timestamp_nanos(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
+-----------------------------------------------------------------+
| 2023-01-31T14:26:56.123456789                                   |
+-----------------------------------------------------------------+
> select to_timestamp_nanos('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
+--------------------------------------------------------------------------------------------------------------+
| to_timestamp_nanos(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
+--------------------------------------------------------------------------------------------------------------+
| 2023-05-17T03:59:00.123456789                                                                                |
+--------------------------------------------------------------------------------------------------------------+
```

## `to_timestamp_seconds`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports string, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) are provided. Integers and unsigned integers are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```plaintext
to_timestamp_seconds(expression[, ..., format_n])
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```plaintext
> select to_timestamp_seconds('2023-01-31T09:26:56.123456789-05:00');
+-------------------------------------------------------------------+
| to_timestamp_seconds(Utf8("2023-01-31T09:26:56.123456789-05:00")) |
+-------------------------------------------------------------------+
| 2023-01-31T14:26:56                                               |
+-------------------------------------------------------------------+
> select to_timestamp_seconds('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
+----------------------------------------------------------------------------------------------------------------+
| to_timestamp_seconds(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |
+----------------------------------------------------------------------------------------------------------------+
| 2023-05-17T03:59:00                                                                                            |
+----------------------------------------------------------------------------------------------------------------+
```

## `from_unixtime`

Converts an integer to RFC3339 timestamp format (`YYYY-MM-DDT00:00:00.000000000Z`). Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```plaintext
from_unixtime(expression)
```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
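
**Example**

An illustrative call, following the nanoseconds-since-epoch interpretation described above (the exact output rendering may differ):

```plaintext
> select from_unixtime(1704067200000000000);
+-------------------------------------------+
| from_unixtime(Int64(1704067200000000000)) |
+-------------------------------------------+
| 2024-01-01T00:00:00                       |
+-------------------------------------------+
```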

## `now`

Returns the UTC timestamp at pipeline start.

The `now()` return value is determined at query compilation time and remains constant across the execution of the pipeline.
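
As a sketch, assuming a hypothetical stream named `events`: every row processed by the query below receives the same `processed_at` value, fixed when the pipeline started.

```sql
-- Illustrative only: `events` and `processed_at` are hypothetical names.
-- now() is evaluated once at pipeline start, so every row gets the same value.
SELECT *, now() AS processed_at
FROM events;
```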

</page>

