
Sensitive Data Vault with FTL Templates

This documentation provides practical examples for implementing a sensitive data vault model, inspired by the ADE blog post on data modeling for data protection. It focuses on segregating sensitive and non-sensitive data within the data warehouse to comply with regulations like the General Data Protection Regulation (GDPR), particularly concerning individuals' rights to access and erase their personal data.

In data vault implementations, hashed business keys are typically used as entity identifiers. However, this approach poses a problem for sensitive business keys, such as social security numbers: a hash is deterministic, so anyone who knows or can enumerate candidate keys can recompute their hashes and match them back to the original values. To address this, the examples use random surrogate keys and key lookups when linking sensitive entities. This strategy allows sensitive data to be removed without compromising the integrity of the overall data model.
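The risk can be illustrated with a short Python sketch (illustrative only, with hypothetical values; the warehouse implementations below use the corresponding SQL functions):

```python
import hashlib
import uuid

def md5_hex(s: str) -> str:
    return hashlib.md5(s.encode()).hexdigest()

# A hashed business key is deterministic: the same SSN always yields the
# same hash, so a dictionary attack over candidate keys recovers it.
ssn = "123-45-6789"
hashed_key = md5_hex(ssn)

candidate_ssns = ["111-11-1111", "123-45-6789", "999-99-9999"]
recovered = [c for c in candidate_ssns if md5_hex(c) == hashed_key]
assert recovered == ["123-45-6789"]  # the "anonymized" key is exposed

# A random surrogate key (as generated by SENSITIVE_DV_GENERATED_ID below)
# carries no information about the business key, so the same attack fails.
surrogate_key = md5_hex(str(uuid.uuid4()))
assert all(md5_hex(c) != surrogate_key for c in candidate_ssns)
```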


Load transformations

The examples use the default transformation types defined in the GitHub repository agile-data-engine/ade-default-configuration. However, the approach is compatible with most default data vault transformations for hash keys, business keys, and so on.

Additionally, the following load transformation types are defined in CONFIG_LOAD_TRANSFORMATIONS:

SENSITIVE_DV_GENERATED_ID

This transformation type is used to generate a hashed, randomized dv_id for hubs that contain sensitive business keys.

Databricks SQL
SQL
MD5(UUID())
Google BigQuery
SQL
TO_HEX(MD5(GENERATE_UUID()))
Snowflake
SQL
MD5(UUID_STRING())

SENSITIVE_DV_ID_LOOKUP

This transformation type is used in conjunction with FTL templates to look up the randomized dv_id from a sensitive hub when loading satellites and links that reference a sensitive hub. This transformation can also be used with status satellites when the driving key is the sensitive business key. The sensitive hub should be included as the second entity mapping in the load configuration to enable this lookup.
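Conceptually, the lookup replaces the sensitive business key with the hub's random surrogate key at load time, so dependent tables never store the sensitive value. A minimal Python sketch of the idea (hypothetical data; in practice the lookup is the SQL join generated from the templates below):

```python
# Sensitive hub: business key -> randomly generated surrogate dv_id.
hub = {"123-45-6789": "f0e1d2c3b4a5968778695a4b3c2d1e0f"}

# Staged satellite rows still carry the sensitive business key.
staged = [{"ssn": "123-45-6789", "city": "Helsinki"}]

# The lookup (an inner join in the generated SQL) swaps the key for the
# surrogate, so the satellite never stores the SSN itself.
satellite = [
    {"dv_id": hub[row["ssn"]], "city": row["city"]}
    for row in staged if row["ssn"] in hub
]
assert satellite == [
    {"dv_id": "f0e1d2c3b4a5968778695a4b3c2d1e0f", "city": "Helsinki"}
]

# Right to erasure: deleting the hub row severs the link between the person
# and their satellite history, without touching the satellite itself.
del hub["123-45-6789"]
```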

Databricks SQL, Google BigQuery, Snowflake
NONE
<#-- Gets attribute name for DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
    <#stop "Error: SENSITIVE_DV_ID_LOOKUP transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>
src_lookup.${referenceTableKey.name}

SENSITIVE_DV_LINK_ID

Link dv_ids are typically composed from all the attributes used in the link's hub references. This transformation incorporates the looked-up randomized hub key into the link's dv_id when the link references a sensitive hub. It can also be used with link status satellites.
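Conceptually, the link key is an MD5 over the upper-cased, '~'-separated concatenation of the normalized attributes, with the looked-up surrogate key taking the place of the sensitive business key. A Python sketch of the normalization and hashing (illustrative, with hypothetical values; the templates below generate the platform-specific SQL):

```python
import hashlib

def normalize(value) -> str:
    # Mirrors the templates' string normalization: NULL or empty -> '-1',
    # otherwise the trimmed string value.
    if value is None:
        return "-1"
    s = str(value).strip()
    return s if s else "-1"

def link_dv_id(*attributes) -> str:
    # Concatenate normalized attributes with '~', upper-case, then MD5,
    # matching MD5(UPPER(a || '~' || b || ...)) in the generated SQL.
    composite = "~".join(normalize(a) for a in attributes)
    return hashlib.md5(composite.upper().encode()).hexdigest()

# The sensitive hub's random surrogate key replaces the sensitive
# business key in the composite (hypothetical values):
order_bk = "order-1001"
customer_surrogate = "a1b2c3d4"  # looked up from the sensitive hub
assert len(link_dv_id(order_bk, customer_surrogate)) == 32
assert link_dv_id(order_bk, None) == link_dv_id(order_bk, "-1")
```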

Databricks SQL
NONE
<#-- Handle optional prefixing -->
<#macro prefixHandler prefix><#if prefix?? && prefix?length gt 0>${prefix}.</#if></#macro>

<#-- Normalize and cast attributes based on datatype -->
<#macro castClause attribute prefix>
    <#if attribute.isAdditional?? && attribute.isAdditional>
        <#-- Prefix not added here -->
        ${attribute.attributeName}
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("bool")>
        COALESCE(CAST(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS INT) AS STRING), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("char")>
        COALESCE(NULLIF(TRIM(<@prefixHandler prefix/>${attribute.attributeName}), ''), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "date">
        COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "yyyy-MM-dd"), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "time">
        COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "kk:mm:ss.SSSSSS"), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp">
        COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "yyyy-MM-dd['T']kk:mm:ss.SSSSSS"), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp_tz">
        COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "yyyy-MM-dd['T']kk:mm:ss.SSSSSSxxx"), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("array")>
        COALESCE(ARRAY_JOIN(<@prefixHandler prefix/>${attribute.attributeName},','), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("struct")>
        COALESCE(TO_JSON(<@prefixHandler prefix/>${attribute.attributeName}), '-1')
    <#else>
        COALESCE(NULLIF(TRIM(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS STRING)), ''), '-1')
    </#if>
</#macro>

<#-- Create a composite string from all attributes -->
<#macro sourceAttributeList sourceAttributes prefix>
  <@compress single_line=true>
    <#if sourceAttributes?size == 0>
      null
    <#else>
      <#list sourceAttributes as attribute>
        <@castClause attribute prefix/><#if attribute_has_next> || '~' || </#if>
      </#list>
    </#if>
  </@compress>
</#macro>

<#-- Get DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
    <#stop "Error: SENSITIVE_DV_LINK_ID transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>

<#-- Extend source attributes list with lookup key -->
<#assign additionalAttr = {
  "attributeName": "src_lookup.${referenceTableKey.name}",
  "datatype": referenceTableKey.datatype,
  "isAdditional": true
}>
<#assign extendedSourceAttributes = sourceAttributes + [additionalAttr]>

<#-- Output final hash key -->
MD5(UPPER(<@sourceAttributeList sourceAttributes=extendedSourceAttributes prefix=prefix/>))
Google BigQuery
NONE
<#-- Handle optional prefixing -->
<#macro prefixHandler prefix><#if prefix?? && prefix?length gt 0>${prefix}.</#if></#macro>

<#-- Normalize and cast attributes based on datatype -->
<#macro castClause attribute prefix>
    <#if attribute.isAdditional?? && attribute.isAdditional>
        <#-- Prefix not added here -->
        ${attribute.attributeName}
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("bool")>
        COALESCE(CAST(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS INT64) AS STRING), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("char")>
        COALESCE(NULLIF(TRIM(<@prefixHandler prefix/>${attribute.attributeName}), ''), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("geography")>
        COALESCE(ST_ASTEXT(<@prefixHandler prefix/>${attribute.attributeName}), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("json")>
        COALESCE(NULLIF(TO_JSON_STRING(<@prefixHandler prefix/>${attribute.attributeName}), 'null'), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "date">
        COALESCE(FORMAT_DATE('%Y-%m-%d',<@prefixHandler prefix/>${attribute.attributeName}), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "time">
        COALESCE(FORMAT_TIME('%H:%M:%E6S',<@prefixHandler prefix/>${attribute.attributeName}), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp_tz">
        COALESCE(FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E6S%Ez', <@prefixHandler prefix/>${attribute.attributeName}), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp">
        COALESCE(FORMAT_DATETIME('%Y-%m-%dT%H:%M:%E6S', <@prefixHandler prefix/>${attribute.attributeName}), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "decimal">
        COALESCE(TRIM(FORMAT('%${attribute.dataPrecision}.${attribute.dataScale}f', <@prefixHandler prefix/>${attribute.attributeName})), '-1')       
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("array")>
        COALESCE(NULLIF(ARRAY_TO_STRING(<@prefixHandler prefix/>${attribute.attributeName},','),''), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("struct")>
        COALESCE(NULLIF(TO_JSON_STRING(TO_JSON(<@prefixHandler prefix/>${attribute.attributeName})), 'null'), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("variant")>
        COALESCE(NULLIF(TO_JSON_STRING(<@prefixHandler prefix/>${attribute.attributeName}), 'null'), '-1')
    <#else>
        COALESCE(NULLIF(TRIM(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS STRING)), ''), '-1')
    </#if>
</#macro>

<#-- Create a composite string from all attributes -->
<#macro sourceAttributeList sourceAttributes prefix>
  <@compress single_line=true>
    <#if sourceAttributes?size == 0>
      null
    <#else>
      CONCAT(<#list sourceAttributes as attribute><@castClause attribute prefix/><#if attribute_has_next>,'~',</#if></#list>)
    </#if>
  </@compress>
</#macro>

<#-- Get DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
    <#stop "Error: SENSITIVE_DV_LINK_ID transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>

<#-- Extend source attributes list with lookup key -->
<#assign additionalAttr = {
  "attributeName": "src_lookup.${referenceTableKey.name}",
  "datatype": referenceTableKey.datatype,
  "isAdditional": true
}>
<#assign extendedSourceAttributes = sourceAttributes + [additionalAttr]>

<#-- Output final hash key -->
TO_HEX(MD5(UPPER(<@sourceAttributeList sourceAttributes=extendedSourceAttributes prefix=prefix/>)))
Snowflake
NONE
<#-- Handle optional prefixing -->
<#macro prefixHandler prefix><#if prefix?? && prefix?length gt 0>${prefix}.</#if></#macro>

<#-- Normalize and cast attributes based on datatype -->
<#macro castClause attribute prefix>
    <#if attribute.isAdditional?? && attribute.isAdditional>
        <#-- Prefix not added here -->
        ${attribute.attributeName}
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("bool")>
        CASE WHEN <@prefixHandler prefix/>${attribute.attributeName} IS NULL THEN '-1' ELSE IFF(<@prefixHandler prefix/>${attribute.attributeName}, '1','0') END
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("char")>
        NVL(NULLIF(TRIM(<@prefixHandler prefix/>${attribute.attributeName}), ''), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "date">
        NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'YYYY-MM-DD'), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("geography")>
        CASE WHEN <@prefixHandler prefix/>${attribute.attributeName} IS NULL THEN '-1' ELSE st_astext(<@prefixHandler prefix/>${attribute.attributeName}) END
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "time">
        NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'HH24:MI:SS.FF6'), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp">
        NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'YYYY-MM-DDTHH24:MI:SS.FF6'), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp_tz">
        NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'YYYY-MM-DDTHH24:MI:SS.FF6TZH:TZM'), '-1')
    <#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("array")>
        NVL(ARRAY_TO_STRING(<@prefixHandler prefix/>${attribute.attributeName},','), '-1')
    <#else>
        NVL(NULLIF(TRIM(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS VARCHAR)), ''), '-1')
    </#if>
</#macro>

<#-- Create a composite string from all attributes -->
<#macro sourceAttributeList sourceAttributes prefix>
  <@compress single_line=true>
    <#if sourceAttributes?size == 0>
      null
    <#else>
      <#list sourceAttributes as attribute>
        <@castClause attribute prefix/><#if attribute_has_next> || '~' || </#if>
      </#list>
    </#if>
  </@compress>
</#macro>

<#-- Get DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
    <#stop "Error: SENSITIVE_DV_LINK_ID transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>

<#-- Extend source attributes list with lookup key -->
<#assign additionalAttr = {
  "attributeName": "src_lookup.${referenceTableKey.name}",
  "datatype": referenceTableKey.datatype,
  "isAdditional": true
}>
<#assign extendedSourceAttributes = sourceAttributes + [additionalAttr]>

<#-- Output final hash key -->
MD5(UPPER(<@sourceAttributeList sourceAttributes=extendedSourceAttributes prefix=prefix/>))

FTL templates

The following FTL templates are defined in CONFIG_LOAD_TEMPLATES:

sensitive_dv_hub

Databricks SQL, Google BigQuery, Snowflake
NONE
<#assign target = model.load.target>

<#-- Target dv_id -->
<#assign targetDvId = target.attributes?filter(attr -> attr.attributeType = "DV_HASHKEY")>

<#-- Target attributes without randomized dv_id -->
<#assign targetAttributesWithoutDvId = target.attributes?filter(attr -> attr.attributeType != "DV_HASHKEY")>

<#-- Stage entity #-->
<#assign stage = loads.getSourceEntity()>

<#-- Hub attributes #-->
<#assign targetBk = loads.getAttributeForType(target.attributes, "BUSINESS_KEY")>
<#assign targetLt = loads.getAttributeForType(target.attributes, "DV_LOAD_TIME")>

<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>

<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
  <#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
  <#assign whereClauses += [optWhere]>
</#if>

INSERT INTO ${target.schemaName}.${target.name} (
    <@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT
   <@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
    SELECT
        <@sql.generateTargetAttributesTransformList attributes=targetDvId indentation=2/>
        ,<@sql.generateEntityAttributeList attributes=targetAttributesWithoutDvId indentation=2/>
    FROM (
        SELECT DISTINCT
            <@sql.generateTargetAttributesTransformList attributes=targetAttributesWithoutDvId prefix="src_entity" indentation=3/>
        FROM ${stage.schemaName}.${stage.name} src_entity
        <#-- Use whereClauses if it has elements -->
        <#if whereClauses?? && whereClauses?has_content> 
        WHERE
            ${whereClauses?join(" AND ")}
        </#if>
    )
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            ${target.schemaName}.${target.name} trg
        WHERE
            trg.${targetBk.name} = src.${targetBk.name}
    )

sensitive_dv_satellite

Databricks SQL, Google BigQuery, Snowflake
NONE
<#assign target = model.load.target>

<#-- Target sat dv_id, dv_datahash and dv_load_time #-->
<#assign targetKey = loads.getAttributeForType(target.attributes, "DV_REFERENCING_HASHKEY")>
<#assign targetHash = loads.getAttributeForType(target.attributes,"DV_DATAHASH")>
<#assign targetLoadTime = loads.getAttributeForType(target.attributes, "DV_LOAD_TIME")>

<#-- Stage & hub entities #-->
<#assign stage = loads.getSourceEntity()>
<#assign hub = loads.getSourceEntity(2)>

<#-- Changes transformation for the sensitive key attribute to "BUSINESS_KEY" so that it can be used in the join #-->
<#assign changeToBusinessKey = { targetKey.name: "BUSINESS_KEY" }>
<#assign generatedBusinessKey = loads.getSingleAttributeTransformation(targetKey,"src_entity",changeToBusinessKey)>
<#assign lookupBusinessKey = loads.getAttributeForType(hub.attributes, "BUSINESS_KEY")>

<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>

<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
  <#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
  <#assign whereClauses += [optWhere]>
</#if>

INSERT INTO ${target.schemaName}.${target.name} (
    <@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
   <@sql.generateEntityAttributeList attributes=target.attributes />
FROM (
    SELECT
    <@sql.generateTargetAttributesTransformList attributes=target.attributes indentation=2 />
    FROM ${stage.schemaName}.${stage.name} src_entity
    JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
    <#-- Use whereClauses if it has elements -->
    <#if whereClauses?? && whereClauses?has_content>
    WHERE
        ${whereClauses?join(" AND ")}
    </#if>
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.${targetKey.name}
                ,t1.${targetHash.name}
            FROM (
                SELECT
                    s.${targetKey.name}
                    ,s.${targetHash.name}
                    , ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.${targetKey.name}
                            ORDER BY
                                s.${targetLoadTime.name} DESC
                        )
                        AS
                            dv_load_time_last
                FROM
                    ${target.schemaName}.${target.name} s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.${targetHash.name} = src.${targetHash.name}
            AND
            trg.${targetKey.name} = src.${targetKey.name}
    )

sensitive_dv_link

Databricks SQL, Google BigQuery, Snowflake
NONE
<#assign target = model.load.target>

<#-- Sensitive hub referencing attribute #-->
<#assign foundAttributes = []>
<#list target.attributes as attribute>
    <#if attribute.transformationFormulaType?? && attribute.transformationFormulaType = "SENSITIVE_DV_ID_LOOKUP">
        <#assign foundAttributes += [attribute]>
    </#if>
</#list>
<#assign hubReference = foundAttributes[0]>

<#-- Link dv_id attribute #-->
<#assign targetKey = loads.getAttributeForType(target.attributes,"DV_HASHKEY")>

<#-- Stage & hub entities #-->
<#assign stage = loads.getSourceEntity()>
<#assign hub = loads.getSourceEntity(2)>

<#-- Changes transformation for the sensitive key attribute to "BUSINESS_KEY" so that it can be used in the join #-->
<#assign changeToBusinessKey = { hubReference.name: "BUSINESS_KEY" }>
<#assign generatedBusinessKey = loads.getSingleAttributeTransformation(hubReference,"src_entity",changeToBusinessKey)>

<#-- Hub sensitive business key attribute #-->
<#assign lookupBusinessKey = loads.getAttributeForType(hub.attributes, "BUSINESS_KEY")>

<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>

<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
  <#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
  <#assign whereClauses += [optWhere]>
</#if>

INSERT INTO ${target.schemaName}.${target.name} (
    <@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
   <@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
    SELECT
    <@sql.generateTargetAttributesTransformList attributes=target.attributes prefix="src_entity" indentation=2/>
    FROM ${stage.schemaName}.${stage.name} src_entity
    JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
    <#-- Use whereClauses if it has elements -->
    <#if whereClauses?? && whereClauses?has_content>
    WHERE
        ${whereClauses?join(" AND ")}
    </#if>
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            ${target.schemaName}.${target.name} trg
        WHERE
            trg.${targetKey.name} = src.${targetKey.name}
    )

sensitive_dv_status_satellite

Databricks SQL, Google BigQuery, Snowflake
NONE
<#assign target = model.load.target>

<#-- Sensitive hub referencing attribute #-->
<#assign foundAttributes = []>
<#list target.attributes as attribute>
    <#if attribute.transformationFormulaType?? && attribute.transformationFormulaType = "SENSITIVE_DV_ID_LOOKUP">
        <#assign foundAttributes += [attribute]>
    </#if>
</#list>
<#assign hubReference = foundAttributes[0]>

<#-- Target attributes without load time, run id, status -->
<#assign excludeTypes = ["DV_LOAD_TIME", "STATUS", "RUN_ID"]>
<#assign targetAttributesExcluded_0 = target.attributes?filter(attr -> 
    !(excludeTypes?seq_contains(attr.attributeType))
)>

<#-- Target attributes without load time -->
<#assign excludeTypes = ["DV_LOAD_TIME"]>
<#assign targetAttributesExcluded_1 = target.attributes?filter(attr -> 
    !(excludeTypes?seq_contains(attr.attributeType))
)>

<#-- Target technical attributes #-->
<#assign targetKey = loads.getAttributeForType(target.attributes, "DV_REFERENCING_HASHKEY")>
<#assign targetDrivingKey = loads.getAttributeForType(target.attributes,"DV_DRIVING_KEY")>
<#assign targetLoadTime = loads.getAttributeForType(target.attributes, "DV_LOAD_TIME")>
<#assign targetStatus = loads.getAttributeForType(target.attributes, "STATUS")>
<#assign targetRunId = loads.getAttributeForType(target.attributes, "RUN_ID")>

<#-- Load time transformation #-->
<#assign loadTimeTransform = loads.getSingleAttributeTransformation(targetLoadTime)>

<#-- Stage & hub entities #-->
<#assign stage = loads.getSourceEntity()>
<#assign hub = loads.getSourceEntity(2)>

<#-- Changes transformation for the sensitive key attribute to "BUSINESS_KEY" so that it can be used in the join #-->
<#assign changeToBusinessKey = { hubReference.name: "REFERENCE_BUSINESS_KEY" }>
<#assign generatedBusinessKey = loads.getSingleAttributeTransformation(hubReference,"src_entity",changeToBusinessKey)>

<#-- Hub sensitive business key attribute #-->
<#assign lookupBusinessKey = loads.getAttributeForType(hub.attributes, "BUSINESS_KEY")>

<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>
<#assign optDeltaExtract = loads.getLoadOptionOrDefault("OPT_DELTA_EXTRACT", "")>

<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
  <#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
  <#assign whereClauses += [optWhere]>
</#if>

<#-- Delta or full extract, default delta #-->
<#if optDeltaExtract?? && optDeltaExtract?has_content && optDeltaExtract == "false">
    <#assign isDelta = false>
<#else>
    <#assign isDelta = true>
</#if>


<#-- Load step outputs per step_type #-->
<#if step_type?? && step_type?has_content>

<#if step_type == "gatekeeper">
SELECT
    COUNT(1) AS gatekeeper
FROM
    ${stage.schemaName}.${stage.name} src_entity  
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
    ${whereClauses?join(" AND ")}
</#if>
HAVING
    COUNT(1) > 0


<#elseif step_type == "drop_temp">
DROP TABLE IF EXISTS ${target.schemaName}.Z_TMP_${target.name}


<#elseif step_type == "temp_1">
<#-- Using temporary table when supported -->
<#assign dbms = ["SNOWFLAKE"]>
<#if dbms?seq_contains(model.dbmsProduct)>
    CREATE TEMPORARY TABLE
<#else>
    CREATE TABLE
</#if>
    ${target.schemaName}.Z_TMP_${target.name}
AS
SELECT DISTINCT
    <@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
    SELECT
    <@sql.generateTargetAttributesTransformList attributes=target.attributes prefix="src_entity" indentation=2/>
    FROM ${stage.schemaName}.${stage.name} src_entity
    JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
    <#-- Use whereClauses if it has elements -->
    <#if whereClauses?? && whereClauses?has_content>
    WHERE
        ${whereClauses?join(" AND ")}
    </#if>
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.${targetKey.name}
                ,t1.${targetStatus.name}
            FROM (
                SELECT
                    s.${targetKey.name}
                    ,s.${targetStatus.name}
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.${targetDrivingKey.name}
                            ORDER BY
                                s.${targetLoadTime.name} DESC
                        ) AS dv_load_time_last
                FROM
                    ${target.schemaName}.${target.name} s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.${targetKey.name} = src.${targetKey.name}
            AND trg.${targetStatus.name} = 1
    )


<#elseif step_type == "insert_0">
INSERT INTO ${target.schemaName}.${target.name} (
    <@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
    <@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
    SELECT
        <@sql.generateEntityAttributeList attributes=target.attributes prefix="trg"/>
    FROM (  
        SELECT
            <@sql.generateEntityAttributeList attributes=targetAttributesExcluded_0 indentation=3/>
            , 0 AS ${targetStatus.name}
            , <@sql.generateTargetAttributesTransformList attributes=[targetLoadTime, targetRunId] indentation=3/>
        FROM
            ${target.schemaName}.${target.name}_C
    ) trg
    <#if isDelta>
    JOIN
        ${target.schemaName}.Z_TMP_${target.name} src ON (
            trg.${targetStatus.name} = 1
            AND trg.${targetDrivingKey.name} = src.${targetDrivingKey.name}
        )
    <#else>
    LEFT JOIN (
        SELECT DISTINCT
            <@sql.generateTargetAttributesTransformList attributes=[targetKey] prefix="src_entity" indentation=3/>
        FROM ${stage.schemaName}.${stage.name} src_entity
        JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
        <#-- Use whereClauses if it has elements -->
        <#if whereClauses?? && whereClauses?has_content>
        WHERE
            ${whereClauses?join(" AND ")}
        </#if>
    ) src
    ON (
        trg.${targetKey.name} = src.${targetKey.name}
    )
    WHERE
        trg.${targetStatus.name} = 1
        AND src.${targetKey.name} IS NULL
    </#if>
)


<#elseif step_type == "insert_1">
INSERT INTO ${target.schemaName}.${target.name} (
    <@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
    <@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
    SELECT
        <@sql.generateEntityAttributeList attributes=targetAttributesExcluded_1 indentation=2/>
        , ${loadTimeTransform} AS ${targetLoadTime.name}
    FROM
        ${target.schemaName}.Z_TMP_${target.name}
)

<#else>
    <#stop "Error: Invalid step_type value '${step_type}'. Allowed values are: gatekeeper, drop_temp, temp_1, insert_0, insert_1.">
</#if>
<#else>
    <#stop "Error: Parameter 'step_type' is not set or is empty.">
</#if>

The following technical attribute types are referenced by the templates (depending on the entity type):

  • DV_HASHKEY

  • DV_REFERENCING_HASHKEY

  • BUSINESS_KEY

  • DV_LOAD_TIME

  • DV_DATAHASH

  • DV_DRIVING_KEY

  • STATUS

  • RUN_ID

In addition to the transformation types created above, the following transformation types are referenced by the templates:

  • BUSINESS_KEY

The following load options are supported:

  • OPT_USE_RUN_IDS

  • OPT_WHERE

  • OPT_DELTA_EXTRACT (sensitive_dv_status_satellite only)

Additionally, load options that do not affect the load logic are supported if applicable.

Entity types

The examples use the default entity types:

  • HUB

  • LINK

  • SAT

  • S_SAT

Optionally, custom entity types can be configured, for example, for sensitive hubs and satellites containing sensitive attributes. This allows for the use of custom naming conventions, assignment to a separate default schema, and application of other default configurations tailored to entities that handle personally identifiable information (PII).


Examples

Sensitive hub

In this example, we will create a sensitive hub for customers, which will store identifiable customer keys:

Entity type     HUB
Entity name     H_CUSTOMER
Schema          sdv

Note that the schema is set to sdv, which differs from the default rdv (raw data vault) schema. This separation helps isolate sensitive information and, for example, simplifies the management of access rights.

A load and an entity mapping from staging are created as follows:

STG_CUSTOMER         Transformation              H_CUSTOMER
-                    SENSITIVE_DV_GENERATED_ID   dv_id
-                    CURRENT_TS                  dv_load_time
-                    RUN_ID                      dv_run_id
stg_source_system    -                           dv_source_system
stg_source_entity    -                           dv_source_entity
-                    PACKAGE_VERSION             dv_package_version
-                    LOAD_NAME                   dv_load_name
customerid           BUSINESS_KEY                business_key

An FTL template reference is added as an OVERRIDE load step:

CODE
<ftltemplate:sensitive_dv_hub>

Load option:

OPT_USE_RUN_IDS: true

Generated load logic:

Databricks SQL
SQL
INSERT INTO sdv.H_CUSTOMER (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , business_key
)
SELECT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , business_key
FROM (
    SELECT
        MD5(UUID()) AS dv_id
        , dv_load_time
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , business_key

    FROM (
        SELECT DISTINCT
            CURRENT_TIMESTAMP() AS dv_load_time
            , <targetrunid> AS dv_run_id
            , src_entity.stg_source_system AS dv_source_system
            , src_entity.stg_source_entity AS dv_source_entity
            , <packageversion> AS dv_package_version
            , 'load_h_customer_from_stg_customer_01' AS dv_load_name
            , UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) AS business_key
        FROM staging.STG_CUSTOMER src_entity
        WHERE
            src_entity.stg_batch_id IN (<loadablerunids>)
    )
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            sdv.H_CUSTOMER trg
        WHERE
            trg.business_key = src.business_key
    );
Google BigQuery
SQL
INSERT INTO sdv.H_CUSTOMER (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , business_key
)
SELECT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , business_key
FROM (
    SELECT
        TO_HEX(MD5(GENERATE_UUID())) AS dv_id
        , dv_load_time
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , business_key

    FROM (
        SELECT DISTINCT
            CURRENT_TIMESTAMP() AS dv_load_time
            , <targetrunid> AS dv_run_id
            , src_entity.stg_source_system AS dv_source_system
            , src_entity.stg_source_entity AS dv_source_entity
            , <packageversion> AS dv_package_version
            , 'load_h_customer_from_stg_customer_01' AS dv_load_name
            , UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) AS business_key
        FROM staging.STG_CUSTOMER src_entity
        WHERE
            src_entity.stg_batch_id IN (<loadablerunids>)
    )
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            sdv.H_CUSTOMER trg
        WHERE
            trg.business_key = src.business_key
    );
Snowflake
INSERT INTO sdv.H_CUSTOMER (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , business_key
)
SELECT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , business_key
FROM (
    SELECT
        MD5(UUID_STRING()) AS dv_id
        , dv_load_time
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , business_key

    FROM (
        SELECT DISTINCT
            CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
            , <targetrunid> AS dv_run_id
            , src_entity.stg_source_system AS dv_source_system
            , src_entity.stg_source_entity AS dv_source_entity
            , <packageversion> AS dv_package_version
            , 'load_h_customer_from_stg_customer_01' AS dv_load_name
            , UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) AS business_key
        FROM staging.STG_CUSTOMER src_entity
        WHERE
            src_entity.stg_batch_id IN (<loadablerunids>)
    )
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            sdv.H_CUSTOMER trg
        WHERE
            trg.business_key = src.business_key
    );
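
Because the hub dv_id is generated from a random UUID rather than hashed from the business key, it cannot be recomputed from customerid; downstream loads resolve it by joining to the hub on the normalized business key, which is what the SENSITIVE_DV_ID_LOOKUP transformation does in the satellite and link loads below. A minimal ad hoc sketch of the same lookup (the key value '12345' is hypothetical):

```sql
-- Resolve the randomized surrogate key for a known business key.
-- The business key must be normalized exactly as in the hub load.
SELECT h.dv_id
FROM sdv.H_CUSTOMER h
WHERE h.business_key = UPPER(COALESCE(NULLIF(TRIM('12345'), ''), '-1'));
```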

Satellite for a sensitive hub

In this example, we will create a satellite for the sensitive customer hub to store sensitive customer attributes:

Entity type: SAT
Entity name: S_CUSTOMER
Schema: sdv

A good practice is to separate sensitive and non-sensitive attributes into different satellites, placing sensitive data in the sdv schema and non-sensitive data in the standard rdv schema. This separation supports clearer data governance and simplifies tasks such as planning the erasure of identifiable information. However, it is not always straightforward to determine which attributes are sensitive or whether combinations of attributes could lead to re-identification. These decisions should always be made carefully and in collaboration with subject matter experts.
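
For example, when the sensitive entities are isolated in the sdv schema, a right-to-erasure request can in principle be handled by deleting the satellite and hub rows of the affected person; the randomized dv_id values remaining in rdv links reveal nothing about the erased individual. A hedged sketch (the erasure procedure and the placeholder key are illustrative, not part of the generated loads):

```sql
-- Illustrative erasure of one person's sensitive data (placeholder key).
DELETE FROM sdv.S_CUSTOMER
WHERE dv_id IN (
    SELECT dv_id
    FROM sdv.H_CUSTOMER
    WHERE business_key = UPPER(TRIM('<customerid to erase>'))
);

DELETE FROM sdv.H_CUSTOMER
WHERE business_key = UPPER(TRIM('<customerid to erase>'));
```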

A load and two entity mappings are added:

Source entity | Position | Type
STG_CUSTOMER | 1 | SOURCE (Driving Run ID Logic)
H_CUSTOMER | 2 | SOURCE

Attribute mapping from staging:

STG_CUSTOMER | Transformation | S_CUSTOMER
customerid | SENSITIVE_DV_ID_LOOKUP | dv_id
- | CURRENT_TS | dv_load_time
- | RUN_ID | dv_run_id
stg_source_system | - | dv_source_system
stg_source_entity | - | dv_source_entity
- | PACKAGE_VERSION | dv_package_version
- | LOAD_NAME | dv_load_name
1. first_name, 2. last_name, 3. postal_address, 4. phone_number | HASH_DIFF | dv_datahash
first_name | - | first_name
last_name | - | last_name
postal_address | - | postal_address
phone_number | - | phone_number

An FTL template reference is added as an OVERRIDE load step:

<ftltemplate:sensitive_dv_satellite>

Generated load logic:

Databricks SQL
INSERT INTO sdv.S_CUSTOMER (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_datahash
    , first_name
    , last_name
    , postal_address
    , phone_number
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_datahash
    , first_name
    , last_name
    , postal_address
    , phone_number
FROM (
    SELECT
        src_lookup.dv_id AS dv_id
        , CURRENT_TIMESTAMP() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , stg_source_system AS dv_source_system
        , stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_s_customer_from_stg_customer_01' AS dv_load_name
        , MD5(COALESCE(first_name, '-1') || '~' || COALESCE(last_name, '-1') || '~' || COALESCE(postal_address, '-1') || '~' || COALESCE(phone_number, '-1')) AS dv_datahash
        , first_name AS first_name
        , last_name AS last_name
        , postal_address AS postal_address
        , phone_number AS phone_number
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.dv_datahash
            FROM (
                SELECT
                    s.dv_id
                    ,s.dv_datahash
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_id
                            ORDER BY
                                s.dv_load_time DESC
                        )
                        AS
                            dv_load_time_last
                FROM
                    sdv.S_CUSTOMER s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_datahash = src.dv_datahash
            AND
            trg.dv_id = src.dv_id
    );
Google BigQuery
INSERT INTO sdv.S_CUSTOMER (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_datahash
    , first_name
    , last_name
    , postal_address
    , phone_number
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_datahash
    , first_name
    , last_name
    , postal_address
    , phone_number
FROM (
    SELECT
        src_lookup.dv_id AS dv_id
        , CURRENT_TIMESTAMP() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , stg_source_system AS dv_source_system
        , stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_s_customer_from_stg_customer_01' AS dv_load_name
        , TO_HEX(MD5(CONCAT(COALESCE(first_name, '-1') ,'~',COALESCE(last_name, '-1') ,'~',COALESCE(postal_address, '-1') ,'~',COALESCE(phone_number, '-1')))) AS dv_datahash
        , first_name AS first_name
        , last_name AS last_name
        , postal_address AS postal_address
        , phone_number AS phone_number
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.dv_datahash
            FROM (
                SELECT
                    s.dv_id
                    ,s.dv_datahash
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_id
                            ORDER BY
                                s.dv_load_time DESC
                        )
                        AS
                            dv_load_time_last
                FROM
                    sdv.S_CUSTOMER s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_datahash = src.dv_datahash
            AND
            trg.dv_id = src.dv_id
    );
Snowflake
INSERT INTO sdv.S_CUSTOMER (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_datahash
    , first_name
    , last_name
    , postal_address
    , phone_number
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_datahash
    , first_name
    , last_name
    , postal_address
    , phone_number
FROM (
    SELECT
        src_lookup.dv_id AS dv_id
        , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
        , <targetrunid> AS dv_run_id
        , stg_source_system AS dv_source_system
        , stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_s_customer_from_stg_customer_01' AS dv_load_name
        , MD5(NVL(first_name, '-1') || '~' || NVL(last_name, '-1') || '~' || NVL(postal_address, '-1') || '~' || NVL(phone_number, '-1')) AS dv_datahash
        , first_name AS first_name
        , last_name AS last_name
        , postal_address AS postal_address
        , phone_number AS phone_number
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.dv_datahash
            FROM (
                SELECT
                    s.dv_id
                    ,s.dv_datahash  
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_id
                            ORDER BY
                                s.dv_load_time DESC
                        )
                        AS
                            dv_load_time_last
                FROM
                    sdv.S_CUSTOMER s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_datahash = src.dv_datahash
            AND
            trg.dv_id = src.dv_id
    );

Link referencing a sensitive hub

In this example, we will create a link that references a sensitive hub:

Entity type: LINK
Entity name: L_CUSTOMER_STORE_TERRITORY
Schema: rdv

Note that, unlike the sensitive hub and satellite, the link is assigned to the standard rdv schema because it does not contain any sensitive attributes.

A load and two entity mappings are added:

Source entity | Position | Type
STG_CUSTOMER | 1 | SOURCE (Driving Run ID Logic)
H_CUSTOMER | 2 | SOURCE

Attribute mapping from staging:

STG_CUSTOMER | Transformation | L_CUSTOMER_STORE_TERRITORY
1. storeid, 2. territoryid | SENSITIVE_DV_LINK_ID | dv_id
- | CURRENT_TS | dv_load_time
- | RUN_ID | dv_run_id
stg_source_system | - | dv_source_system
stg_source_entity | - | dv_source_entity
- | PACKAGE_VERSION | dv_package_version
- | LOAD_NAME | dv_load_name
customerid | SENSITIVE_KEY_LOOKUP | dv_id_customer
storeid | HASH_KEY | dv_id_store
territoryid | HASH_KEY | dv_id_territory

Note that the sensitive customerid must not be mapped to the link dv_id. Instead, the randomized sensitive hub dv_id is joined and appended to the link dv_id by the template and transformation.

An FTL template reference is added as an OVERRIDE load step:

<ftltemplate:sensitive_dv_link>

Generated load logic:

Databricks SQL
INSERT INTO rdv.L_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_id_customer
    , dv_id_store
    , dv_id_territory
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_id_customer
    , dv_id_store
    , dv_id_territory
FROM (
    SELECT
        MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        , CURRENT_TIMESTAMP() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_l_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_id_customer
        , MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1'))) AS dv_id_store
        , MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1'))) AS dv_id_territory
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            rdv.L_CUSTOMER_STORE_TERRITORY trg
        WHERE
            trg.dv_id = src.dv_id
    );
Google BigQuery
INSERT INTO rdv.L_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_id_customer
    , dv_id_store
    , dv_id_territory
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_id_customer
    , dv_id_store
    , dv_id_territory
FROM (
    SELECT
        TO_HEX(MD5(UPPER(CONCAT(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id)))) AS dv_id
        , CURRENT_TIMESTAMP() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_l_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_id_customer
        , TO_HEX(MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1')))) AS dv_id_store
        , TO_HEX(MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1')))) AS dv_id_territory
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            rdv.L_CUSTOMER_STORE_TERRITORY trg
        WHERE
            trg.dv_id = src.dv_id
    );
Snowflake
INSERT INTO rdv.L_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_id_customer
    , dv_id_store
    , dv_id_territory
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_id_customer
    , dv_id_store
    , dv_id_territory
FROM (
    SELECT
        MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_l_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_id_customer
        , MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1'))) AS dv_id_store
        , MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1'))) AS dv_id_territory
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM
            rdv.L_CUSTOMER_STORE_TERRITORY trg
        WHERE
            trg.dv_id = src.dv_id
    );

Status satellite for a link referencing a sensitive hub

In this example, we will create a status satellite for a link that references a sensitive hub:

Entity type: S_SAT
Entity name: STS_CUSTOMER_STORE_TERRITORY
Schema: bdv

A load and two entity mappings are added:

Source entity | Position | Type
STG_CUSTOMER | 1 | SOURCE (Driving Run ID Logic)
H_CUSTOMER | 2 | SOURCE

Attribute mapping from staging:

STG_CUSTOMER | Transformation | STS_CUSTOMER_STORE_TERRITORY
1. storeid, 2. territoryid | SENSITIVE_DV_LINK_ID | dv_id
- | CURRENT_TS | dv_load_time
- | RUN_ID | dv_run_id
stg_source_system | - | dv_source_system
stg_source_entity | - | dv_source_entity
- | PACKAGE_VERSION | dv_package_version
- | LOAD_NAME | dv_load_name
customerid | SENSITIVE_KEY_LOOKUP | dv_driving_key
- | DV_STATUS | status

In this example, the sensitive customerid is set as the driving key. See Status satellite for more details on choosing the correct driving key.

As in the link load, the sensitive customerid should not be mapped to the status satellite dv_id. Instead, the randomized sensitive hub dv_id is joined and appended to the status satellite dv_id by the template and transformation.

Status satellite load logic consists of multiple steps and varies by extract type (delta vs. full). When using the FTL template sensitive_dv_status_satellite in a load step, you must specify the step_type parameter, which controls each phase of the load process. Valid parameter values are:

  • gatekeeper

  • drop_temp

  • temp_1

  • insert_0

  • insert_1

See the examples below for full and delta extract scenarios using this template.

Delta extract

FTL template references are added for each step as OVERRIDE load steps:

# | Load step name | Logic
1 | drop_temp_start | <ftltemplate:sensitive_dv_status_satellite:step_type=drop_temp>
2 | create_temp_table_for_status_1_keys | <ftltemplate:sensitive_dv_status_satellite:step_type=temp_1>
3 | insert_status_0_keys | <ftltemplate:sensitive_dv_status_satellite:step_type=insert_0>
4 | insert_status_1_keys | <ftltemplate:sensitive_dv_status_satellite:step_type=insert_1>
5 | drop_temp_end | <ftltemplate:sensitive_dv_status_satellite:step_type=drop_temp>

Generated load logic:

Databricks SQL
/* 1. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

/* 2. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TABLE
    bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        , current_timestamp() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_driving_key
        , 1 AS status
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.status
            FROM (
                SELECT
                    s.dv_id
                    ,s.status
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_driving_key
                            ORDER BY
                                s.dv_load_time DESC
                        ) AS dv_load_time_last
                FROM
                    bdv.STS_CUSTOMER_STORE_TERRITORY s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_id = src.dv_id
            AND trg.status = 1
    );

/* 3. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
    trg.dv_id
    , trg.dv_load_time
    , trg.dv_run_id
    , trg.dv_source_system
    , trg.dv_source_entity
    , trg.dv_package_version
    , trg.dv_load_name
    , trg.dv_driving_key
    , trg.status
    FROM (  
        SELECT
            dv_id
            , dv_source_system
            , dv_source_entity
            , dv_package_version
            , dv_load_name
            , dv_driving_key
            , 0 AS status
            , current_timestamp() AS dv_load_time
            , <targetrunid> AS dv_run_id
        FROM
            bdv.STS_CUSTOMER_STORE_TERRITORY_C
    ) trg
    JOIN
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY src ON (
            trg.status = 1
            AND trg.dv_driving_key = src.dv_driving_key
        )
);

/* 4. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        dv_id
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , dv_driving_key
        , status
        , current_timestamp() AS dv_load_time
    FROM
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);

/* 5. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
Google BigQuery
/* 1. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

/* 2. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
    bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        TO_HEX(MD5(UPPER(CONCAT( COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id )))) AS dv_id
        , CURRENT_TIMESTAMP() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_driving_key
        , 1 AS status
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.status
            FROM (
                SELECT
                    s.dv_id
                    ,s.status
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_driving_key
                            ORDER BY
                                s.dv_load_time DESC
                        ) AS dv_load_time_last
                FROM
                    bdv.STS_CUSTOMER_STORE_TERRITORY s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_id = src.dv_id
            AND trg.status = 1
    );

/* 3. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
    trg.dv_id
    , trg.dv_load_time
    , trg.dv_run_id
    , trg.dv_source_system
    , trg.dv_source_entity
    , trg.dv_package_version
    , trg.dv_load_name
    , trg.dv_driving_key
    , trg.status
    FROM (  
        SELECT
            dv_id
            , dv_source_system
            , dv_source_entity
            , dv_package_version
            , dv_load_name
            , dv_driving_key
            , 0 AS status
            , CURRENT_TIMESTAMP() AS dv_load_time
            , <targetrunid> AS dv_run_id

        FROM
            bdv.STS_CUSTOMER_STORE_TERRITORY_C
    ) trg
    JOIN
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY src ON (
            trg.status = 1
            AND trg.dv_driving_key = src.dv_driving_key
        )
);

/* 4. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        dv_id
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , dv_driving_key
        , status
        , CURRENT_TIMESTAMP() AS dv_load_time
    FROM
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);

/* 5. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
Snowflake
/* 1. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

/* 2. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
    bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_driving_key
        , 1 AS status
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.status
            FROM (
                SELECT
                    s.dv_id
                    ,s.status
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_driving_key
                            ORDER BY
                                s.dv_load_time DESC
                        ) AS dv_load_time_last
                FROM
                    bdv.STS_CUSTOMER_STORE_TERRITORY s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_id = src.dv_id
            AND trg.status = 1
    );

/* 3. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
    trg.dv_id
    , trg.dv_load_time
    , trg.dv_run_id
    , trg.dv_source_system
    , trg.dv_source_entity
    , trg.dv_package_version
    , trg.dv_load_name
    , trg.dv_driving_key
    , trg.status
    FROM (  
        SELECT
            dv_id
            , dv_source_system
            , dv_source_entity
            , dv_package_version
            , dv_load_name
            , dv_driving_key
            , 0 AS status
            , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
            , <targetrunid> AS dv_run_id
        FROM
            bdv.STS_CUSTOMER_STORE_TERRITORY_C
    ) trg
    JOIN
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY src ON (
            trg.status = 1
            AND trg.dv_driving_key = src.dv_driving_key
        )
);

/* 4. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        dv_id
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , dv_driving_key
        , status
        , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
    FROM
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);

/* 5. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

Full extract

FTL template references are added for each step. Note that the first step is a GATEKEEPER step; the rest are OVERRIDE steps:

1. gatekeeper_staging_data_available (load step type = GATEKEEPER)
   <ftltemplate:sensitive_dv_status_satellite:step_type=gatekeeper>
2. drop_temp_start
   <ftltemplate:sensitive_dv_status_satellite:step_type=drop_temp>
3. create_temp_table_for_status_1_keys
   <ftltemplate:sensitive_dv_status_satellite:step_type=temp_1>
4. insert_status_0_keys
   <ftltemplate:sensitive_dv_status_satellite:step_type=insert_0>
5. insert_status_1_keys
   <ftltemplate:sensitive_dv_status_satellite:step_type=insert_1>
6. drop_temp_end
   <ftltemplate:sensitive_dv_status_satellite:step_type=drop_temp>
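Before reading the generated SQL, it can help to see the set logic these full-extract steps implement: the temp table collects candidate status 1 rows, step insert_status_0_keys closes active relationships that are missing from the staged batch, and step insert_status_1_keys activates new or re-appearing ones. The sketch below is illustrative only (plain Python with hypothetical names, not part of the generated load), assuming a full extract where absence from the batch means the relationship should be closed:

```python
def status_satellite_delta(current_latest, incoming_ids):
    """Compute the status satellite delta for a full extract.

    current_latest: dict mapping dv_driving_key -> (dv_id, status),
        i.e. the latest satellite row per driving key (what the
        ROW_NUMBER() ... PARTITION BY dv_driving_key subquery picks).
    incoming_ids: set of dv_id values computed from the staged batch.
    """
    # dv_ids whose latest row per driving key is active (status = 1)
    active = {dv_id for dv_id, status in current_latest.values() if status == 1}
    status_1 = incoming_ids - active   # new or re-activated relationships
    status_0 = active - incoming_ids   # active relationships absent from the extract
    return status_1, status_0
```

Both result sets are inserted, never updated, so the satellite keeps the full status history per driving key.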

Generated load logic:

Databricks SQL
SQL
/* 1. gatekeeper_staging_data_available (GATEKEEPER - SQL) */
SELECT
    COUNT(1) AS gatekeeper
FROM
    staging.STG_CUSTOMER src_entity  
WHERE
    src_entity.stg_batch_id IN (<loadablerunids>)
HAVING
    COUNT(1) > 0;

/* 2. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

/* 3. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TABLE
    bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        , current_timestamp() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_driving_key
        , 1 AS status
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.status
            FROM (
                SELECT
                    s.dv_id
                    ,s.status
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_driving_key
                            ORDER BY
                                s.dv_load_time DESC
                        ) AS dv_load_time_last
                FROM
                    bdv.STS_CUSTOMER_STORE_TERRITORY s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_id = src.dv_id
            AND trg.status = 1
    );

/* 4. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
    trg.dv_id
    , trg.dv_load_time
    , trg.dv_run_id
    , trg.dv_source_system
    , trg.dv_source_entity
    , trg.dv_package_version
    , trg.dv_load_name
    , trg.dv_driving_key
    , trg.status
    FROM (  
        SELECT
            dv_id
            , dv_source_system
            , dv_source_entity
            , dv_package_version
            , dv_load_name
            , dv_driving_key
            , 0 AS status
            , current_timestamp() AS dv_load_time
            , <targetrunid> AS dv_run_id
        FROM
            bdv.STS_CUSTOMER_STORE_TERRITORY_C
    ) trg
    LEFT JOIN (
        SELECT DISTINCT
            MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        FROM staging.STG_CUSTOMER src_entity
        JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
        WHERE
            src_entity.stg_batch_id IN (<loadablerunids>)
    ) src
    ON (
        trg.dv_id = src.dv_id
    )
    WHERE
        trg.status = 1
        AND src.dv_id IS NULL
);

/* 5. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        dv_id
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , dv_driving_key
        , status
        , current_timestamp() AS dv_load_time
    FROM
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);

/* 6. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
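Note how the dv_id expression combines the non-sensitive business keys (storeid, territoryid) with the randomized dv_id looked up from the sensitive hub, so the resulting hash contains no reversible trace of the sensitive customer key. A minimal Python sketch of the same composition, mirroring the Databricks expression (function and parameter names are illustrative):

```python
import hashlib

def normalize(value):
    """Mirrors COALESCE(NULLIF(TRIM(CAST(x AS STRING)), ''), '-1')."""
    s = str(value).strip() if value is not None else ''
    return s if s else '-1'

def sensitive_dv_id(storeid, territoryid, lookup_dv_id):
    """MD5 over the upper-cased, '~'-delimited key parts. The sensitive
    business key never enters the hash; only the random dv_id looked up
    from the sensitive hub does."""
    raw = '~'.join([normalize(storeid), normalize(territoryid), lookup_dv_id]).upper()
    return hashlib.md5(raw.encode('utf-8')).hexdigest()
```

Because lookup_dv_id is random rather than derived from the business key, deleting the hub row severs the link between the satellite rows and the person they described.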
Google BigQuery
SQL
/* 1. gatekeeper_staging_data_available (GATEKEEPER - SQL) */
SELECT
    COUNT(1) AS gatekeeper
FROM
    staging.STG_CUSTOMER src_entity  
WHERE
    src_entity.stg_batch_id IN (<loadablerunids>)
HAVING
    COUNT(1) > 0;

/* 2. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

/* 3. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
    bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        TO_HEX(MD5(UPPER(CONCAT( COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id )))) AS dv_id
        , CURRENT_TIMESTAMP() AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
        , src_lookup.dv_id AS dv_driving_key
        , 1 AS status
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.status
            FROM (
                SELECT
                    s.dv_id
                    ,s.status
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_driving_key
                            ORDER BY
                                s.dv_load_time DESC
                        ) AS dv_load_time_last
                FROM
                    bdv.STS_CUSTOMER_STORE_TERRITORY s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_id = src.dv_id
            AND trg.status = 1
    );

/* 4. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
    trg.dv_id
    , trg.dv_load_time
    , trg.dv_run_id
    , trg.dv_source_system
    , trg.dv_source_entity
    , trg.dv_package_version
    , trg.dv_load_name
    , trg.dv_driving_key
    , trg.status
    FROM (  
        SELECT
            dv_id
            , dv_source_system
            , dv_source_entity
            , dv_package_version
            , dv_load_name
            , dv_driving_key
            , 0 AS status
            , CURRENT_TIMESTAMP() AS dv_load_time
            , <targetrunid> AS dv_run_id
        FROM
            bdv.STS_CUSTOMER_STORE_TERRITORY_C
    ) trg
    LEFT JOIN (
        SELECT DISTINCT
            TO_HEX(MD5(UPPER(CONCAT(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id)))) AS dv_id
        FROM staging.STG_CUSTOMER src_entity
        JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
        WHERE
            src_entity.stg_batch_id IN (<loadablerunids>)
    ) src
    ON (
        trg.dv_id = src.dv_id
    )
    WHERE
        trg.status = 1
        AND src.dv_id IS NULL
);

/* 5. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        dv_id
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , dv_driving_key
        , status
        , CURRENT_TIMESTAMP() AS dv_load_time
    FROM
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);

/* 6. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
Snowflake
SQL
/* 1. gatekeeper_staging_data_available (GATEKEEPER - SQL) */
SELECT
    COUNT(1) AS gatekeeper
FROM
    staging.STG_CUSTOMER src_entity  
WHERE
    src_entity.stg_batch_id IN (<loadablerunids>)
HAVING
    COUNT(1) > 0;

/* 2. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;

/* 3. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
    bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
        , <targetrunid> AS dv_run_id
        , src_entity.stg_source_system AS dv_source_system
        , src_entity.stg_source_entity AS dv_source_entity
        , <packageversion> AS dv_package_version
        , 'load_sts_customer_store_territory_from_stg_customer' AS dv_load_name
        , src_lookup.dv_id AS dv_driving_key
        , 1 AS status
    FROM staging.STG_CUSTOMER src_entity
    JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
    WHERE
        src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
    NOT EXISTS (
        SELECT
            1
        FROM (
            SELECT
                t1.dv_id
                ,t1.status
            FROM (
                SELECT
                    s.dv_id
                    ,s.status
                    ,ROW_NUMBER()
                        OVER (
                            PARTITION BY
                                s.dv_driving_key
                            ORDER BY
                                s.dv_load_time DESC
                        ) AS dv_load_time_last
                FROM
                    bdv.STS_CUSTOMER_STORE_TERRITORY s
            ) t1
            WHERE
                t1.dv_load_time_last = 1
        ) trg
        WHERE
            trg.dv_id = src.dv_id
            AND trg.status = 1
    );

/* 4. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
    trg.dv_id
    , trg.dv_load_time
    , trg.dv_run_id
    , trg.dv_source_system
    , trg.dv_source_entity
    , trg.dv_package_version
    , trg.dv_load_name
    , trg.dv_driving_key
    , trg.status
    FROM (  
        SELECT
            dv_id
            , dv_source_system
            , dv_source_entity
            , dv_package_version
            , dv_load_name
            , dv_driving_key
            , 0 AS status
            , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
            , <targetrunid> AS dv_run_id
        FROM
            bdv.STS_CUSTOMER_STORE_TERRITORY_C
    ) trg
    LEFT JOIN (
        SELECT DISTINCT
            MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
        FROM staging.STG_CUSTOMER src_entity
        JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
        WHERE
            src_entity.stg_batch_id IN (<loadablerunids>)
    ) src
    ON (
        trg.dv_id = src.dv_id
    )
    WHERE
        trg.status = 1
        AND src.dv_id IS NULL
);

/* 5. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
)
SELECT DISTINCT
    dv_id
    , dv_load_time
    , dv_run_id
    , dv_source_system
    , dv_source_entity
    , dv_package_version
    , dv_load_name
    , dv_driving_key
    , status
FROM (
    SELECT
        dv_id
        , dv_run_id
        , dv_source_system
        , dv_source_entity
        , dv_package_version
        , dv_load_name
        , dv_driving_key
        , status
        , CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
    FROM
        bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);

/* 6. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
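Across all three platforms the pattern is identical: the sensitive business key (customerid) is used only to join against the sensitive hub, and it is the hub's randomized dv_id that is carried into the status satellite as the driving key. A rough Python sketch of that lookup step, with hypothetical names (not part of the generated load):

```python
def lookup_driving_keys(staging_rows, sensitive_hub):
    """sensitive_hub: dict mapping normalized business_key -> random dv_id,
    standing in for the sensitive hub (e.g. sdv.H_CUSTOMER). Mirrors the
    inner JOIN: staging rows whose key has no hub entry are dropped, and
    only the randomized surrogate key is carried forward."""
    joined = []
    for row in staging_rows:
        # UPPER(COALESCE(NULLIF(TRIM(...)), '-1')) on the sensitive key
        bk = str(row['customerid']).strip().upper() or '-1'
        if bk in sensitive_hub:
            joined.append({**row, 'dv_driving_key': sensitive_hub[bk]})
    return joined
```

Since the satellite only ever stores the random surrogate, erasing a customer reduces to deleting the hub row (and its key lookup); the satellite rows become unlinkable without any further rewriting.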
