본문 바로가기
카테고리 없음

airflow sagemaker

by 제타 2024. 7. 30.
반응형

from airflow import DAG
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerProcessingOperator
from datetime import datetime, timedelta

# 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
with DAG(
    dag_id='sagemaker_processing_job',
    default_args=default_args,
    description='A DAG to run a SageMaker Processing Job',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # SageMaker Processing Job 설정
    processing_job_config = {
        'AppSpecification': {
            'ImageUri': '<your-ecr-repository>:latest',  # 사용할 Docker 이미지 URI
            'ContainerEntrypoint': ['python', 'your_script.py'],  # 실행할 스크립트
        },
        'ProcessingResources': {
            'ClusterConfig': {
                'InstanceCount': 1,  # 사용할 인스턴스 수
                'InstanceType': 'ml.m5.large',  # 사용할 인스턴스 타입
                'VolumeSizeInGB': 30,  # EBS 볼륨 크기
            }
        },
        'StoppingCondition': {
            'MaxRuntimeInSeconds': 3600,  # 최대 실행 시간 (초)
        },
        'ProcessingInputs': [
            {
                'InputName': 'input-data',
                'S3Input': {
                    'S3Uri': 's3://your-bucket/input-data/',  # 입력 데이터가 저장된 S3 경로
                    'LocalPath': '/opt/ml/processing/input',  # 컨테이너 내의 로컬 경로
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
                }
            }
        ],
        'ProcessingOutputConfig': {
            'Outputs': [
                {
                    'OutputName': 'output-data',
                    'S3Output': {
                        'S3Uri': 's3://your-bucket/output-data/',  # 출력 데이터를 저장할 S3 경로
                        'LocalPath': '/opt/ml/processing/output',  # 컨테이너 내의 로컬 경로
                        'S3UploadMode': 'EndOfJob',  # 작업 종료 후 업로드
                    }
                }
            ]
        },
        'RoleArn': 'arn:aws:iam::<your-account-id>:role/<your-sagemaker-execution-role>',  # SageMaker 실행 역할 ARN
        'Region': 'us-west-2',  # 사용할 AWS 리전
    }

    # SageMaker Processing Operator를 사용하여 Processing Job 실행
    run_processing_job = SageMakerProcessingOperator(
        task_id='run_processing_job',
        config=processing_job_config,
        aws_conn_id='aws_default',  # Airflow의 AWS 연결 ID
        wait_for_completion=True,  # 작업 완료까지 대기 여부
        check_interval=30,  # 상태 확인 간격 (초)
    )

    run_processing_job

반응형