Staging with streaming tables in Databricks
Guide objective
This guide explains how you can use streaming tables in Databricks for staging source data as an alternative to the regular Notify API & COPY INTO approach.
Streaming tables are a commonly used method for data ingestion in Databricks. With ADE, they can also be used for source data staging as an alternative to notifying source files with the Notify API and loading them with COPY INTO statements in file loads.
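For context, a simplified sketch of the regular approach is shown below as a single COPY INTO statement (the path and format options are illustrative; in practice ADE generates the statement for the files notified through the Notify API):
COPY INTO staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1
FROM 'abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = '|');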
Entities
In our example we have two METADATA_ONLY entities:

Entity | Type | Physical type | Description |
---|---|---|---|
LATEST_COMBINED_1 | SOURCE | METADATA_ONLY | Defines the source data schema i.e. attributes and their data types. |
STG_BARENTSWATCH_LATEST_COMBINED_1 | DB_STREAMING_STAGE* | METADATA_ONLY | Represents the streaming table. Additional metadata attributes can be introduced in this entity as needed. The physical streaming table is defined in an OVERRIDE load step. |
*DB_STREAMING_STAGE is a custom entity type we have defined for this example in CONFIG_ENTITY_DEFAULTS. This entity could also be defined as a STAGE, but a dedicated entity type allows further customization and automation for the use case, such as setting default metadata attributes, default transformations, and load templates.
Load
A load of type TRANSFORM_PERSIST is defined in the target entity. Note that since we are not using the Notify API, we cannot use the LOAD_FILE load type.
An entity mapping is defined as follows:
Source attribute | Transformation type | Target attribute | Description |
---|---|---|---|
- | RUN_ID | stg_run_id | Used for Run ID logic. A new Run ID is written to new rows when the streaming table refresh is executed. |
- | CURRENT_TS | stg_create_time | The current timestamp is written to new rows when the streaming table refresh is executed. Note that rows added in the same refresh execution may end up with different timestamps due to distributed processing. |
- | SOURCE_SYSTEM_NAME | stg_source_system | Inserts source system name using the source_system_name variable. Transformation type defined in CONFIG_LOAD_TRANSFORMATIONS. |
- | SOURCE_ENTITY_NAME | stg_source_entity | Inserts source entity name using the source_entity_name variable. Transformation type defined in CONFIG_LOAD_TRANSFORMATIONS. |
- | FILE_PATH_EXTERNAL | stg_file_name | Inserts source file path using the Databricks _metadata.file_path column. Transformation type defined in CONFIG_LOAD_TRANSFORMATIONS. |
mmsi | NONE | mmsi | Source data attributes mapped without transformation. |
msgtime | NONE | msgtime | … |
… | … | … | … |
An OVERRIDE load step is defined as follows:
CREATE OR REFRESH STREAMING TABLE <target_schema>.<target_entity_name>
AS
SELECT
<target_entity_attribute_list_with_transform_cast_and_names>
FROM STREAM read_files(
"abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined/",
format => 'csv',
delimiter => '|'
)
Variables are used to make the load template generic. In this case, only the external file location and file format settings need to be customized.
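As an illustration, the same template could stage a hypothetical JSON source simply by changing the file location and the read_files options (the path below is made up):
CREATE OR REFRESH STREAMING TABLE <target_schema>.<target_entity_name>
AS
SELECT
<target_entity_attribute_list_with_transform_cast_and_names>
FROM STREAM read_files(
"abfss://container@storageaccount.dfs.core.windows.net/another_source/another_entity/",
format => 'json'
)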
The following load options are defined to enable Run ID logic:
Option name | Option value |
---|---|
| | true |
| | -1 |
See this example to understand how Run ID logic works when the Notify API is not used.
Generated load:
CREATE OR REFRESH STREAMING TABLE staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1
AS
SELECT
<targetrunid> AS stg_run_id
, current_timestamp() AS stg_create_time
, 'BARENTSWATCH' AS stg_source_system
, 'LATEST_COMBINED_1' AS stg_source_entity
, _metadata.file_path AS stg_file_name
, CAST(mmsi AS BIGINT) AS mmsi
, CAST(msgtime AS TIMESTAMP) AS msgtime
...
FROM STREAM read_files(
"abfss://container@storageaccount.dfs.core.windows.net/barentswatch/latest_combined/",
format => 'csv',
delimiter => '|'
);
Orchestrating the load in a workflow refreshes the streaming table, and Run ID logic can then be used to load new rows into further entities.
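As a rough sketch of that downstream step (the target entity and attribute list below are illustrative; the actual SQL is generated by ADE), a load using Run ID logic filters the staging streaming table on Run IDs that have not yet been processed:
-- Illustrative only: ADE resolves the target entity, attribute list and unprocessed Run IDs.
INSERT INTO rdv_db.S_VESSEL_LATEST_POSITION
SELECT <target_entity_attribute_list>
FROM staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1
WHERE stg_run_id IN (<unprocessed_run_ids>);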
Note that since streaming tables are not yet natively supported as a physical entity type in ADE, their lifecycle must be managed in Databricks. For example, dropping a streaming table is done manually in Databricks, and major changes may also require dropping and recreating the streaming table.
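For example, assuming the table name used in this guide, the streaming table could be dropped with a plain DROP TABLE statement in Databricks before recreating it:
-- Drop the streaming table manually, e.g. before recreating it after a major change.
DROP TABLE IF EXISTS staging_db.STG_BARENTSWATCH_LATEST_COMBINED_1;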