# Pipelines

## [DEPRECATED] List Pipelines

`client.Pipelines.List(ctx, params) (*PipelineListResponse, error)`

**get** `/accounts/{account_id}/pipelines`

[DEPRECATED] List, filter, and paginate pipelines in an account. Use the new /pipelines/v1/pipelines endpoint instead.

### Parameters

- `params PipelineListParams`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `Page param.Field[string]` Query param: Specifies which page to retrieve.
  - `PerPage param.Field[string]` Query param: Specifies the number of pipelines per page.
  - `Search param.Field[string]` Query param: Specifies the pipeline name prefix to search for.

### Returns

- `type PipelineListResponse struct{…}`
  - `ResultInfo PipelineListResponseResultInfo`
    - `Count float64` Indicates the number of items on the current page.
    - `Page float64` Indicates the current page number.
    - `PerPage float64` Indicates the number of items per page.
    - `TotalCount float64` Indicates the total number of items.
  - `Results []PipelineListResponseResult`
    - `ID string` Specifies the pipeline identifier.
    - `Destination PipelineListResponseResultsDestination`
      - `Batch PipelineListResponseResultsDestinationBatch`
        - `MaxBytes int64` Specifies the rough maximum size of files.
        - `MaxDurationS float64` Specifies the duration to wait while aggregating batches into files.
        - `MaxRows int64` Specifies the rough maximum number of rows per file.
      - `Compression PipelineListResponseResultsDestinationCompression`
        - `Type PipelineListResponseResultsDestinationCompressionType` Specifies the desired compression algorithm and format.
          - `const PipelineListResponseResultsDestinationCompressionTypeNone PipelineListResponseResultsDestinationCompressionType = "none"`
          - `const PipelineListResponseResultsDestinationCompressionTypeGzip PipelineListResponseResultsDestinationCompressionType = "gzip"`
          - `const PipelineListResponseResultsDestinationCompressionTypeDeflate PipelineListResponseResultsDestinationCompressionType = "deflate"`
      - `Format PipelineListResponseResultsDestinationFormat` Specifies the format of data to deliver.
        - `const PipelineListResponseResultsDestinationFormatJson PipelineListResponseResultsDestinationFormat = "json"`
      - `Path PipelineListResponseResultsDestinationPath`
        - `Bucket string` Specifies the R2 Bucket to store files.
        - `Filename string` Specifies the name pattern for individual data files.
        - `Filepath string` Specifies the name pattern for the directory.
        - `Prefix string` Specifies the base directory within the bucket.
      - `Type PipelineListResponseResultsDestinationType` Specifies the type of destination.
        - `const PipelineListResponseResultsDestinationTypeR2 PipelineListResponseResultsDestinationType = "r2"`
    - `Endpoint string` Indicates the endpoint URL to send traffic to.
    - `Name string` Defines the name of the pipeline.
    - `Source []PipelineListResponseResultsSource`
      - `type PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesHTTPSource struct{…}` [DEPRECATED] HTTP source configuration. Use the new streams API instead.
        - `Format PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat` Specifies the format of source data.
          - `const PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat = "json"`
        - `Type string`
        - `Authentication bool` Specifies whether authentication is required to send to this pipeline via HTTP.
        - `CORS PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceCORS`
          - `Origins []string` Specifies the origins allowed to make cross-origin HTTP requests.
      - `type PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesBindingSource struct{…}` [DEPRECATED] Worker binding source configuration. Use the new streams API instead.
        - `Format PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat` Specifies the format of source data.
          - `const PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormatJson PipelineListResponseResultsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat = "json"`
        - `Type string`
    - `Version float64` Indicates the version number of the last saved configuration.
  - `Success bool` Indicates whether the API call was successful.

### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	// The result is named "response" so it does not shadow the pipelines package.
	response, err := client.Pipelines.List(context.TODO(), pipelines.PipelineListParams{
		AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
	})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", response.ResultInfo)
}
```

#### Response

```json
{
  "result_info": {
    "count": 1,
    "page": 0,
    "per_page": 10,
    "total_count": 1
  },
  "results": [
    {
      "id": "123f8a8258064ed892a347f173372359",
      "destination": {
        "batch": {
          "max_bytes": 1000,
          "max_duration_s": 0.25,
          "max_rows": 100
        },
        "compression": {
          "type": "gzip"
        },
        "format": "json",
        "path": {
          "bucket": "bucket",
          "filename": "${slug}${extension}",
          "filepath": "${date}/${hour}",
          "prefix": "base"
        },
        "type": "r2"
      },
      "endpoint": "https://123f8a8258064ed892a347f173372359.pipelines.cloudflare.com",
      "name": "sample_pipeline",
      "source": [
        {
          "format": "json",
          "type": "type",
          "authentication": true,
          "cors": {
            "origins": [
              "*"
            ]
          }
        }
      ],
      "version": 2
    }
  ],
  "success": true
}
```

## [DEPRECATED] Get Pipeline

`client.Pipelines.Get(ctx, pipelineName, query) (*PipelineGetResponse, error)`

**get** `/accounts/{account_id}/pipelines/{pipeline_name}`

[DEPRECATED] Get configuration of a pipeline. Use the new /pipelines/v1/pipelines endpoint instead.

### Parameters

- `pipelineName string` Defines the name of the pipeline.
- `query PipelineGetParams`
  - `AccountID param.Field[string]` Specifies the public ID of the account.

### Returns

- `type PipelineGetResponse struct{…}` [DEPRECATED] Describes the configuration of a pipeline. Use the new streams/sinks/pipelines API instead.
  - `ID string` Specifies the pipeline identifier.
  - `Destination PipelineGetResponseDestination`
    - `Batch PipelineGetResponseDestinationBatch`
      - `MaxBytes int64` Specifies the rough maximum size of files.
      - `MaxDurationS float64` Specifies the duration to wait while aggregating batches into files.
      - `MaxRows int64` Specifies the rough maximum number of rows per file.
    - `Compression PipelineGetResponseDestinationCompression`
      - `Type PipelineGetResponseDestinationCompressionType` Specifies the desired compression algorithm and format.
        - `const PipelineGetResponseDestinationCompressionTypeNone PipelineGetResponseDestinationCompressionType = "none"`
        - `const PipelineGetResponseDestinationCompressionTypeGzip PipelineGetResponseDestinationCompressionType = "gzip"`
        - `const PipelineGetResponseDestinationCompressionTypeDeflate PipelineGetResponseDestinationCompressionType = "deflate"`
    - `Format PipelineGetResponseDestinationFormat` Specifies the format of data to deliver.
      - `const PipelineGetResponseDestinationFormatJson PipelineGetResponseDestinationFormat = "json"`
    - `Path PipelineGetResponseDestinationPath`
      - `Bucket string` Specifies the R2 Bucket to store files.
      - `Filename string` Specifies the name pattern for individual data files.
      - `Filepath string` Specifies the name pattern for the directory.
      - `Prefix string` Specifies the base directory within the bucket.
    - `Type PipelineGetResponseDestinationType` Specifies the type of destination.
      - `const PipelineGetResponseDestinationTypeR2 PipelineGetResponseDestinationType = "r2"`
  - `Endpoint string` Indicates the endpoint URL to send traffic to.
  - `Name string` Defines the name of the pipeline.
  - `Source []PipelineGetResponseSource`
    - `type PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSource struct{…}` [DEPRECATED] HTTP source configuration. Use the new streams API instead.
      - `Format PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat` Specifies the format of source data.
        - `const PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat = "json"`
      - `Type string`
      - `Authentication bool` Specifies whether authentication is required to send to this pipeline via HTTP.
      - `CORS PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceCORS`
        - `Origins []string` Specifies the origins allowed to make cross-origin HTTP requests.
    - `type PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesBindingSource struct{…}` [DEPRECATED] Worker binding source configuration. Use the new streams API instead.
      - `Format PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat` Specifies the format of source data.
        - `const PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormatJson PipelineGetResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat = "json"`
      - `Type string`
  - `Version float64` Indicates the version number of the last saved configuration.
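
The nested response shape listed above can also be inspected without the SDK. The following is a minimal sketch using only the Go standard library. It assumes the raw HTTP body wraps the pipeline in a `result`/`success` envelope, as in this section's sample response. `legacyPipeline`, `envelope`, and `decodeLegacyPipeline` are hypothetical local names for illustration, not SDK types, and only a subset of the documented fields is modeled.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// legacyPipeline mirrors a subset of the documented response fields.
// It is a hypothetical local type, not the SDK's PipelineGetResponse.
type legacyPipeline struct {
	ID          string  `json:"id"`
	Name        string  `json:"name"`
	Endpoint    string  `json:"endpoint"`
	Version     float64 `json:"version"`
	Destination struct {
		Type        string `json:"type"`
		Format      string `json:"format"`
		Compression struct {
			Type string `json:"type"`
		} `json:"compression"`
		Path struct {
			Bucket string `json:"bucket"`
			Prefix string `json:"prefix"`
		} `json:"path"`
	} `json:"destination"`
}

// envelope models the assumed {"result": ..., "success": ...} wrapper.
type envelope struct {
	Result  legacyPipeline `json:"result"`
	Success bool           `json:"success"`
}

// decodeLegacyPipeline unwraps the envelope and returns the pipeline,
// failing when the JSON is malformed or success is false.
func decodeLegacyPipeline(raw []byte) (legacyPipeline, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return legacyPipeline{}, err
	}
	if !env.Success {
		return legacyPipeline{}, errors.New("response reported success=false")
	}
	return env.Result, nil
}

func main() {
	// Trimmed copy of the documented sample response.
	raw := []byte(`{
		"result": {
			"id": "123f8a8258064ed892a347f173372359",
			"destination": {
				"compression": {"type": "gzip"},
				"format": "json",
				"path": {"bucket": "bucket", "prefix": "base"},
				"type": "r2"
			},
			"endpoint": "https://123f8a8258064ed892a347f173372359.pipelines.cloudflare.com",
			"name": "sample_pipeline",
			"version": 2
		},
		"success": true
	}`)
	p, err := decodeLegacyPipeline(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s (v%v) writes %s/%s to %s bucket %q\n",
		p.Name, p.Version, p.Destination.Format, p.Destination.Compression.Type,
		p.Destination.Type, p.Destination.Path.Bucket)
}
```

When the SDK is used, `client.Pipelines.Get` returns the already-unwrapped `*PipelineGetResponse`, so this manual step is only relevant for raw HTTP access.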
### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	// Look up the pipeline by name within the given account.
	pipeline, err := client.Pipelines.Get(
		context.TODO(),
		"sample_pipeline",
		pipelines.PipelineGetParams{
			AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		},
	)
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", pipeline.ID)
}
```

#### Response

```json
{
  "result": {
    "id": "123f8a8258064ed892a347f173372359",
    "destination": {
      "batch": {
        "max_bytes": 1000,
        "max_duration_s": 0.25,
        "max_rows": 100
      },
      "compression": {
        "type": "gzip"
      },
      "format": "json",
      "path": {
        "bucket": "bucket",
        "filename": "${slug}${extension}",
        "filepath": "${date}/${hour}",
        "prefix": "base"
      },
      "type": "r2"
    },
    "endpoint": "https://123f8a8258064ed892a347f173372359.pipelines.cloudflare.com",
    "name": "sample_pipeline",
    "source": [
      {
        "format": "json",
        "type": "type",
        "authentication": true,
        "cors": {
          "origins": [
            "*"
          ]
        }
      }
    ],
    "version": 2
  },
  "success": true
}
```

## [DEPRECATED] Create Pipeline

`client.Pipelines.New(ctx, params) (*PipelineNewResponse, error)`

**post** `/accounts/{account_id}/pipelines`

[DEPRECATED] Create a new pipeline. Use the new /pipelines/v1/pipelines endpoint instead.

### Parameters

- `params PipelineNewParams`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `Destination param.Field[PipelineNewParamsDestination]` Body param
    - `Batch PipelineNewParamsDestinationBatch`
      - `MaxBytes int64` Specifies the rough maximum size of files.
      - `MaxDurationS float64` Specifies the duration to wait while aggregating batches into files.
      - `MaxRows int64` Specifies the rough maximum number of rows per file.
    - `Compression PipelineNewParamsDestinationCompression`
      - `Type PipelineNewParamsDestinationCompressionType` Specifies the desired compression algorithm and format.
        - `const PipelineNewParamsDestinationCompressionTypeNone PipelineNewParamsDestinationCompressionType = "none"`
        - `const PipelineNewParamsDestinationCompressionTypeGzip PipelineNewParamsDestinationCompressionType = "gzip"`
        - `const PipelineNewParamsDestinationCompressionTypeDeflate PipelineNewParamsDestinationCompressionType = "deflate"`
    - `Credentials PipelineNewParamsDestinationCredentials`
      - `AccessKeyID string` Specifies the R2 Bucket Access Key ID.
      - `Endpoint string` Specifies the R2 Endpoint.
      - `SecretAccessKey string` Specifies the R2 Bucket Secret Access Key.
    - `Format PipelineNewParamsDestinationFormat` Specifies the format of data to deliver.
      - `const PipelineNewParamsDestinationFormatJson PipelineNewParamsDestinationFormat = "json"`
    - `Path PipelineNewParamsDestinationPath`
      - `Bucket string` Specifies the R2 Bucket to store files.
      - `Filename string` Specifies the name pattern for individual data files.
      - `Filepath string` Specifies the name pattern for the directory.
      - `Prefix string` Specifies the base directory within the bucket.
    - `Type PipelineNewParamsDestinationType` Specifies the type of destination.
      - `const PipelineNewParamsDestinationTypeR2 PipelineNewParamsDestinationType = "r2"`
  - `Name param.Field[string]` Body param: Defines the name of the pipeline.
  - `Source param.Field[[]PipelineNewParamsSource]` Body param
    - `type PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSource struct{…}` [DEPRECATED] HTTP source configuration. Use the new streams API instead.
      - `Format PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat` Specifies the format of source data.
        - `const PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat = "json"`
      - `Type string`
      - `Authentication bool` Specifies whether authentication is required to send to this pipeline via HTTP.
      - `CORS PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceCORS`
        - `Origins []string` Specifies the origins allowed to make cross-origin HTTP requests.
    - `type PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesBindingSource struct{…}` [DEPRECATED] Worker binding source configuration. Use the new streams API instead.
      - `Format PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat` Specifies the format of source data.
        - `const PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormatJson PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat = "json"`
      - `Type string`

### Returns

- `type PipelineNewResponse struct{…}` [DEPRECATED] Describes the configuration of a pipeline. Use the new streams/sinks/pipelines API instead.
  - `ID string` Specifies the pipeline identifier.
  - `Destination PipelineNewResponseDestination`
    - `Batch PipelineNewResponseDestinationBatch`
      - `MaxBytes int64` Specifies the rough maximum size of files.
      - `MaxDurationS float64` Specifies the duration to wait while aggregating batches into files.
      - `MaxRows int64` Specifies the rough maximum number of rows per file.
    - `Compression PipelineNewResponseDestinationCompression`
      - `Type PipelineNewResponseDestinationCompressionType` Specifies the desired compression algorithm and format.
        - `const PipelineNewResponseDestinationCompressionTypeNone PipelineNewResponseDestinationCompressionType = "none"`
        - `const PipelineNewResponseDestinationCompressionTypeGzip PipelineNewResponseDestinationCompressionType = "gzip"`
        - `const PipelineNewResponseDestinationCompressionTypeDeflate PipelineNewResponseDestinationCompressionType = "deflate"`
    - `Format PipelineNewResponseDestinationFormat` Specifies the format of data to deliver.
      - `const PipelineNewResponseDestinationFormatJson PipelineNewResponseDestinationFormat = "json"`
    - `Path PipelineNewResponseDestinationPath`
      - `Bucket string` Specifies the R2 Bucket to store files.
      - `Filename string` Specifies the name pattern for individual data files.
      - `Filepath string` Specifies the name pattern for the directory.
      - `Prefix string` Specifies the base directory within the bucket.
    - `Type PipelineNewResponseDestinationType` Specifies the type of destination.
      - `const PipelineNewResponseDestinationTypeR2 PipelineNewResponseDestinationType = "r2"`
  - `Endpoint string` Indicates the endpoint URL to send traffic to.
  - `Name string` Defines the name of the pipeline.
  - `Source []PipelineNewResponseSource`
    - `type PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSource struct{…}` [DEPRECATED] HTTP source configuration. Use the new streams API instead.
      - `Format PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat` Specifies the format of source data.
        - `const PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat = "json"`
      - `Type string`
      - `Authentication bool` Specifies whether authentication is required to send to this pipeline via HTTP.
      - `CORS PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceCORS`
        - `Origins []string` Specifies the origins allowed to make cross-origin HTTP requests.
    - `type PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesBindingSource struct{…}` [DEPRECATED] Worker binding source configuration. Use the new streams API instead.
      - `Format PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat` Specifies the format of source data.
        - `const PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormatJson PipelineNewResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat = "json"`
      - `Type string`
  - `Version float64` Indicates the version number of the last saved configuration.
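
To make the parameter tree above more concrete, here is a rough sketch of what the JSON body for this call might look like when built with the standard library alone. It assumes the request uses the same snake_case keys that appear in the documented sample response; the source does not show the request wire format, so treat the key names as an assumption. All type and function names (`createBody`, `newCreateBody`, and so on) are hypothetical, the credentials object is omitted because its wire keys are not shown in this document, and the source `type` value reuses the placeholder from the documented example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// The types below are hypothetical local mirrors of the documented
// parameters; JSON key names are assumed from the sample response.
type batch struct {
	MaxBytes     int64   `json:"max_bytes"`
	MaxDurationS float64 `json:"max_duration_s"`
	MaxRows      int64   `json:"max_rows"`
}

type compression struct {
	Type string `json:"type"` // "none", "gzip", or "deflate"
}

type path struct {
	Bucket string `json:"bucket"`
}

type destination struct {
	Batch       batch       `json:"batch"`
	Compression compression `json:"compression"`
	Format      string      `json:"format"` // only "json" is documented
	Path        path        `json:"path"`
	Type        string      `json:"type"` // only "r2" is documented
}

type source struct {
	Format string `json:"format"`
	Type   string `json:"type"`
}

type createBody struct {
	Name        string      `json:"name"`
	Destination destination `json:"destination"`
	Source      []source    `json:"source"`
}

// newCreateBody fills in the batch and compression values used in the
// documented sample response.
func newCreateBody(name, bucket string) createBody {
	return createBody{
		Name: name,
		Destination: destination{
			Batch:       batch{MaxBytes: 1000, MaxDurationS: 0.25, MaxRows: 100},
			Compression: compression{Type: "gzip"},
			Format:      "json",
			Path:        path{Bucket: bucket},
			Type:        "r2",
		},
		// "type" is the placeholder value from the documented example.
		Source: []source{{Format: "json", Type: "type"}},
	}
}

func main() {
	out, err := json.MarshalIndent(newCreateBody("sample_pipeline", "bucket"), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```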
### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	pipeline, err := client.Pipelines.New(context.TODO(), pipelines.PipelineNewParams{
		AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		Destination: cloudflare.F(pipelines.PipelineNewParamsDestination{
			Batch:       cloudflare.F(pipelines.PipelineNewParamsDestinationBatch{}),
			Compression: cloudflare.F(pipelines.PipelineNewParamsDestinationCompression{}),
			// The credential values are left empty in this example.
			Credentials: cloudflare.F(pipelines.PipelineNewParamsDestinationCredentials{
				AccessKeyID:     cloudflare.F(""),
				Endpoint:        cloudflare.F("https://123f8a8258064ed892a347f173372359.r2.cloudflarestorage.com"),
				SecretAccessKey: cloudflare.F(""),
			}),
			Format: cloudflare.F(pipelines.PipelineNewParamsDestinationFormatJson),
			Path: cloudflare.F(pipelines.PipelineNewParamsDestinationPath{
				Bucket: cloudflare.F("bucket"),
			}),
			Type: cloudflare.F(pipelines.PipelineNewParamsDestinationTypeR2),
		}),
		Name: cloudflare.F("sample_pipeline"),
		Source: cloudflare.F([]pipelines.PipelineNewParamsSourceUnion{pipelines.PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSource{
			Format: cloudflare.F(pipelines.PipelineNewParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson),
			Type:   cloudflare.F("type"),
		}}),
	})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", pipeline.ID)
}
```

#### Response

```json
{
  "result": {
    "id": "123f8a8258064ed892a347f173372359",
    "destination": {
      "batch": {
        "max_bytes": 1000,
        "max_duration_s": 0.25,
        "max_rows": 100
      },
      "compression": {
        "type": "gzip"
      },
      "format": "json",
      "path": {
        "bucket": "bucket",
        "filename": "${slug}${extension}",
        "filepath": "${date}/${hour}",
        "prefix": "base"
      },
      "type": "r2"
    },
    "endpoint": "https://123f8a8258064ed892a347f173372359.pipelines.cloudflare.com",
    "name": "sample_pipeline",
    "source": [
      {
        "format": "json",
        "type": "type",
        "authentication": true,
        "cors": {
          "origins": [
            "*"
          ]
        }
      }
    ],
    "version": 2
  },
  "success": true
}
```

## [DEPRECATED] Update Pipeline

`client.Pipelines.Update(ctx, pipelineName, params) (*PipelineUpdateResponse, error)`

**put** `/accounts/{account_id}/pipelines/{pipeline_name}`

[DEPRECATED] Update an existing pipeline. Use the new /pipelines/v1/pipelines endpoint instead.

### Parameters

- `pipelineName string` Defines the name of the pipeline.
- `params PipelineUpdateParams`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `Destination param.Field[PipelineUpdateParamsDestination]` Body param
    - `Batch PipelineUpdateParamsDestinationBatch`
      - `MaxBytes int64` Specifies the rough maximum size of files.
      - `MaxDurationS float64` Specifies the duration to wait while aggregating batches into files.
      - `MaxRows int64` Specifies the rough maximum number of rows per file.
    - `Compression PipelineUpdateParamsDestinationCompression`
      - `Type PipelineUpdateParamsDestinationCompressionType` Specifies the desired compression algorithm and format.
        - `const PipelineUpdateParamsDestinationCompressionTypeNone PipelineUpdateParamsDestinationCompressionType = "none"`
        - `const PipelineUpdateParamsDestinationCompressionTypeGzip PipelineUpdateParamsDestinationCompressionType = "gzip"`
        - `const PipelineUpdateParamsDestinationCompressionTypeDeflate PipelineUpdateParamsDestinationCompressionType = "deflate"`
    - `Format PipelineUpdateParamsDestinationFormat` Specifies the format of data to deliver.
      - `const PipelineUpdateParamsDestinationFormatJson PipelineUpdateParamsDestinationFormat = "json"`
    - `Path PipelineUpdateParamsDestinationPath`
      - `Bucket string` Specifies the R2 Bucket to store files.
      - `Filename string` Specifies the name pattern for individual data files.
      - `Filepath string` Specifies the name pattern for the directory.
      - `Prefix string` Specifies the base directory within the bucket.
    - `Type PipelineUpdateParamsDestinationType` Specifies the type of destination.
      - `const PipelineUpdateParamsDestinationTypeR2 PipelineUpdateParamsDestinationType = "r2"`
    - `Credentials PipelineUpdateParamsDestinationCredentials`
      - `AccessKeyID string` Specifies the R2 Bucket Access Key ID.
      - `Endpoint string` Specifies the R2 Endpoint.
      - `SecretAccessKey string` Specifies the R2 Bucket Secret Access Key.
  - `Name param.Field[string]` Body param: Defines the name of the pipeline.
  - `Source param.Field[[]PipelineUpdateParamsSource]` Body param
    - `type PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSource struct{…}` [DEPRECATED] HTTP source configuration. Use the new streams API instead.
      - `Format PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat` Specifies the format of source data.
        - `const PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat = "json"`
      - `Type string`
      - `Authentication bool` Specifies whether authentication is required to send to this pipeline via HTTP.
      - `CORS PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceCORS`
        - `Origins []string` Specifies the origins allowed to make cross-origin HTTP requests.
    - `type PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesBindingSource struct{…}` [DEPRECATED] Worker binding source configuration. Use the new streams API instead.
      - `Format PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat` Specifies the format of source data.
        - `const PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormatJson PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat = "json"`
      - `Type string`

### Returns

- `type PipelineUpdateResponse struct{…}` [DEPRECATED] Describes the configuration of a pipeline. Use the new streams/sinks/pipelines API instead.
  - `ID string` Specifies the pipeline identifier.
  - `Destination PipelineUpdateResponseDestination`
    - `Batch PipelineUpdateResponseDestinationBatch`
      - `MaxBytes int64` Specifies the rough maximum size of files.
      - `MaxDurationS float64` Specifies the duration to wait while aggregating batches into files.
      - `MaxRows int64` Specifies the rough maximum number of rows per file.
    - `Compression PipelineUpdateResponseDestinationCompression`
      - `Type PipelineUpdateResponseDestinationCompressionType` Specifies the desired compression algorithm and format.
        - `const PipelineUpdateResponseDestinationCompressionTypeNone PipelineUpdateResponseDestinationCompressionType = "none"`
        - `const PipelineUpdateResponseDestinationCompressionTypeGzip PipelineUpdateResponseDestinationCompressionType = "gzip"`
        - `const PipelineUpdateResponseDestinationCompressionTypeDeflate PipelineUpdateResponseDestinationCompressionType = "deflate"`
    - `Format PipelineUpdateResponseDestinationFormat` Specifies the format of data to deliver.
      - `const PipelineUpdateResponseDestinationFormatJson PipelineUpdateResponseDestinationFormat = "json"`
    - `Path PipelineUpdateResponseDestinationPath`
      - `Bucket string` Specifies the R2 Bucket to store files.
      - `Filename string` Specifies the name pattern for individual data files.
      - `Filepath string` Specifies the name pattern for the directory.
      - `Prefix string` Specifies the base directory within the bucket.
    - `Type PipelineUpdateResponseDestinationType` Specifies the type of destination.
      - `const PipelineUpdateResponseDestinationTypeR2 PipelineUpdateResponseDestinationType = "r2"`
  - `Endpoint string` Indicates the endpoint URL to send traffic to.
  - `Name string` Defines the name of the pipeline.
  - `Source []PipelineUpdateResponseSource`
    - `type PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSource struct{…}` [DEPRECATED] HTTP source configuration. Use the new streams API instead.
      - `Format PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat` Specifies the format of source data.
        - `const PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormat = "json"`
      - `Type string`
      - `Authentication bool` Specifies whether authentication is required to send to this pipeline via HTTP.
      - `CORS PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesHTTPSourceCORS`
        - `Origins []string` Specifies the origins allowed to make cross-origin HTTP requests.
    - `type PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesBindingSource struct{…}` [DEPRECATED] Worker binding source configuration. Use the new streams API instead.
      - `Format PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat` Specifies the format of source data.
        - `const PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormatJson PipelineUpdateResponseSourceCloudflarePipelinesWorkersPipelinesBindingSourceFormat = "json"`
      - `Type string`
  - `Version float64` Indicates the version number of the last saved configuration.
### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	pipeline, err := client.Pipelines.Update(
		context.TODO(),
		"sample_pipeline",
		pipelines.PipelineUpdateParams{
			AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
			Destination: cloudflare.F(pipelines.PipelineUpdateParamsDestination{
				Batch:       cloudflare.F(pipelines.PipelineUpdateParamsDestinationBatch{}),
				Compression: cloudflare.F(pipelines.PipelineUpdateParamsDestinationCompression{}),
				Format:      cloudflare.F(pipelines.PipelineUpdateParamsDestinationFormatJson),
				Path: cloudflare.F(pipelines.PipelineUpdateParamsDestinationPath{
					Bucket: cloudflare.F("bucket"),
				}),
				Type: cloudflare.F(pipelines.PipelineUpdateParamsDestinationTypeR2),
			}),
			Name: cloudflare.F("sample_pipeline"),
			Source: cloudflare.F([]pipelines.PipelineUpdateParamsSourceUnion{pipelines.PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSource{
				Format: cloudflare.F(pipelines.PipelineUpdateParamsSourceCloudflarePipelinesWorkersPipelinesHTTPSourceFormatJson),
				Type:   cloudflare.F("type"),
			}}),
		},
	)
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", pipeline.ID)
}
```

#### Response

```json
{
  "result": {
    "id": "123f8a8258064ed892a347f173372359",
    "destination": {
      "batch": {
        "max_bytes": 1000,
        "max_duration_s": 0.25,
        "max_rows": 100
      },
      "compression": {
        "type": "gzip"
      },
      "format": "json",
      "path": {
        "bucket": "bucket",
        "filename": "${slug}${extension}",
        "filepath": "${date}/${hour}",
        "prefix": "base"
      },
      "type": "r2"
    },
    "endpoint": "https://123f8a8258064ed892a347f173372359.pipelines.cloudflare.com",
    "name": "sample_pipeline",
    "source": [
      {
        "format": "json",
        "type": "type",
        "authentication": true,
        "cors": {
          "origins": [
            "*"
          ]
        }
      }
    ],
    "version": 2
  },
  "success": true
}
```

## [DEPRECATED] Delete Pipeline

`client.Pipelines.Delete(ctx, pipelineName, body) error`

**delete** `/accounts/{account_id}/pipelines/{pipeline_name}`

[DEPRECATED] Delete a pipeline. Use the new /pipelines/v1/pipelines endpoint instead.

### Parameters

- `pipelineName string` Defines the name of the pipeline.
- `body PipelineDeleteParams`
  - `AccountID param.Field[string]` Specifies the public ID of the account.

### Example

```go
package main

import (
	"context"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	// Delete returns only an error; there is no response body to inspect.
	err := client.Pipelines.Delete(
		context.TODO(),
		"sample_pipeline",
		pipelines.PipelineDeleteParams{
			AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		},
	)
	if err != nil {
		panic(err.Error())
	}
}
```

## List Pipelines

`client.Pipelines.ListV1(ctx, params) (*V4PagePaginationArray[PipelineListV1Response], error)`

**get** `/accounts/{account_id}/pipelines/v1/pipelines`

List/Filter Pipelines in Account.

### Parameters

- `params PipelineListV1Params`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `Page param.Field[float64]` Query param
  - `PerPage param.Field[float64]` Query param

### Returns

- `type PipelineListV1Response struct{…}`
  - `ID string` Indicates a unique identifier for this pipeline.
  - `CreatedAt string`
  - `ModifiedAt string`
  - `Name string` Indicates the name of the Pipeline.
  - `Sql string` Specifies SQL for the Pipeline processing flow.
  - `Status string` Indicates the current status of the Pipeline.
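
Because the list endpoint is paginated through `Page` and `PerPage`, and its sample response carries a `result_info` object, a caller may want to know how many pages exist. The following is a small sketch of that arithmetic, assuming `result_info` has the `count`, `page`, `per_page`, and `total_count` fields shown in this document's sample responses. `resultInfo` and `totalPages` are hypothetical local names. Whether page numbering starts at 0 or 1 is not stated in the source, so the sketch only computes a page count.

```go
package main

import (
	"fmt"
	"math"
)

// resultInfo is a hypothetical local mirror of the result_info object.
type resultInfo struct {
	Count      float64 `json:"count"`
	Page       float64 `json:"page"`
	PerPage    float64 `json:"per_page"`
	TotalCount float64 `json:"total_count"`
}

// totalPages returns ceil(TotalCount / PerPage), or 0 when either
// value is not positive.
func totalPages(info resultInfo) int {
	if info.PerPage <= 0 || info.TotalCount <= 0 {
		return 0
	}
	return int(math.Ceil(info.TotalCount / info.PerPage))
}

func main() {
	info := resultInfo{Count: 10, Page: 0, PerPage: 10, TotalCount: 25}
	fmt.Println(totalPages(info)) // prints 3
}
```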
### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) page, err := client.Pipelines.ListV1(context.TODO(), pipelines.PipelineListV1Params{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", page) } ``` #### Response ```json { "result": [ { "id": "01234567890123457689012345678901", "created_at": "created_at", "modified_at": "modified_at", "name": "my_pipeline", "sql": "insert into sink select * from source;", "status": "status" } ], "result_info": { "count": 1, "page": 0, "per_page": 10, "total_count": 1 }, "success": true } ``` ## Get Pipeline Details `client.Pipelines.GetV1(ctx, pipelineID, query) (*PipelineGetV1Response, error)` **get** `/accounts/{account_id}/pipelines/v1/pipelines/{pipeline_id}` Get Pipelines Details. ### Parameters - `pipelineID string` Specifies the public ID of the pipeline. - `query PipelineGetV1Params` - `AccountID param.Field[string]` Specifies the public ID of the account. ### Returns - `type PipelineGetV1Response struct{…}` - `ID string` Indicates a unique identifier for this pipeline. - `CreatedAt string` - `ModifiedAt string` - `Name string` Indicates the name of the Pipeline. - `Sql string` Specifies SQL for the Pipeline processing flow. - `Status string` Indicates the current status of the Pipeline. - `Tables []PipelineGetV1ResponseTable` List of streams and sinks used by this pipeline. - `ID string` Unique identifier for the connection (stream or sink). - `Latest int64` Latest available version of the connection. - `Name string` Name of the connection. - `Type PipelineGetV1ResponseTablesType` Type of the connection. 
- `const PipelineGetV1ResponseTablesTypeStream PipelineGetV1ResponseTablesType = "stream"` - `const PipelineGetV1ResponseTablesTypeSink PipelineGetV1ResponseTablesType = "sink"` - `Version int64` Current version of the connection used by this pipeline. - `FailureReason string` Indicates the reason for the failure of the Pipeline. ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) response, err := client.Pipelines.GetV1( context.TODO(), "043e105f4ecef8ad9ca31a8372d0c353", pipelines.PipelineGetV1Params{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }, ) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", response.ID) } ``` #### Response ```json { "result": { "id": "01234567890123457689012345678901", "created_at": "created_at", "modified_at": "modified_at", "name": "my_pipeline", "sql": "insert into sink select * from source;", "status": "status", "tables": [ { "id": "1c9200d5872c018bb34e93e2cd8a438e", "latest": 5, "name": "my_table", "type": "stream", "version": 4 } ], "failure_reason": "failure_reason" }, "success": true } ``` ## Create Pipeline `client.Pipelines.NewV1(ctx, params) (*PipelineNewV1Response, error)` **post** `/accounts/{account_id}/pipelines/v1/pipelines` Create a new Pipeline. ### Parameters - `params PipelineNewV1Params` - `AccountID param.Field[string]` Path param: Specifies the public ID of the account. - `Name param.Field[string]` Body param: Specifies the name of the Pipeline. - `Sql param.Field[string]` Body param: Specifies SQL for the Pipeline processing flow. ### Returns - `type PipelineNewV1Response struct{…}` - `ID string` Indicates a unique identifier for this pipeline. - `CreatedAt string` - `ModifiedAt string` - `Name string` Indicates the name of the Pipeline. 
- `Sql string` Specifies SQL for the Pipeline processing flow. - `Status string` Indicates the current status of the Pipeline. ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) response, err := client.Pipelines.NewV1(context.TODO(), pipelines.PipelineNewV1Params{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), Name: cloudflare.F("my_pipeline"), Sql: cloudflare.F("insert into sink select * from source;"), }) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", response.ID) } ``` #### Response ```json { "result": { "id": "01234567890123457689012345678901", "created_at": "created_at", "modified_at": "modified_at", "name": "my_pipeline", "sql": "insert into sink select * from source;", "status": "status" }, "success": true } ``` ## Delete Pipelines `client.Pipelines.DeleteV1(ctx, pipelineID, body) error` **delete** `/accounts/{account_id}/pipelines/v1/pipelines/{pipeline_id}` Delete Pipeline in Account. ### Parameters - `pipelineID string` Specifies the public ID of the pipeline. - `body PipelineDeleteV1Params` - `AccountID param.Field[string]` Specifies the public ID of the account. 
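The delete route and its two identifiers can also be exercised without the SDK. The following is a minimal sketch using only the Go standard library, not the cloudflare-go implementation: `pipelineDeletePath` and `deletePipeline` are hypothetical helper names, the bearer-token header is an assumption about how an API token is sent, and a local `httptest` server stands in for the real API so the sketch runs without credentials.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// pipelineDeletePath builds the documented route
// /accounts/{account_id}/pipelines/v1/pipelines/{pipeline_id}.
// Hypothetical helper; it is not part of cloudflare-go.
func pipelineDeletePath(accountID, pipelineID string) string {
	return "/accounts/" + url.PathEscape(accountID) +
		"/pipelines/v1/pipelines/" + url.PathEscape(pipelineID)
}

// deletePipeline sends a DELETE request to baseURL plus the documented route.
// Hypothetical helper; the Authorization header format is an assumption.
func deletePipeline(ctx context.Context, hc *http.Client, baseURL, token, accountID, pipelineID string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete,
		baseURL+pipelineDeletePath(accountID, pipelineID), nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Treat any non-2xx status as a failure.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("delete pipeline %s: unexpected status %s", pipelineID, resp.Status)
	}
	return nil
}

func main() {
	// Local stand-in for the API: it logs the method and path it receives.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println(r.Method, r.URL.Path)
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	err := deletePipeline(context.Background(), srv.Client(), srv.URL, "example-token",
		"0123105f4ecef8ad9ca31a8372d0c353", "043e105f4ecef8ad9ca31a8372d0c353")
	if err != nil {
		panic(err)
	}
}
```

Escaping each identifier with `url.PathEscape` keeps an unexpected `/` in an ID from changing which route is hit. In real use the SDK call is likely the better choice, since it handles authentication and error decoding.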
### Example ```go package main import ( "context" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) err := client.Pipelines.DeleteV1( context.TODO(), "043e105f4ecef8ad9ca31a8372d0c353", pipelines.PipelineDeleteV1Params{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }, ) if err != nil { panic(err.Error()) } } ``` ## Validate SQL `client.Pipelines.ValidateSql(ctx, params) (*PipelineValidateSqlResponse, error)` **post** `/accounts/{account_id}/pipelines/v1/validate_sql` Validate Arroyo SQL. ### Parameters - `params PipelineValidateSqlParams` - `AccountID param.Field[string]` Path param: Specifies the public ID of the account. - `Sql param.Field[string]` Body param: Specifies SQL to validate. ### Returns - `type PipelineValidateSqlResponse struct{…}` - `Tables map[string, PipelineValidateSqlResponseTable]` Indicates tables involved in the processing. 
- `ID string` - `Name string` - `Type string` - `Version float64` - `Graph PipelineValidateSqlResponseGraph` - `Edges []PipelineValidateSqlResponseGraphEdge` - `DestID int64` - `EdgeType string` - `KeyType string` - `SrcID int64` - `ValueType string` - `Nodes []PipelineValidateSqlResponseGraphNode` - `Description string` - `NodeID int64` - `Operator string` - `Parallelism int64` ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) response, err := client.Pipelines.ValidateSql(context.TODO(), pipelines.PipelineValidateSqlParams{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), Sql: cloudflare.F("insert into sink select * from source;"), }) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", response.Tables) } ``` #### Response ```json { "result": { "tables": { "foo": { "id": "id", "name": "name", "type": "type", "version": 0 } }, "graph": { "edges": [ { "dest_id": 0, "edge_type": "edge_type", "key_type": "key_type", "src_id": 0, "value_type": "value_type" } ], "nodes": [ { "description": "description", "node_id": 0, "operator": "operator", "parallelism": 0 } ] } }, "success": true } ``` # Sinks ## List Sinks `client.Pipelines.Sinks.List(ctx, params) (*V4PagePaginationArray[SinkListResponse], error)` **get** `/accounts/{account_id}/pipelines/v1/sinks` List/Filter Sinks in Account. ### Parameters - `params SinkListParams` - `AccountID param.Field[string]` Path param: Specifies the public ID of the account. - `Page param.Field[float64]` Query param - `PerPage param.Field[float64]` Query param - `PipelineID param.Field[string]` Query param ### Returns - `type SinkListResponse struct{…}` - `ID string` Indicates a unique identifier for this sink. 
- `CreatedAt Time` - `ModifiedAt Time` - `Name string` Defines the name of the Sink. - `Type SinkListResponseType` Specifies the type of sink. - `const SinkListResponseTypeR2 SinkListResponseType = "r2"` - `const SinkListResponseTypeR2DataCatalog SinkListResponseType = "r2_data_catalog"` - `Config SinkListResponseConfig` Defines the configuration of the R2 Sink. - `type SinkListResponseConfigCloudflarePipelinesR2TablePublic struct{…}` R2 Sink public configuration. - `AccountID string` Cloudflare Account ID for the bucket - `Bucket string` R2 Bucket to write to - `FileNaming SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNaming` Controls filename prefix/suffix and strategy. - `Prefix string` The prefix to use in file name. i.e prefix-.parquet - `Strategy SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy` Filename generation strategy. - `const SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategySerial SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "serial"` - `const SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategyUUID SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "uuid"` - `const SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategyUUIDV7 SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "uuid_v7"` - `const SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategyUlid SinkListResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "ulid"` - `Suffix string` This will overwrite the default file suffix. i.e .parquet, use with caution - `Jurisdiction string` Jurisdiction this bucket is hosted in - `Partitioning SinkListResponseConfigCloudflarePipelinesR2TablePublicPartitioning` Data-layout partitioning for sinks. 
- `TimePattern string` The pattern of the date string - `Path string` Subpath within the bucket to write to - `RollingPolicy SinkListResponseConfigCloudflarePipelinesR2TablePublicRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). - `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `type SinkListResponseConfigCloudflarePipelinesR2DataCatalogTablePublic struct{…}` R2 Data Catalog Sink public configuration. - `AccountID string` Cloudflare Account ID - `Bucket string` The R2 Bucket that hosts this catalog - `TableName string` Table name - `Namespace string` Table namespace - `RollingPolicy SinkListResponseConfigCloudflarePipelinesR2DataCatalogTablePublicRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). 
- `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `Format SinkListResponseFormat` - `type SinkListResponseFormatJson struct{…}` - `Type SinkListResponseFormatJsonType` - `const SinkListResponseFormatJsonTypeJson SinkListResponseFormatJsonType = "json"` - `DecimalEncoding SinkListResponseFormatJsonDecimalEncoding` - `const SinkListResponseFormatJsonDecimalEncodingNumber SinkListResponseFormatJsonDecimalEncoding = "number"` - `const SinkListResponseFormatJsonDecimalEncodingString SinkListResponseFormatJsonDecimalEncoding = "string"` - `const SinkListResponseFormatJsonDecimalEncodingBytes SinkListResponseFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkListResponseFormatJsonTimestampFormat` - `const SinkListResponseFormatJsonTimestampFormatRfc3339 SinkListResponseFormatJsonTimestampFormat = "rfc3339"` - `const SinkListResponseFormatJsonTimestampFormatUnixMillis SinkListResponseFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkListResponseFormatParquet struct{…}` - `Type SinkListResponseFormatParquetType` - `const SinkListResponseFormatParquetTypeParquet SinkListResponseFormatParquetType = "parquet"` - `Compression SinkListResponseFormatParquetCompression` - `const SinkListResponseFormatParquetCompressionUncompressed SinkListResponseFormatParquetCompression = "uncompressed"` - `const SinkListResponseFormatParquetCompressionSnappy SinkListResponseFormatParquetCompression = "snappy"` - `const SinkListResponseFormatParquetCompressionGzip SinkListResponseFormatParquetCompression = "gzip"` - `const SinkListResponseFormatParquetCompressionZstd SinkListResponseFormatParquetCompression = "zstd"` - `const SinkListResponseFormatParquetCompressionLz4 SinkListResponseFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - 
`Schema SinkListResponseSchema` - `Fields []SinkListResponseSchemaField` - `type SinkListResponseSchemaFieldsInt32 struct{…}` - `Type SinkListResponseSchemaFieldsInt32Type` - `const SinkListResponseSchemaFieldsInt32TypeInt32 SinkListResponseSchemaFieldsInt32Type = "int32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsInt64 struct{…}` - `Type SinkListResponseSchemaFieldsInt64Type` - `const SinkListResponseSchemaFieldsInt64TypeInt64 SinkListResponseSchemaFieldsInt64Type = "int64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsFloat32 struct{…}` - `Type SinkListResponseSchemaFieldsFloat32Type` - `const SinkListResponseSchemaFieldsFloat32TypeFloat32 SinkListResponseSchemaFieldsFloat32Type = "float32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsFloat64 struct{…}` - `Type SinkListResponseSchemaFieldsFloat64Type` - `const SinkListResponseSchemaFieldsFloat64TypeFloat64 SinkListResponseSchemaFieldsFloat64Type = "float64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsBool struct{…}` - `Type SinkListResponseSchemaFieldsBoolType` - `const SinkListResponseSchemaFieldsBoolTypeBool SinkListResponseSchemaFieldsBoolType = "bool"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsString struct{…}` - `Type SinkListResponseSchemaFieldsStringType` - `const SinkListResponseSchemaFieldsStringTypeString SinkListResponseSchemaFieldsStringType = "string"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsBinary struct{…}` - `Type SinkListResponseSchemaFieldsBinaryType` - `const SinkListResponseSchemaFieldsBinaryTypeBinary SinkListResponseSchemaFieldsBinaryType = "binary"` - `MetadataKey string` - `Name 
string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsTimestamp struct{…}` - `Type SinkListResponseSchemaFieldsTimestampType` - `const SinkListResponseSchemaFieldsTimestampTypeTimestamp SinkListResponseSchemaFieldsTimestampType = "timestamp"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `Unit SinkListResponseSchemaFieldsTimestampUnit` - `const SinkListResponseSchemaFieldsTimestampUnitSecond SinkListResponseSchemaFieldsTimestampUnit = "second"` - `const SinkListResponseSchemaFieldsTimestampUnitMillisecond SinkListResponseSchemaFieldsTimestampUnit = "millisecond"` - `const SinkListResponseSchemaFieldsTimestampUnitMicrosecond SinkListResponseSchemaFieldsTimestampUnit = "microsecond"` - `const SinkListResponseSchemaFieldsTimestampUnitNanosecond SinkListResponseSchemaFieldsTimestampUnit = "nanosecond"` - `type SinkListResponseSchemaFieldsJson struct{…}` - `Type SinkListResponseSchemaFieldsJsonType` - `const SinkListResponseSchemaFieldsJsonTypeJson SinkListResponseSchemaFieldsJsonType = "json"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkListResponseSchemaFieldsStruct struct{…}` - `type SinkListResponseSchemaFieldsList struct{…}` - `Format SinkListResponseSchemaFormat` - `type SinkListResponseSchemaFormatJson struct{…}` - `Type SinkListResponseSchemaFormatJsonType` - `const SinkListResponseSchemaFormatJsonTypeJson SinkListResponseSchemaFormatJsonType = "json"` - `DecimalEncoding SinkListResponseSchemaFormatJsonDecimalEncoding` - `const SinkListResponseSchemaFormatJsonDecimalEncodingNumber SinkListResponseSchemaFormatJsonDecimalEncoding = "number"` - `const SinkListResponseSchemaFormatJsonDecimalEncodingString SinkListResponseSchemaFormatJsonDecimalEncoding = "string"` - `const SinkListResponseSchemaFormatJsonDecimalEncodingBytes SinkListResponseSchemaFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkListResponseSchemaFormatJsonTimestampFormat` - `const 
SinkListResponseSchemaFormatJsonTimestampFormatRfc3339 SinkListResponseSchemaFormatJsonTimestampFormat = "rfc3339"` - `const SinkListResponseSchemaFormatJsonTimestampFormatUnixMillis SinkListResponseSchemaFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkListResponseSchemaFormatParquet struct{…}` - `Type SinkListResponseSchemaFormatParquetType` - `const SinkListResponseSchemaFormatParquetTypeParquet SinkListResponseSchemaFormatParquetType = "parquet"` - `Compression SinkListResponseSchemaFormatParquetCompression` - `const SinkListResponseSchemaFormatParquetCompressionUncompressed SinkListResponseSchemaFormatParquetCompression = "uncompressed"` - `const SinkListResponseSchemaFormatParquetCompressionSnappy SinkListResponseSchemaFormatParquetCompression = "snappy"` - `const SinkListResponseSchemaFormatParquetCompressionGzip SinkListResponseSchemaFormatParquetCompression = "gzip"` - `const SinkListResponseSchemaFormatParquetCompressionZstd SinkListResponseSchemaFormatParquetCompression = "zstd"` - `const SinkListResponseSchemaFormatParquetCompressionLz4 SinkListResponseSchemaFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Inferred bool` ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) page, err := client.Pipelines.Sinks.List(context.TODO(), pipelines.SinkListParams{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", page) } ``` #### Response ```json { "result": [ { "id": "01234567890123457689012345678901", "created_at": "2019-12-27T18:11:19.117Z", "modified_at": "2019-12-27T18:11:19.117Z", "name": "my_sink", "type": "r2", "config": { "account_id": "account_id", "bucket": "bucket", "file_naming": { "prefix": 
"prefix", "strategy": "serial", "suffix": "suffix" }, "jurisdiction": "jurisdiction", "partitioning": { "time_pattern": "year=%Y/month=%m/day=%d/hour=%H" }, "path": "path", "rolling_policy": { "file_size_bytes": 0, "inactivity_seconds": 1, "interval_seconds": 1 } }, "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "schema": { "fields": [ { "type": "int32", "metadata_key": "metadata_key", "name": "name", "required": true, "sql_name": "sql_name" } ], "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "inferred": true } } ], "result_info": { "count": 1, "page": 0, "per_page": 10, "total_count": 1 }, "success": true } ``` ## Get Sink Details `client.Pipelines.Sinks.Get(ctx, sinkID, query) (*SinkGetResponse, error)` **get** `/accounts/{account_id}/pipelines/v1/sinks/{sink_id}` Get Sink Details. ### Parameters - `sinkID string` Specifies the public ID of the sink. - `query SinkGetParams` - `AccountID param.Field[string]` Specifies the public ID of the account. ### Returns - `type SinkGetResponse struct{…}` - `ID string` Indicates a unique identifier for this sink. - `CreatedAt Time` - `ModifiedAt Time` - `Name string` Defines the name of the Sink. - `Type SinkGetResponseType` Specifies the type of sink. - `const SinkGetResponseTypeR2 SinkGetResponseType = "r2"` - `const SinkGetResponseTypeR2DataCatalog SinkGetResponseType = "r2_data_catalog"` - `Config SinkGetResponseConfig` Defines the configuration of the R2 Sink. - `type SinkGetResponseConfigCloudflarePipelinesR2TablePublic struct{…}` R2 Sink public configuration. - `AccountID string` Cloudflare Account ID for the bucket - `Bucket string` R2 Bucket to write to - `FileNaming SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNaming` Controls filename prefix/suffix and strategy. - `Prefix string` The prefix to use in file name.
i.e prefix-.parquet - `Strategy SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy` Filename generation strategy. - `const SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategySerial SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "serial"` - `const SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategyUUID SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "uuid"` - `const SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategyUUIDV7 SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "uuid_v7"` - `const SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategyUlid SinkGetResponseConfigCloudflarePipelinesR2TablePublicFileNamingStrategy = "ulid"` - `Suffix string` This will overwrite the default file suffix. i.e .parquet, use with caution - `Jurisdiction string` Jurisdiction this bucket is hosted in - `Partitioning SinkGetResponseConfigCloudflarePipelinesR2TablePublicPartitioning` Data-layout partitioning for sinks. - `TimePattern string` The pattern of the date string - `Path string` Subpath within the bucket to write to - `RollingPolicy SinkGetResponseConfigCloudflarePipelinesR2TablePublicRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). - `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `type SinkGetResponseConfigCloudflarePipelinesR2DataCatalogTablePublic struct{…}` R2 Data Catalog Sink public configuration. 
- `AccountID string` Cloudflare Account ID - `Bucket string` The R2 Bucket that hosts this catalog - `TableName string` Table name - `Namespace string` Table namespace - `RollingPolicy SinkGetResponseConfigCloudflarePipelinesR2DataCatalogTablePublicRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). - `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `Format SinkGetResponseFormat` - `type SinkGetResponseFormatJson struct{…}` - `Type SinkGetResponseFormatJsonType` - `const SinkGetResponseFormatJsonTypeJson SinkGetResponseFormatJsonType = "json"` - `DecimalEncoding SinkGetResponseFormatJsonDecimalEncoding` - `const SinkGetResponseFormatJsonDecimalEncodingNumber SinkGetResponseFormatJsonDecimalEncoding = "number"` - `const SinkGetResponseFormatJsonDecimalEncodingString SinkGetResponseFormatJsonDecimalEncoding = "string"` - `const SinkGetResponseFormatJsonDecimalEncodingBytes SinkGetResponseFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkGetResponseFormatJsonTimestampFormat` - `const SinkGetResponseFormatJsonTimestampFormatRfc3339 SinkGetResponseFormatJsonTimestampFormat = "rfc3339"` - `const SinkGetResponseFormatJsonTimestampFormatUnixMillis SinkGetResponseFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkGetResponseFormatParquet struct{…}` - `Type SinkGetResponseFormatParquetType` - `const SinkGetResponseFormatParquetTypeParquet SinkGetResponseFormatParquetType = "parquet"` - `Compression SinkGetResponseFormatParquetCompression` - `const SinkGetResponseFormatParquetCompressionUncompressed SinkGetResponseFormatParquetCompression = "uncompressed"` - `const SinkGetResponseFormatParquetCompressionSnappy SinkGetResponseFormatParquetCompression = "snappy"` - `const 
SinkGetResponseFormatParquetCompressionGzip SinkGetResponseFormatParquetCompression = "gzip"` - `const SinkGetResponseFormatParquetCompressionZstd SinkGetResponseFormatParquetCompression = "zstd"` - `const SinkGetResponseFormatParquetCompressionLz4 SinkGetResponseFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Schema SinkGetResponseSchema` - `Fields []SinkGetResponseSchemaField` - `type SinkGetResponseSchemaFieldsInt32 struct{…}` - `Type SinkGetResponseSchemaFieldsInt32Type` - `const SinkGetResponseSchemaFieldsInt32TypeInt32 SinkGetResponseSchemaFieldsInt32Type = "int32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsInt64 struct{…}` - `Type SinkGetResponseSchemaFieldsInt64Type` - `const SinkGetResponseSchemaFieldsInt64TypeInt64 SinkGetResponseSchemaFieldsInt64Type = "int64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsFloat32 struct{…}` - `Type SinkGetResponseSchemaFieldsFloat32Type` - `const SinkGetResponseSchemaFieldsFloat32TypeFloat32 SinkGetResponseSchemaFieldsFloat32Type = "float32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsFloat64 struct{…}` - `Type SinkGetResponseSchemaFieldsFloat64Type` - `const SinkGetResponseSchemaFieldsFloat64TypeFloat64 SinkGetResponseSchemaFieldsFloat64Type = "float64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsBool struct{…}` - `Type SinkGetResponseSchemaFieldsBoolType` - `const SinkGetResponseSchemaFieldsBoolTypeBool SinkGetResponseSchemaFieldsBoolType = "bool"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsString struct{…}` - `Type SinkGetResponseSchemaFieldsStringType` - `const SinkGetResponseSchemaFieldsStringTypeString SinkGetResponseSchemaFieldsStringType = "string"` - `MetadataKey 
string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsBinary struct{…}` - `Type SinkGetResponseSchemaFieldsBinaryType` - `const SinkGetResponseSchemaFieldsBinaryTypeBinary SinkGetResponseSchemaFieldsBinaryType = "binary"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsTimestamp struct{…}` - `Type SinkGetResponseSchemaFieldsTimestampType` - `const SinkGetResponseSchemaFieldsTimestampTypeTimestamp SinkGetResponseSchemaFieldsTimestampType = "timestamp"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `Unit SinkGetResponseSchemaFieldsTimestampUnit` - `const SinkGetResponseSchemaFieldsTimestampUnitSecond SinkGetResponseSchemaFieldsTimestampUnit = "second"` - `const SinkGetResponseSchemaFieldsTimestampUnitMillisecond SinkGetResponseSchemaFieldsTimestampUnit = "millisecond"` - `const SinkGetResponseSchemaFieldsTimestampUnitMicrosecond SinkGetResponseSchemaFieldsTimestampUnit = "microsecond"` - `const SinkGetResponseSchemaFieldsTimestampUnitNanosecond SinkGetResponseSchemaFieldsTimestampUnit = "nanosecond"` - `type SinkGetResponseSchemaFieldsJson struct{…}` - `Type SinkGetResponseSchemaFieldsJsonType` - `const SinkGetResponseSchemaFieldsJsonTypeJson SinkGetResponseSchemaFieldsJsonType = "json"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkGetResponseSchemaFieldsStruct struct{…}` - `type SinkGetResponseSchemaFieldsList struct{…}` - `Format SinkGetResponseSchemaFormat` - `type SinkGetResponseSchemaFormatJson struct{…}` - `Type SinkGetResponseSchemaFormatJsonType` - `const SinkGetResponseSchemaFormatJsonTypeJson SinkGetResponseSchemaFormatJsonType = "json"` - `DecimalEncoding SinkGetResponseSchemaFormatJsonDecimalEncoding` - `const SinkGetResponseSchemaFormatJsonDecimalEncodingNumber SinkGetResponseSchemaFormatJsonDecimalEncoding = "number"` - `const 
SinkGetResponseSchemaFormatJsonDecimalEncodingString SinkGetResponseSchemaFormatJsonDecimalEncoding = "string"` - `const SinkGetResponseSchemaFormatJsonDecimalEncodingBytes SinkGetResponseSchemaFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkGetResponseSchemaFormatJsonTimestampFormat` - `const SinkGetResponseSchemaFormatJsonTimestampFormatRfc3339 SinkGetResponseSchemaFormatJsonTimestampFormat = "rfc3339"` - `const SinkGetResponseSchemaFormatJsonTimestampFormatUnixMillis SinkGetResponseSchemaFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkGetResponseSchemaFormatParquet struct{…}` - `Type SinkGetResponseSchemaFormatParquetType` - `const SinkGetResponseSchemaFormatParquetTypeParquet SinkGetResponseSchemaFormatParquetType = "parquet"` - `Compression SinkGetResponseSchemaFormatParquetCompression` - `const SinkGetResponseSchemaFormatParquetCompressionUncompressed SinkGetResponseSchemaFormatParquetCompression = "uncompressed"` - `const SinkGetResponseSchemaFormatParquetCompressionSnappy SinkGetResponseSchemaFormatParquetCompression = "snappy"` - `const SinkGetResponseSchemaFormatParquetCompressionGzip SinkGetResponseSchemaFormatParquetCompression = "gzip"` - `const SinkGetResponseSchemaFormatParquetCompressionZstd SinkGetResponseSchemaFormatParquetCompression = "zstd"` - `const SinkGetResponseSchemaFormatParquetCompressionLz4 SinkGetResponseSchemaFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Inferred bool` ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) sink, err := client.Pipelines.Sinks.Get( context.TODO(), "0223105f4ecef8ad9ca31a8372d0c353", pipelines.SinkGetParams{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }, ) if err != nil { panic(err.Error()) 
} fmt.Printf("%+v\n", sink.ID) } ``` #### Response ```json { "result": { "id": "01234567890123457689012345678901", "created_at": "2019-12-27T18:11:19.117Z", "modified_at": "2019-12-27T18:11:19.117Z", "name": "my_sink", "type": "r2", "config": { "account_id": "account_id", "bucket": "bucket", "file_naming": { "prefix": "prefix", "strategy": "serial", "suffix": "suffix" }, "jurisdiction": "jurisdiction", "partitioning": { "time_pattern": "year=%Y/month=%m/day=%d/hour=%H" }, "path": "path", "rolling_policy": { "file_size_bytes": 0, "inactivity_seconds": 1, "interval_seconds": 1 } }, "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "schema": { "fields": [ { "type": "int32", "metadata_key": "metadata_key", "name": "name", "required": true, "sql_name": "sql_name" } ], "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "inferred": true } }, "success": true } ``` ## Create Sink `client.Pipelines.Sinks.New(ctx, params) (*SinkNewResponse, error)` **post** `/accounts/{account_id}/pipelines/v1/sinks` Create a new Sink. ### Parameters - `params SinkNewParams` - `AccountID param.Field[string]` Path param: Specifies the public ID of the account. - `Name param.Field[string]` Body param: Defines the name of the Sink. - `Type param.Field[SinkNewParamsType]` Body param: Specifies the type of sink. - `const SinkNewParamsTypeR2 SinkNewParamsType = "r2"` - `const SinkNewParamsTypeR2DataCatalog SinkNewParamsType = "r2_data_catalog"` - `Config param.Field[SinkNewParamsConfig]` Body param: Defines the configuration of the R2 Sink. 
- `type SinkNewParamsConfigCloudflarePipelinesR2Table struct{…}` - `AccountID string` Cloudflare Account ID for the bucket - `Bucket string` R2 Bucket to write to - `Credentials SinkNewParamsConfigCloudflarePipelinesR2TableCredentials` - `AccessKeyID string` Cloudflare Account ID for the bucket - `SecretAccessKey string` Cloudflare Account ID for the bucket - `FileNaming SinkNewParamsConfigCloudflarePipelinesR2TableFileNaming` Controls filename prefix/suffix and strategy. - `Prefix string` The prefix to use in file name. i.e prefix-.parquet - `Strategy SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategy` Filename generation strategy. - `const SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategySerial SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategy = "serial"` - `const SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategyUUID SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategy = "uuid"` - `const SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategyUUIDV7 SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategy = "uuid_v7"` - `const SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategyUlid SinkNewParamsConfigCloudflarePipelinesR2TableFileNamingStrategy = "ulid"` - `Suffix string` This will overwrite the default file suffix. i.e .parquet, use with caution - `Jurisdiction string` Jurisdiction this bucket is hosted in - `Partitioning SinkNewParamsConfigCloudflarePipelinesR2TablePartitioning` Data-layout partitioning for sinks. - `TimePattern string` The pattern of the date string - `Path string` Subpath within the bucket to write to - `RollingPolicy SinkNewParamsConfigCloudflarePipelinesR2TableRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). 
- `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `type SinkNewParamsConfigCloudflarePipelinesR2DataCatalogTable struct{…}` R2 Data Catalog Sink - `Token string` Authentication token - `AccountID string` Cloudflare Account ID - `Bucket string` The R2 Bucket that hosts this catalog - `TableName string` Table name - `Namespace string` Table namespace - `RollingPolicy SinkNewParamsConfigCloudflarePipelinesR2DataCatalogTableRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). - `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `Format param.Field[SinkNewParamsFormat]` Body param - `type SinkNewParamsFormatJson struct{…}` - `Type SinkNewParamsFormatJsonType` - `const SinkNewParamsFormatJsonTypeJson SinkNewParamsFormatJsonType = "json"` - `DecimalEncoding SinkNewParamsFormatJsonDecimalEncoding` - `const SinkNewParamsFormatJsonDecimalEncodingNumber SinkNewParamsFormatJsonDecimalEncoding = "number"` - `const SinkNewParamsFormatJsonDecimalEncodingString SinkNewParamsFormatJsonDecimalEncoding = "string"` - `const SinkNewParamsFormatJsonDecimalEncodingBytes SinkNewParamsFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkNewParamsFormatJsonTimestampFormat` - `const SinkNewParamsFormatJsonTimestampFormatRfc3339 SinkNewParamsFormatJsonTimestampFormat = "rfc3339"` - `const SinkNewParamsFormatJsonTimestampFormatUnixMillis SinkNewParamsFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkNewParamsFormatParquet struct{…}` - `Type SinkNewParamsFormatParquetType` - `const 
SinkNewParamsFormatParquetTypeParquet SinkNewParamsFormatParquetType = "parquet"` - `Compression SinkNewParamsFormatParquetCompression` - `const SinkNewParamsFormatParquetCompressionUncompressed SinkNewParamsFormatParquetCompression = "uncompressed"` - `const SinkNewParamsFormatParquetCompressionSnappy SinkNewParamsFormatParquetCompression = "snappy"` - `const SinkNewParamsFormatParquetCompressionGzip SinkNewParamsFormatParquetCompression = "gzip"` - `const SinkNewParamsFormatParquetCompressionZstd SinkNewParamsFormatParquetCompression = "zstd"` - `const SinkNewParamsFormatParquetCompressionLz4 SinkNewParamsFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Schema param.Field[SinkNewParamsSchema]` Body param - `Fields []SinkNewParamsSchemaField` - `type SinkNewParamsSchemaFieldsInt32 struct{…}` - `Type SinkNewParamsSchemaFieldsInt32Type` - `const SinkNewParamsSchemaFieldsInt32TypeInt32 SinkNewParamsSchemaFieldsInt32Type = "int32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsInt64 struct{…}` - `Type SinkNewParamsSchemaFieldsInt64Type` - `const SinkNewParamsSchemaFieldsInt64TypeInt64 SinkNewParamsSchemaFieldsInt64Type = "int64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsFloat32 struct{…}` - `Type SinkNewParamsSchemaFieldsFloat32Type` - `const SinkNewParamsSchemaFieldsFloat32TypeFloat32 SinkNewParamsSchemaFieldsFloat32Type = "float32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsFloat64 struct{…}` - `Type SinkNewParamsSchemaFieldsFloat64Type` - `const SinkNewParamsSchemaFieldsFloat64TypeFloat64 SinkNewParamsSchemaFieldsFloat64Type = "float64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsBool struct{…}` - `Type SinkNewParamsSchemaFieldsBoolType` - `const SinkNewParamsSchemaFieldsBoolTypeBool 
SinkNewParamsSchemaFieldsBoolType = "bool"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsString struct{…}` - `Type SinkNewParamsSchemaFieldsStringType` - `const SinkNewParamsSchemaFieldsStringTypeString SinkNewParamsSchemaFieldsStringType = "string"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsBinary struct{…}` - `Type SinkNewParamsSchemaFieldsBinaryType` - `const SinkNewParamsSchemaFieldsBinaryTypeBinary SinkNewParamsSchemaFieldsBinaryType = "binary"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsTimestamp struct{…}` - `Type SinkNewParamsSchemaFieldsTimestampType` - `const SinkNewParamsSchemaFieldsTimestampTypeTimestamp SinkNewParamsSchemaFieldsTimestampType = "timestamp"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `Unit SinkNewParamsSchemaFieldsTimestampUnit` - `const SinkNewParamsSchemaFieldsTimestampUnitSecond SinkNewParamsSchemaFieldsTimestampUnit = "second"` - `const SinkNewParamsSchemaFieldsTimestampUnitMillisecond SinkNewParamsSchemaFieldsTimestampUnit = "millisecond"` - `const SinkNewParamsSchemaFieldsTimestampUnitMicrosecond SinkNewParamsSchemaFieldsTimestampUnit = "microsecond"` - `const SinkNewParamsSchemaFieldsTimestampUnitNanosecond SinkNewParamsSchemaFieldsTimestampUnit = "nanosecond"` - `type SinkNewParamsSchemaFieldsJson struct{…}` - `Type SinkNewParamsSchemaFieldsJsonType` - `const SinkNewParamsSchemaFieldsJsonTypeJson SinkNewParamsSchemaFieldsJsonType = "json"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewParamsSchemaFieldsStruct struct{…}` - `type SinkNewParamsSchemaFieldsList struct{…}` - `Format SinkNewParamsSchemaFormat` - `type SinkNewParamsSchemaFormatJson struct{…}` - `Type SinkNewParamsSchemaFormatJsonType` - `const SinkNewParamsSchemaFormatJsonTypeJson 
SinkNewParamsSchemaFormatJsonType = "json"` - `DecimalEncoding SinkNewParamsSchemaFormatJsonDecimalEncoding` - `const SinkNewParamsSchemaFormatJsonDecimalEncodingNumber SinkNewParamsSchemaFormatJsonDecimalEncoding = "number"` - `const SinkNewParamsSchemaFormatJsonDecimalEncodingString SinkNewParamsSchemaFormatJsonDecimalEncoding = "string"` - `const SinkNewParamsSchemaFormatJsonDecimalEncodingBytes SinkNewParamsSchemaFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkNewParamsSchemaFormatJsonTimestampFormat` - `const SinkNewParamsSchemaFormatJsonTimestampFormatRfc3339 SinkNewParamsSchemaFormatJsonTimestampFormat = "rfc3339"` - `const SinkNewParamsSchemaFormatJsonTimestampFormatUnixMillis SinkNewParamsSchemaFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkNewParamsSchemaFormatParquet struct{…}` - `Type SinkNewParamsSchemaFormatParquetType` - `const SinkNewParamsSchemaFormatParquetTypeParquet SinkNewParamsSchemaFormatParquetType = "parquet"` - `Compression SinkNewParamsSchemaFormatParquetCompression` - `const SinkNewParamsSchemaFormatParquetCompressionUncompressed SinkNewParamsSchemaFormatParquetCompression = "uncompressed"` - `const SinkNewParamsSchemaFormatParquetCompressionSnappy SinkNewParamsSchemaFormatParquetCompression = "snappy"` - `const SinkNewParamsSchemaFormatParquetCompressionGzip SinkNewParamsSchemaFormatParquetCompression = "gzip"` - `const SinkNewParamsSchemaFormatParquetCompressionZstd SinkNewParamsSchemaFormatParquetCompression = "zstd"` - `const SinkNewParamsSchemaFormatParquetCompressionLz4 SinkNewParamsSchemaFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Inferred bool` ### Returns - `type SinkNewResponse struct{…}` - `ID string` Indicates a unique identifier for this sink. - `CreatedAt Time` - `ModifiedAt Time` - `Name string` Defines the name of the Sink. - `Type SinkNewResponseType` Specifies the type of sink. 
- `const SinkNewResponseTypeR2 SinkNewResponseType = "r2"` - `const SinkNewResponseTypeR2DataCatalog SinkNewResponseType = "r2_data_catalog"` - `Config SinkNewResponseConfig` R2 Data Catalog Sink - `type SinkNewResponseConfigCloudflarePipelinesR2Table struct{…}` - `AccountID string` Cloudflare Account ID for the bucket - `Bucket string` R2 Bucket to write to - `Credentials SinkNewResponseConfigCloudflarePipelinesR2TableCredentials` - `AccessKeyID string` Access key ID for the bucket - `SecretAccessKey string` Secret access key for the bucket - `FileNaming SinkNewResponseConfigCloudflarePipelinesR2TableFileNaming` Controls filename prefix/suffix and strategy. - `Prefix string` The prefix to use in the file name, i.e. prefix-.parquet - `Strategy SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategy` Filename generation strategy. - `const SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategySerial SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategy = "serial"` - `const SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategyUUID SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategy = "uuid"` - `const SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategyUUIDV7 SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategy = "uuid_v7"` - `const SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategyUlid SinkNewResponseConfigCloudflarePipelinesR2TableFileNamingStrategy = "ulid"` - `Suffix string` Overwrites the default file suffix, i.e. .parquet; use with caution - `Jurisdiction string` Jurisdiction this bucket is hosted in - `Partitioning SinkNewResponseConfigCloudflarePipelinesR2TablePartitioning` Data-layout partitioning for sinks.
- `TimePattern string` The pattern of the date string - `Path string` Subpath within the bucket to write to - `RollingPolicy SinkNewResponseConfigCloudflarePipelinesR2TableRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). - `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `type SinkNewResponseConfigCloudflarePipelinesR2DataCatalogTable struct{…}` R2 Data Catalog Sink - `Token string` Authentication token - `AccountID string` Cloudflare Account ID - `Bucket string` The R2 Bucket that hosts this catalog - `TableName string` Table name - `Namespace string` Table namespace - `RollingPolicy SinkNewResponseConfigCloudflarePipelinesR2DataCatalogTableRollingPolicy` Rolling policy for file sinks (when & why to close a file and open a new one). 
- `FileSizeBytes int64` Files will be rolled after reaching this number of bytes - `InactivitySeconds int64` Number of seconds of inactivity to wait before rolling over to a new file - `IntervalSeconds int64` Number of seconds to wait before rolling over to a new file - `Format SinkNewResponseFormat` - `type SinkNewResponseFormatJson struct{…}` - `Type SinkNewResponseFormatJsonType` - `const SinkNewResponseFormatJsonTypeJson SinkNewResponseFormatJsonType = "json"` - `DecimalEncoding SinkNewResponseFormatJsonDecimalEncoding` - `const SinkNewResponseFormatJsonDecimalEncodingNumber SinkNewResponseFormatJsonDecimalEncoding = "number"` - `const SinkNewResponseFormatJsonDecimalEncodingString SinkNewResponseFormatJsonDecimalEncoding = "string"` - `const SinkNewResponseFormatJsonDecimalEncodingBytes SinkNewResponseFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkNewResponseFormatJsonTimestampFormat` - `const SinkNewResponseFormatJsonTimestampFormatRfc3339 SinkNewResponseFormatJsonTimestampFormat = "rfc3339"` - `const SinkNewResponseFormatJsonTimestampFormatUnixMillis SinkNewResponseFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkNewResponseFormatParquet struct{…}` - `Type SinkNewResponseFormatParquetType` - `const SinkNewResponseFormatParquetTypeParquet SinkNewResponseFormatParquetType = "parquet"` - `Compression SinkNewResponseFormatParquetCompression` - `const SinkNewResponseFormatParquetCompressionUncompressed SinkNewResponseFormatParquetCompression = "uncompressed"` - `const SinkNewResponseFormatParquetCompressionSnappy SinkNewResponseFormatParquetCompression = "snappy"` - `const SinkNewResponseFormatParquetCompressionGzip SinkNewResponseFormatParquetCompression = "gzip"` - `const SinkNewResponseFormatParquetCompressionZstd SinkNewResponseFormatParquetCompression = "zstd"` - `const SinkNewResponseFormatParquetCompressionLz4 SinkNewResponseFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Schema SinkNewResponseSchema` - 
`Fields []SinkNewResponseSchemaField` - `type SinkNewResponseSchemaFieldsInt32 struct{…}` - `Type SinkNewResponseSchemaFieldsInt32Type` - `const SinkNewResponseSchemaFieldsInt32TypeInt32 SinkNewResponseSchemaFieldsInt32Type = "int32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsInt64 struct{…}` - `Type SinkNewResponseSchemaFieldsInt64Type` - `const SinkNewResponseSchemaFieldsInt64TypeInt64 SinkNewResponseSchemaFieldsInt64Type = "int64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsFloat32 struct{…}` - `Type SinkNewResponseSchemaFieldsFloat32Type` - `const SinkNewResponseSchemaFieldsFloat32TypeFloat32 SinkNewResponseSchemaFieldsFloat32Type = "float32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsFloat64 struct{…}` - `Type SinkNewResponseSchemaFieldsFloat64Type` - `const SinkNewResponseSchemaFieldsFloat64TypeFloat64 SinkNewResponseSchemaFieldsFloat64Type = "float64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsBool struct{…}` - `Type SinkNewResponseSchemaFieldsBoolType` - `const SinkNewResponseSchemaFieldsBoolTypeBool SinkNewResponseSchemaFieldsBoolType = "bool"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsString struct{…}` - `Type SinkNewResponseSchemaFieldsStringType` - `const SinkNewResponseSchemaFieldsStringTypeString SinkNewResponseSchemaFieldsStringType = "string"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsBinary struct{…}` - `Type SinkNewResponseSchemaFieldsBinaryType` - `const SinkNewResponseSchemaFieldsBinaryTypeBinary SinkNewResponseSchemaFieldsBinaryType = "binary"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type 
SinkNewResponseSchemaFieldsTimestamp struct{…}` - `Type SinkNewResponseSchemaFieldsTimestampType` - `const SinkNewResponseSchemaFieldsTimestampTypeTimestamp SinkNewResponseSchemaFieldsTimestampType = "timestamp"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `Unit SinkNewResponseSchemaFieldsTimestampUnit` - `const SinkNewResponseSchemaFieldsTimestampUnitSecond SinkNewResponseSchemaFieldsTimestampUnit = "second"` - `const SinkNewResponseSchemaFieldsTimestampUnitMillisecond SinkNewResponseSchemaFieldsTimestampUnit = "millisecond"` - `const SinkNewResponseSchemaFieldsTimestampUnitMicrosecond SinkNewResponseSchemaFieldsTimestampUnit = "microsecond"` - `const SinkNewResponseSchemaFieldsTimestampUnitNanosecond SinkNewResponseSchemaFieldsTimestampUnit = "nanosecond"` - `type SinkNewResponseSchemaFieldsJson struct{…}` - `Type SinkNewResponseSchemaFieldsJsonType` - `const SinkNewResponseSchemaFieldsJsonTypeJson SinkNewResponseSchemaFieldsJsonType = "json"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type SinkNewResponseSchemaFieldsStruct struct{…}` - `type SinkNewResponseSchemaFieldsList struct{…}` - `Format SinkNewResponseSchemaFormat` - `type SinkNewResponseSchemaFormatJson struct{…}` - `Type SinkNewResponseSchemaFormatJsonType` - `const SinkNewResponseSchemaFormatJsonTypeJson SinkNewResponseSchemaFormatJsonType = "json"` - `DecimalEncoding SinkNewResponseSchemaFormatJsonDecimalEncoding` - `const SinkNewResponseSchemaFormatJsonDecimalEncodingNumber SinkNewResponseSchemaFormatJsonDecimalEncoding = "number"` - `const SinkNewResponseSchemaFormatJsonDecimalEncodingString SinkNewResponseSchemaFormatJsonDecimalEncoding = "string"` - `const SinkNewResponseSchemaFormatJsonDecimalEncodingBytes SinkNewResponseSchemaFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat SinkNewResponseSchemaFormatJsonTimestampFormat` - `const SinkNewResponseSchemaFormatJsonTimestampFormatRfc3339 
SinkNewResponseSchemaFormatJsonTimestampFormat = "rfc3339"` - `const SinkNewResponseSchemaFormatJsonTimestampFormatUnixMillis SinkNewResponseSchemaFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type SinkNewResponseSchemaFormatParquet struct{…}` - `Type SinkNewResponseSchemaFormatParquetType` - `const SinkNewResponseSchemaFormatParquetTypeParquet SinkNewResponseSchemaFormatParquetType = "parquet"` - `Compression SinkNewResponseSchemaFormatParquetCompression` - `const SinkNewResponseSchemaFormatParquetCompressionUncompressed SinkNewResponseSchemaFormatParquetCompression = "uncompressed"` - `const SinkNewResponseSchemaFormatParquetCompressionSnappy SinkNewResponseSchemaFormatParquetCompression = "snappy"` - `const SinkNewResponseSchemaFormatParquetCompressionGzip SinkNewResponseSchemaFormatParquetCompression = "gzip"` - `const SinkNewResponseSchemaFormatParquetCompressionZstd SinkNewResponseSchemaFormatParquetCompression = "zstd"` - `const SinkNewResponseSchemaFormatParquetCompressionLz4 SinkNewResponseSchemaFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Inferred bool` ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) sink, err := client.Pipelines.Sinks.New(context.TODO(), pipelines.SinkNewParams{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), Name: cloudflare.F("my_sink"), Type: cloudflare.F(pipelines.SinkNewParamsTypeR2), }) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", sink.ID) } ``` #### Response ```json { "result": { "id": "01234567890123457689012345678901", "created_at": "2019-12-27T18:11:19.117Z", "modified_at": "2019-12-27T18:11:19.117Z", "name": "my_sink", "type": "r2", "config": { "account_id": "account_id", "bucket": "bucket", "credentials": { 
"access_key_id": "access_key_id", "secret_access_key": "secret_access_key" }, "file_naming": { "prefix": "prefix", "strategy": "serial", "suffix": "suffix" }, "jurisdiction": "jurisdiction", "partitioning": { "time_pattern": "year=%Y/month=%m/day=%d/hour=%H" }, "path": "path", "rolling_policy": { "file_size_bytes": 0, "inactivity_seconds": 1, "interval_seconds": 1 } }, "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "schema": { "fields": [ { "type": "int32", "metadata_key": "metadata_key", "name": "name", "required": true, "sql_name": "sql_name" } ], "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "inferred": true } }, "success": true } ``` ## Delete Sink `client.Pipelines.Sinks.Delete(ctx, sinkID, params) error` **delete** `/accounts/{account_id}/pipelines/v1/sinks/{sink_id}` Delete Pipeline in Account. ### Parameters - `sinkID string` Specifies the publid ID of the sink. - `params SinkDeleteParams` - `AccountID param.Field[string]` Path param: Specifies the public ID of the account. - `Force param.Field[string]` Query param: Delete sink forcefully, including deleting any dependent pipelines. ### Example ```go package main import ( "context" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) err := client.Pipelines.Sinks.Delete( context.TODO(), "0223105f4ecef8ad9ca31a8372d0c353", pipelines.SinkDeleteParams{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }, ) if err != nil { panic(err.Error()) } } ``` # Streams ## List Streams `client.Pipelines.Streams.List(ctx, params) (*V4PagePaginationArray[StreamListResponse], error)` **get** `/accounts/{account_id}/pipelines/v1/streams` List/Filter Streams in Account. 
### Parameters - `params StreamListParams` - `AccountID param.Field[string]` Path param: Specifies the public ID of the account. - `Page param.Field[float64]` Query param - `PerPage param.Field[float64]` Query param - `PipelineID param.Field[string]` Query param: Specifies the public ID of the pipeline. ### Returns - `type StreamListResponse struct{…}` - `ID string` Indicates a unique identifier for this stream. - `CreatedAt Time` - `HTTP StreamListResponseHTTP` - `Authentication bool` Indicates that authentication is required for the HTTP endpoint. - `Enabled bool` Indicates that the HTTP endpoint is enabled. - `CORS StreamListResponseHTTPCORS` Specifies the CORS options for the HTTP endpoint. - `Origins []string` - `ModifiedAt Time` - `Name string` Indicates the name of the Stream. - `Version int64` Indicates the current version of this stream. - `WorkerBinding StreamListResponseWorkerBinding` - `Enabled bool` Indicates that the worker binding is enabled. - `Endpoint string` Indicates the endpoint URL of this stream. 
- `Format StreamListResponseFormat` - `type StreamListResponseFormatJson struct{…}` - `Type StreamListResponseFormatJsonType` - `const StreamListResponseFormatJsonTypeJson StreamListResponseFormatJsonType = "json"` - `DecimalEncoding StreamListResponseFormatJsonDecimalEncoding` - `const StreamListResponseFormatJsonDecimalEncodingNumber StreamListResponseFormatJsonDecimalEncoding = "number"` - `const StreamListResponseFormatJsonDecimalEncodingString StreamListResponseFormatJsonDecimalEncoding = "string"` - `const StreamListResponseFormatJsonDecimalEncodingBytes StreamListResponseFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat StreamListResponseFormatJsonTimestampFormat` - `const StreamListResponseFormatJsonTimestampFormatRfc3339 StreamListResponseFormatJsonTimestampFormat = "rfc3339"` - `const StreamListResponseFormatJsonTimestampFormatUnixMillis StreamListResponseFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type StreamListResponseFormatParquet struct{…}` - `Type StreamListResponseFormatParquetType` - `const StreamListResponseFormatParquetTypeParquet StreamListResponseFormatParquetType = "parquet"` - `Compression StreamListResponseFormatParquetCompression` - `const StreamListResponseFormatParquetCompressionUncompressed StreamListResponseFormatParquetCompression = "uncompressed"` - `const StreamListResponseFormatParquetCompressionSnappy StreamListResponseFormatParquetCompression = "snappy"` - `const StreamListResponseFormatParquetCompressionGzip StreamListResponseFormatParquetCompression = "gzip"` - `const StreamListResponseFormatParquetCompressionZstd StreamListResponseFormatParquetCompression = "zstd"` - `const StreamListResponseFormatParquetCompressionLz4 StreamListResponseFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Schema StreamListResponseSchema` - `Fields []StreamListResponseSchemaField` - `type StreamListResponseSchemaFieldsInt32 struct{…}` - `Type StreamListResponseSchemaFieldsInt32Type` - `const 
StreamListResponseSchemaFieldsInt32TypeInt32 StreamListResponseSchemaFieldsInt32Type = "int32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsInt64 struct{…}` - `Type StreamListResponseSchemaFieldsInt64Type` - `const StreamListResponseSchemaFieldsInt64TypeInt64 StreamListResponseSchemaFieldsInt64Type = "int64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsFloat32 struct{…}` - `Type StreamListResponseSchemaFieldsFloat32Type` - `const StreamListResponseSchemaFieldsFloat32TypeFloat32 StreamListResponseSchemaFieldsFloat32Type = "float32"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsFloat64 struct{…}` - `Type StreamListResponseSchemaFieldsFloat64Type` - `const StreamListResponseSchemaFieldsFloat64TypeFloat64 StreamListResponseSchemaFieldsFloat64Type = "float64"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsBool struct{…}` - `Type StreamListResponseSchemaFieldsBoolType` - `const StreamListResponseSchemaFieldsBoolTypeBool StreamListResponseSchemaFieldsBoolType = "bool"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsString struct{…}` - `Type StreamListResponseSchemaFieldsStringType` - `const StreamListResponseSchemaFieldsStringTypeString StreamListResponseSchemaFieldsStringType = "string"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsBinary struct{…}` - `Type StreamListResponseSchemaFieldsBinaryType` - `const StreamListResponseSchemaFieldsBinaryTypeBinary StreamListResponseSchemaFieldsBinaryType = "binary"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsTimestamp struct{…}` - `Type 
StreamListResponseSchemaFieldsTimestampType` - `const StreamListResponseSchemaFieldsTimestampTypeTimestamp StreamListResponseSchemaFieldsTimestampType = "timestamp"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `Unit StreamListResponseSchemaFieldsTimestampUnit` - `const StreamListResponseSchemaFieldsTimestampUnitSecond StreamListResponseSchemaFieldsTimestampUnit = "second"` - `const StreamListResponseSchemaFieldsTimestampUnitMillisecond StreamListResponseSchemaFieldsTimestampUnit = "millisecond"` - `const StreamListResponseSchemaFieldsTimestampUnitMicrosecond StreamListResponseSchemaFieldsTimestampUnit = "microsecond"` - `const StreamListResponseSchemaFieldsTimestampUnitNanosecond StreamListResponseSchemaFieldsTimestampUnit = "nanosecond"` - `type StreamListResponseSchemaFieldsJson struct{…}` - `Type StreamListResponseSchemaFieldsJsonType` - `const StreamListResponseSchemaFieldsJsonTypeJson StreamListResponseSchemaFieldsJsonType = "json"` - `MetadataKey string` - `Name string` - `Required bool` - `SqlName string` - `type StreamListResponseSchemaFieldsStruct struct{…}` - `type StreamListResponseSchemaFieldsList struct{…}` - `Format StreamListResponseSchemaFormat` - `type StreamListResponseSchemaFormatJson struct{…}` - `Type StreamListResponseSchemaFormatJsonType` - `const StreamListResponseSchemaFormatJsonTypeJson StreamListResponseSchemaFormatJsonType = "json"` - `DecimalEncoding StreamListResponseSchemaFormatJsonDecimalEncoding` - `const StreamListResponseSchemaFormatJsonDecimalEncodingNumber StreamListResponseSchemaFormatJsonDecimalEncoding = "number"` - `const StreamListResponseSchemaFormatJsonDecimalEncodingString StreamListResponseSchemaFormatJsonDecimalEncoding = "string"` - `const StreamListResponseSchemaFormatJsonDecimalEncodingBytes StreamListResponseSchemaFormatJsonDecimalEncoding = "bytes"` - `TimestampFormat StreamListResponseSchemaFormatJsonTimestampFormat` - `const 
StreamListResponseSchemaFormatJsonTimestampFormatRfc3339 StreamListResponseSchemaFormatJsonTimestampFormat = "rfc3339"` - `const StreamListResponseSchemaFormatJsonTimestampFormatUnixMillis StreamListResponseSchemaFormatJsonTimestampFormat = "unix_millis"` - `Unstructured bool` - `type StreamListResponseSchemaFormatParquet struct{…}` - `Type StreamListResponseSchemaFormatParquetType` - `const StreamListResponseSchemaFormatParquetTypeParquet StreamListResponseSchemaFormatParquetType = "parquet"` - `Compression StreamListResponseSchemaFormatParquetCompression` - `const StreamListResponseSchemaFormatParquetCompressionUncompressed StreamListResponseSchemaFormatParquetCompression = "uncompressed"` - `const StreamListResponseSchemaFormatParquetCompressionSnappy StreamListResponseSchemaFormatParquetCompression = "snappy"` - `const StreamListResponseSchemaFormatParquetCompressionGzip StreamListResponseSchemaFormatParquetCompression = "gzip"` - `const StreamListResponseSchemaFormatParquetCompressionZstd StreamListResponseSchemaFormatParquetCompression = "zstd"` - `const StreamListResponseSchemaFormatParquetCompressionLz4 StreamListResponseSchemaFormatParquetCompression = "lz4"` - `RowGroupBytes int64` - `Inferred bool` ### Example ```go package main import ( "context" "fmt" "github.com/cloudflare/cloudflare-go" "github.com/cloudflare/cloudflare-go/option" "github.com/cloudflare/cloudflare-go/pipelines" ) func main() { client := cloudflare.NewClient( option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"), ) page, err := client.Pipelines.Streams.List(context.TODO(), pipelines.StreamListParams{ AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"), }) if err != nil { panic(err.Error()) } fmt.Printf("%+v\n", page) } ``` #### Response ```json { "result": [ { "id": "01234567890123457689012345678901", "created_at": "2019-12-27T18:11:19.117Z", "http": { "authentication": false, "enabled": true, "cors": { "origins": [ "string" ] } }, "modified_at": 
"2019-12-27T18:11:19.117Z", "name": "my_stream", "version": 3, "worker_binding": { "enabled": true }, "endpoint": "https://01234567890123457689012345678901.ingest.cloudflare.com/v1", "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "schema": { "fields": [ { "type": "int32", "metadata_key": "metadata_key", "name": "name", "required": true, "sql_name": "sql_name" } ], "format": { "type": "json", "decimal_encoding": "number", "timestamp_format": "rfc3339", "unstructured": true }, "inferred": true } } ], "result_info": { "count": 1, "page": 0, "per_page": 10, "total_count": 1 }, "success": true } ``` ## Get Stream Details `client.Pipelines.Streams.Get(ctx, streamID, query) (*StreamGetResponse, error)` **get** `/accounts/{account_id}/pipelines/v1/streams/{stream_id}` Get Stream Details. ### Parameters - `streamID string` Specifies the public ID of the stream. - `query StreamGetParams` - `AccountID param.Field[string]` Specifies the public ID of the account. ### Returns - `type StreamGetResponse struct{…}` - `ID string` Indicates a unique identifier for this stream. - `CreatedAt Time` - `HTTP StreamGetResponseHTTP` - `Authentication bool` Indicates that authentication is required for the HTTP endpoint. - `Enabled bool` Indicates that the HTTP endpoint is enabled. - `CORS StreamGetResponseHTTPCORS` Specifies the CORS options for the HTTP endpoint. - `Origins []string` - `ModifiedAt Time` - `Name string` Indicates the name of the Stream. - `Version int64` Indicates the current version of this stream. - `WorkerBinding StreamGetResponseWorkerBinding` - `Enabled bool` Indicates that the worker binding is enabled. - `Endpoint string` Indicates the endpoint URL of this stream. 
  - `Format StreamGetResponseFormat`
    - `type StreamGetResponseFormatJson struct{…}`
      - `Type StreamGetResponseFormatJsonType`
        - `const StreamGetResponseFormatJsonTypeJson StreamGetResponseFormatJsonType = "json"`
      - `DecimalEncoding StreamGetResponseFormatJsonDecimalEncoding`
        - `const StreamGetResponseFormatJsonDecimalEncodingNumber StreamGetResponseFormatJsonDecimalEncoding = "number"`
        - `const StreamGetResponseFormatJsonDecimalEncodingString StreamGetResponseFormatJsonDecimalEncoding = "string"`
        - `const StreamGetResponseFormatJsonDecimalEncodingBytes StreamGetResponseFormatJsonDecimalEncoding = "bytes"`
      - `TimestampFormat StreamGetResponseFormatJsonTimestampFormat`
        - `const StreamGetResponseFormatJsonTimestampFormatRfc3339 StreamGetResponseFormatJsonTimestampFormat = "rfc3339"`
        - `const StreamGetResponseFormatJsonTimestampFormatUnixMillis StreamGetResponseFormatJsonTimestampFormat = "unix_millis"`
      - `Unstructured bool`
    - `type StreamGetResponseFormatParquet struct{…}`
      - `Type StreamGetResponseFormatParquetType`
        - `const StreamGetResponseFormatParquetTypeParquet StreamGetResponseFormatParquetType = "parquet"`
      - `Compression StreamGetResponseFormatParquetCompression`
        - `const StreamGetResponseFormatParquetCompressionUncompressed StreamGetResponseFormatParquetCompression = "uncompressed"`
        - `const StreamGetResponseFormatParquetCompressionSnappy StreamGetResponseFormatParquetCompression = "snappy"`
        - `const StreamGetResponseFormatParquetCompressionGzip StreamGetResponseFormatParquetCompression = "gzip"`
        - `const StreamGetResponseFormatParquetCompressionZstd StreamGetResponseFormatParquetCompression = "zstd"`
        - `const StreamGetResponseFormatParquetCompressionLz4 StreamGetResponseFormatParquetCompression = "lz4"`
      - `RowGroupBytes int64`
  - `Schema StreamGetResponseSchema`
    - `Fields []StreamGetResponseSchemaField`
      - `type StreamGetResponseSchemaFieldsInt32 struct{…}`
        - `Type StreamGetResponseSchemaFieldsInt32Type`
          - `const StreamGetResponseSchemaFieldsInt32TypeInt32 StreamGetResponseSchemaFieldsInt32Type = "int32"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsInt64 struct{…}`
        - `Type StreamGetResponseSchemaFieldsInt64Type`
          - `const StreamGetResponseSchemaFieldsInt64TypeInt64 StreamGetResponseSchemaFieldsInt64Type = "int64"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsFloat32 struct{…}`
        - `Type StreamGetResponseSchemaFieldsFloat32Type`
          - `const StreamGetResponseSchemaFieldsFloat32TypeFloat32 StreamGetResponseSchemaFieldsFloat32Type = "float32"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsFloat64 struct{…}`
        - `Type StreamGetResponseSchemaFieldsFloat64Type`
          - `const StreamGetResponseSchemaFieldsFloat64TypeFloat64 StreamGetResponseSchemaFieldsFloat64Type = "float64"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsBool struct{…}`
        - `Type StreamGetResponseSchemaFieldsBoolType`
          - `const StreamGetResponseSchemaFieldsBoolTypeBool StreamGetResponseSchemaFieldsBoolType = "bool"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsString struct{…}`
        - `Type StreamGetResponseSchemaFieldsStringType`
          - `const StreamGetResponseSchemaFieldsStringTypeString StreamGetResponseSchemaFieldsStringType = "string"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsBinary struct{…}`
        - `Type StreamGetResponseSchemaFieldsBinaryType`
          - `const StreamGetResponseSchemaFieldsBinaryTypeBinary StreamGetResponseSchemaFieldsBinaryType = "binary"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsTimestamp struct{…}`
        - `Type StreamGetResponseSchemaFieldsTimestampType`
          - `const StreamGetResponseSchemaFieldsTimestampTypeTimestamp StreamGetResponseSchemaFieldsTimestampType = "timestamp"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
        - `Unit StreamGetResponseSchemaFieldsTimestampUnit`
          - `const StreamGetResponseSchemaFieldsTimestampUnitSecond StreamGetResponseSchemaFieldsTimestampUnit = "second"`
          - `const StreamGetResponseSchemaFieldsTimestampUnitMillisecond StreamGetResponseSchemaFieldsTimestampUnit = "millisecond"`
          - `const StreamGetResponseSchemaFieldsTimestampUnitMicrosecond StreamGetResponseSchemaFieldsTimestampUnit = "microsecond"`
          - `const StreamGetResponseSchemaFieldsTimestampUnitNanosecond StreamGetResponseSchemaFieldsTimestampUnit = "nanosecond"`
      - `type StreamGetResponseSchemaFieldsJson struct{…}`
        - `Type StreamGetResponseSchemaFieldsJsonType`
          - `const StreamGetResponseSchemaFieldsJsonTypeJson StreamGetResponseSchemaFieldsJsonType = "json"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamGetResponseSchemaFieldsStruct struct{…}`
      - `type StreamGetResponseSchemaFieldsList struct{…}`
    - `Format StreamGetResponseSchemaFormat`
      - `type StreamGetResponseSchemaFormatJson struct{…}`
        - `Type StreamGetResponseSchemaFormatJsonType`
          - `const StreamGetResponseSchemaFormatJsonTypeJson StreamGetResponseSchemaFormatJsonType = "json"`
        - `DecimalEncoding StreamGetResponseSchemaFormatJsonDecimalEncoding`
          - `const StreamGetResponseSchemaFormatJsonDecimalEncodingNumber StreamGetResponseSchemaFormatJsonDecimalEncoding = "number"`
          - `const StreamGetResponseSchemaFormatJsonDecimalEncodingString StreamGetResponseSchemaFormatJsonDecimalEncoding = "string"`
          - `const StreamGetResponseSchemaFormatJsonDecimalEncodingBytes StreamGetResponseSchemaFormatJsonDecimalEncoding = "bytes"`
        - `TimestampFormat StreamGetResponseSchemaFormatJsonTimestampFormat`
          - `const StreamGetResponseSchemaFormatJsonTimestampFormatRfc3339 StreamGetResponseSchemaFormatJsonTimestampFormat = "rfc3339"`
          - `const StreamGetResponseSchemaFormatJsonTimestampFormatUnixMillis StreamGetResponseSchemaFormatJsonTimestampFormat = "unix_millis"`
        - `Unstructured bool`
      - `type StreamGetResponseSchemaFormatParquet struct{…}`
        - `Type StreamGetResponseSchemaFormatParquetType`
          - `const StreamGetResponseSchemaFormatParquetTypeParquet StreamGetResponseSchemaFormatParquetType = "parquet"`
        - `Compression StreamGetResponseSchemaFormatParquetCompression`
          - `const StreamGetResponseSchemaFormatParquetCompressionUncompressed StreamGetResponseSchemaFormatParquetCompression = "uncompressed"`
          - `const StreamGetResponseSchemaFormatParquetCompressionSnappy StreamGetResponseSchemaFormatParquetCompression = "snappy"`
          - `const StreamGetResponseSchemaFormatParquetCompressionGzip StreamGetResponseSchemaFormatParquetCompression = "gzip"`
          - `const StreamGetResponseSchemaFormatParquetCompressionZstd StreamGetResponseSchemaFormatParquetCompression = "zstd"`
          - `const StreamGetResponseSchemaFormatParquetCompressionLz4 StreamGetResponseSchemaFormatParquetCompression = "lz4"`
        - `RowGroupBytes int64`
    - `Inferred bool`

### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	stream, err := client.Pipelines.Streams.Get(
		context.TODO(),
		"033e105f4ecef8ad9ca31a8372d0c353",
		pipelines.StreamGetParams{
			AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		},
	)
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", stream.ID)
}
```

#### Response

```json
{
  "result": {
    "id": "01234567890123457689012345678901",
    "created_at": "2019-12-27T18:11:19.117Z",
    "http": {
      "authentication": false,
      "enabled": true,
      "cors": {
        "origins": [
          "string"
        ]
      }
    },
    "modified_at": "2019-12-27T18:11:19.117Z",
    "name": "my_stream",
    "version": 3,
    "worker_binding": {
      "enabled": true
    },
    "endpoint": "https://01234567890123457689012345678901.ingest.cloudflare.com/v1",
    "format": {
      "type": "json",
      "decimal_encoding": "number",
      "timestamp_format": "rfc3339",
      "unstructured": true
    },
    "schema": {
      "fields": [
        {
          "type": "int32",
          "metadata_key": "metadata_key",
          "name": "name",
          "required": true,
          "sql_name": "sql_name"
        }
      ],
      "format": {
        "type": "json",
        "decimal_encoding": "number",
        "timestamp_format": "rfc3339",
        "unstructured": true
      },
      "inferred": true
    }
  },
  "success": true
}
```

## Create Stream

`client.Pipelines.Streams.New(ctx, params) (*StreamNewResponse, error)`

**post** `/accounts/{account_id}/pipelines/v1/streams`

Create a new Stream.

### Parameters

- `params StreamNewParams`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `Name param.Field[string]` Body param: Specifies the name of the Stream.
  - `Format param.Field[StreamNewParamsFormat]` Body param
    - `type StreamNewParamsFormatJson struct{…}`
      - `Type StreamNewParamsFormatJsonType`
        - `const StreamNewParamsFormatJsonTypeJson StreamNewParamsFormatJsonType = "json"`
      - `DecimalEncoding StreamNewParamsFormatJsonDecimalEncoding`
        - `const StreamNewParamsFormatJsonDecimalEncodingNumber StreamNewParamsFormatJsonDecimalEncoding = "number"`
        - `const StreamNewParamsFormatJsonDecimalEncodingString StreamNewParamsFormatJsonDecimalEncoding = "string"`
        - `const StreamNewParamsFormatJsonDecimalEncodingBytes StreamNewParamsFormatJsonDecimalEncoding = "bytes"`
      - `TimestampFormat StreamNewParamsFormatJsonTimestampFormat`
        - `const StreamNewParamsFormatJsonTimestampFormatRfc3339 StreamNewParamsFormatJsonTimestampFormat = "rfc3339"`
        - `const StreamNewParamsFormatJsonTimestampFormatUnixMillis StreamNewParamsFormatJsonTimestampFormat = "unix_millis"`
      - `Unstructured bool`
    - `type StreamNewParamsFormatParquet struct{…}`
      - `Type StreamNewParamsFormatParquetType`
        - `const StreamNewParamsFormatParquetTypeParquet StreamNewParamsFormatParquetType = "parquet"`
      - `Compression StreamNewParamsFormatParquetCompression`
        - `const StreamNewParamsFormatParquetCompressionUncompressed StreamNewParamsFormatParquetCompression = "uncompressed"`
        - `const StreamNewParamsFormatParquetCompressionSnappy StreamNewParamsFormatParquetCompression = "snappy"`
        - `const StreamNewParamsFormatParquetCompressionGzip StreamNewParamsFormatParquetCompression = "gzip"`
        - `const StreamNewParamsFormatParquetCompressionZstd StreamNewParamsFormatParquetCompression = "zstd"`
        - `const StreamNewParamsFormatParquetCompressionLz4 StreamNewParamsFormatParquetCompression = "lz4"`
      - `RowGroupBytes int64`
  - `HTTP param.Field[StreamNewParamsHTTP]` Body param
    - `Authentication bool` Indicates that authentication is required for the HTTP endpoint.
    - `Enabled bool` Indicates that the HTTP endpoint is enabled.
    - `CORS StreamNewParamsHTTPCORS` Specifies the CORS options for the HTTP endpoint.
      - `Origins []string`
  - `Schema param.Field[StreamNewParamsSchema]` Body param
    - `Fields []StreamNewParamsSchemaField`
      - `type StreamNewParamsSchemaFieldsInt32 struct{…}`
        - `Type StreamNewParamsSchemaFieldsInt32Type`
          - `const StreamNewParamsSchemaFieldsInt32TypeInt32 StreamNewParamsSchemaFieldsInt32Type = "int32"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsInt64 struct{…}`
        - `Type StreamNewParamsSchemaFieldsInt64Type`
          - `const StreamNewParamsSchemaFieldsInt64TypeInt64 StreamNewParamsSchemaFieldsInt64Type = "int64"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsFloat32 struct{…}`
        - `Type StreamNewParamsSchemaFieldsFloat32Type`
          - `const StreamNewParamsSchemaFieldsFloat32TypeFloat32 StreamNewParamsSchemaFieldsFloat32Type = "float32"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsFloat64 struct{…}`
        - `Type StreamNewParamsSchemaFieldsFloat64Type`
          - `const StreamNewParamsSchemaFieldsFloat64TypeFloat64 StreamNewParamsSchemaFieldsFloat64Type = "float64"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsBool struct{…}`
        - `Type StreamNewParamsSchemaFieldsBoolType`
          - `const StreamNewParamsSchemaFieldsBoolTypeBool StreamNewParamsSchemaFieldsBoolType = "bool"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsString struct{…}`
        - `Type StreamNewParamsSchemaFieldsStringType`
          - `const StreamNewParamsSchemaFieldsStringTypeString StreamNewParamsSchemaFieldsStringType = "string"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsBinary struct{…}`
        - `Type StreamNewParamsSchemaFieldsBinaryType`
          - `const StreamNewParamsSchemaFieldsBinaryTypeBinary StreamNewParamsSchemaFieldsBinaryType = "binary"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsTimestamp struct{…}`
        - `Type StreamNewParamsSchemaFieldsTimestampType`
          - `const StreamNewParamsSchemaFieldsTimestampTypeTimestamp StreamNewParamsSchemaFieldsTimestampType = "timestamp"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
        - `Unit StreamNewParamsSchemaFieldsTimestampUnit`
          - `const StreamNewParamsSchemaFieldsTimestampUnitSecond StreamNewParamsSchemaFieldsTimestampUnit = "second"`
          - `const StreamNewParamsSchemaFieldsTimestampUnitMillisecond StreamNewParamsSchemaFieldsTimestampUnit = "millisecond"`
          - `const StreamNewParamsSchemaFieldsTimestampUnitMicrosecond StreamNewParamsSchemaFieldsTimestampUnit = "microsecond"`
          - `const StreamNewParamsSchemaFieldsTimestampUnitNanosecond StreamNewParamsSchemaFieldsTimestampUnit = "nanosecond"`
      - `type StreamNewParamsSchemaFieldsJson struct{…}`
        - `Type StreamNewParamsSchemaFieldsJsonType`
          - `const StreamNewParamsSchemaFieldsJsonTypeJson StreamNewParamsSchemaFieldsJsonType = "json"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewParamsSchemaFieldsStruct struct{…}`
      - `type StreamNewParamsSchemaFieldsList struct{…}`
    - `Format StreamNewParamsSchemaFormat`
      - `type StreamNewParamsSchemaFormatJson struct{…}`
        - `Type StreamNewParamsSchemaFormatJsonType`
          - `const StreamNewParamsSchemaFormatJsonTypeJson StreamNewParamsSchemaFormatJsonType = "json"`
        - `DecimalEncoding StreamNewParamsSchemaFormatJsonDecimalEncoding`
          - `const StreamNewParamsSchemaFormatJsonDecimalEncodingNumber StreamNewParamsSchemaFormatJsonDecimalEncoding = "number"`
          - `const StreamNewParamsSchemaFormatJsonDecimalEncodingString StreamNewParamsSchemaFormatJsonDecimalEncoding = "string"`
          - `const StreamNewParamsSchemaFormatJsonDecimalEncodingBytes StreamNewParamsSchemaFormatJsonDecimalEncoding = "bytes"`
        - `TimestampFormat StreamNewParamsSchemaFormatJsonTimestampFormat`
          - `const StreamNewParamsSchemaFormatJsonTimestampFormatRfc3339 StreamNewParamsSchemaFormatJsonTimestampFormat = "rfc3339"`
          - `const StreamNewParamsSchemaFormatJsonTimestampFormatUnixMillis StreamNewParamsSchemaFormatJsonTimestampFormat = "unix_millis"`
        - `Unstructured bool`
      - `type StreamNewParamsSchemaFormatParquet struct{…}`
        - `Type StreamNewParamsSchemaFormatParquetType`
          - `const StreamNewParamsSchemaFormatParquetTypeParquet StreamNewParamsSchemaFormatParquetType = "parquet"`
        - `Compression StreamNewParamsSchemaFormatParquetCompression`
          - `const StreamNewParamsSchemaFormatParquetCompressionUncompressed StreamNewParamsSchemaFormatParquetCompression = "uncompressed"`
          - `const StreamNewParamsSchemaFormatParquetCompressionSnappy StreamNewParamsSchemaFormatParquetCompression = "snappy"`
          - `const StreamNewParamsSchemaFormatParquetCompressionGzip StreamNewParamsSchemaFormatParquetCompression = "gzip"`
          - `const StreamNewParamsSchemaFormatParquetCompressionZstd StreamNewParamsSchemaFormatParquetCompression = "zstd"`
          - `const StreamNewParamsSchemaFormatParquetCompressionLz4 StreamNewParamsSchemaFormatParquetCompression = "lz4"`
        - `RowGroupBytes int64`
    - `Inferred bool`
  - `WorkerBinding param.Field[StreamNewParamsWorkerBinding]` Body param
    - `Enabled bool` Indicates that the worker binding is enabled.

### Returns

- `type StreamNewResponse struct{…}`
  - `ID string` Indicates a unique identifier for this stream.
  - `CreatedAt Time`
  - `HTTP StreamNewResponseHTTP`
    - `Authentication bool` Indicates that authentication is required for the HTTP endpoint.
    - `Enabled bool` Indicates that the HTTP endpoint is enabled.
    - `CORS StreamNewResponseHTTPCORS` Specifies the CORS options for the HTTP endpoint.
      - `Origins []string`
  - `ModifiedAt Time`
  - `Name string` Indicates the name of the Stream.
  - `Version int64` Indicates the current version of this stream.
  - `WorkerBinding StreamNewResponseWorkerBinding`
    - `Enabled bool` Indicates that the worker binding is enabled.
  - `Endpoint string` Indicates the endpoint URL of this stream.
  - `Format StreamNewResponseFormat`
    - `type StreamNewResponseFormatJson struct{…}`
      - `Type StreamNewResponseFormatJsonType`
        - `const StreamNewResponseFormatJsonTypeJson StreamNewResponseFormatJsonType = "json"`
      - `DecimalEncoding StreamNewResponseFormatJsonDecimalEncoding`
        - `const StreamNewResponseFormatJsonDecimalEncodingNumber StreamNewResponseFormatJsonDecimalEncoding = "number"`
        - `const StreamNewResponseFormatJsonDecimalEncodingString StreamNewResponseFormatJsonDecimalEncoding = "string"`
        - `const StreamNewResponseFormatJsonDecimalEncodingBytes StreamNewResponseFormatJsonDecimalEncoding = "bytes"`
      - `TimestampFormat StreamNewResponseFormatJsonTimestampFormat`
        - `const StreamNewResponseFormatJsonTimestampFormatRfc3339 StreamNewResponseFormatJsonTimestampFormat = "rfc3339"`
        - `const StreamNewResponseFormatJsonTimestampFormatUnixMillis StreamNewResponseFormatJsonTimestampFormat = "unix_millis"`
      - `Unstructured bool`
    - `type StreamNewResponseFormatParquet struct{…}`
      - `Type StreamNewResponseFormatParquetType`
        - `const StreamNewResponseFormatParquetTypeParquet StreamNewResponseFormatParquetType = "parquet"`
      - `Compression StreamNewResponseFormatParquetCompression`
        - `const StreamNewResponseFormatParquetCompressionUncompressed StreamNewResponseFormatParquetCompression = "uncompressed"`
        - `const StreamNewResponseFormatParquetCompressionSnappy StreamNewResponseFormatParquetCompression = "snappy"`
        - `const StreamNewResponseFormatParquetCompressionGzip StreamNewResponseFormatParquetCompression = "gzip"`
        - `const StreamNewResponseFormatParquetCompressionZstd StreamNewResponseFormatParquetCompression = "zstd"`
        - `const StreamNewResponseFormatParquetCompressionLz4 StreamNewResponseFormatParquetCompression = "lz4"`
      - `RowGroupBytes int64`
  - `Schema StreamNewResponseSchema`
    - `Fields []StreamNewResponseSchemaField`
      - `type StreamNewResponseSchemaFieldsInt32 struct{…}`
        - `Type StreamNewResponseSchemaFieldsInt32Type`
          - `const StreamNewResponseSchemaFieldsInt32TypeInt32 StreamNewResponseSchemaFieldsInt32Type = "int32"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsInt64 struct{…}`
        - `Type StreamNewResponseSchemaFieldsInt64Type`
          - `const StreamNewResponseSchemaFieldsInt64TypeInt64 StreamNewResponseSchemaFieldsInt64Type = "int64"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsFloat32 struct{…}`
        - `Type StreamNewResponseSchemaFieldsFloat32Type`
          - `const StreamNewResponseSchemaFieldsFloat32TypeFloat32 StreamNewResponseSchemaFieldsFloat32Type = "float32"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsFloat64 struct{…}`
        - `Type StreamNewResponseSchemaFieldsFloat64Type`
          - `const StreamNewResponseSchemaFieldsFloat64TypeFloat64 StreamNewResponseSchemaFieldsFloat64Type = "float64"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsBool struct{…}`
        - `Type StreamNewResponseSchemaFieldsBoolType`
          - `const StreamNewResponseSchemaFieldsBoolTypeBool StreamNewResponseSchemaFieldsBoolType = "bool"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsString struct{…}`
        - `Type StreamNewResponseSchemaFieldsStringType`
          - `const StreamNewResponseSchemaFieldsStringTypeString StreamNewResponseSchemaFieldsStringType = "string"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsBinary struct{…}`
        - `Type StreamNewResponseSchemaFieldsBinaryType`
          - `const StreamNewResponseSchemaFieldsBinaryTypeBinary StreamNewResponseSchemaFieldsBinaryType = "binary"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsTimestamp struct{…}`
        - `Type StreamNewResponseSchemaFieldsTimestampType`
          - `const StreamNewResponseSchemaFieldsTimestampTypeTimestamp StreamNewResponseSchemaFieldsTimestampType = "timestamp"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
        - `Unit StreamNewResponseSchemaFieldsTimestampUnit`
          - `const StreamNewResponseSchemaFieldsTimestampUnitSecond StreamNewResponseSchemaFieldsTimestampUnit = "second"`
          - `const StreamNewResponseSchemaFieldsTimestampUnitMillisecond StreamNewResponseSchemaFieldsTimestampUnit = "millisecond"`
          - `const StreamNewResponseSchemaFieldsTimestampUnitMicrosecond StreamNewResponseSchemaFieldsTimestampUnit = "microsecond"`
          - `const StreamNewResponseSchemaFieldsTimestampUnitNanosecond StreamNewResponseSchemaFieldsTimestampUnit = "nanosecond"`
      - `type StreamNewResponseSchemaFieldsJson struct{…}`
        - `Type StreamNewResponseSchemaFieldsJsonType`
          - `const StreamNewResponseSchemaFieldsJsonTypeJson StreamNewResponseSchemaFieldsJsonType = "json"`
        - `MetadataKey string`
        - `Name string`
        - `Required bool`
        - `SqlName string`
      - `type StreamNewResponseSchemaFieldsStruct struct{…}`
      - `type StreamNewResponseSchemaFieldsList struct{…}`
    - `Format StreamNewResponseSchemaFormat`
      - `type StreamNewResponseSchemaFormatJson struct{…}`
        - `Type StreamNewResponseSchemaFormatJsonType`
          - `const StreamNewResponseSchemaFormatJsonTypeJson StreamNewResponseSchemaFormatJsonType = "json"`
        - `DecimalEncoding StreamNewResponseSchemaFormatJsonDecimalEncoding`
          - `const StreamNewResponseSchemaFormatJsonDecimalEncodingNumber StreamNewResponseSchemaFormatJsonDecimalEncoding = "number"`
          - `const StreamNewResponseSchemaFormatJsonDecimalEncodingString StreamNewResponseSchemaFormatJsonDecimalEncoding = "string"`
          - `const StreamNewResponseSchemaFormatJsonDecimalEncodingBytes StreamNewResponseSchemaFormatJsonDecimalEncoding = "bytes"`
        - `TimestampFormat StreamNewResponseSchemaFormatJsonTimestampFormat`
          - `const StreamNewResponseSchemaFormatJsonTimestampFormatRfc3339 StreamNewResponseSchemaFormatJsonTimestampFormat = "rfc3339"`
          - `const StreamNewResponseSchemaFormatJsonTimestampFormatUnixMillis StreamNewResponseSchemaFormatJsonTimestampFormat = "unix_millis"`
        - `Unstructured bool`
      - `type StreamNewResponseSchemaFormatParquet struct{…}`
        - `Type StreamNewResponseSchemaFormatParquetType`
          - `const StreamNewResponseSchemaFormatParquetTypeParquet StreamNewResponseSchemaFormatParquetType = "parquet"`
        - `Compression StreamNewResponseSchemaFormatParquetCompression`
          - `const StreamNewResponseSchemaFormatParquetCompressionUncompressed StreamNewResponseSchemaFormatParquetCompression = "uncompressed"`
          - `const StreamNewResponseSchemaFormatParquetCompressionSnappy StreamNewResponseSchemaFormatParquetCompression = "snappy"`
          - `const StreamNewResponseSchemaFormatParquetCompressionGzip StreamNewResponseSchemaFormatParquetCompression = "gzip"`
          - `const StreamNewResponseSchemaFormatParquetCompressionZstd StreamNewResponseSchemaFormatParquetCompression = "zstd"`
          - `const StreamNewResponseSchemaFormatParquetCompressionLz4 StreamNewResponseSchemaFormatParquetCompression = "lz4"`
        - `RowGroupBytes int64`
    - `Inferred bool`

### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	stream, err := client.Pipelines.Streams.New(context.TODO(), pipelines.StreamNewParams{
		AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		Name:      cloudflare.F("my_stream"),
	})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", stream.ID)
}
```

#### Response

```json
{
  "result": {
    "id": "01234567890123457689012345678901",
    "created_at": "2019-12-27T18:11:19.117Z",
    "http": {
      "authentication": false,
      "enabled": true,
      "cors": {
        "origins": [
          "string"
        ]
      }
    },
    "modified_at": "2019-12-27T18:11:19.117Z",
    "name": "my_stream",
    "version": 3,
    "worker_binding": {
      "enabled": true
    },
    "endpoint": "https://01234567890123457689012345678901.ingest.cloudflare.com/v1",
    "format": {
      "type": "json",
      "decimal_encoding": "number",
      "timestamp_format": "rfc3339",
      "unstructured": true
    },
    "schema": {
      "fields": [
        {
          "type": "int32",
          "metadata_key": "metadata_key",
          "name": "name",
          "required": true,
          "sql_name": "sql_name"
        }
      ],
      "format": {
        "type": "json",
        "decimal_encoding": "number",
        "timestamp_format": "rfc3339",
        "unstructured": true
      },
      "inferred": true
    }
  },
  "success": true
}
```

## Update Stream

`client.Pipelines.Streams.Update(ctx, streamID, params) (*StreamUpdateResponse, error)`

**patch** `/accounts/{account_id}/pipelines/v1/streams/{stream_id}`

Update a Stream.

### Parameters

- `streamID string` Specifies the public ID of the stream.
- `params StreamUpdateParams`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `HTTP param.Field[StreamUpdateParamsHTTP]` Body param
    - `Authentication bool` Indicates that authentication is required for the HTTP endpoint.
    - `Enabled bool` Indicates that the HTTP endpoint is enabled.
    - `CORS StreamUpdateParamsHTTPCORS` Specifies the CORS options for the HTTP endpoint.
      - `Origins []string`
  - `WorkerBinding param.Field[StreamUpdateParamsWorkerBinding]` Body param
    - `Enabled bool` Indicates that the worker binding is enabled.

### Returns

- `type StreamUpdateResponse struct{…}`
  - `ID string` Indicates a unique identifier for this stream.
  - `CreatedAt Time`
  - `HTTP StreamUpdateResponseHTTP`
    - `Authentication bool` Indicates that authentication is required for the HTTP endpoint.
    - `Enabled bool` Indicates that the HTTP endpoint is enabled.
    - `CORS StreamUpdateResponseHTTPCORS` Specifies the CORS options for the HTTP endpoint.
      - `Origins []string`
  - `ModifiedAt Time`
  - `Name string` Indicates the name of the Stream.
  - `Version int64` Indicates the current version of this stream.
  - `WorkerBinding StreamUpdateResponseWorkerBinding`
    - `Enabled bool` Indicates that the worker binding is enabled.
  - `Endpoint string` Indicates the endpoint URL of this stream.
  - `Format StreamUpdateResponseFormat`
    - `type StreamUpdateResponseFormatJson struct{…}`
      - `Type StreamUpdateResponseFormatJsonType`
        - `const StreamUpdateResponseFormatJsonTypeJson StreamUpdateResponseFormatJsonType = "json"`
      - `DecimalEncoding StreamUpdateResponseFormatJsonDecimalEncoding`
        - `const StreamUpdateResponseFormatJsonDecimalEncodingNumber StreamUpdateResponseFormatJsonDecimalEncoding = "number"`
        - `const StreamUpdateResponseFormatJsonDecimalEncodingString StreamUpdateResponseFormatJsonDecimalEncoding = "string"`
        - `const StreamUpdateResponseFormatJsonDecimalEncodingBytes StreamUpdateResponseFormatJsonDecimalEncoding = "bytes"`
      - `TimestampFormat StreamUpdateResponseFormatJsonTimestampFormat`
        - `const StreamUpdateResponseFormatJsonTimestampFormatRfc3339 StreamUpdateResponseFormatJsonTimestampFormat = "rfc3339"`
        - `const StreamUpdateResponseFormatJsonTimestampFormatUnixMillis StreamUpdateResponseFormatJsonTimestampFormat = "unix_millis"`
      - `Unstructured bool`
    - `type StreamUpdateResponseFormatParquet struct{…}`
      - `Type StreamUpdateResponseFormatParquetType`
        - `const StreamUpdateResponseFormatParquetTypeParquet StreamUpdateResponseFormatParquetType = "parquet"`
      - `Compression StreamUpdateResponseFormatParquetCompression`
        - `const StreamUpdateResponseFormatParquetCompressionUncompressed StreamUpdateResponseFormatParquetCompression = "uncompressed"`
        - `const StreamUpdateResponseFormatParquetCompressionSnappy StreamUpdateResponseFormatParquetCompression = "snappy"`
        - `const StreamUpdateResponseFormatParquetCompressionGzip StreamUpdateResponseFormatParquetCompression = "gzip"`
        - `const StreamUpdateResponseFormatParquetCompressionZstd StreamUpdateResponseFormatParquetCompression = "zstd"`
        - `const StreamUpdateResponseFormatParquetCompressionLz4 StreamUpdateResponseFormatParquetCompression = "lz4"`
      - `RowGroupBytes int64`

### Example

```go
package main

import (
	"context"
	"fmt"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	stream, err := client.Pipelines.Streams.Update(
		context.TODO(),
		"033e105f4ecef8ad9ca31a8372d0c353",
		pipelines.StreamUpdateParams{
			AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		},
	)
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("%+v\n", stream.ID)
}
```

#### Response

```json
{
  "result": {
    "id": "01234567890123457689012345678901",
    "created_at": "2019-12-27T18:11:19.117Z",
    "http": {
      "authentication": false,
      "enabled": true,
      "cors": {
        "origins": [
          "string"
        ]
      }
    },
    "modified_at": "2019-12-27T18:11:19.117Z",
    "name": "my_stream",
    "version": 3,
    "worker_binding": {
      "enabled": true
    },
    "endpoint": "https://01234567890123457689012345678901.ingest.cloudflare.com/v1",
    "format": {
      "type": "json",
      "decimal_encoding": "number",
      "timestamp_format": "rfc3339",
      "unstructured": true
    }
  },
  "success": true
}
```

## Delete Stream

`client.Pipelines.Streams.Delete(ctx, streamID, params) error`

**delete** `/accounts/{account_id}/pipelines/v1/streams/{stream_id}`

Delete Stream in Account.

### Parameters

- `streamID string` Specifies the public ID of the stream.
- `params StreamDeleteParams`
  - `AccountID param.Field[string]` Path param: Specifies the public ID of the account.
  - `Force param.Field[string]` Query param: Delete stream forcefully, including deleting any dependent pipelines.

### Example

```go
package main

import (
	"context"

	"github.com/cloudflare/cloudflare-go"
	"github.com/cloudflare/cloudflare-go/option"
	"github.com/cloudflare/cloudflare-go/pipelines"
)

func main() {
	client := cloudflare.NewClient(
		option.WithAPIToken("Sn3lZJTBX6kkg7OdcBUAxOO963GEIyGQqnFTOFYY"),
	)
	err := client.Pipelines.Streams.Delete(
		context.TODO(),
		"033e105f4ecef8ad9ca31a8372d0c353",
		pipelines.StreamDeleteParams{
			AccountID: cloudflare.F("0123105f4ecef8ad9ca31a8372d0c353"),
		},
	)
	if err != nil {
		panic(err.Error())
	}
}
```
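
The `Force` query parameter is not exercised in the example above. As a rough illustration of how the documented route and query parameter fit together at the HTTP level, the following standard-library sketch builds the `DELETE` request by hand and sends it to a local stand-in server. The helper names `deleteStreamURL` and `deleteStream`, the `"true"` value passed for `force`, and the bearer-token header are assumptions made for illustration; they are not taken from the SDK, which remains the documented way to call this endpoint.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// deleteStreamURL builds the URL for the documented route
// /accounts/{account_id}/pipelines/v1/streams/{stream_id}.
// Hypothetical helper: the "true" value for force is an assumption.
func deleteStreamURL(baseURL, accountID, streamID string, force bool) string {
	u := baseURL + "/accounts/" + url.PathEscape(accountID) +
		"/pipelines/v1/streams/" + url.PathEscape(streamID)
	if force {
		q := url.Values{}
		q.Set("force", "true")
		u += "?" + q.Encode()
	}
	return u
}

// deleteStream sends the DELETE request and treats any non-2xx status as an error.
// Hypothetical helper: the Authorization header format is an assumption.
func deleteStream(client *http.Client, baseURL, token, accountID, streamID string, force bool) error {
	req, err := http.NewRequest(http.MethodDelete, deleteStreamURL(baseURL, accountID, streamID, force), nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("delete stream: unexpected status %s", resp.Status)
	}
	return nil
}

func main() {
	// Local stand-in server so the sketch runs without network access or credentials.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Printf("%s %s force=%q\n", r.Method, r.URL.Path, r.URL.Query().Get("force"))
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	err := deleteStream(srv.Client(), srv.URL, "example-token",
		"0123105f4ecef8ad9ca31a8372d0c353", "033e105f4ecef8ad9ca31a8372d0c353", true)
	if err != nil {
		panic(err)
	}
}
```

Because a forced delete also removes dependent pipelines, it is probably safer to keep `force` off by default and opt in explicitly per call, as the boolean argument in this sketch does.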