
Kafka consumer skips messages when the pipeline fails

neeraj_sharma
New Contributor II

Hi ,

We pull data from Kafka and put it into a database, but we realized that the Kafka consumer skips the data/offsets if the pipeline fails.
For example: in one run the Kafka Consumer is supposed to read offsets 3, 4, 5, but the pipeline fails, so it skips those offsets in the next run.

I tried using the Kafka Acknowledge snap after the data is inserted into the database, but it always times out.
Does anybody have a solution?


ptaylor
Employee

Please share more details about everything. How is your Consumer snap configured? Please show a screenshot of the settings. Does it have an error view? What exactly is failing – a snap downstream from the Consumer? Using the Acknowledge snap is probably what you need here, but it may be necessary to adjust the configuration of the Consumer, so please share.

neeraj_sharma
New Contributor II

Hi @ptaylor,

I would like to implement a solution where we do not lose data if the pipeline fails. In my case, if the Kafka Consumer reads the data and later any downstream snap fails during execution, it skips those offsets.
We do not want to skip any data. I read about Kafka Acknowledge, but it seems I am not able to use it correctly.
I'm attaching the sample pipeline I am using.
I changed the topic name, account information, etc.; the rest of the properties are the same.

NP new pipeline 1_2022_02_19 (1).slp (7.0 KB)

Thanks for attaching the pipeline, which looks like this:

[screenshot of the attached pipeline]

The Consumer's settings actually look fine. However, for the Kafka Acknowledge to work, it needs the full metadata object from the output of the Kafka Consumer. In the Mapper after your Consumer, you're passing through only the metadata.offset, so the Acknowledge doesn't have what it needs to tell the Consumer which record is being acknowledged. So that's why the Consumer times out. I'm sure you're seeing an error on the Acknowledge as well, correct?
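To see the same idea outside SnapLogic: in a plain Kafka client, acknowledging (committing) always needs the full coordinates of the record, i.e. topic + partition + offset, never the offset alone. Here is a minimal sketch with the confluent-kafka Python client, purely as an analogy; the broker address, group id, and topic name are placeholders, and this is not what the snaps run internally:

    from confluent_kafka import Consumer

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',   # placeholder broker
        'group.id': 'example-group',             # placeholder consumer group
        'enable.auto.commit': False,             # commit manually, like the Acknowledge snap
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['accounts'])             # placeholder topic

    msg = consumer.poll(timeout=10.0)
    if msg is not None and msg.error() is None:
        # commit() is given the whole message (topic, partition, offset),
        # just as the Acknowledge snap needs the whole metadata object.
        consumer.commit(message=msg, asynchronous=False)

Given only metadata.offset, the Acknowledge has no way to tell which partition the record came from, so it waits for information that never arrives and eventually times out.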

But adjusting your pipeline may not be as straightforward as you'd like. If you modify the Mapper to pass through the full metadata and then that becomes the input to the Snowflake Insert, then the insert will probably either fail or you'll end up inserting data you don't really want or need into Snowflake. Ideally, the Snowflake Insert would have a setting that allows you to select a JSON path to specify just the subset of the input document that you want to insert. Unfortunately, it looks like that snap doesn't have such a setting. So the trick that many customers use in this sort of scenario involves using the Copy and Join snaps, which looks something like this:
[screenshot: the same pipeline with Copy and Join snaps added so the full Consumer output bypasses the Insert]

This allows the full metadata from the Consumer output to make it all the way to the Acknowledge while bypassing the Insert. Make sense? Note that it's easiest if you configure the Join with a Join type of Merge.
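In code terms, the Copy + Join arrangement is roughly equivalent to this sequential sketch: hand only the business payload to the database step, keep the full record on the side, and acknowledge only after the insert succeeds. Here insert_into_snowflake is a hypothetical stand-in for the Snowflake Insert snap, and consumer is the one configured in the sketch above:

    import json

    def insert_into_snowflake(payload):
        ...  # hypothetical stand-in for the Snowflake Insert snap

    msg = consumer.poll(timeout=10.0)
    if msg is not None and msg.error() is None:
        payload = json.loads(msg.value())    # just the business data for the table
        insert_into_snowflake(payload)       # insert only the payload...
        consumer.commit(message=msg, asynchronous=False)  # ...then ack with the full record

If the insert throws an error, the commit never runs, so the record is re-delivered on the next run instead of being skipped, which is exactly the behavior you're after.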

Also, I noticed something about your Mapper that you may want to consider. You're only mapping the metadata.offset. Keep in mind that in Kafka, an offset is relative to a specific partition. So if your intention is to store enough information to uniquely identify the Kafka record corresponding to the record you're inserting into Snowflake, the offset alone isn't enough, unless your topic only has a single partition, which is not likely or recommended for real data. You would also need to map and store the metadata.partition. The combined offset + partition allows you to uniquely identify a single record in a given topic.
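As a concrete illustration with the same Python client (TopicPartition is a real confluent-kafka class; the topic, partition, and offset values below are made up):

    from confluent_kafka import TopicPartition

    # Partition 2, offset 25 of topic 'accounts' names exactly one record;
    # offset 25 in partition 0 would be a completely different record.
    record_position = TopicPartition('accounts', 2, 25)

    # Kafka commits the *next* offset to read, so acknowledging that record
    # means committing offset 26 for that same partition.
    consumer.commit(
        offsets=[TopicPartition(record_position.topic,
                                record_position.partition,
                                record_position.offset + 1)],
        asynchronous=False,
    )

So if the goal is traceability back to Kafka, store both metadata.partition and metadata.offset in Snowflake.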

Hope that helps.

neeraj_sharma
New Contributor II

Hi,
Yes, I was not passing the metadata through the Mapper. When I inserted the metadata into Snowflake and then acknowledged, it started to work, but as you mentioned, that is unnecessary information to store in the table. So I tried the method you described: copying the Kafka Consumer output and joining it back in before the Acknowledge using a Merge join, either without any key
or with partition and offset as keys, but it does not work; it gives me an error like:

Failure: Unable to acknowledge message: Test-2:25, Reason: Acknowledgement will succeed only if the Consumer that produced the document runs during the same validation., Resolution: Hold Shift key when clicking Validate to force all snaps to run.

I am missing some trick. Attached pipeline:

NP new pipeline 9_2022_02_20.slp (9.6 KB)

And one more follow-up question: we need to pull a lot of data, maybe millions of messages every day. I think the Acknowledge is a bit slow, even with the batch "process acknowledge" method.
Should we put all Kafka messages into S3 and then read them from there, so that even if we skip any data we could go back to the file and reprocess it?
Please let me know what you think.