Skip to content
Cloudflare Docs

Spark (PySpark)

Below is an example of using PySpark to connect to R2 Data Catalog.

Prerequisites

Example usage

from pyspark.sql import SparkSession
# Define catalog connection details (replace variables)
WAREHOUSE = "<WAREHOUSE>"
TOKEN = "<TOKEN>"
CATALOG_URI = "<CATALOG_URI>"
# Build Spark session with Iceberg configurations
spark = SparkSession.builder \
.appName("R2DataCatalogExample") \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1') \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.my_catalog.type", "rest") \
.config("spark.sql.catalog.my_catalog.uri", CATALOG_URI) \
.config("spark.sql.catalog.my_catalog.warehouse", WAREHOUSE) \
.config("spark.sql.catalog.my_catalog.token", TOKEN) \
.config("spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation", "vended-credentials") \
.config("spark.sql.catalog.my_catalog.s3.remote-signing-enabled", "false") \
.config("spark.sql.defaultCatalog", "my_catalog") \
.getOrCreate()
spark.sql("USE my_catalog")
# Create namespace if it does not exist
spark.sql("CREATE NAMESPACE IF NOT EXISTS default")
# Create a table in the namespace using Iceberg
spark.sql("""
CREATE TABLE IF NOT EXISTS default.my_table (
id BIGINT,
name STRING
)
USING iceberg
""")
# Create a simple DataFrame
df = spark.createDataFrame(
[(1, "Alice"), (2, "Bob"), (3, "Charlie")],
["id", "name"]
)
# Write the DataFrame to the Iceberg table
df.write \
.format("iceberg") \
.mode("append") \
.save("default.my_table")
# Read the data back from the Iceberg table
result_df = spark.read \
.format("iceberg") \
.load("default.my_table")
result_df.show()