Forum Discussion
Hi @ptaylor,
I would like to implement a solution so that if the pipeline fails, we do not lose any data. In my case, if the Kafka Consumer reads the data and any downstream snap later fails during execution, those offsets are skipped.
We do not want to skip any data. I read about Kafka Acknowledge, but it seems I am not able to use it correctly.
I'm attaching the sample pipeline I am using.
I changed the topic name, account information, etc.; the rest of the properties are the same.
NP new pipeline 1_2022_02_19 (1).slp (7.0 KB)
Thanks for attaching the pipeline, which looks like this:
The Consumer's settings actually look fine. However, for the Kafka Acknowledge to work, it needs the full metadata object from the output of the Kafka Consumer. In the Mapper after your Consumer, you're passing through only the metadata.offset, so the Acknowledge doesn't have what it needs to tell the Consumer which record is being acknowledged. That's why the Consumer times out. I'm sure you're seeing an error on the Acknowledge as well, correct?
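To make the difference concrete, here is a minimal sketch (in Python, just to show the document shapes) of what the Consumer output typically contains versus what the current Mapper passes along. The exact field names are an assumption; check the preview of your own Consumer's output for the real structure.

```python
# Sketch of the two document shapes; field names are assumed, not taken from the pipeline.

# Full Consumer output document: the Kafka Acknowledge needs the whole metadata object.
full_document = {
    "value": {"account_id": "A123", "amount": 42.5},   # hypothetical payload
    "metadata": {
        "topic": "my-topic",        # assumed field names
        "partition": 3,
        "offset": 105,
        "timestamp": 1645281900000,
    },
}

# What the current Mapper passes through: only the offset, which is not enough
# for the Acknowledge to identify the record (no topic, no partition).
mapped_document = {
    "offset": full_document["metadata"]["offset"],
}
```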
But adjusting your pipeline may not be as straightforward as you'd like. If you modify the Mapper to pass through the full metadata, and that then becomes the input to the Snowflake Insert, the insert will probably either fail or you'll end up inserting data you don't really want or need into Snowflake. Ideally, the Snowflake Insert would have a setting that lets you select a JSON path specifying just the subset of the input document that you want to insert. Unfortunately, it looks like that snap doesn't have such a setting. So the trick that many customers use in this sort of scenario involves the Copy and Join snaps, which looks something like this:
This allows the full metadata from the Consumer output to make it all the way to the Acknowledge while bypassing the Insert. Make sense? Note that it's easiest if you configure the Join with a Join type of Merge.
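Conceptually, the Copy splits the stream into two parallel branches and the Join with Merge pairs the documents back up positionally, so the branch that kept the full metadata rejoins the branch that went through the Insert. Here's a rough Python sketch of that data flow; it's only an illustration of the idea, not how the snaps are implemented, and insert_into_snowflake is a hypothetical stand-in for the Snowflake Insert.

```python
# Rough sketch of the Copy -> (Insert branch / metadata branch) -> Join (Merge) flow.

def insert_into_snowflake(doc):
    # Hypothetical stand-in for the Snowflake Insert snap.
    return {"status": "inserted", "row": doc["value"]}

# Documents as they come out of the Kafka Consumer (sample data).
consumer_output = [
    {"value": {"account_id": "A123"}, "metadata": {"topic": "my-topic", "partition": 3, "offset": 105}},
    {"value": {"account_id": "B456"}, "metadata": {"topic": "my-topic", "partition": 3, "offset": 106}},
]

# Copy: both branches see the same stream of documents.
insert_branch = [insert_into_snowflake(doc) for doc in consumer_output]
metadata_branch = [doc["metadata"] for doc in consumer_output]

# Join with Join type = Merge: documents are paired positionally, so the full
# metadata reaches the Acknowledge alongside the corresponding insert result.
for insert_result, metadata in zip(insert_branch, metadata_branch):
    merged = {**insert_result, "metadata": metadata}
    print(merged)   # this is the kind of document the Kafka Acknowledge would receive
```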
Also, I noticed something about your Mapper that you may want to consider. You're only mapping the metadata.offset. Keep in mind that in Kafka, an offset is relative to a specific partition. So if your intention is to store enough information to uniquely identify the Kafka record corresponding to the record you're inserting into Snowflake, the offset alone isn't enough, unless your topic only has a single partition, which is not likely or recommended for real data. You would also need to map and store the metadata.partition. The combination of offset and partition allows you to uniquely identify a single record in a given topic.
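To illustrate why both values matter: in Kafka's own client APIs, a record is addressed by topic + partition + offset. Below is a small sketch using the kafka-python client that seeks back to one specific record given a stored partition and offset; the broker address, topic name, and stored values are placeholders.

```python
# Sketch: re-reading the exact record identified by a stored partition and offset.
from kafka import KafkaConsumer, TopicPartition

stored_partition = 3   # value you mapped from metadata.partition
stored_offset = 105    # value you mapped from metadata.offset

consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",  # placeholder broker
    enable_auto_commit=False,
)

# Offset 105 in partition 3 is a different record than offset 105 in partition 0,
# so the partition must be part of the address.
tp = TopicPartition("my-topic", stored_partition)
consumer.assign([tp])
consumer.seek(tp, stored_offset)

# poll() returns {TopicPartition: [records]}; the first record is the one
# uniquely identified by (topic, partition, offset).
records = consumer.poll(timeout_ms=5000).get(tp, [])
if records:
    print(records[0].partition, records[0].offset, records[0].value)

consumer.close()
```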
Hope that helps.