Kafka Consumer skips messages when the pipeline fails

Hi,

We pull the data from Kafka and put it into the database, but we realized the Kafka Consumer skips the data/offsets if the pipeline fails.
For example: in one run the Kafka Consumer is supposed to read offsets 3, 4, and 5, but the pipeline fails, so it skips these offsets in the next run.

I tried using the Kafka Acknowledge snap after the data is inserted into the database, but it always times out.
Does anybody have a solution?

Please share more details. How is your Consumer snap configured? Please show a screenshot of the settings. Does it have an error view? What exactly is failing – a snap downstream from the Consumer? Using the Acknowledge snap is probably what you need here, but it may be necessary to adjust the configuration of the Consumer, so please share those details.

Hi @ptaylor,

I would like to implement a solution where we do not lose data if the pipeline fails. In my case, if the Kafka Consumer reads the data and any downstream snap later fails during execution, it skips those offsets.
We do not want to skip any data. I read about Kafka Acknowledge, but it seems like I am not able to use it correctly.
I'm attaching the sample pipeline I am using.
I changed the topic name, account information, etc.; the rest of the properties are the same.

NP new pipeline 1_2022_02_19 (1).slp (7.0 KB)

Thanks for attaching the pipeline, which looks like this:

The Consumer’s settings actually look fine. However, for the Kafka Acknowledge to work, it needs the full metadata object from the output of the Kafka Consumer. In the Mapper after your Consumer, you’re passing through only the metadata.offset, so the Acknowledge doesn’t have what it needs to tell the Consumer which record is being acknowledged. So that’s why the Consumer times out. I’m sure you’re seeing an error on the Acknowledge as well, correct?

But adjusting your pipeline may not be as straightforward as you’d like. If you modify the Mapper to pass through the full metadata and then that becomes the input to the Snowflake Insert, then the insert will probably either fail or you’ll end up inserting data you don’t really want or need into Snowflake. Ideally, the Snowflake Insert would have a setting that allows you to select a JSON path to specify just the subset of the input document that you want to insert. Unfortunately, it looks like that snap doesn’t have such a setting. So the trick that many customers use in this sort of scenario involves using the Copy and Join snaps, which looks something like this:

This allows the full metadata from the Consumer output to make it all the way to the Acknowledge while bypassing the Insert. Make sense? Note that it’s easiest if you configure the Join with a Join type of Merge.

Also, I noticed something about your Mapper that you may want to consider. You’re only mapping the metadata.offset. Keep in mind that in Kafka, an offset is relative to a specific partition. So if your intention is to store enough information to uniquely identify the Kafka record corresponding to the record you’re inserting into Snowflake, the offset alone isn’t enough, unless your topic only has a single partition, which is not likely or recommended for real data. You would also need to map and store the metadata.partition. The combined offset + partition allows you to uniquely identify a single record in a given topic.
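If it helps to see the underlying Kafka concept outside of SnapLogic, here's a rough sketch using the plain Java consumer client (the broker address, group ID, topic name, and insert helper are all made up for illustration). It isn't how the snaps are implemented, but it shows why acknowledging a record requires the partition as well as the offset, and why the commit should only happen after the insert succeeds:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualAckSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // hypothetical broker
        props.put("group.id", "account-loader");          // hypothetical group ID
        props.put("enable.auto.commit", "false");         // commit only after a successful insert
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("accounts"));  // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    insertIntoDatabase(record.value());  // stand-in for the Snowflake insert; throws on failure

                    // Acknowledging needs the partition as well as the offset: the offset
                    // alone is ambiguous when the topic has more than one partition.
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    consumer.commitSync(
                        Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    private static void insertIntoDatabase(String value) { /* placeholder */ }
}
```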

Hope that helps.


Hi,
Yes, I was not adding the metadata in the Mapper. Once I inserted the metadata into Snowflake and then acknowledged, it started to work, but as you mentioned, that is unnecessary information to save in the table.
I tried the method you mentioned, copying the Kafka Consumer output and joining it in before the Acknowledge using a Merge join, either without any key or with partition and offset as the keys, but it does not work. It gives me an error like:

Failure: Unable to acknowledge message: Test-2:25, Reason: Acknowledgement will succeed only if the Consumer that produced the document runs during the same validation., Resolution: Hold Shift key when clicking Validate to force all snaps to run.

I am missing some trick. Pipeline attached:

NP new pipeline 9_2022_02_20.slp (9.6 KB)

And one more follow-up question: we need to pull a lot of data, maybe millions of records every day. I think the Acknowledge is a bit slow, even with the batch acknowledge method.
Should we put all Kafka messages into S3 and then read from there, so that even if we skip any data we can go back to the files and reprocess?
Please let me know what you think.

The Acknowledge is failing because the metadata is present in the input document, but it’s not in the default location under the document root ($metadata) because of how the Join combines and restructures the data from its inputs. Try validating the pipeline, then preview the Join’s output to note where the full metadata is located within the document. Then open the Acknowledge snap, click the suggest button for the Metadata Path setting, and select the location of the metadata.

Also, note the advice in the error message about holding the Shift key when you click the Validate icon. That will force all snaps to run instead of relying on cached preview data from previous validations for snaps that you haven’t edited. This is important for the way the Consumer and Acknowledge snaps interact.

As for performance, the bottleneck in your pipeline is the fact that you’re inserting one record at a time into Snowflake. You’ll have far better performance with data warehouses like Snowflake if you do bulk loading (inserting many records in one operation). Frankly, I’m not really familiar with our Snowflake snaps, but I think Bulk Load or Bulk Upsert are better suited for your use case. Check our documentation for those snaps and if you still have questions, ask them here in the Community in a new post.
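Independent of the snaps, bulk loading into Snowflake generally means staging a file of many records and then running a single COPY, rather than issuing one INSERT per record. A rough sketch in plain JDBC terms (the account, credentials, file path, and table name are made up, and it assumes the Snowflake JDBC driver is on the classpath); presumably the bulk loading snaps handle this kind of staging for you:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class BulkLoadSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical account, credentials, and table.
        String url = "jdbc:snowflake://myaccount.snowflakecomputing.com/?db=MYDB&schema=PUBLIC";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             Statement stmt = conn.createStatement()) {

            // Stage one file containing many records, then load it with a single COPY,
            // instead of issuing one INSERT statement per record.
            stmt.execute("PUT file:///tmp/accounts_batch.csv @%ACCOUNTS AUTO_COMPRESS=TRUE");
            stmt.execute("COPY INTO ACCOUNTS FROM @%ACCOUNTS FILE_FORMAT = (TYPE = CSV)");
        }
    }
}
```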

However, right now your Kafka Consumer snap is configured with Acknowledge Mode = Wait after each record, which means the Consumer will output a single document, then wait for the Acknowledge snap to ack that document before it outputs the next record. Obviously that's incompatible with the requirements of a bulk loading snap. (You also have Message Count set to 1, but I'm guessing that was for debugging purposes and you'll set it back to the default, -1.)

Fortunately, the Kafka Consumer snap has a lot of flexibility to deal with such scenarios. At a minimum, you’ll need to change Acknowledge Mode to Wait after each batch of records. This lets the Consumer output many records at a time, then wait for all of those records to be acknowledged before asking the Kafka broker for more records to process. In your case, you’ll probably also need to change the Output Mode to One output document per batch and then use the Pipeline Execute snap to process each batch in a child pipeline. You would put the Snowflake bulk loading snap in the child pipeline; each execution of the child pipeline would process one batch of records received from Kafka. That will vastly improve your performance.
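In raw Kafka-client terms (again, just to illustrate the concept, not the snap internals), the per-batch pattern looks roughly like this: poll a batch, load the whole batch in one operation, then commit once for the batch. The broker, group ID, topic name, and bulkLoad helper are made up:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class BatchAckSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");  // hypothetical broker
        props.put("group.id", "account-loader");         // hypothetical group ID
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("accounts"));  // hypothetical topic
            while (true) {
                // Each poll returns a batch of records, possibly spanning several partitions.
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                if (batch.isEmpty()) continue;

                List<String> rows = new ArrayList<>();
                for (ConsumerRecord<String, String> record : batch) {
                    rows.add(record.value());
                }
                bulkLoad(rows);         // the whole batch goes to Snowflake in one operation
                consumer.commitSync();  // acknowledge the entire batch only after the load succeeds
            }
        }
    }

    private static void bulkLoad(List<String> rows) { /* placeholder for the bulk load */ }
}
```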

To get a much better idea of how this works, you can read an article I wrote about it here:

Hope this helps.


I remembered a few more things I wanted to say.

For performance, you also want to take advantage of Kafka's ability to automatically distribute partitions to all of the different Consumer instances in the same consumer group. You would do this by running multiple instances of the same pipeline, typically one per node. So if your plex has 4 nodes, you would run one instance of your pipeline on each of those 4 nodes. Assuming the topic you're processing has multiple partitions (and it certainly should if you care about performance), and you've left the Partition setting blank in the Consumer snap's settings, Kafka will automatically assign the partitions across the different instances in the same group (having the same Group ID value). So if your topic has 24 partitions, each of your 4 nodes will get 6 partitions. If one node is temporarily removed, Kafka will automatically rebalance the partitions so that each of the 3 remaining nodes gets 8 partitions. This is called horizontal scaling and is the key to reliable high performance in distributed computing.
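For what it's worth, this is the standard Kafka consumer-group mechanism. In plain Java client terms it amounts to every instance subscribing with the same group ID and letting the broker assign the partitions (the broker, group ID, and topic name below are made up); the rebalance listener just logs what this instance currently owns:

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class GroupScalingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");  // hypothetical broker
        props.put("group.id", "account-loader");         // the same Group ID on every node
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribing (rather than assigning a fixed partition) lets the broker spread
        // the topic's partitions across every instance that shares this group ID.
        consumer.subscribe(Collections.singletonList("accounts"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("This instance now owns: " + partitions);
            }
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Rebalancing, giving up: " + partitions);
            }
        });
        while (true) {
            consumer.poll(Duration.ofSeconds(1));  // processing omitted for brevity
        }
    }
}
```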

I don’t think that copying data to S3 really solves anything. It’s just adding more overhead to a system that you’re trying to optimize. A well-managed Kafka cluster designed for production loads (multiple nodes with elastic disks, etc) is a very reliable place to keep data. Data in Kafka topics can be read and re-read any number of times for any number of different applications, provided you’ve configured the retention policy for your topics appropriately.

Once you’ve reconfigured your pipelines as discussed in my last reply, and then run a sufficient number of instances of that pipeline in parallel, I think you’ll find that you have the reliable, high-throughput solution that you’re looking for.


Hi @ptaylor,

I appreciate your in-depth response.

I agree I should use the Snowflake Bulk Load snap with the Kafka Consumer's batch mode option. I usually validate the pipeline and then select the $variable, which ensures correctness, but the pipeline did not work last time; I re-validated and it started working.
snowflake insert

So then I replaced the Snowflake Insert with Snowflake Bulk Insert, but then again it started to time out.

So I thought the Bulk Insert might work with the Kafka Consumer's batch acknowledge property, but it does not work; it gives a timeout.

I think I am missing something. Can you please look at the properties and tell me what I am missing here? And please attach your pipeline as well if you try it out.

Attaching all 3 pipelines; I added the extra Mappers etc. for debugging purposes only.

Bulk insert with Batch not Working Pipeline_2022_02_22.slp (17.3 KB)
Bulk insert not Working Pipeline_2022_02_22.slp (12.1 KB)
Working Pipiline_2022_02_22.slp (11.4 KB)


Yes, with the Pipeline Execute snap, it started to work and performs very well.

Just one follow-up question: earlier we wanted to pull the data from different Kafka topics, join it all together in one pipeline, and insert it into one table. Now it looks like that's not possible. Do we need to create separate pipelines for each topic to push the data into tables, and then create one more pipeline to join all the data and put it into the final table?

Ok, I’m glad that you were able to get it working well with a single topic.

Performing joins on streaming data in real time is a very advanced subject. To discuss it in detail would require much more information about your use cases and is not a discussion I can really get into here in this forum. I would consider whether it might make sense to read the data into separate Snowflake tables and then use Snowflake to do the joins. If you need true streaming functionality like windowed joins then you might look at KsqlDB or Kafka Streams. It might be possible to do the joins in SnapLogic pipelines but that can get very tricky with real-time streams that don’t end, as our Join is designed for finite input streams. One thing to consider is a hybrid approach where you use KsqlDB to do the joins of the separate Kafka topics, which will produce a new topic containing the joined data. Then use our Kafka Consumer snap to read that topic and insert into Snowflake.
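If you do explore the Kafka Streams route, a windowed stream-stream join looks roughly like this (the application ID, broker, topic names, and the 5-minute window are all made up for illustration). The joined records land in a new topic, which a Kafka Consumer snap could then read and load into Snowflake:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

import java.time.Duration;
import java.util.Properties;

public class TopicJoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "account-join-app");  // hypothetical app ID
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // hypothetical broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("topic-a");   // hypothetical topics
        KStream<String, String> right = builder.stream("topic-b");

        // Windowed join: records from both topics with the same key that arrive
        // within 5 minutes of each other are combined into one output record.
        KStream<String, String> joined = left.join(
            right,
            (l, r) -> l + "|" + r,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        // The joined data is written to a new topic for downstream consumers.
        joined.to("joined-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}
```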


Just a follow-up: I think if you use a Merge join, the bottom branch (input1) won't wait for the insert to finish; it would just send the acknowledge. Is that correct?

Because when I validate the pipeline, it skips that row and marks it as acknowledged, even without inserting.

It merges input 1 with input 0. Did input 0 have any documents?

Yes, input0 has the “$message” output from the “Snowflake Bulk Load” snap.
“Snowflake Bulk Load” is set to Execute only.


So if the insert has not been executed, it should not pass the message from input0, and the join should not be performed.
The “Join” snap's properties look like this:

Am I missing something here? For reference, I am attaching both the parent and child pipelines:

Parent-test_2022_03_17.slp (4.3 KB)

test_2022_03_17.slp (14.0 KB)

A merge join will merge available documents from both inputs but if one input provides fewer documents than the other (or no documents) it will just pass through the additional documents to the output without merging them with anything. Try changing the join type to Inner and select the left path and right path to join on a matching field in the input documents.

I think there is no direct way to inner join both inputs, as the Snowflake Bulk Load snap does not pass through what was inserted, and there is no way to pass through the parent snap's documents.
I think for now the Merge join is the closest way to verify whether the data was inserted into Snowflake or not. Since I am afraid that validating the pipeline would skip some data, I should change the “Snap Execution” of the Kafka Acknowledge snap to Execute only.

Let me know if you think the same.

Yes, just change the Ack’s mode to Execute Only.
