Design Patterns in NiFi

Design patterns are important even in Apache NiFi. In this article, we will review two design patterns and the use of the funnel.

Apache NiFi is a dataflow management tool. It essentially lets you design data flows without writing code, only configuring them. Every good developer knows that good code uses design patterns to make it reusable, more readable, and easier to maintain, but in NiFi, design patterns can even add new functionality!

Let us review an example.

Say we have the following simple flow:

This flow consumes messages from one Kafka cluster and publishes them to another Kafka cluster. So what if a message fails? Well, it would retry indefinitely. But what if we want it to retry at most 3 times and then log the message if it fails again? Since NiFi 1.9.0 we can simply use RetryFlowFile, but before that, a simple retry design pattern can help us here.

I’ve created the following process group:

Let’s review what’s in it: First, we create a maximum retries variable in the variable registry and set it to 3.

Next, we’ll review the UpdateAttribute processor and use its Advanced tab.

The Advanced tab is used for advanced update logic based on rules

We have two rules: one resets the retry count to 1 when the FlowFile has no retry count yet, or when we want to pass a FlowFile that previously exceeded the limit through the retry counter again; the other increases the retry count of a previously processed FlowFile.

The retry count is set to 1 when the counter is null or greater than the max count
The retry count is increased when it is less than the max count and not null

Now, for the RouteOnAttribute we have only one rule: if the FlowFile exceeded the max retry count, it is matched; otherwise, it goes to unmatched. Accordingly, we route unmatched FlowFiles to the retry output port and matched FlowFiles to the failure output port.
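To make the rule logic concrete, here is a minimal Python sketch of the pattern. This is not NiFi code: the attribute name `retry.count` and reading "exceeded" as "greater than or equal to" are my assumptions about the flow described above.

```python
MAX_RETRIES = 3  # mirrors the "maximum retries" variable from the variable registry


def update_retry_count(attrs: dict) -> dict:
    """Sketch of the two UpdateAttribute Advanced rules."""
    count = attrs.get("retry.count")
    if count is None or count > MAX_RETRIES:
        attrs["retry.count"] = 1            # rule 1: (re)start the counter
    elif count < MAX_RETRIES:
        attrs["retry.count"] = count + 1    # rule 2: record one more attempt
    return attrs


def route_on_attribute(attrs: dict) -> str:
    """Sketch of the RouteOnAttribute rule: matched means retries are exhausted."""
    return "failure" if attrs["retry.count"] >= MAX_RETRIES else "retry"
```

With these two functions, a FlowFile that keeps failing is routed to `retry` twice and to `failure` on the third pass, which is exactly the behavior the process group implements with the retry and failure output ports.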

Now, let’s put it into use:

The retry output port goes back to PublishKafka and the failure output port goes to LogMessage

Now we have a fully working retry mechanism even without RetryFlowFile.

You can find the full example in my repository, as a template.

Now, for our next design pattern: say we want to get files from a certain directory and process them. Our instinct would tell us to use GetFile, right? But what if we have a cluster and want to use all of it? Of course, the naive solution would be to just set GetFile's execution policy to Primary Node, but then the primary node might transfer some pretty big files over the network to the other nodes, which could slow the process down. Luckily, Apache has a solution for that:

Almost every file-processor bundle has List and Fetch processors. Instead of using GetFile, which both lists the files and fetches them, we separate the process into two steps: the first one, List, executes on the Primary Node, and the other one, Fetch, executes on all nodes. ListFile emits the files' metadata as FlowFiles, a load-balanced connection distributes them between the nodes, and then each node receives its metadata and fetches the corresponding files. ListFile also uses cluster-based state management to store the last timestamp it queried, so even if the primary node switches, it doesn't matter!
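The split can be sketched outside NiFi, too. Here is a rough Python model of the pattern: the node names and the round-robin balancing are illustrative assumptions (real load-balanced connections offer several strategies), and the timestamp stands in for ListFile's cluster-wide state.

```python
from itertools import cycle
from pathlib import Path

NODES = ["node-1", "node-2", "node-3"]  # hypothetical cluster nodes


def list_files(directory, last_timestamp):
    """ListFile-style step: runs on the primary node only and emits
    lightweight metadata, not file contents. The returned timestamp
    mimics the state ListFile stores cluster-wide."""
    new = [p for p in Path(directory).iterdir()
           if p.is_file() and p.stat().st_mtime > last_timestamp]
    newest = max((p.stat().st_mtime for p in new), default=last_timestamp)
    return new, newest


def distribute(paths):
    """Load-balanced connection: round-robin the metadata between nodes."""
    assignments = {node: [] for node in NODES}
    for node, path in zip(cycle(NODES), paths):
        assignments[node].append(path)
    return assignments


def fetch(path):
    """FetchFile-style step: runs on every node, each reading only its share."""
    return Path(path).read_bytes()
```

The point of the split is visible in the types: only small path/metadata records cross the load-balanced connection, while the heavy `read_bytes` happens on whichever node received the assignment.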

FetchFile, meanwhile, is configured like so (this is even the default):

Many have asked me about the usage of a funnel. It basically just shoots out FlowFiles as soon as it receives them, so how can it be useful?

I think the most common use case for the funnel is when you have a lot of processors (or even just two or more) that transfer to the same destination. For example, we have the following flow:

Each of the GenerateFlowFile processors transfers its FlowFiles to the funnel and then to PublishKafka. Now, let's say we want to add a common attribute to all of them. If we didn't have the funnel, we would have to repoint all of these queues to an UpdateAttribute one by one. Instead, we can just change the destination of the funnel.

Let’s take the original example for another use case. Let’s say we want the FlowFiles to be published to Kafka in order of their creation. That would be hard if each GenerateFlowFile had its own queue to PublishKafka, but it’s totally possible using a funnel: just add a prioritizer to the single queue between the funnel and PublishKafka, and voilà! Every FlowFile is transferred to this queue and then forwarded to PublishKafka in order.
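A funnel feeding one prioritized queue behaves much like a single priority queue merging several producers. Here is a minimal sketch of that idea; the `Funnel` class and its attribute names are hypothetical, standing in for NiFi's OldestFlowFileFirstPrioritizer on the outgoing connection.

```python
import heapq
import itertools


class Funnel:
    """Toy model of a funnel whose single outgoing queue has a prioritizer:
    FlowFiles arriving from any upstream queue are merged into one connection,
    and the prioritizer (here: oldest creation time first) decides what leaves."""

    def __init__(self):
        self._heap = []
        self._tiebreak = itertools.count()  # stable order for equal timestamps

    def receive(self, flowfile: dict) -> None:
        """Accept a FlowFile from any upstream processor."""
        heapq.heappush(self._heap,
                       (flowfile["created"], next(self._tiebreak), flowfile))

    def transfer(self) -> dict:
        """Hand the oldest FlowFile to the downstream processor (PublishKafka)."""
        return heapq.heappop(self._heap)[-1]
```

The design point is that the ordering lives on the one merged queue, not on the producers: any number of GenerateFlowFile-style sources can call `receive`, and the single `transfer` side still emits in creation order.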

In conclusion, design patterns in NiFi can save us a lot of pain and even spare us custom processors to maintain! It’s always good to use the original Apache processors.