02-18-2022 01:10 AM
Hi,
We pull data from Kafka and insert it into the database, but we realized the Kafka Consumer skips offsets if the pipeline fails.
For example: in one run the Kafka Consumer is supposed to read offsets 3, 4, and 5, but the pipeline fails, so it skips those offsets in the next run.
I tried using the Kafka Acknowledge snap after the data is inserted into the database, but it always times out.
Does anybody have a solution?
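The underlying pattern (independent of SnapLogic) is at-least-once delivery: the offset should be committed only after the insert succeeds, so a failed run re-reads the same records instead of skipping them. A minimal sketch in plain Python, using hypothetical `FakeConsumer`/`run_pipeline` names to simulate a consumer and a failing pipeline:

```python
# Illustrative sketch (not SnapLogic- or Kafka-client-specific): if the offset
# advances before the database insert succeeds, a failed run "skips" records.
# Committing the offset only AFTER a successful insert gives at-least-once
# delivery: the next run resumes from the last committed offset.

class FakeConsumer:
    """Minimal stand-in for a Kafka consumer tracking a committed offset."""
    def __init__(self, records):
        self.records = records
        self.committed = 0          # next offset to read on (re)start

    def poll(self):
        return list(enumerate(self.records))[self.committed:]

    def commit(self, offset):
        self.committed = offset + 1

def run_pipeline(consumer, db, fail_at=None):
    """Insert each record, committing its offset only after the insert succeeds."""
    for offset, record in consumer.poll():
        if offset == fail_at:
            return False            # pipeline fails before insert/commit
        db.append(record)
        consumer.commit(offset)     # acknowledge only after the insert
    return True

consumer = FakeConsumer(["a", "b", "c", "d", "e"])
db = []
run_pipeline(consumer, db, fail_at=3)   # first run fails at offset 3
run_pipeline(consumer, db)              # next run resumes at offset 3, not 5
```

With this ordering no offset is lost: the failed run leaves offsets 3-5 uncommitted, and the retry picks them up.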
02-22-2022 07:44 AM
02-23-2022 08:08 AM
Ok, I’m glad that you were able to get it working well with a single topic.
Performing joins on streaming data in real time is a very advanced subject. To discuss it in detail would require much more information about your use cases and is not a discussion I can really get into here in this forum.

I would consider whether it might make sense to read the data into separate Snowflake tables and then use Snowflake to do the joins. If you need true streaming functionality like windowed joins then you might look at KsqlDB or Kafka Streams. It might be possible to do the joins in SnapLogic pipelines, but that can get very tricky with real-time streams that don’t end, as our Join is designed for finite input streams.

One thing to consider is a hybrid approach where you use KsqlDB to do the joins of the separate Kafka topics, which will produce a new topic containing the joined data. Then use our Kafka Consumer snap to read that topic and insert into Snowflake.
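To illustrate why joining streams that never end is tricky: a stream-stream join cannot wait for either input to finish, so each side must buffer events and match only within a time window. A toy sketch in Python (the `(key, ts, value)` event shape and 10-second window are assumptions for illustration, not how any of the tools above represent events):

```python
from collections import defaultdict

# Illustrative windowed stream-stream join: since neither topic ever "ends",
# events are matched by key only when their timestamps fall within a window.
# Event shape (key, ts, value) and the window size are hypothetical.

WINDOW = 10  # seconds

def windowed_join(left_events, right_events):
    """Join (key, ts, value) events on key where |ts_left - ts_right| <= WINDOW."""
    right_by_key = defaultdict(list)
    for key, ts, val in right_events:
        right_by_key[key].append((ts, val))
    out = []
    for key, ts, val in left_events:
        for rts, rval in right_by_key[key]:
            if abs(ts - rts) <= WINDOW:
                out.append((key, val, rval))
    return out

left = [("order-1", 100, "order"), ("order-2", 200, "order")]
right = [("order-1", 105, "payment"), ("order-2", 300, "payment")]
# order-1's payment arrives within the window; order-2's arrives too late
```

Tools like KsqlDB and Kafka Streams handle exactly this buffering, expiry, and late-arrival logic for you, which is why they are a better fit than a finite-input join.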
03-16-2022 07:02 AM
Just a follow-up: I think if you use a merge join, the bottom pipeline (input 1) won’t wait for the insert to finish; it just sends the acknowledge. Is that correct?
Because when I validate the pipeline, it skips that row and marks it as acknowledged, even without inserting.
03-16-2022 05:15 PM
It merges input 1 with input 0. Did input 0 have any documents?
03-17-2022 02:35 AM
Yes, input 0 has the “$message” output from the “Snowflake Bulk Load” Snap.
“Snowflake Bulk Load” is Execute Only -
So, if the insert has not been executed, it should not pass the message from input 0, and the join should not be performed.
The “Join” Snap’s properties look like -
Am I missing something here? For reference, I am attaching both the parent and child pipelines:
Parent-test_2022_03_17.slp (4.3 KB)
test_2022_03_17.slp (14.0 KB)
03-19-2022 10:41 AM
A merge join will merge available documents from both inputs, but if one input provides fewer documents than the other (or none at all), it will just pass the extra documents through to the output without merging them with anything. Try changing the join type to Inner and setting the left path and right path to join on a matching field in the input documents.
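To make the difference concrete, here is a small Python sketch of the two behaviors. The document shapes and the positional pairing are simplifications for illustration, not SnapLogic internals:

```python
# Contrast of the two join behaviors: a merge join passes unmatched documents
# through, while an inner join emits a document only when the join key matches
# on both inputs. Field names ("id", "offset") are hypothetical.

def merge_join(input0, input1):
    """Pair documents positionally; extras from the longer input pass through."""
    out = []
    for i in range(max(len(input0), len(input1))):
        doc = {}
        if i < len(input0):
            doc.update(input0[i])
        if i < len(input1):
            doc.update(input1[i])
        out.append(doc)
    return out

def inner_join(input0, input1, key):
    """Emit a document only when the join key exists on both inputs."""
    by_key = {d[key]: d for d in input0}
    return [{**by_key[d[key]], **d} for d in input1 if d[key] in by_key]

input0 = []                          # e.g. the Bulk Load produced no $message
input1 = [{"id": 1, "offset": 3}]
merge_join(input0, input1)           # input 1's document passes through anyway
inner_join(input0, input1, "id")     # nothing emitted, so nothing acknowledged
```

This matches the symptom above: with a merge join, the acknowledge path can fire even when input 0 contributed nothing; an inner join on a shared field holds the acknowledge back until the insert actually produced a matching document.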