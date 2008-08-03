from pyspark . sql import SparkSession # Define catalog connection details (replace variables) WAREHOUSE = "<WAREHOUSE>" TOKEN = "<TOKEN>" CATALOG_URI = "<CATALOG_URI>" # Build Spark session with Iceberg configurations spark = SparkSession . builder \ . appName ( "R2DataCatalogExample" ) \ . config ( 'spark.jars.packages' , 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1' ) \ . config ( "spark.sql.extensions" , "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" ) \ . config ( "spark.sql.catalog.my_catalog" , "org.apache.iceberg.spark.SparkCatalog" ) \ . config ( "spark.sql.catalog.my_catalog.type" , "rest" ) \ . config ( "spark.sql.catalog.my_catalog.uri" , CATALOG_URI ) \ . config ( "spark.sql.catalog.my_catalog.warehouse" , WAREHOUSE ) \ . config ( "spark.sql.catalog.my_catalog.token" , TOKEN ) \ . config ( "spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation" , "vended-credentials" ) \ . config ( "spark.sql.catalog.my_catalog.s3.remote-signing-enabled" , "false" ) \ . config ( "spark.sql.defaultCatalog" , "my_catalog" ) \ . getOrCreate () spark . sql ( "USE my_catalog" ) # Create namespace if it does not exist spark . sql ( "CREATE NAMESPACE IF NOT EXISTS default" ) # Create a table in the namespace using Iceberg spark . sql ( """ CREATE TABLE IF NOT EXISTS default.my_table ( id BIGINT, name STRING ) USING iceberg """ ) # Create a simple DataFrame df = spark . createDataFrame ( [( 1 , "Alice" ), ( 2 , "Bob" ), ( 3 , "Charlie" )], [ "id" , "name" ] ) # Write the DataFrame to the Iceberg table df . write \ . format ( "iceberg" ) \ . mode ( "append" ) \ . save ( "default.my_table" ) # Read the data back from the Iceberg table result_df = spark . read \ . format ( "iceberg" ) \ . load ( "default.my_table" ) result_df . show ()