
Lost contact with Snaplex node

msalter
New Contributor II

I have a very small pipeline (3 snaps) that reads from a SQL table and writes the results to a new table in the same DB. I keep getting the error "Lost contact with Snaplex node while the pipeline was running." The select statement pulls 250M+ records, and I'm using Azure bulk insert to write. To avoid this error I keep having to reduce the batch size, from 20K to 10K and now to 8K. Any thoughts on what could be causing the error?

1 ACCEPTED SOLUTION

koryknick
Employee

@darshthakkar - I believe @dmiller is correct that Shard Offsets is a custom snap. I've replicated its logic with core snaps and built-in expressions. See the attached example.
Shard Example_2023_07_05.slp (5.6 KB)

The Mapper is where the magic happens.

sl.range(0, $TOTAL, Math.ceil($TOTAL / parseInt(_shard_count)))
  .map((val, idx, arr) =>
    idx == arr.length - 1
      ? { offset: val, limit: $TOTAL - val }
      : { offset: val, limit: arr[idx + 1] - val }
  )

First, use the sl.range() built-in function to generate an array of the offsets to be used; then use the Array.map() method to turn that simple array of integers into an array of objects carrying both the offset and the limit for each shard.
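For example, assuming illustrative values of $TOTAL = 10 and _shard_count = 3: Math.ceil(10 / 3) = 4, so sl.range(0, 10, 4) yields [0, 4, 8], and the map turns that into [{offset: 0, limit: 4}, {offset: 4, limit: 4}, {offset: 8, limit: 2}].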

After the Mapper, just use a JSON Splitter to produce a new document for each offset/limit combination (the same output as the Shard Offsets snap), which then feeds your child pipeline via the Pipeline Execute snap.
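If you want to sanity-check the shard math outside SnapLogic, below is a minimal TypeScript sketch of the same logic. This is not SnapLogic code: rangeBy is a hypothetical stand-in for the sl.range() built-in, and the inputs are illustrative.

// Sketch only: replicates the Mapper's shard math for local verification.
// rangeBy is a hypothetical stand-in for SnapLogic's sl.range(start, stop, step).
function rangeBy(start: number, stop: number, step: number): number[] {
  const out: number[] = [];
  for (let v = start; v < stop; v += step) out.push(v);
  return out;
}

function shardOffsets(total: number, shardCount: number): { offset: number; limit: number }[] {
  const step = Math.ceil(total / shardCount);
  return rangeBy(0, total, step).map((val, idx, arr) =>
    idx === arr.length - 1
      ? { offset: val, limit: total - val }        // last shard takes the remainder
      : { offset: val, limit: arr[idx + 1] - val } // interior shards span to the next offset
  );
}

// Example: 10 rows across 3 shards
// -> [ { offset: 0, limit: 4 }, { offset: 4, limit: 4 }, { offset: 8, limit: 2 } ]
console.log(shardOffsets(10, 3));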

Hope this helps!


18 REPLIES

Thank you, @dmiller, for your inputs and for sharing your knowledge on this.


Thanks a TON @koryknick for your help on this.

Hello @koryknick, can you please help with my case? I am facing the same issue due to a large volume of data, and I did not quite follow where to add these Mappers. My child pipeline reads from an S3 URI passed by the parent pipeline, and when it splits that file's contents there are 2M+ records to insert into the respective tables. I am attaching screenshots of my parent and child pipelines; where exactly do I modify them?

(Screenshots attached: ashis1upreti_0-1717695526817.png, ashis1upreti_0-1717696329438.png, ashis1upreti_1-1717695554808.png, ashis1upreti_2-1717695641628.png)

@ashis1upreti - The error you are getting has nothing to do with transaction volume. You are using an incorrect account type for the snap. When creating or selecting accounts, I recommend doing it in Designer via the Account dropdown list in the snap you want to use it in. Keep in mind that accounts are specific to the snap pack associated with the snap being used, so you cannot use an "AWS S3" account (which exists in the AWS S3 snap pack) in the File Reader or S3 File Reader (which are both in the Binary snap pack).

In the future, please open a new thread rather than re-opening one that has already been resolved; this helps keep each topic focused. You can always refer to a previous thread by including a link to it if you believe it is related.