Warning

SingleStore 9.0 gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 8.9 is recommended for production workloads, which can later be upgraded to SingleStore 9.0.

Iceberg Ingest

Note

This is a Preview feature.

Apache Iceberg is an open-source table format that helps simplify analytical data processing for large datasets in data lakes. SingleStore can be used to add a speed layer to Iceberg tables. Iceberg tables can be directly ingested into SingleStore without the need for an external ETL tool and subsequently processed using SingleStore's high-performance database engine.

Remarks

The following are supported:

  • Iceberg Version 1 tables and Iceberg Version 2 tables with data files in Parquet format.

  • Iceberg tables stored in Amazon S3 with catalogs: GLUE, Snowflake, REST, JDBC, Hive, and Polaris.

  • Initial load of Iceberg tables and continuous ingest of updates to Iceberg tables.

Syntax

CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA S3 '[<table_identifier>]'
CONFIG '{"catalog_type": "GLUE|SNOWFLAKE|REST|JDBC|HIVE",
    <configuration_json>
    [, "ingest_mode": "append|upsert|one_time"]
    [, "catalog_name": "<your_catalog_name>" ]
    [, "catalog.<property>" : "property_value" [, ...]]
    [, "schema.change.stop": true|false ]
}'
CREDENTIALS '<credentials_json>'
[REPLACE|MERGE] INTO TABLE <table_name>
[ON DUPLICATE KEY UPDATE]
<iceberg_subvalue_mapping>
FORMAT ICEBERG;

<iceberg_subvalue_mapping>: (
    {<singlestore_col_name> | @<variable_name>} <- <iceberg_subvalue_path> [, ... ])

<iceberg_subvalue_path>: {ident [::ident ...]}

All the data shaping options for Parquet pipelines are supported for Iceberg pipelines. Refer to Data Shaping with Pipelines for more information.

Schema inference for Iceberg pipelines is supported. Refer to Schema and Pipeline Inference and Infer Data from Iceberg Files for details.

Table Identifier

  • The <table_identifier> identifies the Iceberg table. The <table_identifier> is catalog-specific but is typically in the form: database_name.table_name.

Catalog Specification

  • The catalog_type is required for the catalog specification.

  • The catalog_name is a name to associate with the catalog when reading table metadata and is used internally in SingleStore for logging and metrics purposes. The catalog_name is required for the JDBC catalog and is optional for other catalogs.

  • The catalog.property is a list of key-value pairs for configuring the catalog connection. The property and value are passed directly to the Iceberg SDK to establish the catalog connection.

S3 Specification

  • The configuration_json is a JSON string for S3 configuration parameters such as region, endpoint_url, and compatibility_mode. Refer to CREATE PIPELINE and CREATE LINK for more information.

Ingest Mode

SingleStore supports two mechanisms for managing updates to Iceberg tables: ingest_mode and MERGE (merge pipelines). MERGE and ingest_mode have different uses and cannot both be used in the same pipeline definition.

  • Three ingest modes are supported: one_time for one-time loads, and two modes for continuous ingest: append for append-only workloads and upsert for upsert workloads. The default ingest_mode is append.

  • With continuous ingest ("ingest_mode":"append" or "ingest_mode":"upsert"), a running pipeline automatically detects updates to the Iceberg table and ingests them into the SingleStore table. Refer to Continuous Ingest for details on append and upsert mode.

  • In one-time ingest ("ingest_mode":"one_time") SingleStore will request Iceberg table metadata and load data from the latest snapshot available at that moment. Subsequent updates to the Iceberg table are not automatically ingested.

  • The ingest mode is specified by setting ingest_mode in the pipeline specification as shown in the syntax above.

Refer to Continuous Ingest for details on ingest_mode and a comparison of MERGE and ingest_mode.

Schema Change Stop

  • The Iceberg specification allows schema evolution; columns may be renamed, added, dropped, or type-promoted.

  • To detect schema changes to Iceberg tables that are being ingested with pipelines, specify "schema.change.stop":true in the CONFIG section of the pipeline definition.

  • With this setting, when the engine detects a schema change, it pauses the ingestion and throws an error.

  • The error message can be found in the ERROR_MESSAGE column of the information_schema.PIPELINES_ERRORS view.

Refer to Schema Change Detection for details on schema change detection and resuming ingestion after a schema change has been detected.
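As a rough sketch of how these pieces fit together, the statement below adds "schema.change.stop":true to a Glue-catalog pipeline, and the query after it looks up the resulting error text once the pipeline has paused. The pipeline name books_schema_pipe is hypothetical; the table, region, and column mapping are borrowed from the books examples later on this page. Filtering on PIPELINE_NAME assumes that column is available in the view.

```sql
-- Hypothetical pipeline name. Ingestion pauses with an error when the
-- schema of the source Iceberg table changes.
CREATE PIPELINE books_schema_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"schema.change.stop": true
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

-- After the pipeline pauses, read the error text for this pipeline.
SELECT PIPELINE_NAME, ERROR_MESSAGE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'books_schema_pipe';
```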

MERGE

The MERGE INTO TABLE clause creates a pipeline, called a merge pipeline, that continuously ingests data from an Iceberg table into a SingleStore table. A merge pipeline continuously detects inserts, updates, and deletes to/from the Iceberg table and propagates those updates to the SingleStore table.

Merge pipeline handles the following types of Iceberg V2 snapshots:

  • Append

  • Overwrite

  • Replace

  • Delete (positional and equality)

In addition to merge pipelines, SingleStore supports ingest_mode for continuous ingest. MERGE and ingest_mode cannot be used in the same pipeline definition.

Refer to Continuous Ingest for details on merge pipelines and a comparison of MERGE and ingest_mode.

Credentials

  • The credentials_json specifies S3 credentials in JSON format. For information about the supported credential options, refer to CREATE PIPELINE.

  • Refer to the Minimum Required S3 Pipeline Syntax and AWS Elastic Kubernetes Service (EKS) IAM Roles for Service Accounts (IRSA) Authentication examples in CREATE PIPELINE.

Subvalue Mappings

The iceberg_subvalue_mapping assigns fields from the Iceberg table to columns in the SingleStore table or to temporary variables. A ::-separated list of field names is used in iceberg_subvalue_path to look up fields in nested schemas. When the files in the Iceberg table are Parquet files, the ::-separated list of field names is used to look up fields in nested Parquet schemas. The following rules apply:

  • The last field in iceberg_subvalue_path must be a primitive type.

  • All iceberg_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`).

  • The iceberg_subvalue_path may not contain Parquet nested types (list or map types). Refer to Parquet - Nested Types for more information.
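The rules above can be illustrated with a minimal sketch. It assumes a hypothetical Iceberg table whose address struct contains a field named street name (with a space), and it reuses the addresses table from the subvalue mapping example later on this page. The pipeline name and the UPPER() transformation are illustrative only.

```sql
-- The path component `street name` contains whitespace, so it is wrapped
-- in backticks. address::country is assigned to the variable @raw_country
-- instead of a column and is then shaped in the SET clause.
-- Every path ends in a primitive field; none passes through a list or map.
CREATE PIPELINE addresses_quoted_pipe AS
LOAD DATA S3 'db2.addresses'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::`street name`,
City <- address::city,
@raw_country <- address::country)
FORMAT ICEBERG
SET Country = UPPER(@raw_country);
```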

Enable and Configure Iceberg Ingest

The global engine variable enable_iceberg_ingest must be set to ON to use Iceberg ingest. This variable is set to OFF by default.

SET GLOBAL enable_iceberg_ingest = ON;

In addition, to use Iceberg ingest, the global engine variable java_pipelines_java11_path must be set on all nodes to the path of the JRE 11+ java binary.

SET GLOBAL java_pipelines_java11_path = <path to JRE 11+ java binary>;
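For instance, with the Java binary at /usr/bin/java (the sample path mentioned in the Troubleshooting section; the actual location on your nodes may differ), the setting and a quick check of both variables might look like this sketch:

```sql
-- Example path only; point this at the JRE 11+ java binary on your nodes.
SET GLOBAL java_pipelines_java11_path = '/usr/bin/java';

-- Confirm that both variables hold the expected values.
SHOW GLOBAL VARIABLES LIKE 'enable_iceberg_ingest';
SHOW GLOBAL VARIABLES LIKE 'java_pipelines_java11_path';
```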

Engine variables control parameters such as memory usage and timeouts for Iceberg pipelines. Their default values are chosen to work well for common Iceberg pipeline use cases. These parameters are set using global engine variables with the pipelines_* prefix, as specified in List of Engine Variables.

Memory usage can also be controlled using the java_pipelines_heap_size and pipelines_iceberg_data_workers_heap_size engine variables. Separate variables are used to control the memory used by leaf and aggregator nodes. Adjust these variables if there are out of memory (OOM) errors in Java extractors.

Pipeline timeouts can be controlled using pipelines_extractor_get_offsets_timeout_ms and pipelines_extractor_idle_timeout_ms. For Iceberg pipelines, the values of these variables are adjusted to be a minimum of 5 minutes. Increase the values of these variables if there are timeouts in pipeline creation or while running pipeline batches.

Refer to List of Engine Variables for more information.
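As an illustration of the timeout adjustment described above, the following sketch inspects the two timeout variables and raises both to 10 minutes. The value 600000 is an arbitrary example, not a recommendation.

```sql
-- Inspect the current values of the extractor timeout variables.
SHOW GLOBAL VARIABLES LIKE 'pipelines_extractor%';

-- Raise both timeouts to 10 minutes (600000 ms); the value is illustrative.
SET GLOBAL pipelines_extractor_get_offsets_timeout_ms = 600000;
SET GLOBAL pipelines_extractor_idle_timeout_ms = 600000;
```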

Continuous Ingest

With continuous ingestion, a running pipeline automatically detects updates to an Iceberg table and ingests them into the SingleStore table.

Continuous Ingest - Append-Only

Append-only mode is used for scenarios in which new rows are added to the Iceberg table, but no rows are deleted or modified.

In append-only mode ("ingest_mode":"append") the pipeline will process APPEND Iceberg snapshots. If a DELETE or OVERWRITE snapshot is encountered, an error is raised. Users may override those errors by specifying "ignore_unsupported_modifications":true in the pipeline configuration. SingleStore does not recommend setting "ignore_unsupported_modifications":true as doing so may lead to data inconsistency.

Continuous Ingest - Upsert

Upsert mode is used for scenarios where updates modify non-key columns based on one or more specified key columns. An upsert combines an insert and an update: it updates a row in the table if there is a primary key match, or inserts the row if there is no match.

In upsert mode ("ingest_mode":"upsert"), the pipeline will process updates to the Iceberg table as upserts to the SingleStore table. That is, the pipeline will process Iceberg APPEND and OVERWRITE snapshots of the Iceberg table as upserts to the SingleStore table.

Requirements:

  • The SingleStore table must have a primary key or a unique index. In the <iceberg_subvalue_mapping>, a column(s) in the Iceberg table must be mapped to the column(s) in SingleStore on which there is a key or unique index. Typically, the column(s) in the Iceberg table will also be declared as a key.

  • The pipeline must be created using REPLACE. Refer to Additional CREATE PIPELINE Examples for more information.

Limitations:

  • Pipelines will fail on delete snapshots. Users may override these errors by specifying "ignore_unsupported_modifications":true in the pipeline configuration. SingleStore does not recommend setting "ignore_unsupported_modifications":true as doing so may lead to data inconsistency.

  • Limitations are expected to be addressed in future releases.

Continuous Ingest - MERGE (Merge Pipelines)

The MERGE INTO TABLE clause creates a merge pipeline that continuously ingests data from an Iceberg table into a SingleStore table. Append, Overwrite, Replace, and Delete (positional and equality) Iceberg V2 snapshots are supported.

For a merge pipeline, the SingleStore (destination) table:

  • Must have the following three columns:

    • `$_file` VARCHAR(2048) COMMENT 'ICEBERG_FILE_PATH'

    • `$_row` BIGINT COMMENT 'ICEBERG_FILE_POS'

    • `$_delete` JSON DEFAULT '{}' COMMENT 'ICEBERG_DELETED_BY'

  • May use different column names for the `$_file`, `$_row`, and `$_delete` columns, as long as the columns are marked with the COMMENT clauses shown above.

  • Must have shard key defined as:

    • SHARD KEY(`$_file`, `$_row`)

  • Must not be updated or modified. If the destination table is modified, updates may be applied incorrectly.

The pipeline declaration must set the `$_file` and `$_row` columns as follows:

SET `$_file` = pipeline_source_file(), `$_row` = pipeline_source_file_row()

To ingest deletes, a merge pipeline must be created using a VIEW over the destination table. This view must have the following WHERE clause:

WHERE JSON_LENGTH(`$_delete`) = 0

When a view is used, merge pipelines perform merge-on-read on demand during table scans.

Refer to Example - Continuous Ingest with a Merge Pipeline for a merge pipeline example.

Compare Types of Continuous Ingest

Merge pipelines (MERGE) and the append-only and upsert modes (ingest_mode) have different characteristics. SingleStore recommends selecting the variant that works best with your workload.

  • MERGE requires adding columns ($_file, $_row, $_delete) in the destination (SingleStore) table.

  • When using MERGE, users may not directly update the destination (SingleStore) table. If the destination (SingleStore) table is updated, updates made to the source Iceberg table may not propagate properly.

  • MERGE implements merge-on-read.

  • ingest_mode updates the SingleStore table directly and does not have a read penalty.

  • MERGE can process Append, Overwrite, Replace, and Delete (positional and equality) Iceberg snapshots; ingest_mode has more limitations on the types of Iceberg snapshots processed.

Schema Change Detection

If "schema.change.stop":true is specified in the pipeline definition, when the engine detects a schema change, it pauses the ingestion and throws an error. The error message can be found in the ERROR_MESSAGE column of the information_schema.PIPELINES_ERRORS view and includes a diff in JSON that contains information about how the schema has evolved.

Consider an Iceberg table stored external to SingleStore that is to be ingested into SingleStore with a pipeline. We refer to the Iceberg table as the source table and the SingleStore table as the destination table.

Let the source Iceberg table have the following schema:

(
id int,
bigid long,
fl float,
doub double,
occurrence_da date,
data string,
st struct<a int, b int, c struct<d string, e int>>
)

Assume a pipeline has been created to ingest the source Iceberg table into a destination SingleStore table.

Assume the following ALTER TABLE statements are run on the source Iceberg table. These statements demonstrate the possible types of schema evolution: promote a type, rename a column, add a column, and drop a column.

ALTER TABLE <table_name> ALTER COLUMN id TYPE bigint
ALTER TABLE <table_name> RENAME COLUMN id to identity
ALTER TABLE <table_name> ADD COLUMN name string
ALTER TABLE <table_name> DROP COLUMN occurrence_da

If "schema.change.stop":true is set, when the engine detects these changes to the source Iceberg table, the pipeline is stopped, and the following error message is generated.

Iceberg Table Schema at Source has changed. Schema Diff :
{
  "schema_id": {
    "before": 0,
    "after": 8
  },
  "diff": [
    {
      "op": "column_type_change",
      "column": "id",
      "before": "int",
      "after": "long",
      "column_id": 1
    },
    {
      "op": "column_rename",
      "before": "id",
      "after": "identity",
      "column_id": 1
    },
    {
      "op": "column_add",
      "before": null,
      "after": "name",
      "column_id": 13
    },
    {
      "op": "column_delete",
      "before": "occurrence_da",
      "after": null,
      "column_id": 5
    }
  ]
}

In this error message, the Iceberg schema version is included in schema_id, and the diff includes one entry for each modification to the table schema.

Resume Ingestion

Follow these steps to resume ingestion after a pipeline has been paused due to a schema change.

  1. Run SHOW CREATE PIPELINE <pipeline_name> EXTENDED and capture the results of this command.

    1. The result of this query will contain ALTER PIPELINE statements that move the "file offsets" back to the position before the pipeline was paused.

  2. Drop the pipeline.

  3. Apply the schema changes manually to the SingleStore table.

  4. Recreate the pipeline.

  5. Run the ALTER PIPELINE statements captured in step 1.

  6. Start the new pipeline.
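The steps above can be sketched in SQL as follows. This is an outline under assumptions rather than a definitive procedure: the pipeline books_pipe and table books come from the examples on this page, while the added Iceberg column publisher and the matching Publisher column are hypothetical. The ALTER PIPELINE statements in step 5 are specific to each pipeline, so they are not reproduced here.

```sql
-- 1. Capture the pipeline definition and the ALTER PIPELINE offset
--    statements. Save the output before continuing.
SHOW CREATE PIPELINE books_pipe EXTENDED;

-- 2. Drop the paused pipeline.
DROP PIPELINE books_pipe;

-- 3. Apply the schema change manually (hypothetical added column).
ALTER TABLE books ADD COLUMN Publisher TEXT;

-- 4. Recreate the pipeline, now mapping the new column.
CREATE PIPELINE books_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"schema.change.stop": true
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating,
Publisher <- publisher)
FORMAT ICEBERG;

-- 5. Run the ALTER PIPELINE statements saved in step 1 here.

-- 6. Start the recreated pipeline.
START PIPELINE books_pipe;
```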

Ingest Data Type Mapping

The table below lists Iceberg Types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET clause.

| Iceberg Type | Recommended SingleStore Data Type | Recommended Conversion |
|--------------|-----------------------------------|------------------------|
| boolean      | TINYINT/BOOL/BOOLEAN              |                        |
| int          | INT                               |                        |
| long         | BIGINT                            |                        |
| float        | FLOAT                             |                        |
| double       | DOUBLE                            |                        |
| decimal(P,S) | DECIMAL                           |                        |
| date         | DATE                              | DATE_ADD('1970-01-01', INTERVAL @date DAY) |
| time         | TIME(6)                           | DATE_ADD('1970-01-01', INTERVAL @time_value MICROSECOND) |
| timestamp    | DATETIME(6)                       | DATE_ADD('1970-01-01', INTERVAL @timestamp_value MICROSECOND) |
| timestamptz  | DATETIME(6)                       | DATE_ADD('1970-01-01', INTERVAL @timestamp_value MICROSECOND) |
| string       | LONGTEXT (utf8mb4_bin)            |                        |
| uuid         | BINARY(16)                        |                        |
| fixed (L)    | BINARY(L)                         |                        |
| binary       | LONGBLOB                          |                        |
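To show where the recommended conversions go, the sketch below reads a date field and a timestamp field into variables and converts them in a SET clause. The events table, the pipeline name, and the Iceberg field names occurrence_date and created_at are all hypothetical.

```sql
-- Hypothetical destination table.
CREATE TABLE events(
Id INT,
OccurrenceDate DATE,
CreatedAt DATETIME(6),
PRIMARY KEY(Id));

-- Read the date and timestamp fields into variables, then apply the
-- recommended conversions from the table above in the SET clause.
CREATE PIPELINE events_pipe AS
LOAD DATA S3 'db.events'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE events
(Id <- id,
@date <- occurrence_date,
@timestamp_value <- created_at)
FORMAT ICEBERG
SET OccurrenceDate = DATE_ADD('1970-01-01', INTERVAL @date DAY),
CreatedAt = DATE_ADD('1970-01-01', INTERVAL @timestamp_value MICROSECOND);
```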

CREATE OR REPLACE

When a pipeline with a specified pipeline_name already exists, the CREATE OR REPLACE command functions similarly to CREATE PIPELINE, with the added benefit of preserving existing pipeline metadata, such as loaded offsets and data files. Running CREATE OR REPLACE on an existing pipeline initiates the Iceberg pipeline to retrieve a new snapshot, schema, and data files, and inserts data from these new files into the destination table in SingleStore.

Executing CREATE OR REPLACE on an existing Iceberg pipeline may cause some data files to be ingested twice. To avoid this, use CREATE OR REPLACE only with REPLACE INTO statements or in an upsert configuration.

CREATE PIPELINE books_create_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"catalog_name": "s3_catalog",
"ingest_mode": "one_time"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

Refer to CREATE PIPELINE for syntax for CREATE OR REPLACE PIPELINE, REPLACE INTO TABLE, and ON DUPLICATE KEY UPDATE. Refer to Performing Upserts for more information on upserts.

Example - Glue Catalog on Amazon S3

An Iceberg table with data files in Parquet format that is stored in an AWS S3 bucket using AWS Glue can be loaded into a SingleStore table using a pipeline (CREATE PIPELINE). Refer to Apache Iceberg - Glue Catalog for information on using a GLUE catalog with Iceberg.

In this example, a table named books is created and data from an Iceberg table that matches this schema is loaded into the books table.

Create the table.

CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PRIMARY KEY(Id));

The following data is used for this example.

(1, 'Happy Place', 400, 4.9)
(2, 'Legends & Lattes', 304, 4.9)
(3, 'The Vanishing Half', 352, 4.9)
(4, 'The Wind Knows My Name', 304, 4.9)

The PIPELINE statement below will load data from an Iceberg table containing the data above into the books table. The column names on the left side of the <- are the column names from the SingleStore table into which the data will be loaded. The column names on the right side of the <- are the column names from the Iceberg table which is to be loaded into SingleStore.

CREATE PIPELINE books_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

Test the pipeline.

TEST PIPELINE books_pipe;
+------+------------------------+----------+--------+
| Id   | Name                   | NumPages | Rating |
+------+------------------------+----------+--------+
|    4 | The Wind Knows My Name |      304 |    4.9 |
|    1 | Happy Place            |      400 |    4.9 |
|    2 | Legends & Lattes       |      304 |    4.9 |
|    3 | The Vanishing Half     |      352 |    4.9 |
+------+------------------------+----------+--------+

Refer to START PIPELINE for more information on starting pipelines.
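Once the test output looks right, a possible next step is to start the pipeline and confirm that rows arrive. The sketch below assumes the books_pipe pipeline defined above and that the PIPELINE_NAME and STATE columns are available in information_schema.PIPELINES.

```sql
-- Start ingesting in the background.
START PIPELINE books_pipe;

-- Check the state of the pipeline.
SELECT PIPELINE_NAME, STATE
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = 'books_pipe';

-- Inspect the loaded rows.
SELECT * FROM books ORDER BY Id;
```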

Example - Use Subvalue Mappings

This example shows the use of subvalue mappings to load nested elements from an Iceberg schema into a SingleStore table.

Create a table.

CREATE TABLE addresses(
Id INT,
Name TEXT,
Street TEXT,
City TEXT,
Country TEXT,
PRIMARY KEY(Id));

The following data is used for this example.

(1, 'Mylo', struct('123 Main St', 'New York', 'USA'))
(2, 'Naya', struct('456 Elm St', 'San Francisco', 'USA'))

The PIPELINE statement below will load data from an Iceberg table containing the data above into the addresses table. The column names on the left side of the <- are the column names from the SingleStore table into which the data will be loaded. The column names on the right side of the <- are the column names from the Iceberg table which is to be loaded into SingleStore.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db2.addresses'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"catalog_name": "s3_catalog"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

Test the pipeline.

TEST PIPELINE addresses_pipe;
+------+------+-------------+---------------+---------+
| Id   | Name | Street      | City          | Country |
+------+------+-------------+---------------+---------+
|    1 | Mylo | 123 Main St | New York      | USA     |
|    2 | Naya | 456 Elm St  | San Francisco | USA     |
+------+------+-------------+---------------+---------+

Refer to START PIPELINE for more information on starting pipelines.

Example - Snowflake Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 with a Snowflake catalog.

Iceberg tables to be ingested in SingleStore must be created on an external volume. Refer to Tutorial: Create your first Iceberg table, Create an external volume, and Snowflake Iceberg Catalog SDK for more information.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.schema_name.table_name'
CONFIG '{"region" : "us-west-2",
"catalog_type": "SNOWFLAKE",
"catalog.uri": "jdbc:snowflake://tpq12345.snowflakecomputing.com",
"catalog.jdbc.user":"<user_name>",
"catalog.jdbc.password":"<password>",
"catalog.jdbc.role":"<user role>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

For the Snowflake catalog, the <table_identifier> must consist of three parts - the database name, the schema name, and the table name, db_name.schema_name.table_name in the example above.

The catalog.uri can be obtained from running SELECT SYSTEM$ALLOWLIST(); in the Snowflake system.

In addition, the catalog.uri, catalog.jdbc.user, catalog.jdbc.password, and catalog.jdbc.role are required when using the Snowflake catalog.

Example - REST Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 with REST catalog.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"region" : "us-west-2",
"catalog_type": "REST",
"catalog.uri": "http://host.address:8181"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

Example - JDBC Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 with JDBC catalog. Refer to Iceberg JDBC Integration for more information on using Iceberg with JDBC catalog.

SingleStore supports Postgres, MySQL, and SQLite JDBC drivers by default. Additional drivers can be added using java_pipelines_class_path.

The following example uses JDBC with SQLite.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"region" : "us-west-2",
"catalog_type": "JDBC",
"catalog_name": "catalog_name",
"catalog.warehouse": "s3://path_to_warehouse",
"catalog.uri":"jdbc:sqlite:file:/path_jdbc"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

The following example uses JDBC with MySQL.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"region" : "us-west-2",
"catalog_type": "JDBC",
"catalog_name": "catalog_name",
"catalog.warehouse": "s3://path_to_warehouse",
"catalog.uri": "jdbc:mysql://host.address:3306/default",
"catalog.jdbc.user": "<user_name>",
"catalog.jdbc.password": "<password>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Country <- address::country)
FORMAT ICEBERG;

The catalog.warehouse and catalog_name are required for JDBC catalogs.

Example - Hive Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 using Hive Catalog. Iceberg tables to be ingested in the Hive catalog must use Hive Metastore service. Refer to Apache Hive and Hive - Apache Iceberg for more information.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"catalog_type": "HIVE",
"catalog.uri": "thrift://<service_endpoint>:46590",
"region": "us-east-1",
"catalog.hive.metastore.client.auth.mode": "PLAIN",
"catalog.hive.metastore.client.plain.username": "<username>",
"catalog.hive.metastore.client.plain.password": "<password>",
"catalog.metastore.use.SSL": "true",
"catalog.hive.metastore.truststore.type": "PKCS12",
"catalog.hive.metastore.truststore.path": "/path/to/your/project/hive/truststore.12",
"catalog.hive.metastore.truststore.password": "<truststore_password>"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

The catalog.uri is the base URL for accessing the Hive catalog's API or service endpoint. 

The catalog.hive.metastore.client.auth.mode is the authentication mode for connecting to the Hive Metastore. 

The catalog.hive.metastore.client.plain.username is the username used to authenticate with the Hive Metastore. 

The catalog.hive.metastore.client.plain.password is the password for the authenticated user. 

The catalog.metastore.use.SSL is a boolean flag that secures communication with the Hive Metastore. 

The catalog.hive.metastore.truststore.type is the truststore format used to validate the SSL certificate. 

The catalog.hive.metastore.truststore.path is the file path that contains the SSL certificate. 

The catalog.hive.metastore.truststore.password is the password needed to access the truststore.

Refer to GitHub for additional Hive configurations.

Example - Polaris Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 using Polaris Catalog. Polaris catalog uses the Apache Iceberg REST API. Refer to Getting Started with Snowflake Open Catalog for more information.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"catalog_type": "REST",
"catalog.warehouse": "<polaris_catalog_name>",
"table_id": "db_name.table_name",
"region":"us-east-1",
"catalog.uri":"https://tpq12345.snowflakecomputing.com/polaris/api/catalog",
"catalog.credential":"<ClientID>:<Secret>",
"catalog.scope": "PRINCIPAL_ROLE:ALL"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

The catalog.uri is the base URL for accessing the Polaris catalog's API or service endpoint.

The catalog.scope defines the access permissions for the Polaris catalog.

The catalog.credential is the secret key from the Polaris catalog connection, formatted as <ClientID>:<Secret>. It is provided when you configure the service connection in Snowflake while creating a connection for a Polaris catalog.

Example - Continuous Ingest - Upsert Mode

The example below shows a pipeline using ingest_mode of upsert. As required when ingest_mode is upsert, the books table has a primary key, Id, in this example.

When started, this pipeline will ingest data from the latest snapshot of the Iceberg table into the SingleStore books table. Then, when the Iceberg table is updated, those updates will automatically be applied to the SingleStore table as upserts.

CREATE PIPELINE books_upsert_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"catalog_name": "s3_catalog",
"ingest_mode": "upsert"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

Example - Continuous Ingest - Append Mode

The example below shows a pipeline using ingest_mode of append.

When started, this pipeline will ingest data from the latest snapshot of the Iceberg table into the SingleStore books table. Then, when the Iceberg table is updated, any appends to that table will automatically be appended to the SingleStore table. If non-append updates are encountered, an error will be thrown.

CREATE PIPELINE books_append_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"catalog_name": "s3_catalog",
"ingest_mode": "append"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

Example - Hadoop Catalog on FS

Ingest an Iceberg table stored in the local filesystem using Hadoop Catalog. When loading an Iceberg table from a local file system, the file system must be accessible from all SingleStore nodes.

CREATE PIPELINE addresses_pipe AS
LOAD DATA FS 'db_name.table_name'
CONFIG '{"catalog_type": "HADOOP",
"catalog.warehouse": "/tmp/warehouse_path"
}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

Example - Continuous Ingest with a Merge Pipeline

The example below shows how to create a merge pipeline.

In this example, a destination table in SingleStore named _books is created with the necessary $_file, $_row, and $_delete columns. Then a view books is created on top of the _books table. Finally a merge pipeline is defined.

When started, this pipeline will ingest data from the latest snapshot of the source Iceberg table into the SingleStore _books table. When the source Iceberg table is updated, those updates (append, replace, delete) will be propagated to the _books table.

Create the destination _books table.

CREATE TABLE _books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
`$_file` VARCHAR(2048),
`$_row` BIGINT,
`$_delete` JSON DEFAULT '{}',
KEY(Id),
PRIMARY KEY(`$_file`,`$_row`));

Create the books view.

CREATE VIEW books
AS
SELECT Id, Name, NumPages, Rating from _books
WHERE JSON_LENGTH(`$_delete`) = 0;

Define the pipeline.

CREATE PIPELINE books_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"catalog_name": "s3_catalog"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
MERGE INTO TABLE _books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG
SET `$_file` = pipeline_source_file(),
`$_row` = pipeline_source_file_row();
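After the pipeline is defined, it can be started and the data read through the view. The sketch below assumes the objects created above. The second query rests on an inference from the view's WHERE clause, namely that rows deleted at the source stay in _books with a non-empty `$_delete` value. It is illustrative rather than documented behavior.

```sql
-- Start the merge pipeline.
START PIPELINE books_pipe;

-- Read live rows through the view, which filters out deleted rows.
SELECT Id, Name, NumPages, Rating
FROM books
ORDER BY Id;

-- Count rows in the base table that appear to be marked as deleted.
SELECT COUNT(*) AS deleted_rows
FROM _books
WHERE JSON_LENGTH(`$_delete`) > 0;
```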

Troubleshooting

The following table lists errors that can occur when creating an Iceberg Ingest pipeline. Also, refer to Debugging Pipeline Errors for additional information on troubleshooting pipeline errors.

| Catalog | Error | Cause and Resolution |
|---------|-------|----------------------|
| All | protocol error … Process died unexpectedly or didn't start. | An incorrect value of the engine variable java_pipelines_java11_path may cause this error. Ensure the path is valid, for example, "/usr/bin/java". |
| Snowflake | Certificate for <...> doesn't match any of the subject alternative names. | An incorrect URI may cause this error. Verify that the catalog.uri is valid. |
| Snowflake | SEVERE: WARNING!!! Using fail-open to connect. Driver is connecting to an HTTPS endpoint without OCSP based Certificate Revocation checking as it could not obtain a valid OCSP Response to use from the CA OCSP responder. | This issue needs to be resolved on the Snowflake side. Refer to OCSP Configuration for more information. |
| Snowflake | Parquet parsing errors such as "Dictionary encoding not implemented". | Set the Snowflake table property STORAGE_SERIALIZATION_POLICY = COMPATIBLE, as described in CREATE ICEBERG TABLE (Snowflake as the Iceberg catalog) in the Snowflake documentation. |

Last modified: June 25, 2025
