Using NiFi to react to DNS changes

As I’ve said in previous articles, Apache NiFi has a lot to offer. This time, I’ll look at what it can do with DNS.

I was given a task: use CaptureChangeMySQL to track delete operations in certain tables of a MySQL database through the binlog. The problem is that we don’t use GTID in our database. This means that if the processor is configured with multiple servers and switches to the next one, it continues from the same binlog offset, even though it’s a different server and that offset doesn’t necessarily point to the same event there. If that happens, we might miss a few deletions.

The general solution to this kind of problem at my job is to use a hostname whose IP address changes to point at whichever server is most accurate relative to the master.
For example, say we have the servers 192.168.1.1, 192.168.1.2 and 192.168.1.3. All of them replicate from the master, but one is considered more accurate than the others. Or, even simpler, one is working while the others aren’t. In addition, we have the hostname testing-db.com for our replicas.

Now let’s say that 192.168.1.2 is registered for our hostname, but 192.168.1.1 is the one that works. Our service will then change the IP registered in our DNS server to 192.168.1.1. But now we need to catch that change and reset the state and status of our CaptureChangeMySQL processor.

So how can we track DNS changes?

Well, it isn’t that complicated. NiFi ships with a processor named QueryDNS, which sends a query to the DNS server with the requested type and retrieves the answer.

So what is this request type?

Well, it’s a bit misleading since it’s not a request type but rather the returned record type. There are a few commonly used record types, for example:

  • IPv4 Address Mapping Record (A): stores a hostname and its corresponding IPv4 address
  • IPv6 Address Mapping Record (AAAA): stores a hostname and its corresponding IPv6 address
  • Certificate Record (CERT): stores encryption certificates
  • etc.

In our case, we require the IPv4 Address Mapping Record. That’s why we’ll set DNS Query Type to be A.
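To make the A record lookup concrete outside of NiFi, here is a minimal Python sketch. It is not how QueryDNS is implemented: it goes through the operating system's resolver via socket.getaddrinfo, which may also consult the hosts file or a local cache, whereas QueryDNS talks to a DNS server. The function name resolve_ipv4 and the injectable getaddrinfo parameter are my own, added only so the function can be exercised without a network.

```python
import socket
from typing import Callable, List


def resolve_ipv4(hostname: str, getaddrinfo: Callable = socket.getaddrinfo) -> List[str]:
    """Return the sorted, de-duplicated IPv4 addresses for a hostname.

    Restricting the lookup to AF_INET is the rough equivalent of asking
    for A records only (no AAAA results).
    """
    infos = getaddrinfo(hostname, None, socket.AF_INET, socket.SOCK_STREAM)
    # Each entry is (family, type, proto, canonname, sockaddr);
    # for AF_INET, sockaddr is an (ip, port) tuple.
    return sorted({info[4][0] for info in infos})
```

Calling resolve_ipv4("testing-db.com") would return whatever addresses the local resolver currently knows for that name, so the result depends on the environment it runs in.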

So now we have our query type. We store our hostname and port in a parameter context, in a parameter called src.server, so we reference it and strip the port. If we have testing-db.com:3306, we want only testing-db.com, which is simple with the expression language: ${#{src.server}:substringBefore(':')} .

The QueryDNS configuration
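If the expression language is unfamiliar, the substringBefore call that strips the port behaves roughly like the Python sketch below. The helper name host_only is hypothetical and exists only for illustration.

```python
def host_only(server: str) -> str:
    """Return the part of 'host:port' before the first colon.

    If there is no colon, the whole string is returned unchanged.
    """
    return server.partition(":")[0]
```

With the value from the example, host_only("testing-db.com:3306") gives testing-db.com.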

The processor will then produce an attribute named enrich.dns.record0.group0 which contains the current IPv4 address of the hostname.

Let’s use UpdateAttribute to copy it into a more descriptive attribute such as currentIP.
Now we need to compare it to the old IP address. We can use a cache for that with FetchDistributedMapCache and PutDistributedMapCache, which can be backed by Redis, Cassandra and more. I decided to use a local cache server hosted by NiFi itself through a Controller Service.

We’ll query the last known IP from the cache using FetchDistributedMapCache. If nothing is found, we store the current IP in the cache. If something is found and it matches the current IP, we do nothing. If something is found and it doesn’t match, we start a process that eventually stores the new IP in the cache in place of the old one.

The process of fetching the last known IP and comparing it with the current known IP
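The three-way decision can be sketched in a few lines of Python. This is only an illustration of the branching, with a plain dict standing in for the distributed map cache. The function name check_ip and the returned labels are my own and do not correspond to NiFi relationships.

```python
from typing import MutableMapping


def check_ip(cache: MutableMapping[str, str], hostname: str, current_ip: str) -> str:
    """Compare the freshly resolved IP with the last known one.

    Returns "stored" when nothing was cached yet (the IP is saved),
    "unchanged" when the cached IP matches, and "changed" otherwise.
    On "changed" the cache is deliberately left alone: the new IP should
    only be stored after the processor has been reset and restarted.
    """
    last_ip = cache.get(hostname)
    if last_ip is None:
        cache[hostname] = current_ip
        return "stored"
    if last_ip == current_ip:
        return "unchanged"
    return "changed"
```

Leaving the cache untouched in the "changed" case means that a failed reset is detected again on the next run instead of being silently skipped.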

So what if the current IP does not match the last known IP?

It’s a bit more involved than the flow so far, but it’s not that complicated. This is where NiFi’s REST API comes in handy. To change a processor’s run state, we need to provide its current revision. NiFi requires the revision as proof that you are working from the processor’s latest state and configuration.

So before we even stop the processor, we need to query it, extract its revision, and then send a request to stop the processor. When the stop request succeeds, the API returns the processor’s new status, from which we can extract the new revision and use it to start the processor later.

The entire flow of stopping the processor
Configuration for UpdateState ReplicateMySQL: I stored the processor ID in #{replicate.id} and used a default value of 8080 for the instance’s port
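For readers who want to see the shape of the stop request, here is a hedged Python sketch that builds it without sending it. It assumes the PUT /processors/{id}/run-status endpoint of the NiFi REST API and a base URL of the form http://localhost:8080/nifi-api. Which endpoint the flow above actually calls is not something I can confirm from the screenshots, so treat the endpoint choice and the function name run_status_request as assumptions.

```python
import json
import urllib.request


def run_status_request(base_url: str, processor: dict, state: str) -> urllib.request.Request:
    """Build (but do not send) a run-status change for a processor.

    `processor` is the JSON entity returned by GET /processors/{id};
    its "revision" object is passed back as-is so NiFi can verify that
    we are acting on the latest version of the processor.
    """
    body = {"revision": processor["revision"], "state": state}
    return urllib.request.Request(
        url=f"{base_url}/processors/{processor['id']}/run-status",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

The same helper can be reused with state set to RUNNING once the reset is done, as long as it is given the entity (and therefore the revision) returned by the previous call.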

Now, to make sure it’s absolutely down and we can reset its state, we need to terminate its threads, which is a DELETE request to /processors/${processor.id}/threads.

After clearing everything, you can start the processor and store the new known IP in cache.
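The two cleanup calls can be sketched the same way, again only building the requests rather than sending them. The thread termination path comes from the text above. The state-clearing path, POST /processors/{id}/state/clear-requests, is my assumption about which NiFi REST endpoint is used here, and both function names are hypothetical.

```python
import urllib.request


def terminate_threads_request(base_url: str, processor_id: str) -> urllib.request.Request:
    """Build the DELETE request that terminates a stopped processor's threads."""
    return urllib.request.Request(
        url=f"{base_url}/processors/{processor_id}/threads",
        method="DELETE",
    )


def clear_state_request(base_url: str, processor_id: str) -> urllib.request.Request:
    """Build the POST request that clears the processor's stored state.

    Assumption: the state/clear-requests endpoint is the one used in the flow.
    """
    return urllib.request.Request(
        url=f"{base_url}/processors/{processor_id}/state/clear-requests",
        data=b"",
        method="POST",
    )
```

Order matters: the processor has to be stopped before either request is sent, and it should only be started again after both have succeeded.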

The whole flow

So overall, we queried the DNS server, compared the IP to the stored one, fetched the processor, stopped the processor, cleared its threads & state, started it and stored the new IP back in our cache! Easy!
