Pipelines

[DEPRECATED] List Pipelines
Deprecated
client.pipelines.list(PipelineListParams { account_id, page, per_page, search } params, RequestOptions options?): PipelineListResponse { result_info, results, success }
GET/accounts/{account_id}/pipelines
[DEPRECATED] Get Pipeline
Deprecated
client.pipelines.get(string pipelineName, PipelineGetParams { account_id } params, RequestOptions options?): PipelineGetResponse { id, destination, endpoint, 3 more }
GET/accounts/{account_id}/pipelines/{pipeline_name}
[DEPRECATED] Create Pipeline
Deprecated
client.pipelines.create(PipelineCreateParams { account_id, destination, name, source } params, RequestOptions options?): PipelineCreateResponse { id, destination, endpoint, 3 more }
POST/accounts/{account_id}/pipelines
[DEPRECATED] Update Pipeline
Deprecated
client.pipelines.update(string pipelineName, PipelineUpdateParams { account_id, destination, name, source } params, RequestOptions options?): PipelineUpdateResponse { id, destination, endpoint, 3 more }
PUT/accounts/{account_id}/pipelines/{pipeline_name}
[DEPRECATED] Delete Pipeline
Deprecated
client.pipelines.delete(string pipelineName, PipelineDeleteParams { account_id } params, RequestOptions options?): void
DELETE/accounts/{account_id}/pipelines/{pipeline_name}
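
A minimal sketch of reading these legacy resources with the cloudflare TypeScript SDK; the account ID and pipeline name are placeholders, and the fields used follow the deprecated models documented below:

```ts
import Cloudflare from 'cloudflare';

const client = new Cloudflare({ apiToken: process.env.CLOUDFLARE_API_TOKEN });

// [DEPRECATED] List legacy pipelines, then fetch one by name.
const page = await client.pipelines.list({ account_id: 'ACCOUNT_ID', per_page: 20 });
if (page.success) {
  for (const p of page.results) {
    console.log(p.name, p.endpoint, `v${p.version}`);
  }
}

const pipeline = await client.pipelines.get('my-pipeline', { account_id: 'ACCOUNT_ID' });
console.log(pipeline.destination.path.bucket);
```

Prefer the v1 endpoints below for new work; these routes remain only for existing configurations.
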
List Pipelines
client.pipelines.listV1(PipelineListV1Params { account_id, page, per_page } params, RequestOptions options?): V4PagePaginationArray<PipelineListV1Response { id, created_at, modified_at, 3 more }>
GET/accounts/{account_id}/pipelines/v1/pipelines
Get Pipeline Details
client.pipelines.getV1(string pipelineId, PipelineGetV1Params { account_id } params, RequestOptions options?): PipelineGetV1Response { id, created_at, modified_at, 5 more }
GET/accounts/{account_id}/pipelines/v1/pipelines/{pipeline_id}
Create Pipeline
client.pipelines.createV1(PipelineCreateV1Params { account_id, name, sql } params, RequestOptions options?): PipelineCreateV1Response { id, created_at, modified_at, 3 more }
POST/accounts/{account_id}/pipelines/v1/pipelines
Delete Pipeline
client.pipelines.deleteV1(string pipelineId, PipelineDeleteV1Params { account_id } params, RequestOptions options?): void
DELETE/accounts/{account_id}/pipelines/v1/pipelines/{pipeline_id}
Validate SQL
client.pipelines.validateSql(PipelineValidateSqlParams { account_id, sql } params, RequestOptions options?): PipelineValidateSqlResponse { tables, graph }
POST/accounts/{account_id}/pipelines/v1/validate_sql
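
Together these support a validate-then-create flow. A sketch, assuming a stream named my_stream and a sink named my_sink already exist (see the Streams and Sinks sections below):

```ts
import Cloudflare from 'cloudflare';

const client = new Cloudflare({ apiToken: process.env.CLOUDFLARE_API_TOKEN });
const account_id = 'ACCOUNT_ID';

// Validate the SQL first; the response lists the streams and sinks it references.
const sql = 'INSERT INTO my_sink SELECT * FROM my_stream;';
const validation = await client.pipelines.validateSql({ account_id, sql });
console.log(Object.keys(validation.tables));

// Create the pipeline, then read back its status.
const created = await client.pipelines.createV1({ account_id, name: 'my-pipeline', sql });
const details = await client.pipelines.getV1(created.id, { account_id });
console.log(details.status, details.failure_reason ?? 'no failure');

// listV1 returns a V4PagePaginationArray, so the SDK can iterate pages for you.
for await (const p of client.pipelines.listV1({ account_id })) {
  console.log(p.name, p.status);
}
```
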
Models
PipelineListResponse { result_info, results, success }
result_info: ResultInfo { count, page, per_page, total_count }
count: number

Indicates the number of items on the current page.

page: number

Indicates the current page number.

per_page: number

Indicates the number of items per page.

total_count: number

Indicates the total number of items.

results: Array<Result>
id: string

Specifies the pipeline identifier.

destination: Destination { batch, compression, format, 2 more }
batch: Batch { max_bytes, max_duration_s, max_rows }
max_bytes: number

Specifies the rough maximum size of files.

maximum: 100000000
minimum: 1000
max_duration_s: number

Specifies the duration to wait while aggregating batched files.

maximum: 300
minimum: 0.25
max_rows: number

Specifies the rough maximum number of rows per file.

maximum: 10000000
minimum: 100
compression: Compression { type }
type: "none" | "gzip" | "deflate"

Specifies the desired compression algorithm and format.

One of the following:
"none"
"gzip"
"deflate"
format: "json"

Specifies the format of data to deliver.

path: Path { bucket, filename, filepath, prefix }
bucket: string

Specifies the R2 bucket to store files in.

filename?: string

Specifies the name pattern for individual data files.

filepath?: string

Specifies the name pattern for the directory.

prefix?: string

Specifies the base directory within the bucket.

type: "r2"

Specifies the type of destination.

endpoint: string

Indicates the endpoint URL to send traffic to.

name: string

Defines the name of the pipeline.

maxLength: 128
minLength: 1
source: Array<CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors } | CloudflarePipelinesWorkersPipelinesBindingSource { format, type } >
One of the following:
CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors }

[DEPRECATED] HTTP source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
authentication?: boolean

Specifies whether authentication is required to send to this pipeline via HTTP.

cors?: CORS { origins }
origins?: Array<string>

Specifies the origins allowed to make cross-origin HTTP requests.

CloudflarePipelinesWorkersPipelinesBindingSource { format, type }

[DEPRECATED] Worker binding source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
version: number

Indicates the version number of the last saved configuration.

success: boolean

Indicates whether the API call was successful.

PipelineGetResponse { id, destination, endpoint, 3 more }

[DEPRECATED] Describes the configuration of a pipeline. Use the new streams/sinks/pipelines API instead.

id: string

Specifies the pipeline identifier.

destination: Destination { batch, compression, format, 2 more }
batch: Batch { max_bytes, max_duration_s, max_rows }
max_bytes: number

Specifies the rough maximum size of files.

maximum: 100000000
minimum: 1000
max_duration_s: number

Specifies the duration to wait while aggregating batched files.

maximum: 300
minimum: 0.25
max_rows: number

Specifies the rough maximum number of rows per file.

maximum: 10000000
minimum: 100
compression: Compression { type }
type: "none" | "gzip" | "deflate"

Specifies the desired compression algorithm and format.

One of the following:
"none"
"gzip"
"deflate"
format: "json"

Specifies the format of data to deliver.

path: Path { bucket, filename, filepath, prefix }
bucket: string

Specifies the R2 bucket to store files in.

filename?: string

Specifies the name pattern for individual data files.

filepath?: string

Specifies the name pattern for the directory.

prefix?: string

Specifies the base directory within the bucket.

type: "r2"

Specifies the type of destination.

endpoint: string

Indicates the endpoint URL to send traffic to.

name: string

Defines the name of the pipeline.

maxLength: 128
minLength: 1
source: Array<CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors } | CloudflarePipelinesWorkersPipelinesBindingSource { format, type } >
One of the following:
CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors }

[DEPRECATED] HTTP source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
authentication?: boolean

Specifies whether authentication is required to send to this pipeline via HTTP.

cors?: CORS { origins }
origins?: Array<string>

Specifies the origins allowed to make cross-origin HTTP requests.

CloudflarePipelinesWorkersPipelinesBindingSource { format, type }

[DEPRECATED] Worker binding source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
version: number

Indicates the version number of the last saved configuration.

PipelineCreateResponse { id, destination, endpoint, 3 more }

[DEPRECATED] Describes the configuration of a pipeline. Use the new streams/sinks/pipelines API instead.

id: string

Specifies the pipeline identifier.

destination: Destination { batch, compression, format, 2 more }
batch: Batch { max_bytes, max_duration_s, max_rows }
max_bytes: number

Specifies the rough maximum size of files.

maximum: 100000000
minimum: 1000
max_duration_s: number

Specifies the duration to wait while aggregating batched files.

maximum: 300
minimum: 0.25
max_rows: number

Specifies the rough maximum number of rows per file.

maximum: 10000000
minimum: 100
compression: Compression { type }
type: "none" | "gzip" | "deflate"

Specifies the desired compression algorithm and format.

One of the following:
"none"
"gzip"
"deflate"
format: "json"

Specifies the format of data to deliver.

path: Path { bucket, filename, filepath, prefix }
bucket: string

Specifies the R2 bucket to store files in.

filename?: string

Specifies the name pattern for individual data files.

filepath?: string

Specifies the name pattern for the directory.

prefix?: string

Specifies the base directory within the bucket.

type: "r2"

Specifies the type of destination.

endpoint: string

Indicates the endpoint URL to send traffic to.

name: string

Defines the name of the pipeline.

maxLength: 128
minLength: 1
source: Array<CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors } | CloudflarePipelinesWorkersPipelinesBindingSource { format, type } >
One of the following:
CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors }

[DEPRECATED] HTTP source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
authentication?: boolean

Specifies whether authentication is required to send to this pipeline via HTTP.

cors?: CORS { origins }
origins?: Array<string>

Specifies the origins allowed to make cross-origin HTTP requests.

CloudflarePipelinesWorkersPipelinesBindingSource { format, type }

[DEPRECATED] Worker binding source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
version: number

Indicates the version number of the last saved configuration.

PipelineUpdateResponse { id, destination, endpoint, 3 more }

[DEPRECATED] Describes the configuration of a pipeline. Use the new streams/sinks/pipelines API instead.

id: string

Specifies the pipeline identifier.

destination: Destination { batch, compression, format, 2 more }
batch: Batch { max_bytes, max_duration_s, max_rows }
max_bytes: number

Specifies the rough maximum size of files.

maximum: 100000000
minimum: 1000
max_duration_s: number

Specifies the duration to wait while aggregating batched files.

maximum: 300
minimum: 0.25
max_rows: number

Specifies the rough maximum number of rows per file.

maximum: 10000000
minimum: 100
compression: Compression { type }
type: "none" | "gzip" | "deflate"

Specifies the desired compression algorithm and format.

One of the following:
"none"
"gzip"
"deflate"
format: "json"

Specifies the format of data to deliver.

path: Path { bucket, filename, filepath, prefix }
bucket: string

Specifies the R2 bucket to store files in.

filename?: string

Specifies the name pattern for individual data files.

filepath?: string

Specifies the name pattern for the directory.

prefix?: string

Specifies the base directory within the bucket.

type: "r2"

Specifies the type of destination.

endpoint: string

Indicates the endpoint URL to send traffic to.

name: string

Defines the name of the pipeline.

maxLength: 128
minLength: 1
source: Array<CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors } | CloudflarePipelinesWorkersPipelinesBindingSource { format, type } >
One of the following:
CloudflarePipelinesWorkersPipelinesHTTPSource { format, type, authentication, cors }

[DEPRECATED] HTTP source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
authentication?: boolean

Specifies whether authentication is required to send to this pipeline via HTTP.

cors?: CORS { origins }
origins?: Array<string>

Specifies the origins allowed to make cross-origin HTTP requests.

CloudflarePipelinesWorkersPipelinesBindingSource { format, type }

[DEPRECATED] Worker binding source configuration. Use the new streams API instead.

format: "json"

Specifies the format of source data.

type: string
version: number

Indicates the version number of the last saved configuration.

PipelineListV1Response { id, created_at, modified_at, 3 more }
id: string

Indicates a unique identifier for this pipeline.

created_at: string
modified_at: string
name: string

Indicates the name of the Pipeline.

maxLength: 128
minLength: 1
sql: string

Specifies SQL for the Pipeline processing flow.

status: string

Indicates the current status of the Pipeline.

PipelineGetV1Response { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this pipeline.

created_at: string
modified_at: string
name: string

Indicates the name of the Pipeline.

maxLength: 128
minLength: 1
sql: string

Specifies SQL for the Pipeline processing flow.

status: string

Indicates the current status of the Pipeline.

tables: Array<Table>

List of streams and sinks used by this pipeline.

id: string

Unique identifier for the connection (stream or sink).

latest: number

Latest available version of the connection.

name: string

Name of the connection.

maxLength: 128
minLength: 1
type: "stream" | "sink"

Type of the connection.

One of the following:
"stream"
"sink"
version: number

Current version of the connection used by this pipeline.

failure_reason?: string

Indicates the reason for the failure of the Pipeline.

PipelineCreateV1Response { id, created_at, modified_at, 3 more }
id: string

Indicates a unique identifier for this pipeline.

created_at: string
modified_at: string
name: string

Indicates the name of the Pipeline.

maxLength: 128
minLength: 1
sql: string

Specifies SQL for the Pipeline processing flow.

status: string

Indicates the current status of the Pipeline.

PipelineValidateSqlResponse { tables, graph }
tables: Record<string, Tables>

Indicates the tables involved in the processing.

id: string
name: string
type: string
version: number
graph?: Graph { edges, nodes }
edges: Array<Edge>
dest_id: number
format: int32
minimum: 0
edge_type: string
key_type: string
src_id: number
format: int32
minimum: 0
value_type: string
nodes: Array<Node>
description: string
node_id: number
format: int32
minimum: 0
operator: string
parallelism: number
format: int32
minimum: 0
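
The tables map and optional graph returned by validateSql describe the query's inputs and outputs and its operator plan. A sketch that prints both, reusing the client from the examples above and touching only fields from this schema:

```ts
const { tables, graph } = await client.pipelines.validateSql({
  account_id: 'ACCOUNT_ID',
  sql: 'INSERT INTO my_sink SELECT * FROM my_stream;',
});

// Each entry is a stream or sink the query touches.
for (const t of Object.values(tables)) {
  console.log(`${t.type} ${t.name} (v${t.version})`);
}

// The graph is a DAG of operators connected by typed edges.
if (graph) {
  for (const n of graph.nodes) {
    console.log(`node ${n.node_id}: ${n.operator} (parallelism ${n.parallelism})`);
  }
  for (const e of graph.edges) {
    console.log(`edge ${e.src_id} -> ${e.dest_id}: ${e.edge_type}`);
  }
}
```
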

Sinks

List Sinks
client.pipelines.sinks.list(SinkListParams { account_id, page, per_page, pipeline_id } params, RequestOptions options?): V4PagePaginationArray<SinkListResponse { id, created_at, modified_at, 5 more }>
GET/accounts/{account_id}/pipelines/v1/sinks
Get Sink Details
client.pipelines.sinks.get(string sinkId, SinkGetParams { account_id } params, RequestOptions options?): SinkGetResponse { id, created_at, modified_at, 5 more }
GET/accounts/{account_id}/pipelines/v1/sinks/{sink_id}
Create Sink
client.pipelines.sinks.create(SinkCreateParams { account_id, name, type, 3 more } params, RequestOptions options?): SinkCreateResponse { id, created_at, modified_at, 5 more }
POST/accounts/{account_id}/pipelines/v1/sinks
Delete Sink
client.pipelines.sinks.delete(string sinkId, SinkDeleteParams { account_id, force } params, RequestOptions options?): void
DELETE/accounts/{account_id}/pipelines/v1/sinks/{sink_id}
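
A sketch of creating and deleting an R2 sink, reusing the client from the examples above. The nested config shape is assumed to mirror the CloudflarePipelinesR2Table model documented below; the bucket name and credentials are placeholders:

```ts
const sink = await client.pipelines.sinks.create({
  account_id: 'ACCOUNT_ID',
  name: 'my-sink',
  type: 'r2',
  config: {
    account_id: 'ACCOUNT_ID',
    bucket: 'my-bucket',
    credentials: {
      access_key_id: 'R2_ACCESS_KEY_ID',
      secret_access_key: 'R2_SECRET_ACCESS_KEY',
    },
    path: 'events',
    // Roll files every 5 minutes or once they reach ~100 MB.
    rolling_policy: { interval_seconds: 300, file_size_bytes: 100_000_000 },
  },
});
console.log(sink.id, sink.type);

// delete accepts an optional force flag (see SinkDeleteParams).
await client.pipelines.sinks.delete(sink.id, { account_id: 'ACCOUNT_ID', force: true });
```
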
Models
SinkListResponse { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this sink.

created_at: string
format: date-time
modified_at: string
format: date-time
name: string

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: "r2" | "r2_data_catalog"

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config?: CloudflarePipelinesR2TablePublic { account_id, bucket, file_naming, 4 more } | CloudflarePipelinesR2DataCatalogTablePublic { account_id, bucket, table_name, 2 more }

Defines the configuration of the R2 Sink.

One of the following:
CloudflarePipelinesR2TablePublic { account_id, bucket, file_naming, 4 more }

R2 Sink public configuration.

account_id: string

Cloudflare Account ID for the bucket

bucket: string

R2 Bucket to write to

file_naming?: FileNaming { prefix, strategy, suffix }

Controls filename prefix/suffix and strategy.

prefix?: string

The prefix to use in the file name, e.g. prefix-.parquet.

strategy?: "serial" | "uuid" | "uuid_v7" | "ulid"

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix?: string

Overrides the default file suffix (e.g. .parquet); use with caution.

jurisdiction?: string

Jurisdiction this bucket is hosted in

partitioning?: Partitioning { time_pattern }

Data-layout partitioning for sinks.

time_pattern?: string

The pattern of the date string

path?: string

Subpath within the bucket to write to

rolling_policy?: RollingPolicy { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes?: number

Files will be rolled after reaching this number of bytes.

format: uint64
minimum: 0
inactivity_seconds?: number

Number of seconds of inactivity to wait before rolling over to a new file.

format: uint64
minimum: 1
interval_seconds?: number

Number of seconds to wait before rolling over to a new file.

format: uint64
minimum: 1
CloudflarePipelinesR2DataCatalogTablePublic { account_id, bucket, table_name, 2 more }

R2 Data Catalog Sink public configuration.

account_id: string

Cloudflare Account ID

format: uri
bucket: string

The R2 Bucket that hosts this catalog

table_name: string

Table name

namespace?: string

Table namespace

rolling_policy?: RollingPolicy { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes?: number

Files will be rolled after reaching this number of bytes.

format: uint64
minimum: 0
inactivity_seconds?: number

Number of seconds of inactivity to wait before rolling over to a new file.

format: uint64
minimum: 1
interval_seconds?: number

Number of seconds to wait before rolling over to a new file.

format: uint64
minimum: 1
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
schema?: Schema { fields, format, inferred }
fields?: Array<Int32 { type, metadata_key, name, 2 more } | Int64 { type, metadata_key, name, 2 more } | Float32 { type, metadata_key, name, 2 more } | 8 more>
One of the following:
Int32 { type, metadata_key, name, 2 more }
type: "int32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Int64 { type, metadata_key, name, 2 more }
type: "int64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float32 { type, metadata_key, name, 2 more }
type: "float32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float64 { type, metadata_key, name, 2 more }
type: "float64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Bool { type, metadata_key, name, 2 more }
type: "bool"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
String { type, metadata_key, name, 2 more }
type: "string"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Binary { type, metadata_key, name, 2 more }
type: "binary"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Timestamp { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
unit?: "second" | "millisecond" | "microsecond" | "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json { type, metadata_key, name, 2 more }
type: "json"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Struct
List
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
inferred?: boolean | null
SinkGetResponse { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this sink.

created_at: string
format: date-time
modified_at: string
format: date-time
name: string

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: "r2" | "r2_data_catalog"

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config?: CloudflarePipelinesR2TablePublic { account_id, bucket, file_naming, 4 more } | CloudflarePipelinesR2DataCatalogTablePublic { account_id, bucket, table_name, 2 more }

Defines the configuration of the R2 Sink.

One of the following:
CloudflarePipelinesR2TablePublic { account_id, bucket, file_naming, 4 more }

R2 Sink public configuration.

account_id: string

Cloudflare Account ID for the bucket

bucket: string

R2 Bucket to write to

file_naming?: FileNaming { prefix, strategy, suffix }

Controls filename prefix/suffix and strategy.

prefix?: string

The prefix to use in the file name, e.g. prefix-.parquet.

strategy?: "serial" | "uuid" | "uuid_v7" | "ulid"

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix?: string

Overrides the default file suffix (e.g. .parquet); use with caution.

jurisdiction?: string

Jurisdiction this bucket is hosted in

partitioning?: Partitioning { time_pattern }

Data-layout partitioning for sinks.

time_pattern?: string

The pattern of the date string

path?: string

Subpath within the bucket to write to

rolling_policy?: RollingPolicy { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes?: number

Files will be rolled after reaching this number of bytes.

format: uint64
minimum: 0
inactivity_seconds?: number

Number of seconds of inactivity to wait before rolling over to a new file.

format: uint64
minimum: 1
interval_seconds?: number

Number of seconds to wait before rolling over to a new file.

format: uint64
minimum: 1
CloudflarePipelinesR2DataCatalogTablePublic { account_id, bucket, table_name, 2 more }

R2 Data Catalog Sink public configuration.

account_id: string

Cloudflare Account ID

format: uri
bucket: string

The R2 Bucket that hosts this catalog

table_name: string

Table name

namespace?: string

Table namespace

rolling_policy?: RollingPolicy { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes?: number

Files will be rolled after reaching this number of bytes.

format: uint64
minimum: 0
inactivity_seconds?: number

Number of seconds of inactivity to wait before rolling over to a new file.

format: uint64
minimum: 1
interval_seconds?: number

Number of seconds to wait before rolling over to a new file.

format: uint64
minimum: 1
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
schema?: Schema { fields, format, inferred }
fields?: Array<Int32 { type, metadata_key, name, 2 more } | Int64 { type, metadata_key, name, 2 more } | Float32 { type, metadata_key, name, 2 more } | 8 more>
One of the following:
Int32 { type, metadata_key, name, 2 more }
type: "int32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Int64 { type, metadata_key, name, 2 more }
type: "int64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float32 { type, metadata_key, name, 2 more }
type: "float32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float64 { type, metadata_key, name, 2 more }
type: "float64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Bool { type, metadata_key, name, 2 more }
type: "bool"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
String { type, metadata_key, name, 2 more }
type: "string"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Binary { type, metadata_key, name, 2 more }
type: "binary"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Timestamp { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
unit?: "second" | "millisecond" | "microsecond" | "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json { type, metadata_key, name, 2 more }
type: "json"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Struct
List
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
inferred?: boolean | null
SinkCreateResponse { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this sink.

created_at: string
format: date-time
modified_at: string
format: date-time
name: string

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: "r2" | "r2_data_catalog"

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config?: CloudflarePipelinesR2Table { account_id, bucket, credentials, 5 more } | CloudflarePipelinesR2DataCatalogTable { token, account_id, bucket, 3 more }

Defines the configuration of the sink.

One of the following:
CloudflarePipelinesR2Table { account_id, bucket, credentials, 5 more }
account_id: string

Cloudflare Account ID for the bucket

bucket: string

R2 Bucket to write to

credentials: Credentials { access_key_id, secret_access_key }
access_key_id: string

R2 access key ID for the bucket

format: var-str
secret_access_key: string

R2 secret access key for the bucket

format: var-str
file_naming?: FileNaming { prefix, strategy, suffix }

Controls filename prefix/suffix and strategy.

prefix?: string

The prefix to use in the file name, e.g. prefix-.parquet.

strategy?: "serial" | "uuid" | "uuid_v7" | "ulid"

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix?: string

Overrides the default file suffix (e.g. .parquet); use with caution.

jurisdiction?: string

Jurisdiction this bucket is hosted in

partitioning?: Partitioning { time_pattern }

Data-layout partitioning for sinks.

time_pattern?: string

The pattern of the date string

path?: string

Subpath within the bucket to write to

rolling_policy?: RollingPolicy { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes?: number

Files will be rolled after reaching this number of bytes.

format: uint64
minimum: 0
inactivity_seconds?: number

Number of seconds of inactivity to wait before rolling over to a new file.

format: uint64
minimum: 1
interval_seconds?: number

Number of seconds to wait before rolling over to a new file.

format: uint64
minimum: 1
CloudflarePipelinesR2DataCatalogTable { token, account_id, bucket, 3 more }

R2 Data Catalog Sink

token: string

Authentication token

format: var-str
account_id: string

Cloudflare Account ID

format: uri
bucket: string

The R2 Bucket that hosts this catalog

table_name: string

Table name

namespace?: string

Table namespace

rolling_policy?: RollingPolicy { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes?: number

Files will be rolled after reaching this number of bytes.

format: uint64
minimum: 0
inactivity_seconds?: number

Number of seconds of inactivity to wait before rolling over to a new file.

format: uint64
minimum: 1
interval_seconds?: number

Number of seconds to wait before rolling over to a new file.

format: uint64
minimum: 1
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
schema?: Schema { fields, format, inferred }
fields?: Array<Int32 { type, metadata_key, name, 2 more } | Int64 { type, metadata_key, name, 2 more } | Float32 { type, metadata_key, name, 2 more } | 8 more>
One of the following:
Int32 { type, metadata_key, name, 2 more }
type: "int32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Int64 { type, metadata_key, name, 2 more }
type: "int64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float32 { type, metadata_key, name, 2 more }
type: "float32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float64 { type, metadata_key, name, 2 more }
type: "float64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Bool { type, metadata_key, name, 2 more }
type: "bool"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
String { type, metadata_key, name, 2 more }
type: "string"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Binary { type, metadata_key, name, 2 more }
type: "binary"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Timestamp { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
unit?: "second" | "millisecond" | "microsecond" | "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json { type, metadata_key, name, 2 more }
type: "json"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Struct
List
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
inferred?: boolean | null
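
The format and schema unions above recur across the sink models. Purely as an illustration of their shape, a hypothetical Parquet format with an explicit schema could be written as the following fragment; the field names come from the models above, while the values are invented:

```ts
// Parquet output with zstd compression and ~128 MB row groups.
const format = {
  type: 'parquet' as const,
  compression: 'zstd' as const,
  row_group_bytes: 128_000_000,
};

// An explicit (non-inferred) schema; each field variant also accepts
// metadata_key and sql_name, per the models above.
const schema = {
  fields: [
    { type: 'string' as const, name: 'user_id', required: true },
    { type: 'timestamp' as const, name: 'ts', unit: 'millisecond' as const },
    { type: 'float64' as const, name: 'value' },
  ],
  inferred: false,
};
```
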

Streams

List Streams
client.pipelines.streams.list(StreamListParams { account_id, page, per_page, pipeline_id } params, RequestOptions options?): V4PagePaginationArray<StreamListResponse { id, created_at, http, 7 more }>
GET/accounts/{account_id}/pipelines/v1/streams
Get Stream Details
client.pipelines.streams.get(string streamId, StreamGetParams { account_id } params, RequestOptions options?): StreamGetResponse { id, created_at, http, 7 more }
GET/accounts/{account_id}/pipelines/v1/streams/{stream_id}
Create Stream
client.pipelines.streams.create(StreamCreateParams { account_id, name, format, 3 more } params, RequestOptions options?): StreamCreateResponse { id, created_at, http, 7 more }
POST/accounts/{account_id}/pipelines/v1/streams
Update Stream
client.pipelines.streams.update(string streamId, StreamUpdateParams { account_id, http, worker_binding } params, RequestOptions options?): StreamUpdateResponse { id, created_at, http, 6 more }
PATCH/accounts/{account_id}/pipelines/v1/streams/{stream_id}
Delete Stream
client.pipelines.streams.delete(string streamId, StreamDeleteParams { account_id, force } params, RequestOptions options?): void
DELETE/accounts/{account_id}/pipelines/v1/streams/{stream_id}
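
A sketch of the stream lifecycle, reusing the client from the examples above: create a JSON stream with an authenticated HTTP endpoint, then disable its Worker binding via update. The create-parameter shape is assumed to mirror the stream models documented below:

```ts
const stream = await client.pipelines.streams.create({
  account_id: 'ACCOUNT_ID',
  name: 'my-stream',
  format: { type: 'json', unstructured: true },
  http: { enabled: true, authentication: true },
});
console.log(stream.endpoint); // HTTP ingest URL, when the endpoint is enabled

const updated = await client.pipelines.streams.update(stream.id, {
  account_id: 'ACCOUNT_ID',
  worker_binding: { enabled: false },
});
console.log(`stream is now at version ${updated.version}`);
```
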
Models
StreamListResponse { id, created_at, http, 7 more }
id: string

Indicates a unique identifier for this stream.

created_at: string
format: date-time
http: HTTP { authentication, enabled, cors }
authentication: boolean

Indicates that authentication is required for the HTTP endpoint.

enabled: boolean

Indicates that the HTTP endpoint is enabled.

cors?: CORS { origins }

Specifies the CORS options for the HTTP endpoint.

origins?: Array<string>
modified_at: string
format: date-time
name: string

Indicates the name of the Stream.

maxLength: 128
minLength: 1
version: number

Indicates the current version of this stream.

worker_binding: WorkerBinding { enabled }
enabled: boolean

Indicates that the worker binding is enabled.

endpoint?: string

Indicates the endpoint URL of this stream.

format: uri
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
schema?: Schema { fields, format, inferred }
fields?: Array<Int32 { type, metadata_key, name, 2 more } | Int64 { type, metadata_key, name, 2 more } | Float32 { type, metadata_key, name, 2 more } | 8 more>
One of the following:
Int32 { type, metadata_key, name, 2 more }
type: "int32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Int64 { type, metadata_key, name, 2 more }
type: "int64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float32 { type, metadata_key, name, 2 more }
type: "float32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float64 { type, metadata_key, name, 2 more }
type: "float64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Bool { type, metadata_key, name, 2 more }
type: "bool"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
String { type, metadata_key, name, 2 more }
type: "string"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Binary { type, metadata_key, name, 2 more }
type: "binary"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Timestamp { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
unit?: "second" | "millisecond" | "microsecond" | "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json { type, metadata_key, name, 2 more }
type: "json"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Struct
List
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
inferred?: boolean | null
StreamGetResponse { id, created_at, http, 7 more }
id: string

Indicates a unique identifier for this stream.

created_at: string
format: date-time
http: HTTP { authentication, enabled, cors }
authentication: boolean

Indicates that authentication is required for the HTTP endpoint.

enabled: boolean

Indicates that the HTTP endpoint is enabled.

cors?: CORS { origins }

Specifies the CORS options for the HTTP endpoint.

origins?: Array<string>
modified_at: string
format: date-time
name: string

Indicates the name of the Stream.

maxLength: 128
minLength: 1
version: number

Indicates the current version of this stream.

worker_binding: WorkerBinding { enabled }
enabled: boolean

Indicates that the worker binding is enabled.

endpoint?: string

Indicates the endpoint URL of this stream.

format: uri
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
schema?: Schema { fields, format, inferred }
fields?: Array<Int32 { type, metadata_key, name, 2 more } | Int64 { type, metadata_key, name, 2 more } | Float32 { type, metadata_key, name, 2 more } | 8 more>
One of the following:
Int32 { type, metadata_key, name, 2 more }
type: "int32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Int64 { type, metadata_key, name, 2 more }
type: "int64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float32 { type, metadata_key, name, 2 more }
type: "float32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float64 { type, metadata_key, name, 2 more }
type: "float64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Bool { type, metadata_key, name, 2 more }
type: "bool"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
String { type, metadata_key, name, 2 more }
type: "string"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Binary { type, metadata_key, name, 2 more }
type: "binary"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Timestamp { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
unit?: "second" | "millisecond" | "microsecond" | "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json { type, metadata_key, name, 2 more }
type: "json"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Struct
List
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
inferred?: boolean | null
StreamCreateResponse { id, created_at, http, 7 more }
id: string

Indicates a unique identifier for this stream.

created_at: string
format: date-time
http: HTTP { authentication, enabled, cors }
authentication: boolean

Indicates that authentication is required for the HTTP endpoint.

enabled: boolean

Indicates that the HTTP endpoint is enabled.

cors?: CORS { origins }

Specifies the CORS options for the HTTP endpoint.

origins?: Array<string>
modified_at: string
format: date-time
name: string

Indicates the name of the Stream.

maxLength: 128
minLength: 1
version: number

Indicates the current version of this stream.

worker_binding: WorkerBinding { enabled }
enabled: boolean

Indicates that the worker binding is enabled.

endpoint?: string

Indicates the endpoint URL of this stream.

format: uri
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
schema?: Schema { fields, format, inferred }
fields?: Array<Int32 { type, metadata_key, name, 2 more } | Int64 { type, metadata_key, name, 2 more } | Float32 { type, metadata_key, name, 2 more } | 8 more>
One of the following:
Int32 { type, metadata_key, name, 2 more }
type: "int32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Int64 { type, metadata_key, name, 2 more }
type: "int64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float32 { type, metadata_key, name, 2 more }
type: "float32"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Float64 { type, metadata_key, name, 2 more }
type: "float64"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Bool { type, metadata_key, name, 2 more }
type: "bool"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
String { type, metadata_key, name, 2 more }
type: "string"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Binary { type, metadata_key, name, 2 more }
type: "binary"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Timestamp { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
unit?: "second" | "millisecond" | "microsecond" | "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json { type, metadata_key, name, 2 more }
type: "json"
metadata_key?: string | null
name?: string
required?: boolean
sql_name?: string
Struct
List
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0
inferred?: boolean | null
StreamUpdateResponse { id, created_at, http, 6 more }
id: string

Indicates a unique identifier for this stream.

created_at: string
format: date-time
http: HTTP { authentication, enabled, cors }
authentication: boolean

Indicates that authentication is required for the HTTP endpoint.

enabled: boolean

Indicates that the HTTP endpoint is enabled.

cors?: CORS { origins }

Specifies the CORS options for the HTTP endpoint.

origins?: Array<string>
modified_at: string
format: date-time
name: string

Indicates the name of the Stream.

maxLength: 128
minLength: 1
version: number

Indicates the current version of this stream.

worker_binding: WorkerBinding { enabled }
enabled: boolean

Indicates that the worker binding is enabled.

endpoint?: string

Indicates the endpoint URL of this stream.

format: uri
format?: Json { type, decimal_encoding, timestamp_format, unstructured } | Parquet { type, compression, row_group_bytes }
One of the following:
Json { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding?: "number" | "string" | "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format?: "rfc3339" | "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured?: boolean
Parquet { type, compression, row_group_bytes }
type: "parquet"
compression?: "uncompressed" | "snappy" | "gzip" | 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes?: number | null
format: int64
minimum: 0