Merge multiple documents into an array in an Ultra pipeline

We need to merge multiple documents into a single array and are looking for help. Since it is an Ultra task, we are running out of options…

Input data:

[
  {"Record": "1"},
  {"Record": "2"},
  {"Record": "5"},
  {"Record": "4"},
  {"Record": "6"}
]

Expected Output:
{
  "response": [
    {"Record": "1"},
    {"Record": "2"},
    {"Record": "5"},
    {"Record": "4"},
    {"Record": "6"}
  ]
}
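
In plain JavaScript terms (a standalone sketch, not a Snap script), the transformation being asked for is just wrapping the incoming documents in a single object:

```javascript
// The five input documents, as they arrive from the upstream Snap.
const input = [
  { Record: "1" },
  { Record: "2" },
  { Record: "5" },
  { Record: "4" },
  { Record: "6" },
];

// Wrap the entire stream in one object under the "response" key.
const output = { response: input };

console.log(JSON.stringify(output));
```

The difficulty is not the wrapping itself but producing it inside an Ultra pipeline, as the replies below explain.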

Hello @thilkip,

You can try the Group By N snap. Set the group size to 0 and this is what you get:

image

Regards,
Bojan

Yes, but Group By is not supported in Ultra tasks.

Sorry, I missed that information.

Hi @thilkip,
Use a Pipeline Execute snap with the option Reuse executions to process documents disabled. Inside the child pipeline, put your input data and use Group By N. Something like this:
image
image

We tried the script below. It works in design mode, but not when we call it via an Ultra task: the Script snap doesn't produce any output.

execute : function () {
    this.log.info("Executing Transform Script");
    var outDoc = new LinkedHashMap();
    var inDoc = null;
    var items = new ArrayList();
    var i = 0;

    while (this.input.hasNext()) {
        try {
            // Read the next input document and accumulate it in the list.
            // We must use serializable Java types like LinkedHashMap and
            // ArrayList instead of native JavaScript objects so that
            // downstream Snaps like Copy can process the output correctly.
            inDoc = this.input.next();
            i = i + 1;
            items.add(inDoc);

            outDoc.put("index", i);

            // On the last input document, attach the accumulated list and
            // write a single output document.
            if (!this.input.hasNext()) {
                outDoc.put("data", items);
                this.output.write(inDoc, outDoc);
            }
        }
        catch (err) {
            var errDoc = new LinkedHashMap();
            errDoc.put("error", err);
            this.log.error(err);
            this.error.write(errDoc);
        }
    }

    this.log.info("Script executed");
}

Yes, this will work, but we don't want to call a child pipeline, since it is an Ultra task and we want millisecond response times.

Use this script; it groups all inputs into a single array:

execute : function () {
    this.log.info("Executing Transform Script");
    var groupByN = new ArrayList();
    var wrapper = {};
    while (this.input.hasNext()) {
        try {
            // Read the next input document and accumulate it in the list.
            // We use a serializable Java type like ArrayList for the
            // accumulator so that downstream Snaps can process the output
            // correctly.
            var inDoc = this.input.next();

            groupByN.add(inDoc);
            wrapper = {"Data": groupByN};
        }
        catch (err) {
            var errDoc = new LinkedHashMap();
            errDoc.put("error", err);
            this.log.error(err);
            this.error.write(errDoc);
        }
    }
    // Write a single output document only after the input stream ends.
    this.output.write(wrapper);
    this.log.info("Script executed");
}

I tested it with an Ultra task and it seems to work.

A triggered task has more latency than Ultra, but doesn't have any restrictions on the types of Snaps. Is the performance suitable for your use case?

Not really; that is the challenge we are running into. We are looking to get millisecond responses from the API using an Ultra task.

Thanks, but please see below: 40 documents went in and 0 came out. If we put the write inside the while loop, it will write 40 documents in this case, but we need the 40th document as the final result. If there is a way to take just the last document, that would also help.

@thilkip - what is generating the input documents?

A SQL snap gets the data from the tables.

SQL Server 2016 (13.x) and later has a "FOR JSON" clause:

You should be able to return the result set exactly how you want.
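
For example (a sketch only — the `Records` table and `Record` column names are assumptions standing in for the real schema):

```sql
-- Illustrative sketch: table and column names are assumptions.
SELECT Record
FROM Records
FOR JSON PATH, ROOT('response');
-- Produces a single JSON string of the form:
-- {"response":[{"Record":"1"},{"Record":"2"},...]}
```

Because the database emits one row containing the whole array, the pipeline stays one-document-in, one-document-out.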

The data is coming from DB2.

I don’t have a DB2 instance to test with at the moment, but it appears you could do the same thing with different syntax:

https://www.ibm.com/docs/en/i/7.3?topic=functions-json-array
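
On recent DB2 releases, the aggregate form would look something like this (a hedged sketch — `JSON_ARRAYAGG` availability depends on your DB2 version and platform, and the table/column names are assumptions):

```sql
-- Illustrative sketch: verify JSON_ARRAYAGG support for your DB2 release.
SELECT JSON_OBJECT(
         'response' VALUE JSON_ARRAYAGG(JSON_OBJECT('Record' VALUE Record))
       )
FROM Records;
```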

It should be noted that I’m recommending that you push the creation of the array back to the source database because you are trying to prevent calling a child pipeline, which is the recommended pattern for aggregating data in an Ultra pipeline. This is required because Ultra is a streaming service of one record in, one record out. So any type of function that aggregates data (including Sort, Group By *, etc) will be stuck waiting on end of upstream input which will never come because of the nature of the data flow in Ultra. This is also why your Script snap isn’t working the way you were expecting - it’s waiting on the end of input, which never comes in an Ultra pipeline.
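
The distinction can be sketched in plain JavaScript (standalone code, not a Snap script — the array stands in for the document stream):

```javascript
// The `docs` array stands in for the pipeline's document stream.
const docs = [{ Record: "1" }, { Record: "2" }, { Record: "5" }];

// Streaming (Ultra-compatible): one output per input; no need to ever
// see the end of the stream.
const streamed = docs.map((d) => ({ ...d, processed: true }));

// Aggregating (Sort, Group By, the Script snap above): the single output
// can only be emitted after the whole stream has been consumed -- an end
// that never arrives in an Ultra pipeline.
const aggregated = { response: docs };

console.log(streamed.length, aggregated.response.length);
```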

1 Like

Hi again @thilkip,

This task of yours was very intriguing, so I dedicated quite some time to experimenting and found a solution that works in development mode, using Snaps supported by Ultra tasks. Here is the pipeline:

Documents_to_Array_Format.slp (7.3 KB)

Here is the final result:

image

I am pretty curious about this one; can you please send feedback on the pipeline's behaviour?
In addition, I am sending you a Python script that is also tested and works. I know that you already tried a script, but I am offering some options. Thanks,

Doc_to_Array.txt (1.6 KB)

Best Regards,
Bojan

Thank you for your time and help. We did validate the script in an Ultra task and saw the same behavior as earlier.

We will test the other approach using the Formatter snap and let you know.

Thank you again

The documentation stating that the Fixed Width Formatter is Ultra-compatible is incorrect, and we have filed a defect on it. I just tested an Ultra task with the Fixed Width Formatter and received an "incompatible" error.

Suggested approach is still to:

  1. Use a child pipeline to query the database and aggregate, or
  2. Have the database return the JSON array

One thing to keep in mind with Ultra: the latency to start a child pipeline is extremely low, since the pipeline metadata is cached locally, so there is no communication with the control plane to start the process. It is pretty close to having the Snaps contained within the main pipeline. Have you tried this approach and tested the timing?

I did a quick sample that consumes the request, starts a child that generates several documents, uses a Gate to accumulate them into an array, and passes the document back to the parent for return to the client. Total execution is under half a second based on "curl" stats.

I compared this with a pipeline that simply generates an array using a JSON Generator (no child) and total execution time is over half a second… actually longer than with the child pipeline!

I’ll continue to monitor this post. Please keep me updated on your solution!

  • Kory K