NiFi & Scheduling
How does NiFi decide when to schedule processors? How does NiFi keep running when a processor fails? What are “yield” and “penalty”? How can NiFi be so fast with all of these processors running, without overloading the system?
NiFi lets you configure how many threads a processor may run on (“Concurrent Tasks”). What actually runs on those threads is the “onTrigger” method of the Processor interface, so it must be prepared to run on several threads at once and be thread safe. There are exceptions: processors marked with the “@TriggerSerially” annotation must run on a single thread only (for example ListHDFS, which writes to a cache that is based on ZooKeeper and is slow on writes).
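To make the thread-safety requirement concrete, here is a toy model in Python. It is not NiFi's Java API: `ToyProcessor`, `on_trigger` and `run` are hypothetical names, a thread pool stands in for the configured Concurrent Tasks, and a `trigger_serially` flag stands in for @TriggerSerially. The shared counter is only touched under a lock, which is what makes concurrent calls safe.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ToyProcessor:
    """Hypothetical stand-in for a NiFi processor (not the real Processor interface)."""

    def __init__(self, trigger_serially=False):
        self.trigger_serially = trigger_serially  # plays the role of @TriggerSerially
        self.processed = 0
        self.peak = 0  # highest number of on_trigger calls seen at the same moment
        self._active = 0
        self._lock = threading.Lock()  # guards all shared state above

    def on_trigger(self):
        # Shared state is only modified under the lock, so concurrent calls are safe.
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        time.sleep(0.001)  # simulated work, done outside the lock
        with self._lock:
            self.processed += 1
            self._active -= 1


def run(processor, concurrent_tasks, triggers):
    """Trigger the processor `triggers` times on up to `concurrent_tasks` threads."""
    # A serial processor gets exactly one thread, whatever the configured setting.
    threads = 1 if processor.trigger_serially else concurrent_tasks
    with ThreadPoolExecutor(max_workers=threads) as pool:
        for _ in range(triggers):
            pool.submit(processor.on_trigger)
    return threads
```

Running `run(ToyProcessor(trigger_serially=True), 8, 20)` uses a single thread even though 8 tasks were requested, so its `peak` stays at 1, while a regular `ToyProcessor` may have up to 8 calls in flight and still counts every trigger exactly once.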
So, if I set “Concurrent Tasks” to 100, will my processor run on 100 threads? Not necessarily. NiFi has a configuration property named “Maximum Timer Driven Thread Count”, which defines how many tasks can run at once on a global scale, across all processors. For example, if we set this property to 1 and have two processors that each run with 1 concurrent task, each of them has to wait in a worker queue while the other one is running.
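The interaction between the per-processor and the global limit can be sketched as follows, assuming a simplified model: one shared thread pool whose size plays the role of “Maximum Timer Driven Thread Count”, and a semaphore per processor for its Concurrent Tasks. `Tracker`, `simulate` and `max_thread_count` are hypothetical names, and NiFi's real scheduling agents are more involved than this.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Tracker:
    """Records the highest number of tasks running at the same moment."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def work(self, seconds):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(seconds)  # simulated onTrigger work
        with self._lock:
            self.active -= 1


def simulate(max_thread_count, concurrent_tasks, processors=2, triggers_each=20):
    """Return peak concurrency when processors share one pool of max_thread_count threads."""
    tracker = Tracker()
    # One pool shared by every processor: its size is the global cap.
    with ThreadPoolExecutor(max_workers=max_thread_count) as pool:

        def feed(slots):
            # A processor submits a new trigger only when one of its own
            # concurrent-task slots is free (the per-processor cap).
            for _ in range(triggers_each):
                slots.acquire()
                future = pool.submit(tracker.work, 0.002)
                future.add_done_callback(lambda _f: slots.release())

        feeders = [
            threading.Thread(target=feed, args=(threading.Semaphore(concurrent_tasks),))
            for _ in range(processors)
        ]
        for t in feeders:
            t.start()
        for t in feeders:
            t.join()
    return tracker.peak
```

In this model the effective concurrency never exceeds the smaller of the two limits: `simulate(1, 1)` reproduces the two processors taking turns on one thread, and `simulate(10, 100, processors=1)` never runs more than 10 tasks at once despite asking for 100.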
So basically, we can use “Maximum Timer Driven Thread Count” to control how many threads run on our NiFi cluster (per node) and thereby avoid overloading the system. However, like everything, it has a disadvantage: if a thread gets stuck, it holds its slot indefinitely, and that slot is not available to the other processors. So if we set the property to 2 threads and one processor gets stuck, the number of threads actually available is 1, not 2. For example, an ExecuteSQL processor running a query that takes one week to complete holds its slot for that whole week. Apache NiFi 1.7.0 addressed this by adding an “interrupt” option, but it still has to be used manually.
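The stuck-thread effect is easy to reproduce in a toy model. In this sketch a `threading.Event` that never fires stands in for the week-long ExecuteSQL query, and setting the event stands in for the manual intervention; all names are hypothetical and nothing here calls NiFi. With a pool of 2 threads, the remaining short tasks never run more than one at a time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_with_stuck_thread(pool_size=2, short_tasks=10):
    """Return the peak concurrency of short tasks while one task is stuck (pool_size >= 2)."""
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}
    release = threading.Event()  # hypothetical stand-in for the manual "interrupt"

    def stuck_query():
        # Like a week-long ExecuteSQL query: blocks until someone intervenes.
        release.wait()

    def short_task():
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.005)
        with lock:
            state["active"] -= 1

    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        pool.submit(stuck_query)  # submitted first, so it occupies one pool thread
        futures = [pool.submit(short_task) for _ in range(short_tasks)]
        for f in futures:
            f.result()  # every short task ran on the remaining thread(s)
        release.set()  # free the stuck thread so the pool can shut down
    return state["peak"]
```

`peak_with_stuck_thread()` returns 1: a pool of 2 behaves like a pool of 1 for as long as the stuck task holds its thread.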
NiFi also has scheduling strategies: CRON driven (based on a cron expression, in Quartz syntax rather than plain crontab) or timer driven (based on an interval of N TIME_UNIT). So you can schedule a processor to run once every given interval. But what happens if we schedule our processors to run every 0 seconds (the default)? Would they run all the time, consuming a lot of our resources? The short answer is no.
NiFi has a property (it exists since version 0.0.2, but the logic existed before) that makes a processor yield when it has no work to do, which prevents it from running for 10 ms (by default). That way, idle processors consume far fewer resources. The property is configurable in nifi.properties under the name “nifi.bored.yield.duration”.
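A rough feel for why the bored yield matters comes from counting how often an idle processor with a 0-second run schedule would be triggered in one second. This sketch uses a simulated clock and assumes, purely for illustration, that an empty trigger costs 10 microseconds; `count_idle_triggers` is a hypothetical helper, not NiFi code.

```python
def count_idle_triggers(window_us, trigger_cost_us, bored_yield_us):
    """Count triggers of an idle processor whose run schedule is 0 seconds.

    Uses a simulated clock in microseconds, so the result is deterministic.
    """
    if trigger_cost_us + bored_yield_us <= 0:
        raise ValueError("the simulated clock must advance on every trigger")
    now_us = 0
    triggers = 0
    while now_us < window_us:
        triggers += 1
        now_us += trigger_cost_us  # onTrigger runs and finds no work to do
        now_us += bored_yield_us   # the framework then skips the "bored" processor
    return triggers


# One simulated second, assuming (hypothetically) 10 microseconds per empty trigger.
without_yield = count_idle_triggers(1_000_000, 10, 0)
with_yield = count_idle_triggers(1_000_000, 10, 10_000)  # 10 ms bored yield
```

Under these assumptions the idle processor is triggered 100,000 times per second without the bored yield and only 100 times with a 10 ms yield, while a processor that does have work is not slowed down at all.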
But what is this yield? It has several uses. Let us examine administrative yield first. Administrative yield is used when a processor throws an unexpected exception. NiFi catches any exception that a processor fails to catch itself, but it assumes that such an exception is caused by a bug and that the processor might flood NiFi with exceptions. So, in return, NiFi makes the processor yield for 30 seconds (by default; configurable under the name “nifi.administrative.yield.duration” in nifi.properties), preventing it from running during that time.
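The framework side of an administrative yield can be modeled as a wrapper around each trigger: run it, catch whatever escapes, and stop scheduling the processor for the administrative yield duration. This is a hypothetical Python sketch with a simulated clock (`ScheduledProcessor`, `run_once` and `yield_until_ms` are invented names), not NiFi's actual scheduler.

```python
from dataclasses import dataclass
from typing import Callable

ADMINISTRATIVE_YIELD_MS = 30_000  # the 30-second default described above


@dataclass
class ScheduledProcessor:
    """Hypothetical scheduler-side view of one processor."""
    name: str
    on_trigger: Callable[[], None]
    yield_until_ms: int = 0

    def schedulable(self, now_ms: int) -> bool:
        return now_ms >= self.yield_until_ms


def run_once(proc: ScheduledProcessor, now_ms: int) -> str:
    """Trigger the processor unless it is yielded; contain any uncaught exception."""
    if not proc.schedulable(now_ms):
        return "skipped"
    try:
        proc.on_trigger()
        return "ok"
    except Exception:
        # The processor did not handle this exception itself: treat it as a bug,
        # keep the rest of the flow running, and back off instead of retrying
        # in a tight loop that would flood the logs.
        proc.yield_until_ms = now_ms + ADMINISTRATIVE_YIELD_MS
        return "administratively yielded"


def buggy_trigger():
    raise ValueError("unexpected input")


buggy = ScheduledProcessor("Buggy", buggy_trigger)
```

Calling `run_once(buggy, 0)` yields the processor; a call at 29,999 ms is skipped, and the processor is tried again (and yielded again) at 30,000 ms. The exception never propagates out of the scheduler, which is how one broken processor is kept from taking the rest of the flow down.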
A yield can be triggered by a processor too. When a processor decides that it should wait and try processing FlowFiles again later, it yields. The yield duration is configured by the user (“Yield Duration” in the processor settings). For example, PutFTP fails to connect to the server but assumes that the server will be up again in a few seconds. It yields and stops processing FlowFiles until the yield duration is over, and then it tries again.
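In a real processor this is a call to `ProcessContext.yield()` from inside onTrigger. The Python sketch below only models the effect with a simulated clock: a hypothetical `FlakyServer` is unreachable until 5 seconds in, and each failed attempt is followed by the configured Yield Duration. The 1 ms cost per attempt is an assumption for illustration.

```python
class FlakyServer:
    """Hypothetical FTP server that is unreachable until `up_at_ms`."""

    def __init__(self, up_at_ms):
        self.up_at_ms = up_at_ms

    def connect(self, now_ms):
        if now_ms < self.up_at_ms:
            raise ConnectionError("server unreachable")


def attempts_until_success(server, yield_duration_ms, trigger_cost_ms=1):
    """Simulated clock: return (attempts, time of success) for a PutFTP-like processor."""
    now_ms, attempts = 0, 0
    while True:
        attempts += 1
        try:
            server.connect(now_ms)
            return attempts, now_ms
        except ConnectionError:
            # The processor decides the failure is temporary and yields: it is
            # not triggered again until its Yield Duration has elapsed.
            now_ms += trigger_cost_ms + yield_duration_ms
```

With a 1-second yield the processor reconnects on its 6th attempt, at 5,005 ms; without yielding it would hammer the server 5,001 times to get there only 5 ms sooner.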
But wait, what is “Penalty Duration”? A penalty is given to a FlowFile when a processor encounters a problem with that specific FlowFile and believes the problem might resolve itself once the penalty duration is over. The penalized FlowFile is skipped and will not be processed until the penalty duration has passed, while the processor keeps working on other FlowFiles.
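Unlike a yield, a penalty targets one FlowFile rather than the whole processor. In NiFi this is `ProcessSession.penalize(FlowFile)`; the Python sketch below is only a model of the effect, with a hypothetical `ToyQueue` that hands out the first FlowFile whose penalty has expired and an illustrative 30-second penalty.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FlowFile:
    name: str
    penalized_until_ms: int = 0


def penalize(flowfile: FlowFile, now_ms: int, penalty_ms: int = 30_000) -> FlowFile:
    """Hold back this one FlowFile; the processor itself keeps running."""
    flowfile.penalized_until_ms = now_ms + penalty_ms
    return flowfile


class ToyQueue:
    """FIFO connection queue that skips FlowFiles still under penalty."""

    def __init__(self, flowfiles):
        self._items = list(flowfiles)

    def poll(self, now_ms: int) -> Optional[FlowFile]:
        for i, ff in enumerate(self._items):
            if ff.penalized_until_ms <= now_ms:
                return self._items.pop(i)
        return None  # every remaining FlowFile is still penalized


queue = ToyQueue([penalize(FlowFile("bad-record"), now_ms=0), FlowFile("good-record")])
```

At time 0 the queue hands out `good-record` even though `bad-record` is ahead of it; `bad-record` becomes available again only at 30,000 ms.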
In conclusion, NiFi allows us to control our failures and scheduling easily and has a lot of neat features!