
Using Run ID Logic

Guide objective

This guide explains how Run ID Logic works and presents examples of its common use cases.

Run ID Logic is a delta loading feature in Agile Data Engine that keeps track of data batches (run ids) that have been loaded between a source entity and a target entity. This enables selective loading of new data between the source-target pair, and loading it exactly once. Run ID Logic can be enabled in loads throughout the data warehouse zones to form a chain of incremental loads from e.g. staging to data warehouse to publish.

Run ID Logic also enables inserting change history data batch by batch in the correct order in cases where there are multiple source data batches queued. This is useful when loading change history tracking entities such as data vault satellites.

Understanding Run ID Logic

Run ids are running integer values that are written into a metadata attribute of technical type RUN_ID when loading data into a table (target run id), and used for filtering in outgoing loads when selecting data from the table (source run id). At least the source table in a run id load must have a run id attribute.

Default naming for run id attributes, for example:

  • stg_batch_id

  • stg_run_id

  • sdt_stage_batch_id

  • dv_run_id

The below diagram illustrates loading source data first into staging and then into further data warehouse tables using Run ID Logic:

File load and staging

When source data files are loaded into a staging table, the source data batch ids are logged as loaded target run ids for that staging table. A batch id identifies a unique data batch extracted from a source. Incoming batch ids have to be notified through Notify API together with the corresponding file addresses (see example in the above diagram). Batch ids can be notified on manifest level (multiple files per batch) or on manifest entry level (one file per batch).

Target run ids must be written into metadata columns (technical type RUN_ID) so that Agile Data Engine can filter queries when running loads from one table to another. Therefore, source batch ids must also be inserted into staging tables. Source batch ids can be written into the source file contents during data extraction by the integration process, or extracted from the file name during file load if the target database supports this. Notified batch ids must match the ids loaded into staging tables.

If you do not have control over source file generation and naming, it is possible to use the manifestrunid variable in an OVERRIDE_FILE_LOAD load step. With the variable, you can insert a manifest-level run id generated by Agile Data Engine into the staging table (again, if the target database supports this). Note that in this case you also do not need to notify batch ids as Agile Data Engine will generate them.

If you are using an external process to load source data into staging tables (i.e. you are not using file loads in Agile Data Engine), it is possible to post new run ids to be loaded using the External API (SaaS Enterprise and higher only). This way, Agile Data Engine can take over loads onwards from staging while still using Run ID Logic. This can also be useful when running an initial load with a large number of files: rather than going through the Notify API process, files can be loaded into the staging table manually and their run ids added with an External API call.

See examples of different use cases below.

Further data warehouse loads

To enable Run ID Logic for a load, set the load option OPT_USE_RUN_IDS = true.

Various Run ID Logic related variables can be utilized when using OVERRIDE load steps, for example:

  • loadablerunids is resolved as the list of run ids that have been queued between the source and target entities

  • targetrunid is resolved as the run id written into the target entity

  • manifestrunid is resolved as a manifest-level run id generated by Agile Data Engine

As presented in the diagram above, Agile Data Engine keeps track of which run ids have been loaded per source-target entity pair, and this information is used in run id loads to filter the SELECT query from the source entity. This limits the amount of data to be processed to new records only. As run id logging is source-target specific, loads from a staging entity into several target entities can be executed on different schedules, and data can be persisted in staging if needed.

OPT_NUM_RUN_IDS_PER_TRANSACTION can be used to limit the number of run ids processed per transaction. In the above example, the loads into dw.table_3 and dw.table_4 process one run id at a time. This is useful when multiple batches of source data can be queued for a single load execution and change history needs to be stored in the correct order. Assuming that the source batch ids are in the correct order, data will end up in the target tables in the correct order as well.

Managing Run IDs with External API

Edition: SaaS Enterprise

Loaded run ids are stored in Agile Data Engine for about 30 days. During that period, Agile Data Engine will not load an already loaded run id again even if it is queued again. Conversely, run ids older than that period will not be loaded if a new run id load is introduced.

Run IDs can be searched, added and deleted using the Run ID Logic related functionalities in External API. A command line interface (CLI) tool is available on GitHub to get started with External API. Also see the External API related examples below, which utilize the CLI.


Notes

External API is only available in SaaS Enterprise and higher editions, i.e. run IDs cannot be deleted in SaaS Standard edition. Create new batches to reload data in SaaS Standard or use the manifestrunid variable instead.

In the Private edition, the run id API is found in the process-api service.

File loading practices and possibilities depend on the target database. Generally, it is good practice to generate batch ids in a data integration tool while extracting source data, write them into a metadata attribute in the source data files, and notify the batch ids to Agile Data Engine through Notify API when notifying new files.

Snowflake also supports metadata columns and transforming data during a file load, which enables extracting a batch id from the file name instead of writing it into the source file content. See an example of this below.

The manifestrunid variable can be used in OVERRIDE_FILE_LOAD steps to insert a manifest-level run id into a staging table (if the target database supports this). Use this option to enable Run ID Logic even when you are not able to produce batch ids into the source data files or notify them. See how this would work in Azure SQL Database in the example below.

Run ID Logic is also supported in multi-source loads (loads with multiple entity mappings) so that one of the entity mappings is driving it. See entity mapping types for more information.


Examples

Snowflake: Extracting batch id from file name

Snowflake provides metadata columns for staged files which can be used to write file metadata into a staging table while running a COPY INTO statement. Follow this example to see how to extract the source data batch ids from file names in a file load.

Our example source files are in an S3 bucket which is defined in Snowflake as an external stage named staging.aws_stage. An external stage is required for transforming data during a load.

Source file foldering & naming convention:

EXAMPLE
{bucket}/{source_system}/{source_entity}/{yyyy}/{mm}/{dd}/{source_entity}.batch.{batch_id}.{file_type}.{compression}

Example files:

EXAMPLE
s3://demo-landing-bucket/barentswatch/ais_norway/2022/07/15/ais_norway.batch.1657875600000.json.gz
s3://demo-landing-bucket/barentswatch/ais_norway/2022/07/15/ais_norway.batch.1657879200000.json.gz
s3://demo-landing-bucket/barentswatch/ais_norway/2022/07/15/ais_norway.batch.1657882800000.json.gz

Note that in Snowflake, stages are referenced with an @ character. The notification process must replace the actual S3 file path with an external stage reference and the relative file path. Also, the notified file-specific batch ids have to match the ones in the file names.

Manifest entries posted to Notify API:

JSON
[
  {
    "batch": 1657875600000,
    "sourceFile": "@staging.aws_stage/barentswatch/ais_norway/2022/07/15/ais_norway.batch.1657875600000.json.gz"
  },
  {
    "batch": 1657879200000,
    "sourceFile": "@staging.aws_stage/barentswatch/ais_norway/2022/07/15/ais_norway.batch.1657879200000.json.gz"
  },
  {
    "batch": 1657882800000,
    "sourceFile": "@staging.aws_stage/barentswatch/ais_norway/2022/07/15/ais_norway.batch.1657882800000.json.gz"
  }
]

OVERRIDE_FILE_LOAD load step defined in the file load:

SQL
COPY INTO <target_schema>.<target_entity_name>
FROM (SELECT
        regexp_substr(metadata$filename, 'batch\\.([0-9]*)\\.', 1, 1, 'e')::integer as stg_batch_id,
        current_timestamp as stg_create_time,
        'barentswatch' as stg_source_system,
        'ais_norway' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1 as payload
    FROM
       <from_path>
) 
FILES = (<from_file_list>)
FILE_FORMAT = (<file_format>)
<copy_options>;

Here we use the REGEXP_SUBSTR function to extract the batch id from the file name with a regular expression. At the same time, we also populate the other metadata attributes of the staging table. The contents of the JSON files are written into a VARIANT column. Various Agile Data Engine variables are used to make the load dynamic so that it supports manifests with one or multiple files, different file formats, and different load options.

Load SQL generated by Agile Data Engine:

SQL
COPY INTO staging.STG_BARENTSWATCH_AIS_JSON
FROM (SELECT
        regexp_substr(metadata$filename, 'batch\\.([0-9]*)\\.', 1, 1, 'e')::integer as stg_batch_id,
        current_timestamp as stg_create_time,
        'barentswatch' as stg_source_system,
        'ais_norway' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1 as payload
    FROM
       '@staging.aws_stage/barentswatch/ais_norway/2022/07/15/'
) 
FILES=(
    'ais_norway.batch.1657875600000.json.gz',
    'ais_norway.batch.1657879200000.json.gz',
    'ais_norway.batch.1657882800000.json.gz')
FILE_FORMAT = (type='JSON' ...);

Azure SQL Database: Using manifestrunid with OVERRIDE_FILE_LOAD

Azure SQL Database allows transformations within a file load with the INSERT INTO ... SELECT ... FROM OPENROWSET pattern. Using this file load pattern, an Agile Data Engine generated manifest-level batch id can be inserted into a staging table with the manifestrunid variable.

With this file load pattern:

  • Metadata attributes including the source batch id do not need to be written into the source files before loading them into the target database.

  • Batch ids do not need to be notified.

  • Agile Data Engine generates a batch id on manifest level, i.e. files added to the same manifest must be of the same batch. Create single-file manifests if necessary.

When using OPENROWSET, a format file is usually required. See the provided Microsoft documentation links for more details.

Manifest entries posted to Notify API:

JSON
[{
    "sourceFile": "digitraffic/locations_latest/2022/07/14/locations_latest_20220714001.csv"
}]

Note that the paths of manifest entries are relative to the external data source defined in the SQL database.

OVERRIDE_FILE_LOAD load step defined in the file load:

SQL
INSERT INTO <target_schema>.<target_entity_name>
SELECT
	<manifestrunid> as stg_batch_id,
	current_timestamp as stg_create_time,
	'digitraffic' as stg_source_system,
	'locations_latest' as stg_source_entity,
	src.*
FROM OPENROWSET(
    BULK <from_path>,
    DATA_SOURCE = '<storage_integration>',
    FORMATFILE = 'sql-format-files/<target_schema>.<target_entity_name>.xml',
    FORMATFILE_DATA_SOURCE = '<storage_integration>',
    FIRSTROW = 2
) src;

A run id is automatically added using the manifestrunid variable; other metadata attributes can be defined as well. During load execution, from_path is resolved as the notified file path and storage_integration as the external data source name. When using a format file, the format file location and data source must also be defined. Note that environment variables could be utilized here.

Load SQL generated by Agile Data Engine:

SQL
INSERT INTO staging.STG_DIGITRAFFIC_LOCATIONS_LATEST
SELECT
	1657881333982 as stg_batch_id,
	current_timestamp as stg_create_time,
	'digitraffic' as stg_source_system,
	'locations_latest' as stg_source_entity,
	src.*
FROM OPENROWSET(
    BULK 'digitraffic/locations_latest/2022/07/14/locations_latest_20220714001.csv',
    DATA_SOURCE = 'dlsadedemo',
    FORMATFILE = 'sql-format-files/staging.STG_DIGITRAFFIC_LOCATIONS_LATEST.xml',
    FORMATFILE_DATA_SOURCE = 'dlsadedemo',
    FIRSTROW = 2
) src;

Azure Synapse SQL: Using manifestrunid with OVERRIDE_FILE_LOAD

Azure Synapse SQL allows specifying a column list with default values within a COPY statement. This feature can be used as a workaround to insert an Agile Data Engine generated manifest-level batch id into a staging table with the manifestrunid variable.

With this file load pattern:

  • Metadata attributes including the source batch id do not need to be written into the source files before loading them into the target database. However, note that Azure Synapse SQL only supports constant default value definitions.

  • Batch ids do not need to be notified.

  • Agile Data Engine generates a batch id on manifest level, i.e. files added to the same manifest must be of the same batch. Create single-file manifests if necessary.

Manifest entries posted to Notify API:

JSON
[{
    "sourceFile": "https://dlsadedemodev.blob.core.windows.net/raw/digitraffic/locations_latest/2022/07/14/locations_latest_20220714001.csv"
}]

OVERRIDE_FILE_LOAD load step defined in the file load:

SQL
COPY INTO <target_schema>.<target_entity_name>
(
    stg_batch_id DEFAULT <manifestrunid> 101
    ,stg_source_system DEFAULT 'digitraffic' 102
    ,stg_source_entity DEFAULT 'locations_latest' 103
    ,mmsi 1
    ,type 2
    ,geometry 3
    ,geometry_coordinates 4
    ,properties_mmsi 5
    ,sog 6
    ,cog 7
    ,navstat 8
    ,rot 9
    ,posacc 10
    ,raim 11
    ,heading 12
    ,ts 13
    ,timestamp_external 14
)
FROM <from_path>
WITH (<file_format>);

Note how the target table attribute names are listed with their corresponding source file field indexes. Metadata attributes are specified with default values and dummy field indexes outside the source file field index range.

A run id is automatically added using the manifestrunid variable. During load execution, from_path is resolved as the notified file path (or list of paths) and file_format as the file format options.

Load SQL generated by Agile Data Engine:

SQL
COPY INTO staging.STG_DIGITRAFFIC_LOCATIONS_LATEST
(
    stg_batch_id DEFAULT 1659440355273 101
    ,stg_source_system DEFAULT 'digitraffic' 102
    ,stg_source_entity DEFAULT 'locations_latest' 103
    ,mmsi 1
    ,type 2
    ,geometry 3
    ,geometry_coordinates 4
    ,properties_mmsi 5
    ,sog 6
    ,cog 7
    ,navstat 8
    ,rot 9
    ,posacc 10
    ,raim 11
    ,heading 12
    ,ts 13
    ,timestamp_external 14
)
FROM 'https://dlsadedemodev.blob.core.windows.net/raw/digitraffic/locations_latest/2022/07/14/locations_latest_20220714001.csv'
WITH (
    FILE_TYPE = 'CSV',
    CREDENTIAL=(IDENTITY= 'Managed Identity'),
    FIELDQUOTE = '"',
    FIELDTERMINATOR='|',
    ENCODING = 'UTF8',
    FIRSTROW = 2);

Using Run ID Logic together with Timeslot Logic

Combining Run ID Logic with Timeslot Logic can be very effective performance-wise, especially in high-frequency incremental load use cases such as near real-time (NRT) workflows with large amounts of non-updating event or transaction data.

Data warehouse loads often have an existence check that avoids inserting duplicate entries into the target entity, and this existence check often defines how heavy the load is. While Run ID Logic filters the incoming data from the source entity down to new records only, Timeslot Logic narrows down the existence check against the target entity when inserting those records.

To enable Timeslot Logic:

Smoke testing new records only

Run ID Logic can be used with smoke tests (SMOKE_GREY, SMOKE_BLACK) to limit testing to new records only. This is useful when testing entities with a lot of data, and when there is no need to repeat the test for data that has already been tested.

In this example, we will be testing an attribute for null or zero values so that only new records are tested:

  • Source entity: STG_DIGITRAFFIC_METADATA_VESSELS

  • Target entity: H_MARINE_VESSEL_MMSI

  • Tested attribute: mmsi

Run ID Logic enabled with: OPT_USE_RUN_IDS = true

SMOKE_GREY load step using the loadablerunids variable:

SQL
select count(1)
from staging_ms.STG_DIGITRAFFIC_METADATA_VESSELS
where
    stg_batch_id in (<loadablerunids>)
    and (mmsi is null or mmsi = 0)

Smoke test executed by Agile Data Engine:

SQL
SELECT count(*) as cntresult
FROM (
    select count(1)
    from staging_ms.STG_DIGITRAFFIC_METADATA_VESSELS
    where
        stg_batch_id in (
            1658110082143,
            1658113682651,
            1658117281752,
            1658120881978,
            1658124482459,
            1658128082145)
        and (mmsi is null or mmsi = 0)
) as cntquery

The loadablerunids variable is resolved as the list of run ids that have been queued between the source and target entities.

Note that Agile Data Engine adds a count query around the defined smoke test to ensure that data will not be transferred to the service.

Deleting run ids with External API to reload data

Edition: SaaS Enterprise

This example uses the External API CLI tool and assumes that it has been installed and configured for the target environment.

In this example we will delete a previously loaded run id (source batch id) using External API to enable reloading data with the same source batch id.

  • Load id: 026c7f09-ce19-467c-9406-071547e1b91c

  • Source entity: staging.STG_DIGITRAFFIC_METADATA_VESSELS

  • Source entity id: 451174ed-19b0-42f6-9316-07067fae1d91

  • Target entity: rdv.H_MARINE_VESSEL_IMO

  • Target entity id: f5be588d-849c-4dc6-b074-fadca3301db5

  • Run id to be deleted: 1657951682127

First, we would like to list loaded run ids. Let's start by using the --help option to show available options:

BASH
ade run-ids search --help

Instructions will be printed:

BASH
Usage: ade run-ids search [OPTIONS]

  Search run ids stored in target environment

Options:
  --environment-name TEXT  [required]
  --load-id TEXT
  --source-entity-id TEXT
  --target-entity-id TEXT
  --source-run-id INTEGER
  --target-run-id INTEGER
  --days INTEGER           How many days backwards is searched for
  --limit INTEGER          Max amount of results returned
  --help                   Show this message and exit.

Next, we will search for run ids using load-id and limit the search with days:

BASH
ade run-ids search --environment-name dev --load-id 026c7f09-ce19-467c-9406-071547e1b91c --days 1

Entity ids and load ids are visible in the Designer user interface. A load is developed as part of its target entity (see incoming loads for an entity), and its source entities are mapped in the entity mapping of the load.

The CLI tool lists run ids as a JSON array:

JSON
[
    {
        "runTimestamp": 1658021510257,
        "targetRunId": 150,
        "targetEntityId": "f5be588d-849c-4dc6-b074-fadca3301db5",
        "loadId": "026c7f09-ce19-467c-9406-071547e1b91c",
        "sourceRunId": 1657958881971,
        "sourceEntityId": "451174ed-19b0-42f6-9316-07067fae1d91",
        "affectedRows": 0
    },
    {
        "runTimestamp": 1658021510257,
        "targetRunId": 150,
        "targetEntityId": "f5be588d-849c-4dc6-b074-fadca3301db5",
        "loadId": "026c7f09-ce19-467c-9406-071547e1b91c",
        "sourceRunId": 1657955282648,
        "sourceEntityId": "451174ed-19b0-42f6-9316-07067fae1d91",
        "affectedRows": 0
    },
    {
        "runTimestamp": 1658021510257,
        "targetRunId": 150,
        "targetEntityId": "f5be588d-849c-4dc6-b074-fadca3301db5",
        "loadId": "026c7f09-ce19-467c-9406-071547e1b91c",
        "sourceRunId": 1657951682127,
        "sourceEntityId": "451174ed-19b0-42f6-9316-07067fae1d91",
        "affectedRows": 0
    }
]

Where:

  • sourceRunId is the run id that Agile Data Engine used to filter the source entity when it ran the load into the target entity (loadablerunids).

  • targetRunId is the run id that was written into the target entity (targetrunid), which can be used for further run id loads onwards from the target entity.

Now, let's see --help for the delete command:

BASH
ade run-ids delete --help

Instructions will be printed:

BASH
Usage: ade run-ids delete [OPTIONS]

  Deletes run ids according to given search criteria.

Options:
  --environment-name TEXT       [required]
  --load-id TEXT
  --source-entity-id TEXT
  --target-entity-id TEXT       [required]
  --source-run-id INTEGER
  --target-run-id INTEGER
  --from-run-timestamp INTEGER
  --to-run-timestamp INTEGER
  --help                        Show this message and exit.

Finally, let's run the delete command for the run id:

BASH
ade run-ids delete --environment-name dev --target-entity-id f5be588d-849c-4dc6-b074-fadca3301db5 --source-run-id 1657951682127

Success response:

BASH
OK

If we now trigger the load, the run id will be loaded again:

SQL
-- INFO - TargetRunId: 157, loaded runids: [1657951682127]

INSERT INTO rdv.H_MARINE_VESSEL_IMO
SELECT 
    ...
    157 AS dv_run_id
FROM staging.STG_DIGITRAFFIC_METADATA_VESSELS
WHERE
    stg_batch_id IN (1657951682127)
...

Adding run ids with External API

Edition: SaaS Enterprise

This example uses the External API CLI tool and assumes that it has been installed and configured for the target environment.

In this example we will load source data to a database staging table manually and add a run id with External API to enable run id loads onwards from the staging table. This is useful e.g. for running an initial load; often there are lots of files and it is easier to load them manually with a folder path rather than notifying individual files.

  • Target entity: staging.STG_DIGITRAFFIC_METADATA_VESSELS_JSON

  • Target entity id: e05e6bd4-4f4b-4d5b-b77b-26d55f85c83e

  • Run id: 1658205691470 (current time in milliseconds since epoch)

We will start by loading source files from a given path into the staging table by running the COPY INTO statement manually in the database. We have given the current time in milliseconds since epoch as the run id for the batch; you could also specify e.g. file-specific run ids. Note that the load is presented in Snowflake SQL, but the process is similar in other databases.

SQL
COPY INTO staging.stg_digitraffic_metadata_vessels_json
FROM 
    (SELECT
        1658205691470 as stg_batch_id,
        current_timestamp as stg_create_time,
        'digitraffic' as stg_source_system,
        'metadata_vessels' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1
    FROM @AWS_STAGE/digitraffic/metadata_vessels/2022/07/19/)
FILE_FORMAT = (type='JSON' COMPRESSION = 'AUTO');

Next, we will add the run id with External API. Let's use the --help option first to show available options for the add command:

BASH
ade run-ids add --help

Note that you could also use the add-many command to add multiple run ids at the same time.

Instructions will be printed:

BASH
Usage: ade run-ids add [OPTIONS]

  Add run ids to target env to be loaded.

Options:
  --environment-name TEXT  [required]
  --affected-rows TEXT
  --target-entity-id TEXT  [required]
  --target-run-id INTEGER  [required]
  --help                   Show this message and exit.

Run ids are added as target run ids for a target entity. In this case, our target entity is the staging table and our target run id is the given millisecond timestamp:

BASH
ade run-ids add --environment-name dev --target-entity-id e05e6bd4-4f4b-4d5b-b77b-26d55f85c83e --target-run-id 1658205691470

The response indicates success:

JSON
[
    {
        "runTimestamp": 1658206510876,
        "targetRunId": 1658205691470,
        "targetEntityId": "e05e6bd4-4f4b-4d5b-b77b-26d55f85c83e",
        "loadId": null,
        "sourceRunId": null,
        "sourceEntityId": null,
        "affectedRows": null
    }
]

Finally, let's trigger the load and see the logs. Now we are interested in the outgoing loads from the staging entity to which we added the run id. In this case, there is one outgoing load to another staging table into which the loaded JSON files are parsed:

SQL
-- INFO - TargetRunId: 89, loaded runids: [1658205691470]

INSERT INTO staging.stg_digitraffic_metadata_vessels
SELECT
    89 as stg_batch_id,
    ...
FROM staging.stg_digitraffic_metadata_vessels_json ...
WHERE stg_batch_id in (1658205691470);