
Staging with streaming tables in Databricks

Guide objective

This guide explains how you can use streaming tables in Databricks for staging source data as an alternative to the regular Notify API & COPY INTO approach.

Streaming tables are a commonly used method for data ingestion in Databricks. They can be used for source data staging with ADE as an alternative to notifying source files with Notify API and loading them with COPY INTO statements in file loads.
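For comparison, a minimal, generic sketch of the COPY INTO approach is shown below; the target table, file path and options are illustrative only and not the exact statement ADE would generate:

SQL
-- Incrementally copies new files from the external location into the staging table.
-- COPY INTO keeps track of already loaded files, so reruns only pick up new files.
COPY INTO staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1
FROM 'abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = '|', 'header' = 'true')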

Entities

In our example we have two METADATA_ONLY entities:


| Entity | Type | Physical type | Description |
| --- | --- | --- | --- |
| LATEST_COMBINED_1 | SOURCE | METADATA_ONLY | Defines the source data schema, i.e. attributes and their data types. |
| STG_BARENTSWATCH_LATEST_COMBINED_1 | DB_STREAMING_STAGE* | METADATA_ONLY | Represents the streaming table. Additional metadata attributes can be introduced in this entity as needed. The physical streaming table is defined in an OVERRIDE load step. |

*DB_STREAMING_STAGE is a custom entity type we have defined for this example in CONFIG_ENTITY_DEFAULTS. This entity could also be defined as a STAGE, but a dedicated entity type allows further customization and automation for this use case, such as setting default metadata attributes, default transformations, and load templates.

Load

A load is defined in the target entity as type TRANSFORM_PERSIST. Note that since we are not using Notify API, we cannot use LOAD_FILE.

An entity mapping is defined as follows:

| Source attribute | Transformation type | Target attribute | Description |
| --- | --- | --- | --- |
| - | RUN_ID | stg_run_id | Used for Run ID logic. A new run id is written to new rows when the streaming table refresh is called. |
| - | CURRENT_TS | stg_create_time | The current timestamp is written to new rows when the streaming table refresh is called. Note that rows added in the same refresh execution may end up having different timestamps due to distributed processing. |
| - | SOURCE_SYSTEM_NAME | stg_source_system | Inserts the source system name using the source_system_name variable. Transformation type defined in CONFIG_LOAD_TRANSFORMATIONS. |
| - | SOURCE_ENTITY_NAME | stg_source_entity | Inserts the source entity name using the source_entity_name variable. Transformation type defined in CONFIG_LOAD_TRANSFORMATIONS. |
| - | FILE_PATH_EXTERNAL | stg_file_name | Inserts the source file path using the Databricks _metadata.file_path column. Transformation type defined in CONFIG_LOAD_TRANSFORMATIONS. |
| mmsi | NONE | mmsi | Source data attributes mapped without transformation. |
| msgtime | NONE | msgtime | Source data attributes mapped without transformation. |

An OVERRIDE load step is defined as follows:

SQL
CREATE OR REFRESH STREAMING TABLE <target_schema>.<target_entity_name>
AS
SELECT
    <target_entity_attribute_list_with_transform_cast_and_names>
FROM STREAM read_files(
    "abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined/",
    format => 'csv',
    delimiter => '|'
)

Variables such as <target_schema>, <target_entity_name> and <target_entity_attribute_list_with_transform_cast_and_names> are used to make the load template generic. In this case, only the external file location and the file format settings need to be customized.
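For example, if the source files were JSON instead of pipe-delimited CSV, only the read_files arguments in the template would change. A hedged sketch of such a variation (the file path is illustrative):

SQL
-- Same template structure; only the external location and format options differ.
CREATE OR REFRESH STREAMING TABLE <target_schema>.<target_entity_name>
AS
SELECT
    <target_entity_attribute_list_with_transform_cast_and_names>
FROM STREAM read_files(
    "abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined_json/",
    format => 'json'
)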

Load options are defined in the load to enable Run ID logic.

See this example to understand how Run ID logic works when Notify API is not used.
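As a rough illustration only (this is not ADE-generated code, and the <new_run_ids> placeholder is hypothetical), a downstream load using Run ID logic would select only the rows whose run ids were assigned by refreshes that have not yet been processed, for example:

SQL
-- Hypothetical downstream selection: only rows tagged with run ids that have not
-- yet been processed by this load are picked up from the staging table.
SELECT
    mmsi
    , msgtime
FROM staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1
WHERE stg_batch_id IN (<new_run_ids>)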

Generated load:

SQL
CREATE OR REFRESH STREAMING TABLE staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1
AS
SELECT
    <targetrunid> AS stg_batch_id
    , current_timestamp() AS stg_create_time
    , 'BARENTSWATCH' AS stg_source_system
    , 'LATEST_COMBINED_1' AS stg_source_entity
    , _metadata.file_path AS stg_file_name
    , CAST(mmsi AS BIGINT) AS mmsi
    , CAST(msgtime AS TIMESTAMP) AS msgtime
    ...
FROM STREAM read_files(
    "abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined/",
    format => 'csv',
    delimiter => '|'
);

Orchestrating the load in a workflow refreshes the streaming table, and Run ID logic can be used for loading new rows into further entities.
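In Databricks, the streaming table can also be refreshed on its own with a standalone statement; a minimal sketch using the table from this example:

SQL
-- Triggers an incremental refresh, ingesting any files added to the external
-- location since the previous refresh.
REFRESH STREAMING TABLE staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1;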

Note that since streaming tables are not yet natively supported as a physical entity type in ADE, their lifecycle must be managed in Databricks. For example, dropping a streaming table is done manually in Databricks, and major changes may also require dropping and recreating the streaming table.
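For instance, removing the staging streaming table from this example could be done with a regular DROP statement in Databricks (a sketch; adjust the schema and table name as needed):

SQL
-- Removes the streaming table; it can be recreated by running the
-- CREATE OR REFRESH load again.
DROP TABLE IF EXISTS staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1;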
