Customizing File Load
Guide objective
This guide explains how to customize a file load, either by using load options or by writing a fully customized file load statement.
Load options can be used to customize the file load generated by Agile Data Engine when your file load does not contain transformations.
The OVERRIDE_FILE_LOAD load step type can be used when you need to fully customize a file load, i.e. manually define the load SQL. With OVERRIDE_FILE_LOAD you can, for example, transform data during a file load, generate metadata attributes that are not part of the source file itself, or explicitly specify the copied data in a SELECT list. Note that which transformations are possible during a file load depends on your target database product.
Snowflake
Loading CSV Data
Example Data
The example CSV file used in this guide:
locationid, borough, zone, service_zone
1, EWR, Newark Airport, EWR
2, Queens, Jamaica Bay, Boro Zone
4, Manhattan, Alphabet City, Yellow Zone
Notification Process
The example source file is in an S3 bucket:
s3://example-data-bucket/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv
The bucket is defined in Snowflake as an external stage named staging. An external stage is required for transforming data during a load. Note that in Snowflake stages are referenced with an @ character. The notification process must replace the actual S3 file path with an external stage reference and the relative file path:
"sourceFile": "@staging/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv"
Agile Data Engine is notified (using Notify API) about the CSV file using the following data:
# Manifest request body:
{
"batch": 20230829010101,
"delim": "COMMA",
"format": "CSV",
"fullscanned": true,
"skiph": 1
}
# Manifest entry request body
{
"sourceFile": "@staging/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv"
}
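In a notification process these two request bodies might be built programmatically before being sent to Notify API. The sketch below only constructs the payloads; the Notify API endpoint paths and authentication are deployment-specific and therefore omitted here:

```python
import json

def build_manifest(batch: int) -> dict:
    # Manifest request body: batch id plus the format details of the CSV files
    return {"batch": batch, "delim": "COMMA", "format": "CSV",
            "fullscanned": True, "skiph": 1}

def build_manifest_entry(stage_path: str) -> dict:
    # Manifest entry request body: one entry per file, referencing the stage
    return {"sourceFile": stage_path}

manifest = build_manifest(20230829010101)
entry = build_manifest_entry(
    "@staging/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv")
print(json.dumps(manifest))
print(json.dumps(entry))
```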
Loading Raw CSV
If the CSV file is loaded as is and you have only the four attributes in your staging entity, you don’t need to customize the file load.
The example table contains the following attributes:
First create a load using the LOAD_FILE load type and then map the source entity to the staging entity. If you notify Agile Data Engine about the new file using the above-mentioned external stage (@staging), you don’t need to use the OPT_STORAGE_INTEGRATION load option.
Agile Data Engine uses the following to form the load statement:
COPY INTO <target_schema>.<target_entity_name>
FROM <from_path>
FILE_FORMAT = (<file_format>)
FILES=(<from_file_list>)
The attributes are then replaced in Workflow Orchestration by the values given in Designer and in the notification:
COPY INTO staging.STG_TAXI_ZONES_CSV
FROM '@staging/tripdata-taxizone-csv/2023/08/29/'
FILE_FORMAT=(type='csv' skip_header=1 field_delimiter=',' FIELD_OPTIONALLY_ENCLOSED_BY='"' COMPRESSION='AUTO')
FILES=('taxi_zone_lookup.csv')
Transforming Data During CSV Load
If you have additional attributes in your staging table, you need to customize the file load using the OVERRIDE_FILE_LOAD load step type.
The example table contains the following attributes:
Create a LOAD_FILE load, map the source entity, and create an OVERRIDE_FILE_LOAD load step.
Add the following statement to the OVERRIDE_FILE_LOAD load step:
COPY INTO <target_schema>.<target_entity_name>
FROM (SELECT
<manifestrunid> as stg_batch_id,
current_timestamp as stg_create_time,
'taxidata' as stg_source_system,
'taxi_zone_lookup' as stg_source_entity,
metadata$filename as stg_file_name,
$1,
$2,
$3,
$4
FROM
<from_path>
)
FILES=(<from_file_list>)
FILE_FORMAT = (<file_format>);
The from_path, from_file_list, and manifestrunid variables are set based on the manifest content sent to Notify API. The file_format variable is set based either on the manifest content or on the OPT_FILE_FORMAT_OPTIONS load option. The metadata$filename column is a Snowflake metadata column that provides the path of the loaded file.
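How these variables could be derived from a manifest entry is illustrated below. This is a simplified sketch of the substitution, not Agile Data Engine's actual implementation: the sourceFile value is split into a directory path (from_path) and a file name (from_file_list):

```python
def split_source_file(source_file: str) -> tuple[str, str]:
    """Split a stage-relative sourceFile into a directory path and a file name."""
    directory, file_name = source_file.rsplit("/", 1)
    return directory + "/", file_name

from_path, file_name = split_source_file(
    "@staging/tripdata-taxizone-csv/2023/08/29/taxi_zone_lookup.csv")

# Substitute the derived values into the COPY statement
statement = (
    "COPY INTO staging.STG_TAXI_ZONES_CSV\n"
    f"FROM '{from_path}'\n"
    f"FILES=('{file_name}')"
)
print(statement)
```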
See also Snowflake’s guide on how to transform data during a load.
The attributes are then replaced in Workflow Orchestration by the values given in Designer and in the notification:
COPY INTO staging.STG_TAXI_ZONES_CSV
FROM (SELECT
20230829010101 as stg_batch_id,
current_timestamp as stg_create_time,
'taxidata' as stg_source_system,
'taxi_zone_lookup' as stg_source_entity,
metadata$filename as stg_file_name,
$1,
$2,
$3,
$4
FROM
'@staging/tripdata-taxizone-csv/2023/08/29/'
)
FILES=('taxi_zone_lookup.csv')
FILE_FORMAT = (type='csv' skip_header=1 field_delimiter=',' FIELD_OPTIONALLY_ENCLOSED_BY='"' COMPRESSION='AUTO')
Loading Parquet Data
Example Data
A representative example of the contents of the Parquet file used in this guide, shown as JSON:
[
{
"locationid": 1,
"borough": "EWR",
"zone": "Newark Airport",
"service_zone": "EWR"
},
{
"locationid": 2,
"borough": "Queens",
"zone": "Jamaica Bay",
"service_zone": "Boro Zone"
},
{
"locationid": 4,
"borough": "Manhattan",
"zone": "Alphabet City",
"service_zone": "Yellow Zone"
}
]
Notification Process
The example source file is in an S3 bucket:
s3://example-data-bucket/tripdata-taxizone-parquet/2023/08/29/taxi_zone_lookup.parquet
The bucket is defined in Snowflake as an external stage named staging. An external stage is required for transforming data during a load. Note that in Snowflake stages are referenced with an @ character. The notification process must replace the actual S3 file path with an external stage reference and the relative file path:
"sourceFile": "@staging/tripdata-taxizone-parquet/2023/08/29/taxi_zone_lookup.parquet"
Agile Data Engine is notified (using Notify API) about the Parquet file using the following values:
# Manifest request body:
{
"batch": 20230829010101,
"format": "UNKNOWN"
}
# Manifest entry request body
{
"sourceFile": "@staging/tripdata-taxizone-parquet/2023/08/29/taxi_zone_lookup.parquet"
}
Loading Raw Parquet
If the Parquet file is loaded as is and you have only the four attributes in your staging entity, you don’t need to write your own file load.
The example table contains the following attributes:
First create a load using the LOAD_FILE load type, then map the source entity to the staging entity, and finally add the OPT_FILE_FORMAT_OPTIONS and OPT_SF_FILE_COPY_OPTIONS load options with the values given below.
Set the following values to the corresponding load options:
OPT_FILE_FORMAT_OPTIONS: TYPE = 'PARQUET'
OPT_SF_FILE_COPY_OPTIONS: MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE'
Agile Data Engine uses the following to form the load statement:
COPY INTO <target_schema>.<target_entity_name>
FROM <from_path>
FILE_FORMAT = (<file_format>)
FILES=(<from_file_list>)
The attributes are then replaced in Workflow Orchestration by the values given in Designer and in the notification:
COPY INTO staging.STG_TAXI_ZONES_PARQUET
FROM '@staging/tripdata-taxizone-parquet/2023/08/29/'
FILE_FORMAT=(TYPE = 'PARQUET')
FILES=('taxi_zone_lookup.parquet')
MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE'
Transforming Data During Parquet Load
If you have additional attributes in your staging table, you need to customize the file load using the OVERRIDE_FILE_LOAD load step type.
The example table contains the following attributes:
First create a load using the LOAD_FILE load type, then map the source entity to the staging entity, and finally add the OPT_FILE_FORMAT_OPTIONS load option with the value given below.
Set the following value to the load option:
OPT_FILE_FORMAT_OPTIONS: TYPE = 'PARQUET'
Add the following statement to the OVERRIDE_FILE_LOAD load step:
COPY INTO <target_schema>.<target_entity_name>
FROM (SELECT
<manifestrunid> as stg_batch_id,
current_timestamp as stg_create_time,
'taxidata' as stg_source_system,
'taxi_zone_lookup' as stg_source_entity,
metadata$filename as stg_file_name,
$1:locationid::integer,
$1:borough::varchar,
$1:zone::varchar,
$1:service_zone::varchar
FROM
<from_path>
)
FILES=(<from_file_list>)
FILE_FORMAT = (<file_format>);
The from_path, from_file_list, and manifestrunid variables are set based on the manifest content sent to Notify API. The file_format variable is set based either on the manifest content or on the OPT_FILE_FORMAT_OPTIONS load option. The metadata$filename column is a Snowflake metadata column that provides the path of the loaded file.
See also Snowflake’s guide on how to transform data during a load.
The attributes are then replaced in Workflow Orchestration by the values given in Designer and in the notification:
COPY INTO staging.STG_TAXI_ZONES_PARQUET
FROM (SELECT
20230829010101 as stg_batch_id,
current_timestamp as stg_create_time,
'taxidata' as stg_source_system,
'taxi_zone_lookup' as stg_source_entity,
metadata$filename as stg_file_name,
$1:locationid::integer,
$1:borough::varchar,
$1:zone::varchar,
$1:service_zone::varchar
FROM
'@staging/tripdata-taxizone-parquet/2023/08/29/'
)
FILES=('taxi_zone_lookup.parquet')
FILE_FORMAT=(TYPE = 'PARQUET')