
Sinks

List Sinks
GET /accounts/{account_id}/pipelines/v1/sinks
Get Sink Details
GET /accounts/{account_id}/pipelines/v1/sinks/{sink_id}
Create Sink
POST /accounts/{account_id}/pipelines/v1/sinks
Delete Sink
DELETE /accounts/{account_id}/pipelines/v1/sinks/{sink_id}
Models
SinkListResponse = object { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this sink.

created_at: string
format: date-time
modified_at: string
format: date-time
name: string

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: "r2" or "r2_data_catalog"

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config: optional object { account_id, bucket, file_naming, 4 more } or object { account_id, bucket, table_name, 2 more }

Defines the configuration of the R2 Sink.

One of the following:
CloudflarePipelinesR2TablePublic = object { account_id, bucket, file_naming, 4 more }

R2 Sink public configuration.

account_id: string

Cloudflare Account ID for the bucket

bucket: string

R2 Bucket to write to

file_naming: optional object { prefix, strategy, suffix }

Controls filename prefix/suffix and strategy.

prefix: optional string

The prefix to use in the file name, e.g. prefix-.parquet

strategy: optional "serial" or "uuid" or "uuid_v7" or "ulid"

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix: optional string

Overrides the default file suffix (e.g. .parquet); use with caution.

jurisdiction: optional string

Jurisdiction this bucket is hosted in

partitioning: optional object { time_pattern }

Data-layout partitioning for sinks.

time_pattern: optional string

The pattern of the date string

path: optional string

Subpath within the bucket to write to

rolling_policy: optional object { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: optional number

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: optional number

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: optional number

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
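Putting the R2 sink fields above together, a hypothetical configuration might look as follows. All values (account ID, bucket name, paths, the `time_pattern` string) are placeholders; in particular the strftime-style date pattern is an assumption, since this reference only describes `time_pattern` as "the pattern of the date string":

```python
# Hypothetical R2 sink configuration using the fields documented above.
r2_config = {
    "account_id": "0123456789abcdef0123456789abcdef",  # placeholder account ID
    "bucket": "analytics-events",                      # placeholder bucket name
    "path": "events/raw",                              # optional subpath in bucket
    "file_naming": {
        "prefix": "events-",
        "strategy": "uuid_v7",  # one of: serial, uuid, uuid_v7, ulid
    },
    # Assumed strftime-style pattern; the reference does not define the syntax.
    "partitioning": {"time_pattern": "year=%Y/month=%m/day=%d"},
    "rolling_policy": {
        "file_size_bytes": 128 * 1024 * 1024,  # roll after ~128 MiB
        "interval_seconds": 300,               # or at least every 5 minutes
    },
}
```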
CloudflarePipelinesR2DataCatalogTablePublic = object { account_id, bucket, table_name, 2 more }

R2 Data Catalog Sink public configuration.

account_id: string

Cloudflare Account ID

format: uri
bucket: string

The R2 Bucket that hosts this catalog

table_name: string

Table name

namespace: optional string

Table namespace

rolling_policy: optional object { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: optional number

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: optional number

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: optional number

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
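For the R2 Data Catalog variant above, a hypothetical configuration (all values are placeholders) would carry the catalog table coordinates instead of file-layout options:

```python
# Hypothetical R2 Data Catalog sink configuration (fields documented above).
catalog_config = {
    "account_id": "0123456789abcdef0123456789abcdef",  # placeholder account ID
    "bucket": "warehouse-bucket",   # the R2 bucket that hosts the catalog
    "table_name": "events",
    "namespace": "prod",            # optional table namespace
    "rolling_policy": {"interval_seconds": 60},
}
```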
format: optional object { type, decimal_encoding, timestamp_format, unstructured } or object { type, compression, row_group_bytes }
One of the following:
Json = object { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding: optional "number" or "string" or "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format: optional "rfc3339" or "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured: optional boolean
Parquet = object { type, compression, row_group_bytes }
type: "parquet"
compression: optional "uncompressed" or "snappy" or "gzip" or 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: optional number
format: int64
minimum: 0
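The two `format` variants above are mutually exclusive on `type`. Hypothetical examples of each (the specific codec and sizing choices are illustrative, not recommendations):

```python
# Hypothetical `format` values matching the Json and Parquet variants above.
json_format = {
    "type": "json",
    "decimal_encoding": "string",   # number | string | bytes
    "timestamp_format": "rfc3339",  # rfc3339 | unix_millis
    "unstructured": False,
}
parquet_format = {
    "type": "parquet",
    "compression": "zstd",          # uncompressed | snappy | gzip | zstd | lz4
    "row_group_bytes": 64 * 1024 * 1024,
}
```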
schema: optional object { fields, format, inferred }
fields: optional array of object { type, metadata_key, name, 2 more } or object { type, metadata_key, name, 2 more } or object { type, metadata_key, name, 2 more } or 8 more
One of the following:
Int32 = object { type, metadata_key, name, 2 more }
type: "int32"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Int64 = object { type, metadata_key, name, 2 more }
type: "int64"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Float32 = object { type, metadata_key, name, 2 more }
type: "float32"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Float64 = object { type, metadata_key, name, 2 more }
type: "float64"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Bool = object { type, metadata_key, name, 2 more }
type: "bool"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
String = object { type, metadata_key, name, 2 more }
type: "string"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Binary = object { type, metadata_key, name, 2 more }
type: "binary"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Timestamp = object { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
unit: optional "second" or "millisecond" or "microsecond" or "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json = object { type, metadata_key, name, 2 more }
type: "json"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Struct =
List =
format: optional object { type, decimal_encoding, timestamp_format, unstructured } or object { type, compression, row_group_bytes }
One of the following:
Json = object { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding: optional "number" or "string" or "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format: optional "rfc3339" or "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured: optional boolean
Parquet = object { type, compression, row_group_bytes }
type: "parquet"
compression: optional "uncompressed" or "snappy" or "gzip" or 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: optional number
format: int64
minimum: 0
inferred: optional boolean
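An explicit (non-inferred) schema combines the field variants listed above into a `fields` array. A hypothetical example, with made-up field names:

```python
# Hypothetical explicit sink schema using the field variants listed above.
schema = {
    "inferred": False,
    "fields": [
        {"type": "string", "name": "event_id", "required": True},
        {"type": "timestamp", "name": "ts", "unit": "millisecond", "required": True},
        {"type": "json", "name": "payload", "required": False},
    ],
}
```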
SinkGetResponse = object { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this sink.

created_at: string
format: date-time
modified_at: string
format: date-time
name: string

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: "r2" or "r2_data_catalog"

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config: optional object { account_id, bucket, file_naming, 4 more } or object { account_id, bucket, table_name, 2 more }

Defines the configuration of the R2 Sink.

One of the following:
CloudflarePipelinesR2TablePublic = object { account_id, bucket, file_naming, 4 more }

R2 Sink public configuration.

account_id: string

Cloudflare Account ID for the bucket

bucket: string

R2 Bucket to write to

file_naming: optional object { prefix, strategy, suffix }

Controls filename prefix/suffix and strategy.

prefix: optional string

The prefix to use in the file name, e.g. prefix-.parquet

strategy: optional "serial" or "uuid" or "uuid_v7" or "ulid"

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix: optional string

Overrides the default file suffix (e.g. .parquet); use with caution.

jurisdiction: optional string

Jurisdiction this bucket is hosted in

partitioning: optional object { time_pattern }

Data-layout partitioning for sinks.

time_pattern: optional string

The pattern of the date string

path: optional string

Subpath within the bucket to write to

rolling_policy: optional object { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: optional number

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: optional number

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: optional number

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
CloudflarePipelinesR2DataCatalogTablePublic = object { account_id, bucket, table_name, 2 more }

R2 Data Catalog Sink public configuration.

account_id: string

Cloudflare Account ID

format: uri
bucket: string

The R2 Bucket that hosts this catalog

table_name: string

Table name

namespace: optional string

Table namespace

rolling_policy: optional object { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: optional number

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: optional number

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: optional number

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
format: optional object { type, decimal_encoding, timestamp_format, unstructured } or object { type, compression, row_group_bytes }
One of the following:
Json = object { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding: optional "number" or "string" or "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format: optional "rfc3339" or "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured: optional boolean
Parquet = object { type, compression, row_group_bytes }
type: "parquet"
compression: optional "uncompressed" or "snappy" or "gzip" or 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: optional number
format: int64
minimum: 0
schema: optional object { fields, format, inferred }
fields: optional array of object { type, metadata_key, name, 2 more } or object { type, metadata_key, name, 2 more } or object { type, metadata_key, name, 2 more } or 8 more
One of the following:
Int32 = object { type, metadata_key, name, 2 more }
type: "int32"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Int64 = object { type, metadata_key, name, 2 more }
type: "int64"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Float32 = object { type, metadata_key, name, 2 more }
type: "float32"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Float64 = object { type, metadata_key, name, 2 more }
type: "float64"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Bool = object { type, metadata_key, name, 2 more }
type: "bool"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
String = object { type, metadata_key, name, 2 more }
type: "string"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Binary = object { type, metadata_key, name, 2 more }
type: "binary"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Timestamp = object { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
unit: optional "second" or "millisecond" or "microsecond" or "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json = object { type, metadata_key, name, 2 more }
type: "json"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Struct =
List =
format: optional object { type, decimal_encoding, timestamp_format, unstructured } or object { type, compression, row_group_bytes }
One of the following:
Json = object { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding: optional "number" or "string" or "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format: optional "rfc3339" or "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured: optional boolean
Parquet = object { type, compression, row_group_bytes }
type: "parquet"
compression: optional "uncompressed" or "snappy" or "gzip" or 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: optional number
format: int64
minimum: 0
inferred: optional boolean
SinkCreateResponse = object { id, created_at, modified_at, 5 more }
id: string

Indicates a unique identifier for this sink.

created_at: string
format: date-time
modified_at: string
format: date-time
name: string

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: "r2" or "r2_data_catalog"

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config: optional object { account_id, bucket, credentials, 5 more } or object { token, account_id, bucket, 3 more }

Defines the configuration of the sink.

One of the following:
CloudflarePipelinesR2Table = object { account_id, bucket, credentials, 5 more }
account_id: string

Cloudflare Account ID for the bucket

bucket: string

R2 Bucket to write to

credentials: object { access_key_id, secret_access_key }
access_key_id: string

R2 access key ID used to authenticate writes to the bucket

format: var-str
secret_access_key: string

R2 secret access key used to authenticate writes to the bucket

format: var-str
file_naming: optional object { prefix, strategy, suffix }

Controls filename prefix/suffix and strategy.

prefix: optional string

The prefix to use in the file name, e.g. prefix-.parquet

strategy: optional "serial" or "uuid" or "uuid_v7" or "ulid"

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix: optional string

Overrides the default file suffix (e.g. .parquet); use with caution.

jurisdiction: optional string

Jurisdiction this bucket is hosted in

partitioning: optional object { time_pattern }

Data-layout partitioning for sinks.

time_pattern: optional string

The pattern of the date string

path: optional string

Subpath within the bucket to write to

rolling_policy: optional object { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: optional number

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: optional number

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: optional number

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
CloudflarePipelinesR2DataCatalogTable = object { token, account_id, bucket, 3 more }

R2 Data Catalog Sink

token: string

Authentication token

format: var-str
account_id: string

Cloudflare Account ID

format: uri
bucket: string

The R2 Bucket that hosts this catalog

table_name: string

Table name

namespace: optional string

Table namespace

rolling_policy: optional object { file_size_bytes, inactivity_seconds, interval_seconds }

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: optional number

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: optional number

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: optional number

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
format: optional object { type, decimal_encoding, timestamp_format, unstructured } or object { type, compression, row_group_bytes }
One of the following:
Json = object { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding: optional "number" or "string" or "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format: optional "rfc3339" or "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured: optional boolean
Parquet = object { type, compression, row_group_bytes }
type: "parquet"
compression: optional "uncompressed" or "snappy" or "gzip" or 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: optional number
format: int64
minimum: 0
schema: optional object { fields, format, inferred }
fields: optional array of object { type, metadata_key, name, 2 more } or object { type, metadata_key, name, 2 more } or object { type, metadata_key, name, 2 more } or 8 more
One of the following:
Int32 = object { type, metadata_key, name, 2 more }
type: "int32"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Int64 = object { type, metadata_key, name, 2 more }
type: "int64"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Float32 = object { type, metadata_key, name, 2 more }
type: "float32"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Float64 = object { type, metadata_key, name, 2 more }
type: "float64"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Bool = object { type, metadata_key, name, 2 more }
type: "bool"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
String = object { type, metadata_key, name, 2 more }
type: "string"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Binary = object { type, metadata_key, name, 2 more }
type: "binary"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Timestamp = object { type, metadata_key, name, 3 more }
type: "timestamp"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
unit: optional "second" or "millisecond" or "microsecond" or "nanosecond"
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
Json = object { type, metadata_key, name, 2 more }
type: "json"
metadata_key: optional string
name: optional string
required: optional boolean
sql_name: optional string
Struct =
List =
format: optional object { type, decimal_encoding, timestamp_format, unstructured } or object { type, compression, row_group_bytes }
One of the following:
Json = object { type, decimal_encoding, timestamp_format, unstructured }
type: "json"
decimal_encoding: optional "number" or "string" or "bytes"
One of the following:
"number"
"string"
"bytes"
timestamp_format: optional "rfc3339" or "unix_millis"
One of the following:
"rfc3339"
"unix_millis"
unstructured: optional boolean
Parquet = object { type, compression, row_group_bytes }
type: "parquet"
compression: optional "uncompressed" or "snappy" or "gzip" or 2 more
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: optional number
format: int64
minimum: 0
inferred: optional boolean
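Unlike the list/get responses, the create-side config carries write credentials (`credentials` for plain R2, `token` for R2 Data Catalog). A hypothetical Create Sink request body assembling the pieces above; every value, including the credential placeholders, is illustrative:

```python
import json

# Hypothetical Create Sink request body for POST /accounts/{account_id}/pipelines/v1/sinks.
create_sink_body = {
    "name": "my-r2-sink",  # 1..128 characters
    "type": "r2",
    "config": {
        "account_id": "0123456789abcdef0123456789abcdef",  # placeholder account ID
        "bucket": "analytics-events",
        "credentials": {  # R2 credentials (placeholders, never commit real keys)
            "access_key_id": "<R2_ACCESS_KEY_ID>",
            "secret_access_key": "<R2_SECRET_ACCESS_KEY>",
        },
        "rolling_policy": {"interval_seconds": 300},
    },
    "format": {"type": "parquet", "compression": "zstd"},
}
payload = json.dumps(create_sink_body)  # serialized JSON request body
```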