Loading Data in chunks in Apache NiFi(pagination)

Apache NiFi lets us querying whole tables using pagination. But what if we have a custom query that we can’t put in a table?

Ben Yaakobi
6 min readMay 31, 2021

This was the question I had to answer when I was assigned to pull data from a MySQL database, push it to another database and then perform an aggregation on that other database. Normally, I could just use ExecuteSQL, but in this case — the returned data is far too large and it can cause an OutOfMemoryException. So I could have just used pagination using GenerateTableFetch but unfortunately it can only work on actual tables and not custom queries. A temporary table is not an option since I don’t have write permissions to the source database nor can I keep the current session open.

So how did I manage to pull the data anyway, and how did I wait until all of the pages have been successfully inserted to the other database? Well, I used the help of the limit clause of MySQL.

Executing The Query

At first, I configured the initiative GenerateFlowFile processor of the flow to generate one FlowFile on the primary node every configured time(to pull the data again) and set it to offset 0 and the wanted limit.

Then, I’ve set it to send the FlowFile to an ExecuteSQL query to execute my query:

The custom query gets injected using the parameter context

The Pagination Logic

I’ve created a process group to handle the logic of the pagination:

So what do we have here? The FlowFile results from the ExecuteSQL processor are transferred into the process group using the input port. Then we have the RouteOnAttribute processor that checks if the result count is smaller than the set limit:

That way, if we reached the last page, we would get less than the limit we set. If we have already done then we would get a zero results FlowFile so it would still work. But we now send the FlowFiles to three different processors and the relationships are even duplicated. Let us review them:

The first one, “unmatched” is sent to IncreaseOffset which increases the offset(and we will review it later also). It happens when we haven’t reached the last page yet so we want to increase the offset and query the next page until we reach the last one.

Then, we have the “true,unmatched” which is sent to output port called “output”. Even if we haven’t reached the last page yet(and even if we did), we’ll still want to use the pulled data. That’s why we duplicate the FlowFile no matter the result of “Is count < limit?” to “output”.

Lastly, we have “true” which is sent to “finish_signal”. When we do finish to process all of the query(meaning — we reached the last page), we want to know that somehow, so when we get a result that’s smaller than the limit — we duplicate it to “finish_signal”.

So, if we duplicate the FlowFiles into all of these output ports, doesn’t that mean that it would take a lot more disk space? No! Only the reference to the content is duplicated, not the content itself. The only way it will take up more space is if we transform the data and modify the content(then it would take a different content claim that would take up more space).
So now that we kicked that out of our way, let us review the last processor of the process group:

The “IncreaseOffset” processor(which is just UpdateAttribute processor) is configured to increase the offset by the limit so we can the take the next page using our current FlowFile. Next, it is sent to “next_page”.

So what’s next? Let’s see the flow:

Notify & Insert

So we have here two Notify processors. One is notifying the end of the pagination process and the other counts how many pages it has encountered.

The “Notify Paging Finished” is quite simple — just increase the counter by one(from zero):

The “Count Number of Fragments” decreases the counter of the pages by one. Then, after we’ve finished processing the current FlowFile, we would increase it by one and once we’ve hit zero we’ll know that we finished processing the FlowFiles(in combination of the finishing signal).

Now, let’s move on to the “Insert All” process group:

So we first insert the pages into the other database. Then, we increase the counter of the pages by 1. Now, up next in my flow, I run a few queries but I want to run them exactly once so I only let the initial FlowFile to pass through. To identify it, I use the offset:

there is only one FlowFile with the offset set to 0

Then, I wait for the pagination finished signal. If I’d wait for the pages counter first, then it would just pass through since while the next page is being processed, our FlowFile might have already done being inserted to the other database so it would just pass through. Even if we made it wait for the finishing signal after the wait for the pages insertion, the pagination could be finished but the insertion not — so we wait for the signal and then we can know for sure that all pages have passed through the first counter and we can wait for the second counter to be finished.

At last, we would want to reset the counters for our next execution:

The ReplaceText processor changes the content of the FlowFile to an empty JSON:

The PutDistributedMapCache processor resets the signals in the cache:

And now we have implemented pagination process although Apache NiFi doesn’t have a dedicated processor for this.

An example template can be found here.

--

--