from airflow import DAG
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerProcessingOperator
from datetime import datetime, timedelta
# 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
with DAG(
dag_id='sagemaker_processing_job',
default_args=default_args,
description='A DAG to run a SageMaker Processing Job',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# SageMaker Processing Job 설정
processing_job_config = {
'AppSpecification': {
'ImageUri': '<your-ecr-repository>:latest', # 사용할 Docker 이미지 URI
'ContainerEntrypoint': ['python', 'your_script.py'], # 실행할 스크립트
},
'ProcessingResources': {
'ClusterConfig': {
'InstanceCount': 1, # 사용할 인스턴스 수
'InstanceType': 'ml.m5.large', # 사용할 인스턴스 타입
'VolumeSizeInGB': 30, # EBS 볼륨 크기
}
},
'StoppingCondition': {
'MaxRuntimeInSeconds': 3600, # 최대 실행 시간 (초)
},
'ProcessingInputs': [
{
'InputName': 'input-data',
'S3Input': {
'S3Uri': 's3://your-bucket/input-data/', # 입력 데이터가 저장된 S3 경로
'LocalPath': '/opt/ml/processing/input', # 컨테이너 내의 로컬 경로
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
}
}
],
'ProcessingOutputConfig': {
'Outputs': [
{
'OutputName': 'output-data',
'S3Output': {
'S3Uri': 's3://your-bucket/output-data/', # 출력 데이터를 저장할 S3 경로
'LocalPath': '/opt/ml/processing/output', # 컨테이너 내의 로컬 경로
'S3UploadMode': 'EndOfJob', # 작업 종료 후 업로드
}
}
]
},
'RoleArn': 'arn:aws:iam::<your-account-id>:role/<your-sagemaker-execution-role>', # SageMaker 실행 역할 ARN
'Region': 'us-west-2', # 사용할 AWS 리전
}
# SageMaker Processing Operator를 사용하여 Processing Job 실행
run_processing_job = SageMakerProcessingOperator(
task_id='run_processing_job',
config=processing_job_config,
aws_conn_id='aws_default', # Airflow의 AWS 연결 ID
wait_for_completion=True, # 작업 완료까지 대기 여부
check_interval=30, # 상태 확인 간격 (초)
)
run_processing_job
카테고리 없음
airflow sagemaker
반응형
반응형