
Dynamic keys in Redshift Bulk Upsert

nisars
New Contributor III

Hello,

We have a pipeline design (in concept) which will pick up files from a source and upsert them into the corresponding Redshift table (currently scoping approximately 250 source files and target tables). We'd like to do this in a single pipeline by defining the source, target and keys (and other information) in the database.

We've hit a hurdle when specifying the keys in the Redshift Bulk Upsert snap (this applies to other snaps too): each table has a different number of keys. Is it possible to define the keys in an array format, perhaps?

Any suggestions to try will be greatly appreciated. It would also be interesting to see how other similar projects have been implemented.

Many thanks in advance,
Shasta


aditya_sharma
New Contributor III

Hi,

You can do something like the following to achieve your requirement.

In Redshift:

Create a control table that will drive the number of SnapLogic jobs to be run.

You can define the schema of the control table something like this (a DDL sketch follows the list):

Import_order: starts at 1 and goes to n, where n is the number of files for your source system
Source_system_name: e.g. XYZ
Source_table_name: in your case, the file name
Target_schema_name: Redshift target schema name
Target_table_name: Redshift table name into which the file needs to be loaded
Query: if the source is an RDBMS, you can use this column to store the select query
Condition: incremental condition for your next delta load
Enable_flag: Boolean; whether or not to run the job for this table
Status: status of the job the last time it ran
Upsert_key: comma-separated key columns
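
Below is a minimal sketch of what such a control table could look like as Redshift DDL, assuming the columns listed above; the schema name, table name and column sizes are illustrative, not from the original post.

    -- Hypothetical control table driving the parent/child pipelines.
    CREATE TABLE etl_control.upsert_control (
        import_order        INTEGER,
        source_system_name  VARCHAR(100),
        source_table_name   VARCHAR(200),   -- file name for file-based sources
        target_schema_name  VARCHAR(100),
        target_table_name   VARCHAR(200),
        query               VARCHAR(4000),  -- select statement when the source is an RDBMS
        condition           VARCHAR(1000),  -- incremental condition for the next delta load
        enable_flag         BOOLEAN,        -- whether to run the job for this table
        status              VARCHAR(50),    -- status of the last run
        upsert_key          VARCHAR(500)    -- comma-separated key columns, e.g. 'customer_id,order_date'
    );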

In Snaplogic:

Create two pipelines: one parent and one child.
1) In the parent pipeline, use a Redshift Read snap to read the records from your control table, e.g. select * from control_table where source_system_name = 'files' order by import_order.
2) Use a Mapper to read the upsert_key column value, which is comma separated, split it, and create a variable for each upsert key, plus a variable holding the key count:

$upsert_key1.split(',').length == 2 ? $upsert_key1.split(',').sort()[0].trim() : 'any_column_which_you_think_can_be_available_in_every_table' ---------- $upsertKey21

(You need to create variables like this up to the maximum number of upsert keys you have. For example, across 250 files, if the table with the most upsert keys has 5, you create upsertKey51, upsertKey52, upsertKey53, upsertKey54, upsertKey55; for two upsert keys: upsertKey21 and upsertKey22.)

$upsert_key1.split(',').length ---------- $upsertKeysCount

For instance, if upsert_key were 'customer_id,order_date', $upsertKeysCount would evaluate to 2 and $upsertKey21/$upsertKey22 would carry the two column names.

  3) Send the output of the above into a Pipeline Execute snap, so that if your control table has 250 entries for the source system 'files', 250 child pipelines will run.

  4) In the child pipeline, check the upsert key count and, based on that, route the flow to the appropriate Redshift Bulk Upsert snap. For example, if a table has two upsert keys, your Redshift upsert will use $upsertKey21 and $upsertKey22 as its upsert key values.

Hope this helps.

nisars
New Contributor III

Hi @aditya.sharma,

Many thanks for responding to my query in such detail.

I did see your post and solution on another thread and it is a decent workaround; however, we want to create something with more flexibility, as there could be more than two upsert keys rather than a fixed number.

Many thanks again

aditya_sharma
New Contributor III

Hi Nisar,

You can make the solution flexible, but you need to do a little work before you design the pipeline, and I do the same. I assume that whenever you work with any kind of source system you know the schema of all the tables/files. So, what I mean to say is: if you can determine the number of upsert keys for each table, perhaps by writing a small Python script if the metadata is in files, or by reading information_schema if the source is an RDBMS, you just need to find the maximum number of upsert keys for that source system and then, in the Mapper, add that number of variables as I suggested in my initial response.
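
As an illustration of the information_schema approach, here is a sketch of a query that lists the primary-key columns per table so the per-table key count (and its maximum) can be derived; the catalog views shown are Postgres-style and the exact names vary by RDBMS, so treat this as an assumption rather than a drop-in query.

    -- Hypothetical metadata query: primary-key columns per table.
    SELECT tc.table_schema,
           tc.table_name,
           kcu.column_name,
           kcu.ordinal_position
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
      ON  tc.constraint_name = kcu.constraint_name
      AND tc.table_schema    = kcu.table_schema
    WHERE tc.constraint_type = 'PRIMARY KEY'
    ORDER BY tc.table_schema, tc.table_name, kcu.ordinal_position;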
I have attached sample pipelines just to show the logic. I have designed pipelines for different source systems in my project based on this logic and they are working fine. Our table count is at least 300 for each source system.

Dynamic_upsert_keys_parent_2018_02_02.slp (10.4 KB)
Dynamic_upsert_keys_child_2018_02_02.slp (11.6 KB)

nisars
New Contributor III

Thank you Aditya - will respond regarding your solution shortly - apologies for the delay in my response!

We have decided to take a different route for now. We've moved away from using the Bulk Upsert snap; instead we place the new data in a staging area, delete the rows from the target table which need to be updated, and insert the new rows to replace them. The keys for the where condition are assigned using a script, and this is used in the expression to build the SQL statement to be executed.
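
For reference, a minimal sketch of that delete-and-insert pattern in Redshift, assuming a hypothetical target table orders, staging table orders_staging, and keys customer_id and order_date (none of these names are from the original post; in practice the where clause is built dynamically from the keys assigned by the script):

    -- Delete the target rows that are about to be replaced, matching on the key columns.
    DELETE FROM orders
    USING orders_staging
    WHERE orders.customer_id = orders_staging.customer_id
      AND orders.order_date  = orders_staging.order_date;

    -- Insert the replacement rows from the staging table.
    INSERT INTO orders
    SELECT * FROM orders_staging;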