Building a NiFi 2.x Cluster Kubernetes Operator with Python - From Basics to Production
Step-by-step implementation of a Kubernetes Operator for managing Apache NiFi 2.x clusters using Python (kopf). Covers CRD design, cluster auto-configuration, scaling, TLS, Flow deployment, and monitoring across 3 skill levels.
Apache NiFi 2.x can run as a cluster on Kubernetes without ZooKeeper: leader election can use Kubernetes Leases and cluster state can be kept in ConfigMaps, while ZooKeeper remains a supported option. In this post, we implement a Kubernetes Operator in Python that automatically manages NiFi 2.x clusters, structured across 3 skill levels.
Level 1: Basics — Operator Concepts and Single NiFi Node
1. What Is a Kubernetes Operator
The Operator Pattern
A Kubernetes Operator is software that automates operational tasks previously performed by humans. You declare the desired state using a CRD (Custom Resource Definition), and the Controller reconciles the actual state to match the desired state.
[Operator Pattern]
User declares:                   Operator performs:
┌─────────────────┐              ┌──────────────────────────────┐
│ NiFiCluster CR  │              │ Controller (Python)          │
│                 │  ─────────→  │                              │
│ replicas: 3     │  Watch/      │ 1. Create 3-pod StatefulSet  │
│ version: 2.4.0  │  Reconcile   │ 2. Generate config files     │
│ tls: true       │              │ 3. Issue certificates        │
└─────────────────┘              │ 4. Verify cluster join       │
                                 │ 5. Auto-recover on failure   │
                                 └──────────────────────────────┘
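The reconcile step in the diagram can be sketched as a pure function that compares desired and observed state and returns the work to do. This is a minimal illustration, not the operator's actual controller: `ClusterState`, `plan_actions`, and the action names are hypothetical and exist only for this example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ClusterState:
    """Hypothetical snapshot of the fields the diagram shows in the CR."""
    replicas: int
    version: str
    tls: bool


def plan_actions(desired: ClusterState, actual: Optional[ClusterState]) -> List[str]:
    """Return the steps needed to move `actual` toward `desired`."""
    if actual is None:
        # Nothing exists yet, so everything has to be created.
        return ["create"]
    actions: List[str] = []
    if desired.replicas > actual.replicas:
        actions.append("scale-up")
    elif desired.replicas < actual.replicas:
        actions.append("scale-down")
    if desired.version != actual.version:
        actions.append("rolling-upgrade")
    if desired.tls != actual.tls:
        actions.append("reconfigure-tls")
    return actions


if __name__ == "__main__":
    desired = ClusterState(replicas=3, version="2.4.0", tls=True)
    print(plan_actions(desired, None))
    print(plan_actions(desired, ClusterState(replicas=2, version="2.4.0", tls=True)))
    print(plan_actions(desired, desired))
```

An empty list means the observed state already matches the declaration, which is the steady state a controller should reach after repeated reconciliation.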
Why NiFi Needs an Operator
| Manual Management Issues | Operator Solutions |
|---|---|
| Manually editing nifi.properties when adding nodes | Automatic configuration by simply changing node count |
| Manual TLS certificate creation/renewal | Automated via cert-manager integration |
| Risk of data loss during scale down | Automated safe offboarding |
| Full restart required on config changes | Automated rolling restart |
| Manual recovery of failed nodes | Automatic detection + recovery |
| Manual Flow deployment management | GitOps-based automated deployment |
2. NiFi 2.x Clustering Architecture
NiFi 1.x → 2.x Changes
| Item | NiFi 1.x | NiFi 2.x |
|---|---|---|
| Cluster Coordination | ZooKeeper required | ZooKeeper optional (Kubernetes Lease-based leader election) |
| Node Discovery | ZooKeeper | Initial node list (nifi.properties) |
| Primary Node Election | ZooKeeper | Built-in election mechanism |
| Flow Synchronization | ZooKeeper | Managed directly by Cluster Coordinator |
| Dependencies | NiFi + ZooKeeper (3–5 nodes) | NiFi only |
| Operational Complexity | High (ZK management required) | Low |
Node Discovery in Kubernetes
In NiFi 2.x, instead of ZooKeeper, you directly specify an initial node list. By leveraging Kubernetes Headless Services and DNS, nodes can be discovered automatically.
[K8s-based NiFi Cluster Architecture]
Headless Service: nifi-cluster-headless (ClusterIP: None)
  │
  ├─ nifi-0.nifi-cluster-headless.nifi.svc.cluster.local
  ├─ nifi-1.nifi-cluster-headless.nifi.svc.cluster.local
  └─ nifi-2.nifi-cluster-headless.nifi.svc.cluster.local
nifi.properties (each node):
  nifi.cluster.is.node=true
  nifi.cluster.node.address=nifi-{ordinal}.nifi-cluster-headless.{namespace}.svc.cluster.local
  nifi.cluster.node.protocol.port=11443
  nifi.cluster.flow.election.max.candidates=3
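Because a StatefulSet names its Pods `<statefulset>-<ordinal>` and a Headless Service gives each Pod its own DNS record, the full node list can be computed without querying the cluster. A small sketch, assuming the default `cluster.local` cluster domain; the helper name `pod_fqdns` is hypothetical.

```python
from typing import List


def pod_fqdns(statefulset: str, service: str, namespace: str, replicas: int,
              cluster_domain: str = "cluster.local") -> List[str]:
    """Build the stable DNS name of every Pod in a StatefulSet behind a Headless Service."""
    return [
        f"{statefulset}-{ordinal}.{service}.{namespace}.svc.{cluster_domain}"
        for ordinal in range(replicas)
    ]


if __name__ == "__main__":
    for host in pod_fqdns("nifi", "nifi-cluster-headless", "nifi", 3):
        print(host)
```

The names stay stable across Pod restarts, which is why they are safe to write into generated configuration.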
3. Setting Up the Development Environment
Prerequisites
# Python 3.11+
python --version
# kubectl
kubectl version --client
# kind (local K8s cluster)
kind create cluster --name nifi-dev
# Install Python dependencies
pip install kopf kubernetes pyyaml jinja2 httpx
Introduction to kopf
kopf (Kubernetes Operator Pythonic Framework) is a framework that makes it easy to develop Kubernetes Operators in Python.
import kopf


@kopf.on.create('nificlusters')
def create_fn(spec, name, namespace, **kwargs):
    """Called when a NiFiCluster resource is created"""
    print(f"NiFi cluster '{name}' creation requested: {spec}")


@kopf.on.update('nificlusters')
def update_fn(spec, name, namespace, diff, **kwargs):
    """Called when a NiFiCluster resource is modified"""
    print(f"NiFi cluster '{name}' changed: {diff}")


@kopf.on.delete('nificlusters')
def delete_fn(name, namespace, **kwargs):
    """Called when a NiFiCluster resource is deleted"""
    print(f"NiFi cluster '{name}' deleted")
Project Structure
nifi-operator/
├── operator/
│   ├── __init__.py
│   ├── main.py                  # kopf handlers (entry point)
│   ├── controllers/
│   │   ├── __init__.py
│   │   ├── cluster.py           # NiFiCluster controller
│   │   ├── flow.py              # NiFiFlow controller
│   │   └── scaling.py           # Scaling logic
│   ├── resources/
│   │   ├── __init__.py
│   │   ├── statefulset.py       # StatefulSet creation
│   │   ├── service.py           # Service creation
│   │   ├── configmap.py         # ConfigMap creation
│   │   └── certificate.py       # TLS certificates
│   ├── nifi_api/
│   │   ├── __init__.py
│   │   └── client.py            # NiFi REST API client
│   └── templates/
│       ├── nifi.properties.j2
│       ├── bootstrap.conf.j2
│       └── authorizers.xml.j2
├── deploy/
│   ├── crds/
│   │   └── nificluster-crd.yaml
│   ├── rbac.yaml
│   └── operator-deployment.yaml
├── Dockerfile
├── requirements.txt
└── README.md
4. CRD (Custom Resource Definition) Design
NiFiCluster CRD
# deploy/crds/nificluster-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nificlusters.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiCluster
    plural: nificlusters
    singular: nificluster
    shortNames: ["nifi"]
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["replicas", "version"]
              properties:
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 20
                  description: "Number of NiFi cluster nodes"
                version:
                  type: string
                  description: "NiFi Docker image version"
                image:
                  type: string
                  default: "apache/nifi"
                  description: "NiFi Docker image"
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      properties:
                        cpu: { type: string, default: "1" }
                        memory: { type: string, default: "2Gi" }
                    limits:
                      type: object
                      properties:
                        cpu: { type: string, default: "2" }
                        memory: { type: string, default: "4Gi" }
                storage:
                  type: object
                  properties:
                    size: { type: string, default: "10Gi" }
                    storageClassName: { type: string }
                config:
                  type: object
                  properties:
                    jvmHeapSize: { type: string, default: "1g" }
                    maxThreads: { type: integer, default: 10 }
                    sensitivePropsKey: { type: string }
                tls:
                  type: object
                  properties:
                    enabled: { type: boolean, default: false }
                    issuerRef:
                      type: object
                      properties:
                        name: { type: string }
                        kind: { type: string, default: "ClusterIssuer" }
                auth:
                  type: object
                  properties:
                    type: { type: string, enum: ["single-user", "ldap", "oidc"] }
                    ldap:
                      type: object
                      properties:
                        url: { type: string }
                        searchBase: { type: string }
                        searchFilter: { type: string }
                    oidc:
                      type: object
                      properties:
                        discoveryUrl: { type: string }
                        clientId: { type: string }
                        clientSecretRef: { type: string }
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ["Creating", "Running", "Scaling", "Updating", "Error"]
                readyNodes:
                  type: integer
                clusterCoordinator:
                  type: string
                message:
                  type: string
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Ready
          type: integer
          jsonPath: .status.readyNodes
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
NiFiCluster CR Example
# examples/nifi-cluster.yaml
apiVersion: datadynamics.io/v1alpha1
kind: NiFiCluster
metadata:
  name: my-nifi
  namespace: nifi
spec:
  replicas: 3
  version: "2.4.0"
  image: "apache/nifi"
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "2"
      memory: "4Gi"
  storage:
    size: "20Gi"
    storageClassName: "standard"
  config:
    jvmHeapSize: "1536m"
    maxThreads: 15
    sensitivePropsKey: "my-secret-key-12345"
  tls:
    enabled: true
    issuerRef:
      # Placeholder name for an internal CA issuer. Public ACME issuers such as
      # Let's Encrypt cannot sign *.svc.cluster.local names.
      name: internal-ca-issuer
      kind: ClusterIssuer
  auth:
    type: single-user
# Register CRD and create CR (the CR lives in the nifi namespace)
kubectl apply -f deploy/crds/nificluster-crd.yaml
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
# Verify
kubectl get nifi -n nifi
# NAME      REPLICAS   READY   PHASE     VERSION   AGE
# my-nifi   3          3       Running   2.4.0     5m
5. First Operator: Deploying a Single NiFi Node
Core Handler Implementation
# operator/main.py
# Caution: Python's standard library already ships a module named `operator`,
# so the `operator.*` imports below resolve only if this package directory is
# given a different name (for example `nifi_operator`).
import kopf
import kubernetes
from kubernetes import client
from operator.resources.statefulset import build_statefulset
from operator.resources.service import build_headless_service, build_ui_service
from operator.resources.configmap import build_nifi_config

# Use the in-cluster ServiceAccount when running as a Pod; fall back to the
# local kubeconfig when the operator is started with `kopf run` for development.
try:
    kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
    kubernetes.config.load_kube_config()

apps_v1 = client.AppsV1Api()
core_v1 = client.CoreV1Api()


@kopf.on.create('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_create(spec, name, namespace, logger, patch, **kwargs):
    """Provision resources when a NiFiCluster is created"""
    logger.info(f"Creating NiFi cluster '{name}' with {spec['replicas']} nodes")
    # Update status
    patch.status['phase'] = 'Creating'
    patch.status['readyNodes'] = 0
    # 1. Create ConfigMap (nifi.properties, etc.)
    configmap = build_nifi_config(name, namespace, spec)
    kopf.adopt(configmap)  # Set Owner Reference
    core_v1.create_namespaced_config_map(namespace, configmap)
    logger.info(f"ConfigMap '{configmap.metadata.name}' created")
    # 2. Create Headless Service (for node discovery)
    headless_svc = build_headless_service(name, namespace)
    kopf.adopt(headless_svc)
    core_v1.create_namespaced_service(namespace, headless_svc)
    logger.info("Headless Service created")
    # 3. Create UI Service (for external access)
    ui_svc = build_ui_service(name, namespace, spec)
    kopf.adopt(ui_svc)
    core_v1.create_namespaced_service(namespace, ui_svc)
    logger.info("UI Service created")
    # 4. Create StatefulSet
    statefulset = build_statefulset(name, namespace, spec)
    kopf.adopt(statefulset)
    apps_v1.create_namespaced_stateful_set(namespace, statefulset)
    logger.info(f"StatefulSet created with {spec['replicas']} replicas")
    # The status patch is sent once, when the handler returns, so this value
    # replaces 'Creating'. It means "resources submitted", not "all Pods ready".
    patch.status['phase'] = 'Running'
    return {'message': f"NiFi cluster '{name}' created successfully"}


@kopf.on.delete('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_delete(name, namespace, logger, **kwargs):
    """NiFiCluster deletion: children are removed via Owner References"""
    logger.info(f"NiFi cluster '{name}' is being deleted (cascade)")
    # Owner References set by kopf.adopt() ensure
    # StatefulSet, Service, and ConfigMap are automatically deleted
StatefulSet Creation Logic
# operator/resources/statefulset.py
from kubernetes import client


def build_statefulset(name: str, namespace: str, spec: dict) -> client.V1StatefulSet:
    """Create NiFi StatefulSet"""
    replicas = spec.get('replicas', 1)
    version = spec['version']
    image = spec.get('image', 'apache/nifi')
    resources = spec.get('resources', {})
    storage = spec.get('storage', {})
    config = spec.get('config', {})

    # Container definition
    container = client.V1Container(
        name='nifi',
        image=f"{image}:{version}",
        ports=[
            client.V1ContainerPort(container_port=8443, name='https'),
            client.V1ContainerPort(container_port=11443, name='cluster'),
            client.V1ContainerPort(container_port=6342, name='load-balance'),
        ],
        env=[
            client.V1EnvVar(name='NIFI_JVM_HEAP_INIT', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(name='NIFI_JVM_HEAP_MAX', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(
                name='NIFI_POD_NAME',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.name')
                )
            ),
            client.V1EnvVar(
                name='NIFI_POD_NAMESPACE',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace')
                )
            ),
        ],
        resources=client.V1ResourceRequirements(
            requests={
                'cpu': resources.get('requests', {}).get('cpu', '1'),
                'memory': resources.get('requests', {}).get('memory', '2Gi'),
            },
            limits={
                'cpu': resources.get('limits', {}).get('cpu', '2'),
                'memory': resources.get('limits', {}).get('memory', '4Gi'),
            }
        ),
        volume_mounts=[
            client.V1VolumeMount(name='data', mount_path='/opt/nifi/nifi-current/data'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/nifi.properties',
                                 sub_path='nifi.properties'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/bootstrap.conf',
                                 sub_path='bootstrap.conf'),
        ],
        # TCP probes: an unauthenticated HTTPS GET against the secured REST API
        # is answered with 401, which an httpGet probe counts as a failure.
        readiness_probe=client.V1Probe(
            tcp_socket=client.V1TCPSocketAction(port=8443),
            initial_delay_seconds=90,
            period_seconds=20,
            failure_threshold=10,
        ),
        liveness_probe=client.V1Probe(
            tcp_socket=client.V1TCPSocketAction(port=8443),
            initial_delay_seconds=120,
            period_seconds=30,
            failure_threshold=5,
        ),
    )

    # Init Container: copy both config files, then substitute the Pod name and
    # namespace in nifi.properties. NIFI_POD_NAMESPACE is replaced first because
    # NIFI_POD_NAME is a prefix of it.
    init_container = client.V1Container(
        name='init-config',
        image='busybox:1.36',
        command=['sh', '-c', '''
cp /config-template/nifi.properties /config-output/nifi.properties
cp /config-template/bootstrap.conf /config-output/bootstrap.conf
sed -i "s/NIFI_POD_NAMESPACE/$NIFI_POD_NAMESPACE/g" /config-output/nifi.properties
sed -i "s/NIFI_POD_NAME/$NIFI_POD_NAME/g" /config-output/nifi.properties
'''],
        env=[
            client.V1EnvVar(name='NIFI_POD_NAME',
                            value_from=client.V1EnvVarSource(
                                field_ref=client.V1ObjectFieldSelector(field_path='metadata.name'))),
            client.V1EnvVar(name='NIFI_POD_NAMESPACE',
                            value_from=client.V1EnvVarSource(
                                field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace'))),
        ],
        volume_mounts=[
            client.V1VolumeMount(name='config-template', mount_path='/config-template'),
            client.V1VolumeMount(name='config', mount_path='/config-output'),
        ]
    )

    # PVC Template
    pvc_template = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name='data'),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=['ReadWriteOnce'],
            storage_class_name=storage.get('storageClassName'),
            resources=client.V1VolumeResourceRequirements(
                requests={'storage': storage.get('size', '10Gi')}
            )
        )
    )

    return client.V1StatefulSet(
        metadata=client.V1ObjectMeta(name=f"{name}-nifi", namespace=namespace),
        spec=client.V1StatefulSetSpec(
            replicas=replicas,
            service_name=f"{name}-headless",
            pod_management_policy='OrderedReady',
            selector=client.V1LabelSelector(
                match_labels={'app': 'nifi', 'cluster': name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={'app': 'nifi', 'cluster': name}
                ),
                spec=client.V1PodSpec(
                    init_containers=[init_container],
                    containers=[container],
                    volumes=[
                        client.V1Volume(
                            name='config-template',
                            config_map=client.V1ConfigMapVolumeSource(
                                name=f"{name}-config"
                            )
                        ),
                        client.V1Volume(
                            name='config',
                            empty_dir=client.V1EmptyDirVolumeSource()
                        ),
                    ],
                    termination_grace_period_seconds=60,
                )
            ),
            volume_claim_templates=[pvc_template],
        )
    )
Service Creation Logic
# operator/resources/service.py
from kubernetes import client


def build_headless_service(name: str, namespace: str) -> client.V1Service:
    """Headless Service (for node discovery)"""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-headless",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            cluster_ip='None',  # Headless
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
                client.V1ServicePort(name='cluster', port=11443, target_port=11443),
                client.V1ServicePort(name='load-balance', port=6342, target_port=6342),
            ],
            publish_not_ready_addresses=True,  # Register in DNS even before startup
        )
    )


def build_ui_service(name: str, namespace: str, spec: dict) -> client.V1Service:
    """UI Service (for web UI access)"""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-ui",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            type='ClusterIP',
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
            ]
        )
    )
ConfigMap Creation (nifi.properties)
# operator/resources/configmap.py
from kubernetes import client
from jinja2 import Template
NIFI_PROPERTIES_TEMPLATE = """
# NiFi 2.x Properties (Generated by Operator)
# Web Properties
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443
nifi.web.proxy.host=
# Cluster Properties
nifi.cluster.is.node=true
nifi.cluster.node.address=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.node.protocol.port=11443
nifi.cluster.node.protocol.max.threads={{ max_threads }}
nifi.cluster.flow.election.max.wait.time=1 min
nifi.cluster.flow.election.max.candidates={{ replicas }}
# Load Balancing
nifi.cluster.load.balance.host=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.load.balance.port=6342
# Initial Node Identities (Kubernetes DNS-based)
{% for i in range(replicas) %}
nifi.cluster.node.{{ i + 1 }}={{ name }}-nifi-{{ i }}.{{ headless_service }}.{{ namespace }}.svc.cluster.local:11443
{% endfor %}
# State Management
nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.provider.cluster=local-provider
# Security Properties
nifi.sensitive.props.key={{ sensitive_props_key }}
nifi.security.autoreload.enabled=true
# Performance
nifi.bored.yield.duration=10 millis
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB
# Data Directories
nifi.flowfile.repository.directory=./data/flowfile_repository
nifi.content.repository.directory.default=./data/content_repository
nifi.provenance.repository.directory.default=./data/provenance_repository
nifi.database.directory=./data/database_repository
"""
def build_nifi_config(name: str, namespace: str, spec: dict) -> client.V1ConfigMap:
    """Create NiFi configuration ConfigMap"""
    replicas = spec.get('replicas', 1)
    config = spec.get('config', {})
    headless_service = f"{name}-headless"
    template = Template(NIFI_PROPERTIES_TEMPLATE)
    nifi_properties = template.render(
        name=name,
        namespace=namespace,
        headless_service=headless_service,
        replicas=replicas,
        max_threads=config.get('maxThreads', 10),
        sensitive_props_key=config.get('sensitivePropsKey', 'default-key-change-me'),
    )
    heap = config.get('jvmHeapSize', '1g')
    bootstrap_conf = f"""
# Bootstrap Config (Generated by Operator)
java.arg.2=-Xms{heap}
java.arg.3=-Xmx{heap}
java.arg.14=-Djava.protocol.handler.pkgs=sun.net.www.protocol
"""
    return client.V1ConfigMap(
        metadata=client.V1ObjectMeta(
            name=f"{name}-config",
            namespace=namespace,
            labels={'app': 'nifi', 'cluster': name}
        ),
        data={
            'nifi.properties': nifi_properties,
            'bootstrap.conf': bootstrap_conf,
        }
    )
Verification
# 1. Register CRD
kubectl apply -f deploy/crds/nificluster-crd.yaml
# 2. Run Operator locally (development mode)
kopf run operator/main.py --verbose
# 3. Create NiFi cluster in another terminal
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
# 4. Check status
kubectl get nifi -n nifi
kubectl get pods -n nifi
kubectl get svc -n nifi
# 5. Access NiFi UI
kubectl port-forward svc/my-nifi-ui -n nifi 8443:8443
# Visit https://localhost:8443/nifi
Level 2: Intermediate — Cluster Management and Configuration Automation
6. NiFi 2.x Cluster Auto-Configuration
Cluster Bootstrap Order Control
When initially configuring a NiFi 2.x cluster, the node startup order is critical. The first node must be elected as the Cluster Coordinator, and the remaining nodes join this coordinator.
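Checking whether the cluster has formed comes down to counting connected nodes in the cluster status response. The sketch below does that on a plain dictionary so it can be tested without a running NiFi. The payload shape mirrors the fields the handlers in this post read (`cluster.nodes[].status`, `address`, `roles`); the role label "Cluster Coordinator", the helper name `summarize_cluster`, and the sample data are assumptions made for illustration.

```python
from typing import Optional


def summarize_cluster(payload: dict) -> dict:
    """Reduce a cluster status payload to the numbers an operator cares about."""
    nodes = payload.get("cluster", {}).get("nodes", [])
    connected = sum(1 for n in nodes if n.get("status") == "CONNECTED")
    coordinator: Optional[str] = next(
        (n.get("address") for n in nodes if "Cluster Coordinator" in n.get("roles", [])),
        None,
    )
    return {"total": len(nodes), "connected": connected, "coordinator": coordinator}


def is_cluster_ready(payload: dict, expected_replicas: int) -> bool:
    """True once every expected node is connected and a coordinator is known."""
    summary = summarize_cluster(payload)
    return summary["connected"] >= expected_replicas and summary["coordinator"] is not None


if __name__ == "__main__":
    # Invented sample data, not captured from a real cluster.
    sample = {"cluster": {"nodes": [
        {"address": "my-nifi-nifi-0", "status": "CONNECTED", "roles": ["Cluster Coordinator"]},
        {"address": "my-nifi-nifi-1", "status": "CONNECTING", "roles": []},
    ]}}
    print(summarize_cluster(sample))
    print(is_cluster_ready(sample, 2))
```

Keeping the parsing separate from the HTTP call makes the readiness rule easy to unit test and reuse in both the wait loop and the monitoring timer.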
# operator/controllers/cluster.py
import asyncio
import httpx


async def wait_for_cluster_ready(name: str, namespace: str, replicas: int, logger):
    """Wait until the cluster is fully formed"""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
    for attempt in range(60):  # Wait up to 10 minutes
        try:
            # verify=False skips TLS certificate validation
            async with httpx.AsyncClient(verify=False) as client:
                resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
                if resp.status_code == 200:
                    cluster_info = resp.json()
                    connected = sum(
                        1 for n in cluster_info['cluster']['nodes']
                        if n['status'] == 'CONNECTED'
                    )
                    logger.info(f"Cluster nodes connected: {connected}/{replicas}")
                    if connected >= replicas:
                        return True
        except Exception as e:
            logger.debug(f"Waiting for cluster... ({attempt + 1}/60): {e}")
        await asyncio.sleep(10)
    return False
Status Monitoring Timer
# operator/main.py
import httpx


@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def monitor_cluster(spec, name, namespace, patch, logger, **kwargs):
    """Check cluster status every 30 seconds"""
    try:
        headless = f"{name}-headless"
        coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
        # Named `http` so it does not shadow the `kubernetes.client` module
        # that this file imports as `client`.
        async with httpx.AsyncClient(verify=False) as http:
            resp = await http.get(f"{coordinator_url}/nifi-api/controller/cluster")
        if resp.status_code == 200:
            cluster = resp.json()['cluster']
            connected = sum(1 for n in cluster['nodes'] if n['status'] == 'CONNECTED')
            # NiFi reports roles as labels such as "Cluster Coordinator" and "Primary Node"
            coordinator = next(
                (n['address'] for n in cluster['nodes']
                 if 'Cluster Coordinator' in n.get('roles', [])),
                'unknown'
            )
            patch.status['readyNodes'] = connected
            patch.status['clusterCoordinator'] = coordinator
            patch.status['phase'] = 'Running'
        else:
            patch.status['phase'] = 'Error'
            patch.status['message'] = f"API returned {resp.status_code}"
    except Exception as e:
        logger.warning(f"Health check failed: {e}")
        patch.status['phase'] = 'Error'
        patch.status['message'] = str(e)
7. Scaling (Scale Up/Down)
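Before any API call is made, a change in the replica count can be turned into a list of affected StatefulSet ordinals. The sketch below is a hypothetical helper (`plan_scaling` is not part of kopf or NiFi): new nodes take the next free ordinals, and removals start from the highest ordinal because that is the order in which a StatefulSet deletes Pods.

```python
from typing import Dict, List, Union


def plan_scaling(old: int, new: int) -> Dict[str, Union[str, List[int]]]:
    """Return the scaling direction and the Pod ordinals that are added or removed."""
    if old < 0 or new < 0:
        raise ValueError("replica counts must be non-negative")
    if new > old:
        # Ordinals old .. new-1 are created, lowest first.
        return {"direction": "up", "ordinals": list(range(old, new))}
    if new < old:
        # Ordinals old-1 .. new are removed, highest first.
        return {"direction": "down", "ordinals": list(range(old - 1, new - 1, -1))}
    return {"direction": "none", "ordinals": []}


if __name__ == "__main__":
    print(plan_scaling(3, 5))
    print(plan_scaling(5, 3))
```

The ordinals map directly to Pod names, so the result tells the operator which nodes to drain before shrinking the StatefulSet.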
Update Handler
@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.replicas')
async def on_replicas_change(spec, name, namespace, old, new, logger, patch, **kwargs):
    """Detect replicas change → perform scaling"""
    old_replicas = old
    new_replicas = new
    logger.info(f"Scaling '{name}': {old_replicas} → {new_replicas}")
    patch.status['phase'] = 'Scaling'
    # Scale up needs no preparation: the new Pods start once the StatefulSet grows.
    # Scale down must first remove the surplus nodes from the NiFi cluster
    # (scale_down lives in operator/controllers/scaling.py).
    if new_replicas < old_replicas:
        await scale_down(name, namespace, old_replicas, new_replicas, logger)
    # Update ConfigMap (node list changed)
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
    # Update StatefulSet replicas
    apps_v1.patch_namespaced_stateful_set(
        f"{name}-nifi", namespace,
        {'spec': {'replicas': new_replicas}}
    )
    patch.status['phase'] = 'Running'
Safe Scale Down
When removing NiFi nodes, you must follow the Disconnect → Offload → Delete sequence: NiFi only offloads a node that is already disconnected.
# operator/controllers/scaling.py
import asyncio
import httpx


async def _wait_for_status(client, node_url: str, target: str, attempts: int = 60, delay: int = 5):
    """Poll a node until it reports the target status, or fail after attempts * delay seconds"""
    for _ in range(attempts):
        resp = await client.get(node_url)
        if resp.json()['node']['status'] == target:
            return
        await asyncio.sleep(delay)
    raise TimeoutError(f"Node did not reach {target}")


async def scale_down(name: str, namespace: str, old: int, new: int, logger):
    """Safe scale down: remove nodes starting from the highest index"""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
    async with httpx.AsyncClient(verify=False) as client:
        # Nodes to remove (starting from highest index)
        for i in range(old - 1, new - 1, -1):
            node_address = f"{name}-nifi-{i}.{headless}.{namespace}.svc.cluster.local"
            logger.info(f"Removing node: {node_address}")
            # 1. Look up node ID from cluster
            resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
            cluster = resp.json()['cluster']
            node = next(
                (n for n in cluster['nodes'] if node_address in n['address']),
                None
            )
            if not node:
                logger.warning(f"Node {node_address} not found in cluster")
                continue
            node_id = node['nodeId']
            node_url = f"{coordinator_url}/nifi-api/controller/cluster/nodes/{node_id}"
            # 2. DISCONNECTING: a node must be disconnected before it can be offloaded
            await client.put(node_url, json={'node': {'nodeId': node_id, 'status': 'DISCONNECTING'}})
            await _wait_for_status(client, node_url, 'DISCONNECTED')
            logger.info(f"Node {i} disconnected")
            # 3. OFFLOADING: migrate queued data to the remaining nodes
            await client.put(node_url, json={'node': {'nodeId': node_id, 'status': 'OFFLOADING'}})
            logger.info(f"Node {i} offloading started")
            # 4. Wait for OFFLOADING to complete
            await _wait_for_status(client, node_url, 'OFFLOADED')
            # 5. Remove from cluster
            await client.delete(node_url)
            logger.info(f"Node {i} removed from cluster")
8. Configuration Change Management
Rolling Restart
import asyncio
import datetime


@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.config')
async def on_config_change(spec, name, namespace, logger, patch, **kwargs):
    """On config change: update ConfigMap + rolling restart"""
    logger.info(f"Config changed for '{name}', performing rolling restart")
    patch.status['phase'] = 'Updating'
    # 1. Update ConfigMap
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
    # 2. Trigger rolling restart (via annotation change)
    restart_annotation = {
        'spec': {
            'template': {
                'metadata': {
                    'annotations': {
                        'nifi-operator/restartedAt':
                            datetime.datetime.now(datetime.timezone.utc).isoformat()
                    }
                }
            }
        }
    }
    apps_v1.patch_namespaced_stateful_set(f"{name}-nifi", namespace, restart_annotation)
    logger.info("Rolling restart triggered")
    # 3. Wait for rolling restart to complete (up to 10 minutes)
    replicas = spec['replicas']
    for _ in range(120):
        sts = apps_v1.read_namespaced_stateful_set(f"{name}-nifi", namespace)
        # observed_generation confirms the controller has seen the patched template
        observed = (sts.status.observed_generation or 0) >= sts.metadata.generation
        if observed and sts.status.ready_replicas == replicas \
                and sts.status.updated_replicas == replicas:
            break
        await asyncio.sleep(5)
    else:
        # The loop ran out without seeing a fully updated StatefulSet
        patch.status['phase'] = 'Error'
        patch.status['message'] = 'Rolling restart did not complete in time'
        logger.error("Rolling restart timed out")
        return
    patch.status['phase'] = 'Running'
    logger.info("Rolling restart completed")
9. TLS/SSL Certificate Automation
Creating cert-manager Certificate CRs
# operator/resources/certificate.py
from kubernetes import client


def build_tls_certificate(name: str, namespace: str, spec: dict, ordinal: int) -> dict:
    """Create cert-manager Certificate resource"""
    tls = spec.get('tls', {})
    headless = f"{name}-headless"
    return {
        'apiVersion': 'cert-manager.io/v1',
        'kind': 'Certificate',
        'metadata': {
            'name': f'{name}-nifi-{ordinal}-tls',
            'namespace': namespace,
        },
        'spec': {
            'secretName': f'{name}-nifi-{ordinal}-tls',
            'issuerRef': {
                'name': tls['issuerRef']['name'],
                'kind': tls['issuerRef'].get('kind', 'ClusterIssuer'),
            },
            'commonName': f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
            'dnsNames': [
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc',
                f'{name}-nifi-{ordinal}.{headless}',
                f'{name}-nifi-{ordinal}',
                'localhost',
            ],
            'duration': '8760h',     # 1 year
            'renewBefore': '720h',   # Renew 30 days before expiry
            'privateKey': {
                'algorithm': 'RSA',
                'size': 2048,
            },
            'keystores': {
                'jks': {
                    'create': True,
                    'passwordSecretRef': {
                        'name': f'{name}-keystore-password',
                        'key': 'password',
                    }
                }
            }
        }
    }
# Create certificates when TLS is enabled (add to on_create)
    if spec.get('tls', {}).get('enabled', False):
        custom_api = client.CustomObjectsApi()
        for i in range(spec['replicas']):
            cert = build_tls_certificate(name, namespace, spec, i)
            kopf.adopt(cert)  # Owner Reference, so the Certificate is deleted with the cluster
            custom_api.create_namespaced_custom_object(
                group='cert-manager.io', version='v1',
                namespace=namespace, plural='certificates', body=cert
            )
        logger.info(f"TLS certificates created for {spec['replicas']} nodes")
Level 3: Advanced — Production-Grade Operator
10. NiFi Flow Deployment Automation
NiFiFlow CRD
# deploy/crds/nififlow-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nififlows.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiFlow
    plural: nififlows
    singular: nififlow
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                clusterRef:
                  type: string
                  description: "NiFiCluster name"
                registryUrl:
                  type: string
                  description: "NiFi Registry URL"
                bucketId:
                  type: string
                flowId:
                  type: string
                flowVersion:
                  type: integer
                processGroupId:
                  type: string
                  default: "root"
                autoStart:
                  type: boolean
                  default: true
            # Without a status entry the API server prunes the fields
            # that the handler writes to status
            status:
              type: object
              x-kubernetes-preserve-unknown-fields: true
Flow Deployment Handler
@kopf.on.create('datadynamics.io', 'v1alpha1', 'nififlows')
async def on_flow_create(spec, name, namespace, logger, patch, **kwargs):
    """NiFiFlow CR created → deploy Flow to NiFi"""
    cluster_name = spec['clusterRef']
    headless = f"{cluster_name}-headless"
    nifi_url = f"https://{cluster_name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
    # Named `http` so it does not shadow the `kubernetes.client` module
    async with httpx.AsyncClient(verify=False) as http:
        # 1. Describe which versioned Flow to import from NiFi Registry.
        #    get_registry_id is a helper that is not shown in this post; it is
        #    expected to return the ID of the registry client for the given URL.
        flow_payload = {
            'registryId': await get_registry_id(http, nifi_url, spec['registryUrl']),
            'bucketId': spec['bucketId'],
            'flowId': spec['flowId'],
            'version': spec['flowVersion'],
        }
        # 2. Deploy Flow to Process Group
        pg_id = spec.get('processGroupId', 'root')
        resp = await http.post(
            f"{nifi_url}/nifi-api/process-groups/{pg_id}/process-groups",
            json={
                'revision': {'version': 0},
                'component': {
                    'position': {'x': 0, 'y': 0},
                    'versionControlInformation': flow_payload,
                }
            }
        )
        if resp.status_code == 201:
            deployed_pg = resp.json()
            logger.info(f"Flow deployed: {deployed_pg['id']}")
            # 3. Auto-start
            if spec.get('autoStart', True):
                await http.put(
                    f"{nifi_url}/nifi-api/flow/process-groups/{deployed_pg['id']}",
                    json={'id': deployed_pg['id'], 'state': 'RUNNING'}
                )
                logger.info("Flow started")
            patch.status['deployedProcessGroupId'] = deployed_pg['id']
            patch.status['phase'] = 'Running'
        else:
            logger.error(f"Flow deployment failed: {resp.text}")
            patch.status['phase'] = 'Error'
GitOps Workflow
[Git → NiFiFlow CR → Operator → NiFi]
1. Developer commits Flow to NiFi Registry
2. Updates Flow version in NiFiFlow CR YAML
3. Commits + pushes to Git
4. ArgoCD/FluxCD detects CR change → applies to K8s
5. Operator detects NiFiFlow CR change
6. Automatically deploys/updates Flow via NiFi REST API
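Step 5 of the workflow requires the operator to decide what a change to a NiFiFlow CR means. One way to sketch that decision is a pure function over the old and new `spec`; the function name `decide_flow_action` and the action labels are hypothetical, and only the field names come from the NiFiFlow CRD in this post.

```python
from typing import Optional

# Fields that identify *which* flow is deployed and where; changing any of
# them means the existing Process Group no longer matches the CR.
IDENTITY_FIELDS = ("clusterRef", "registryUrl", "bucketId", "flowId", "processGroupId")


def decide_flow_action(old_spec: Optional[dict], new_spec: dict) -> str:
    """Map a NiFiFlow spec change to one of: deploy, redeploy, change-version, noop."""
    if old_spec is None:
        return "deploy"
    if any(old_spec.get(f) != new_spec.get(f) for f in IDENTITY_FIELDS):
        return "redeploy"
    if old_spec.get("flowVersion") != new_spec.get("flowVersion"):
        return "change-version"
    return "noop"


if __name__ == "__main__":
    v1 = {"clusterRef": "my-nifi", "bucketId": "b1", "flowId": "f1", "flowVersion": 1}
    v2 = dict(v1, flowVersion=2)
    print(decide_flow_action(None, v1))
    print(decide_flow_action(v1, v2))
    print(decide_flow_action(v2, v2))
```

In a kopf operator this kind of function would typically be called from an update handler with the previous and current spec, keeping the REST calls separate from the decision logic.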
11. Monitoring and Auto-Recovery
Exposing Prometheus Metrics
# operator/main.py
from prometheus_client import start_http_server, Gauge, Counter

# Metric definitions
nifi_cluster_nodes = Gauge('nifi_cluster_nodes_total', 'Total NiFi nodes', ['cluster'])
nifi_cluster_ready = Gauge('nifi_cluster_nodes_ready', 'Ready NiFi nodes', ['cluster'])
nifi_scaling_events = Counter('nifi_scaling_events_total', 'Scaling events', ['cluster', 'direction'])
nifi_reconcile_errors = Counter('nifi_reconcile_errors_total', 'Reconciliation errors', ['cluster'])

# Start Prometheus metrics server
start_http_server(8080)


# Update metrics in timer
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def update_metrics(spec, name, status, **kwargs):
    nifi_cluster_nodes.labels(cluster=name).set(spec.get('replicas', 0))
    nifi_cluster_ready.labels(cluster=name).set(status.get('readyNodes', 0))
Auto-Recovery
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=60)
async def auto_heal(spec, name, namespace, patch, logger, **kwargs):
    """Automatic detection and recovery of failed nodes"""
    try:
        headless = f"{name}-headless"
        url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
        # Named `http` so it does not shadow the `kubernetes.client` module
        async with httpx.AsyncClient(verify=False) as http:
            resp = await http.get(f"{url}/nifi-api/controller/cluster")
            if resp.status_code != 200:
                return
            cluster = resp.json()['cluster']
            for node in cluster['nodes']:
                if node['status'] == 'DISCONNECTED':
                    node_id = node['nodeId']
                    node_addr = node['address']
                    logger.warning(f"Disconnected node detected: {node_addr}")
                    # Attempt reconnection
                    await http.put(
                        f"{url}/nifi-api/controller/cluster/nodes/{node_id}",
                        json={'node': {'nodeId': node_id, 'status': 'CONNECTING'}}
                    )
                    logger.info(f"Reconnection initiated for {node_addr}")
    except Exception as e:
        nifi_reconcile_errors.labels(cluster=name).inc()
        logger.error(f"Auto-heal error: {e}")
12. LDAP/OIDC Authentication Automation
Auto-Generating Authentication Configuration
from xml.sax.saxutils import escape


def build_auth_config(spec: dict) -> dict:
    """Generate configuration files based on authentication type"""
    auth = spec.get('auth', {})
    auth_type = auth.get('type', 'single-user')
    if auth_type == 'ldap':
        ldap = auth['ldap']
        # escape() keeps the XML valid when values contain & or <,
        # which is common in LDAP search filters
        login_identity_providers = f"""
<loginIdentityProviders>
  <provider>
    <identifier>ldap-provider</identifier>
    <class>org.apache.nifi.ldap.LdapProvider</class>
    <property name="Authentication Strategy">SIMPLE</property>
    <property name="Manager DN"></property>
    <property name="Manager Password"></property>
    <property name="Url">{escape(ldap['url'])}</property>
    <property name="User Search Base">{escape(ldap['searchBase'])}</property>
    <property name="User Search Filter">{escape(ldap['searchFilter'])}</property>
  </provider>
</loginIdentityProviders>"""
        return {'login-identity-providers.xml': login_identity_providers}
    elif auth_type == 'oidc':
        # ${{...}} renders as the literal placeholder ${OIDC_CLIENT_SECRET}
        nifi_properties_auth = f"""
nifi.security.user.oidc.discovery.url={auth['oidc']['discoveryUrl']}
nifi.security.user.oidc.client.id={auth['oidc']['clientId']}
nifi.security.user.oidc.client.secret=${{OIDC_CLIENT_SECRET}}
"""
        return {'auth.properties': nifi_properties_auth}
    return {}
13. Operator Deployment and Operations
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY operator/ ./operator/
USER 1000:1000
CMD ["kopf", "run", "operator/main.py", "--all-namespaces", "--verbose"]
RBAC
# deploy/rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: nifi-operator-role
rules:
  - apiGroups: ["datadynamics.io"]
    resources: ["nificlusters", "nificlusters/status", "nififlows", "nififlows/status"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services", "configmaps", "secrets", "pods", "persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["cert-manager.io"]
    resources: ["certificates"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: nifi-operator-binding
subjects:
  - kind: ServiceAccount
    name: nifi-operator
    namespace: nifi-operator-system
roleRef:
  kind: ClusterRole
  name: nifi-operator-role
  apiGroup: rbac.authorization.k8s.io
Operator Deployment
# deploy/operator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nifi-operator
  template:
    metadata:
      labels:
        app: nifi-operator
    spec:
      serviceAccountName: nifi-operator
      containers:
        - name: operator
          image: datadynamics/nifi-operator:latest
          ports:
            - containerPort: 8080
              name: metrics
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          env:
            - name: OPERATOR_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
Deployment Steps
# 1. Create Operator namespace
kubectl create namespace nifi-operator-system
# 2. Register CRDs
kubectl apply -f deploy/crds/
# 3. Configure RBAC
kubectl apply -f deploy/rbac.yaml
# 4. Deploy Operator
kubectl apply -f deploy/operator-deployment.yaml
# 5. Check Operator logs
kubectl logs -f deployment/nifi-operator -n nifi-operator-system
# 6. Create NiFi cluster
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
# 7. Check status
kubectl get nifi -n nifi -w
Troubleshooting Guide
| Problem | Cause | Solution |
|---|---|---|
| Pod CrashLoopBackOff | Insufficient JVM memory | Increase spec.config.jvmHeapSize |
| Cluster join failure | DNS resolution failure | Verify that the headless Service sets publishNotReadyAddresses: true |
| Node DISCONNECTED | Network timeout | Increase nifi.cluster.node.connection.timeout |
| Flow Election failure | Startup time difference between nodes | Increase nifi.cluster.flow.election.max.wait.time |
| TLS handshake failure | Certificate SAN mismatch | Check cert-manager Certificate dnsNames |
| OOM Killed | Container memory limit exceeded | Increase spec.resources.limits.memory |
| PVC Pending | StorageClass not found | Check spec.storage.storageClassName |
| Operator permission error | Insufficient RBAC | Check ClusterRole rules |
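The "Cluster join failure" row can be checked mechanically. The sketch below inspects a Service manifest that has already been loaded into a dict (for example from the JSON output of `kubectl get svc <name>-headless -n <namespace> -o json`) and reports the two settings that matter for pod DNS during startup. The function name `check_headless_service` is an illustrative assumption and does not appear in the operator code.

```python
# Hypothetical diagnostic helper for the "Cluster join failure" row above.
# Input is a Service manifest as a plain dict; no cluster access is needed.
def check_headless_service(service: dict) -> list[str]:
    """Return human-readable problems found in a headless Service manifest."""
    spec = service.get("spec", {})
    problems = []
    # A headless Service is declared with the literal string "None"
    if spec.get("clusterIP") != "None":
        problems.append("spec.clusterIP is not 'None', so the Service is not headless")
    # Without this flag, pods get no DNS records until they pass readiness,
    # which can prevent NiFi nodes from finding each other while starting up
    if spec.get("publishNotReadyAddresses") is not True:
        problems.append("spec.publishNotReadyAddresses is not true")
    return problems


if __name__ == "__main__":
    manifest = {"spec": {"clusterIP": "None", "publishNotReadyAddresses": False}}
    for problem in check_headless_service(manifest):
        print(problem)
```

An empty list means both settings look correct, in which case the remaining suspects are typically pod names, the namespace, or cluster DNS itself.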
Note: This Operator is intended for learning and demonstration purposes. For production use, additional work is required including enhanced error handling, Finalizer support, integration testing, and Helm chart packaging.
References
- Apache NiFi 2.x Documentation — https://nifi.apache.org/docs.html
- NiFi REST API Reference — https://nifi.apache.org/docs/nifi-docs/rest-api/
- kopf (Kubernetes Operator Framework for Python) — https://kopf.readthedocs.io/
- Kubernetes Custom Resources — https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/
- cert-manager Documentation — https://cert-manager.io/docs/
— Data Dynamics Engineering Team