## Create Sink

`pipelines.sinks.create(SinkCreateParams**kwargs) -> SinkCreateResponse`

**post** `/accounts/{account_id}/pipelines/v1/sinks`

Create a new Sink.

### Parameters

- `account_id: str` Specifies the public ID of the account.
- `name: str` Defines the name of the Sink.
- `type: Literal["r2", "r2_data_catalog"]` Specifies the type of sink.
  - `"r2"`
  - `"r2_data_catalog"`
- `config: Optional[Config]` Defines the configuration of the Sink.
  - `class ConfigCloudflarePipelinesR2Table: …`
    - `account_id: str` Cloudflare Account ID for the bucket
    - `bucket: str` R2 Bucket to write to
    - `credentials: ConfigCloudflarePipelinesR2TableCredentials`
      - `access_key_id: str` Access key ID used to authenticate to the bucket
      - `secret_access_key: str` Secret access key used to authenticate to the bucket
    - `file_naming: Optional[ConfigCloudflarePipelinesR2TableFileNaming]` Controls filename prefix/suffix and strategy.
      - `prefix: Optional[str]` The prefix to use in the file name, e.g. prefix-.parquet
      - `strategy: Optional[Literal["serial", "uuid", "uuid_v7", "ulid"]]` Filename generation strategy.
        - `"serial"`
        - `"uuid"`
        - `"uuid_v7"`
        - `"ulid"`
      - `suffix: Optional[str]` Overrides the default file suffix (i.e. .parquet); use with caution.
    - `jurisdiction: Optional[str]` Jurisdiction this bucket is hosted in
    - `partitioning: Optional[ConfigCloudflarePipelinesR2TablePartitioning]` Data-layout partitioning for sinks.
      - `time_pattern: Optional[str]` The pattern of the date string
    - `path: Optional[str]` Subpath within the bucket to write to
    - `rolling_policy: Optional[ConfigCloudflarePipelinesR2TableRollingPolicy]` Rolling policy for file sinks (when & why to close a file and open a new one).
      - `file_size_bytes: Optional[int]` Files will be rolled after reaching this number of bytes
      - `inactivity_seconds: Optional[int]` Number of seconds of inactivity to wait before rolling over to a new file
      - `interval_seconds: Optional[int]` Number of seconds to wait before rolling over to a new file
  - `class ConfigCloudflarePipelinesR2DataCatalogTable: …` R2 Data Catalog Sink
    - `token: str` Authentication token
    - `account_id: str` Cloudflare Account ID
    - `bucket: str` The R2 Bucket that hosts this catalog
    - `table_name: str` Table name
    - `namespace: Optional[str]` Table namespace
    - `rolling_policy: Optional[ConfigCloudflarePipelinesR2DataCatalogTableRollingPolicy]` Rolling policy for file sinks (when & why to close a file and open a new one).
      - `file_size_bytes: Optional[int]` Files will be rolled after reaching this number of bytes
      - `inactivity_seconds: Optional[int]` Number of seconds of inactivity to wait before rolling over to a new file
      - `interval_seconds: Optional[int]` Number of seconds to wait before rolling over to a new file
- `format: Optional[Format]`
  - `class FormatJson: …`
    - `type: Literal["json"]`
      - `"json"`
    - `decimal_encoding: Optional[Literal["number", "string", "bytes"]]`
      - `"number"`
      - `"string"`
      - `"bytes"`
    - `timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]`
      - `"rfc3339"`
      - `"unix_millis"`
    - `unstructured: Optional[bool]`
  - `class FormatParquet: …`
    - `type: Literal["parquet"]`
      - `"parquet"`
    - `compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]`
      - `"uncompressed"`
      - `"snappy"`
      - `"gzip"`
      - `"zstd"`
      - `"lz4"`
    - `row_group_bytes: Optional[int]`
- `schema: Optional[Schema]`
  - `fields: Optional[Iterable[SchemaField]]`
    - `class SchemaFieldInt32: …`
      - `type: Literal["int32"]`
        - `"int32"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldInt64: …`
      - `type: Literal["int64"]`
        - `"int64"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldFloat32: …`
      - `type: Literal["float32"]`
        - `"float32"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldFloat64: …`
      - `type: Literal["float64"]`
        - `"float64"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldBool: …`
      - `type: Literal["bool"]`
        - `"bool"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldString: …`
      - `type: Literal["string"]`
        - `"string"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldBinary: …`
      - `type: Literal["binary"]`
        - `"binary"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldTimestamp: …`
      - `type: Literal["timestamp"]`
        - `"timestamp"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
      - `unit: Optional[Literal["second", "millisecond", "microsecond", "nanosecond"]]`
        - `"second"`
        - `"millisecond"`
        - `"microsecond"`
        - `"nanosecond"`
    - `class SchemaFieldJson: …`
      - `type: Literal["json"]`
        - `"json"`
      - `metadata_key: Optional[str]`
      - `name: Optional[str]`
      - `required: Optional[bool]`
      - `sql_name: Optional[str]`
    - `class SchemaFieldStruct: …`
    - `class SchemaFieldList: …`
  - `format: Optional[SchemaFormat]`
    - `class SchemaFormatJson: …`
      - `type: Literal["json"]`
        - `"json"`
      - `decimal_encoding: Optional[Literal["number", "string", "bytes"]]`
        - `"number"`
        - `"string"`
        - `"bytes"`
      - `timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]`
        - `"rfc3339"`
        - `"unix_millis"`
      - `unstructured: Optional[bool]`
    - `class SchemaFormatParquet: …`
      - `type: Literal["parquet"]`
        - `"parquet"`
      - `compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]`
        - `"uncompressed"`
        - `"snappy"`
        - `"gzip"`
        - `"zstd"`
        - `"lz4"`
      - `row_group_bytes: Optional[int]`
  - `inferred: Optional[bool]`

### Returns

- `class SinkCreateResponse: …`
  - `id: str` Indicates a unique identifier for this sink.
  - `created_at: datetime`
  - `modified_at: datetime`
  - `name: str` Defines the name of the Sink.
  - `type: Literal["r2", "r2_data_catalog"]` Specifies the type of sink.
    - `"r2"`
    - `"r2_data_catalog"`
  - `config: Optional[Config]` Defines the configuration of the Sink.
    - `class ConfigCloudflarePipelinesR2Table: …`
      - `account_id: str` Cloudflare Account ID for the bucket
      - `bucket: str` R2 Bucket to write to
      - `credentials: ConfigCloudflarePipelinesR2TableCredentials`
        - `access_key_id: str` Access key ID used to authenticate to the bucket
        - `secret_access_key: str` Secret access key used to authenticate to the bucket
      - `file_naming: Optional[ConfigCloudflarePipelinesR2TableFileNaming]` Controls filename prefix/suffix and strategy.
        - `prefix: Optional[str]` The prefix to use in the file name, e.g. prefix-.parquet
        - `strategy: Optional[Literal["serial", "uuid", "uuid_v7", "ulid"]]` Filename generation strategy.
          - `"serial"`
          - `"uuid"`
          - `"uuid_v7"`
          - `"ulid"`
        - `suffix: Optional[str]` Overrides the default file suffix (i.e. .parquet); use with caution.
      - `jurisdiction: Optional[str]` Jurisdiction this bucket is hosted in
      - `partitioning: Optional[ConfigCloudflarePipelinesR2TablePartitioning]` Data-layout partitioning for sinks.
        - `time_pattern: Optional[str]` The pattern of the date string
      - `path: Optional[str]` Subpath within the bucket to write to
      - `rolling_policy: Optional[ConfigCloudflarePipelinesR2TableRollingPolicy]` Rolling policy for file sinks (when & why to close a file and open a new one).
        - `file_size_bytes: Optional[int]` Files will be rolled after reaching this number of bytes
        - `inactivity_seconds: Optional[int]` Number of seconds of inactivity to wait before rolling over to a new file
        - `interval_seconds: Optional[int]` Number of seconds to wait before rolling over to a new file
    - `class ConfigCloudflarePipelinesR2DataCatalogTable: …` R2 Data Catalog Sink
      - `token: str` Authentication token
      - `account_id: str` Cloudflare Account ID
      - `bucket: str` The R2 Bucket that hosts this catalog
      - `table_name: str` Table name
      - `namespace: Optional[str]` Table namespace
      - `rolling_policy: Optional[ConfigCloudflarePipelinesR2DataCatalogTableRollingPolicy]` Rolling policy for file sinks (when & why to close a file and open a new one).
        - `file_size_bytes: Optional[int]` Files will be rolled after reaching this number of bytes
        - `inactivity_seconds: Optional[int]` Number of seconds of inactivity to wait before rolling over to a new file
        - `interval_seconds: Optional[int]` Number of seconds to wait before rolling over to a new file
  - `format: Optional[Format]`
    - `class FormatJson: …`
      - `type: Literal["json"]`
        - `"json"`
      - `decimal_encoding: Optional[Literal["number", "string", "bytes"]]`
        - `"number"`
        - `"string"`
        - `"bytes"`
      - `timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]`
        - `"rfc3339"`
        - `"unix_millis"`
      - `unstructured: Optional[bool]`
    - `class FormatParquet: …`
      - `type: Literal["parquet"]`
        - `"parquet"`
      - `compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]`
        - `"uncompressed"`
        - `"snappy"`
        - `"gzip"`
        - `"zstd"`
        - `"lz4"`
      - `row_group_bytes: Optional[int]`
  - `schema: Optional[Schema]`
    - `fields: Optional[List[SchemaField]]`
      - `class SchemaFieldInt32: …`
        - `type: Literal["int32"]`
          - `"int32"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldInt64: …`
        - `type: Literal["int64"]`
          - `"int64"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldFloat32: …`
        - `type: Literal["float32"]`
          - `"float32"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldFloat64: …`
        - `type: Literal["float64"]`
          - `"float64"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldBool: …`
        - `type: Literal["bool"]`
          - `"bool"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldString: …`
        - `type: Literal["string"]`
          - `"string"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldBinary: …`
        - `type: Literal["binary"]`
          - `"binary"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldTimestamp: …`
        - `type: Literal["timestamp"]`
          - `"timestamp"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
        - `unit: Optional[Literal["second", "millisecond", "microsecond", "nanosecond"]]`
          - `"second"`
          - `"millisecond"`
          - `"microsecond"`
          - `"nanosecond"`
      - `class SchemaFieldJson: …`
        - `type: Literal["json"]`
          - `"json"`
        - `metadata_key: Optional[str]`
        - `name: Optional[str]`
        - `required: Optional[bool]`
        - `sql_name: Optional[str]`
      - `class SchemaFieldStruct: …`
      - `class SchemaFieldList: …`
    - `format: Optional[SchemaFormat]`
      - `class SchemaFormatJson: …`
        - `type: Literal["json"]`
          - `"json"`
        - `decimal_encoding: Optional[Literal["number", "string", "bytes"]]`
          - `"number"`
          - `"string"`
          - `"bytes"`
        - `timestamp_format: Optional[Literal["rfc3339", "unix_millis"]]`
          - `"rfc3339"`
          - `"unix_millis"`
        - `unstructured: Optional[bool]`
      - `class SchemaFormatParquet: …`
        - `type: Literal["parquet"]`
          - `"parquet"`
        - `compression: Optional[Literal["uncompressed", "snappy", "gzip", "zstd", "lz4"]]`
          - `"uncompressed"`
          - `"snappy"`
          - `"gzip"`
          - `"zstd"`
          - `"lz4"`
        - `row_group_bytes: Optional[int]`
    - `inferred: Optional[bool]`

### Example

```python
import os

from cloudflare import Cloudflare

client = Cloudflare(
    api_token=os.environ.get("CLOUDFLARE_API_TOKEN"),  # This is the default and can be omitted
)

# Create an R2 sink with only the required parameters.
sink = client.pipelines.sinks.create(
    account_id="0123105f4ecef8ad9ca31a8372d0c353",
    name="my_sink",
    type="r2",
)
print(sink.id)
```

#### Response

```json
{
  "result": {
    "id": "01234567890123457689012345678901",
    "created_at": "2019-12-27T18:11:19.117Z",
    "modified_at": "2019-12-27T18:11:19.117Z",
    "name": "my_sink",
    "type": "r2",
    "config": {
      "account_id": "account_id",
      "bucket": "bucket",
      "credentials": {
        "access_key_id": "access_key_id",
        "secret_access_key": "secret_access_key"
      },
      "file_naming": {
        "prefix": "prefix",
        "strategy": "serial",
        "suffix": "suffix"
      },
      "jurisdiction": "jurisdiction",
      "partitioning": {
        "time_pattern": "year=%Y/month=%m/day=%d/hour=%H"
      },
      "path": "path",
      "rolling_policy": {
        "file_size_bytes": 0,
        "inactivity_seconds": 1,
        "interval_seconds": 1
      }
    },
    "format": {
      "type": "json",
      "decimal_encoding": "number",
      "timestamp_format": "rfc3339",
      "unstructured": true
    },
    "schema": {
      "fields": [
        {
          "type": "int32",
          "metadata_key": "metadata_key",
          "name": "name",
          "required": true,
          "sql_name": "sql_name"
        }
      ],
      "format": {
        "type": "json",
        "decimal_encoding": "number",
        "timestamp_format": "rfc3339",
        "unstructured": true
      },
      "inferred": true
    }
  },
  "success": true
}
```
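
The example above passes only the required parameters. As a rough sketch of how the optional `config`, `format`, and `schema` parameters might be assembled for an `r2` sink, the snippet below builds the keyword arguments as a plain dictionary and checks the literal values against those listed under Parameters. The helper name `build_r2_sink_params`, the bucket name, the credential placeholders, the field names, and the rolling policy numbers are all illustrative assumptions, and whether the API accepts this particular combination of options is not confirmed by this page.

```python
from typing import Any, Dict

# Allowed literal values, copied from the Parameters list.
COMPRESSIONS = {"uncompressed", "snappy", "gzip", "zstd", "lz4"}
NAMING_STRATEGIES = {"serial", "uuid", "uuid_v7", "ulid"}


def build_r2_sink_params(
    account_id: str,
    name: str,
    bucket: str,
    access_key_id: str,
    secret_access_key: str,
    compression: str = "zstd",
    strategy: str = "uuid_v7",
) -> Dict[str, Any]:
    """Assemble keyword arguments for an "r2" sink (hypothetical helper)."""
    if compression not in COMPRESSIONS:
        raise ValueError(f"unsupported compression: {compression!r}")
    if strategy not in NAMING_STRATEGIES:
        raise ValueError(f"unsupported file naming strategy: {strategy!r}")
    return {
        "account_id": account_id,
        "name": name,
        "type": "r2",
        "config": {
            "account_id": account_id,
            "bucket": bucket,
            "credentials": {
                "access_key_id": access_key_id,
                "secret_access_key": secret_access_key,
            },
            "file_naming": {"strategy": strategy},
            # Pattern taken from the sample response on this page.
            "partitioning": {"time_pattern": "year=%Y/month=%m/day=%d/hour=%H"},
            "rolling_policy": {
                "file_size_bytes": 100 * 1024 * 1024,  # illustrative value
                "interval_seconds": 300,  # illustrative value
            },
        },
        "format": {"type": "parquet", "compression": compression},
        "schema": {
            "fields": [
                # Field names are made up for illustration.
                {"type": "string", "name": "user_id", "required": True},
                {"type": "timestamp", "name": "event_time", "unit": "millisecond"},
            ],
        },
    }


params = build_r2_sink_params(
    account_id="0123105f4ecef8ad9ca31a8372d0c353",
    name="my_sink",
    bucket="my-bucket",
    access_key_id="<R2_ACCESS_KEY_ID>",
    secret_access_key="<R2_SECRET_ACCESS_KEY>",
)
print(sorted(params))

# With a configured client, the request would then be sent as:
# sink = client.pipelines.sinks.create(**params)
```

Validating the literals locally only catches typos before a request is made; the API remains the authority on which values and combinations are valid.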