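Building a NiFi 2.x Cluster Kubernetes Operator in Python - From Basics to Production
This article builds, step by step, a Kubernetes Operator in Python (kopf) that manages Apache NiFi 2.x clusters. It covers CRD design, automatic cluster configuration, scaling, TLS, flow deployment, and monitoring across three skill levels.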
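Apache NiFi 2.x no longer requires ZooKeeper when it runs on Kubernetes: it adds Kubernetes-native leader election and cluster state storage as alternatives to ZooKeeper. In this article we implement, in Python, an Operator that automatically manages NiFi 2.x clusters on Kubernetes, organized into three skill levels.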
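Level 1: Basics - Operator Concepts and a Single NiFi Node
1. What Is a Kubernetes Operator?
The Operator pattern
A Kubernetes Operator is software that automates operational tasks a person would otherwise perform by hand. You declare the desired state in a custom resource defined by a CRD (Custom Resource Definition), and a controller continuously reconciles the actual state with it.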
[Operator pattern]
User declares:                        Operator performs:
┌─────────────────┐                   ┌────────────────────────────────────┐
│ NiFiCluster CR  │                   │ Controller (Python)                │
│                 │ ────────────────→ │                                    │
│ replicas: 3     │  watch/reconcile  │ 1. Create StatefulSet (3 replicas) │
│ version: 2.4.0  │                   │ 2. Generate configuration files    │
│ tls: true       │                   │ 3. Issue certificates              │
└─────────────────┘                   │ 4. Verify nodes joined the cluster │
                                      │ 5. Recover automatically on failure│
                                      └────────────────────────────────────┘
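The reconcile idea in the diagram can be sketched as a pure function that compares desired and actual state and returns the steps still needed. This is only an illustration: `ClusterState`, `plan_actions`, and the action names are hypothetical and are not part of kopf or of the operator built later in this article.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ClusterState:
    """Hypothetical, simplified view of a NiFi cluster."""
    replicas: int
    version: str


def plan_actions(desired: ClusterState, actual: Optional[ClusterState]) -> List[str]:
    """Return the steps needed to move `actual` towards `desired`."""
    if actual is None:
        # Nothing exists yet: everything has to be created.
        return ["create"]
    actions: List[str] = []
    if desired.version != actual.version:
        actions.append("rolling-update")
    if desired.replicas > actual.replicas:
        actions.append("scale-up")
    elif desired.replicas < actual.replicas:
        actions.append("scale-down")
    # An empty list means the actual state already matches the desired state.
    return actions


if __name__ == "__main__":
    print(plan_actions(ClusterState(3, "2.4.0"), ClusterState(2, "2.4.0")))
```

A real controller runs this comparison repeatedly, which is why each step should be safe to execute more than once.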
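Why NiFi needs an Operator
| Problem with manual management | How the Operator solves it |
|---|---|
| nifi.properties must be edited by hand when a node is added | Configuration is regenerated when the node count changes |
| TLS certificates are created and renewed by hand | Automated through cert-manager integration |
| Risk of data loss on scale-down | Safe offboarding is performed automatically |
| Configuration changes require a full restart | Automated rolling restart |
| Failed nodes are recovered by hand | Automatic detection and recovery |
| Flow deployment is managed by hand | Automated, GitOps-based deployment |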
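2. NiFi 2.x Clustering Architecture
Changes from NiFi 1.x to 2.x
| Item | NiFi 1.x | NiFi 2.x |
|---|---|---|
| Cluster coordination | ZooKeeper required | ZooKeeper optional; Kubernetes-native leader election available |
| Node discovery | ZooKeeper | Initial node list (nifi.properties) |
| Primary Node election | ZooKeeper | Built-in election mechanism |
| Flow synchronization | ZooKeeper | Managed directly by the cluster coordinator |
| Dependent components | NiFi + ZooKeeper (3 to 5 nodes) | NiFi only |
| Operational complexity | High (ZooKeeper must be managed) | Lower |
Node discovery on Kubernetes
In this design the initial node list is specified directly instead of relying on ZooKeeper. A Kubernetes Headless Service gives every pod a stable, predictable DNS name, so the Operator can generate that list automatically.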
[NiFi cluster layout on Kubernetes]
Headless Service: nifi-cluster-headless (ClusterIP: None)
  │
  ├─ nifi-0.nifi-cluster-headless.nifi.svc.cluster.local
  ├─ nifi-1.nifi-cluster-headless.nifi.svc.cluster.local
  └─ nifi-2.nifi-cluster-headless.nifi.svc.cluster.local
nifi.properties (on each node):
  nifi.cluster.is.node=true
  nifi.cluster.node.address=nifi-{ordinal}.nifi-cluster-headless.{namespace}.svc.cluster.local
  nifi.cluster.node.protocol.port=11443
  nifi.cluster.flow.election.max.candidates=3
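Because StatefulSet pod names are deterministic, the node hostnames can be computed before the pods exist. The sketch below assumes the naming used by the operator code later in this article (a StatefulSet called `{name}-nifi` behind a Headless Service called `{name}-headless`). The helper name `node_fqdns` and the `cluster_domain` parameter are illustrative assumptions.

```python
from typing import List


def node_fqdns(name: str, namespace: str, replicas: int,
               cluster_domain: str = "cluster.local") -> List[str]:
    """Build the stable DNS name of every NiFi pod in the StatefulSet."""
    headless = f"{name}-headless"
    return [
        # Pattern: <pod>.<headless service>.<namespace>.svc.<cluster domain>
        f"{name}-nifi-{ordinal}.{headless}.{namespace}.svc.{cluster_domain}"
        for ordinal in range(replicas)
    ]


if __name__ == "__main__":
    for host in node_fqdns("my-nifi", "nifi", 3):
        print(host)
```

Keeping this in one function avoids repeating the same f-string in the ConfigMap, certificate, and API client code.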
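3. Setting Up the Development Environment
Prerequisites
# Python 3.11+
python --version
# kubectl
kubectl version --client
# kind (local Kubernetes cluster)
kind create cluster --name nifi-dev
# Install the Python dependencies (prometheus-client is used in the monitoring section)
pip install kopf kubernetes pyyaml jinja2 httpx prometheus-client
Introducing kopf
kopf (Kubernetes Operator Pythonic Framework) is a framework for writing Kubernetes Operators in Python. Handlers are plain functions registered with decorators for create, update, and delete events.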
import kopf


@kopf.on.create('nificlusters')
def create_fn(spec, name, namespace, **kwargs):
    """Called when a NiFiCluster resource is created."""
    print(f"NiFi cluster '{name}' creation requested: {spec}")


@kopf.on.update('nificlusters')
def update_fn(spec, name, namespace, diff, **kwargs):
    """Called when a NiFiCluster resource is changed."""
    print(f"NiFi cluster '{name}' changed: {diff}")


@kopf.on.delete('nificlusters')
def delete_fn(name, namespace, **kwargs):
    """Called when a NiFiCluster resource is deleted."""
    print(f"NiFi cluster '{name}' deleted")

Project structure
nifi-operator/
├── operator/
│   ├── __init__.py
│   ├── main.py              # kopf handlers (entry point)
│   ├── controllers/
│   │   ├── __init__.py
│   │   ├── cluster.py       # NiFiCluster controller
│   │   ├── flow.py          # NiFiFlow controller
│   │   └── scaling.py       # Scaling logic
│   ├── resources/
│   │   ├── __init__.py
│   │   ├── statefulset.py   # Builds the StatefulSet
│   │   ├── service.py       # Builds the Services
│   │   ├── configmap.py     # Builds the ConfigMap
│   │   └── certificate.py   # TLS certificates
│   ├── nifi_api/
│   │   ├── __init__.py
│   │   └── client.py        # NiFi REST API client
│   └── templates/
│       ├── nifi.properties.j2
│       ├── bootstrap.conf.j2
│       └── authorizers.xml.j2
├── deploy/
│   ├── crds/
│   │   └── nificluster-crd.yaml
│   ├── rbac.yaml
│   └── operator-deployment.yaml
├── Dockerfile
├── requirements.txt
└── README.md
4. Designing the CRD (Custom Resource Definition)
NiFiCluster CRD
# deploy/crds/nificluster-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nificlusters.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiCluster
    plural: nificlusters
    singular: nificluster
    shortNames: ["nifi"]
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["replicas", "version"]
              properties:
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 20
                  description: "Number of NiFi cluster nodes"
                version:
                  type: string
                  description: "NiFi Docker image tag"
                image:
                  type: string
                  default: "apache/nifi"
                  description: "NiFi Docker image"
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      properties:
                        cpu: { type: string, default: "1" }
                        memory: { type: string, default: "2Gi" }
                    limits:
                      type: object
                      properties:
                        cpu: { type: string, default: "2" }
                        memory: { type: string, default: "4Gi" }
                storage:
                  type: object
                  properties:
                    size: { type: string, default: "10Gi" }
                    storageClassName: { type: string }
                config:
                  type: object
                  properties:
                    jvmHeapSize: { type: string, default: "1g" }
                    maxThreads: { type: integer, default: 10 }
                    sensitivePropsKey: { type: string }
                tls:
                  type: object
                  properties:
                    enabled: { type: boolean, default: false }
                    issuerRef:
                      type: object
                      properties:
                        name: { type: string }
                        kind: { type: string, default: "ClusterIssuer" }
                auth:
                  type: object
                  properties:
                    type: { type: string, enum: ["single-user", "ldap", "oidc"] }
                    ldap:
                      type: object
                      properties:
                        url: { type: string }
                        searchBase: { type: string }
                        searchFilter: { type: string }
                    oidc:
                      type: object
                      properties:
                        discoveryUrl: { type: string }
                        clientId: { type: string }
                        clientSecretRef: { type: string }
            status:
              type: object
              # Keep fields that kopf writes into status (handler results) from being pruned
              x-kubernetes-preserve-unknown-fields: true
              properties:
                phase:
                  type: string
                  enum: ["Creating", "Running", "Scaling", "Updating", "Error"]
                readyNodes:
                  type: integer
                clusterCoordinator:
                  type: string
                message:
                  type: string
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Ready
          type: integer
          jsonPath: .status.readyNodes
        - name: Phase
          type: string
          jsonPath: .status.phase
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp

NiFiCluster CR example
# examples/nifi-cluster.yaml
apiVersion: datadynamics.io/v1alpha1
kind: NiFiCluster
metadata:
  name: my-nifi
  namespace: nifi
spec:
  replicas: 3
  version: "2.4.0"
  image: "apache/nifi"
  resources:
    requests:
      cpu: "1"
      memory: "2Gi"
    limits:
      cpu: "2"
      memory: "4Gi"
  storage:
    size: "20Gi"
    storageClassName: "standard"
  config:
    jvmHeapSize: "1536m"
    maxThreads: 15
    sensitivePropsKey: "my-secret-key-12345"
  tls:
    enabled: true
    issuerRef:
      # The issuer must be able to sign cluster-internal DNS names (*.svc.cluster.local);
      # a public ACME issuer cannot, so point this at an internal CA issuer in practice.
      name: letsencrypt-prod
      kind: ClusterIssuer
  auth:
    type: single-user

# Register the CRD and create the CR
kubectl apply -f deploy/crds/nificluster-crd.yaml
kubectl apply -f examples/nifi-cluster.yaml
# Verify
kubectl get nifi
# NAME      REPLICAS   READY   PHASE     VERSION   AGE
# my-nifi   3          3       Running   2.4.0     5m

5. First Operator: Deploying a Single NiFi Node
Implementing the core handlers
# operator/main.py
# Note: a top-level package named `operator` shadows Python's standard-library
# `operator` module; consider renaming it (for example to `nifi_operator`).
import kopf
import kubernetes
from kubernetes import client
from operator.resources.statefulset import build_statefulset
from operator.resources.service import build_headless_service, build_ui_service
from operator.resources.configmap import build_nifi_config

try:
    kubernetes.config.load_incluster_config()   # running inside the cluster
except kubernetes.config.ConfigException:
    kubernetes.config.load_kube_config()        # local development (kopf run)

apps_v1 = client.AppsV1Api()
core_v1 = client.CoreV1Api()


@kopf.on.create('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_create(spec, name, namespace, logger, patch, **kwargs):
    """Provision the resources when a NiFiCluster is created."""
    logger.info(f"Creating NiFi cluster '{name}' with {spec['replicas']} nodes")
    # Update the status
    patch.status['phase'] = 'Creating'
    patch.status['readyNodes'] = 0
    # 1. Create the ConfigMap (nifi.properties and friends)
    configmap = build_nifi_config(name, namespace, spec)
    kopf.adopt(configmap)  # set the owner reference
    core_v1.create_namespaced_config_map(namespace, configmap)
    logger.info(f"ConfigMap '{configmap.metadata.name}' created")
    # 2. Create the Headless Service (node discovery)
    headless_svc = build_headless_service(name, namespace)
    kopf.adopt(headless_svc)
    core_v1.create_namespaced_service(namespace, headless_svc)
    logger.info("Headless Service created")
    # 3. Create the UI Service (access to the web UI)
    ui_svc = build_ui_service(name, namespace, spec)
    kopf.adopt(ui_svc)
    core_v1.create_namespaced_service(namespace, ui_svc)
    logger.info("UI Service created")
    # 4. Create the StatefulSet
    statefulset = build_statefulset(name, namespace, spec)
    kopf.adopt(statefulset)
    apps_v1.create_namespaced_stateful_set(namespace, statefulset)
    logger.info(f"StatefulSet created with {spec['replicas']} replicas")
    patch.status['phase'] = 'Running'
    return {'message': f"NiFi cluster '{name}' created successfully"}


@kopf.on.delete('datadynamics.io', 'v1alpha1', 'nificlusters')
def on_delete(name, namespace, logger, **kwargs):
    """NiFiCluster deleted: children are removed through their owner references."""
    logger.info(f"NiFi cluster '{name}' is being deleted (cascade)")
    # The owner references set by kopf.adopt() let Kubernetes garbage-collect
    # the StatefulSet, Services, and ConfigMap automatically.

Building the StatefulSet
# operator/resources/statefulset.py
from kubernetes import client


def build_statefulset(name: str, namespace: str, spec: dict) -> client.V1StatefulSet:
    """Build the NiFi StatefulSet."""
    replicas = spec.get('replicas', 1)
    version = spec['version']
    image = spec.get('image', 'apache/nifi')
    resources = spec.get('resources', {})
    storage = spec.get('storage', {})
    config = spec.get('config', {})

    # The pod name and namespace are injected through the Downward API
    pod_env = [
        client.V1EnvVar(
            name='NIFI_POD_NAME',
            value_from=client.V1EnvVarSource(
                field_ref=client.V1ObjectFieldSelector(field_path='metadata.name')
            )
        ),
        client.V1EnvVar(
            name='NIFI_POD_NAMESPACE',
            value_from=client.V1EnvVarSource(
                field_ref=client.V1ObjectFieldSelector(field_path='metadata.namespace')
            )
        ),
    ]

    # Main container
    container = client.V1Container(
        name='nifi',
        image=f"{image}:{version}",
        ports=[
            client.V1ContainerPort(container_port=8443, name='https'),
            client.V1ContainerPort(container_port=11443, name='cluster'),
            client.V1ContainerPort(container_port=6342, name='load-balance'),
        ],
        env=[
            client.V1EnvVar(name='NIFI_JVM_HEAP_INIT', value=config.get('jvmHeapSize', '1g')),
            client.V1EnvVar(name='NIFI_JVM_HEAP_MAX', value=config.get('jvmHeapSize', '1g')),
            *pod_env,
        ],
        resources=client.V1ResourceRequirements(
            requests={
                'cpu': resources.get('requests', {}).get('cpu', '1'),
                'memory': resources.get('requests', {}).get('memory', '2Gi'),
            },
            limits={
                'cpu': resources.get('limits', {}).get('cpu', '2'),
                'memory': resources.get('limits', {}).get('memory', '4Gi'),
            }
        ),
        volume_mounts=[
            client.V1VolumeMount(name='data', mount_path='/opt/nifi/nifi-current/data'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/nifi.properties',
                                 sub_path='nifi.properties'),
            client.V1VolumeMount(name='config', mount_path='/opt/nifi/nifi-current/conf/bootstrap.conf',
                                 sub_path='bootstrap.conf'),
        ],
        readiness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(
                path='/nifi-api/controller/cluster',
                port=8443,
                scheme='HTTPS'
            ),
            initial_delay_seconds=90,
            period_seconds=20,
            failure_threshold=10,
        ),
        liveness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(
                path='/nifi-api/system-diagnostics',
                port=8443,
                scheme='HTTPS'
            ),
            initial_delay_seconds=120,
            period_seconds=30,
            failure_threshold=5,
        ),
    )

    # Init container: copies the config templates and substitutes the pod name.
    # Both files are copied because the main container mounts both from the emptyDir.
    # NIFI_POD_NAMESPACE is replaced first: NIFI_POD_NAME is a prefix of it, so the
    # opposite order would corrupt the namespace placeholder.
    init_container = client.V1Container(
        name='init-config',
        image='busybox:1.36',
        command=['sh', '-c', '''
cp /config-template/nifi.properties /config-output/nifi.properties
cp /config-template/bootstrap.conf /config-output/bootstrap.conf
sed -i "s/NIFI_POD_NAMESPACE/$NIFI_POD_NAMESPACE/g" /config-output/nifi.properties
sed -i "s/NIFI_POD_NAME/$NIFI_POD_NAME/g" /config-output/nifi.properties
'''],
        env=pod_env,
        volume_mounts=[
            client.V1VolumeMount(name='config-template', mount_path='/config-template'),
            client.V1VolumeMount(name='config', mount_path='/config-output'),
        ]
    )

    # PVC template
    pvc_template = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name='data'),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=['ReadWriteOnce'],
            storage_class_name=storage.get('storageClassName'),
            resources=client.V1VolumeResourceRequirements(
                requests={'storage': storage.get('size', '10Gi')}
            )
        )
    )

    return client.V1StatefulSet(
        metadata=client.V1ObjectMeta(name=f"{name}-nifi", namespace=namespace),
        spec=client.V1StatefulSetSpec(
            replicas=replicas,
            service_name=f"{name}-headless",
            pod_management_policy='OrderedReady',
            selector=client.V1LabelSelector(
                match_labels={'app': 'nifi', 'cluster': name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={'app': 'nifi', 'cluster': name}
                ),
                spec=client.V1PodSpec(
                    init_containers=[init_container],
                    containers=[container],
                    volumes=[
                        client.V1Volume(
                            name='config-template',
                            config_map=client.V1ConfigMapVolumeSource(
                                name=f"{name}-config"
                            )
                        ),
                        client.V1Volume(
                            name='config',
                            empty_dir=client.V1EmptyDirVolumeSource()
                        ),
                    ],
                    termination_grace_period_seconds=60,
                )
            ),
            volume_claim_templates=[pvc_template],
        )
    )

Building the Services
# operator/resources/service.py
from kubernetes import client


def build_headless_service(name: str, namespace: str) -> client.V1Service:
    """Headless Service (node discovery)."""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-headless",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            cluster_ip='None',  # headless
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
                client.V1ServicePort(name='cluster', port=11443, target_port=11443),
                client.V1ServicePort(name='load-balance', port=6342, target_port=6342),
            ],
            publish_not_ready_addresses=True,  # register DNS records before the pod is ready
        )
    )


def build_ui_service(name: str, namespace: str, spec: dict) -> client.V1Service:
    """UI Service (access to the web UI)."""
    return client.V1Service(
        metadata=client.V1ObjectMeta(
            name=f"{name}-ui",
            namespace=namespace,
        ),
        spec=client.V1ServiceSpec(
            type='ClusterIP',
            selector={'app': 'nifi', 'cluster': name},
            ports=[
                client.V1ServicePort(name='https', port=8443, target_port=8443),
            ]
        )
    )

Building the ConfigMap (nifi.properties)
# operator/resources/configmap.py
from kubernetes import client
from jinja2 import Template

# NIFI_POD_NAME and NIFI_POD_NAMESPACE are placeholders replaced by the init container.
NIFI_PROPERTIES_TEMPLATE = """
# NiFi 2.x Properties (Generated by Operator)
# Web Properties
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443
nifi.web.proxy.host=
# Cluster Properties
nifi.cluster.is.node=true
nifi.cluster.node.address=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.node.protocol.port=11443
nifi.cluster.node.protocol.max.threads={{ max_threads }}
nifi.cluster.flow.election.max.wait.time=1 min
nifi.cluster.flow.election.max.candidates={{ replicas }}
# Load Balancing
nifi.cluster.load.balance.host=NIFI_POD_NAME.{{ headless_service }}.NIFI_POD_NAMESPACE.svc.cluster.local
nifi.cluster.load.balance.port=6342
# Initial Node Identities (based on Kubernetes DNS)
{% for i in range(replicas) %}
nifi.cluster.node.{{ i + 1 }}={{ name }}-nifi-{{ i }}.{{ headless_service }}.{{ namespace }}.svc.cluster.local:11443
{% endfor %}
# State Management
nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.provider.cluster=local-provider
# Security Properties
nifi.sensitive.props.key={{ sensitive_props_key }}
nifi.security.autoreload.enabled=true
# Performance
nifi.bored.yield.duration=10 millis
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB
# Data Directories
nifi.flowfile.repository.directory=./data/flowfile_repository
nifi.content.repository.directory.default=./data/content_repository
nifi.provenance.repository.directory.default=./data/provenance_repository
nifi.database.directory=./data/database_repository
"""


def build_nifi_config(name: str, namespace: str, spec: dict) -> client.V1ConfigMap:
    """Build the ConfigMap that holds the NiFi configuration."""
    replicas = spec.get('replicas', 1)
    config = spec.get('config', {})
    headless_service = f"{name}-headless"
    template = Template(NIFI_PROPERTIES_TEMPLATE)
    nifi_properties = template.render(
        name=name,
        namespace=namespace,
        headless_service=headless_service,
        replicas=replicas,
        max_threads=config.get('maxThreads', 10),
        sensitive_props_key=config.get('sensitivePropsKey', 'default-key-change-me'),
    )
    bootstrap_conf = f"""
# Bootstrap Config (Generated by Operator)
java.arg.2=-Xms{config.get('jvmHeapSize', '1g')}
java.arg.3=-Xmx{config.get('jvmHeapSize', '1g')}
java.arg.14=-Djava.protocol.handler.pkgs=sun.net.www.protocol
"""
    return client.V1ConfigMap(
        metadata=client.V1ObjectMeta(
            name=f"{name}-config",
            namespace=namespace,
            labels={'app': 'nifi', 'cluster': name}
        ),
        data={
            'nifi.properties': nifi_properties,
            'bootstrap.conf': bootstrap_conf,
        }
    )

Verifying the deployment
# 1. Register the CRD
kubectl apply -f deploy/crds/nificluster-crd.yaml
# 2. Run the Operator locally (development mode)
kopf run operator/main.py --verbose
# 3. In another terminal, create the NiFi cluster
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
# 4. Check the status
kubectl get nifi -n nifi
kubectl get pods -n nifi
kubectl get svc -n nifi
# 5. Access the NiFi UI
kubectl port-forward svc/my-nifi-ui -n nifi 8443:8443
# Then open https://localhost:8443/nifi

Level 2: Intermediate - Cluster Management and Configuration Automation
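6. Automatic NiFi 2.x Cluster Configuration
Controlling the cluster bootstrap order
When a NiFi 2.x cluster is formed for the first time, the node start order matters. The first node must be elected cluster coordinator, and the remaining nodes then join through it. The Operator therefore polls the first node until the expected number of nodes report as connected.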
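The readiness decision itself is a small pure function over the cluster response, which makes it easy to test without a running NiFi. The sketch below assumes the response shape that the code in this article reads (`cluster.nodes[].status`); the helper names `connected_nodes` and `is_cluster_ready` are illustrative.

```python
def connected_nodes(cluster_info: dict) -> int:
    """Count the nodes whose status is CONNECTED in a cluster response."""
    nodes = cluster_info.get("cluster", {}).get("nodes", [])
    return sum(1 for node in nodes if node.get("status") == "CONNECTED")


def is_cluster_ready(cluster_info: dict, replicas: int) -> bool:
    """The cluster is ready once at least `replicas` nodes are connected."""
    return connected_nodes(cluster_info) >= replicas


if __name__ == "__main__":
    sample = {"cluster": {"nodes": [
        {"address": "my-nifi-nifi-0", "status": "CONNECTED"},
        {"address": "my-nifi-nifi-1", "status": "CONNECTING"},
    ]}}
    print(connected_nodes(sample), is_cluster_ready(sample, 2))
```

Separating the decision from the HTTP call lets the polling loop stay short and keeps the rule in one place.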
# operator/controllers/cluster.py
import asyncio
import httpx


async def wait_for_cluster_ready(name: str, namespace: str, replicas: int, logger):
    """Wait until the cluster is fully formed."""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
    for attempt in range(60):  # wait up to 10 minutes (60 x 10 s)
        try:
            # verify=False disables TLS verification; acceptable for a demo only
            async with httpx.AsyncClient(verify=False) as client:
                resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
                if resp.status_code == 200:
                    cluster_info = resp.json()
                    connected = sum(
                        1 for n in cluster_info['cluster']['nodes']
                        if n['status'] == 'CONNECTED'
                    )
                    logger.info(f"Cluster nodes connected: {connected}/{replicas}")
                    if connected >= replicas:
                        return True
        except Exception as e:
            logger.debug(f"Waiting for cluster... ({attempt + 1}/60): {e}")
        await asyncio.sleep(10)
    return False

Status monitoring timer
# operator/main.py
import httpx


@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def monitor_cluster(spec, name, namespace, patch, logger, **kwargs):
    """Check the cluster status every 30 seconds."""
    try:
        headless = f"{name}-headless"
        coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
        async with httpx.AsyncClient(verify=False) as http:
            resp = await http.get(f"{coordinator_url}/nifi-api/controller/cluster")
            if resp.status_code == 200:
                cluster = resp.json()['cluster']
                connected = sum(1 for n in cluster['nodes'] if n['status'] == 'CONNECTED')
                # NiFi reports node roles as "Cluster Coordinator" and "Primary Node"
                coordinator = next(
                    (n['address'] for n in cluster['nodes']
                     if 'Cluster Coordinator' in n.get('roles', [])),
                    'unknown'
                )
                patch.status['readyNodes'] = connected
                patch.status['clusterCoordinator'] = coordinator
                patch.status['phase'] = 'Running'
            else:
                patch.status['phase'] = 'Error'
                patch.status['message'] = f"API returned {resp.status_code}"
    except Exception as e:
        logger.warning(f"Health check failed: {e}")
        patch.status['phase'] = 'Error'
        patch.status['message'] = str(e)

7. Scaling (Scale Up/Down)
Update handler
# operator/main.py (continued)
from operator.controllers.scaling import scale_down


@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.replicas')
async def on_replicas_change(spec, name, namespace, old, new, logger, patch, **kwargs):
    """Detect a change in replicas and scale the cluster."""
    old_replicas = old or 0
    new_replicas = new
    logger.info(f"Scaling '{name}': {old_replicas} → {new_replicas}")
    patch.status['phase'] = 'Scaling'
    if new_replicas < old_replicas:
        # Scale down: drain the nodes through the NiFi API before their pods are removed
        await scale_down(name, namespace, old_replicas, new_replicas, logger)
    # Scale up needs no NiFi API call: new pods join once the StatefulSet grows.
    # Update the ConfigMap (the node list changes)
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
    # Update the StatefulSet replicas
    apps_v1.patch_namespaced_stateful_set(
        f"{name}-nifi", namespace,
        {'spec': {'replicas': new_replicas}}
    )
    patch.status['phase'] = 'Running'

Safe scale-down
When removing a NiFi node, always follow the order Disconnect → Offload → Delete. NiFi only offloads a node that has already been disconnected, and offloading is what moves its queued FlowFiles to the remaining nodes before the pod disappears.
# operator/controllers/scaling.py
import asyncio
import httpx


async def _wait_for_status(client, node_url: str, expected: str, attempts: int = 60) -> bool:
    """Poll a node until it reaches the expected status (up to attempts x 5 s)."""
    for _ in range(attempts):
        resp = await client.get(node_url)
        if resp.json()['node']['status'] == expected:
            return True
        await asyncio.sleep(5)
    return False


async def scale_down(name: str, namespace: str, old: int, new: int, logger):
    """Safe scale-down: remove the highest-ordinal nodes first."""
    headless = f"{name}-headless"
    coordinator_url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
    async with httpx.AsyncClient(verify=False) as client:
        # Nodes to remove, highest index first
        for i in range(old - 1, new - 1, -1):
            node_address = f"{name}-nifi-{i}.{headless}.{namespace}.svc.cluster.local"
            logger.info(f"Removing node: {node_address}")
            # 1. Look up the node ID in the cluster
            resp = await client.get(f"{coordinator_url}/nifi-api/controller/cluster")
            cluster = resp.json()['cluster']
            node = next(
                (n for n in cluster['nodes'] if node_address in n['address']),
                None
            )
            if not node:
                logger.warning(f"Node {node_address} not found in cluster")
                continue
            node_id = node['nodeId']
            node_url = f"{coordinator_url}/nifi-api/controller/cluster/nodes/{node_id}"
            # 2. DISCONNECTING: take the node out of the cluster
            await client.put(node_url, json={'node': {'nodeId': node_id, 'status': 'DISCONNECTING'}})
            if not await _wait_for_status(client, node_url, 'DISCONNECTED'):
                raise RuntimeError(f"Node {i} did not disconnect in time")
            logger.info(f"Node {i} disconnected")
            # 3. OFFLOADING: move its data to the other nodes
            await client.put(node_url, json={'node': {'nodeId': node_id, 'status': 'OFFLOADING'}})
            logger.info(f"Node {i} offloading started")
            # 4. Wait for offloading to finish; never delete a node that still holds data
            if not await _wait_for_status(client, node_url, 'OFFLOADED'):
                raise RuntimeError(f"Node {i} did not finish offloading in time")
            # 5. Remove the node from the cluster
            await client.delete(node_url)
            logger.info(f"Node {i} removed from cluster")

8. Managing Configuration Changes
Rolling restart
import asyncio
from datetime import datetime, timezone


@kopf.on.update('datadynamics.io', 'v1alpha1', 'nificlusters', field='spec.config')
async def on_config_change(spec, name, namespace, logger, patch, **kwargs):
    """On a configuration change, update the ConfigMap and perform a rolling restart."""
    logger.info(f"Config changed for '{name}', performing rolling restart")
    patch.status['phase'] = 'Updating'
    # 1. Update the ConfigMap
    configmap = build_nifi_config(name, namespace, spec)
    core_v1.patch_namespaced_config_map(f"{name}-config", namespace, configmap)
    # 2. Rolling restart (triggered by changing a pod-template annotation)
    restart_annotation = {
        'spec': {
            'template': {
                'metadata': {
                    'annotations': {
                        'nifi-operator/restartedAt': datetime.now(timezone.utc).isoformat()
                    }
                }
            }
        }
    }
    apps_v1.patch_namespaced_stateful_set(f"{name}-nifi", namespace, restart_annotation)
    logger.info("Rolling restart triggered")
    # 3. Wait for the rolling restart to finish (up to 120 x 5 s = 10 minutes)
    replicas = spec['replicas']
    for _ in range(120):
        sts = apps_v1.read_namespaced_stateful_set(f"{name}-nifi", namespace)
        # The status counters are None until the controller has populated them
        ready = sts.status.ready_replicas or 0
        updated = sts.status.updated_replicas or 0
        if ready == replicas and updated == replicas:
            break
        await asyncio.sleep(5)
    patch.status['phase'] = 'Running'
    logger.info("Rolling restart completed")

9. Automating TLS/SSL Certificates
Creating cert-manager Certificate resources
# operator/resources/certificate.py
def build_tls_certificate(name: str, namespace: str, spec: dict, ordinal: int) -> dict:
    """Build a cert-manager Certificate resource for one NiFi node."""
    tls = spec.get('tls', {})
    headless = f"{name}-headless"
    return {
        'apiVersion': 'cert-manager.io/v1',
        'kind': 'Certificate',
        'metadata': {
            'name': f'{name}-nifi-{ordinal}-tls',
            'namespace': namespace,
        },
        'spec': {
            'secretName': f'{name}-nifi-{ordinal}-tls',
            'issuerRef': {
                'name': tls['issuerRef']['name'],
                'kind': tls['issuerRef'].get('kind', 'ClusterIssuer'),
            },
            'commonName': f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
            'dnsNames': [
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc.cluster.local',
                f'{name}-nifi-{ordinal}.{headless}.{namespace}.svc',
                f'{name}-nifi-{ordinal}.{headless}',
                f'{name}-nifi-{ordinal}',
                'localhost',
            ],
            'duration': '8760h',     # 1 year
            'renewBefore': '720h',   # renew 30 days before expiry
            'privateKey': {
                'algorithm': 'RSA',
                'size': 2048,
            },
            'keystores': {
                'jks': {
                    'create': True,
                    'passwordSecretRef': {
                        'name': f'{name}-keystore-password',
                        'key': 'password',
                    }
                }
            }
        }
    }

    # Create the certificates when TLS is enabled (add this inside on_create)
    if spec.get('tls', {}).get('enabled', False):
        custom_api = client.CustomObjectsApi()
        for i in range(spec['replicas']):
            cert = build_tls_certificate(name, namespace, spec, i)
            kopf.adopt(cert)  # owner reference, so the Certificate is deleted with the cluster
            custom_api.create_namespaced_custom_object(
                group='cert-manager.io', version='v1',
                namespace=namespace, plural='certificates', body=cert
            )
        logger.info(f"TLS certificates created for {spec['replicas']} nodes")

Level 3: Advanced - A Production-Grade Operator
10. Automating NiFi Flow Deployment
NiFiFlow CRD
# deploy/crds/nififlow-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nififlows.datadynamics.io
spec:
  group: datadynamics.io
  names:
    kind: NiFiFlow
    plural: nififlows
    singular: nififlow
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                clusterRef:
                  type: string
                  description: "Name of the NiFiCluster"
                registryUrl:
                  type: string
                  description: "NiFi Registry URL"
                bucketId:
                  type: string
                flowId:
                  type: string
                flowVersion:
                  type: integer
                processGroupId:
                  type: string
                  default: "root"
                autoStart:
                  type: boolean
                  default: true
            status:
              type: object
              # The handler writes phase and deployedProcessGroupId here
              x-kubernetes-preserve-unknown-fields: true

Flow deployment handler
@kopf.on.create('datadynamics.io', 'v1alpha1', 'nififlows')
async def on_flow_create(spec, name, namespace, logger, patch, **kwargs):
    """NiFiFlow CR created: deploy the flow to NiFi."""
    cluster_name = spec['clusterRef']
    headless = f"{cluster_name}-headless"
    nifi_url = f"https://{cluster_name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
    async with httpx.AsyncClient(verify=False) as http:
        # 1. Describe the flow to fetch from NiFi Registry
        #    (get_registry_id is a helper that is not shown in this article)
        flow_payload = {
            'registryId': await get_registry_id(http, nifi_url, spec['registryUrl']),
            'bucketId': spec['bucketId'],
            'flowId': spec['flowId'],
            'version': spec['flowVersion'],
        }
        # 2. Deploy the flow into the process group
        pg_id = spec.get('processGroupId', 'root')
        resp = await http.post(
            f"{nifi_url}/nifi-api/process-groups/{pg_id}/process-groups",
            json={
                'revision': {'version': 0},
                'component': {
                    'position': {'x': 0, 'y': 0},
                    'versionControlInformation': flow_payload,
                }
            }
        )
        if resp.status_code == 201:
            deployed_pg = resp.json()
            logger.info(f"Flow deployed: {deployed_pg['id']}")
            # 3. Start automatically
            if spec.get('autoStart', True):
                await http.put(
                    f"{nifi_url}/nifi-api/flow/process-groups/{deployed_pg['id']}",
                    json={'id': deployed_pg['id'], 'state': 'RUNNING'}
                )
                logger.info("Flow started")
            patch.status['deployedProcessGroupId'] = deployed_pg['id']
            patch.status['phase'] = 'Running'
        else:
            logger.error(f"Flow deployment failed: {resp.text}")
            patch.status['phase'] = 'Error'

GitOps workflow
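[Git → NiFiFlow CR → Operator → NiFi]
1. A developer commits the flow to NiFi Registry
2. The flow version is updated in the NiFiFlow CR YAML
3. The change is committed and pushed to Git
4. ArgoCD or FluxCD detects the CR change and applies it to Kubernetes
5. The Operator detects the NiFiFlow CR change
6. The flow is deployed or updated automatically through the NiFi REST API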
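Steps 5 and 6 imply that the Operator must decide what a changed NiFiFlow spec means. A minimal sketch of that decision follows, using the spec fields from the NiFiFlow CRD above. The function `plan_flow_change` and the action names it returns are hypothetical, and the article's handler currently covers creation only.

```python
from typing import Optional


def plan_flow_change(old_spec: Optional[dict], new_spec: dict) -> str:
    """Decide what to do when a NiFiFlow spec is created or changed."""
    if old_spec is None:
        # First time this CR is seen: deploy the flow.
        return "deploy"
    # A different registry, bucket, or flow is a different flow altogether.
    for key in ("registryUrl", "bucketId", "flowId"):
        if old_spec.get(key) != new_spec.get(key):
            return "redeploy"
    # Same flow, different version: switch the deployed process group to it.
    if old_spec.get("flowVersion") != new_spec.get("flowVersion"):
        return "change-version"
    return "noop"


if __name__ == "__main__":
    old = {"registryUrl": "http://registry:18080", "bucketId": "b1", "flowId": "f1", "flowVersion": 3}
    new = dict(old, flowVersion=4)
    print(plan_flow_change(old, new))
```

In a kopf update handler, the `old` and `new` arguments could supply the two specs to compare.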
11. Monitoring and Automatic Recovery
Exposing Prometheus metrics
# operator/main.py
from prometheus_client import start_http_server, Gauge, Counter

# Metric definitions
nifi_cluster_nodes = Gauge('nifi_cluster_nodes_total', 'Total NiFi nodes', ['cluster'])
nifi_cluster_ready = Gauge('nifi_cluster_nodes_ready', 'Ready NiFi nodes', ['cluster'])
nifi_scaling_events = Counter('nifi_scaling_events_total', 'Scaling events', ['cluster', 'direction'])
nifi_reconcile_errors = Counter('nifi_reconcile_errors_total', 'Reconciliation errors', ['cluster'])

# Start the Prometheus metrics server
start_http_server(8080)


# Update the metrics from a timer
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=30)
async def update_metrics(spec, name, status, **kwargs):
    nifi_cluster_nodes.labels(cluster=name).set(spec.get('replicas', 0))
    nifi_cluster_ready.labels(cluster=name).set(status.get('readyNodes', 0))

Automatic recovery
@kopf.timer('datadynamics.io', 'v1alpha1', 'nificlusters', interval=60)
async def auto_heal(spec, name, namespace, patch, logger, **kwargs):
    """Detect failed nodes and try to recover them."""
    try:
        headless = f"{name}-headless"
        url = f"https://{name}-nifi-0.{headless}.{namespace}.svc.cluster.local:8443"
        async with httpx.AsyncClient(verify=False) as http:
            resp = await http.get(f"{url}/nifi-api/controller/cluster")
            if resp.status_code != 200:
                return
            cluster = resp.json()['cluster']
            for node in cluster['nodes']:
                if node['status'] == 'DISCONNECTED':
                    node_id = node['nodeId']
                    node_addr = node['address']
                    logger.warning(f"Disconnected node detected: {node_addr}")
                    # Ask the coordinator to reconnect the node
                    await http.put(
                        f"{url}/nifi-api/controller/cluster/nodes/{node_id}",
                        json={'node': {'nodeId': node_id, 'status': 'CONNECTING'}}
                    )
                    logger.info(f"Reconnection initiated for {node_addr}")
    except Exception as e:
        nifi_reconcile_errors.labels(cluster=name).inc()
        logger.error(f"Auto-heal error: {e}")

12. Automating LDAP/OIDC Authentication
Generating the authentication configuration
from xml.sax.saxutils import escape


def build_auth_config(spec: dict) -> dict:
    """Generate configuration files for the selected authentication type."""
    auth = spec.get('auth', {})
    auth_type = auth.get('type', 'single-user')
    if auth_type == 'ldap':
        ldap = auth['ldap']
        # escape() keeps characters such as & in a search filter from breaking the XML
        login_identity_providers = f"""
<loginIdentityProviders>
    <provider>
        <identifier>ldap-provider</identifier>
        <class>org.apache.nifi.ldap.LdapProvider</class>
        <property name="Authentication Strategy">SIMPLE</property>
        <property name="Manager DN"></property>
        <property name="Manager Password"></property>
        <property name="Url">{escape(ldap['url'])}</property>
        <property name="User Search Base">{escape(ldap['searchBase'])}</property>
        <property name="User Search Filter">{escape(ldap['searchFilter'])}</property>
    </provider>
</loginIdentityProviders>"""
        return {'login-identity-providers.xml': login_identity_providers}
    elif auth_type == 'oidc':
        # ${{...}} renders as a literal ${OIDC_CLIENT_SECRET} placeholder in the output
        nifi_properties_auth = f"""
nifi.security.user.oidc.discovery.url={auth['oidc']['discoveryUrl']}
nifi.security.user.oidc.client.id={auth['oidc']['clientId']}
nifi.security.user.oidc.client.secret=${{OIDC_CLIENT_SECRET}}
"""
        return {'auth.properties': nifi_properties_auth}
    return {}

13. Deploying and Operating the Operator
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY operator/ ./operator/
USER 1000:1000
CMD ["kopf", "run", "operator/main.py", "--all-namespaces", "--verbose"]

RBAC
# deploy/rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: nifi-operator-role
rules:
  - apiGroups: ["datadynamics.io"]
    resources: ["nificlusters", "nificlusters/status", "nififlows", "nififlows/status"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services", "configmaps", "secrets", "pods", "persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["cert-manager.io"]
    resources: ["certificates"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "patch"]
  # kopf itself watches CRDs and namespaces when run with --all-namespaces
  - apiGroups: ["apiextensions.k8s.io"]
    resources: ["customresourcedefinitions"]
    verbs: ["list", "watch"]
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: nifi-operator-binding
subjects:
  - kind: ServiceAccount
    name: nifi-operator
    namespace: nifi-operator-system
roleRef:
  kind: ClusterRole
  name: nifi-operator-role
  apiGroup: rbac.authorization.k8s.io

Operator Deployment
# deploy/operator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi-operator
  namespace: nifi-operator-system
spec:
  replicas: 1
  # Recreate avoids two operator pods handling the same resources during an upgrade
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nifi-operator
  template:
    metadata:
      labels:
        app: nifi-operator
    spec:
      serviceAccountName: nifi-operator
      containers:
        - name: operator
          image: datadynamics/nifi-operator:latest
          ports:
            - containerPort: 8080
              name: metrics
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          env:
            - name: OPERATOR_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace

Deployment order
# 1. Create the Operator namespace
kubectl create namespace nifi-operator-system
# 2. Register the CRDs
kubectl apply -f deploy/crds/
# 3. Apply the RBAC rules
kubectl apply -f deploy/rbac.yaml
# 4. Deploy the Operator
kubectl apply -f deploy/operator-deployment.yaml
# 5. Check the Operator logs
kubectl logs -f deployment/nifi-operator -n nifi-operator-system
# 6. Create the NiFi cluster
kubectl create namespace nifi
kubectl apply -f examples/nifi-cluster.yaml
# 7. Watch the status
kubectl get nifi -n nifi -w

Troubleshooting guide
| Problem | Cause | Fix |
|---|---|---|
| Pod in CrashLoopBackOff | JVM runs out of memory | Increase spec.config.jvmHeapSize |
| Node fails to join the cluster | DNS name cannot be resolved | Check publishNotReadyAddresses: true on the Headless Service |
| Node is DISCONNECTED | Network timeout | Increase nifi.cluster.node.connection.timeout |
| Flow election fails | Nodes start at different times | Increase nifi.cluster.flow.election.max.wait.time |
| TLS handshake fails | Certificate SAN mismatch | Check dnsNames on the cert-manager Certificate |
| OOMKilled | Container memory limit exceeded | Increase spec.resources.limits.memory |
| PVC is Pending | StorageClass does not exist | Check spec.storage.storageClassName |
| Operator permission errors | Insufficient RBAC | Check the ClusterRole rules |
Note: This Operator is intended for learning and as an example. Production use requires additional work such as stronger error handling, finalizers, integration tests, and Helm chart packaging.
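As one example of the error-handling work mentioned in the note, calls to the NiFi REST API can be wrapped in a bounded retry with exponential backoff. The sketch below uses only the standard library. `backoff_delays` and `retry` are hypothetical helpers, not part of kopf or of the operator above.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0,
                   factor: float = 2.0, max_delay: float = 60.0) -> List[float]:
    """Exponentially growing delays, capped at `max_delay`."""
    return [min(base * factor ** i, max_delay) for i in range(attempts)]


def retry(func: Callable[[], T], delays: List[float],
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `func` once per delay; sleep after each failure, raise if all fail."""
    last_error = None
    for delay in delays:
        try:
            return func()
        except Exception as exc:  # a real operator should catch narrower errors
            last_error = exc
            sleep(delay)
    raise RuntimeError("all retries failed") from last_error


if __name__ == "__main__":
    print(backoff_delays(5, max_delay=10.0))
```

Passing `sleep` as a parameter keeps the helper testable without real waiting. In async handlers the same idea applies with `asyncio.sleep`.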
Data Dynamics Engineering Team