
Building a NiFi 2.x Cluster Kubernetes Operator in Python - From Basics to Production

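A step-by-step implementation of a Kubernetes Operator that manages Apache NiFi 2.x clusters, written in Python with kopf. It covers CRD design, automatic cluster configuration, scaling, TLS, Flow deployment, and monitoring across three skill levels.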

Data Dynamics · April 16, 2026 · 22 min read

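Apache NiFi 2.x no longer requires ZooKeeper when it runs on Kubernetes: leader election can use Kubernetes Leases, and cluster state can be stored in ConfigMaps. This post implements, in Python, an Operator that automatically manages a NiFi 2.x cluster on Kubernetes, organized into three skill levels.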


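Level 1: Basics - Operator Concepts and a Single NiFi Node


1. What Is a Kubernetes Operator?

The Operator pattern

A Kubernetes Operator is software that automates the operational tasks a person would otherwise perform. You declare the desired state in a custom resource whose type is defined by a CRD (Custom Resource Definition), and a controller keeps reconciling the actual state until it matches the desired state.

[Operator pattern]

User declares:                      Operator performs:
┌─────────────────┐                 ┌────────────────────────────────────┐
│ NiFiCluster CR  │                 │ Controller (Python)                │
│                 │  ────────────→  │                                    │
│ replicas: 3     │ watch/reconcile │ 1. Create StatefulSet (3 replicas) │
│ version: 2.4.0  │                 │ 2. Generate config files           │
│ tls: true       │                 │ 3. Issue certificates              │
└─────────────────┘                 │ 4. Verify nodes joined the cluster │
                                    │ 5. Auto-recover on failure         │
                                    └────────────────────────────────────┘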
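
The reconcile idea can be sketched in a few lines of plain Python. This is only an illustration of "compare desired with observed, then act": `ClusterState` and `plan_actions` are hypothetical names, not part of kopf or the Kubernetes API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterState:
    """Hypothetical, simplified view of a NiFi cluster (not a real API type)."""
    replicas: int
    version: str


def plan_actions(desired: ClusterState, observed: ClusterState) -> list[str]:
    """Compare desired and observed state and list the steps needed to converge."""
    actions = []
    if observed.replicas < desired.replicas:
        actions.append(f"scale up by {desired.replicas - observed.replicas}")
    elif observed.replicas > desired.replicas:
        actions.append(f"scale down by {observed.replicas - desired.replicas}")
    if observed.version != desired.version:
        actions.append(f"rolling update to {desired.version}")
    return actions


desired = ClusterState(replicas=3, version="2.4.0")
observed = ClusterState(replicas=1, version="2.4.0")
print(plan_actions(desired, observed))  # ['scale up by 2']
print(plan_actions(desired, desired))   # []
```

An empty plan means the cluster has converged; a real controller runs this comparison every time the resource or the cluster changes.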

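Why NiFi needs an Operator

| Problem with manual management | How the Operator solves it |
|---|---|
| nifi.properties edited by hand whenever a node is added | Configuration regenerated automatically from the node count |
| TLS certificates created and renewed by hand | Automated through cert-manager integration |
| Risk of data loss on scale down | Safe offboarding performed automatically |
| Full restart on every configuration change | Automated rolling restart |
| Failed nodes recovered by hand | Automatic detection and recovery |
| Flow deployment managed by hand | GitOps-based automated deployment |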

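2. NiFi 2.x Clustering Architecture

Changes from NiFi 1.x to 2.x

| Item | NiFi 1.x | NiFi 2.x (on Kubernetes) |
|---|---|---|
| Cluster coordination | ZooKeeper required | ZooKeeper optional; Kubernetes Leases can replace it |
| Node discovery | ZooKeeper | Coordinator address published through the leader-election Lease |
| Primary Node election | ZooKeeper | Kubernetes Lease |
| Flow synchronization | Cluster coordinator, elected through ZooKeeper | Cluster coordinator, elected through a Lease |
| Dependent components | NiFi + ZooKeeper (3 to 5 nodes) | NiFi only |
| Operational complexity | High (ZooKeeper must be managed) | Lower |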

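Node discovery on Kubernetes

When NiFi 2.x runs without ZooKeeper, every node still has to advertise a stable address to its peers. A Kubernetes Headless Service gives each StatefulSet pod a predictable DNS name, so the Operator can derive a node's address from its ordinal instead of maintaining the list by hand.

[NiFi cluster layout on Kubernetes]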

Headless Service: nifi-cluster-headless (ClusterIP: None)
  │
  ├─ nifi-0.nifi-cluster-headless.nifi.svc.cluster.local
  ├─ nifi-1.nifi-cluster-headless.nifi.svc.cluster.local
  └─ nifi-2.nifi-cluster-headless.nifi.svc.cluster.local

nifi.properties (on each node):
  nifi.cluster.is.node=true
  nifi.cluster.node.address=nifi-{ordinal}.nifi-cluster-headless.{namespace}.svc.cluster.local
  nifi.cluster.node.protocol.port=11443
  nifi.cluster.flow.election.max.candidates=3
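
Because the pod names are deterministic, the full list of node addresses can be computed from three strings and the replica count. A minimal sketch, assuming the default cluster domain `cluster.local`; `node_fqdns` is a hypothetical helper name.

```python
def node_fqdns(statefulset: str, headless_service: str, namespace: str, replicas: int) -> list[str]:
    """Return the stable DNS name of every pod in a StatefulSet behind a headless Service."""
    return [
        f"{statefulset}-{ordinal}.{headless_service}.{namespace}.svc.cluster.local"
        for ordinal in range(replicas)
    ]


for host in node_fqdns("nifi", "nifi-cluster-headless", "nifi", 3):
    print(host)
# nifi-0.nifi-cluster-headless.nifi.svc.cluster.local
# nifi-1.nifi-cluster-headless.nifi.svc.cluster.local
# nifi-2.nifi-cluster-headless.nifi.svc.cluster.local
```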

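3. Setting Up the Development Environment

Prerequisites

# Python 3.11+
python --version

# kubectl
kubectl version --client

# kind (local Kubernetes cluster)
kind create cluster --name nifi-dev

# Install Python dependencies
pip install kopf kubernetes pyyaml jinja2 httpx

Introducing kopf

kopf (Kubernetes Operator Pythonic Framework) is a framework for writing Kubernetes Operators in Python. Handlers are plain functions registered with decorators, and kopf calls them when a watched resource is created, updated, or deleted.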

import kopf

@kopf.on.create('nificlusters')
def create_fn(spec, name, namespace, **kwargs):
    """Called when a NiFiCluster resource is created"""
    print(f"NiFi cluster '{name}' creation requested: {spec}")

@kopf.on.update('nificlusters')
def update_fn(spec, name, namespace, diff, **kwargs):
    """Called when a NiFiCluster resource is changed"""
    print(f"NiFi cluster '{name}' changed: {diff}")

@kopf.on.delete('nificlusters')
def delete_fn(name, namespace, **kwargs):
    """Called when a NiFiCluster resource is deleted"""
    print(f"NiFi cluster '{name}' deleted")

Project structure

nifi-operator/
├── operator/
│   ├── __init__.py
│   ├── main.py              # kopf handlers (entry point)
│   ├── controllers/
│   │   ├── __init__.py
│   │   ├── cluster.py        # NiFiCluster controller
│   │   ├── flow.py           # NiFiFlow controller
│   │   └── scaling.py        # Scaling logic
│   ├── resources/
│   │   ├── __init__.py
│   │   ├── statefulset.py    # StatefulSet builder
│   │   ├── service.py        # Service builders
│   │   ├── configmap.py      # ConfigMap builder
│   │   └── certificate.py    # TLS certificates
│   ├── nifi_api/
│   │   ├── __init__.py
│   │   └── client.py         # NiFi REST API client
│   └── templates/
│       ├── nifi.properties.j2
│       ├── bootstrap.conf.j2
│       └── authorizers.xml.j2
├── deploy/
│   ├── crds/
│   │   └── nificluster-crd.yaml
│   ├── rbac.yaml
│   └── operator-deployment.yaml
├── Dockerfile
├── requirements.txt
└── README.md

4. Designing the CRD (Custom Resource Definition)

NiFiCluster CRD

# deploy/crds/nificluster-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nificlusters.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiCluster
    plural: nificlusters
    singular: nificluster
    shortNames: ["nifi"]
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["replicas", "version"]
              properties:
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 20
                  description: "Number of NiFi cluster nodes"
                version:
                  type: string
                  description: "NiFi Docker image version (tag)"
                image:
                  type: string
                  default: "apache/nifi"
                  description: "NiFi Docker image"
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      properties:
                        cpu: { type: string, default: "1" }
                        memory: { type: string, default: "2Gi" }
                    limits:
                      type: object
                      properties:
                        cpu: { type: string, default: "2" }
                        memory: { type: string, default: "4Gi" }
                storage:
                  type: object
                  properties:
                    size: { type: string, default: "10Gi" }
                    storageClassName: { type: string }
                config:
                  type: object
                  properties:
                    jvmHeapSize: { type: string, default: "1g" }
                    maxThreads: { type: integer, default: 10 }
                    sensitivePropsKey: { type: string }
                tls:
                  type: object
                  properties:
                    enabled: { type: boolean, default: false }
                    issuerRef:
                      type: object
                      properties:
                        name: { type: string }
                        kind: { type: string, default: "ClusterIssuer" }
                auth:
                  type: object
                  properties:
                    type: { type: string, enum: ["single-user", "ldap", "oidc"] }
                    ldap:
                      type: object
                      properties:
                        url: { type: string }
                        searchBase: { type: string }
                        searchFilter: { type: string }
                    oidc:
                      type: object
                      properties:
                        discoveryUrl: { type: string }
                        clientId: { type: string }
                        clientSecretRef: { type: string }
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ["Creating", "Running", "Scaling", "Updating", "Error"]
                readyNodes:
                  type: integer
                clusterCoordinator:
                  type: string
                message:
                  type: string
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Ready
          type: integer
          jsonPath: .status.readyNodes
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp

Example NiFiCluster CR

# examples/nifi-cluster.yaml
apiVersion: datadynamics.io/v1alpha1
kind: NiFiCluster
metadata:
  name: my-nifi
  namespace: nifi
spec:
  replicas: 3
  version: "2.4.0"
  image: "apache/nifi"
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "2"
      memory: "4Gi"
  storage:
    size: "20Gi"
    storageClassName: "standard"
  config:
    jvmHeapSize: "1536m"
    maxThreads: 15
    sensitivePropsKey: "my-secret-key-12345"
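  tls:
    enabled: true
    issuerRef:
      # Must be an internal CA issuer: public ACME CAs such as Let's Encrypt
      # cannot sign *.svc.cluster.local names. The name below is a placeholder.
      name: nifi-ca-issuer
      kind: ClusterIssuer
  auth:
    type: single-user

# Register the CRD and create the CR
kubectl apply -f deploy/crds/nificluster-crd.yaml
kubectl apply -f examples/nifi-cluster.yaml

# Verify
kubectl get nifi -n nifi
# NAME      REPLICAS   READY   PHASE     VERSION   AGE
# my-nifi   3          3       Running   2.4.0     5m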
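
The API server enforces the CRD schema at admission time, but the same rules can be checked earlier, for example in a unit test or a CI step that lints manifests. The sketch below mirrors only three rules from the schema above (required fields, the `replicas` range, and the `auth.type` enum); `validate_spec` is a hypothetical helper, not part of kopf.

```python
ALLOWED_AUTH_TYPES = ("single-user", "ldap", "oidc")


def validate_spec(spec: dict) -> list[str]:
    """Return human-readable violations of the NiFiCluster schema (empty list = valid)."""
    errors = []
    for field in ("replicas", "version"):
        if field not in spec:
            errors.append(f"spec.{field} is required")

    replicas = spec.get("replicas")
    if replicas is not None:
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(replicas, bool) or not isinstance(replicas, int):
            errors.append("spec.replicas must be an integer")
        elif not 1 <= replicas <= 20:
            errors.append("spec.replicas must be between 1 and 20")

    auth_type = spec.get("auth", {}).get("type")
    if auth_type is not None and auth_type not in ALLOWED_AUTH_TYPES:
        errors.append(f"spec.auth.type must be one of {list(ALLOWED_AUTH_TYPES)}")
    return errors


print(validate_spec({"replicas": 3, "version": "2.4.0"}))  # []
print(validate_spec({"replicas": 25}))
# ['spec.version is required', 'spec.replicas must be between 1 and 20']
```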

5. First Operator: Deploying a Single NiFi Node

Core handler implementation

# operator/main.py
# Note: a top-level package named `operator` collides with Python's standard-library
# `operator` module. If these imports fail, rename the package (for example to
# `nifi_operator`) and adjust the paths accordingly.
import kopf
import kubernetes
from kubernetes import client
from operator.resources.statefulset import build_statefulset
from operator.resources.service import build_headless_service, build_ui_service
from operator.resources.configmap import build_nifi_config

# Use in-cluster credentials when running as a pod; fall back to the local
# kubeconfig so that `kopf run` also works during development.
try:
    kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
    kubernetes.config.load_kube_config()

apps_v1 = client.AppsV1Api()
core_v1 = client.CoreV1Api()


@kopf.on.create('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_create(spec, name, namespace, logger, patch, **kwargs):
    """Provision the resources when a NiFiCluster is created"""
    logger.info(f"Creating NiFi cluster '{name}' with {spec['replicas']} nodes")

    # Status update. kopf applies `patch` once, after the handler returns,
    # so only the final value of each field is persisted.
    patch.status['phase'] = 'Creating'
    patch.status['readyNodes'] = 0

    # 1. Create the ConfigMap (nifi.properties and bootstrap.conf)
    configmap = build_nifi_config(name, namespace, spec)
    kopf.adopt(configmap)  # Set the owner reference
    core_v1.create_namespaced_config_map(namespace, configmap)
    logger.info(f"ConfigMap '{configmap.metadata.name}' created")

    # 2. Create the headless Service (for node discovery)
    headless_svc = build_headless_service(name, namespace)
    kopf.adopt(headless_svc)
    core_v1.create_namespaced_service(namespace, headless_svc)
    logger.info("Headless Service created")

    # 3. Create the UI Service (for access to the web UI)
    ui_svc = build_ui_service(name, namespace, spec)
    kopf.adopt(ui_svc)
    core_v1.create_namespaced_service(namespace, ui_svc)
    logger.info("UI Service created")

    # 4. Create the StatefulSet
    statefulset = build_statefulset(name, namespace, spec)
    kopf.adopt(statefulset)
    apps_v1.create_namespaced_stateful_set(namespace, statefulset)
    logger.info(f"StatefulSet created with {spec['replicas']} replicas")

    patch.status['phase'] = 'Running'
    return {'message': f"NiFi cluster '{name}' created successfully"}


@kopf.on.delete('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_delete(name, namespace, logger, **kwargs):
    """NiFiCluster deletion: child resources are removed through owner references"""
    logger.info(f"NiFi cluster '{name}' is being deleted (cascade)")
    # The owner references set by kopf.adopt() make Kubernetes garbage-collect
    # the StatefulSet, Services, and ConfigMap automatically.

StatefulSet creation logic

# operator/resources/statefulset.py
from kubernetes import client


def build_statefulset(name: str, namespace: str, spec: dict) -> client.V1StatefulSet:
    """Build the NiFi StatefulSet"""
    replicas = spec.get('replicas', 1)
    version = spec['version']
    image = spec.get('image', 'apache/nifi')
    resources = spec.get('resources', {})
    storage = spec.get('storage', {})
    config = spec.get('config', {})

    # Container definition
    container = client.V1Container(
        name='nifi',
        image=f"{image}:{version}",
        ports=[
            client.V1ContainerPort(container_port=8443, name='https'),
            client.V1ContainerPort(container_port=11443, name='cluster'),
            client.V1ContainerPort(container_port=6342, name='load-balance'),
        ],
        env=[
            client.V1EnvVar(name='NIFI_JVM_HEAP_INIT', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(name='NIFI_JVM_HEAP_MAX', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(
                name='NIFI_POD_NAME',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.name')
                )
            ),
            client.V1EnvVar(
                name='NIFI_POD_NAMESPACE',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace')
                )
            ),
        ],
        resources=client.V1ResourceRequirements(
            requests={
                'cpu': resources.get('requests', {}).get('cpu', '1'),
                'memory': resources.get('requests', {}).get('memory', '2Gi'),
            },
            limits={
                'cpu': resources.get('limits', {}).get('cpu', '2'),
                'memory': resources.get('limits', {}).get('memory', '4Gi'),
            }
        ),
        volume_mounts=[
            client.V1VolumeMount(name='data', mount_path='/opt/nifi/nifi-current/data'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/nifi.properties',
                                 sub_path='nifi.properties'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/bootstrap.conf',
                                 sub_path='bootstrap.conf'),
        ],
        # NiFi's REST API requires authentication, so an anonymous HTTP GET is
        # answered with 401 and would fail an HTTP probe. A TCP check on the
        # HTTPS port only verifies that the web server accepts connections.
        readiness_probe=client.V1Probe(
            tcp_socket=client.V1TCPSocketAction(port=8443),
            initial_delay_seconds=90,
            period_seconds=20,
            failure_threshold=10,
        ),
        liveness_probe=client.V1Probe(
            tcp_socket=client.V1TCPSocketAction(port=8443),
            initial_delay_seconds=120,
            period_seconds=30,
            failure_threshold=5,
        ),
    )
 
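    # Init container: copy the config templates into the writable volume and
    # substitute the pod-specific placeholders in nifi.properties
    init_container = client.V1Container(
        name='init-config',
        image='busybox:1.36',
        command=['sh', '-c', '''
            cp /config-template/nifi.properties /config-output/nifi.properties
            cp /config-template/bootstrap.conf /config-output/bootstrap.conf
            # Replace NIFI_POD_NAMESPACE first: NIFI_POD_NAME is a prefix of it
            sed -i "s/NIFI_POD_NAMESPACE/$NIFI_POD_NAMESPACE/g" /config-output/nifi.properties
            sed -i "s/NIFI_POD_NAME/$NIFI_POD_NAME/g" /config-output/nifi.properties
        '''],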
        env=[
            client.V1EnvVar(name='NIFI_POD_NAME',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.name'))),
            client.V1EnvVar(name='NIFI_POD_NAMESPACE',
                value_from=client.V1EnvVarSource(
                    field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace'))),
        ],
        volume_mounts=[
            client.V1VolumeMount(name='config-template', mount_path='/config-template'),
            client.V1VolumeMount(name='config', mount_path='/config-output'),
        ]
    )
 
    # PVC template
    pvc_template = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name='data'),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=['ReadWriteOnce'],
            storage_class_name=storage.get('storageClassName'),
            resources=client.V1VolumeResourceRequirements(
                requests={'storage': storage.get('size', '10Gi')}
            )
        )
    )
 
    return client.V1StatefulSet(
        metadata=client.V1ObjectMeta(name=f"{name}-nifi", namespace=namespace),
        spec=client.V1StatefulSetSpec(
            replicas=replicas,
            service_name=f"{name}-headless",
            pod_management_policy='OrderedReady',
            selector=client.V1LabelSelector(
                match_labels={'app': 'nifi', 'cluster': name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={'app': 'nifi', 'cluster': name}
                ),
                spec=client.V1PodSpec(
                    init_containers=[init_container],
                    containers=[container],
                    volumes=[
                        client.V1Volume(
                            name='config-template',
                            config_map=client.V1ConfigMapVolumeSource(
                                name=f"{name}-config"
                            )
                        ),
                        client.V1Volume(
                            name='config',
                            empty_dir=client.V1EmptyDirVolumeSource()
                        ),
                    ],
                    termination_grace_period_seconds=60,
                )
            ),
            volume_claim_templates=[pvc_template],
        )
    )
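
The init container substitutes two placeholders, and one of them (`NIFI_POD_NAME`) is a prefix of the other (`NIFI_POD_NAMESPACE`), so the order of substitution matters. The sketch below shows the same replacement in Python and why the longer name has to be handled first; `render_placeholders` is a hypothetical helper used only for illustration.

```python
def render_placeholders(text: str, values: dict[str, str]) -> str:
    """Replace placeholder names with values, longest name first.

    Sorting by length prevents a short placeholder that is a prefix of a
    longer one from clobbering it.
    """
    for key in sorted(values, key=len, reverse=True):
        text = text.replace(key, values[key])
    return text


line = "nifi.cluster.node.address=NIFI_POD_NAME.my-nifi-headless.NIFI_POD_NAMESPACE.svc.cluster.local"
values = {"NIFI_POD_NAME": "my-nifi-nifi-0", "NIFI_POD_NAMESPACE": "nifi"}

print(render_placeholders(line, values))
# nifi.cluster.node.address=my-nifi-nifi-0.my-nifi-headless.nifi.svc.cluster.local

# Naive order: the short name is replaced first and corrupts the long one.
print(line.replace("NIFI_POD_NAME", values["NIFI_POD_NAME"]))
# nifi.cluster.node.address=my-nifi-nifi-0.my-nifi-headless.my-nifi-nifi-0SPACE.svc.cluster.local
```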

Service creation logic

# operator/resources/service.py
from kubernetes import client


def build_headless_service(name: str, namespace: str) -> client.V1Service:
    """Headless Service (for node discovery)"""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-headless",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            cluster_ip='None',  # Headless
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
                client.V1ServicePort(name='cluster', port=11443, target_port=11443),
                client.V1ServicePort(name='load-balance', port=6342, target_port=6342),
            ],
            publish_not_ready_addresses=True,  # Publish DNS records before the pods are Ready
        )
    )
 
 
def build_ui_service(name: str, namespace: str, spec: dict) -> client.V1Service:
    """UI Service (for access to the web UI)"""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-ui",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            type='ClusterIP',
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
            ]
        )
    )

ConfigMap creation (nifi.properties)

# operator/resources/configmap.py
from kubernetes import client
from jinja2 import Template
 
NIFI_PROPERTIES_TEMPLATE = """
# NiFi 2.x Properties (Generated by Operator)
# Web Properties
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443
nifi.web.proxy.host=
 
# Cluster Properties
nifi.cluster.is.node=true
nifi.cluster.node.address=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.node.protocol.port=11443
nifi.cluster.node.protocol.max.threads={{ max_threads }}
nifi.cluster.flow.election.max.wait.time=1 min
nifi.cluster.flow.election.max.candidates={{ replicas }}
 
# Load Balancing
nifi.cluster.load.balance.host=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.load.balance.port=6342
 
# Leader Election and Cluster State (Kubernetes-native, no ZooKeeper).
# Assumes the pod's ServiceAccount may manage Leases and ConfigMaps, and that
# state-management.xml defines a cluster provider with the id kubernetes-provider.
nifi.cluster.leader.election.implementation=KubernetesLeaderElectionManager
nifi.cluster.leader.election.kubernetes.lease.prefix={{ name }}
nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.provider.cluster=kubernetes-provider
 
# Security Properties
nifi.sensitive.props.key={{ sensitive_props_key }}
nifi.security.autoreload.enabled=true
 
# Performance
nifi.bored.yield.duration=10 millis
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB
 
# Data Directories
nifi.flowfile.repository.directory=./data/flowfile_repository
nifi.content.repository.directory.default=./data/content_repository
nifi.provenance.repository.directory.default=./data/provenance_repository
nifi.database.directory=./data/database_repository
"""
 
 
def build_nifi_config(name: str, namespace: str, spec: dict) -> client.V1ConfigMap:
    """Build the NiFi configuration ConfigMap"""
    replicas = spec.get('replicas', 1)
    config = spec.get('config', {})
    headless_service = f"{name}-headless"
 
    template = Template(NIFI_PROPERTIES_TEMPLATE)
    nifi_properties = template.render(
        name=name,
        namespace=namespace,
        headless_service=headless_service,
        replicas=replicas,
        max_threads=config.get('maxThreads', 10),
        sensitive_props_key=config.get('sensitivePropsKey', 'default-key-change-me'),
    )
 
    bootstrap_conf = f"""
# Bootstrap Config (Generated by Operator)
java.arg.2=-Xms{config.get('jvmHeapSize', '1g')}
java.arg.3=-Xmx{config.get('jvmHeapSize', '1g')}
java.arg.14=-Djava.protocol.handler.pkgs=sun.net.www.protocol
"""
 
    return client.V1ConfigMap(
        metadata=client.V1ObjectMeta(
            name=f"{name}-config",
            namespace=namespace,
            labels={'app': 'nifi', 'cluster': name}
        ),
        data={
            'nifi.properties': nifi_properties,
            'bootstrap.conf': bootstrap_conf,
        }
    )

Verifying the result

# 1. Register the CRD
kubectl apply -f deploy/crds/nificluster-crd.yaml

# 2. Run the Operator locally (development mode)
kopf run operator/main.py --verbose

# 3. In another terminal, create the NiFi cluster
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml

# 4. Check the status
kubectl get nifi -n nifi
kubectl get pods -n nifi
kubectl get svc -n nifi

# 5. Access the NiFi UI
kubectl port-forward svc/my-nifi-ui -n nifi 8443:8443
# Open https://localhost:8443/nifi

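Level 2: Intermediate - Cluster Management and Configuration Automation


6. Automatic NiFi 2.x Cluster Configuration

Controlling the cluster bootstrap order

Start order matters when a NiFi 2.x cluster is formed for the first time. With the OrderedReady pod management policy, pod 0 starts first and is normally elected cluster coordinator; the remaining nodes join through that coordinator as they come up. Any connected node can answer cluster API requests, so the code below queries pod 0.

# operator/controllers/cluster.py
import asyncio
import httpx

async def wait_for_cluster_ready(name: str, namespace: str, replicas: int, logger):
    """Wait until every node has connected to the cluster"""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"

    # Note: verify=False skips TLS verification and no credentials are sent. A secured
    # NiFi answers anonymous requests with 401, so a real deployment needs a trusted CA
    # bundle and a bearer token or client certificate here.
    for attempt in range(60):  # Wait up to 10 minutes (60 x 10 s)
        try:
            async with httpx.AsyncClient(verify=False) as client:
                resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
                if resp.status_code == 200:
                    cluster_info = resp.json()
                    connected = sum(
                        1 for n in cluster_info['cluster']['nodes']
                        if n['status'] == 'CONNECTED'
                    )
                    logger.info(f"Cluster nodes connected: {connected}/{replicas}")
                    if connected >= replicas:
                        return True
        except Exception as e:
            logger.debug(f"Waiting for cluster... ({attempt + 1}/60): {e}")

        await asyncio.sleep(10)

    return False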
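
The readiness rule itself (count the nodes whose status is CONNECTED and compare with the desired replica count) is easier to unit-test once it is separated from the HTTP call. A minimal sketch: the payload below only reproduces the fields the code above reads (`cluster.nodes[].status`), and the addresses are made up.

```python
def count_connected(cluster_info: dict) -> int:
    """Count nodes reported as CONNECTED in a /controller/cluster response body."""
    nodes = cluster_info.get("cluster", {}).get("nodes", [])
    return sum(1 for node in nodes if node.get("status") == "CONNECTED")


def cluster_ready(cluster_info: dict, replicas: int) -> bool:
    """The cluster is ready once at least `replicas` nodes are connected."""
    return count_connected(cluster_info) >= replicas


sample = {
    "cluster": {
        "nodes": [
            {"address": "my-nifi-nifi-0", "status": "CONNECTED"},
            {"address": "my-nifi-nifi-1", "status": "CONNECTED"},
            {"address": "my-nifi-nifi-2", "status": "CONNECTING"},
        ]
    }
}
print(count_connected(sample))   # 2
print(cluster_ready(sample, 3))  # False
print(cluster_ready(sample, 2))  # True
```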

Status monitoring timer

# operator/main.py
import httpx

@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def monitor_cluster(spec, name, namespace, patch, logger, **kwargs):
    """Check the cluster status every 30 seconds"""
    try:
        headless = f"{name}-headless"
        coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"

        async with httpx.AsyncClient(verify=False) as client:
            resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")

            if resp.status_code == 200:
                cluster = resp.json()['cluster']
                connected = sum(1 for n in cluster['nodes'] if n['status'] == 'CONNECTED')
                # NiFi reports node roles as "Cluster Coordinator" and "Primary Node"
                coordinator = next(
                    (n['address'] for n in cluster['nodes']
                     if 'Cluster Coordinator' in n.get('roles', [])),
                    'unknown'
                )
                patch.status['readyNodes'] = connected
                patch.status['clusterCoordinator'] = coordinator
                patch.status['phase'] = 'Running'
            else:
                patch.status['phase'] = 'Error'
                patch.status['message'] = f"API returned {resp.status_code}"

    except Exception as e:
        logger.warning(f"Health check failed: {e}")
        patch.status['phase'] = 'Error'
        patch.status['message'] = str(e)

7. Scaling (Scale Up/Down)

Update handler

# operator/main.py
# scale_up's implementation is not listed in this post.
from operator.controllers.scaling import scale_down, scale_up

@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.replicas')
async def on_replicas_change(spec, name, namespace, old, new, logger, patch, **kwargs):
    """Detect a change in replicas and scale the cluster"""
    old_replicas = old
    new_replicas = new

    logger.info(f"Scaling '{name}': {old_replicas} -> {new_replicas}")
    patch.status['phase'] = 'Scaling'

    if new_replicas > old_replicas:
        await scale_up(name, namespace, old_replicas, new_replicas, spec, logger)
    elif new_replicas < old_replicas:
        # Drain and remove the NiFi nodes before their pods are deleted
        await scale_down(name, namespace, old_replicas, new_replicas, logger)

    # Update the ConfigMap (it is rendered from spec.replicas)
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)

    # Update the StatefulSet replica count
    apps_v1.patch_namespaced_stateful_set(
        f"{name}-nifi", namespace,
        {'spec': {'replicas': new_replicas}}
    )

    patch.status['phase'] = 'Running'

Safe scale down

A NiFi node must be decommissioned in the order Disconnect → Offload → Delete. NiFi only offloads a node that has already been disconnected, and offloading is what moves the node's queued FlowFiles to the remaining nodes before the pod goes away.

# operator/controllers/scaling.py
import asyncio
import httpx


async def set_status_and_wait(client, node_url: str, node_id: str, request: str, done: str):
    """Request a node status change, then poll until the node reports `done`"""
    await client.put(node_url, json={'node': {'nodeId': node_id, 'status': request}})
    for _ in range(60):  # Up to 5 minutes (60 x 5 s)
        resp = await client.get(node_url)
        if resp.json()['node']['status'] == done:
            return
        await asyncio.sleep(5)
    raise RuntimeError(f"Node {node_id} did not reach {done}")


async def scale_down(name: str, namespace: str, old: int, new: int, logger):
    """Safe scale down: remove the highest-ordinal nodes first"""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"

    async with httpx.AsyncClient(verify=False) as client:
        # Nodes to remove (highest ordinal first)
        for i in range(old - 1, new - 1, -1):
            node_address = f"{name}-nifi-{i}.{headless}.{namespace}.svc.cluster.local"
            logger.info(f"Decommissioning node: {node_address}")

            # 1. Look up the node ID in the cluster
            resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
            cluster = resp.json()['cluster']
            node = next(
                (n for n in cluster['nodes'] if node_address in n['address']),
                None
            )

            if not node:
                logger.warning(f"Node {node_address} not found in cluster")
                continue

            node_id = node['nodeId']
            node_url = f"{coordinator_url}/nifi-api/controller/cluster/nodes/{node_id}"

            # 2. Disconnect the node from the cluster
            await set_status_and_wait(client, node_url, node_id, 'DISCONNECTING', 'DISCONNECTED')
            logger.info(f"Node {i} disconnected")

            # 3. Offload: move its queued data to the remaining nodes
            await set_status_and_wait(client, node_url, node_id, 'OFFLOADING', 'OFFLOADED')
            logger.info(f"Node {i} offloaded")

            # 4. Remove the node from the cluster
            await client.delete(node_url)
            logger.info(f"Node {i} removed from cluster")
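
The loop bounds `range(old - 1, new - 1, -1)` are easy to get wrong by one. A small sketch that isolates the ordinal calculation makes it testable; `ordinals_to_remove` is a hypothetical helper name.

```python
def ordinals_to_remove(old: int, new: int) -> list[int]:
    """Pod ordinals to decommission when shrinking from `old` to `new` replicas.

    StatefulSet pods are numbered 0..old-1 and Kubernetes deletes the highest
    ordinals first, so the NiFi nodes are drained in the same order.
    """
    return list(range(old - 1, new - 1, -1))


print(ordinals_to_remove(5, 3))  # [4, 3]
print(ordinals_to_remove(3, 3))  # []
print(ordinals_to_remove(3, 5))  # []  (scale up: nothing to remove)
```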

8. Managing Configuration Changes

Rolling restart

# operator/main.py
import asyncio
import datetime

@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.config')
async def on_config_change(spec, name, namespace, logger, patch, **kwargs):
    """On a config change, update the ConfigMap and perform a rolling restart"""
    logger.info(f"Config changed for '{name}', performing rolling restart")
    patch.status['phase'] = 'Updating'

    # 1. Update the ConfigMap
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)

    # 2. Rolling restart (triggered by changing a pod-template annotation)
    restart_annotation = {
        'spec': {
            'template': {
                'metadata': {
                    'annotations': {
                        'nifi-operator/restartedAt':
                            datetime.datetime.now(datetime.timezone.utc).isoformat()
                    }
                }
            }
        }
    }
    apps_v1.patch_namespaced_stateful_set(f"{name}-nifi", namespace, restart_annotation)
    logger.info("Rolling restart triggered")

    # 3. Wait for the rolling restart to finish (up to 10 minutes)
    replicas = spec['replicas']
    for _ in range(120):
        sts = apps_v1.read_namespaced_stateful_set(f"{name}-nifi", namespace)
        if sts.status.ready_replicas == replicas and sts.status.updated_replicas == replicas:
            break
        await asyncio.sleep(5)

    patch.status['phase'] = 'Running'
    logger.info("Rolling restart completed")
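
The restart annotation above uses a timestamp, so every invocation restarts the pods even if the rendered files are byte-for-byte identical. A common alternative is to annotate the pod template with a checksum of the configuration, which changes only when the content changes. A minimal sketch; the annotation key `nifi-operator/config-checksum` and both helper names are assumptions, not part of the original Operator.

```python
import hashlib


def config_checksum(data: dict[str, str]) -> str:
    """Stable SHA-256 over ConfigMap data, independent of key order."""
    digest = hashlib.sha256()
    for key in sorted(data):
        # NUL separators keep ("ab", "c") distinct from ("a", "bc")
        digest.update(key.encode("utf-8") + b"\0")
        digest.update(data[key].encode("utf-8") + b"\0")
    return digest.hexdigest()


def checksum_patch(data: dict[str, str]) -> dict:
    """StatefulSet patch body that changes only when the config content changes."""
    return {
        "spec": {"template": {"metadata": {"annotations": {
            "nifi-operator/config-checksum": config_checksum(data),  # hypothetical key
        }}}}
    }


old = {"nifi.properties": "a=1\n", "bootstrap.conf": "x=1\n"}
same = {"bootstrap.conf": "x=1\n", "nifi.properties": "a=1\n"}
new = {"nifi.properties": "a=2\n", "bootstrap.conf": "x=1\n"}
print(config_checksum(old) == config_checksum(same))  # True
print(config_checksum(old) == config_checksum(new))   # False
```

Patching the StatefulSet with `checksum_patch(configmap.data)` is then a no-op when nothing changed, and Kubernetes rolls the pods only when the checksum differs.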

9. Automating TLS/SSL Certificates

Creating cert-manager Certificate CRs

# operator/resources/certificate.py
from kubernetes import client


def build_tls_certificate(name: str, namespace: str, spec: dict, ordinal: int) -> dict:
    """Build a cert-manager Certificate resource for one node"""
    tls = spec.get('tls', {})
    headless = f"{name}-headless"
 
    return {
        'apiVersion': 'cert-manager.io/v1',
        'kind': 'Certificate',
        'metadata': {
            'name': f'{name}-nifi-{ordinal}-tls',
            'namespace': namespace,
        },
        'spec': {
            'secretName': f'{name}-nifi-{ordinal}-tls',
            'issuerRef': {
                'name': tls['issuerRef']['name'],
                'kind': tls['issuerRef'].get('kind', 'ClusterIssuer'),
            },
            'commonName': f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
            'dnsNames': [
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc',
                f'{name}-nifi-{ordinal}.{headless}',
                f'{name}-nifi-{ordinal}',
                'localhost',
            ],
            'duration': '8760h',    # 1 year
            'renewBefore': '720h',  # Renew 30 days before expiry
            'privateKey': {
                'algorithm': 'RSA',
                'size': 2048,
            },
            'keystores': {
                'jks': {
                    'create': True,
                    'passwordSecretRef': {
                        'name': f'{name}-keystore-password',
                        'key': 'password',
                    }
                }
            }
        }
    }

# Create the certificates when TLS is enabled (add this to on_create)
if spec.get('tls', {}).get('enabled', False):
    custom_api = client.CustomObjectsApi()
    for i in range(spec['replicas']):
        cert = build_tls_certificate(name, namespace, spec, i)
        kopf.adopt(cert)  # Owner reference, so the certificate is deleted with the cluster
        custom_api.create_namespaced_custom_object(
            group='cert-manager.io', version='v1',
            namespace=namespace, plural='certificates', body=cert
        )
    logger.info(f"TLS certificates created for {spec['replicas']} nodes")
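
With `duration: 8760h` and `renewBefore: 720h`, cert-manager schedules renewal at the certificate's expiry time minus `renewBefore`. The arithmetic can be checked with a short sketch; `renewal_time` is a hypothetical helper and the issue date is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone


def renewal_time(not_before: datetime, duration_hours: int = 8760,
                 renew_before_hours: int = 720) -> datetime:
    """Renewal is due at notAfter - renewBefore, where notAfter = notBefore + duration."""
    not_after = not_before + timedelta(hours=duration_hours)
    return not_after - timedelta(hours=renew_before_hours)


issued = datetime(2026, 1, 1, tzinfo=timezone.utc)
print(renewal_time(issued).isoformat())  # 2026-12-02T00:00:00+00:00
```

A certificate issued on 1 January 2026 therefore expires on 1 January 2027 and is renewed from 2 December 2026; NiFi still has to pick up the new keystore after each renewal.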

Level 3: Advanced - A Production-Grade Operator


10. Automating NiFi Flow Deployment

NiFiFlow CRD

# deploy/crds/nififlow-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nififlows.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiFlow
    plural: nififlows
    singular: nififlow
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                clusterRef:
                  type: string
                  description: "Name of the NiFiCluster"
                registryUrl:
                  type: string
                  description: "NiFi Registry URL"
                bucketId:
                  type: string
                flowId:
                  type: string
                flowVersion:
                  type: integer
                processGroupId:
                  type: string
                  default: "root"
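                autoStart:
                  type: boolean
                  default: true
            # Without a status entry, the API server prunes the fields the handler writes via patch.status
            status:
              type: object
              x-kubernetes-preserve-unknown-fields: true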
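
The schema above does not mark any field as required, yet the handler that follows indexes several of them directly. A minimal sketch of validating a spec before use could look like this. `normalize_flow_spec` and the `REQUIRED_FIELDS` list are assumptions derived from which keys the handler reads, not part of the CRD itself.

```python
# Fields the deployment handler reads with spec[...]; treated as required here (assumption).
REQUIRED_FIELDS = ("clusterRef", "registryUrl", "bucketId", "flowId", "flowVersion")
# Defaults copied from the CRD schema.
DEFAULTS = {"processGroupId": "root", "autoStart": True}


def normalize_flow_spec(spec: dict) -> dict:
    """Validate a NiFiFlow spec and return a copy with schema defaults applied."""
    missing = [field for field in REQUIRED_FIELDS if field not in spec]
    if missing:
        raise ValueError(f"NiFiFlow spec is missing: {', '.join(missing)}")
    version = spec["flowVersion"]
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(version, bool) or not isinstance(version, int):
        raise ValueError("flowVersion must be an integer")
    return {**DEFAULTS, **spec}
```

The same effect can be achieved declaratively by adding a `required` list to the OpenAPI schema, which rejects incomplete resources at admission time instead of inside the handler.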

Flow deployment handler

@kopf.on.create('datadynamics.io', 'v1alpha1', 'nififlows')
async def on_flow_create(spec, name, namespace, logger, patch, **kwargs):
    """NiFiFlow CR created -> deploy the flow to NiFi"""
    cluster_name = spec['clusterRef']
    headless = f"{cluster_name}-headless"
    nifi_url = f"https://{cluster_name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
 
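    # verify=False disables TLS certificate verification; outside of examples, pass the cluster CA bundle instead.
    async with httpx.AsyncClient(verify=False) as client:
        # 1. Describe the flow to fetch from NiFi Registry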
        flow_payload = {
            'registryId': await get_registry_id(client, nifi_url, spec['registryUrl']),
            'bucketId': spec['bucketId'],
            'flowId': spec['flowId'],
            'version': spec['flowVersion'],
        }
 
        # 2. Deploy the flow into the process group
        pg_id = spec.get('processGroupId', 'root')
        resp = await client.post(
            f"{nifi_url}/nifi-api/process-groups/{pg_id}/process-groups",
            json={
                'revision': {'version': 0},
                'component': {
                    'position': {'x': 0, 'y': 0},
                    'versionControlInformation': flow_payload,
                }
            }
        )
 
        if resp.status_code == 201:
            deployed_pg = resp.json()
            logger.info(f"Flow deployed: {deployed_pg['id']}")
 
            # 3. Start the flow automatically
            if spec.get('autoStart', True):
                await client.put(
                    f"{nifi_url}/nifi-api/flow/process-groups/{deployed_pg['id']}",
                    json={'id': deployed_pg['id'], 'state': 'RUNNING'}
                )
                logger.info("Flow started")
 
            patch.status['deployedProcessGroupId'] = deployed_pg['id']
            patch.status['phase'] = 'Running'
        else:
            logger.error(f"Flow deployment failed: {resp.text}")
            patch.status['phase'] = 'Error'

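GitOps workflow

[Git → NiFiFlow CR → Operator → NiFi]

1. A developer commits the flow to NiFi Registry
2. The flow version is updated in the NiFiFlow CR YAML
3. The change is committed and pushed to Git
4. ArgoCD/FluxCD detects the CR change and applies it to Kubernetes
5. The Operator detects the NiFiFlow CR change
6. The flow is deployed or updated automatically through the NiFi REST API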
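
Steps 5 and 6 imply that the Operator reacts to updates as well as creation, while the handler shown in this section covers creation only. One way to decide what an update should do is to compare the old and new spec, which kopf passes to `@kopf.on.update` handlers as `old` and `new`. The function name and the action labels below are hypothetical, and which fields count as the flow's "identity" is an assumption.

```python
from typing import Optional

# Fields that identify where a flow lives; changing any of them means a different deployment.
IDENTITY_FIELDS = ("clusterRef", "registryUrl", "bucketId", "flowId", "processGroupId")


def plan_flow_action(old_spec: Optional[dict], new_spec: dict) -> str:
    """Classify a NiFiFlow spec change into a coarse action for the Operator."""
    if old_spec is None:
        return "deploy"            # nothing was deployed before
    if any(old_spec.get(key) != new_spec.get(key) for key in IDENTITY_FIELDS):
        return "redeploy"          # target or source changed: remove and deploy again
    if old_spec.get("flowVersion") != new_spec.get("flowVersion"):
        return "change-version"    # same flow, different Registry version
    return "noop"                  # e.g. only autoStart changed
```

Keeping this decision in a pure function makes it easy to unit-test without a running cluster; the update handler then only has to map each action to the matching NiFi REST calls.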

11. Monitoring and Auto-Healing

Exposing Prometheus metrics

# operator/main.py
from prometheus_client import start_http_server, Gauge, Counter

# Metric definitions
nifi_cluster_nodes = Gauge('nifi_cluster_nodes_total', 'Total NiFi nodes', ['cluster'])
nifi_cluster_ready = Gauge('nifi_cluster_nodes_ready', 'Ready NiFi nodes', ['cluster'])
nifi_scaling_events = Counter('nifi_scaling_events_total', 'Scaling events', ['cluster', 'direction'])
nifi_reconcile_errors = Counter('nifi_reconcile_errors_total', 'Reconciliation errors', ['cluster'])

# Start the Prometheus metrics server (matches the "metrics" containerPort of the Deployment)
start_http_server(8080)

# Refresh the gauges from a timer every 30 seconds
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def update_metrics(spec, name, status, **kwargs):
    nifi_cluster_nodes.labels(cluster=name).set(spec.get('replicas', 0))
    nifi_cluster_ready.labels(cluster=name).set(status.get('readyNodes', 0))
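
The `nifi_scaling_events` counter is defined with a `direction` label but is not incremented anywhere in this section. A small sketch of deriving that label from a replica change follows. `scaling_direction` is a hypothetical helper, and the label values `up` and `down` are assumptions.

```python
from typing import Optional


def scaling_direction(old_replicas: int, new_replicas: int) -> Optional[str]:
    """Return 'up' or 'down' for a replica change, or None when nothing changed."""
    if new_replicas > old_replicas:
        return "up"
    if new_replicas < old_replicas:
        return "down"
    return None
```

In the handler that reacts to `spec.replicas` changes, the result could feed the counter with `nifi_scaling_events.labels(cluster=name, direction=direction).inc()`, skipping the call when the function returns None.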

Auto-healing

@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=60)
async def auto_heal(spec, name, namespace, patch, logger, **kwargs):
    """Detect disconnected nodes and try to reconnect them."""
    try:
        headless = f"{name}-headless"
        url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"

        async with httpx.AsyncClient(verify=False) as client:
            resp = await client.get(f"{url}/nifi-api/controller/cluster")
            if resp.status_code != 200:
                return

            cluster = resp.json()['cluster']
            for node in cluster['nodes']:
                if node['status'] == 'DISCONNECTED':
                    # The cluster API identifies each node by its 'nodeId' field
                    node_id = node['nodeId']
                    node_addr = node['address']
                    logger.warning(f"Disconnected node detected: {node_addr}")

                    # Request reconnection
                    await client.put(
                        f"{url}/nifi-api/controller/cluster/nodes/{node_id}",
                        json={'node': {'nodeId': node_id, 'status': 'CONNECTING'}}
                    )
                    logger.info(f"Reconnection initiated for {node_addr}")

    except Exception as e:
        nifi_reconcile_errors.labels(cluster=name).inc()
        logger.error(f"Auto-heal error: {e}")
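
The timer above asks for a reconnect on every run, so a node that stays disconnected for a persistent reason is retried every 60 seconds indefinitely. A minimal sketch of capping the attempts per node is shown below. The function name, the in-memory `attempts` dict (which is lost when the Operator restarts), and the default limit of 3 are all assumptions; nodes are keyed by the `address` field that the handler already reads.

```python
def nodes_to_reconnect(cluster: dict, attempts: dict, max_attempts: int = 3) -> list:
    """Return disconnected nodes that still have reconnect attempts left.

    `attempts` maps node address -> attempts so far and is updated in place.
    """
    selected = []
    for node in cluster.get("nodes", []):
        address = node.get("address")
        if node.get("status") != "DISCONNECTED":
            attempts.pop(address, None)  # node is no longer disconnected: reset its budget
            continue
        used = attempts.get(address, 0)
        if used < max_attempts:
            attempts[address] = used + 1
            selected.append(node)
    return selected
```

Once a node exhausts its budget, the Operator can surface it through the CR status or an event so that a person investigates instead of the loop retrying silently.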

12. Automating LDAP/OIDC Authentication

Generating the authentication configuration

from xml.sax.saxutils import escape

def build_auth_config(spec: dict) -> dict:
    """Build the configuration files for the selected authentication type."""
    auth = spec.get('auth', {})
    auth_type = auth.get('type', 'single-user')

    if auth_type == 'ldap':
        ldap = auth['ldap']
        # Escape the values: LDAP filters often contain '&', which is not valid in raw XML.
        login_identity_providers = f"""
<loginIdentityProviders>
    <provider>
        <identifier>ldap-provider</identifier>
        <class>org.apache.nifi.ldap.LdapProvider</class>
        <property name="Authentication Strategy">SIMPLE</property>
        <property name="Manager DN"></property>
        <property name="Manager Password"></property>
        <property name="Url">{escape(ldap['url'])}</property>
        <property name="User Search Base">{escape(ldap['searchBase'])}</property>
        <property name="User Search Filter">{escape(ldap['searchFilter'])}</property>
    </provider>
</loginIdentityProviders>"""
        return {'login-identity-providers.xml': login_identity_providers}
 
    elif auth_type == 'oidc':
        nifi_properties_auth = f"""
nifi.security.user.oidc.discovery.url={auth['oidc']['discoveryUrl']}
nifi.security.user.oidc.client.id={auth['oidc']['clientId']}
nifi.security.user.oidc.client.secret=${{OIDC_CLIENT_SECRET}}
"""
        return {'auth.properties': nifi_properties_auth}
 
    return {}
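
The function returns a mapping of file name to file content, which fits the `data` field of a ConfigMap. A sketch of wrapping it follows. `build_auth_configmap` and the `{name}-auth-config` naming are hypothetical and not taken from the Operator's code.

```python
from typing import Optional


def build_auth_configmap(name: str, namespace: str, files: dict) -> Optional[dict]:
    """Wrap generated auth files in a ConfigMap manifest, or return None if there are none."""
    if not files:
        return None  # single-user auth: nothing to mount
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {
            "name": f"{name}-auth-config",  # hypothetical naming convention
            "namespace": namespace,
        },
        "data": dict(files),  # file name -> file content
    }
```

The resulting dict can be passed as the `body` of `CoreV1Api.create_namespaced_config_map` and mounted into the NiFi pods. Secrets such as the OIDC client secret are better kept in a Secret than in this ConfigMap.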

13. Deploying and Operating the Operator

Dockerfile

FROM python:3.11-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY operator/ ./operator/
 
USER 1000:1000
 
CMD ["kopf", "run", "operator/main.py", "--all-namespaces", "--verbose"]

RBAC

# deploy/rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: nifi-operator-role
rules:
  - apiGroups: ["datadynamics.io"]
    resources: ["nificlusters", "nificlusters/status", "nififlows", "nififlows/status"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services", "configmaps", "secrets", "pods", "persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["cert-manager.io"]
    resources: ["certificates"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
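  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "patch"]
  # kopf running with --all-namespaces also observes namespaces and CRDs at runtime
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["list", "watch"]
  - apiGroups: ["apiextensions.k8s.io"]
    resources: ["customresourcedefinitions"]
    verbs: ["list", "watch"]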
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: nifi-operator-binding
subjects:
  - kind: ServiceAccount
    name: nifi-operator
    namespace: nifi-operator-system
roleRef:
  kind: ClusterRole
  name: nifi-operator-role
  apiGroup: rbac.authorization.k8s.io

Operator Deployment

# deploy/operator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nifi-operator
  template:
    metadata:
      labels:
        app: nifi-operator
    spec:
      serviceAccountName: nifi-operator
      containers:
        - name: operator
          image: datadynamics/nifi-operator:latest
          ports:
            - containerPort: 8080
              name: metrics
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          env:
            - name: OPERATOR_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace

Deployment order

# 1. Create the Operator namespace
kubectl create namespace nifi-operator-system

# 2. Register the CRDs
kubectl apply -f deploy/crds/

# 3. Apply RBAC
kubectl apply -f deploy/rbac.yaml

# 4. Deploy the Operator
kubectl apply -f deploy/operator-deployment.yaml

# 5. Follow the Operator logs
kubectl logs -f deployment/nifi-operator -n nifi-operator-system

# 6. Create a NiFi cluster
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml

# 7. Watch the status
kubectl get nifi -n nifi -w

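Troubleshooting guide

| Problem | Cause | Resolution |
| --- | --- | --- |
| Pod in CrashLoopBackOff | Insufficient JVM memory | Increase spec.config.jvmHeapSize |
| Cluster join fails | DNS names cannot be resolved | Check that the Headless Service sets publishNotReadyAddresses: true |
| Node DISCONNECTED | Network timeout | Increase nifi.cluster.node.connection.timeout |
| Flow election fails | Nodes start at different times | Increase nifi.cluster.flow.election.max.wait.time |
| TLS handshake fails | Certificate SAN mismatch | Check dnsNames on the cert-manager Certificate |
| OOM Killed | Container memory limit exceeded | Increase spec.resources.limits.memory |
| PVC Pending | StorageClass missing | Check spec.storage.storageClassName |
| Operator permission errors | Insufficient RBAC | Check the ClusterRole rules |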

Note: This Operator is intended for learning and as an example. Production use requires additional work such as stronger error handling, finalizers, integration tests, and Helm chart packaging.


Data Dynamics Engineering Team