
Building a NiFi 2.x Cluster Kubernetes Operator with Python - From Basics to Production

Step-by-step implementation of a Kubernetes Operator for managing Apache NiFi 2.x clusters using Python (kopf). Covers CRD design, cluster auto-configuration, scaling, TLS, Flow deployment, and monitoring across 3 skill levels.

Data Dynamics · April 16, 2026 · 19 min read

Apache NiFi 2.x removed the ZooKeeper dependency and introduced its own Raft-based clustering. In this post, we implement a Kubernetes Operator in Python that automatically manages NiFi 2.x clusters, structured across 3 skill levels.


Level 1: Basics — Operator Concepts and Single NiFi Node


1. What Is a Kubernetes Operator

The Operator Pattern

A Kubernetes Operator is software that automates operational tasks previously performed by humans. You declare the desired state in a custom resource (CR), whose schema is defined by a CRD (Custom Resource Definition). The operator's controller then continuously reconciles the actual state of the cluster to match that desired state.

[Operator Pattern]

User declares:                     Operator performs:
┌─────────────────┐              ┌──────────────────────────────┐
│ NiFiCluster CR  │              │ Controller (Python)           │
│                 │  ─────────→  │                               │
│ replicas: 3     │  Watch/      │ 1. Create StatefulSet, 3 Pods │
│ version: 2.4.0  │  Reconcile   │ 2. Generate config files      │
│ tls: true       │              │ 3. Issue certificates         │
└─────────────────┘              │ 4. Verify cluster join        │
                                 │ 5. Auto-recover on failure    │
                                 └──────────────────────────────┘
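The reconcile step in the diagram can be thought of as a pure function. It compares the declared spec with what the controller observes and returns only the steps that are still needed. The sketch below is illustrative only: `ObservedState`, `plan_actions`, and the action strings are hypothetical names, not part of kopf or the Kubernetes API.

```python
from dataclasses import dataclass


@dataclass
class ObservedState:
    """What the controller currently sees in the cluster (hypothetical, simplified)."""
    statefulset_exists: bool
    replicas: int


def plan_actions(desired_replicas: int, observed: ObservedState) -> list[str]:
    """Diff desired vs observed state and return the remaining actions.

    The same inputs always yield the same plan, so the function can be
    re-run on every watch event or timer tick without side effects.
    """
    if not observed.statefulset_exists:
        return ["create-configmap", "create-services",
                f"create-statefulset:{desired_replicas}"]
    if observed.replicas < desired_replicas:
        return ["update-configmap", f"scale-up:{desired_replicas}"]
    if observed.replicas > desired_replicas:
        surplus = observed.replicas - desired_replicas
        return [f"offload-and-remove:{surplus}", f"scale-down:{desired_replicas}"]
    return []  # already converged: nothing to do
```

An empty plan means the cluster has converged. Because the plan depends only on its inputs, repeated invocations are harmless.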

Why NiFi Needs an Operator

| Manual Management Issues | Operator Solutions |
|---|---|
| Manually editing nifi.properties when adding nodes | Automatic configuration by simply changing the node count |
| Manual TLS certificate creation/renewal | Automated via cert-manager integration |
| Risk of data loss during scale down | Automated safe offboarding |
| Full restart required on config changes | Automated rolling restart |
| Manual recovery of failed nodes | Automatic detection + recovery |
| Manual Flow deployment management | GitOps-based automated deployment |

2. NiFi 2.x Clustering Architecture

NiFi 1.x → 2.x Changes

| Item | NiFi 1.x | NiFi 2.x |
|---|---|---|
| Cluster Coordination | ZooKeeper required | Built-in Raft-based (ZK removed) |
| Node Discovery | ZooKeeper | Initial node list (nifi.properties) |
| Primary Node Election | ZooKeeper | Built-in election mechanism |
| Flow Synchronization | ZooKeeper | Managed directly by Cluster Coordinator |
| Dependencies | NiFi + ZooKeeper (3–5 nodes) | NiFi only |
| Operational Complexity | High (ZK management required) | Low |

Node Discovery in Kubernetes

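In NiFi 2.x, instead of pointing nodes at ZooKeeper, you specify an initial node list directly. A Kubernetes Headless Service gives each StatefulSet Pod a stable DNS name (`<pod>.<service>.<namespace>.svc.cluster.local`). Because these names are known in advance, the node list can be generated before any Pod starts.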

[K8s-based NiFi Cluster Architecture]

Headless Service: nifi-cluster-headless (ClusterIP: None)
  │
  ├─ nifi-0.nifi-cluster-headless.nifi.svc.cluster.local
  ├─ nifi-1.nifi-cluster-headless.nifi.svc.cluster.local
  └─ nifi-2.nifi-cluster-headless.nifi.svc.cluster.local

nifi.properties (each node):
  nifi.cluster.is.node=true
  nifi.cluster.node.address=nifi-{ordinal}.nifi-cluster-headless.{namespace}.svc.cluster.local
  nifi.cluster.node.protocol.port=11443
  nifi.cluster.flow.election.max.candidates=3
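StatefulSet Pod names are predictable (`<statefulset-name>-<ordinal>`), so every node address can be computed before any Pod exists. The sketch below assumes the naming the operator uses later in this post: StatefulSet `<cluster>-nifi` and headless Service `<cluster>-headless`, rather than the shortened names in the diagram. The helper names are hypothetical.

```python
def node_fqdn(cluster: str, ordinal: int, namespace: str) -> str:
    """Stable DNS name of one NiFi Pod behind the headless Service."""
    return f"{cluster}-nifi-{ordinal}.{cluster}-headless.{namespace}.svc.cluster.local"


def initial_node_list(cluster: str, namespace: str, replicas: int,
                      port: int = 11443) -> list[str]:
    """host:port entries for every node the StatefulSet will create."""
    return [f"{node_fqdn(cluster, i, namespace)}:{port}" for i in range(replicas)]


# Example: the three nodes of the `my-nifi` cluster in namespace `nifi`
for entry in initial_node_list("my-nifi", "nifi", 3):
    print(entry)
```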

3. Setting Up the Development Environment

Prerequisites

# Python 3.11+
python --version
 
# kubectl
kubectl version --client
 
# kind (local K8s cluster)
kind create cluster --name nifi-dev
 
# Install Python dependencies
pip install kopf kubernetes pyyaml jinja2 httpx

Introduction to kopf

kopf (Kubernetes Operator Pythonic Framework) is a framework that makes it easy to develop Kubernetes Operators in Python.

import kopf
 
@kopf.on.create('nificlusters')
def create_fn(spec, name, namespace, **kwargs):
    """Called when a NiFiCluster resource is created"""
    print(f"NiFi cluster '{name}' creation requested: {spec}")
 
@kopf.on.update('nificlusters')
def update_fn(spec, name, namespace, diff, **kwargs):
    """Called when a NiFiCluster resource is modified"""
    print(f"NiFi cluster '{name}' changed: {diff}")
 
@kopf.on.delete('nificlusters')
def delete_fn(name, namespace, **kwargs):
    """Called when a NiFiCluster resource is deleted"""
    print(f"NiFi cluster '{name}' deleted")

Project Structure

nifi-operator/
├── operator/
│   ├── __init__.py
│   ├── main.py              # kopf handlers (entry point)
│   ├── controllers/
│   │   ├── __init__.py
│   │   ├── cluster.py        # NiFiCluster controller
│   │   ├── flow.py           # NiFiFlow controller
│   │   └── scaling.py        # Scaling logic
│   ├── resources/
│   │   ├── __init__.py
│   │   ├── statefulset.py    # StatefulSet creation
│   │   ├── service.py        # Service creation
│   │   ├── configmap.py      # ConfigMap creation
│   │   └── certificate.py    # TLS certificates
│   ├── nifi_api/
│   │   ├── __init__.py
│   │   └── client.py         # NiFi REST API client
│   └── templates/
│       ├── nifi.properties.j2
│       ├── bootstrap.conf.j2
│       └── authorizers.xml.j2
├── deploy/
│   ├── crds/
│   │   └── nificluster-crd.yaml
│   ├── rbac.yaml
│   └── operator-deployment.yaml
├── Dockerfile
├── requirements.txt
└── README.md

4. CRD (Custom Resource Definition) Design

NiFiCluster CRD

# deploy/crds/nificluster-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nificlusters.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiCluster
    plural: nificlusters
    singular: nificluster
    shortNames: ["nifi"]
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["replicas", "version"]
              properties:
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 20
                  description: "Number of NiFi cluster nodes"
                version:
                  type: string
                  description: "NiFi Docker image version"
                image:
                  type: string
                  default: "apache/nifi"
                  description: "NiFi Docker image"
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      properties:
                        cpu: { type: string, default: "1" }
                        memory: { type: string, default: "2Gi" }
                    limits:
                      type: object
                      properties:
                        cpu: { type: string, default: "2" }
                        memory: { type: string, default: "4Gi" }
                storage:
                  type: object
                  properties:
                    size: { type: string, default: "10Gi" }
                    storageClassName: { type: string }
                config:
                  type: object
                  properties:
                    jvmHeapSize: { type: string, default: "1g" }
                    maxThreads: { type: integer, default: 10 }
                    sensitivePropsKey: { type: string }
                tls:
                  type: object
                  properties:
                    enabled: { type: boolean, default: false }
                    issuerRef:
                      type: object
                      properties:
                        name: { type: string }
                        kind: { type: string, default: "ClusterIssuer" }
                auth:
                  type: object
                  properties:
                    type: { type: string, enum: ["single-user", "ldap", "oidc"] }
                    ldap:
                      type: object
                      properties:
                        url: { type: string }
                        searchBase: { type: string }
                        searchFilter: { type: string }
                    oidc:
                      type: object
                      properties:
                        discoveryUrl: { type: string }
                        clientId: { type: string }
                        clientSecretRef: { type: string }
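            status:
              type: object
              # kopf stores handler results (such as the dict returned by on_create)
              # under .status; without this flag the API server prunes undeclared fields.
              x-kubernetes-preserve-unknown-fields: true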
              properties:
                phase:
                  type: string
                  enum: ["Creating", "Running", "Scaling", "Updating", "Error"]
                readyNodes:
                  type: integer
                clusterCoordinator:
                  type: string
                message:
                  type: string
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Ready
          type: integer
          jsonPath: .status.readyNodes
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp

NiFiCluster CR Example

# examples/nifi-cluster.yaml
apiVersion: datadynamics.io/v1alpha1
kind: NiFiCluster
metadata:
  name: my-nifi
  namespace: nifi
spec:
  replicas: 3
  version: "2.4.0"
  image: "apache/nifi"
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "2"
      memory: "4Gi"
  storage:
    size: "20Gi"
    storageClassName: "standard"
  config:
    jvmHeapSize: "1536m"
    maxThreads: 15
    sensitivePropsKey: "my-secret-key-12345"
  tls:
    enabled: true
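    issuerRef:
      # Must be able to sign cluster-internal names such as *.svc.cluster.local
      # (e.g. a private CA); ACME issuers such as Let's Encrypt cannot.
      name: nifi-ca-issuer   # hypothetical private-CA ClusterIssuer
      kind: ClusterIssuer
  auth:
    type: single-user

# Register CRD and create CR
kubectl apply -f deploy/crds/nificluster-crd.yaml
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
 
# Verify
kubectl get nifi -n nifi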
# NAME      REPLICAS   READY   PHASE     VERSION   AGE
# my-nifi   3          3       Running   2.4.0     5m

5. First Operator: Deploying a Single NiFi Node

Core Handler Implementation

# operator/main.py
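import kopf
import kubernetes
from kubernetes import client
# The project package must be importable under a name that does not clash with
# Python's stdlib `operator` module (already loaded at startup), e.g. `nifi_operator`.
from nifi_operator.resources.statefulset import build_statefulset
from nifi_operator.resources.service import build_headless_service, build_ui_service
from nifi_operator.resources.configmap import build_nifi_config
 
try:
    kubernetes.config.load_incluster_config()  # Inside cluster
except kubernetes.config.ConfigException:
    kubernetes.config.load_kube_config()       # Local development (kopf run)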
 
apps_v1 = client.AppsV1Api()
core_v1 = client.CoreV1Api()
 
 
@kopf.on.create('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_create(spec, name, namespace, logger, patch, **kwargs):
    """Provision resources when a NiFiCluster is created"""
    logger.info(f"Creating NiFi cluster '{name}' with {spec['replicas']} nodes")
 
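    # Update status. kopf sends the accumulated `patch` once, after the handler
    # returns, so only the last value written to a field is persisted (here the
    # final 'Running'); the same holds for the intermediate phases in Level 2.
    patch.status['phase'] = 'Creating'
    patch.status['readyNodes'] = 0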
 
    # 1. Create ConfigMap (nifi.properties, etc.)
    configmap = build_nifi_config(name, namespace, spec)
    kopf.adopt(configmap)  # Set Owner Reference
    core_v1.create_namespaced_config_map(namespace, configmap)
    logger.info(f"ConfigMap '{configmap.metadata.name}' created")
 
    # 2. Create Headless Service (for node discovery)
    headless_svc = build_headless_service(name, namespace)
    kopf.adopt(headless_svc)
    core_v1.create_namespaced_service(namespace, headless_svc)
    logger.info(f"Headless Service created")
 
    # 3. Create UI Service (for external access)
    ui_svc = build_ui_service(name, namespace, spec)
    kopf.adopt(ui_svc)
    core_v1.create_namespaced_service(namespace, ui_svc)
    logger.info(f"UI Service created")
 
    # 4. Create StatefulSet
    statefulset = build_statefulset(name, namespace, spec)
    kopf.adopt(statefulset)
    apps_v1.create_namespaced_stateful_set(namespace, statefulset)
    logger.info(f"StatefulSet created with {spec['replicas']} replicas")
 
    patch.status['phase'] = 'Running'
    return {'message': f"NiFi cluster '{name}' created successfully"}
 
 
@kopf.on.delete('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_delete(name, namespace, logger, **kwargs):
    """NiFiCluster deletion — automatically deleted via Owner References"""
    logger.info(f"NiFi cluster '{name}' is being deleted (cascade)")
    # Owner References set by kopf.adopt() ensure
    # StatefulSet, Service, and ConfigMap are automatically deleted
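kopf retries a handler that raises an exception, so `on_create` can run more than once for the same object. On the second run, the `create_namespaced_*` call for an object created during the first run fails with HTTP 409 (AlreadyExists). One way to make creation idempotent is a small wrapper that treats 409 as success. This is a sketch: `create_if_absent` is a hypothetical helper, and it relies only on the `status` attribute that `kubernetes.client.exceptions.ApiException` carries.

```python
from typing import Any, Callable


def create_if_absent(create: Callable[[], Any]) -> bool:
    """Run `create()`, treating HTTP 409 Conflict (AlreadyExists) as success.

    Returns True if the object was created by this call and False if it
    already existed. Any other error is re-raised so kopf can retry.
    """
    try:
        create()
    except Exception as exc:
        # ApiException exposes the HTTP status code as `.status`
        if getattr(exc, "status", None) == 409:
            return False
        raise
    return True
```

In `on_create`, each call would be wrapped, e.g. `create_if_absent(lambda: core_v1.create_namespaced_config_map(namespace, configmap))`. This wrapper does not update an existing object whose contents have drifted; that would need a patch or replace call.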

StatefulSet Creation Logic

# operator/resources/statefulset.py
from kubernetes import client
 
 
def build_statefulset(name: str, namespace: str, spec: dict) -> client.V1StatefulSet:
    """Create NiFi StatefulSet"""
    replicas = spec.get('replicas', 1)
    version = spec['version']
    image = spec.get('image', 'apache/nifi')
    resources = spec.get('resources', {})
    storage = spec.get('storage', {})
    config = spec.get('config', {})
 
    # Container definition
    container = client.V1Container(
        name='nifi',
        image=f"{image}:{version}",
        ports=[
            client.V1ContainerPort(container_port=8443, name='https'),
            client.V1ContainerPort(container_port=11443, name='cluster'),
            client.V1ContainerPort(container_port=6342, name='load-balance'),
        ],
        env=[
            client.V1EnvVar(name='NIFI_JVM_HEAP_INIT', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(name='NIFI_JVM_HEAP_MAX', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(
                name='NIFI_POD_NAME',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.name')
                )
            ),
            client.V1EnvVar(
                name='NIFI_POD_NAMESPACE',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace')
                )
            ),
        ],
        resources=client.V1ResourceRequirements(
            requests={
                'cpu': resources.get('requests', {}).get('cpu', '1'),
                'memory': resources.get('requests', {}).get('memory', '2Gi'),
            },
            limits={
                'cpu': resources.get('limits', {}).get('cpu', '2'),
                'memory': resources.get('limits', {}).get('memory', '4Gi'),
            }
        ),
        volume_mounts=[
            client.V1VolumeMount(name='data', mount_path='/opt/nifi/nifi-current/data'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/nifi.properties',
                                 sub_path='nifi.properties'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/bootstrap.conf',
                                 sub_path='bootstrap.conf'),
        ],
        # With HTTPS and authentication enabled, /nifi-api endpoints reject anonymous
        # requests (401), so HTTP probes would never pass; probe the TCP port instead.
        readiness_probe=client.V1Probe(
            tcp_socket=client.V1TCPSocketAction(port=8443),
            initial_delay_seconds=90,
            period_seconds=20,
            failure_threshold=10,
        ),
        liveness_probe=client.V1Probe(
            tcp_socket=client.V1TCPSocketAction(port=8443),
            initial_delay_seconds=120,
            period_seconds=30,
            failure_threshold=5,
        ),
    )
 
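    # Init Container: copy both config files from the ConfigMap into a writable
    # emptyDir, then substitute the Pod name/namespace in nifi.properties
    init_container = client.V1Container(
        name='init-config',
        image='busybox:1.36',
        command=['sh', '-c', '''
            cp /config-template/nifi.properties /config-output/nifi.properties
            cp /config-template/bootstrap.conf /config-output/bootstrap.conf
            # NIFI_POD_NAME is a prefix of NIFI_POD_NAMESPACE, so replace the longer one first
            sed -i "s/NIFI_POD_NAMESPACE/$NIFI_POD_NAMESPACE/g" /config-output/nifi.properties
            sed -i "s/NIFI_POD_NAME/$NIFI_POD_NAME/g" /config-output/nifi.properties
        '''],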
        env=[
            client.V1EnvVar(name='NIFI_POD_NAME',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.name'))),
            client.V1EnvVar(name='NIFI_POD_NAMESPACE',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace'))),
        ],
        volume_mounts=[
            client.V1VolumeMount(name='config-template', mount_path='/config-template'),
            client.V1VolumeMount(name='config', mount_path='/config-output'),
        ]
    )
 
    # PVC Template
    pvc_template = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name='data'),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=['ReadWriteOnce'],
            storage_class_name=storage.get('storageClassName'),
            resources=client.V1VolumeResourceRequirements(
                requests={'storage': storage.get('size', '10Gi')}
            )
        )
    )
 
    return client.V1StatefulSet(
        metadata=client.V1ObjectMeta(name=f"{name}-nifi", namespace=namespace),
        spec=client.V1StatefulSetSpec(
            replicas=replicas,
            service_name=f"{name}-headless",
            pod_management_policy='OrderedReady',
            selector=client.V1LabelSelector(
                match_labels={'app': 'nifi', 'cluster': name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={'app': 'nifi', 'cluster': name}
                ),
                spec=client.V1PodSpec(
                    init_containers=[init_container],
                    containers=[container],
                    volumes=[
                        client.V1Volume(
                            name='config-template',
                            config_map=client.V1ConfigMapVolumeSource(
                                name=f"{name}-config"
                            )
                        ),
                        client.V1Volume(
                            name='config',
                            empty_dir=client.V1EmptyDirVolumeSource()
                        ),
                    ],
                    termination_grace_period_seconds=60,
                )
            ),
            volume_claim_templates=[pvc_template],
        )
    )
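The init container's placeholder substitution can be checked outside the cluster. This Python sketch (the function name is hypothetical) performs the same two replacements as the `sed` commands and shows why their order matters. `NIFI_POD_NAME` is a prefix of `NIFI_POD_NAMESPACE`, so replacing the shorter placeholder first would turn `NIFI_POD_NAMESPACE` into `<pod-name>SPACE`.

```python
def substitute_pod_placeholders(template: str, pod_name: str, namespace: str) -> str:
    """Fill in the NIFI_POD_* placeholders, longest placeholder first."""
    return (template
            .replace("NIFI_POD_NAMESPACE", namespace)  # must run before NIFI_POD_NAME
            .replace("NIFI_POD_NAME", pod_name))


line = ("nifi.cluster.node.address="
        "NIFI_POD_NAME.my-nifi-headless.NIFI_POD_NAMESPACE.svc.cluster.local")
print(substitute_pod_placeholders(line, "my-nifi-nifi-1", "nifi"))
```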

Service Creation Logic

# operator/resources/service.py
from kubernetes import client
 
 
def build_headless_service(name: str, namespace: str) -> client.V1Service:
    """Headless Service (for node discovery)"""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-headless",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            cluster_ip='None',  # Headless
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
                client.V1ServicePort(name='cluster', port=11443, target_port=11443),
                client.V1ServicePort(name='load-balance', port=6342, target_port=6342),
            ],
            publish_not_ready_addresses=True,  # Register in DNS even before startup
        )
    )
 
 
def build_ui_service(name: str, namespace: str, spec: dict) -> client.V1Service:
    """UI Service (for web UI access)"""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-ui",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            type='ClusterIP',
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
            ]
        )
    )

ConfigMap Creation (nifi.properties)

# operator/resources/configmap.py
from kubernetes import client
from jinja2 import Template
 
NIFI_PROPERTIES_TEMPLATE = """
# NiFi 2.x Properties (Generated by Operator)
# Web Properties
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443
nifi.web.proxy.host=
 
# Cluster Properties
nifi.cluster.is.node=true
nifi.cluster.node.address=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.node.protocol.port=11443
nifi.cluster.node.protocol.max.threads={{ max_threads }}
nifi.cluster.flow.election.max.wait.time=1 min
nifi.cluster.flow.election.max.candidates={{ replicas }}
 
# Load Balancing
nifi.cluster.load.balance.host=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.load.balance.port=6342
 
# Initial Node Identities (Kubernetes DNS-based)
{% for i in range(replicas) %}
nifi.cluster.node.{{ i + 1 }}={{ name }}-nifi-{{ i }}.{{ headless_service }}.{{ namespace }}.svc.cluster.local:11443
{% endfor %}
 
# State Management
nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.provider.cluster=local-provider
 
# Security Properties
nifi.sensitive.props.key={{ sensitive_props_key }}
nifi.security.autoreload.enabled=true
 
# Performance
nifi.bored.yield.duration=10 millis
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB
 
# Data Directories
nifi.flowfile.repository.directory=./data/flowfile_repository
nifi.content.repository.directory.default=./data/content_repository
nifi.provenance.repository.directory.default=./data/provenance_repository
nifi.database.directory=./data/database_repository
"""
 
 
def build_nifi_config(name: str, namespace: str, spec: dict) -> client.V1ConfigMap:
    """Create NiFi configuration ConfigMap"""
    replicas = spec.get('replicas', 1)
    config = spec.get('config', {})
    headless_service = f"{name}-headless"
 
    template = Template(NIFI_PROPERTIES_TEMPLATE)
    nifi_properties = template.render(
        name=name,
        namespace=namespace,
        headless_service=headless_service,
        replicas=replicas,
        max_threads=config.get('maxThreads', 10),
        sensitive_props_key=config.get('sensitivePropsKey', 'default-key-change-me'),
    )
 
    bootstrap_conf = f"""
# Bootstrap Config (Generated by Operator)
java.arg.2=-Xms{config.get('jvmHeapSize', '1g')}
java.arg.3=-Xmx{config.get('jvmHeapSize', '1g')}
java.arg.14=-Djava.protocol.handler.pkgs=sun.net.www.protocol
"""
 
    return client.V1ConfigMap(
        metadata=client.V1ObjectMeta(
            name=f"{name}-config",
            namespace=namespace,
            labels={'app': 'nifi', 'cluster': name}
        ),
        data={
            'nifi.properties': nifi_properties,
            'bootstrap.conf': bootstrap_conf,
        }
    )
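Because `build_nifi_config` is a pure function of the CR spec, its output can be unit-tested without a cluster. In the sketch below, a simplified parser turns the rendered text into a dict so a test can assert on individual keys. For example, a test could check that `nifi.cluster.flow.election.max.candidates` equals the replica count. The parser is hypothetical and ignores Java `.properties` features such as line continuations and `:` separators.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines; blank lines and '#' comments are skipped."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without '='
            props[key.strip()] = value.strip()  # later keys override earlier ones
    return props
```

In a test, it would be applied to the generated ConfigMap, e.g. `parse_properties(build_nifi_config('my-nifi', 'nifi', {'replicas': 3}).data['nifi.properties'])`.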

Verification

# 1. Register CRD
kubectl apply -f deploy/crds/nificluster-crd.yaml
 
# 2. Run Operator locally (development mode)
kopf run operator/main.py --verbose
 
# 3. Create NiFi cluster in another terminal
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
 
# 4. Check status
kubectl get nifi -n nifi
kubectl get pods -n nifi
kubectl get svc -n nifi
 
# 5. Access NiFi UI
kubectl port-forward svc/my-nifi-ui -n nifi 8443:8443
# Visit https://localhost:8443/nifi

Level 2: Intermediate — Cluster Management and Configuration Automation


6. NiFi 2.x Cluster Auto-Configuration

Cluster Bootstrap Order Control

When initially configuring a NiFi 2.x cluster, the node startup order is critical. The first node must be elected as the Cluster Coordinator, and the remaining nodes join this coordinator.

# operator/controllers/cluster.py
import asyncio
import httpx
 
async def wait_for_cluster_ready(name: str, namespace: str, replicas: int, logger):
    """Wait until the cluster is fully formed"""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
 
    for attempt in range(60):  # Wait up to 10 minutes
        try:
            async with httpx.AsyncClient(verify=False) as client:
                resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
                if resp.status_code == 200:
                    cluster_info = resp.json()
                    connected = sum(
                        1 for n in cluster_info['cluster']['nodes']
                        if n['status'] == 'CONNECTED'
                    )
                    logger.info(f"Cluster nodes connected: {connected}/{replicas}")
                    if connected >= replicas:
                        return True
        except Exception as e:
            logger.debug(f"Waiting for cluster... ({attempt}/60): {e}")
 
        await asyncio.sleep(10)
 
    return False

Status Monitoring Timer

# operator/main.py
import httpx
 
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def monitor_cluster(spec, name, namespace, patch, logger, **kwargs):
    """Check cluster status every 30 seconds"""
    try:
        headless = f"{name}-headless"
        coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
 
        # Named `http` so it does not shadow the kubernetes `client` module.
        # A secured cluster also expects credentials on this request (omitted here).
        async with httpx.AsyncClient(verify=False) as http:
            resp = await http.get(f"{coordinator_url}/nifi-api/controller/cluster")
 
            if resp.status_code == 200:
                cluster = resp.json()['cluster']
                connected = sum(1 for n in cluster['nodes'] if n['status'] == 'CONNECTED')
                # NiFi reports 'Cluster Coordinator' and 'Primary Node' as separate
                # roles, which may be held by different nodes.
                coordinator = next(
                    (n['address'] for n in cluster['nodes']
                     if 'Cluster Coordinator' in n.get('roles', [])),
                    'unknown'
                )
                patch.status['readyNodes'] = connected
                patch.status['clusterCoordinator'] = coordinator
                patch.status['phase'] = 'Running'
            else:
                patch.status['phase'] = 'Error'
                patch.status['message'] = f"API returned {resp.status_code}"
 
    except Exception as e:
        logger.warning(f"Health check failed: {e}")
        patch.status['phase'] = 'Error'
        patch.status['message'] = str(e)
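In the NiFi REST API, the Cluster Coordinator and the Primary Node are separate roles and may live on different nodes. Keeping the response parsing in a pure function lets it be tested against a captured `/nifi-api/controller/cluster` response. The sketch below uses a hypothetical function name. It assumes each node entry has `address`, `status`, and a `roles` list containing the strings `Cluster Coordinator` / `Primary Node`, as shown in NiFi's cluster view; verify these against your NiFi version.

```python
from typing import Any, Optional


def summarize_cluster(cluster: dict[str, Any]) -> dict[str, Any]:
    """Condense the 'cluster' object into the fields the operator reports."""
    nodes = cluster.get("nodes", [])

    def holder_of(role: str) -> Optional[str]:
        # Address of the first node that holds `role`, or None if no node does
        return next((n["address"] for n in nodes if role in n.get("roles", [])), None)

    return {
        "connected": sum(1 for n in nodes if n.get("status") == "CONNECTED"),
        "total": len(nodes),
        "coordinator": holder_of("Cluster Coordinator"),
        "primary": holder_of("Primary Node"),
    }
```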

7. Scaling (Scale Up/Down)

Update Handler

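# operator/main.py (scale_down is defined in controllers/scaling.py,
# wait_for_cluster_ready in controllers/cluster.py)
@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.replicas')
async def on_replicas_change(spec, name, namespace, old, new, logger, patch, **kwargs):
    """Detect replicas change → perform scaling"""
    old_replicas = old
    new_replicas = new
 
    logger.info(f"Scaling '{name}': {old_replicas} -> {new_replicas}")
    patch.status['phase'] = 'Scaling'
 
    if new_replicas > old_replicas:
        # Scale up: publish the new node list, add Pods, then wait for them to join
        configmap = build_nifi_config(name, namespace, spec)
        core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
        apps_v1.patch_namespaced_stateful_set(
            f"{name}-nifi", namespace, {'spec': {'replicas': new_replicas}}
        )
        if not await wait_for_cluster_ready(name, namespace, new_replicas, logger):
            raise kopf.TemporaryError("New nodes have not joined the cluster yet", delay=60)
    elif new_replicas < old_replicas:
        # Scale down: remove the nodes inside NiFi first, then shrink the StatefulSet
        # (it deletes the highest ordinals) and publish the new node list
        await scale_down(name, namespace, old_replicas, new_replicas, logger)
        apps_v1.patch_namespaced_stateful_set(
            f"{name}-nifi", namespace, {'spec': {'replicas': new_replicas}}
        )
        configmap = build_nifi_config(name, namespace, spec)
        core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
 
    patch.status['phase'] = 'Running'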

Safe Scale Down

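When removing NiFi nodes, follow the Disconnect → Offload → Delete sequence. NiFi only offloads a node that has already been disconnected. Offloading moves the node's queued FlowFiles to the remaining nodes, after which the node can safely be deleted from the cluster.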

# operator/controllers/scaling.py
import asyncio
 
import httpx
import kopf
 
 
async def _set_node_status(http, coordinator_url: str, node_id: str,
                           status: str, done_status: str):
    """Request a node state change, then poll until NiFi reports `done_status`."""
    url = f"{coordinator_url}/nifi-api/controller/cluster/nodes/{node_id}"
    resp = await http.put(url, json={'node': {'nodeId': node_id, 'status': status}})
    resp.raise_for_status()
    for _ in range(60):  # up to ~5 minutes
        resp = await http.get(url)
        if resp.json()['node']['status'] == done_status:
            return
        await asyncio.sleep(5)
    raise kopf.TemporaryError(f"Node {node_id} did not reach {done_status}", delay=60)
 
 
async def scale_down(name: str, namespace: str, old: int, new: int, logger):
    """Safe scale down: remove nodes starting from the highest index"""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
 
    async with httpx.AsyncClient(verify=False) as http:
        # Nodes to remove (starting from highest index, as the StatefulSet will)
        for i in range(old - 1, new - 1, -1):
            node_address = f"{name}-nifi-{i}.{headless}.{namespace}.svc.cluster.local"
            logger.info(f"Removing node: {node_address}")
 
            # 1. Look up node ID from cluster
            resp = await http.get(f"{coordinator_url}/nifi-api/controller/cluster")
            resp.raise_for_status()
            node = next(
                (n for n in resp.json()['cluster']['nodes'] if n['address'] == node_address),
                None
            )
 
            if not node:
                logger.warning(f"Node {node_address} not found in cluster")
                continue
 
            node_id = node['nodeId']
 
            # 2. DISCONNECTING: take the node out of the cluster's active set
            await _set_node_status(http, coordinator_url, node_id, 'DISCONNECTING', 'DISCONNECTED')
            logger.info(f"Node {i} disconnected")
 
            # 3. OFFLOADING: migrate its queued FlowFiles to the remaining nodes
            await _set_node_status(http, coordinator_url, node_id, 'OFFLOADING', 'OFFLOADED')
            logger.info(f"Node {i} offloaded")
 
            # 4. Remove from cluster
            resp = await http.delete(
                f"{coordinator_url}/nifi-api/controller/cluster/nodes/{node_id}"
            )
            resp.raise_for_status()
            logger.info(f"Node {i} removed from cluster")
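Deciding which nodes to remove is the only part of scale-down that does not need a live cluster. A StatefulSet always removes Pods starting from the highest ordinal, so the operator must offload exactly those nodes and no others. The hypothetical helper below computes them, which is handy for unit tests or for logging a dry-run plan before any REST call is made.

```python
def scale_down_plan(cluster: str, namespace: str, old: int, new: int) -> list[str]:
    """Addresses of the nodes a scale-down from `old` to `new` removes, highest first."""
    if new >= old:
        return []  # not a scale-down
    headless = f"{cluster}-headless"
    return [
        f"{cluster}-nifi-{i}.{headless}.{namespace}.svc.cluster.local"
        for i in range(old - 1, new - 1, -1)
    ]
```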

8. Configuration Change Management

Rolling Restart

import asyncio
import datetime
 
 
@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.config')
async def on_config_change(spec, name, namespace, logger, patch, **kwargs):
    """On config change: update ConfigMap + rolling restart"""
    logger.info(f"Config changed for '{name}', performing rolling restart")
    patch.status['phase'] = 'Updating'
 
    # 1. Update ConfigMap
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
 
    # 2. Trigger rolling restart (any Pod template change rolls the StatefulSet)
    restart_annotation = {
        'spec': {
            'template': {
                'metadata': {
                    'annotations': {
                        'nifi-operator/restartedAt':
                            datetime.datetime.now(datetime.timezone.utc).isoformat()
                    }
                }
            }
        }
    }
    apps_v1.patch_namespaced_stateful_set(f"{name}-nifi", namespace, restart_annotation)
    logger.info("Rolling restart triggered")
 
    # 3. Wait until the controller has observed the new template, every Pod runs
    #    the new revision, and all replicas are Ready again
    replicas = spec['replicas']
    for _ in range(120):  # up to ~10 minutes
        sts = apps_v1.read_namespaced_stateful_set(f"{name}-nifi", namespace)
        st = sts.status
        if ((st.observed_generation or 0) >= sts.metadata.generation
                and st.update_revision == st.current_revision
                and (st.ready_replicas or 0) == replicas):
            logger.info("Rolling restart completed")
            break
        await asyncio.sleep(5)
    else:
        logger.warning("Rolling restart still in progress after 10 minutes")
 
    patch.status['phase'] = 'Running'
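The timestamp annotation restarts every node on any `spec.config` update, even one that renders to an identical ConfigMap. A common alternative, swapped in here and not what the handler above does, is to annotate the Pod template with a checksum of the rendered ConfigMap data. Kubernetes only rolls Pods when the template changes, so a restart then happens only when the files actually change, and re-applying the same patch is a no-op. In this sketch, the annotation key `nifi-operator/config-checksum` and both function names are hypothetical.

```python
import hashlib
import json


def config_checksum(data: dict[str, str]) -> str:
    """Deterministic SHA-256 of ConfigMap data; key order does not matter."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def restart_patch(data: dict[str, str]) -> dict:
    """StatefulSet patch whose Pod template changes only when the config does."""
    return {"spec": {"template": {"metadata": {"annotations": {
        "nifi-operator/config-checksum": config_checksum(data),
    }}}}}
```

The handler would then call `apps_v1.patch_namespaced_stateful_set(f"{name}-nifi", namespace, restart_patch(configmap.data))` instead of building the timestamp annotation.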

9. TLS/SSL Certificate Automation

Creating cert-manager Certificate CRs

# operator/resources/certificate.py
from kubernetes import client
 
 
def build_tls_certificate(name: str, namespace: str, spec: dict, ordinal: int) -> dict:
    """Build the cert-manager Certificate manifest for one NiFi node"""
    tls = spec.get('tls', {})
    headless = f"{name}-headless"
 
    return {
        'apiVersion': 'cert-manager.io/v1',
        'kind': 'Certificate',
        'metadata': {
            'name': f'{name}-nifi-{ordinal}-tls',
            'namespace': namespace,
        },
        'spec': {
            'secretName': f'{name}-nifi-{ordinal}-tls',
            'issuerRef': {
                'name': tls['issuerRef']['name'],
                'kind': tls['issuerRef'].get('kind', 'ClusterIssuer'),
            },
            # X.509 limits the CN to 64 characters; with long cluster or namespace names,
            # shorten it (TLS hostname verification relies on dnsNames, not the CN)
            'commonName': f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
            'dnsNames': [
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc',
                f'{name}-nifi-{ordinal}.{headless}',
                f'{name}-nifi-{ordinal}',
                'localhost',
            ],
            'duration': '8760h',    # 1 year
            'renewBefore': '720h',  # Renew 30 days before expiry
            # NiFi nodes act as both TLS server and TLS client (node-to-node cluster traffic)
            'usages': ['digital signature', 'key encipherment', 'server auth', 'client auth'],
            'privateKey': {
                'algorithm': 'RSA',
                'size': 2048,
            },
            'keystores': {
                'jks': {
                    'create': True,
                    # This Secret must already exist; cert-manager does not create it
                    'passwordSecretRef': {
                        'name': f'{name}-keystore-password',
                        'key': 'password',
                    }
                }
            }
        }
    }
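# Create one certificate per node when TLS is enabled (add to on_create)
from kubernetes.client.rest import ApiException

if spec.get('tls', {}).get('enabled', False):
    custom_api = client.CustomObjectsApi()
    for i in range(spec['replicas']):
        cert = build_tls_certificate(name, namespace, spec, i)
        kopf.adopt(cert)  # owner reference: Certificates are garbage-collected with the NiFiCluster
        try:
            custom_api.create_namespaced_custom_object(
                group='cert-manager.io', version='v1',
                namespace=namespace, plural='certificates', body=cert
            )
        except ApiException as e:
            if e.status != 409:  # 409 = already exists (e.g. handler retry); re-raise anything else
                raise
    logger.info(f"TLS certificates created for {spec['replicas']} nodes")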
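The on_create snippet only covers the initial replica count. When spec.replicas changes, the operator must also create certificates for new ordinals and remove those of nodes that no longer exist. A minimal, side-effect-free planning step might look like the sketch below; the function names are hypothetical, and the existing names would come from listing the namespace's Certificates with custom_api.list_namespaced_custom_object(group='cert-manager.io', version='v1', namespace=namespace, plural='certificates').

```python
import re
from typing import List, Set, Tuple


def certificate_name(cluster: str, ordinal: int) -> str:
    """Certificate (and Secret) name that build_tls_certificate uses for one node."""
    return f'{cluster}-nifi-{ordinal}-tls'


def plan_certificates(cluster: str, replicas: int, existing: Set[str]) -> Tuple[List[str], List[str]]:
    """Return (to_create, to_delete) so exactly one Certificate exists per ordinal 0..replicas-1."""
    desired = [certificate_name(cluster, i) for i in range(replicas)]
    to_create = [n for n in desired if n not in existing]

    # Only names that follow this cluster's exact pattern are candidates for deletion,
    # so certificates of other clusters in the same namespace are never selected
    pattern = re.compile(rf'{re.escape(cluster)}-nifi-(\d+)-tls')
    to_delete = sorted(
        (n for n in existing if pattern.fullmatch(n) and n not in desired),
        key=lambda n: int(pattern.fullmatch(n).group(1)),  # order by node ordinal
    )
    return to_create, to_delete
```

Deletion is best deferred until the removed node has been offloaded and its pod has terminated.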

Level 3: Advanced — Production-Grade Operator


10. NiFi Flow Deployment Automation

NiFiFlow CRD

# deploy/crds/nififlow-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nififlows.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiFlow
    plural: nififlows
    singular: nififlow
  scope: Namespaced
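  versions:
    - name: v1alpha1
      served: true
      storage: true
      subresources:
        status: {}  # lets the operator write status without touching spec
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["clusterRef", "registryUrl", "bucketId", "flowId", "flowVersion"]
              properties:
                clusterRef:
                  type: string
                  description: "NiFiCluster name"
                registryUrl:
                  type: string
                  description: "NiFi Registry URL"
                bucketId:
                  type: string
                flowId:
                  type: string
                flowVersion:
                  type: integer
                processGroupId:
                  type: string
                  default: "root"
                autoStart:
                  type: boolean
                  default: true
            status:
              type: object
              # Without this, the API server prunes the phase/deployedProcessGroupId
              # fields (and kopf's progress data) that the operator writes
              x-kubernetes-preserve-unknown-fields: true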
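Callers such as a CI job can also submit NiFiFlow objects programmatically. The sketch below assembles a body matching the v1alpha1 schema above, which could be passed to client.CustomObjectsApi().create_namespaced_custom_object(group='datadynamics.io', version='v1alpha1', namespace=..., plural='nififlows', body=...). The helper name and the sample values in the usage are illustrative.

```python
from typing import Any, Dict


def build_nififlow(name: str, namespace: str, cluster: str, registry_url: str,
                   bucket_id: str, flow_id: str, flow_version: int,
                   process_group_id: str = 'root', auto_start: bool = True) -> Dict[str, Any]:
    """Return a NiFiFlow custom resource body that satisfies the CRD schema."""
    return {
        'apiVersion': 'datadynamics.io/v1alpha1',
        'kind': 'NiFiFlow',
        'metadata': {'name': name, 'namespace': namespace},
        'spec': {
            # Required fields
            'clusterRef': cluster,
            'registryUrl': registry_url,
            'bucketId': bucket_id,
            'flowId': flow_id,
            'flowVersion': flow_version,
            # Optional fields, using the same defaults as the CRD
            'processGroupId': process_group_id,
            'autoStart': auto_start,
        },
    }


# Example with illustrative values
cr = build_nififlow('ingest-flow', 'nifi', 'my-nifi', 'http://nifi-registry:18080',
                    'bucket-1', 'flow-1', 3)
```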

Flow Deployment Handler

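import httpx
import kopf


@kopf.on.create('datadynamics.io', 'v1alpha1', 'nififlows')
async def on_flow_create(spec, name, namespace, logger, patch, **kwargs):
    """NiFiFlow CR created → deploy the versioned Flow to NiFi"""
    cluster_name = spec['clusterRef']
    headless = f"{cluster_name}-headless"
    nifi_url = f"https://{cluster_name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
 
    # verify=False is for demos only: in production pass the cluster CA (verify="<path to ca.crt>")
    # and send an access token, because a secured NiFi rejects anonymous API calls
    async with httpx.AsyncClient(verify=False, timeout=30.0) as http:
        # 1. Reference the versioned Flow; NiFi fetches the Flow from the Registry itself.
        #    get_registry_id resolves the ID of the Registry Client configured for registryUrl.
        flow_payload = {
            'registryId': await get_registry_id(http, nifi_url, spec['registryUrl']),
            'bucketId': spec['bucketId'],
            'flowId': spec['flowId'],
            'version': str(spec['flowVersion']),  # NiFi 2.x models flow versions as strings
        }
 
        # 2. Create a child Process Group under processGroupId from that Flow version
        pg_id = spec.get('processGroupId', 'root')
        resp = await http.post(
            f"{nifi_url}/nifi-api/process-groups/{pg_id}/process-groups",
            json={
                'revision': {'version': 0},  # new components start at revision 0
                'component': {
                    'position': {'x': 0, 'y': 0},
                    'versionControlInformation': flow_payload,
                }
            }
        )
 
        if resp.status_code == 201:
            deployed_pg = resp.json()
            logger.info(f"Flow deployed: {deployed_pg['id']}")
 
            # 3. Auto-start: schedule all components in the new Process Group
            if spec.get('autoStart', True):
                start = await http.put(
                    f"{nifi_url}/nifi-api/flow/process-groups/{deployed_pg['id']}",
                    json={'id': deployed_pg['id'], 'state': 'RUNNING'}
                )
                # Log rather than raise: raising makes kopf retry and deploy the Flow a second time
                if start.status_code == 200:
                    logger.info("Flow started")
                else:
                    logger.warning(f"Flow start failed: {start.text}")
 
            patch.status['deployedProcessGroupId'] = deployed_pg['id']
            patch.status['phase'] = 'Running'
        else:
            logger.error(f"Flow deployment failed: {resp.text}")
            patch.status['phase'] = 'Error'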
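The handler calls get_registry_id, whose body is not part of this section. One way to implement its parsing half is sketched below. It assumes the NiFi 2.x response of GET /nifi-api/controller/registry-clients has the shape {'registries': [{'id': ..., 'component': {'properties': {...}}}]} and that the NiFi Registry client stores its address under the 'url' property; both assumptions should be checked against your NiFi version, and the function name is hypothetical.

```python
from typing import Any, Dict, Optional


def find_registry_client_id(registry_clients: Dict[str, Any], registry_url: str) -> Optional[str]:
    """Return the ID of the Registry Client whose 'url' property equals registry_url, else None.

    registry_clients: decoded JSON of GET /nifi-api/controller/registry-clients (assumed shape).
    """
    wanted = registry_url.rstrip('/')  # ignore a trailing slash on either side
    for entry in registry_clients.get('registries', []):
        component = entry.get('component') or {}
        # Assumed property key of the NiFi Registry client's URL setting
        url = (component.get('properties') or {}).get('url') or ''
        if url.rstrip('/') == wanted:
            return entry.get('id') or component.get('id')
    return None
```

get_registry_id would then GET that endpoint with the shared httpx client, pass resp.json() to this function, and raise (for example kopf.TemporaryError, so kopf retries later) when no matching client is configured.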

GitOps Workflow

[Git → NiFiFlow CR → Operator → NiFi]

1. Developer commits Flow to NiFi Registry
2. Updates Flow version in NiFiFlow CR YAML
3. Commits + pushes to Git
4. ArgoCD/FluxCD detects CR change → applies to K8s
5. Operator detects the NiFiFlow CR change (this requires an update handler; on_flow_create above only covers creation)
6. Operator deploys or updates the Flow via the NiFi REST API
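Steps 5 and 6 need an update handler next to on_flow_create. The sketch below covers only the pure parts: deciding whether a spec change points at a different Flow version, and building the request body. It is based on the endpoints the NiFi UI uses for changing a flow version, as far as we know them: the current revision is read from GET /nifi-api/versions/process-groups/{id} (its processGroupRevision field), the body is POSTed to /nifi-api/versions/update-requests/process-groups/{id}, and the returned request is polled at /nifi-api/versions/update-requests/{requestId} until complete, then deleted. Verify these against the REST API documentation for your NiFi version; the function names are hypothetical.

```python
from typing import Any, Dict

# Spec fields that identify which Flow version a NiFiFlow points at
VERSION_FIELDS = ('registryUrl', 'bucketId', 'flowId', 'flowVersion')


def flow_version_changed(old_spec: Dict[str, Any], new_spec: Dict[str, Any]) -> bool:
    """True when an update points the Process Group at a different Flow or Flow version."""
    return any(old_spec.get(f) != new_spec.get(f) for f in VERSION_FIELDS)


def build_version_update_request(pg_id: str, pg_revision: Dict[str, Any],
                                 registry_id: str, spec: Dict[str, Any]) -> Dict[str, Any]:
    """Body for POST /nifi-api/versions/update-requests/process-groups/{pg_id}.

    pg_revision: processGroupRevision from GET /nifi-api/versions/process-groups/{pg_id}.
    """
    return {
        'processGroupRevision': pg_revision,
        'versionControlInformation': {
            'groupId': pg_id,
            'registryId': registry_id,
            'bucketId': spec['bucketId'],
            'flowId': spec['flowId'],
            'version': str(spec['flowVersion']),  # NiFi 2.x models flow versions as strings
        },
    }
```

In a @kopf.on.update handler, old['spec'] and new['spec'] can be compared with flow_version_changed, and status.deployedProcessGroupId supplies pg_id.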

11. Monitoring and Auto-Recovery

Exposing Prometheus Metrics

# operator/main.py
import kopf
from prometheus_client import start_http_server, Gauge, Counter
 
# Metric definitions
nifi_cluster_nodes = Gauge('nifi_cluster_nodes_total', 'Total NiFi nodes', ['cluster'])
nifi_cluster_ready = Gauge('nifi_cluster_nodes_ready', 'Ready NiFi nodes', ['cluster'])
nifi_scaling_events = Counter('nifi_scaling_events_total', 'Scaling events', ['cluster', 'direction'])
nifi_reconcile_errors = Counter('nifi_reconcile_errors_total', 'Reconciliation errors', ['cluster'])
 
 
# Start the Prometheus metrics server once, when the operator starts
@kopf.on.startup()
async def start_metrics_server(logger, **kwargs):
    start_http_server(8080)  # matches the "metrics" containerPort of the Deployment
    logger.info("Metrics server listening on :8080")
 
 
# Refresh per-cluster gauges every 30 seconds
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def update_metrics(spec, name, status, **kwargs):
    nifi_cluster_nodes.labels(cluster=name).set(spec.get('replicas', 0))
    nifi_cluster_ready.labels(cluster=name).set(status.get('readyNodes', 0))
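The nifi_scaling_events counter is defined but never incremented above. A kopf field handler, @kopf.on.field('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.replicas'), receives the old and new values of spec.replicas, which is enough to derive the direction label. A minimal sketch of that step, with a hypothetical helper name:

```python
from typing import Optional


def scaling_direction(old_replicas: Optional[int], new_replicas: Optional[int]) -> Optional[str]:
    """Label value for nifi_scaling_events: 'up', 'down', or None when nothing scaled."""
    # When the field first appears, old is None; that is creation, not a scaling event
    if old_replicas is None or new_replicas is None or old_replicas == new_replicas:
        return None
    return 'up' if new_replicas > old_replicas else 'down'
```

Inside the field handler, the counter would be updated only when a direction exists: direction = scaling_direction(old, new), then nifi_scaling_events.labels(cluster=name, direction=direction).inc() if direction is not None.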

Auto-Recovery

@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=60)
async def auto_heal(spec, name, namespace, patch, logger, **kwargs):
    """Detect disconnected nodes and ask the cluster coordinator to reconnect them"""
    try:
        headless = f"{name}-headless"
        # Queries node 0 only; while that node is down, the check is skipped until the next run
        url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
 
        async with httpx.AsyncClient(verify=False, timeout=10.0) as http:
            resp = await http.get(f"{url}/nifi-api/controller/cluster")
            if resp.status_code != 200:
                return
 
            cluster = resp.json()['cluster']
            for node in cluster['nodes']:
                # Caution: safe scale-down also disconnects nodes on purpose before offloading;
                # this loop does not tell them apart, so coordinate it with the scale-down logic
                if node['status'] == 'DISCONNECTED':
                    node_id = node['nodeId']
                    node_addr = node['address']
                    logger.warning(f"Disconnected node detected: {node_addr}")
 
                    # Request reconnection (NodeEntity payload)
                    put = await http.put(
                        f"{url}/nifi-api/controller/cluster/nodes/{node_id}",
                        json={'node': {'nodeId': node_id, 'status': 'CONNECTING'}}
                    )
                    if put.status_code == 200:
                        logger.info(f"Reconnection initiated for {node_addr}")
                    else:
                        logger.warning(f"Reconnection request for {node_addr} failed: {put.status_code}")
 
    except Exception as e:
        nifi_reconcile_errors.labels(cluster=name).inc()
        logger.error(f"Auto-heal error: {e}")
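Pulling the node selection out of the timer makes it testable and gives the scale-down logic a place to exclude nodes it is decommissioning. The sketch below assumes the GET /nifi-api/controller/cluster response shape used above ({'cluster': {'nodes': [...]}} with nodeId, address, and status per node); skip_addresses is a hypothetical input, for example addresses the operator records while offloading.

```python
from typing import Any, Dict, Iterable, List, Tuple


def nodes_to_reconnect(cluster_json: Dict[str, Any],
                       skip_addresses: Iterable[str] = ()) -> List[Tuple[str, str]]:
    """Return (nodeId, address) pairs for DISCONNECTED nodes that are not being decommissioned."""
    skip = set(skip_addresses)
    result = []
    for node in cluster_json.get('cluster', {}).get('nodes', []):
        # OFFLOADING/OFFLOADED nodes are left alone; only plain disconnects are candidates
        if node.get('status') == 'DISCONNECTED' and node.get('address') not in skip:
            result.append((node['nodeId'], node['address']))
    return result
```

The timer would then iterate over nodes_to_reconnect(resp.json(), offloading_addresses) instead of filtering inline.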

12. LDAP/OIDC Authentication Automation

Auto-Generating Authentication Configuration

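from xml.sax.saxutils import escape


def build_auth_config(spec: dict) -> dict:
    """Generate configuration fragments for the selected authentication type"""
    auth = spec.get('auth', {})
    auth_type = auth.get('type', 'single-user')
 
    if auth_type == 'ldap':
        ldap = auth['ldap']
        # 'managerDn' is an assumed optional CR field: SIMPLE binds need a manager DN,
        # otherwise fall back to ANONYMOUS search. Inject the manager password at startup
        # (e.g. from a Secret); never store it in the ConfigMap.
        manager_dn = ldap.get('managerDn', '')
        strategy = 'SIMPLE' if manager_dn else 'ANONYMOUS'
        # Values are XML-escaped; Referral Strategy and Authentication Expiration are required by LdapProvider
        login_identity_providers = f"""
<loginIdentityProviders>
    <provider>
        <identifier>ldap-provider</identifier>
        <class>org.apache.nifi.ldap.LdapProvider</class>
        <property name="Authentication Strategy">{strategy}</property>
        <property name="Manager DN">{escape(manager_dn)}</property>
        <property name="Manager Password"></property>
        <property name="Referral Strategy">FOLLOW</property>
        <property name="Url">{escape(ldap['url'])}</property>
        <property name="User Search Base">{escape(ldap['searchBase'])}</property>
        <property name="User Search Filter">{escape(ldap['searchFilter'])}</property>
        <property name="Identity Strategy">USE_DN</property>
        <property name="Authentication Expiration">12 hours</property>
    </provider>
</loginIdentityProviders>"""
        return {
            'login-identity-providers.xml': login_identity_providers,
            # nifi.properties must point at the provider defined above
            'auth.properties': 'nifi.security.user.login.identity.provider=ldap-provider\n',
        }
 
    elif auth_type == 'oidc':
        # These lines must be merged into nifi.properties. NiFi does not expand ${...} itself,
        # so the container entrypoint has to substitute OIDC_CLIENT_SECRET before NiFi starts.
        nifi_properties_auth = f"""
nifi.security.user.oidc.discovery.url={auth['oidc']['discoveryUrl']}
nifi.security.user.oidc.client.id={auth['oidc']['clientId']}
nifi.security.user.oidc.client.secret=${{OIDC_CLIENT_SECRET}}
"""
        return {'auth.properties': nifi_properties_auth}
 
    return {}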
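NiFi does not read a file named auth.properties, so those fragments have to be merged into nifi.properties before startup, for example while building the ConfigMap. A minimal merge is sketched below, assuming simple key=value lines as in NiFi's default nifi.properties (Java's ':' separators and line continuations are not handled); the function name is hypothetical. Placeholders such as ${OIDC_CLIENT_SECRET} pass through unchanged for the entrypoint to substitute.

```python
def merge_properties(base: str, overrides: str) -> str:
    """Apply key=value lines from `overrides` onto a nifi.properties text."""
    updates = {}
    for line in overrides.splitlines():
        line = line.strip()
        if line and not line.startswith('#') and '=' in line:
            key, value = line.split('=', 1)
            updates[key.strip()] = value.strip()

    merged = []
    for line in base.splitlines():
        key = line.split('=', 1)[0].strip()
        # Replace existing, non-comment keys in place so the file layout is preserved
        if '=' in line and not line.lstrip().startswith('#') and key in updates:
            merged.append(f'{key}={updates.pop(key)}')
        else:
            merged.append(line)
    # Keys that did not exist in the base file are appended at the end
    merged.extend(f'{k}={v}' for k, v in updates.items())
    return '\n'.join(merged) + '\n'
```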

13. Operator Deployment and Operations

Dockerfile

FROM python:3.11-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY operator/ ./operator/
 
USER 1000:1000
 
CMD ["kopf", "run", "operator/main.py", "--all-namespaces", "--verbose"]

RBAC

# deploy/rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: nifi-operator-role
rules:
  - apiGroups: ["datadynamics.io"]
    resources: ["nificlusters", "nificlusters/status", "nififlows", "nififlows/status"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services", "configmaps", "secrets", "pods", "persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["cert-manager.io"]
    resources: ["certificates"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
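  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "patch"]
  # Needed by kopf itself: CRD discovery and namespace listing for --all-namespaces
  - apiGroups: ["apiextensions.k8s.io"]
    resources: ["customresourcedefinitions"]
    verbs: ["list", "watch"]
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["list", "watch"]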
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: nifi-operator-binding
subjects:
  - kind: ServiceAccount
    name: nifi-operator
    namespace: nifi-operator-system
roleRef:
  kind: ClusterRole
  name: nifi-operator-role
  apiGroup: rbac.authorization.k8s.io

Operator Deployment

# deploy/operator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nifi-operator
  template:
    metadata:
      labels:
        app: nifi-operator
    spec:
      serviceAccountName: nifi-operator
      containers:
        - name: operator
          image: datadynamics/nifi-operator:latest
          ports:
            - containerPort: 8080
              name: metrics
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          env:
            - name: OPERATOR_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace

Deployment Steps

# 1. Create Operator namespace
kubectl create namespace nifi-operator-system
 
# 2. Register CRDs
kubectl apply -f deploy/crds/
 
# 3. Configure RBAC
kubectl apply -f deploy/rbac.yaml
 
# 4. Deploy Operator
kubectl apply -f deploy/operator-deployment.yaml
 
# 5. Check Operator logs
kubectl logs -f deployment/nifi-operator -n nifi-operator-system
 
# 6. Create NiFi cluster
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
 
# 7. Check status
kubectl get nifi -n nifi -w

Troubleshooting Guide

| Problem | Cause | Solution |
|---|---|---|
| Pod CrashLoopBackOff | Insufficient JVM memory | Increase spec.config.jvmHeapSize |
| Cluster join failure | DNS resolution failure | Verify the headless Service sets publishNotReadyAddresses: true |
| Node DISCONNECTED | Network timeout | Increase nifi.cluster.node.connection.timeout |
| Flow election failure | Nodes start at different times | Increase nifi.cluster.flow.election.max.wait.time |
| TLS handshake failure | Certificate SAN mismatch | Check the dnsNames of the cert-manager Certificate |
| OOMKilled | Container memory limit exceeded | Increase spec.resources.limits.memory |
| PVC Pending | StorageClass not found | Check spec.storage.storageClassName |
| Operator permission error | Insufficient RBAC | Check the ClusterRole rules |

Note: This Operator is intended for learning and demonstration purposes. For production use, additional work is required including enhanced error handling, Finalizer support, integration testing, and Helm chart packaging.


— Data Dynamics Engineering Team