
Pipeline Execute parallel + sequential execution

krupalibshah
Contributor

The documentation says the following:

Execute a pipeline for every input document. If the target pipeline has an unlinked input view, the input document to this Snap will be passed into the unlinked input of the child execution. Similarly, if the target pipeline has an unlinked output view, the output document from the child execution will be used as the output document from the Pipeline Execute Snap.
Execute one or more pipelines and reuse them to process multiple input documents to this Snap. The child pipeline must have one unlinked input view and one unlinked output view. As documents are received by this Snap, they will be passed to the child executions for processing. Any output documents from the child executions will be used as the output of this Snap.
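To make the two modes concrete, here is a toy Python model. It is not SnapLogic code: `run_child` and `pipeline_execute` are hypothetical names, and the "child pipeline" just tags each document with the ID of the execution that handled it. Without reuse, every document gets its own child execution; with reuse, one execution receives all documents in arrival order.

```python
from typing import List


def run_child(execution_id: int, docs: List[dict]) -> List[dict]:
    # Hypothetical child pipeline: tag each document with the execution that processed it.
    return [{**doc, "execution": execution_id} for doc in docs]


def pipeline_execute(docs: List[dict], reuse: bool) -> List[dict]:
    """Toy model of the two documented modes (assumption, not the real Snap).

    reuse=False: one child execution is started per input document.
    reuse=True:  a single child execution receives every document, in order.
    """
    if reuse:
        return run_child(0, docs)
    outputs: List[dict] = []
    for i, doc in enumerate(docs):
        outputs.extend(run_child(i, [doc]))
    return outputs
```

For three input documents, `reuse=False` yields executions 0, 1 and 2, while `reuse=True` sends all three to execution 0.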

As I interpret it, when the option to reuse child pipeline executions is selected and a pipeline is already executing when more input documents arrive for it, the same execution processes each document sequentially, in the order it was received.

But when a different pipeline needs to be triggered, a new execution of that pipeline is started in parallel, without disturbing the execution that is already running.

Please let me know if I am missing anything. For our use case, we want to use this functionality so that, if we have 2 pipelines to execute with 5 documents each, the 2 pipelines run in parallel while each running pipeline receives its documents in sequence.

Will this feature help us achieve this?


nganapathiraju
Former Employee

You nailed it!

You can do a simple POC with dummy JSON data to see how it behaves.

tstack
Former Employee

I think you're on the right track, but I wanted to clarify that parallelization is controlled by the 'Pool Size' setting, which is orthogonal to the reuse setting. If you enable reuse with a pool size of 1, only one child execution will be started and documents will be sent to it in order. If you enable reuse with a pool size greater than 1, more than one child execution will be started and documents can be sent to any of the child executions.
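As a rough illustration of how reuse and Pool Size interact, the sketch below assigns documents to `pool_size` reused executions. The function name is hypothetical, and round-robin assignment is an assumption chosen only to keep the model deterministic; per the reply above, the real Snap may send a document to any of its child executions.

```python
from typing import Dict, List


def dispatch_with_reuse(docs: List[str], pool_size: int) -> Dict[int, List[str]]:
    """Assign documents to `pool_size` reused child executions (toy model).

    Round-robin is an assumption; the real Snap may pick any available execution.
    """
    if pool_size < 1:
        raise ValueError("pool_size must be at least 1")
    executions: Dict[int, List[str]] = {i: [] for i in range(pool_size)}
    for index, doc in enumerate(docs):
        executions[index % pool_size].append(doc)
    return executions
```

With `pool_size=1`, `["d1", "d2", "d3", "d4"]` all land on execution 0 in their original order. With `pool_size=2`, each execution sees only part of the stream, so no single execution observes the full sequence.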

Thanks for the valuable inputs.

So do you mean that if I keep the pool size at 1, it will start one instance of the pipeline named in the input document and will not even allow another instance of a different pipeline to start?

My understanding was this: if I send 5 input documents, each containing the name of the pipeline to execute, with 2 documents naming p1-pipeline and 3 naming p2-pipeline, then p1 and p2 would start running in parallel, but each would process its own inputs in sequence.

Is this how it should work? If not, can you help me figure out how to enable this kind of processing? I tried the following:
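The behavior described here (different pipelines in parallel, each pipeline's documents in order) can be modeled in plain Python by grouping documents by pipeline name and giving each group its own worker. This is only a model of the desired semantics, not a SnapLogic configuration; `run_grouped`, `process_sequentially`, and the `seq` field are hypothetical, while `pipelineName` mirrors the payload key shown in the error output.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def process_sequentially(pipeline: str, docs: List[dict]) -> List[str]:
    # Hypothetical child run: handles one pipeline's documents strictly in order.
    return [f"{pipeline}:{doc['seq']}" for doc in docs]


def run_grouped(docs: List[dict]) -> Dict[str, List[str]]:
    # Group by pipeline name, keeping the incoming (sorted) order within each group.
    groups: Dict[str, List[dict]] = {}
    for doc in docs:
        groups.setdefault(doc["pipelineName"], []).append(doc)
    # One task per pipeline: different pipelines run in parallel,
    # while each pipeline sees its own documents one after another.
    with ThreadPoolExecutor(max_workers=max(len(groups), 1)) as pool:
        futures = {
            name: pool.submit(process_sequentially, name, group)
            for name, group in groups.items()
        }
        return {name: future.result() for name, future in futures.items()}
```

For the example in the question (2 documents for p1-pipeline, 3 for p2-pipeline), the two groups run concurrently, and each returns its results in the order the documents arrived.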


I am extracting documents from a DB and, after sorting them by pipeline name and timestamp, sending them to Pipeline Execute with "reuse" enabled.


Here is what my Pipeline Execute Snap receives as input, and below is the configuration.


I get an error in the Pipeline Execute Snap:

"error": "Invalid configuration"
"stacktrace": "com.snaplogic.snap.api.SnapDataException: Invalid configuration\n	at com.snaplogic.snaps.flow.PipeExec.process(PipeExec.java:722)\n	at com.snaplogic.snaps.flow.PipeExec.processSafely(PipeExec.java:699)\n	at com.snaplogic.snaps.flow.PipeExec.execute(PipeExec.java:628)\n	at com.snaplogic.snaps.flow.PipeExec.executeForSuggest(PipeExec.java:1273)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl.executeSnap(SnapRunnableImpl.java:677)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl.executeForSuggest(SnapRunnableImpl.java:556)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl.doRun(SnapRunnableImpl.java:735)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl.access$000(SnapRunnableImpl.java:105)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl$1.run(SnapRunnableImpl.java:330)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl$1.run(SnapRunnableImpl.java:326)\n	at java.security.AccessController.doPrivileged(Native Method)\n	at javax.security.auth.Subject.doAs(Subject.java:422)\n	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)\n	at com.snaplogic.cc.snap.common.SnapRunnableImpl.run(SnapRunnableImpl.java:325)\n	at com.snaplogic.snap.threadpool.SnapExecutorService$SnapRunnableWrapper.run(SnapExecutorService.java:86)\n	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n	at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n	at java.lang.Thread.run(Thread.java:745)\n"
"reason": "The pipeline path changed between documents when reuse was enabled"
"resolution": "Change the pipeline expression so that the same value is returned for every input document"
"snap_details": {"label": "Pipeline Execute"}

"original":
"RUNID": "3716be8b-aa2b-4158-be89-792302c2a570"
"PIPELINENAME": "CheckRunID"
"PAYLOAD": "{pipelineName=P_NewExceptionTest}"
"ERRORTYPE": "Technical"
"ERRORTIMESTAMP": "2017-09-07T08:19:05.883Z"
"EXTRAPARAMETERS": "NONE"

Awaiting your input.

You have to be careful with the reuse checkbox.

From the documentation:

Parameters

Pipeline parameter values can only be changed if this flag is not enabled. In other words, reusable executions cannot have different pipeline parameter values for different documents.

Pipeline name

When this property is an expression, the Snap will need to contact the SnapLogic cloud servers to load the pipeline information for execution. Also, if reuse is enabled, the result of the expression cannot change between documents.

As you can see from the error, the problem is that the value of the pipeline expression changes between documents while reuse is enabled.

This is happening for the 4th document in your case.

โ€œreasonโ€: โ€œThe pipeline path changed between documents when reuse was enabledโ€
โ€œresolutionโ€: โ€œChange the pipeline expression so that the same value is returned for every input documentโ€
โ€œsnap_detailsโ€: {label:Pipeline Execute}
โ€œlabelโ€: โ€œPipeline Executeโ€

What is your use case here? What are you trying to achieve?