Sensitive Data Vault with FTL Templates
This documentation provides practical examples for implementing a sensitive data vault model, inspired by the ADE blog post on data modeling for data protection. It focuses on segregating sensitive and non-sensitive data within the data warehouse to comply with regulations like the General Data Protection Regulation (GDPR), particularly concerning individuals' rights to access and erase their personal data.
In data vault implementations, hashed business keys are usually used as entity identifiers. This poses a problem for sensitive business keys such as social security numbers: the hash is deterministic, so anyone who knows or can guess a business key can recompute its hash and match it to the stored identifier. To address this, the examples use random surrogate keys for sensitive hubs and key lookups when linking to them. Sensitive data can then be removed without compromising the integrity of the rest of the data model.
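The difference between a hashed business key and a random surrogate key can be illustrated with a minimal Python sketch. The identifier value and the function names are hypothetical and only serve the illustration; the actual key expressions are the SQL transformations defined later in this document.

```python
import hashlib
import uuid


def hashed_key(business_key: str) -> str:
    """Deterministic key: MD5 of the normalized business key."""
    normalized = business_key.strip().upper()
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()


def random_surrogate_key() -> str:
    """Random key: MD5 of a fresh UUID, unrelated to any business key."""
    return hashlib.md5(str(uuid.uuid4()).encode("utf-8")).hexdigest()


# Hypothetical identifier, used only for illustration.
ssn = "010190-123A"

# Anyone who knows or can guess the identifier can recompute the hashed key
# and match it against the identifier stored in the warehouse.
stored_hashed = hashed_key(ssn)
hashed_key_is_linkable = stored_hashed == hashed_key(ssn)

# A random surrogate key cannot be recomputed from the identifier; the only
# link between the two is the hub row that stores them side by side.
stored_random = random_surrogate_key()
random_key_is_linkable = stored_random == hashed_key(ssn)
```

Once the hub row holding both the business key and the random key is deleted, the random key that remains elsewhere in the model no longer points back to the identifier.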
See also:
CONFIG_LOAD_TEMPLATES - Configuring load templates including FTL templates
CONFIG_LOAD_TRANSFORMATIONS - Configuring load transformations
Custom FTL TEMPLATES in Entity Load Steps - FTL template reference
Load transformations
The examples use the default transformation types defined in the GitHub repository agile-data-engine/ade-default-configuration. However, the approach is compatible with most default data vault transformations for hash keys, business keys, and so on.
Additionally, the following load transformation types are defined in CONFIG_LOAD_TRANSFORMATIONS:
SENSITIVE_DV_GENERATED_ID
This transformation type is used to generate a hashed, randomized dv_id for hubs that contain sensitive business keys.
Databricks SQL:
MD5(UUID())
Google BigQuery:
TO_HEX(MD5(GENERATE_UUID()))
Snowflake:
MD5(UUID_STRING())
SENSITIVE_DV_ID_LOOKUP
This transformation type is used in conjunction with FTL templates to look up the randomized dv_id from a sensitive hub when loading satellites and links that reference a sensitive hub. This transformation can also be used with status satellites when the driving key is the sensitive business key. The sensitive hub should be included as the second entity mapping in the load configuration to enable this lookup.
<#-- Gets attribute name for DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
<#stop "Error: SENSITIVE_DV_ID_LOOKUP transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>
src_lookup.${referenceTableKey.name}
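The effect of the lookup can be sketched with Python's built-in sqlite3 module. This is an illustration under assumptions, not the generated load: the table contents are invented, SQLite stands in for the target database, and schema prefixes are left out. The join condition mirrors the normalized business key comparison used in the templates further below.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE h_customer (dv_id TEXT PRIMARY KEY, business_key TEXT UNIQUE);
    CREATE TABLE stg_customer (customerid TEXT, first_name TEXT);

    INSERT INTO h_customer VALUES ('a1b2', 'C001'), ('c3d4', 'C002');
    INSERT INTO stg_customer VALUES (' c001 ', 'Ann'), ('C002', 'Bob'), ('C003', 'Cy');
    """
)

# The staged key is normalized the same way as the hub business key and then
# joined to the hub (alias src_lookup) to fetch the randomized dv_id.
rows = conn.execute(
    """
    SELECT src_lookup.dv_id, src_entity.first_name
    FROM stg_customer src_entity
    JOIN h_customer src_lookup
      ON UPPER(COALESCE(NULLIF(TRIM(src_entity.customerid), ''), '-1'))
         = src_lookup.business_key
    ORDER BY src_lookup.dv_id
    """
).fetchall()
```

Because the lookup is an inner join, the staged row for C003 is not returned: a key that is missing from the hub has no dv_id to look up. In this sketch that implies the hub has to be loaded before the entities that reference it.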
SENSITIVE_DV_LINK_ID
Link dv_ids are typically composed from all the attributes used in the link's hub references. This transformation is used to incorporate the looked-up randomized hub key into the link's dv_id when the link is referencing a sensitive hub. This transformation can also be used with link status satellites.
Databricks SQL:
<#-- Handle optional prefixing -->
<#macro prefixHandler prefix><#if prefix?? && prefix?length > 0>${prefix}.</#if></#macro>
<#-- Normalize and cast attributes based on datatype -->
<#macro castClause attribute prefix>
<#if attribute.isAdditional?? && attribute.isAdditional>
<#-- Prefix not added here -->
${attribute.attributeName}
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("bool")>
COALESCE(CAST(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS INT) AS STRING), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("char")>
COALESCE(NULLIF(TRIM(<@prefixHandler prefix/>${attribute.attributeName}), ''), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "date">
COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "yyyy-MM-dd"), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "time">
COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "kk:mm:ss.SSSSSS"), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp">
COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "yyyy-MM-dd['T']kk:mm:ss.SSSSSS"), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp_tz">
COALESCE(DATE_FORMAT(<@prefixHandler prefix/>${attribute.attributeName}, "yyyy-MM-dd['T']kk:mm:ss.SSSSSSxxx"), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("array")>
COALESCE(ARRAY_JOIN(<@prefixHandler prefix/>${attribute.attributeName},','), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("struct")>
COALESCE(TO_JSON(<@prefixHandler prefix/>${attribute.attributeName}), '-1')
<#else>
COALESCE(NULLIF(TRIM(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS STRING)), ''), '-1')
</#if>
</#macro>
<#-- Create a composite string from all attributes -->
<#macro sourceAttributeList sourceAttributes prefix>
<@compress single_line=true>
<#if sourceAttributes?size == 0>
null
<#else>
<#list sourceAttributes as attribute>
<@castClause attribute prefix/><#if attribute_has_next> || '~' || </#if>
</#list>
</#if>
</@compress>
</#macro>
<#-- Get DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
<#stop "Error: SENSITIVE_DV_LINK_ID transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>
<#-- Extend source attributes list with lookup key -->
<#assign additionalAttr = {
"attributeName": "src_lookup.${referenceTableKey.name}",
"datatype": referenceTableKey.datatype,
"isAdditional": true
}>
<#assign extendedSourceAttributes = sourceAttributes + [additionalAttr]>
<#-- Output final hash key -->
MD5(UPPER(<@sourceAttributeList sourceAttributes=extendedSourceAttributes prefix=prefix/>))
Google BigQuery:
<#-- Handle optional prefixing -->
<#macro prefixHandler prefix><#if prefix?? && prefix?length > 0>${prefix}.</#if></#macro>
<#-- Normalize and cast attributes based on datatype -->
<#macro castClause attribute prefix>
<#if attribute.isAdditional?? && attribute.isAdditional>
<#-- Prefix not added here -->
${attribute.attributeName}
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("bool")>
COALESCE(CAST(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS INT64) AS STRING), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("char")>
COALESCE(NULLIF(TRIM(<@prefixHandler prefix/>${attribute.attributeName}), ''), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("geography")>
COALESCE(ST_ASTEXT(<@prefixHandler prefix/>${attribute.attributeName}), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("json")>
COALESCE(NULLIF(TO_JSON_STRING(<@prefixHandler prefix/>${attribute.attributeName}), 'null'), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "date">
COALESCE(FORMAT_DATE('%Y-%m-%d',<@prefixHandler prefix/>${attribute.attributeName}), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "time">
COALESCE(FORMAT_TIME('%H:%M:%E6S',<@prefixHandler prefix/>${attribute.attributeName}), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp_tz">
COALESCE(FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E6S%Ez', <@prefixHandler prefix/>${attribute.attributeName}), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp">
COALESCE(FORMAT_DATETIME('%Y-%m-%dT%H:%M:%E6S', <@prefixHandler prefix/>${attribute.attributeName}), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "decimal">
COALESCE(TRIM(FORMAT('%${attribute.dataPrecision}.${attribute.dataScale}f', <@prefixHandler prefix/>${attribute.attributeName})), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("array")>
COALESCE(NULLIF(ARRAY_TO_STRING(<@prefixHandler prefix/>${attribute.attributeName},','),''), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("struct")>
COALESCE(NULLIF(TO_JSON_STRING(TO_JSON(<@prefixHandler prefix/>${attribute.attributeName})), 'null'), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("variant")>
COALESCE(NULLIF(TO_JSON_STRING(<@prefixHandler prefix/>${attribute.attributeName}), 'null'), '-1')
<#else>
COALESCE(NULLIF(TRIM(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS STRING)), ''), '-1')
</#if>
</#macro>
<#-- Create a composite string from all attributes -->
<#macro sourceAttributeList sourceAttributes prefix>
<@compress single_line=true>
<#if sourceAttributes?size == 0>
null
<#else>
CONCAT(<#list sourceAttributes as attribute><@castClause attribute prefix/><#if attribute_has_next>,'~',</#if></#list>)
</#if>
</@compress>
</#macro>
<#-- Get DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
<#stop "Error: SENSITIVE_DV_LINK_ID transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>
<#-- Extend source attributes list with lookup key -->
<#assign additionalAttr = {
"attributeName": "src_lookup.${referenceTableKey.name}",
"datatype": referenceTableKey.datatype,
"isAdditional": true
}>
<#assign extendedSourceAttributes = sourceAttributes + [additionalAttr]>
<#-- Output final hash key -->
TO_HEX(MD5(UPPER(<@sourceAttributeList sourceAttributes=extendedSourceAttributes prefix=prefix/>)))
Snowflake:
<#-- Handle optional prefixing -->
<#macro prefixHandler prefix><#if prefix?? && prefix?length > 0>${prefix}.</#if></#macro>
<#-- Normalize and cast attributes based on datatype -->
<#macro castClause attribute prefix>
<#if attribute.isAdditional?? && attribute.isAdditional>
<#-- Prefix not added here -->
${attribute.attributeName}
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("bool")>
CASE WHEN <@prefixHandler prefix/>${attribute.attributeName} IS NULL THEN '-1' ELSE IFF(<@prefixHandler prefix/>${attribute.attributeName}, '1','0') END
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("char")>
NVL(NULLIF(TRIM(<@prefixHandler prefix/>${attribute.attributeName}), ''), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "date">
NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'YYYY-MM-DD'), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("geography")>
CASE WHEN <@prefixHandler prefix/>${attribute.attributeName} IS NULL THEN '-1' ELSE st_astext(<@prefixHandler prefix/>${attribute.attributeName}) END
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "time">
NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'HH24:MI:SS.FF6'), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp">
NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'YYYY-MM-DDTHH24:MI:SS.FF6'), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case == "timestamp_tz">
NVL(TO_VARCHAR(<@prefixHandler prefix/>${attribute.attributeName}, 'YYYY-MM-DDTHH24:MI:SS.FF6TZH:TZM'), '-1')
<#elseif attribute.datatype?? && attribute.datatype?lower_case?contains("array")>
NVL(ARRAY_TO_STRING(<@prefixHandler prefix/>${attribute.attributeName},','), '-1')
<#else>
NVL(NULLIF(TRIM(CAST(<@prefixHandler prefix/>${attribute.attributeName} AS VARCHAR)), ''), '-1')
</#if>
</#macro>
<#-- Create a composite string from all attributes -->
<#macro sourceAttributeList sourceAttributes prefix>
<@compress single_line=true>
<#if sourceAttributes?size == 0>
null
<#else>
<#list sourceAttributes as attribute>
<@castClause attribute prefix/><#if attribute_has_next> || '~' || </#if>
</#list>
</#if>
</@compress>
</#macro>
<#-- Get DV_HASHKEY from the second entity mapping -->
<#if model.load.sources?size != 2>
<#stop "Error: SENSITIVE_DV_LINK_ID transformation requires two entity mappings.">
</#if>
<#assign referenceTable = loads.getSourceEntity(2)>
<#assign referenceTableKey = loads.getAttributeForType(referenceTable.attributes, "DV_HASHKEY")>
<#-- Extend source attributes list with lookup key -->
<#assign additionalAttr = {
"attributeName": "src_lookup.${referenceTableKey.name}",
"datatype": referenceTableKey.datatype,
"isAdditional": true
}>
<#assign extendedSourceAttributes = sourceAttributes + [additionalAttr]>
<#-- Output final hash key -->
MD5(UPPER(<@sourceAttributeList sourceAttributes=extendedSourceAttributes prefix=prefix/>))
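The composite key logic of the SENSITIVE_DV_LINK_ID templates above can be sketched in Python for string attributes. The function names and sample values are hypothetical, and only the character-type branch of the cast logic is modeled: each source value is trimmed and defaulted to '-1', the looked-up hub key is appended unchanged, and the parts are joined with '~', uppercased and hashed.

```python
import hashlib
from typing import Optional, Sequence


def normalize(value: Optional[str]) -> str:
    """Trim a source value; NULL and empty strings become '-1'."""
    if value is None:
        return "-1"
    trimmed = value.strip(" ")
    return trimmed if trimmed else "-1"


def sensitive_link_id(source_values: Sequence[Optional[str]], hub_dv_id: str) -> str:
    """Build a link dv_id from source values plus the looked-up hub key."""
    # The looked-up key is appended as-is, like the isAdditional branch
    # in the templates, which skips prefixing and casting.
    parts = [normalize(value) for value in source_values] + [hub_dv_id]
    composite = "~".join(parts)
    return hashlib.md5(composite.upper().encode("utf-8")).hexdigest()


# Hypothetical link between an order (non-sensitive key) and a customer
# whose randomized hub key was looked up as 'a1b2'.
link_id = sensitive_link_id([" ord-1 ", None], "a1b2")
```

Since the sensitive business key itself never enters the hash, the link dv_id cannot be recomputed from that key alone; it depends on the random hub key.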
FTL templates
The following FTL templates are defined in CONFIG_LOAD_TEMPLATES:
sensitive_dv_hub
<#assign target = model.load.target>
<#-- Target dv_id -->
<#assign targetDvId = target.attributes?filter(attr -> attr.attributeType = "DV_HASHKEY")>
<#-- Target attributes without randomized dv_id -->
<#assign targetAttributesWithoutDvId = target.attributes?filter(attr -> attr.attributeType != "DV_HASHKEY")>
<#-- Stage entity #-->
<#assign stage = loads.getSourceEntity()>
<#-- Hub attributes #-->
<#assign targetBk = loads.getAttributeForType(target.attributes, "BUSINESS_KEY")>
<#assign targetLt = loads.getAttributeForType(target.attributes, "DV_LOAD_TIME")>
<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>
<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
<#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
<#assign whereClauses += [optWhere]>
</#if>
INSERT INTO ${target.schemaName}.${target.name} (
<@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT
<@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
SELECT
<@sql.generateTargetAttributesTransformList attributes=targetDvId indentation=2/>
,<@sql.generateEntityAttributeList attributes=targetAttributesWithoutDvId indentation=2/>
FROM (
SELECT DISTINCT
<@sql.generateTargetAttributesTransformList attributes=targetAttributesWithoutDvId prefix="src_entity" indentation=3/>
FROM ${stage.schemaName}.${stage.name} src_entity
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
${whereClauses?join(" AND ")}
</#if>
)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
${target.schemaName}.${target.name} trg
WHERE
trg.${targetBk.name} = src.${targetBk.name}
)
sensitive_dv_satellite
<#assign target = model.load.target>
<#-- Target sat dv_id, dv_datahash and dv_load_time #-->
<#assign targetKey = loads.getAttributeForType(target.attributes, "DV_REFERENCING_HASHKEY")>
<#assign targetHash = loads.getAttributeForType(target.attributes,"DV_DATAHASH")>
<#assign targetLoadTime = loads.getAttributeForType(target.attributes, "DV_LOAD_TIME")>
<#-- Stage & hub entities #-->
<#assign stage = loads.getSourceEntity()>
<#assign hub = loads.getSourceEntity(2)>
<#-- Changes transformation for the sensitive key attribute to "BUSINESS_KEY" so that it can be used in the join #-->
<#assign changeToBusinessKey = { targetKey.name: "BUSINESS_KEY" }>
<#assign generatedBusinessKey = loads.getSingleAttributeTransformation(targetKey,"src_entity",changeToBusinessKey)>
<#assign lookupBusinessKey = loads.getAttributeForType(hub.attributes, "BUSINESS_KEY")>
<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>
<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
<#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
<#assign whereClauses += [optWhere]>
</#if>
INSERT INTO ${target.schemaName}.${target.name} (
<@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
<@sql.generateEntityAttributeList attributes=target.attributes />
FROM (
SELECT
<@sql.generateTargetAttributesTransformList attributes=target.attributes indentation=2 />
FROM ${stage.schemaName}.${stage.name} src_entity
JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
${whereClauses?join(" AND ")}
</#if>
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.${targetKey.name}
,t1.${targetHash.name}
FROM (
SELECT
s.${targetKey.name}
,s.${targetHash.name}
, ROW_NUMBER()
OVER (
PARTITION BY
s.${targetKey.name}
ORDER BY
s.${targetLoadTime.name} DESC
)
AS
dv_load_time_last
FROM
${target.schemaName}.${target.name} s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.${targetHash.name} = src.${targetHash.name}
AND
trg.${targetKey.name} = src.${targetKey.name}
)
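The change detection in the satellite template above can be tried out with a small sqlite3 sketch. Table contents and hash values are invented, the incoming table stands in for the transformed staging rows, and SQLite 3.25 or later is assumed for window function support.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE s_customer (dv_id TEXT, dv_load_time INTEGER, dv_datahash TEXT);
    CREATE TABLE incoming (dv_id TEXT, dv_load_time INTEGER, dv_datahash TEXT);

    -- Existing history: customer a1b2 changed once (h1 -> h2).
    INSERT INTO s_customer VALUES ('a1b2', 1, 'h1'), ('a1b2', 2, 'h2');

    -- New batch: a1b2 is unchanged, c3d4 is seen for the first time.
    INSERT INTO incoming VALUES ('a1b2', 3, 'h2'), ('c3d4', 3, 'h9');
    """
)

# Insert a row only if the latest satellite row for the same key
# does not already carry the same data hash.
conn.execute(
    """
    INSERT INTO s_customer (dv_id, dv_load_time, dv_datahash)
    SELECT DISTINCT src.dv_id, src.dv_load_time, src.dv_datahash
    FROM incoming src
    WHERE NOT EXISTS (
        SELECT 1
        FROM (
            SELECT s.dv_id, s.dv_datahash,
                   ROW_NUMBER() OVER (
                       PARTITION BY s.dv_id ORDER BY s.dv_load_time DESC
                   ) AS dv_load_time_last
            FROM s_customer s
        ) trg
        WHERE trg.dv_load_time_last = 1
          AND trg.dv_datahash = src.dv_datahash
          AND trg.dv_id = src.dv_id
    )
    """
)

history = conn.execute(
    "SELECT dv_id, dv_datahash FROM s_customer ORDER BY dv_id, dv_load_time"
).fetchall()
```

Comparing against the latest row only, rather than against the full history, means a value that reverts to an earlier state is stored again as a new version.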
sensitive_dv_link
<#assign target = model.load.target>
<#-- Sensitive hub referencing attribute #-->
<#assign foundAttributes = []>
<#list target.attributes as attribute>
<#if attribute.transformationFormulaType?? && attribute.transformationFormulaType = "SENSITIVE_DV_ID_LOOKUP">
<#assign foundAttributes += [attribute]>
</#if>
</#list>
<#assign hubReference = foundAttributes[0]>
<#-- Link dv_id attribute #-->
<#assign targetKey = loads.getAttributeForType(target.attributes,"DV_HASHKEY")>
<#-- Stage & hub entities #-->
<#assign stage = loads.getSourceEntity()>
<#assign hub = loads.getSourceEntity(2)>
<#-- Changes transformation for the sensitive key attribute to "BUSINESS_KEY" so that it can be used in the join #-->
<#assign changeToBusinessKey = { hubReference.name: "BUSINESS_KEY" }>
<#assign generatedBusinessKey = loads.getSingleAttributeTransformation(hubReference,"src_entity",changeToBusinessKey)>
<#-- Hub sensitive business key attribute #-->
<#assign lookupBusinessKey = loads.getAttributeForType(hub.attributes, "BUSINESS_KEY")>
<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>
<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
<#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
<#assign whereClauses += [optWhere]>
</#if>
INSERT INTO ${target.schemaName}.${target.name} (
<@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
<@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
SELECT
<@sql.generateTargetAttributesTransformList attributes=target.attributes prefix="src_entity" indentation=2/>
FROM ${stage.schemaName}.${stage.name} src_entity
JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
${whereClauses?join(" AND ")}
</#if>
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
${target.schemaName}.${target.name} trg
WHERE
trg.${targetKey.name} = src.${targetKey.name}
)
sensitive_dv_status_satellite
<#assign target = model.load.target>
<#-- Sensitive hub referencing attribute #-->
<#assign foundAttributes = []>
<#list target.attributes as attribute>
<#if attribute.transformationFormulaType?? && attribute.transformationFormulaType = "SENSITIVE_DV_ID_LOOKUP">
<#assign foundAttributes += [attribute]>
</#if>
</#list>
<#assign hubReference = foundAttributes[0]>
<#-- Target attributes without load time, run id, status -->
<#assign excludeTypes = ["DV_LOAD_TIME", "STATUS", "RUN_ID"]>
<#assign targetAttributesExcluded_0 = target.attributes?filter(attr ->
!(excludeTypes?seq_contains(attr.attributeType))
)>
<#-- Target attributes without load time -->
<#assign excludeTypes = ["DV_LOAD_TIME"]>
<#assign targetAttributesExcluded_1 = target.attributes?filter(attr ->
!(excludeTypes?seq_contains(attr.attributeType))
)>
<#-- Target technical attributes #-->
<#assign targetKey = loads.getAttributeForType(target.attributes, "DV_REFERENCING_HASHKEY")>
<#assign targetDrivingKey = loads.getAttributeForType(target.attributes,"DV_DRIVING_KEY")>
<#assign targetLoadTime = loads.getAttributeForType(target.attributes, "DV_LOAD_TIME")>
<#assign targetStatus = loads.getAttributeForType(target.attributes, "STATUS")>
<#assign targetRunId = loads.getAttributeForType(target.attributes, "RUN_ID")>
<#-- Load time transformation #-->
<#assign loadTimeTransform = loads.getSingleAttributeTransformation(targetLoadTime)>
<#-- Stage & hub entities #-->
<#assign stage = loads.getSourceEntity()>
<#assign hub = loads.getSourceEntity(2)>
<#-- Changes transformation for the sensitive key attribute to "REFERENCE_BUSINESS_KEY" so that it can be used in the join #-->
<#assign changeToBusinessKey = { hubReference.name: "REFERENCE_BUSINESS_KEY" }>
<#assign generatedBusinessKey = loads.getSingleAttributeTransformation(hubReference,"src_entity",changeToBusinessKey)>
<#-- Hub sensitive business key attribute #-->
<#assign lookupBusinessKey = loads.getAttributeForType(hub.attributes, "BUSINESS_KEY")>
<#-- Load options #-->
<#assign optUseRunIds = loads.getLoadOptionOrDefault("OPT_USE_RUN_IDS", "")>
<#assign optWhere = loads.getLoadOptionOrDefault("OPT_WHERE", "")>
<#assign optDeltaExtract = loads.getLoadOptionOrDefault("OPT_DELTA_EXTRACT", "")>
<#-- WHERE clause depending on load options #-->
<#assign whereClauses = []>
<#if optUseRunIds?? && optUseRunIds?has_content && optUseRunIds == "true">
<#assign whereClauses += ["src_entity.<sourcerunidattr> IN (<loadablerunids>)"]>
</#if>
<#if optWhere?? && optWhere?has_content>
<#assign whereClauses += [optWhere]>
</#if>
<#-- Delta or full extract, default delta #-->
<#if optDeltaExtract?? && optDeltaExtract?has_content && optDeltaExtract == "false">
<#assign isDelta = false>
<#else>
<#assign isDelta = true>
</#if>
<#-- Load step outputs per step_type #-->
<#if step_type?? && step_type?has_content>
<#if step_type == "gatekeeper">
SELECT
COUNT(1) AS gatekeeper
FROM
${stage.schemaName}.${stage.name} src_entity
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
${whereClauses?join(" AND ")}
</#if>
HAVING
COUNT(1) > 0
<#elseif step_type == "drop_temp">
DROP TABLE IF EXISTS ${target.schemaName}.Z_TMP_${target.name}
<#elseif step_type == "temp_1">
<#-- Using temporary table when supported -->
<#assign dbms = ["SNOWFLAKE"]>
<#if dbms?seq_contains(model.dbmsProduct)>
CREATE TEMPORARY TABLE
<#else>
CREATE TABLE
</#if>
${target.schemaName}.Z_TMP_${target.name}
AS
SELECT DISTINCT
<@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
SELECT
<@sql.generateTargetAttributesTransformList attributes=target.attributes prefix="src_entity" indentation=2/>
FROM ${stage.schemaName}.${stage.name} src_entity
JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
${whereClauses?join(" AND ")}
</#if>
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.${targetKey.name}
,t1.${targetStatus.name}
FROM (
SELECT
s.${targetKey.name}
,s.${targetStatus.name}
,ROW_NUMBER()
OVER (
PARTITION BY
s.${targetDrivingKey.name}
ORDER BY
s.${targetLoadTime.name} DESC
) AS dv_load_time_last
FROM
${target.schemaName}.${target.name} s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.${targetKey.name} = src.${targetKey.name}
AND trg.${targetStatus.name} = 1
)
<#elseif step_type == "insert_0">
INSERT INTO ${target.schemaName}.${target.name} (
<@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
<@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
SELECT
<@sql.generateEntityAttributeList attributes=target.attributes prefix="trg"/>
FROM (
SELECT
<@sql.generateEntityAttributeList attributes=targetAttributesExcluded_0 indentation=3/>
, 0 AS ${targetStatus.name}
, <@sql.generateTargetAttributesTransformList attributes=[targetLoadTime, targetRunId] indentation=3/>
FROM
${target.schemaName}.${target.name}_C
) trg
<#if isDelta>
JOIN
${target.schemaName}.Z_TMP_${target.name} src ON (
trg.${targetStatus.name} = 1
AND trg.${targetDrivingKey.name} = src.${targetDrivingKey.name}
)
<#else>
LEFT JOIN (
SELECT DISTINCT
<@sql.generateTargetAttributesTransformList attributes=[targetKey] prefix="src_entity" indentation=3/>
FROM ${stage.schemaName}.${stage.name} src_entity
JOIN ${hub.schemaName}.${hub.name} src_lookup ON (${generatedBusinessKey} = src_lookup.${lookupBusinessKey.name})
<#-- Use whereClauses if it has elements -->
<#if whereClauses?? && whereClauses?has_content>
WHERE
${whereClauses?join(" AND ")}
</#if>
) src
ON (
trg.${targetKey.name} = src.${targetKey.name}
)
WHERE
trg.${targetStatus.name} = 1
AND src.${targetKey.name} IS NULL
</#if>
)
<#elseif step_type == "insert_1">
INSERT INTO ${target.schemaName}.${target.name} (
<@sql.generateEntityAttributeList attributes=target.attributes/>
)
SELECT DISTINCT
<@sql.generateEntityAttributeList attributes=target.attributes/>
FROM (
SELECT
<@sql.generateEntityAttributeList attributes=targetAttributesExcluded_1 indentation=2/>
, ${loadTimeTransform} AS ${targetLoadTime.name}
FROM
${target.schemaName}.Z_TMP_${target.name}
)
<#else>
<#stop "Error: Invalid step_type value '${step_type}'. Allowed values are: gatekeeper, drop_temp, temp_1, insert_0, insert_1.">
</#if>
<#else>
<#stop "Error: Parameter 'step_type' is not set or is empty.">
</#if>
The following technical attribute types are referenced by the templates (depending on the entity type):
DV_HASHKEY
DV_REFERENCING_HASHKEY
BUSINESS_KEY
DV_LOAD_TIME
DV_DATAHASH
DV_DRIVING_KEY
STATUS
RUN_ID
In addition to the transformation types created above, the following transformation types are referenced by the templates:
BUSINESS_KEY
REFERENCE_BUSINESS_KEY (status satellite)
The following load options are supported:
OPT_USE_RUN_IDS
OPT_WHERE
OPT_DELTA_EXTRACT (status satellite)
Additionally, load options that do not affect the load logic are supported if applicable.
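How the templates turn load options into SQL can be summarized with a short Python sketch. The function names are hypothetical; the logic follows the whereClauses and isDelta blocks in the templates above, including the exact string comparisons against "true" and "false".

```python
def build_where(opt_use_run_ids: str = "", opt_where: str = "") -> str:
    """Assemble the optional WHERE clause the way the templates do."""
    clauses = []
    # Run ID filtering is added only when the option is exactly "true".
    if opt_use_run_ids == "true":
        clauses.append("src_entity.<sourcerunidattr> IN (<loadablerunids>)")
    # A non-empty OPT_WHERE is appended verbatim.
    if opt_where:
        clauses.append(opt_where)
    if not clauses:
        return ""
    return "WHERE\n    " + " AND ".join(clauses)


def is_delta(opt_delta_extract: str = "") -> bool:
    """Status satellite loads default to delta unless the option is "false"."""
    return opt_delta_extract != "false"


where_sql = build_where("true", "src_entity.country = 'FI'")
```

An unset or empty option therefore leaves the WHERE clause out entirely, and any OPT_DELTA_EXTRACT value other than the literal "false" keeps the delta behavior.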
Entity types
The examples use the default entity types:
HUB
LINK
SAT
S_SAT
Optionally, custom entity types can be configured, for example, for sensitive hubs and satellites containing sensitive attributes. This allows for the use of custom naming conventions, assignment to a separate default schema, and application of other default configurations tailored to entities that handle personally identifiable information (PII).
Examples
Sensitive hub
In this example, we will create a sensitive hub for customers, which will store identifiable customer keys:
Entity type | HUB |
---|---|
Entity name | H_CUSTOMER |
Schema | sdv |
Note that the schema is set to sdv, which differs from the default rdv (raw data vault) schema. This separation helps isolate sensitive information and, for example, simplifies the management of access rights.
A load and an entity mapping from staging is created as follows:
STG_CUSTOMER | Transformation | H_CUSTOMER |
---|---|---|
- | SENSITIVE_DV_GENERATED_ID | dv_id |
- | CURRENT_TS | dv_load_time |
- | RUN_ID | dv_run_id |
stg_source_system | - | dv_source_system |
stg_source_entity | - | dv_source_entity |
- | PACKAGE_VERSION | dv_package_version |
- | LOAD_NAME | dv_load_name |
customerid | BUSINESS_KEY | business_key |
An FTL template reference is added as an OVERRIDE load step:
<ftltemplate:sensitive_dv_hub>
Load option:
OPT_USE_RUN_IDS = true
Generated load logic:
Databricks SQL:
INSERT INTO sdv.H_CUSTOMER (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
)
SELECT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
FROM (
SELECT
MD5(UUID()) AS dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
FROM (
SELECT DISTINCT
CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_h_customer_from_stg_customer_01' AS dv_load_name
, UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) AS business_key
FROM staging.STG_CUSTOMER src_entity
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
sdv.H_CUSTOMER trg
WHERE
trg.business_key = src.business_key
);
Google BigQuery:
INSERT INTO sdv.H_CUSTOMER (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
)
SELECT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
FROM (
SELECT
TO_HEX(MD5(GENERATE_UUID())) AS dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
FROM (
SELECT DISTINCT
CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_h_customer_from_stg_customer_01' AS dv_load_name
, UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) AS business_key
FROM staging.STG_CUSTOMER src_entity
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
sdv.H_CUSTOMER trg
WHERE
trg.business_key = src.business_key
);
Snowflake:
INSERT INTO sdv.H_CUSTOMER (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
)
SELECT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
FROM (
SELECT
MD5(UUID_STRING()) AS dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, business_key
FROM (
SELECT DISTINCT
CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_h_customer_from_stg_customer_01' AS dv_load_name
, UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) AS business_key
FROM staging.STG_CUSTOMER src_entity
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
sdv.H_CUSTOMER trg
WHERE
trg.business_key = src.business_key
);
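The behavior of the hub load on repeated runs can be sketched in Python with sqlite3. Names and data are hypothetical and the normalization only covers string input; the point is that existence is checked on the business key, so a key that is already in the hub keeps its original random dv_id.

```python
import hashlib
import sqlite3
import uuid


def normalize(raw):
    """Mimic UPPER(COALESCE(NULLIF(TRIM(x), ''), '-1')) for string input."""
    trimmed = (raw or "").strip(" ")
    return (trimmed or "-1").upper()


def load_hub(conn, staged_ids):
    """Insert business keys missing from the hub, each with a fresh random dv_id."""
    # De-duplicate first, then generate one random key per new business key.
    for business_key in sorted({normalize(raw) for raw in staged_ids}):
        exists = conn.execute(
            "SELECT 1 FROM h_customer WHERE business_key = ?", (business_key,)
        ).fetchone()
        if exists is None:
            dv_id = hashlib.md5(str(uuid.uuid4()).encode("utf-8")).hexdigest()
            conn.execute(
                "INSERT INTO h_customer (dv_id, business_key) VALUES (?, ?)",
                (dv_id, business_key),
            )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE h_customer (dv_id TEXT, business_key TEXT)")

load_hub(conn, ["c001", " C001 ", "C002"])
first_run = dict(conn.execute("SELECT business_key, dv_id FROM h_customer"))

load_hub(conn, ["C001", "C003"])
second_run = dict(conn.execute("SELECT business_key, dv_id FROM h_customer"))
```

The generated SQL above follows the same order: the business keys are reduced with SELECT DISTINCT in the inner query, and the random dv_id is generated in the query around it, so duplicate staged rows do not end up with different keys.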
Satellite for a sensitive hub
In this example, we will create a satellite for the sensitive customer hub to store sensitive customer attributes:
Entity type | SAT |
---|---|
Entity name | S_CUSTOMER |
Schema | sdv |
A good practice is to separate sensitive and non-sensitive attributes into different satellites, placing sensitive data in the sdv schema and non-sensitive data in the standard rdv schema. This separation supports clearer data governance and simplifies tasks such as planning the erasure of identifiable information. However, it is not always straightforward to determine which attributes are sensitive or whether combinations of attributes could lead to re-identification. These decisions should always be made carefully and in collaboration with subject matter experts.
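What an erasure could look like under this separation is sketched below with sqlite3. This is an assumption-laden illustration, not a procedure defined by the templates: the erase_customer function, the rdv_s_customer_segment table and all data are hypothetical, and the schema is folded into the table name because SQLite has no schemas.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE sdv_h_customer (dv_id TEXT PRIMARY KEY, business_key TEXT);
    CREATE TABLE sdv_s_customer (dv_id TEXT, first_name TEXT, phone_number TEXT);
    CREATE TABLE rdv_s_customer_segment (dv_id TEXT, segment TEXT);

    INSERT INTO sdv_h_customer VALUES ('a1b2', 'C001'), ('c3d4', 'C002');
    INSERT INTO sdv_s_customer VALUES
        ('a1b2', 'Ann', '555-0100'), ('c3d4', 'Bob', '555-0142');
    INSERT INTO rdv_s_customer_segment VALUES ('a1b2', 'premium'), ('c3d4', 'basic');
    """
)


def erase_customer(conn, business_key):
    """Delete the sensitive hub and satellite rows for one business key."""
    row = conn.execute(
        "SELECT dv_id FROM sdv_h_customer WHERE business_key = ?", (business_key,)
    ).fetchone()
    if row is None:
        return
    dv_id = row[0]
    conn.execute("DELETE FROM sdv_s_customer WHERE dv_id = ?", (dv_id,))
    conn.execute("DELETE FROM sdv_h_customer WHERE dv_id = ?", (dv_id,))


erase_customer(conn, "C001")

remaining_sensitive = conn.execute(
    "SELECT dv_id FROM sdv_s_customer ORDER BY dv_id"
).fetchall()
remaining_non_sensitive = conn.execute(
    "SELECT dv_id, segment FROM rdv_s_customer_segment ORDER BY dv_id"
).fetchall()
```

The non-sensitive rows keep their random dv_id, so the rest of the model stays intact while the link to the business key is gone. Whether the remaining attributes are actually non-identifying is a separate question, as noted above.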
A load and two entity mappings are added:
Source entity | Position | Type |
---|---|---|
STG_CUSTOMER | 1 | SOURCE (Driving Run ID Logic) |
H_CUSTOMER | 2 | SOURCE |
Attribute mapping from staging:
STG_CUSTOMER | Transformation | S_CUSTOMER |
---|---|---|
customerid | SENSITIVE_DV_ID_LOOKUP | dv_id |
- | CURRENT_TS | dv_load_time |
- | RUN_ID | dv_run_id |
stg_source_system | - | dv_source_system |
stg_source_entity | - | dv_source_entity |
- | PACKAGE_VERSION | dv_package_version |
- | LOAD_NAME | dv_load_name |
- | HASH_DIFF | dv_datahash |
first_name | - | first_name |
last_name | - | last_name |
postal_address | - | postal_address |
phone_number | - | phone_number |
An FTL template reference is added as an OVERRIDE load step:
<ftltemplate:sensitive_dv_satellite>
Load options:
OPT_USE_RUN_IDS = true
Generated load logic:
INSERT INTO sdv.S_CUSTOMER (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_datahash
, first_name
, last_name
, postal_address
, phone_number
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_datahash
, first_name
, last_name
, postal_address
, phone_number
FROM (
SELECT
src_lookup.dv_id AS dv_id
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, stg_source_system AS dv_source_system
, stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_s_customer_from_stg_customer_01' AS dv_load_name
, MD5(COALESCE(first_name, '-1') || '~' || COALESCE(last_name, '-1') || '~' || COALESCE(postal_address, '-1') || '~' || COALESCE(phone_number, '-1')) AS dv_datahash
, first_name AS first_name
, last_name AS last_name
, postal_address AS postal_address
, phone_number AS phone_number
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.dv_datahash
FROM (
SELECT
s.dv_id
,s.dv_datahash
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_id
ORDER BY
s.dv_load_time DESC
)
AS
dv_load_time_last
FROM
sdv.S_CUSTOMER s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_datahash = src.dv_datahash
AND
trg.dv_id = src.dv_id
);
INSERT INTO sdv.S_CUSTOMER (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_datahash
, first_name
, last_name
, postal_address
, phone_number
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_datahash
, first_name
, last_name
, postal_address
, phone_number
FROM (
SELECT
src_lookup.dv_id AS dv_id
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, stg_source_system AS dv_source_system
, stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_s_customer_from_stg_customer_01' AS dv_load_name
, TO_HEX(MD5(CONCAT(COALESCE(first_name, '-1') ,'~',COALESCE(last_name, '-1') ,'~',COALESCE(postal_address, '-1') ,'~',COALESCE(phone_number, '-1')))) AS dv_datahash
, first_name AS first_name
, last_name AS last_name
, postal_address AS postal_address
, phone_number AS phone_number
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.dv_datahash
FROM (
SELECT
s.dv_id
,s.dv_datahash
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_id
ORDER BY
s.dv_load_time DESC
)
AS
dv_load_time_last
FROM
sdv.S_CUSTOMER s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_datahash = src.dv_datahash
AND
trg.dv_id = src.dv_id
);
INSERT INTO sdv.S_CUSTOMER (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_datahash
, first_name
, last_name
, postal_address
, phone_number
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_datahash
, first_name
, last_name
, postal_address
, phone_number
FROM (
SELECT
src_lookup.dv_id AS dv_id
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
, stg_source_system AS dv_source_system
, stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_s_customer_from_stg_customer_01' AS dv_load_name
, MD5(NVL(first_name, '-1') || '~' || NVL(last_name, '-1') || '~' || NVL(postal_address, '-1') || '~' || NVL(phone_number, '-1')) AS dv_datahash
, first_name AS first_name
, last_name AS last_name
, postal_address AS postal_address
, phone_number AS phone_number
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.dv_datahash
FROM (
SELECT
s.dv_id
,s.dv_datahash
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_id
ORDER BY
s.dv_load_time DESC
)
AS
dv_load_time_last
FROM
sdv.S_CUSTOMER s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_datahash = src.dv_datahash
AND
trg.dv_id = src.dv_id
);
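The satellite load above combines two ideas: a join to the sensitive hub that swaps the business key for the randomized dv_id, and a NOT EXISTS check against the latest satellite row per dv_id so that only changed attributes create a new version. The following is a minimal sketch of that pattern, assuming SQLite 3.25 or later (for window functions) through Python's sqlite3 module. The reduced column list, the md5_hex stand-in for the database MD5 function, and the integer load times are simplifications for illustration, not the generated logic itself:

```python
import hashlib
import sqlite3

def md5_hex(text):
    """Stand-in for the database MD5 function (SQLite has none built in)."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()

LOAD_SQL = """
INSERT INTO S_CUSTOMER (dv_id, dv_load_time, dv_datahash, first_name, phone_number)
SELECT DISTINCT dv_id, dv_load_time, dv_datahash, first_name, phone_number
FROM (
    SELECT
        src_lookup.dv_id AS dv_id
        , :load_time AS dv_load_time
        , MD5(COALESCE(first_name, '-1') || '~' || COALESCE(phone_number, '-1')) AS dv_datahash
        , first_name
        , phone_number
    FROM STG_CUSTOMER src_entity
    -- Look up the randomized hub key through the formatted business key.
    JOIN H_CUSTOMER src_lookup
        ON UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS TEXT)), ''), '-1'))
           = src_lookup.business_key
) src
WHERE NOT EXISTS (
    SELECT 1
    FROM (
        SELECT s.dv_id, s.dv_datahash
            , ROW_NUMBER() OVER (PARTITION BY s.dv_id ORDER BY s.dv_load_time DESC) AS rn
        FROM S_CUSTOMER s
    ) trg
    WHERE trg.rn = 1
      AND trg.dv_id = src.dv_id
      AND trg.dv_datahash = src.dv_datahash
)
"""

def run_load(conn, load_time):
    """Execute the satellite load once and return the satellite row count."""
    with conn:
        conn.execute(LOAD_SQL, {"load_time": load_time})
    return conn.execute("SELECT COUNT(*) FROM S_CUSTOMER").fetchone()[0]

conn = sqlite3.connect(":memory:")
conn.create_function("MD5", 1, md5_hex)
conn.executescript("""
    CREATE TABLE STG_CUSTOMER (customerid TEXT, first_name TEXT, phone_number TEXT);
    CREATE TABLE H_CUSTOMER (dv_id TEXT, business_key TEXT);
    CREATE TABLE S_CUSTOMER (dv_id TEXT, dv_load_time INTEGER, dv_datahash TEXT,
                             first_name TEXT, phone_number TEXT);
    INSERT INTO H_CUSTOMER VALUES ('r1', 'C-100');
    INSERT INTO STG_CUSTOMER VALUES (' c-100 ', 'Ada', '555-0100');
""")

counts = [run_load(conn, 1)]          # new key: first version inserted
counts.append(run_load(conn, 2))      # unchanged attributes: nothing inserted
with conn:
    conn.execute("UPDATE STG_CUSTOMER SET phone_number = '555-0199'")
counts.append(run_load(conn, 3))      # changed attribute: second version inserted
```

Because the lookup is an inner join, a staging row whose business key is not yet in the hub produces no satellite row in this sketch, which suggests loading the hub before the satellite.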
Link
In this example, we will create a link that references a sensitive hub:
Entity type | LINK |
---|---|
Entity name | L_CUSTOMER_STORE_TERRITORY |
Schema | rdv |
Note that unlike sensitive hubs and satellites, the link is assigned to the rdv schema since it does not contain any sensitive attributes.
A load and two entity mappings are added:
Source entity | Position | Type |
---|---|---|
STG_CUSTOMER | 1 | SOURCE (Driving Run ID Logic) |
H_CUSTOMER | 2 | SOURCE |
Attribute mapping from staging:
STG_CUSTOMER | Transformation | L_CUSTOMER_STORE_TERRITORY |
---|---|---|
storeid, territoryid | SENSITIVE_DV_LINK_ID | dv_id |
- | CURRENT_TS | dv_load_time |
- | RUN_ID | dv_run_id |
stg_source_system | - | dv_source_system |
stg_source_entity | - | dv_source_entity |
- | PACKAGE_VERSION | dv_package_version |
- | LOAD_NAME | dv_load_name |
customerid | SENSITIVE_DV_ID_LOOKUP | dv_id_customer |
storeid | HASH_KEY | dv_id_store |
territoryid | HASH_KEY | dv_id_territory |
Note that the sensitive customerid should not be mapped to the link dv_id. Instead, the template and transformation join the sensitive hub and include its randomized dv_id in the link dv_id hash.
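The key construction described above can be sketched in a few lines of Python. This is an illustration under stated assumptions rather than the product's implementation: the function names are hypothetical, clean_key only approximates the SQL key formatting (Python's strip removes all whitespace, SQL TRIM removes spaces), and the random hub key follows the MD5(UUID()) pattern of the SENSITIVE_DV_GENERATED_ID transformation:

```python
import hashlib
import uuid

def clean_key(value):
    """Approximate COALESCE(NULLIF(TRIM(CAST(value AS STRING)), ''), '-1')."""
    text = "" if value is None else str(value).strip()
    return text if text else "-1"

def sensitive_dv_generated_id():
    """Randomized hub key, analogous to MD5(UUID())."""
    return hashlib.md5(str(uuid.uuid4()).encode("utf-8")).hexdigest()

def link_dv_id(storeid, territoryid, customer_dv_id):
    """Hash the non-sensitive keys together with the looked-up hub dv_id."""
    parts = [clean_key(storeid), clean_key(territoryid), customer_dv_id]
    # UPPER is applied to the whole concatenation, as in the generated SQL.
    return hashlib.md5("~".join(parts).upper().encode("utf-8")).hexdigest()

customer_dv_id = sensitive_dv_generated_id()
key_a = link_dv_id(10, " 7 ", customer_dv_id)
key_b = link_dv_id("10", "7", customer_dv_id)
key_c = link_dv_id(10, 7, sensitive_dv_generated_id())
```

Here key_a and key_b are equal because the same store, territory and hub dv_id are hashed, while key_c differs because it uses another random hub key. The customerid itself never enters the hash, so the link key cannot be recomputed from a known customer id alone.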
An FTL template reference is added as an OVERRIDE load step:
<ftltemplate:sensitive_dv_link>
Load options:
OPT_USE_RUN_IDS = true
Generated load logic:
INSERT INTO rdv.L_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_id_customer
, dv_id_store
, dv_id_territory
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_id_customer
, dv_id_store
, dv_id_territory
FROM (
SELECT
MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_l_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_id_customer
, MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1'))) AS dv_id_store
, MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1'))) AS dv_id_territory
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
rdv.L_CUSTOMER_STORE_TERRITORY trg
WHERE
trg.dv_id = src.dv_id
);
INSERT INTO rdv.L_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_id_customer
, dv_id_store
, dv_id_territory
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_id_customer
, dv_id_store
, dv_id_territory
FROM (
SELECT
TO_HEX(MD5(UPPER(CONCAT(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id)))) AS dv_id
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_l_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_id_customer
, TO_HEX(MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1')))) AS dv_id_store
, TO_HEX(MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1')))) AS dv_id_territory
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
rdv.L_CUSTOMER_STORE_TERRITORY trg
WHERE
trg.dv_id = src.dv_id
);
INSERT INTO rdv.L_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_id_customer
, dv_id_store
, dv_id_territory
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_id_customer
, dv_id_store
, dv_id_territory
FROM (
SELECT
MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_l_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_id_customer
, MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1'))) AS dv_id_store
, MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1'))) AS dv_id_territory
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM
rdv.L_CUSTOMER_STORE_TERRITORY trg
WHERE
trg.dv_id = src.dv_id
);
Status satellite for a link
In this example, we will create a status satellite for a link that references a sensitive hub:
Entity type | S_SAT |
---|---|
Entity name | STS_CUSTOMER_STORE_TERRITORY |
Schema | bdv |
A load and two entity mappings are added:
Source entity | Position | Type |
---|---|---|
STG_CUSTOMER | 1 | SOURCE (Driving Run ID Logic) |
H_CUSTOMER | 2 | SOURCE |
Attribute mapping from staging:
STG_CUSTOMER | Transformation | STS_CUSTOMER_STORE_TERRITORY |
---|---|---|
storeid, territoryid | SENSITIVE_DV_LINK_ID | dv_id |
- | CURRENT_TS | dv_load_time |
- | RUN_ID | dv_run_id |
stg_source_system | - | dv_source_system |
stg_source_entity | - | dv_source_entity |
- | PACKAGE_VERSION | dv_package_version |
- | LOAD_NAME | dv_load_name |
customerid | SENSITIVE_DV_ID_LOOKUP | dv_driving_key |
- | DV_STATUS | status |
In this example, the sensitive customer id is set as the driving key. See Status satellite for more details about choosing the correct driving key.
As in the link load, the sensitive customerid should not be mapped to the status satellite dv_id. Instead, the template and transformation join the sensitive hub and include its randomized dv_id in the status satellite dv_id hash.
Load option:
OPT_USE_RUN_IDS = true
Status satellite load logic consists of multiple steps and varies by extract type (delta or full). When using the FTL template sensitive_dv_status_satellite in a load step, you must specify the step_type parameter, which selects the phase of the load process that the step generates. Valid parameter values are:
gatekeeper
drop_temp
temp_1
insert_0
insert_1
See the examples below for full and delta extract scenarios using this template.
Delta extract
Load option:
OPT_DELTA_EXTRACT = true (or unset, true is default)
FTL template references added for each step as OVERRIDE load steps:
# | Load step name |
---|---|
1 | drop_temp_start |
2 | create_temp_table_for_status_1_keys |
3 | insert_status_0_keys |
4 | insert_status_1_keys |
5 | drop_temp_end |
Generated load logic:
/* 1. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 2. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TABLE
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
, current_timestamp() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_driving_key
, 1 AS status
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.status
FROM (
SELECT
s.dv_id
,s.status
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_driving_key
ORDER BY
s.dv_load_time DESC
) AS dv_load_time_last
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_id = src.dv_id
AND trg.status = 1
);
/* 3. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
trg.dv_id
, trg.dv_load_time
, trg.dv_run_id
, trg.dv_source_system
, trg.dv_source_entity
, trg.dv_package_version
, trg.dv_load_name
, trg.dv_driving_key
, trg.status
FROM (
SELECT
dv_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, 0 AS status
, current_timestamp() AS dv_load_time
, <targetrunid> AS dv_run_id
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY_C
) trg
JOIN
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY src ON (
trg.status = 1
AND trg.dv_driving_key = src.dv_driving_key
)
);
/* 4. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
dv_id
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
, current_timestamp() AS dv_load_time
FROM
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);
/* 5. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 1. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 2. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
TO_HEX(MD5(UPPER(CONCAT( COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id )))) AS dv_id
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_driving_key
, 1 AS status
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.status
FROM (
SELECT
s.dv_id
,s.status
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_driving_key
ORDER BY
s.dv_load_time DESC
) AS dv_load_time_last
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_id = src.dv_id
AND trg.status = 1
);
/* 3. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
trg.dv_id
, trg.dv_load_time
, trg.dv_run_id
, trg.dv_source_system
, trg.dv_source_entity
, trg.dv_package_version
, trg.dv_load_name
, trg.dv_driving_key
, trg.status
FROM (
SELECT
dv_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, 0 AS status
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY_C
) trg
JOIN
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY src ON (
trg.status = 1
AND trg.dv_driving_key = src.dv_driving_key
)
);
/* 4. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
dv_id
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
, CURRENT_TIMESTAMP() AS dv_load_time
FROM
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);
/* 5. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 1. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 2. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_driving_key
, 1 AS status
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.status
FROM (
SELECT
s.dv_id
,s.status
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_driving_key
ORDER BY
s.dv_load_time DESC
) AS dv_load_time_last
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_id = src.dv_id
AND trg.status = 1
);
/* 3. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
trg.dv_id
, trg.dv_load_time
, trg.dv_run_id
, trg.dv_source_system
, trg.dv_source_entity
, trg.dv_package_version
, trg.dv_load_name
, trg.dv_driving_key
, trg.status
FROM (
SELECT
dv_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, 0 AS status
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY_C
) trg
JOIN
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY src ON (
trg.status = 1
AND trg.dv_driving_key = src.dv_driving_key
)
);
/* 4. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
dv_id
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
FROM
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);
/* 5. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
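The intent of the delta steps can be sketched with Python's sqlite3 module (SQLite 3.25 or later for window functions). Several things here are assumptions for illustration: the BATCH table stands for staging rows after the hub lookup, the STS_C view is assumed to return the latest row per dv_id, load times are plain integers, and the active-row filter is applied to the current status before status 0 is written. All table names are shortened, hypothetical versions of the ones above:

```python
import sqlite3

SCHEMA = """
CREATE TABLE BATCH (dv_id TEXT, dv_driving_key TEXT);
CREATE TABLE STS (dv_id TEXT, dv_driving_key TEXT, status INTEGER, dv_load_time INTEGER);
-- Assumed meaning of the _C view: the latest row per dv_id.
CREATE VIEW STS_C AS
SELECT dv_id, dv_driving_key, status FROM (
    SELECT s.dv_id, s.dv_driving_key, s.status,
           ROW_NUMBER() OVER (PARTITION BY s.dv_id ORDER BY s.dv_load_time DESC) AS rn
    FROM STS s
) WHERE rn = 1;
"""

def run_delta_load(conn, batch, load_time):
    """Apply one delta batch of (link dv_id, driving key) pairs."""
    with conn:
        conn.execute("DELETE FROM BATCH")
        conn.executemany("INSERT INTO BATCH VALUES (?, ?)", batch)
        # drop_temp
        conn.execute("DROP TABLE IF EXISTS Z_TMP_STS")
        # temp_1: incoming keys that are not currently active
        conn.execute("""
            CREATE TEMP TABLE Z_TMP_STS AS
            SELECT DISTINCT b.dv_id, b.dv_driving_key
            FROM BATCH b
            WHERE NOT EXISTS (
                SELECT 1 FROM STS_C c WHERE c.dv_id = b.dv_id AND c.status = 1)
        """)
        # insert_0: deactivate active rows that share a driving key with a new key
        conn.execute("""
            INSERT INTO STS (dv_id, dv_driving_key, status, dv_load_time)
            SELECT DISTINCT c.dv_id, c.dv_driving_key, 0, ?
            FROM STS_C c
            JOIN Z_TMP_STS src ON c.dv_driving_key = src.dv_driving_key
            WHERE c.status = 1
        """, (load_time,))
        # insert_1: activate the new keys
        conn.execute("""
            INSERT INTO STS (dv_id, dv_driving_key, status, dv_load_time)
            SELECT dv_id, dv_driving_key, 1, ? FROM Z_TMP_STS
        """, (load_time,))
        # drop_temp
        conn.execute("DROP TABLE IF EXISTS Z_TMP_STS")

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
run_delta_load(conn, [("l1", "r1")], 1)   # customer r1 appears with link l1
run_delta_load(conn, [("l2", "r1")], 2)   # r1 moves: l1 is deactivated, l2 activated
run_delta_load(conn, [("l2", "r1")], 3)   # repeated delivery: no new rows
current = dict(conn.execute("SELECT dv_id, status FROM STS_C").fetchall())
row_count = conn.execute("SELECT COUNT(*) FROM STS").fetchone()[0]
```

In this sketch the driving key decides which earlier relationships are closed: only active rows for the same customer dv_id as an incoming new key receive status 0, and customers absent from the delta are left as they are.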
Full extract
Load option:
OPT_DELTA_EXTRACT = false
FTL template references added for each step. Note that the first step is a GATEKEEPER step, the rest are OVERRIDE steps:
# | Load step name |
---|---|
1 | gatekeeper_staging_data_available |
2 | drop_temp_start |
3 | create_temp_table_for_status_1_keys |
4 | insert_status_0_keys |
5 | insert_status_1_keys |
6 | drop_temp_end |
Generated load logic:
/* 1. gatekeeper_staging_data_available (GATEKEEPER - SQL) */
SELECT
COUNT(1) AS gatekeeper
FROM
staging.STG_CUSTOMER src_entity
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
HAVING
COUNT(1) > 0;
/* 2. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 3. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TABLE
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
, current_timestamp() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_driving_key
, 1 AS status
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.status
FROM (
SELECT
s.dv_id
,s.status
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_driving_key
ORDER BY
s.dv_load_time DESC
) AS dv_load_time_last
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_id = src.dv_id
AND trg.status = 1
);
/* 4. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
trg.dv_id
, trg.dv_load_time
, trg.dv_run_id
, trg.dv_source_system
, trg.dv_source_entity
, trg.dv_package_version
, trg.dv_load_name
, trg.dv_driving_key
, trg.status
FROM (
SELECT
dv_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, 0 AS status
, current_timestamp() AS dv_load_time
, <targetrunid> AS dv_run_id
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY_C
) trg
LEFT JOIN (
SELECT DISTINCT
MD5(UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') || '~' || COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
ON (
trg.dv_id = src.dv_id
)
WHERE
trg.status = 1
AND src.dv_id IS NULL
);
/* 5. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
dv_id
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
, current_timestamp() AS dv_load_time
FROM
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);
/* 6. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 1. gatekeeper_staging_data_available (GATEKEEPER - SQL) */
SELECT
COUNT(1) AS gatekeeper
FROM
staging.STG_CUSTOMER src_entity
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
HAVING
COUNT(1) > 0;
/* 2. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 3. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
TO_HEX(MD5(UPPER(CONCAT( COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id )))) AS dv_id
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_driving_key
, 1 AS status
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.status
FROM (
SELECT
s.dv_id
,s.status
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_driving_key
ORDER BY
s.dv_load_time DESC
) AS dv_load_time_last
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_id = src.dv_id
AND trg.status = 1
);
/* 4. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
trg.dv_id
, trg.dv_load_time
, trg.dv_run_id
, trg.dv_source_system
, trg.dv_source_entity
, trg.dv_package_version
, trg.dv_load_name
, trg.dv_driving_key
, trg.status
FROM (
SELECT
dv_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, 0 AS status
, CURRENT_TIMESTAMP() AS dv_load_time
, <targetrunid> AS dv_run_id
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY_C
) trg
LEFT JOIN (
SELECT DISTINCT
TO_HEX(MD5(UPPER(CONCAT(COALESCE(NULLIF(TRIM(CAST(src_entity.storeid AS STRING)), ''), '-1') ,'~', COALESCE(NULLIF(TRIM(CAST(src_entity.territoryid AS STRING)), ''), '-1') ,'~', src_lookup.dv_id)))) AS dv_id
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(COALESCE(NULLIF(TRIM(CAST(src_entity.customerid AS STRING)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
ON (
trg.dv_id = src.dv_id
)
WHERE
trg.status = 1
AND src.dv_id IS NULL
);
/* 5. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
dv_id
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
, CURRENT_TIMESTAMP() AS dv_load_time
FROM
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);
/* 6. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 1. gatekeeper_staging_data_available (GATEKEEPER - SQL) */
SELECT
COUNT(1) AS gatekeeper
FROM
staging.STG_CUSTOMER src_entity
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
HAVING
COUNT(1) > 0;
/* 2. drop_temp_start (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
/* 3. create_temp_table_for_status_1_keys (OVERRIDE - SQL) */
CREATE TEMPORARY TABLE
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
AS
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
, src_entity.stg_source_system AS dv_source_system
, src_entity.stg_source_entity AS dv_source_entity
, <packageversion> AS dv_package_version
, 'load_sts_customer_store_territory_from_stg_customer_01' AS dv_load_name
, src_lookup.dv_id AS dv_driving_key
, 1 AS status
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
WHERE
NOT EXISTS (
SELECT
1
FROM (
SELECT
t1.dv_id
,t1.status
FROM (
SELECT
s.dv_id
,s.status
,ROW_NUMBER()
OVER (
PARTITION BY
s.dv_driving_key
ORDER BY
s.dv_load_time DESC
) AS dv_load_time_last
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY s
) t1
WHERE
t1.dv_load_time_last = 1
) trg
WHERE
trg.dv_id = src.dv_id
AND trg.status = 1
);
/* 4. insert_status_0_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
trg.dv_id
, trg.dv_load_time
, trg.dv_run_id
, trg.dv_source_system
, trg.dv_source_entity
, trg.dv_package_version
, trg.dv_load_name
, trg.dv_driving_key
, trg.status
FROM (
SELECT
dv_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, 0 AS status
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
, <targetrunid> AS dv_run_id
FROM
bdv.STS_CUSTOMER_STORE_TERRITORY_C
) trg
LEFT JOIN (
SELECT DISTINCT
MD5(UPPER(NVL(NULLIF(TRIM(CAST(src_entity.storeid AS VARCHAR)), ''), '-1') || '~' || NVL(NULLIF(TRIM(CAST(src_entity.territoryid AS VARCHAR)), ''), '-1') || '~' || src_lookup.dv_id)) AS dv_id
FROM staging.STG_CUSTOMER src_entity
JOIN sdv.H_CUSTOMER src_lookup ON (UPPER(NVL(NULLIF(TRIM(CAST(src_entity.customerid AS VARCHAR)), ''), '-1')) = src_lookup.business_key)
WHERE
src_entity.stg_batch_id IN (<loadablerunids>)
) src
ON (
trg.dv_id = src.dv_id
)
WHERE
trg.status = 1
AND src.dv_id IS NULL
);
/* 5. insert_status_1_keys (OVERRIDE - SQL) */
INSERT INTO bdv.STS_CUSTOMER_STORE_TERRITORY (
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
)
SELECT DISTINCT
dv_id
, dv_load_time
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
FROM (
SELECT
dv_id
, dv_run_id
, dv_source_system
, dv_source_entity
, dv_package_version
, dv_load_name
, dv_driving_key
, status
, CURRENT_TIMESTAMP::timestamp_ntz AS dv_load_time
FROM
bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY
);
/* 6. drop_temp_end (OVERRIDE - SQL) */
DROP TABLE IF EXISTS bdv.Z_TMP_STS_CUSTOMER_STORE_TERRITORY;
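For the full extract, the difference lies in the gatekeeper and in how status 0 rows are found: any active key that is missing from the batch is deactivated. The following sketch shows that idea with Python's sqlite3 module (SQLite 3.25 or later). As assumptions for illustration, BATCH stands for staging rows after the hub lookup, STS_C is assumed to return the latest row per dv_id, the gatekeeper is written as a Python check, and the reading that the gatekeeper guards against an empty staging table deactivating every key is an interpretation of its name and query:

```python
import sqlite3

SCHEMA = """
CREATE TABLE BATCH (dv_id TEXT, dv_driving_key TEXT);
CREATE TABLE STS (dv_id TEXT, dv_driving_key TEXT, status INTEGER, dv_load_time INTEGER);
-- Assumed meaning of the _C view: the latest row per dv_id.
CREATE VIEW STS_C AS
SELECT dv_id, dv_driving_key, status FROM (
    SELECT s.dv_id, s.dv_driving_key, s.status,
           ROW_NUMBER() OVER (PARTITION BY s.dv_id ORDER BY s.dv_load_time DESC) AS rn
    FROM STS s
) WHERE rn = 1;
"""

def run_full_load(conn, batch, load_time):
    """Run one full-extract load; return False if the gatekeeper stops it."""
    with conn:
        conn.execute("DELETE FROM BATCH")
        conn.executemany("INSERT INTO BATCH VALUES (?, ?)", batch)
        # gatekeeper: continue only when the batch contains rows
        if conn.execute("SELECT COUNT(1) FROM BATCH").fetchone()[0] == 0:
            return False
        # drop_temp
        conn.execute("DROP TABLE IF EXISTS Z_TMP_STS")
        # temp_1: incoming keys that are not currently active
        conn.execute("""
            CREATE TEMP TABLE Z_TMP_STS AS
            SELECT DISTINCT b.dv_id, b.dv_driving_key
            FROM BATCH b
            WHERE NOT EXISTS (
                SELECT 1 FROM STS_C c WHERE c.dv_id = b.dv_id AND c.status = 1)
        """)
        # insert_0: active keys that are absent from the full extract
        conn.execute("""
            INSERT INTO STS (dv_id, dv_driving_key, status, dv_load_time)
            SELECT DISTINCT c.dv_id, c.dv_driving_key, 0, ?
            FROM STS_C c
            LEFT JOIN (SELECT DISTINCT dv_id FROM BATCH) src ON c.dv_id = src.dv_id
            WHERE c.status = 1 AND src.dv_id IS NULL
        """, (load_time,))
        # insert_1: activate the new keys
        conn.execute("""
            INSERT INTO STS (dv_id, dv_driving_key, status, dv_load_time)
            SELECT dv_id, dv_driving_key, 1, ? FROM Z_TMP_STS
        """, (load_time,))
        # drop_temp
        conn.execute("DROP TABLE IF EXISTS Z_TMP_STS")
    return True

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
ran_1 = run_full_load(conn, [("l1", "r1"), ("l2", "r2")], 1)
ran_2 = run_full_load(conn, [("l2", "r2"), ("l3", "r1")], 2)  # l1 is missing: deactivated
ran_3 = run_full_load(conn, [], 3)                            # empty batch: gatekeeper stops
current = dict(conn.execute("SELECT dv_id, status FROM STS_C").fetchall())
row_count = conn.execute("SELECT COUNT(*) FROM STS").fetchone()[0]
```

The third call shows why a gatekeeper is useful in this mode: without it, an empty batch would be indistinguishable from a source in which every relationship had ended.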