
Customizing File Load

Guide objective

This guide explains how you can customize a file load either by using load options or by writing your own fully customized file load statement.

Load options can be used to customize the file load generated by Agile Data Engine when your file load does not contain transformations.

The OVERRIDE_FILE_LOAD load step type can be used when you need to fully customize a file load, i.e. manually define the load SQL. With OVERRIDE_FILE_LOAD you can, for example, transform data during a file load, generate metadata attributes that are not part of the source file itself, or explicitly specify the copied data in a SELECT list. Note that the transformations possible during a file load depend on your target database product.

Snowflake

Loading CSV Data

Example Data

The example CSV file used in this guide:

CODE
locationid, borough, zone, service_zone
1, EWR, Newark Airport, EWR
2, Queens, Jamaica Bay, Boro Zone
4, Manhattan, Alphabet City, Yellow Zone

Notification Process

The example source file is in an S3 bucket:

CODE
s3://example-data-bucket/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv

The bucket is defined in Snowflake as an external stage named staging. An external stage is required for transforming data during a load. Note that in Snowflake stages are referenced with an @ character. The notification process must replace the actual S3 file path with an external stage reference and the relative file path:

CODE
"sourceFile": "@staging/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv"

Agile Data Engine is notified (using Notify API) about the CSV file using the following data:

JSON
# Manifest request body:
{
  "batch": 20230829010101,
  "delim": "COMMA",
  "format": "CSV", 
  "fullscanned": true,
  "skiph": 1
}

# Manifest entry request body
{
  "sourceFile": "@staging/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv"
}

Loading Raw CSV

If the CSV file is loaded as is and you have only the four attributes in your staging entity, you don’t need to customize the file load.

The example table contains the following attributes: locationid, borough, zone, and service_zone.

First create a load using the LOAD_FILE load type and then map the source entity to the staging entity. If you notify Agile Data Engine about the new file using the above-mentioned external stage (@staging), you don’t need to use the OPT_STORAGE_INTEGRATION load option.

Agile Data Engine uses the following to form the load statement:

SQL
COPY INTO <target_schema>.<target_entity_name>
  FROM <from_path>
  FILE_FORMAT = (<file_format>)
  FILES=(<from_file_list>)

The placeholders are then replaced in Workflow Orchestration with the values given in Designer and in the notification:

SQL
COPY INTO staging.STG_TAXI_ZONES_CSV
  FROM '@staging/tripdata-taxizone-csv/2023/08/29/'
  FILE_FORMAT=(type='csv' skip_header=1 field_delimiter=',' FIELD_OPTIONALLY_ENCLOSED_BY='"' COMPRESSION='AUTO')
  FILES=('taxi_zone_lookup.csv')
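
The substitution step can be illustrated with a simple template fill. This is not Agile Data Engine's actual rendering code, only a sketch of how the placeholders map to the Designer and notification values shown above:

```python
# Load statement template with Agile Data Engine placeholders.
template = """COPY INTO <target_schema>.<target_entity_name>
  FROM <from_path>
  FILE_FORMAT = (<file_format>)
  FILES=(<from_file_list>)"""

# Placeholder values: entity names come from Designer, paths and
# file names from the Notify API manifest.
values = {
    "<target_schema>": "staging",
    "<target_entity_name>": "STG_TAXI_ZONES_CSV",
    "<from_path>": "'@staging/tripdata-taxizone-csv/2023/08/29/'",
    "<file_format>": "type='csv' skip_header=1 field_delimiter=',' "
                     "FIELD_OPTIONALLY_ENCLOSED_BY='\"' COMPRESSION='AUTO'",
    "<from_file_list>": "'taxi_zone_lookup.csv'",
}

sql = template
for placeholder, value in values.items():
    sql = sql.replace(placeholder, value)
print(sql)
```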

Transforming Data During CSV Load

If you have additional attributes in your staging table, you need to customize the file load using the OVERRIDE_FILE_LOAD load step type.

The example table contains the following attributes: stg_batch_id, stg_create_time, stg_source_system, stg_source_entity, stg_file_name, locationid, borough, zone, and service_zone.

Create a LOAD_FILE load, map the source entity, and create an OVERRIDE_FILE_LOAD load step.

Add the following statement to the OVERRIDE_FILE_LOAD load step:

SQL
COPY INTO <target_schema>.<target_entity_name>
FROM (SELECT
        <manifestrunid> as stg_batch_id,
        current_timestamp as stg_create_time,
        'taxidata' as stg_source_system,
        'taxi_zone_lookup' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1,
        $2,
        $3,
        $4
    FROM
       <from_path>
) 
FILES=(<from_file_list>)
FILE_FORMAT = (<file_format>);

The from_path, from_file_list, and manifestrunid variables are set based on the manifest content sent to Notify API. The file_format variable is set based either on the manifest content or on the OPT_FILE_FORMAT_OPTIONS load option. metadata$filename is a Snowflake metadata column that provides the path of the loaded file.

See also Snowflake’s guide on how to transform data during a load.

The placeholders are then replaced in Workflow Orchestration with the values given in Designer and in the notification:

SQL
COPY INTO staging.STG_TAXI_ZONES_CSV
FROM (SELECT
        20230829010101 as stg_batch_id,
        current_timestamp as stg_create_time,
        'taxidata' as stg_source_system,
        'taxi_zone_lookup' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1,
        $2,
        $3,
        $4
    FROM
       '@staging/tripdata-taxizone-csv/2023/08/29/'
)
FILES=('taxi_zone_lookup.csv')
FILE_FORMAT = (type='csv' skip_header=1 field_delimiter=',' FIELD_OPTIONALLY_ENCLOSED_BY='"' COMPRESSION='AUTO')

Loading Parquet Data

Example Data

A representation of the example Parquet file used in this guide, shown as JSON:

JSON
[
  {
    "locationid": 1,
    "borough": "EWR",
    "zone": "Newark Airport",
    "service_zone": "EWR"
  },
  {
    "locationid": 2,
    "borough": "Queens",
    "zone": "Jamaica Bay",
    "service_zone": "Boro Zone"
  },
  {
    "locationid": 4,
    "borough": "Manhattan",
    "zone": "Alphabet City",
    "service_zone": "Yellow Zone"
  }
]

Notification Process

The example source file is in an S3 bucket:

CODE
s3://example-data-bucket/tripdata-taxizone-parquet/2023/08/29/taxi_zone_lookup.parquet

The bucket is defined in Snowflake as an external stage named staging. An external stage is required for transforming data during a load. Note that in Snowflake stages are referenced with an @ character. The notification process must replace the actual S3 file path with an external stage reference and the relative file path:

CODE
"sourceFile": "@staging/tripdata-taxizone-parquet/2023/08/29/taxi_zone_lookup.parquet"

Agile Data Engine is notified (using Notify API) about the PARQUET file using the following values:

JSON
# Manifest request body:
{
  "batch": 20230829010101,
  "format": "UNKNOWN"
}

# Manifest entry request body
{
  "sourceFile": "@staging/tripdata-taxizone-parquet/2023/08/29/taxi_zone_lookup.parquet"
}

Loading Raw Parquet

If the Parquet file is loaded as is and you have only the four attributes in your staging entity, you don’t need to write your own file load.

The example table contains the following attributes: locationid, borough, zone, and service_zone.

First create a load using the LOAD_FILE load type, then map the source entity to the staging entity, and finally add the OPT_FILE_FORMAT_OPTIONS and OPT_SF_FILE_COPY_OPTIONS load options with the values given below.

Set the following values to the corresponding load options:

CODE
OPT_FILE_FORMAT_OPTIONS: TYPE = 'PARQUET' 
CODE
OPT_SF_FILE_COPY_OPTIONS: MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE'
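
MATCH_BY_COLUMN_NAME tells Snowflake to pair Parquet columns with table columns by name rather than by position. A rough Python sketch of what case-insensitive matching means here (the column lists are illustrative, and this is only an analogy for Snowflake's behavior, not its implementation):

```python
def match_by_column_name(file_columns, table_columns):
    """Pair source columns to target columns case-insensitively,
    analogous to MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE'.

    Columns present only in the file or only in the table are
    simply left unmatched.
    """
    table_lookup = {c.lower(): c for c in table_columns}
    return {
        c: table_lookup[c.lower()]
        for c in file_columns
        if c.lower() in table_lookup
    }

# Example: mixed-case Parquet column names vs. lowercase table columns.
mapping = match_by_column_name(
    ["LocationID", "Borough", "Zone", "Service_Zone"],
    ["locationid", "borough", "zone", "service_zone"],
)
print(mapping)
```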

Agile Data Engine uses the following to form the load statement:

SQL
COPY INTO <target_schema>.<target_entity_name>
  FROM <from_path>
  FILE_FORMAT = (<file_format>)
  FILES=(<from_file_list>)

The placeholders are then replaced in Workflow Orchestration with the values given in Designer and in the notification:

SQL
COPY INTO staging.STG_TAXI_ZONES_PARQUET
  FROM '@staging/tripdata-taxizone-parquet/2023/08/29/'
  FILE_FORMAT=(TYPE = 'PARQUET')
  FILES=('taxi_zone_lookup.parquet')
  MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE'

Transforming Data During Parquet Load

If you have additional attributes in your staging table, you need to customize the file load using the OVERRIDE_FILE_LOAD load step type.

The example table contains the following attributes: stg_batch_id, stg_create_time, stg_source_system, stg_source_entity, stg_file_name, locationid, borough, zone, and service_zone.

First create a load using the LOAD_FILE load type, then map the source entity to the staging entity, and finally add the OPT_FILE_FORMAT_OPTIONS load option with the value given below.

Set the following value to the load option:

CODE
OPT_FILE_FORMAT_OPTIONS: TYPE = 'PARQUET' 

Add the following statement to the OVERRIDE_FILE_LOAD load step:

SQL
COPY INTO <target_schema>.<target_entity_name>
FROM (SELECT
        <manifestrunid> as stg_batch_id,
        current_timestamp as stg_create_time,
        'taxidata' as stg_source_system,
        'taxi_zone_lookup' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1:locationid::integer,
        $1:borough::varchar,
        $1:zone::varchar,
        $1:service_zone::varchar
    FROM
       <from_path>
) 
FILES=(<from_file_list>)
FILE_FORMAT = (<file_format>);

The from_path, from_file_list, and manifestrunid variables are set based on the manifest content sent to Notify API. The file_format variable is set based either on the manifest content or on the OPT_FILE_FORMAT_OPTIONS load option. metadata$filename is a Snowflake metadata column that provides the path of the loaded file.

See also Snowflake’s guide on how to transform data during a load.

The placeholders are then replaced in Workflow Orchestration with the values given in Designer and in the notification:

SQL
COPY INTO staging.STG_TAXI_ZONES_PARQUET
FROM (SELECT
        20230829010101 as stg_batch_id,
        current_timestamp as stg_create_time,
        'taxidata' as stg_source_system,
        'taxi_zone_lookup' as stg_source_entity,
        metadata$filename as stg_file_name,
        $1:locationid::integer,
        $1:borough::varchar,
        $1:zone::varchar,
        $1:service_zone::varchar
    FROM
       '@staging/tripdata-taxizone-parquet/2023/08/29/'
)
FILES=('taxi_zone_lookup.parquet')
FILE_FORMAT=(TYPE = 'PARQUET')
