Kafka consumer skips messages when pipeline fails

neeraj_sharma
New Contributor II

Hi ,

We pull data from Kafka and write it to a database, but we realized the Kafka Consumer skips the data/offsets if the pipeline fails.
For example: in one run the Kafka Consumer is supposed to read offsets 3, 4, and 5, but the pipeline fails, and in the next run it skips those offsets.

I tried using the Kafka Acknowledge snap after the data is inserted into the database, but it always times out.
Does anybody have a solution?

1 ACCEPTED SOLUTION


16 REPLIES

The Acknowledge is failing because the metadata is present in the input document, but it’s not in the default location under the document root ($metadata) because of how the Join combines and restructures the data from its inputs. Try validating the pipeline, then preview the Join’s output to note where the full metadata is located within the document. Then open the Acknowledge snap, click the suggest button for the Metadata Path setting, and select the location of the metadata.
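
If it helps to picture what's happening, here's a hypothetical sketch (plain Python dicts, invented field names; the actual structure depends on your Join settings) of a Consumer output document before and after a Join that nests each input under its own key. The Acknowledge snap's Metadata Path has to point at wherever the metadata ends up:

```python
# Hypothetical document shapes only -- the real structure depends on the Join settings.

# A document as it leaves the Kafka Consumer snap: metadata sits at the root.
consumer_output = {
    "metadata": {                 # default location: $metadata
        "topic": "orders",
        "partition": 3,
        "offset": 42,
    },
    "value": {"order_id": 1001, "amount": 25.0},
}

# After a Join that combines the Kafka document with a lookup document,
# the same metadata may end up nested under a different path, e.g.:
join_output = {
    "kafka": {
        "metadata": {             # now reachable as $kafka.metadata
            "topic": "orders",
            "partition": 3,
            "offset": 42,
        },
        "value": {"order_id": 1001, "amount": 25.0},
    },
    "lookup": {"customer": "ACME", "region": "EMEA"},
}

# The Acknowledge snap's Metadata Path setting must match wherever the
# metadata actually lives in the joined document ($kafka.metadata here).
print(join_output["kafka"]["metadata"]["offset"])  # 42
```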

Also, note the advice in the error message about holding the Shift key when you click the Validate icon. That will force all snaps to run instead of relying on cached preview data from previous validations for snaps that you haven’t edited. This is important for the way the Consumer and Acknowledge snaps interact.

As for performance, the bottleneck in your pipeline is the fact that you’re inserting one record at a time into Snowflake. You’ll have far better performance with data warehouses like Snowflake if you do bulk loading (inserting many records in one operation). Frankly, I’m not really familiar with our Snowflake snaps, but I think Bulk Load or Bulk Upsert are better suited for your use case. Check our documentation for those snaps and if you still have questions, ask them here in the Community in a new post.
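
To illustrate the row-by-row versus bulk difference outside of SnapLogic, here's a minimal sketch using Python's DB-API, with sqlite3 just to keep it self-contained. Snowflake's bulk snaps actually use staged COPY loads rather than executemany, so treat this purely as an illustration of "one round trip per row" versus "one call per batch":

```python
import sqlite3

rows = [(i, f"payload-{i}") for i in range(10_000)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, payload TEXT)")

# Row-at-a-time: one statement (and, against a remote warehouse, one network
# round trip) per record. This is what a per-record insert snap amounts to.
for row in rows:
    conn.execute("INSERT INTO events VALUES (?, ?)", row)

# Bulk: the whole batch goes in a single call. Warehouses like Snowflake go
# further and load staged files with COPY, which is faster still at volume.
conn.executemany("INSERT INTO events VALUES (?, ?)", rows)

conn.commit()
conn.close()
```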

However, right now your Kafka Consumer snap is configured with Acknowledge Mode = Wait after each record, which means the Consumer will output a single document, then wait for the Acknowledge snap to ack that document before it outputs the next record. Obviously that’s incompatible with the requirements of a bulk loading snap. (You also have Message Count set to 1, but I’m guessing that was for debugging purposes and you’ll set it back to the default, -1.)
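
In plain Kafka-client terms, "Wait after each record" behaves roughly like the sketch below (kafka-python, with hypothetical topic and group names; this is an analogy, not how the snap is implemented). Every record forces its own synchronous offset commit, so nothing downstream ever sees more than one record at a time:

```python
from kafka import KafkaConsumer

def insert_into_database(value: bytes) -> None:
    """Placeholder for the per-record Snowflake insert."""
    ...

consumer = KafkaConsumer(
    "orders",                          # hypothetical topic name
    bootstrap_servers="localhost:9092",
    group_id="orders-loader",          # hypothetical consumer group
    enable_auto_commit=False,          # commit only after a successful insert
)

# One record in, one database insert, one synchronous offset commit.
# Nothing else happens until that commit returns -- which is why this mode
# cannot feed a bulk-loading snap.
for record in consumer:
    insert_into_database(record.value)
    consumer.commit()
```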

Fortunately, the Kafka Consumer snap has a lot of flexibility to deal with such scenarios. At a minimum, you’ll need to change Acknowledge Mode to Wait after each batch of records. This lets the Consumer output many records at a time, then wait for all of those records to be acknowledged before asking the Kafka broker for more records to process. In your case, you’ll probably also need to change the Output Mode to One output document per batch and then use the Pipeline Execute snap to process each batch in a child pipeline. You would put the Snowflake bulk loading snap in the child pipeline; each execution of the child pipeline would process one batch of records received from Kafka. That will vastly improve your performance.
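
The batch-oriented setup maps onto the same client roughly as follows (again a hedged kafka-python sketch with hypothetical names; in SnapLogic terms, the load_batch step is your child pipeline with the Snowflake bulk loading snap, invoked by Pipeline Execute once per batch):

```python
from kafka import KafkaConsumer

def load_batch(values: list) -> None:
    """Placeholder for the child pipeline: bulk-load one batch into Snowflake."""
    ...

consumer = KafkaConsumer(
    "orders",                          # hypothetical topic name
    bootstrap_servers="localhost:9092",
    group_id="orders-loader",
    enable_auto_commit=False,
    max_poll_records=500,              # rough analogue of the snap's batch size
)

while True:
    batches = consumer.poll(timeout_ms=1000)     # {TopicPartition: [records]}
    if not batches:
        continue
    for records in batches.values():
        load_batch([r.value for r in records])   # one bulk load per batch
    consumer.commit()   # acknowledge the whole batch only after it is loaded
```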

To get a much better idea of how this works, see the article I wrote about it here:

Hope this helps.

ptaylor
Employee

I remembered a few more things I wanted to say.

For performance, you also want to take advantage of Kafka’s ability to automatically distribute partitions to all of the different Consumer instances in the same consumer group. You would do this by running multiple instances of the same pipeline, typically one per node. So if your plex has 4 nodes, you would run one instance of your pipeline on each of those 4 nodes. Assuming the topic you’re processing has multiple partitions (and it certainly should if you care about performance), and you’ve left the Partition setting blank in the Consumer snap’s settings, Kafka will automatically assign the partitions across the different instances in the same group (having the same Group ID value). So if your topic has 24 partitions, each of your 4 nodes will get 6 partitions. If one node is temporarily removed, Kafka will automatically rebalance the partitions so that each of the 3 remaining nodes gets 8 partitions. This is called horizontal scaling and is the key to reliable high performance in distributed computing.
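
If you want to see that rebalancing behavior directly, the sketch below (kafka-python, hypothetical topic and group names, local broker assumed) starts four processes that all join the same consumer group; Kafka splits the topic's partitions across them and reassigns them automatically when one leaves:

```python
from multiprocessing import Process
from kafka import KafkaConsumer

def run_instance(instance_id: int) -> None:
    # Each process stands in for one pipeline instance; all share the same
    # group id, so Kafka splits the topic's partitions across them
    # (24 partitions / 4 instances = 6 partitions each).
    consumer = KafkaConsumer(
        "orders",                          # hypothetical 24-partition topic
        bootstrap_servers="localhost:9092",
        group_id="orders-loader",          # same group for every instance
        enable_auto_commit=False,
    )
    consumer.poll(timeout_ms=5000)         # join the group and get an assignment
    print(f"instance {instance_id} owns partitions:",
          sorted(tp.partition for tp in consumer.assignment()))

if __name__ == "__main__":
    procs = [Process(target=run_instance, args=(i,)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```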

I don’t think that copying data to S3 really solves anything. It’s just adding more overhead to a system that you’re trying to optimize. A well-managed Kafka cluster designed for production loads (multiple nodes with elastic disks, etc) is a very reliable place to keep data. Data in Kafka topics can be read and re-read any number of times for any number of different applications, provided you’ve configured the retention policy for your topics appropriately.

Once you’ve reconfigured your pipelines as discussed in my last reply, and then run a sufficient number of instances of that pipeline in parallel, I think you’ll find that you have the reliable, high-throughput solution that you’re looking for.

neeraj_sharma
New Contributor II

Hi @ptaylor,

I appreciate your in-depth response.

I agree that I should use the “Snowflake Bulk Load” snap with the Kafka Consumer’s batch mode option. I usually validate the pipeline and then select the $variable, which ensures correctness, but the pipeline did not work last time; after I re-validated, it started working.
(screenshot: snowflake insert)

So I then replaced the Snowflake Insert with the Snowflake Bulk Insert snap, but then it started to time out again.
(screenshot: snowflake bulk insert)

So I thought the bulk insert might work with the Kafka Consumer’s batch acknowledge property, but it does not work either; it also times out.
(screenshot: snowflake bulk insert with batch mode)

I think I am missing something. Can you please look at the properties and tell me what I am missing here? Please also attach your pipeline if you try it out.

Attaching all 3 pipelines; I added the extra Mappers etc. for debugging purposes only.

Bulk insert with Batch not Working Pipeline_2022_02_22.slp (17.3 KB)
Bulk insert not Working Pipeline_2022_02_22.slp (12.1 KB)
Working Pipiline_2022_02_22.slp (11.4 KB)

Yes, with the Pipeline Execute snap it started to work and performs very well.

Just one follow-up question: earlier we wanted to pull data from different Kafka topics, join it all together in one pipeline, and insert it into one table. Now it looks like that’s not possible. Do we now need to create a separate pipeline for each topic, push the data into staging tables, and then create one more pipeline to join all the data and put it into the final table?