
Sinks

List Sinks
pipelines.sinks.list(**kwargs: SinkListParams) -> SyncV4PagePaginationArray[SinkListResponse]
GET /accounts/{account_id}/pipelines/v1/sinks
Get Sink Details
pipelines.sinks.get(sink_id: str, **kwargs: SinkGetParams) -> SinkGetResponse
GET /accounts/{account_id}/pipelines/v1/sinks/{sink_id}
Create Sink
pipelines.sinks.create(**kwargs: SinkCreateParams) -> SinkCreateResponse
POST /accounts/{account_id}/pipelines/v1/sinks
Delete Sink
pipelines.sinks.delete(sink_id: str, **kwargs: SinkDeleteParams)
DELETE /accounts/{account_id}/pipelines/v1/sinks/{sink_id}
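The four operations above share one collection path and one item path. As a minimal sketch of how those routes fit together, the helper below builds the documented paths from an account ID and a sink ID. The names `sink_path` and `sink_routes` are hypothetical and are not part of the SDK; the code only formats strings and makes no network calls.

```python
from typing import Dict, Optional, Tuple

# Path template taken from the endpoint list above.
BASE = "/accounts/{account_id}/pipelines/v1/sinks"


def sink_path(account_id: str, sink_id: Optional[str] = None) -> str:
    """Return the collection path, or the item path when sink_id is given."""
    if not account_id:
        raise ValueError("account_id is required")
    path = BASE.format(account_id=account_id)
    if sink_id is not None:
        if not sink_id:
            raise ValueError("sink_id must be non-empty")
        path = f"{path}/{sink_id}"
    return path


def sink_routes(account_id: str, sink_id: str) -> Dict[str, Tuple[str, str]]:
    """Map each documented operation to its (HTTP method, path) pair."""
    return {
        "list": ("GET", sink_path(account_id)),
        "get": ("GET", sink_path(account_id, sink_id)),
        "create": ("POST", sink_path(account_id)),
        "delete": ("DELETE", sink_path(account_id, sink_id)),
    }
```

List and create address the collection, while get and delete address a single sink by `sink_id`.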
Models
class SinkListResponse:
id: str

Indicates a unique identifier for this sink.

created_at: datetime
format: date-time
modified_at: datetime
format: date-time
name: str

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: Literal["r2", "r2_data_catalog"]

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config: Optional[Config]

Defines the configuration of the R2 Sink.

One of the following:
class ConfigCloudflarePipelinesR2TablePublic:

R2 Sink public configuration.

account_id: str

Cloudflare Account ID for the bucket

bucket: str

R2 Bucket to write to

file_naming: Optional[ConfigCloudflarePipelinesR2TablePublicFileNaming]

Controls filename prefix/suffix and strategy.

prefix: Optional[str]

The prefix to use in the file name, e.g. prefix-.parquet

strategy: Optional[Literal["serial", "uuid", "uuid_v7", "ulid"]]

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix: Optional[str]

Overrides the default file suffix (e.g. .parquet). Use with caution.

jurisdiction: Optional[str]

Jurisdiction this bucket is hosted in

partitioning: Optional[ConfigCloudflarePipelinesR2TablePublicPartitioning]

Data-layout partitioning for sinks.

time_pattern: Optional[str]

The pattern of the date string

path: Optional[str]

Subpath within the bucket to write to

rolling_policy: Optional[ConfigCloudflarePipelinesR2TablePublicRollingPolicy]

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: Optional[int]

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: Optional[int]

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: Optional[int]

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
class ConfigCloudflarePipelinesR2DataCatalogTablePublic:

R2 Data Catalog Sink public configuration.

account_id: str

Cloudflare Account ID

format: uri
bucket: str

The R2 Bucket that hosts this catalog

table_name: str

Table name

namespace: Optional[str]

Table namespace

rolling_policy: Optional[ConfigCloudflarePipelinesR2DataCatalogTablePublicRollingPolicy]

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: Optional[int]

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: Optional[int]

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: Optional[int]

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
format: Optional[Format]
One of the following:
class FormatJson:
type: Literal["json"]
decimal_encoding: Optional[Literal["number", "string", "bytes"]]
One of the following:
"number"
"string"
"bytes"
timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]
One of the following:
"rfc3339"
"unix_millis"
unstructured: Optional[bool]
class FormatParquet:
type: Literal["parquet"]
compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: Optional[int]
format: int64
minimum: 0
schema: Optional[Schema]
fields: Optional[List[SchemaField]]
One of the following:
class SchemaFieldInt32:
type: Literal["int32"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldInt64:
type: Literal["int64"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldFloat32:
type: Literal["float32"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldFloat64:
type: Literal["float64"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldBool:
type: Literal["bool"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldString:
type: Literal["string"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldBinary:
type: Literal["binary"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldTimestamp:
type: Literal["timestamp"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
unit: Optional[Literal["second", "millisecond", "microsecond", "nanosecond"]]
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
class SchemaFieldJson:
type: Literal["json"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldStruct:
class SchemaFieldList:
format: Optional[SchemaFormat]
One of the following:
class SchemaFormatJson:
type: Literal["json"]
decimal_encoding: Optional[Literal["number", "string", "bytes"]]
One of the following:
"number"
"string"
"bytes"
timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]
One of the following:
"rfc3339"
"unix_millis"
unstructured: Optional[bool]
class SchemaFormatParquet:
type: Literal["parquet"]
compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: Optional[int]
format: int64
minimum: 0
inferred: Optional[bool]
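The numeric and length constraints listed for this model (name length 1 to 128, rolling-policy minimums, uint64 ranges) can be checked locally before a request is sent. The sketch below is one way to do that. `RollingPolicy`, `problems`, and `name_is_valid` are hypothetical names for illustration only, not SDK types, and the server may apply further checks not shown here.

```python
from dataclasses import dataclass
from typing import List, Optional

UINT64_MAX = 2**64 - 1  # upper bound implied by the uint64 format


@dataclass
class RollingPolicy:
    """Local stand-in for the rolling_policy fields listed above."""
    file_size_bytes: Optional[int] = None
    inactivity_seconds: Optional[int] = None
    interval_seconds: Optional[int] = None

    def problems(self) -> List[str]:
        """Return one message per field that violates a documented bound."""
        found: List[str] = []
        limits = (
            ("file_size_bytes", self.file_size_bytes, 0),
            ("inactivity_seconds", self.inactivity_seconds, 1),
            ("interval_seconds", self.interval_seconds, 1),
        )
        for label, value, minimum in limits:
            if value is None:  # every field is optional
                continue
            if value < minimum:
                found.append(f"{label} must be >= {minimum}")
            elif value > UINT64_MAX:
                found.append(f"{label} exceeds uint64")
        return found


def name_is_valid(name: str) -> bool:
    """Check the documented minLength 1 / maxLength 128 on the sink name."""
    return 1 <= len(name) <= 128
```

An empty list from `problems()` means the policy satisfies the documented bounds.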
class SinkGetResponse:
id: str

Indicates a unique identifier for this sink.

created_at: datetime
format: date-time
modified_at: datetime
format: date-time
name: str

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: Literal["r2", "r2_data_catalog"]

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config: Optional[Config]

Defines the configuration of the R2 Sink.

One of the following:
class ConfigCloudflarePipelinesR2TablePublic:

R2 Sink public configuration.

account_id: str

Cloudflare Account ID for the bucket

bucket: str

R2 Bucket to write to

file_naming: Optional[ConfigCloudflarePipelinesR2TablePublicFileNaming]

Controls filename prefix/suffix and strategy.

prefix: Optional[str]

The prefix to use in the file name, e.g. prefix-.parquet

strategy: Optional[Literal["serial", "uuid", "uuid_v7", "ulid"]]

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix: Optional[str]

Overrides the default file suffix (e.g. .parquet). Use with caution.

jurisdiction: Optional[str]

Jurisdiction this bucket is hosted in

partitioning: Optional[ConfigCloudflarePipelinesR2TablePublicPartitioning]

Data-layout partitioning for sinks.

time_pattern: Optional[str]

The pattern of the date string

path: Optional[str]

Subpath within the bucket to write to

rolling_policy: Optional[ConfigCloudflarePipelinesR2TablePublicRollingPolicy]

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: Optional[int]

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: Optional[int]

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: Optional[int]

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
class ConfigCloudflarePipelinesR2DataCatalogTablePublic:

R2 Data Catalog Sink public configuration.

account_id: str

Cloudflare Account ID

format: uri
bucket: str

The R2 Bucket that hosts this catalog

table_name: str

Table name

namespace: Optional[str]

Table namespace

rolling_policy: Optional[ConfigCloudflarePipelinesR2DataCatalogTablePublicRollingPolicy]

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: Optional[int]

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: Optional[int]

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: Optional[int]

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
format: Optional[Format]
One of the following:
class FormatJson:
type: Literal["json"]
decimal_encoding: Optional[Literal["number", "string", "bytes"]]
One of the following:
"number"
"string"
"bytes"
timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]
One of the following:
"rfc3339"
"unix_millis"
unstructured: Optional[bool]
class FormatParquet:
type: Literal["parquet"]
compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: Optional[int]
format: int64
minimum: 0
schema: Optional[Schema]
fields: Optional[List[SchemaField]]
One of the following:
class SchemaFieldInt32:
type: Literal["int32"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldInt64:
type: Literal["int64"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldFloat32:
type: Literal["float32"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldFloat64:
type: Literal["float64"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldBool:
type: Literal["bool"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldString:
type: Literal["string"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldBinary:
type: Literal["binary"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldTimestamp:
type: Literal["timestamp"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
unit: Optional[Literal["second", "millisecond", "microsecond", "nanosecond"]]
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
class SchemaFieldJson:
type: Literal["json"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldStruct:
class SchemaFieldList:
format: Optional[SchemaFormat]
One of the following:
class SchemaFormatJson:
type: Literal["json"]
decimal_encoding: Optional[Literal["number", "string", "bytes"]]
One of the following:
"number"
"string"
"bytes"
timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]
One of the following:
"rfc3339"
"unix_millis"
unstructured: Optional[bool]
class SchemaFormatParquet:
type: Literal["parquet"]
compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: Optional[int]
format: int64
minimum: 0
inferred: Optional[bool]
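The schema field variants above share the same four optional attributes, and only the timestamp variant lists a `unit`. A minimal sketch of checking a field described as a plain dict follows. `check_field` is a hypothetical helper, not an SDK function. The struct and list variants are left out because their fields are not shown in this listing.

```python
from typing import Any, Dict, List

# Type literals listed above, excluding struct and list (fields not shown).
SCALAR_TYPES = {
    "int32", "int64", "float32", "float64",
    "bool", "string", "binary", "json",
}
TIMESTAMP_UNITS = {"second", "millisecond", "microsecond", "nanosecond"}


def check_field(field: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one schema field description."""
    errors: List[str] = []
    ftype = field.get("type")
    if ftype == "timestamp":
        unit = field.get("unit")
        # unit is optional, but when present it must be a listed value
        if unit is not None and unit not in TIMESTAMP_UNITS:
            errors.append(f"unknown timestamp unit: {unit!r}")
    elif ftype in SCALAR_TYPES:
        if "unit" in field:
            errors.append(
                f"'unit' is only listed for timestamp fields, not {ftype!r}"
            )
    else:
        errors.append(f"unsupported type in this sketch: {ftype!r}")
    return errors
```

For example, `check_field({"type": "timestamp", "name": "ts", "unit": "millisecond"})` returns an empty list.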
class SinkCreateResponse:
id: str

Indicates a unique identifier for this sink.

created_at: datetime
format: date-time
modified_at: datetime
format: date-time
name: str

Defines the name of the Sink.

maxLength: 128
minLength: 1
type: Literal["r2", "r2_data_catalog"]

Specifies the type of sink.

One of the following:
"r2"
"r2_data_catalog"
config: Optional[Config]

R2 Data Catalog Sink

One of the following:
class ConfigCloudflarePipelinesR2Table:
account_id: str

Cloudflare Account ID for the bucket

bucket: str

R2 Bucket to write to

credentials: ConfigCloudflarePipelinesR2TableCredentials
access_key_id: str

Access key ID for the bucket

format: var-str
secret_access_key: str

Secret access key for the bucket

format: var-str
file_naming: Optional[ConfigCloudflarePipelinesR2TableFileNaming]

Controls filename prefix/suffix and strategy.

prefix: Optional[str]

The prefix to use in the file name, e.g. prefix-.parquet

strategy: Optional[Literal["serial", "uuid", "uuid_v7", "ulid"]]

Filename generation strategy.

One of the following:
"serial"
"uuid"
"uuid_v7"
"ulid"
suffix: Optional[str]

Overrides the default file suffix (e.g. .parquet). Use with caution.

jurisdiction: Optional[str]

Jurisdiction this bucket is hosted in

partitioning: Optional[ConfigCloudflarePipelinesR2TablePartitioning]

Data-layout partitioning for sinks.

time_pattern: Optional[str]

The pattern of the date string

path: Optional[str]

Subpath within the bucket to write to

rolling_policy: Optional[ConfigCloudflarePipelinesR2TableRollingPolicy]

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: Optional[int]

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: Optional[int]

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: Optional[int]

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
class ConfigCloudflarePipelinesR2DataCatalogTable:

R2 Data Catalog Sink

token: str

Authentication token

format: var-str
account_id: str

Cloudflare Account ID

format: uri
bucket: str

The R2 Bucket that hosts this catalog

table_name: str

Table name

namespace: Optional[str]

Table namespace

rolling_policy: Optional[ConfigCloudflarePipelinesR2DataCatalogTableRollingPolicy]

Rolling policy for file sinks (when & why to close a file and open a new one).

file_size_bytes: Optional[int]

Files will be rolled after reaching this number of bytes

format: uint64
minimum: 0
inactivity_seconds: Optional[int]

Number of seconds of inactivity to wait before rolling over to a new file

format: uint64
minimum: 1
interval_seconds: Optional[int]

Number of seconds to wait before rolling over to a new file

format: uint64
minimum: 1
format: Optional[Format]
One of the following:
class FormatJson:
type: Literal["json"]
decimal_encoding: Optional[Literal["number", "string", "bytes"]]
One of the following:
"number"
"string"
"bytes"
timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]
One of the following:
"rfc3339"
"unix_millis"
unstructured: Optional[bool]
class FormatParquet:
type: Literal["parquet"]
compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: Optional[int]
format: int64
minimum: 0
schema: Optional[Schema]
fields: Optional[List[SchemaField]]
One of the following:
class SchemaFieldInt32:
type: Literal["int32"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldInt64:
type: Literal["int64"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldFloat32:
type: Literal["float32"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldFloat64:
type: Literal["float64"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldBool:
type: Literal["bool"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldString:
type: Literal["string"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldBinary:
type: Literal["binary"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldTimestamp:
type: Literal["timestamp"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
unit: Optional[Literal["second", "millisecond", "microsecond", "nanosecond"]]
One of the following:
"second"
"millisecond"
"microsecond"
"nanosecond"
class SchemaFieldJson:
type: Literal["json"]
metadata_key: Optional[str]
name: Optional[str]
required: Optional[bool]
sql_name: Optional[str]
class SchemaFieldStruct:
class SchemaFieldList:
format: Optional[SchemaFormat]
One of the following:
class SchemaFormatJson:
type: Literal["json"]
decimal_encoding: Optional[Literal["number", "string", "bytes"]]
One of the following:
"number"
"string"
"bytes"
timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]
One of the following:
"rfc3339"
"unix_millis"
unstructured: Optional[bool]
class SchemaFormatParquet:
type: Literal["parquet"]
compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]
One of the following:
"uncompressed"
"snappy"
"gzip"
"zstd"
"lz4"
row_group_bytes: Optional[int]
format: int64
minimum: 0
inferred: Optional[bool]
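To show how the fields of an `r2` sink nest, the sketch below assembles a plain dict using the field names from the model above. `build_r2_sink` is a hypothetical helper, not an SDK function. Whether `SinkCreateParams` accepts exactly this shape is an assumption, since the request parameters are not listed in this section.

```python
from typing import Any, Dict, Optional

# Literal values listed in the models above.
STRATEGIES = ("serial", "uuid", "uuid_v7", "ulid")
COMPRESSIONS = ("uncompressed", "snappy", "gzip", "zstd", "lz4")


def build_r2_sink(
    name: str,
    account_id: str,
    bucket: str,
    access_key_id: str,
    secret_access_key: str,
    *,
    strategy: Optional[str] = None,
    compression: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble a dict mirroring the r2 sink fields (assumed request shape)."""
    if not 1 <= len(name) <= 128:
        raise ValueError("name must be 1 to 128 characters long")
    config: Dict[str, Any] = {
        "account_id": account_id,
        "bucket": bucket,
        "credentials": {
            "access_key_id": access_key_id,
            "secret_access_key": secret_access_key,
        },
    }
    if strategy is not None:
        if strategy not in STRATEGIES:
            raise ValueError(f"unknown file naming strategy: {strategy!r}")
        config["file_naming"] = {"strategy": strategy}
    body: Dict[str, Any] = {"name": name, "type": "r2", "config": config}
    if compression is not None:
        if compression not in COMPRESSIONS:
            raise ValueError(f"unknown compression: {compression!r}")
        # format sits beside config in the model, not inside it
        body["format"] = {"type": "parquet", "compression": compression}
    return body
```

Optional sections such as `file_naming` and `format` are only added when a value is supplied, so the resulting dict carries no empty placeholders.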