Airflow cluster setup for multi Tenance and kubernetes operator
Airflow cluster setup for multi-tenance and kubernetes operator
Airflow는 프로그래밍 방식으로 워크플로우를 개발하여 스케줄링 및 모니터링하는 플랫폼이다. Airflow를 사용하면 DAG(Directed Acyclic Graph) 형태의 워크플로우 개발이 가능하며 DAG를 연결하여 확장 가능한 Pipeline도 개발 가능하다. 이러한 편의성으로 많은 프로젝트에서 Airflow를 사용하고 있으며 관련 자료도 매우 쉽게 찾을수 있어 레거시 코드 마이그레이션도 매우 쉽게 가능하다. 그리고 Public Cloud에서는 Managed Airflow를 제공하고 있어 비용만 지불한다면 운영 부담도 매우 낮다.
Airflow Cluster를 설치하는 방법은 공식 홈페이지를 통해서 확인할 수 있으며 운영 환경을 고려한다면 Production Docker Image, Helm Chart, Managed Airflow Service를 사용하여 Airflow를 설치하는 방법이 가장 좋은 방법이다. 일반적인 단일 프로젝트에서는 Airflow 홈페이지에서 제공하는 시스템 구조로도 모든 요구사항에 대하여 대응이 가능하지만 GPU Cluster와 같은 대규모 컴퓨팅 리소스를 여러개의 프로젝트 멥버가 공유해서 사용해야 하는 환경에서는 위 구성으로는 많은 제약 사항이있다. 한 예로 Data Engineer와 Data Scientist가 동일한 Airflow Cluster를 사용해야 한다면 DAG에 대한 ACL부터 고민해야한다. 이번 글에서는 Managed Airflow를 사용하는 방식이 아닌 VM과 K8S가 혼재된 환경에서 Airflow Cluster를 설치해보고 더 나아가 멀티테넌시 지원 가능한 Airflow Cluster 설정을 진행해보고자 한다.
Airflow architecture
Airflow 1.10 이상을 사용한다면 Airflow Worker의 리소스 확장과 편의성으로 대부분 Kubernetes Executor를 사용할것이다. 이때의 시스템 구조는 Airflow 홈페이지에서 가이드하는 아래의 이미지와 같으며 Airflow Web Server, Scheduler의 DAG 파일을 Worker Pod와 공유하는 구조이다.
Airflow에서 DAG 실행을 위해서는 Web Server, Scheduler 그리고 Worker에 DAG 파일이 공유되어야 하며, Kubernetes Pod로 실행되는 Worker Pod에 DAG 파일을 공유하기 위해서는 Pod Image에 DAG를 저장하는 방법, Persistent Volume을 사용하는 방법 마지막으로 Git Sync를 사용하는 방법 등이 있다. Pod Image와 PVC를 사용하는 방법은 시스템 구축면에서 매우 쉽지만 사용자가 DAG 파일을 수정하기 매우 까다롭기 때문에 일반적으로 많이 사용하지 않으며 Git Sync를 사용하는 방법은 구축과 사용에 매우 쉬운 장점이 있지만 DAG 파일 수정이 필요한 사용자 모두에서 Git 권한을 설정해야하는 단점이 존재한다. Git 브랜치로 분리하여 사용하는 방법도 좋은 대안이 될 수 있겠지만 브랜치를 분리하는 방법은 Git을 공유하는 방법이므로 이 또한 멀티테넌시 측면에서 좋은 방법은 아닌것으로 판단된다.
Airflow architecture considering multi-tenancy with S3
이번글에서는 이러한 DAG 공유 문제를 비교적 사용하기 쉬운 S3를 사용하여 Web Server, Scheduler 그리고 Kubernetes Executor 간의 DAG 공유를 해결하였고 Load Balance를 사용하여 Web Server와 Scheduler의 이중화도 구성하였다. 전체적인 System Architecture는 아래 이미지와 같다.
Airflow cluster installation
Airflow의 Web Server와 Scheduler는 VM 환경에 설치할것이며 Airflow Cluster 구축에 필요한 Load Balancer, Metadata DB 그리고 S3는 이미 구축되어 있다는 가정으로 설치 진행해보도록 한다. 설치는 HA 구성을 고려하여 3대로 결정하였고 모든 설정은 3대 모두 동일하게 실행하도록 한다.
Airflow web server and scheduler H/W resource
Airflow web server와 scheduler 서버의 사양은 아래와 같으며 web server와 scheduler에서는 별도의 Job이 실행되지 않으므로 높은 H/W Resource를 요구하지 않는다.
Hostname | OS | IP | Role | vCPU | Memory | Disk |
---|---|---|---|---|---|---|
airflow1 | Rocky 8 | 10.0.0.120 | Web Server, Scheduler | 3 | 8 GiB | 50 GiB |
airflow2 | Rocky 8 | 10.0.0.121 | Web Server, Scheduler | 3 | 8 GiB | 50 GiB |
airflow3 | Rocky 8 | 10.0.0.122 | Web Server, Scheduler | 3 | 8 GiB | 50 GiB |
Kernel parameter setting
Airflow 관련 프로세스에서 CPU와 Memory를 최대한 사용하기 위해 관련 kernel parameter 설정을 진행하자. /etc/security/limits.d/airflow.conf 파일을 생성하고 관련 항목을 설정한다.
$ cat <<EOF > /etc/security/limits.d/airflow.conf
airflow soft fsize unlimited
airflow hard fsize unlimited
airflow soft cpu unlimited
airflow hard cpu unlimited
airflow soft as unlimited
airflow hard as unlimited
airflow soft memlock unlimited
airflow hard memlock unlimited
airflow soft nofile 65534
airflow hard nofile 65534
airflow soft rss unlimited
airflow hard rss unlimited
airflow soft nproc 65534
airflow hard nproc 65534
EOF
Python installation
Airflow는 파이썬으로 개발되었으므로 python3 버전을 설치하도록 한다.
$ sudo dnf install -y python38 python38-pip python38-devel python38-requests s3fs-fuse
Firewall setting
Airflow에서 사용할 로컬 방화벽 포트를 오픈합니다.
$ sudo firewall-cmd --permanent --add-port 8080/tcp
$ sudo firewall-cmd --reload
User creation
Airflow에서 사용할 OS 사용자를 생성한다. 사용자 UID와 GID는 각각 50000과 100을 사용하도록 한다. 50000과 100을 사용하는 이유는 airflow container 이미지에서 사용하는 아이디가 50000과 100이다.
$ sudo useradd --gid 100 --uid 50000 --create-home --home-dir /opt/airflow airflow
Default directory creation
Airflow에서 사용하는 기본 디렉토리를 생성한다. 모두 airflow 홈디렉토리 아래에 생성하도록 한다. 디렉토리 생성은 airflow 계정으로 생성되도록 한다.
$ mkdir -p /opt/airflow/configs /opt/airflow/dags /opt/airflow/plugins /opt/airflow/logs
DAGs directory mount from S3
Airflow의 DAG 파일들은 Web Server, Scheduler 그리고 Worker에 공유되어야하며 여기에서는 S3를 사용하여 동기화하기로 하였다. s3fuse를 사용하여 DAG 디렉토리를 동기화하도록 하자.
$ echo "${S3_ACCESS_KEY_ID}:${S3_SECRET_KEY} > /etc/s3fs
$ chmod 400 /etc/s3fs
$ mount -t fuse.s3fs airflow /opt/airflow/dags fuse.s3fs -o url=http://192.168.11.4:9000 -o passwd_file=/etc/s3fs -o uid=50000 -o gid=100 -o _netdev -o use_path_request_style -o allow_other -o mp_umask=0022 -o umask=0133 -o dbglevel=info
Airflow installation
pip을 이용하여 airflow를 설치하도록 한다. 여기서는 OIDC, Kubernetes, S3를 사용할 예정이므로 추가 패키지로 앞의 3가지 패키지를 지정하였다. 설치는 성능에 따라 대략 5분정도 소요되므로 천천히 기다려보도록 하자.
$ python3 -m pip install --user --quiet --no-cache-dir --no-warn-script-location \
"apache-airflow[celery,google_auth,kubernetes,postgres,s3]==2.2.5" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-no-providers-3.8.txt"
Airflow config setting
Reference #1를 참고하여 Airflow의 홈디렉토리에 airflow.cfg 파일을 복사하고 기본 설정을 진행한다. 여기에서 수정한 내용은 아래와 같다.
Parameter | Default value | Changed value | Description |
---|---|---|---|
dags_folder | /opt/airflow/airflow/dags | /opt/airflow/airflow/dags/s3 | DAG directory path |
default_timezone | utc | Asia/Seoul | timezone |
executor | SequentialExecutor | KubernetesExecutor | executor type |
sql_alchemy_conn | sqlite:////opt/airflow/airflow/airflow.db | postgresql+psycopg2://${DB_USER}:${DB_PASS}@${DB_IP}:${DB_PORT}/${DB_NAME} | Database Connection String |
load_examples | True | False | Load example dags |
base_log_folder | /opt/airflow/airflow/logs | /opt/airflow/airflow/logs | log directory |
remote_logging | False | True | remote logging for sharing log files |
remote_log_conn_id | airflow_log | s3 connection name | |
remote_base_log_folder | s3://airflow/logs | s3 bucket path | |
encrypt_s3_logs | False | False | encrypt s3 logs |
endpoint_url | http://localhost:8080 | http://192.168.11.31:8080 | L4 ip, port or uri |
base_url | http://localhost:8080 | http://192.168.11.31:8080 | L4 ip, port or uri |
default_ui_timezone | utc | Asia/Seoul | ui timezone |
result_backend | db+postgresql://postgres:airflow@postgres/airflow | postgresql+psycopg2://${DB_USER}:${DB_PASS}@${DB_IP}:${DB_PORT}/${DB_NAME} | Database Connection String for result |
pod_template_file | /opt/airflow/configs/pod_template.yaml | kubernetes executor template pod | |
worker_container_repository | docker.io/apache/airflow | container repository | |
worker_container_tag | 2.2.5-python3.8 | container tag | |
namespace | default | airflow | kubernetes executor namespace |
in_cluster | True | False | |
config_file | /opt/airflow/.kube/config | kubectl config file |
Web server config for OIDC
OIDC 인증을 위해 webserver_config.py 파일을 설정한다. webserver_config.py에 대한 자세한 설명은 Airflow authentication with RBAC and Keycloak을 참조하도록 하자. 여기서는 Airflow 2.2.X에 맞도록 webserver_config.py 파일을 수정하였다.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Default configuration for the Airflow webserver"""
import os
import logging
import jwt
from flask import redirect, session
from flask_appbuilder import expose
from airflow.www.fab_security.manager import AUTH_OAUTH
from airflow.www.security import AirflowSecurityManager
from flask_appbuilder.security.views import AuthOAuthView
basedir = os.path.abspath(os.path.dirname(__file__))
log = logging.getLogger(__name__)
# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True
# ----------------------------------------------------
# AUTHENTICATION CONFIG
# ----------------------------------------------------
# For details on how to set up each of the following authentication, see
# http://flask-appbuilder.readthedocs.io/en/latest/security.html# authentication-methods
# for details.
AUTH_TYPE = AUTH_OAUTH
# Uncomment to setup Full admin role name
AUTH_ROLE_ADMIN = 'Admin'
# Uncomment to setup Public role name, no authentication needed
AUTH_ROLE_PUBLIC = 'Public'
# Will allow user self registration
AUTH_USER_REGISTRATION = True
# The default user self registration role
AUTH_USER_REGISTRATION_ROLE = "Public"
AUTH_ROLES_MAPPING = {
"airflow_admin": ["Admin"],
"airflow_op": ["Op"],
"airflow_user": ["User"],
"airflow_viewer": ["Viewer"],
"airflow_public": ["Public"],
}
MY_PROVIDER = 'keycloak'
OAUTH_PROVIDERS = [
{
'name': MY_PROVIDER,
'icon': 'fa-circle-o',
'token_key': 'access_token',
'remote_app': {
'client_id': '',
'client_secret': '',
'client_kwargs': {
'scope': 'email profile'
},
'api_base_url': '/',
'request_token_url': None,
'access_token_url': '/token',
'authorize_url': '/auth',
},
},
]
class CustomAuthRemoteUserView(AuthOAuthView):
@expose("/logout/")
def logout(self):
"""Delete access token before logging out."""
super().logout()
return redirect("/logout?redirect_uri=")
class CustomSecurityManager(AirflowSecurityManager):
authoauthview = CustomAuthRemoteUserView
def oauth_user_info(self, provider, response):
if provider == MY_PROVIDER:
token = response["access_token"]
me = jwt.decode(token, algorithms="RS256", verify=False)
# {
# "resource_access": { "airflow": { "roles": ["airflow_admin"] }}
# }
groups = me["resource_access"][""]["roles"] # unsafe
# log.info("groups: {0}".format(groups))
if len(groups) < 1:
groups = ["airflow_public"]
dct = {
"username": me.get("preferred_username"),
"email": me.get("email"),
"first_name": me.get("given_name"),
"last_name": me.get("family_name"),
"role_keys": groups,
}
log.info("user info: {0}".format(dct))
return dct
else:
return {}
SECURITY_MANAGER_CLASS = CustomSecurityManager
APP_THEME = "simplex.css"
initialize airflow database
airflow에서 사용할 database를 먼저 초기화한다.
$ airflow db reset --yes
add s3 connection information
airflow 로깅을 위해 사용할 s3 접속 정보를 설정한다.
$ airflow connections add airflow_log \
--conn-type s3 \
--conn-host \
--conn-port \
--conn-login \
--conn-password \
--conn-extra '{ "host": "", "aws_access_key_id":"", "aws_secret_access_key": "" }'
Kubernetes Executor setting
Airflow worker 역할을 하는 kubernetes executor를 설정해보도록 하자. executor에 대한 전체적인 설정은 Web Server와 Scheduler에 한 설정을 Kubernetes 상에서 동작하도록 설정하는 내용이다. kubernetes executor에 대한 내용은 Reference #2의 링크에서 확인 가능하다.
kubeconfig file setting
airflow web service와 scheduler가 kubernetes에 명령을 내릴수 있도록 kubeconfig 파일 설정하자. kubeconfig 파일은 위의 airflow.cfg의 config_file에 기록된 /opt/airflow/.kube/config에 복사하도록 하자. 그리고 KUBECONFIG 환경변수에 kubeconfig 파일 위치를 설정하도록 하자.
$ cp $KUBE_CONFIG_FILE /opt/airflow/.kube/config
$ export KUBECONFIG=/opt/airflow/.kube/config
secret, serviceaccount and configmap creation
kubernetes executor에서 사용할 설정들을 생성하도록 한다.
$ kubectl create secret generic homelab-airflow-s3-secret \
--namespace airflow \
--dry-run=client \
--from-literal=aws_access_key_id= \
--from-literal=aws_secret_access_key= \
-o yaml | kubectl apply -f -
$ kubectl create secret generic homelab-airflow-fernet-key \
--namespace airflow \
--dry-run=client \
--from-literal=fernet-key= \
-o yaml | kubectl apply -f -
$ kubectl create secret generic homelab-airflow-metadata \
--namespace airflow \
--dry-run=client \
--from-literal=connection= \
-o yaml | kubectl apply -f -
$ kubectl create serviceaccount homelab-airflow-worker-serviceaccount \
--namespace airflow \
--dry-run=client \
-o yaml | kubectl apply -f -
$ kubectl create configmap homelab-airflow-config \
--namespace airflow \
--dry-run=client \
--from-file=/opt/airflow/airflow.cfg \
-o yaml | kubectl apply -f -
copy pod_template yaml to airflow home directory
pod_template 파일을 생성 후 airflow 홈 디렉토리로 복사한다.
---
apiVersion: v1
kind: Pod
metadata:
name: homelab-airflow-worker
spec:
containers:
- name: airflow-worker
image: docker.io/apache/airflow:2.2.5-python3.8
imagePullPolicy: IfNotPresent
env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: homelab-airflow-fernet-key
key: fernet-key
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: homelab-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: homelab-airflow-metadata
key: connection
securityContext:
privileged: true
runAsUser: 50000
volumeMounts:
- mountPath: /opt/airflow/dags
name: airflow-dags
mountPropagation: Bidirectional
readOnly: true
- mountPath: /opt/airflow/logs
name: airflow-logs
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
- name: s3fs
image: docker.io/efrecon/s3fs:1.89
imagePullPolicy: IfNotPresent
env:
- name: AWS_S3_URL
value:
- name: AWS_S3_BUCKET
value:
- name: AWS_S3_REGION
value: ""
- name: UID
value: "50000"
- name: GID
value: "100"
- name: S3FS_ARGS
value: use_path_request_style,allow_other,mp_umask=0022,umask=0133,dbglevel=info
- name: AWS_S3_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: homelab-airflow-s3-secret
key: aws_access_key_id
- name: AWS_S3_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: homelab-airflow-s3-secret
key: aws_secret_access_key
resources: {}
securityContext:
privileged: true
runAsUser: 0
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /opt/s3fs/bucket
name: airflow-dags
mountPropagation: Bidirectional
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 100
serviceAccountName: "homelab-airflow-worker-serviceaccount"
volumes:
- name: airflow-dags
emptyDir: {}
- name: airflow-logs
emptyDir: {}
- name: airflow-config
configMap:
name: homelab-airflow-config
Start web server and scheduler
web server와 scheduler 설정이 완료되었으면 web server와 scheduler의 데몬을 설정하고 시작하도록 하자.
start web server daemon
web server 데몬은 airflow github 페이지를 참조하여 생성하였으며 서비스 등록 후 시작하도록 한다.
web server 데몬
[Unit]
Description=Airflow webserver daemon
After=network.target
[Service]
Environment=AIRFLOW_HOME=
Environment=PATH=/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
User=airflow
Group=
Type=simple
WorkingDirectory=
RuntimeDirectory=airflow
LogsDirectory=airflow
ExecStart=/.local/bin/airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
web server 서비스 시작
$ systemctl daemon-reload
$ systemctl start airflow-webserver
start scheduler daemon
scheduler 데몬은 airflow github 페이지를 참조하여 생성하였으며 서비스 등록 후 시작하도록 한다.
scheduler 데몬
[Unit]
Description=Airflow scheduler daemon
After=network.target
[Service]
Environment=AIRFLOW_HOME=
Environment=PATH=/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
User=airflow
Group=
Type=simple
WorkingDirectory=
RuntimeDirectory=airflow
LogsDirectory=airflow
ExecStart=/.local/bin/airflow scheduler --pid /run/airflow/scheduler.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
scheduler 서비스 시작
$ systemctl daemon-reload
$ systemctl start airflow-scheduler
Conclusion
Airflow Cluster에 DAG 파일을 공유하기 위한 VM과 Kubernetes가 혼재된 환경에서 설치를 해보았다. Airflow 홈페이지에 이미지, Persistent Volume 그리고 Git Sync를 사용하여 공유하는 방법이 있지만 DAG를 좀 더 쉽게 공유하기 위해 S3를 사용하였다. 모든 환경을 Kubernetes에 구성하는 방법이 훨씬 쉽지만 부득이 하게 Kubernetes를 제한적으로 사용할 수 밖에 없다면 위에서 가이드하는 방법도 좋은 방법일것으로 생각한다.
댓글남기기