Forum Discussion
Hi,
Yes, I was not adding the metadata in the Mapper. I did insert the metadata into Snowflake and then Acknowledge, and it started to work, but as you mentioned, that is unnecessary information to save on the table. So I tried the method you mentioned: copying the Kafka Consumer output and joining it back in before the Acknowledge, using a Merge join without any key, or with a key of partition and offset, but it does not work. It gives me an error like:
Failure: Unable to acknowledge message: Test-2:25, Reason: Acknowledgement will succeed only if the Consumer that produced the document runs during the same validation., Resolution: Hold Shift key when clicking Validate to force all snaps to run.
I am missing some trick. The pipeline is attached:
NP new pipeline 9_2022_02_20.slp (9.6 KB)
And one more follow-up question: we need to pull a lot of data, maybe millions of records every day. I think Acknowledge is a bit slow, even with a batch acknowledge approach.
Should we put all Kafka messages into S3 first and then read them from there, so that even if we skip any data we can go back to the file and reprocess it?
Please let me know what you think.
- ptaylor (Employee), 4 years ago
The Acknowledge is failing because the metadata is present in the input document, but it's not in the default location under the document root ($metadata) because of how the Join combines and restructures the data from its inputs. Try validating the pipeline, then preview the Join's output to note where the full metadata is located within the document. Then open the Acknowledge snap, click the suggest button for the Metadata Path setting, and select the location of the metadata.
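For example (this is purely an illustration; the actual field names depend on how your Join is configured, so treat them as hypothetical), the joined document might end up shaped something like this:

```python
# Hypothetical shape of one document coming out of the Join.
# Preview the Join's output in your own pipeline to see the real structure.
joined_doc = {
    "original": {
        "value": "...",            # the Kafka message payload
        "metadata": {              # the Kafka metadata the Acknowledge snap needs
            "topic": "Test",
            "partition": 2,
            "offset": 25,
        },
    },
    "snowflake_result": "...",     # data contributed by the other Join input
}
# With a structure like this, the Acknowledge snap's Metadata Path would need to be
# $original.metadata rather than the default $metadata.
```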
Also, note the advice in the error message about holding the Shift key when you click the Validate icon. That will force all snaps to run instead of relying on cached preview data from previous validations for snaps that you haven't edited. This is important for the way the Consumer and Acknowledge snaps interact.
As for performance, the bottleneck in your pipeline is the fact that you’re inserting one record at a time into Snowflake. You’ll have far better performance with data warehouses like Snowflake if you do bulk loading (inserting many records in one operation). Frankly, I’m not really familiar with our Snowflake snaps, but I think Bulk Load or Bulk Upsert are better suited for your use case. Check our documentation for those snaps and if you still have questions, ask them here in the Community in a new post.
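If it helps to see the difference outside of SnapLogic, here's a very rough sketch of the same idea using Snowflake's Python connector directly; the connection parameters, stage, table, and file names are all placeholders, and the bulk snaps take care of the staging step for you:

```python
# Rough sketch only: row-by-row inserts vs. a bulk load into Snowflake.
# MY_STAGE, MY_TABLE, the credentials, and the file path are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account", user="my_user", password="...",
    warehouse="MY_WH", database="MY_DB", schema="PUBLIC",
)
cur = conn.cursor()

# Slow: one small INSERT (and one round trip) per record.
# for rec in records:
#     cur.execute("INSERT INTO MY_TABLE (id, payload) VALUES (%s, %s)",
#                 (rec["id"], rec["payload"]))

# Fast: stage a file containing many records, then load it in a single COPY operation.
cur.execute("PUT file:///tmp/batch_0001.csv @MY_STAGE")
cur.execute("COPY INTO MY_TABLE FROM @MY_STAGE/batch_0001.csv FILE_FORMAT = (TYPE = CSV)")
conn.close()
```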
However, right now your Kafka Consumer snap is configured with Acknowledge Mode = Wait after each record, which means the Consumer will output a single document, then wait for the Acknowledge snap to ack that document before it outputs the next record. Obviously that's incompatible with the requirements of a bulk loading snap. (You also have Message Count set to 1, but I'm guessing that was for debugging purposes and you'll set it back to the default, -1.)

Fortunately, the Kafka Consumer snap has a lot of flexibility to deal with such scenarios. At a minimum, you'll need to change Acknowledge Mode to Wait after each batch of records. This lets the Consumer output many records at a time, then wait for all of those records to be acknowledged before asking the Kafka broker for more records to process. In your case, you'll probably also need to change the Output Mode to One output document per batch and then use the Pipeline Execute snap to process each batch in a child pipeline. You would put the Snowflake bulk loading snap in the child pipeline; each execution of the child pipeline would process one batch of records received from Kafka. That will vastly improve your performance.

You can find an article I wrote about this to get a much better idea of how this works here:
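Separately from that article, if it helps to see the batch-acknowledge pattern outside of SnapLogic, here's a rough sketch of the same idea using the plain kafka-python client; the topic, group id, and the bulk_load_into_snowflake helper are placeholders standing in for what the Consumer snap, the child pipeline, and the bulk loading snap do for you:

```python
# Rough sketch of "fetch a batch, load it, then acknowledge it" with kafka-python.
from kafka import KafkaConsumer

def bulk_load_into_snowflake(records):
    """Placeholder for whatever bulk-loads one batch (e.g. the child pipeline)."""
    ...

consumer = KafkaConsumer(
    "my-topic",
    group_id="my-group",
    bootstrap_servers="localhost:9092",
    enable_auto_commit=False,        # acknowledge explicitly, once per batch
)

while True:
    # Fetch up to 500 records in one go.
    batch = consumer.poll(timeout_ms=1000, max_records=500)
    records = [rec for recs in batch.values() for rec in recs]
    if not records:
        continue
    bulk_load_into_snowflake(records)
    consumer.commit()                # only ask the broker for more after the batch is safely loaded
```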
Hope this helps.