Getting started
This guide will instruct you through:
- Creating an API token needed for pipelines to authenticate with your data catalog.
- Creating your first pipeline with a simple ecommerce schema that writes to an Apache Iceberg ↗ table managed by R2 Data Catalog.
- Sending sample ecommerce data via HTTP endpoint.
- Validating data in your bucket and querying it with R2 SQL.
- Sign up for a Cloudflare account ↗.
- Install
Node.js↗.
Node.js version manager
Use a Node version manager like Volta ↗ or nvm ↗ to avoid permission issues and change Node.js versions. Wrangler, discussed later in this guide, requires a Node version of 16.17.0 or later.
Pipelines must authenticate to R2 Data Catalog with an R2 API token that has catalog and R2 permissions.
-
In the Cloudflare dashboard, go to the R2 object storage page.
Go to Overview -
Select Manage API tokens.
-
Select Create Account API token.
-
Give your API token a name.
-
Under Permissions, select the Admin Read & Write permission.
-
Select Create Account API Token.
-
Note the Token value.
First, create a schema file that defines your ecommerce data structure:
Create schema.json:
{ "fields": [ { "name": "user_id", "type": "string", "required": true }, { "name": "event_type", "type": "string", "required": true }, { "name": "product_id", "type": "string", "required": false }, { "name": "amount", "type": "float64", "required": false } ]}Use the interactive setup to create a pipeline that writes to R2 Data Catalog:
npx wrangler pipelines setupFollow the prompts:
-
Pipeline name: Enter
ecommerce -
Stream configuration:
- Enable HTTP endpoint:
yes - Require authentication:
no(for simplicity) - Configure custom CORS origins:
no - Schema definition:
Load from file - Schema file path:
schema.json(or your file path)
- Enable HTTP endpoint:
-
Sink configuration:
- Destination type:
Data Catalog (Iceberg) - Setup mode:
Simple (recommended defaults) - R2 bucket name:
pipelines-tutorial(created automatically if it does not exist) - Table name:
ecommerce - Catalog API token: Enter your token from step 1
- Destination type:
-
Review: Confirm the summary and select
Create resources -
SQL transformation: Choose
Simple ingestion (SELECT * FROM stream)
Advanced mode options
If you select Advanced instead of Simple during sink configuration, you can customize the following additional options:
- Format: Output file format (for example, Parquet)
- Compression: Compression algorithm (for example, zstd)
- Rolling policy: File size threshold (minimum 5 MB) and time interval (minimum 10 seconds) for creating new files
- Credentials: Choose between automatic credential generation or manually entering R2 credentials
- Namespace: Data Catalog namespace (defaults to
default)
After setup completes, the command outputs a configuration snippet for your Wrangler file, a Worker binding example with sample data, and a curl command for the HTTP endpoint. Note the HTTP endpoint URL and the pipelines configuration for use in the following steps.
You can also pre-set the pipeline name using the --name flag:
npx wrangler pipelines setup --name ecommerce-
In the Cloudflare dashboard, go to R2 object storage.
Go to Overview -
Select Create bucket and enter the bucket name:
pipelines-tutorial. -
Select Create bucket.
-
Select the bucket, switch to the Settings tab, scroll down to R2 Data Catalog, and select Enable.
-
Once enabled, note the Catalog URI and Warehouse name.
-
Go to Pipelines > Pipelines.
Go to Pipelines -
Select Create Pipeline.
-
Connect to a Stream:
- Pipeline name:
ecommerce - Enable HTTP endpoint for sending data: Enabled
- HTTP authentication: Disabled (default)
- Select Next
- Pipeline name:
-
Define Input Schema:
- Select JSON editor
- Copy in the schema:
{"fields": [{"name": "user_id","type": "string","required": true},{"name": "event_type","type": "string","required": true},{"name": "product_id","type": "string","required": false},{"name": "amount","type": "float64","required": false}]}
- Select Next
-
Define Sink:
- Select your R2 bucket:
pipelines-tutorial - Storage type: R2 Data Catalog
- Namespace:
default - Table name:
ecommerce - Advanced Settings: Change Maximum Time Interval to
10 seconds - Select Next
- Select your R2 bucket:
-
Credentials:
- Disable Automatically create an Account API token for your sink
- Enter Catalog Token from step 1
- Select Next
-
Pipeline Definition:
- Leave the default SQL query:
INSERT INTO ecommerce_sink SELECT * FROM ecommerce_stream;
- Select Create Pipeline
- Leave the default SQL query:
-
After pipeline creation, note the Stream ID for the next step.
Send ecommerce events to your pipeline's HTTP endpoint:
curl -X POST https://{stream-id}.ingest.cloudflare.com \ -H "Content-Type: application/json" \ -d '[ { "user_id": "user_12345", "event_type": "purchase", "product_id": "widget-001", "amount": 29.99 }, { "user_id": "user_67890", "event_type": "view_product", "product_id": "widget-002" }, { "user_id": "user_12345", "event_type": "add_to_cart", "product_id": "widget-003", "amount": 15.50 } ]'Replace {stream-id} with your actual stream endpoint from the pipeline setup.
-
In the Cloudflare dashboard, go to the R2 object storage page.
-
Select your bucket:
pipelines-tutorial. -
You should see Iceberg metadata files and data files created by your pipeline. If you are not seeing any files in your bucket, wait a couple of minutes and try again.
-
The data is organized in the Apache Iceberg format with metadata tracking table versions.
Set up your environment to use R2 SQL:
export WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_API_TOKENOr create a .env file with:
WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_API_TOKENWhere YOUR_API_TOKEN is the token you created in step 1. For more information on setting environment variables, refer to Wrangler system environment variables.
Query your data:
npx wrangler r2 sql query "YOUR_WAREHOUSE_NAME" "SELECT user_id, event_type, product_id, amountFROM default.ecommerceWHERE event_type = 'purchase'LIMIT 10"Replace YOUR_WAREHOUSE_NAME with the warehouse name noted during pipeline setup. You can find it in the Cloudflare dashboard under R2 object storage > your bucket > Settings > R2 Data Catalog.
You can also query this table with any engine that supports Apache Iceberg. To learn more about connecting other engines to R2 Data Catalog, refer to Connect to Iceberg engines.