Kafka Consumer skips messages when the pipeline fails

neeraj_sharma
New Contributor II

Hi,

We pull data from Kafka and insert it into a database, but we realized the Kafka Consumer skips the data/offsets if the pipeline fails.
For example: in one run the Kafka Consumer is supposed to read offsets 3, 4, and 5, but the pipeline fails, so it skips those offsets in the next run.

I tried using the Kafka Acknowledge snap after the data is inserted into the database, but it always times out.
Does anybody have a solution?
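For context, the behavior described here is standard Kafka offset semantics: once an offset is committed, those messages are not redelivered. Outside of SnapLogic, the usual pattern is to disable auto-commit and commit an offset only after the downstream insert succeeds, so a failed run re-reads the same offsets. A minimal sketch with the confluent-kafka Python client, where the broker address, topic, group id, and insert_into_db are all placeholder assumptions:

```python
from confluent_kafka import Consumer

def insert_into_db(payload: bytes) -> None:
    """Placeholder for the real database insert; raises on failure."""
    ...

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "db-loader",                # placeholder group
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,            # commit manually, below
})
consumer.subscribe(["my-topic"])            # placeholder topic

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        insert_into_db(msg.value())
        # Commit only after a successful insert. If insert_into_db
        # raises, the offset stays uncommitted and the next run
        # re-reads the same messages instead of skipping them.
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
```

This is conceptually what the Kafka Acknowledge snap does for the Kafka Consumer snap: it defers the offset commit until downstream processing has finished.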


16 REPLIES

Ok, I’m glad that you were able to get it working well with a single topic.

Performing joins on streaming data in real time is a very advanced subject. Discussing it in detail would require much more information about your use cases, and it is not a discussion I can really get into here in this forum.

I would consider whether it might make sense to read the data into separate Snowflake tables and then use Snowflake to do the joins. If you need true streaming functionality like windowed joins, you might look at KsqlDB or Kafka Streams. It might be possible to do the joins in SnapLogic pipelines, but that can get very tricky with real-time streams that don't end, as our Join snap is designed for finite input streams.

One thing to consider is a hybrid approach where you use KsqlDB to join the separate Kafka topics, which will produce a new topic containing the joined data. Then use our Kafka Consumer snap to read that topic and insert into Snowflake (a sketch of this follows below).
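To make the hybrid approach concrete, here is a rough sketch (not from this thread) that submits a ksqlDB stream-to-stream join over ksqlDB's /ksql REST endpoint from Python. The server URL, stream names, columns, and join window are all illustrative assumptions:

```python
import requests

# Hypothetical ksqlDB server; adjust to your deployment.
KSQL_URL = "http://localhost:8088/ksql"

# Windowed stream-stream join. ksqlDB writes the joined rows to a new
# backing topic, which a Kafka Consumer snap can then read and load
# into Snowflake. Stream and column names here are invented examples.
statement = """
CREATE STREAM ORDERS_WITH_SHIPMENTS AS
  SELECT o.order_id, o.amount, s.carrier
  FROM ORDERS o
  INNER JOIN SHIPMENTS s WITHIN 1 HOUR
    ON o.order_id = s.order_id
  EMIT CHANGES;
"""

resp = requests.post(
    KSQL_URL,
    json={"ksql": statement, "streamsProperties": {}},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```

The point of the design is that the hard, unbounded-stream join work stays in a tool built for it, while the SnapLogic pipeline only has to consume one pre-joined topic.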

neeraj_sharma
New Contributor II

Just a follow-up: I think if you use a merge join, the bottom pipeline (input 1) won't wait for the insert to finish; it would just send the acknowledgment. Is that correct?

Because when I validate the pipeline, it skips that row and marks it as acknowledged, even without inserting it.

It merges input 1 with input 0. Did input 0 have any documents?

Yes, input 0 has the “$message” output from the “Snowflake Bulk Load” snap.
The “Snowflake Bulk Load” snap is set to Execute Only:

[screenshot: Snowflake Bulk Load snap settings]

So, if the insert has not been executed, it should not pass the message from input 0, and the join should not be performed.
The “Join” snap's properties look like this:
[screenshot: Join snap properties]

Am I missing something here? For reference, I am attaching both the parent and child pipelines:

Parent-test_2022_03_17.slp (4.3 KB)

test_2022_03_17.slp (14.0 KB)

A merge join will merge available documents from both inputs, but if one input provides fewer documents than the other (or no documents), it will just pass the additional documents through to the output without merging them with anything. Try changing the join type to Inner and set the Left path and Right path to join on a matching field in the input documents.
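To illustrate the difference in plain code rather than Snap settings, here is a rough Python sketch of the two behaviors described above; the function names and sample documents are invented. merge_join pairs documents positionally and passes any extras through unmerged (which is why a row can reach the output without its acknowledgment), while inner_join emits a document only when the key matches on both sides:

```python
from itertools import zip_longest

def merge_join(left, right):
    """Pair documents by position; unmatched extras pass through as-is."""
    out = []
    for l, r in zip_longest(left, right):
        if l is not None and r is not None:
            out.append({**l, **r})
        else:
            out.append(l if l is not None else r)  # pass-through, unmerged
    return out

def inner_join(left, right, key):
    """Emit a merged document only when `key` matches on both sides."""
    index = {doc[key]: doc for doc in right}
    return [{**l, **index[l[key]]} for l in left if l[key] in index]

rows = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}]  # input 1: Kafka rows
acks = [{"id": 1, "status": "LOADED"}]                 # input 0: only id 1 inserted

print(merge_join(rows, acks))        # id 2 still flows through unmerged
print(inner_join(rows, acks, "id"))  # only id 1 is emitted
```

With the inner join, the row whose insert never produced a “$message” document is held back, so the acknowledgment is only sent for rows that actually reached the database.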