Persistent staging
A persistent staging area (PSA) is a data warehouse zone that:
Stores source system data in its original format without transformation
Keeps a history of unique changes
Is by default insert-only, i.e. records are not deleted or updated
The persistent staging area can be used as a foundation for loading the publish layer, where data from different source systems is combined and the business logic is applied to support different end use cases. It takes care of data storage, including change history, so you can always go back to the original source system data when new use cases or modeling changes come up.
See the examples below for how the persistent staging approach can be applied with Agile Data Engine.
Notes
A data warehouse should have some form of raw data storage, e.g. a data lake, a persistent staging area or a raw Data Vault model, or a combination of these.
Storing source system data including its change history in its original format in a raw data storage layer allows more flexibility in the further data warehouse layers where data from multiple sources is combined and business logic is applied.
Examples
Configuring default settings for a persistent staging area
The default entity types STAGE or GENERIC could be used for persistent staging tables, but it is recommended to configure a new custom entity type with the desired default attributes, default transformations and load templates. The example presented here can be modified to fit your specific needs.
Transformations
Let’s first define the new transformations needed in our example:
Transformation | Logic | Description |
---|---|---|
HASH_KEY | Mapped key attributes trimmed, concatenated and hashed. | This transformation will be used for loading a hashed primary key for each persistent staging table. Loads will use this entity key, an insert timestamp and a comparison hash to track change history per key. |
ENTITY_KEY | Mapped key attributes trimmed and concatenated (without hashing). | ENTITY_KEY has the same transformation logic as HASH_KEY but without the hash function. This transformation will be used to load a human-readable entity key attribute which can be used later in other data warehouse layers and for troubleshooting purposes. |
COMPARISON_HASH | Mapped source attributes including key attributes concatenated and hashed. | The comparison hash will be used for comparing whether data has changed between loaded source data extracts. |
LOAD_NAME | References the loadname variable. | Provides the load name as additional metadata for auditing and troubleshooting purposes. |
PACKAGE_VERSION | References the packageversion variable. | Provides the package version as additional metadata for auditing and troubleshooting purposes. |
Examples of the transformations configured in CONFIG_LOAD_TRANSFORMATIONS for Snowflake and Synapse:
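The complete CONFIG_LOAD_TRANSFORMATIONS entries for Snowflake and Synapse are available in the GitHub repository referenced below. As an illustration of the logic only (not of the configuration syntax), the HASH_KEY, ENTITY_KEY and COMPARISON_HASH transformations render roughly as follows in Snowflake SQL when applied to the department example used later on this page:

SELECT
    -- HASH_KEY: key attribute(s) cast, uppercased, null-replaced and hashed
    MD5(NVL(UPPER(CAST(departmentid AS VARCHAR)), '-1')) AS dw_id
    -- ENTITY_KEY: the same logic without the hash function
    , NVL(UPPER(CAST(departmentid AS VARCHAR)), '-1') AS dw_entity_key
    -- COMPARISON_HASH: all mapped source attributes concatenated with a delimiter and hashed
    , MD5(
        NVL(CAST(departmentid AS VARCHAR), '-1') || '~'
        || NVL(CAST(name AS VARCHAR), '-1') || '~'
        || NVL(CAST(groupname AS VARCHAR), '-1') || '~'
        || NVL(CAST(modifieddate AS VARCHAR), '-1')
    ) AS dw_hash
FROM staging.STG_ADVENTUREWORKS_DEPARTMENT;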
Making changes to configuration packages
Note that the configurations should be appended to CONFIG_LOAD_TRANSFORMATIONS, i.e. they should not replace the existing configurations.
More examples available in GitHub
Our latest default configuration contains the PSA entity type. You can find it and other examples in this GitHub repository.
Entity type
Then, let’s create a new entity type that defines the required default attributes and references the transformations created in the previous step:
Default attribute | Transformation | Description |
---|---|---|
dw_id | HASH_KEY | Entity key, see transformation description above. |
dw_entity_key | ENTITY_KEY | Non-hashed entity key, see transformation description above. |
dw_hash | COMPARISON_HASH | Change tracking comparison attribute, see transformation description above. |
dw_run_id | RUN_ID | Used optionally with Run ID Logic. |
meta_insert_time | CURRENT_TS | Record insert timestamp |
meta_update_time | CURRENT_TS | Record update timestamp. For regular use cases this will be the same as meta_insert_time as persistent staging is insert only. However, in special cases where you do not want to preserve change history, you can use a different loading pattern and this timestamp indicates when the record was updated. |
meta_load_name | LOAD_NAME | Metadata attribute, see transformation description above. |
meta_package_version | PACKAGE_VERSION | Metadata attribute, see transformation description above. |
meta_source_system | - | Source system name |
meta_source_entity | - | Source entity name |
meta_file_name | - | Source file name |
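For reference, a persistent staging table created with this entity type for the department example later on this page would have roughly the following structure. This is a sketch only: the data types are assumptions for Snowflake and are in practice defined per attribute when designing the entity.

CREATE TABLE psa.PSA_DEPARTMENT (
    dw_id VARCHAR(32)               -- HASH_KEY, MD5 hash of the source primary key
    , dw_entity_key VARCHAR         -- non-hashed, human-readable entity key
    , dw_hash VARCHAR(32)           -- COMPARISON_HASH used for change detection
    , dw_run_id NUMBER              -- optional, used with Run ID Logic
    , meta_insert_time TIMESTAMP_NTZ
    , meta_update_time TIMESTAMP_NTZ
    , meta_load_name VARCHAR
    , meta_package_version NUMBER
    , meta_source_system VARCHAR
    , meta_source_entity VARCHAR
    , meta_file_name VARCHAR
    , departmentid INTEGER          -- source attributes mapped 1:1 from the staging table
    , name VARCHAR
    , groupname VARCHAR
    , modifieddate TIMESTAMP_NTZ
);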
Example entity type configuration in CONFIG_ENTITY_DEFAULTS:
Making changes to configuration packages
Note that the configurations should be appended to CONFIG_ENTITY_DEFAULTS, i.e. they should not replace the existing configurations. Also note that the example references some existing default attribute and transformation types.
More examples available in GitHub
Our latest default configuration contains the PSA entity type. You can find it and other examples in this GitHub repository.
Creating a persistent staging table
Finally, let’s create a persistent staging entity using the configurations created in the previous examples:
1. Select or create a package and create a new entity, choosing type PSA and physical type TABLE.
2. Navigate to the source entity (staging table) and create an outgoing load, selecting the PSA entity as the target entity.
3. Create an entity mapping as follows:
Source attribute | Transformation | Target attribute | Notes |
---|---|---|---|
Primary key attribute(s) in the source system entity. | HASH_KEY | dw_id | Used in the entity load; can also be used e.g. for smoke tests and later in other data warehouse layers. |
Primary key attribute(s) in the source system entity. | ENTITY_KEY | dw_entity_key | Non-hashed, human-readable entity key that can be used later in other data warehouse layers and for troubleshooting purposes. |
All mapped source entity attributes including the key attributes. | COMPARISON_HASH | dw_hash | Change tracking comparison attribute used in the entity load. Consider excluding e.g. metadata attributes that should not affect change capture. |
- | RUN_ID | dw_run_id | Used optionally with Run ID Logic. |
- | CURRENT_TS | meta_insert_time | Record insert timestamp |
- | CURRENT_TS | meta_update_time | Record update timestamp (see notes in entity type configuration above) |
- | LOAD_NAME | meta_load_name | Metadata attribute, see transformation description above. |
- | PACKAGE_VERSION | meta_package_version | Metadata attribute, see transformation description above. |
stg_source_system | - | meta_source_system | Source system name mapped from stage. |
stg_source_entity | - | meta_source_entity | Source entity name mapped from stage. |
stg_file_name | - | meta_file_name | Source file name mapped from stage. |
All source entity attributes. | NONE | Multiple, depending on source entity. | All source entity attributes mapped to target attributes. |
Staging metadata attributes
Note that staging tables in this example have metadata attributes stg_source_system, stg_source_entity and stg_file_name. For further traceability, you could add e.g. stg_file_hash to track whether source file content has changed.
It is a good practice to define the needed metadata attributes in staging tables so that they can be easily mapped to further data warehouse zones in loads. See more in Customizing default attributes and transformations.
4. Set desired load options:
Load option | Notes |
---|---|
OPT_HISTORIZED_BY_DATAHASH_AS_INSERT | Enables the insert-only load logic that compares incoming data against the latest version in the entity by entity key using the comparison hashes. Note that for this to work, the required technical attribute types must be set; these were configured in the defaults above. |
 | ALTERNATIVELY use the merge logic in special cases where you do not need to store historical changes. |
 | OPTIONALLY enable Run ID Logic. |
 | OPTIONALLY use with Run ID Logic; set value 1 to load one run id per transaction, ensuring that source data batches are inserted one at a time and in the correct order. This way there will be no duplicates over entity key and insert timestamp as long as there are no duplicates within one batch. |
 | OPTIONALLY enable Timeslot Logic. Note that you need to set technical attribute type TIMESLOT for a timestamp attribute in the target entity. When executing the load, Agile Data Engine will get the minimum and maximum values of this attribute in the incoming data and use them to filter down the existence check when inserting into the target entity (see the sketch after this table). |
OPT_SCAN_ONLY_THE_LATEST_BY_KEY | OPTIONALLY scan the latest version of the data from the source, ensuring that no duplicates will be inserted into the target entity. |
 | OPTIONALLY use with OPT_SCAN_ONLY_THE_LATEST_BY_KEY to define over which key the latest version is scanned. |
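As a rough illustration of Timeslot Logic (a sketch only, not the exact SQL generated by Agile Data Engine): if modifieddate in the example below were given the technical attribute type TIMESLOT, the scan of the target entity inside the existence check would be narrowed down to the time range present in the incoming data, along these lines:

SELECT
    s.dw_hash
    , ROW_NUMBER() OVER (
        PARTITION BY s.dw_id
        ORDER BY s.meta_insert_time DESC
    ) AS dv_load_time_last
FROM
    psa.PSA_DEPARTMENT s
WHERE
    -- the minimum and maximum are determined from the incoming data before the insert
    s.modifieddate BETWEEN <min modifieddate in incoming data> AND <max modifieddate in incoming data>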
5. Use the Export SQL feature to preview entity SQL & load SQL.
Example load SQL using OPT_HISTORIZED_BY_DATAHASH_AS_INSERT:
INSERT INTO psa.PSA_DEPARTMENT (
    dw_id
    , dw_entity_key
    , dw_hash
    , dw_run_id
    , meta_insert_time
    , meta_update_time
    , meta_load_name
    , meta_package_version
    , meta_source_system
    , meta_source_entity
    , meta_file_name
    , departmentid
    , name
    , groupname
    , modifieddate
)
SELECT DISTINCT
    dw_id
    , dw_entity_key
    , dw_hash
    , dw_run_id
    , meta_insert_time
    , meta_update_time
    , meta_load_name
    , meta_package_version
    , meta_source_system
    , meta_source_entity
    , meta_file_name
    , departmentid
    , name
    , groupname
    , modifieddate
FROM (
    SELECT
        MD5(NVL(UPPER(CAST(departmentid AS VARCHAR)), '-1')) AS dw_id
        , NVL(UPPER(CAST(departmentid AS VARCHAR)), '-1') AS dw_entity_key
        , MD5(NVL(CAST(departmentid AS VARCHAR), '-1') || '~' || NVL(CAST(name AS VARCHAR), '-1') || '~' || NVL(CAST(groupname AS VARCHAR), '-1') || '~' || NVL(CAST(modifieddate AS VARCHAR), '-1')) AS dw_hash
        , <targetrunid> AS dw_run_id
        , CURRENT_TIMESTAMP::timestamp_ntz AS meta_insert_time
        , CURRENT_TIMESTAMP::timestamp_ntz AS meta_update_time
        , 'load_psa_department_from_stg_adventureworks_department' AS meta_load_name
        , 1001274 AS meta_package_version
        , stg_source_system AS meta_source_system
        , stg_source_entity AS meta_source_entity
        , stg_file_name AS meta_file_name
        , departmentid AS departmentid
        , name AS name
        , groupname AS groupname
        , modifieddate AS modifieddate
    FROM
        staging.STG_ADVENTUREWORKS_DEPARTMENT src_entity
) src
WHERE NOT EXISTS (
    SELECT 1
    FROM (
        SELECT t1.dw_hash
        FROM (
            SELECT
                s.dw_hash
                , ROW_NUMBER() OVER (
                    PARTITION BY s.dw_id
                    ORDER BY s.meta_insert_time DESC
                ) AS dv_load_time_last
            FROM psa.PSA_DEPARTMENT s
        ) t1
        WHERE t1.dv_load_time_last = 1
    ) trg
    WHERE trg.dw_hash = src.dw_hash
)
;