본문 바로가기
AWS

[실습] AWS Step Function을 사용하여 데이터 파이프라인 오케스트레이션

by 권미정 2023. 3. 14.

<Data Engineering with AWS> 10장을 실습 및 번역 요약한 내용입니다.


책의 10장에서는 '데이터 파이프라인 오케스트레이션'의 개념과 네 가지 옵션들을 학습했습니다. 그래서 이번 장의 실습 내용은 데이터 파이프라인을 오케스트레이션하는 데 사용할 수 있는 AWS Step Function 서비스를 직접 사용해 보는 것입니다.

 

실습해 볼 오케스트레이션할 파이프라인은 상대적으로 간단합니다. 그래서 Lambda 함수만 사용하여 데이터를 처리합니다.

 

1. 새 Lambda 함수 생성

먼저 오케스트레이션할 Lambda 함수를 생성해야 합니다. 

 

① Lambda 함수를 사용하여 파일 확장자 결정

첫번째 Lambda 함수는 Amazon S3 버킷에 업로드된 모든 파일의 확장자를 확인한 다음, 상태 시스템으로 반환합니다.

 

  1. AWS Management Console에 로그인 하고 AWS Lambda 서비스로 이동합니다.
  2. 이 책의 모든 실습에 사용했던 region에 있는지 확인합니다.
  3. 함수 생성 클릭합니다.
  4. 새로 작성 선택합니다. 그런 다음 함수 이름에 dataeng-check-file-ext 입력합니다.
  5. 런타임 Python 3.9 선택합니다. Architecture  Permissions 의 기본값은 그대로 두고 함수 생성 클릭합니다.
  6. 코드 소스 블록에서 기존 코드를 아래의 코드로 바꿉니다. 이 코드는 새 S3 파일이 업로드될 때 EventBridge 이벤트를 수신하고 이벤트에 포함된 메타데이터를 사용하여 파일의 확장자를 결정합니다.
import urllib.parse

import json

import os

print('Loading function')

def lambda_handler(event, context):

    print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type

    bucket = event['detail']['requestParameters']['bucketName']

    key = urllib.parse.unquote_plus(event['detail']['requestParameters']['key'], encoding='utf-8')

    filename, file_extension = os.path.splitext(key)

    print(f'File extension is: {file_extension}')

    payload = {

        "file_extension": file_extension,

        "bucket": bucket,

        "key": key

        }

    return payload

  7. Deploy 버튼을 클릭해서 Lambda 함수를 저장하고 배포합니다.

 

② 무작위로 실패를 생성하는 Lambda

이제 받은 파일을 처리할 두 번째 Lambda 함수를 생성할 수 있습니다. 이 람다의 경우함수를 사용하면 난수 생성기를 사용하여 Lambda 함수에서 오류를 일으킬지 또는 성공할지 결정합니다. 0, 1 또는 2가 되는 임의의 숫자를 생성한 다음 임의의 숫자를 10으로 나누어 이를 수행합니다. 임의의 숫자가 0이면 함수에서 "0으로 나누기" 오류가 발생합니다.

 

  1. 이전 섹션의 1~5단계를 반복하여 첫 번째 Lambda 함수를 생성하되 이번에는 함수 이름dataeng-random-failure-generator 입력합니다.
  2. 코드 소스 블록에서 기존 코드를 아래의 코드로 바꾸고, deploy 버튼을 눌러 배포합니다.
from random import randint

def lambda_handler(event, context):

    print('Processing')

    #Our ETL code to process the file would go here

    value = randint(0, 2)

    # We now divide 10 by our random number.

    # If the random number is 0, our function will fail

    newval = 10 / value

    print(f'New Value is: {newval}')

    return(newval)

 

 

2. SNS 주제 생성 및 이메일 주소 구독

이제 Step Function 상태 시스템에서 오케스트레이션할 수 있는 두 개의 Lambda 함수가 있습니다. 하지만 상태 시스템을 생성하기 전에 생성해야 할 몇 가지 추가 리소스가 있습니다.

 

상태 시스템에 오류가 발생하면 오류에 대한 이메일 알림을 보낼 수 있기를 원합니다. 우리는 SNS 서비스를 이용하여 이메일을 보낼 수 있는데, 이를 위해서는 알림을 보낼 SNS 주제를 만들어야 합니다. 그런 다음 해당 항목에 하나 이상의 이메일 주소를 등록할 수 있습니다.

 

  1. Amazon SNS 서비스로 이동합니다.
  2. 이 책의 모든 실습에 사용했던 region에 있는지 확인합니다.
  3. 왼쪽 메뉴에서 주제 클릭한 다음 주제 생성 클릭합니다.
  4. 유형으로 표준 선택합니다.
  5. 이름 dataeng-failure-notification을 입력합니다.
  6. 다른 모든 항목은 그대로 두고 주제 생성 클릭합니다.
  7. 주제 세부 정보 섹션에서 구독 생성 클릭합니다.
  8. 프로토콜에서 이메일 선택합니다.
  9. 엔드포인트 이메일 주소를 입력합니다. 그런 다음 구독 생성 클릭합니다.
  10. 이메일에 액세스하여 no-reply@sns.amazonaws.com에서 보낸 이메일을 찾습니다. 해당 이메일에서 Confirm_subscription 링크를 클릭합니다.

이제 SNS 알림을 받을 수 있는 이메일 구독이 확인된 SNS 주제가 있습니다.

 

 

3. 새 Step Function 상태 시스템 생성

이제 지금까지 생성한 다양한 구성 요소들을 오케스트레이션할 수 있습니다.

 

  1. Amazon Step Function 서비스로 이동합니다.
  2. 이 책의 모든 실습에 사용했던 region에 있는지 확인합니다.
  3. 왼쪽 메뉴의 상태 머신에서 상태 머신 생성을 클릭합니다.
  4. 기본값인 시각적으로 워크플로 설계를 그대로 두고 유형을 표준으로 설정합니다. 그리고 다음을 클릭합니다.
  5. 그러면 Start 블록과 End 블록이 있는 시각적 편집기가 표시됩니다. AWS Lambda Invoke 블록 Start 블록 과 End 블록 사이의 비주얼 디자이너로 드래그합니다.
  6. 화면 오른쪽에서 상태 이름을 Check File Extension으로 설정합니다.
  7. API 파라미터 아래에서 드롭다운 목록을 사용하여 파일 확장명을 추출하는 Lambda 함수(dataeng-check-file-ext)를 선택합니다.
  8. 출력 탭을 클릭하고 OutputPath로 출력 필터링에 대한 선택기를 클릭 하고 $.Payload 값을 제공합니다. 이 옵션을 선택하면 파일 확장자 확인 상태가 Lambda 함수에서 반환된 항목의 출력을 갖도록 구성합니다.

 

 

  9. 왼쪽에서 흐름 탭을 클릭합니다. 그런 다음 Choice 상태 Lambda Invoke 함수와 종료 상태 사이로 드래그합니다. 상태를 사용하여 파이프라인을 분기하고 이전 상태의 출력을 기반으로 다른 프로세스를 실행합니다. 이 경우 파이프라인은 처리 중인 파일의 확장자에 따라 다른 작업을 수행합니다.

  10. 오른쪽의 새 선택 상태에 대한 구성에서 Rule #1 옆에 있는 연필 편집 아이콘을 클릭한 다음 Add Conditions를 클릭합니다.

  11. 팝업 화면의 Variable 아래에 $.file_extension을 입력합니다(Lambda 함수는 처리 중인 파일의 확장자가 있는 문자열이 포함된 file_extension의 JSON 경로를 포함하여 일부 JSON을 반환함). Operatormatches string를 선택해 문자열과 일치하도록 설정하고 Value .csv 입력합니다. 그런 다음 조건 저장 클릭합니다.

 

  12. 왼쪽에서 작업 탭으로 다시 전환하고 AWS Lambda Invoke 상태를 흐름도의 Rule #1 상자로 드래그합니다.

  13. 규칙 #1 에 대한 새로운 Lambda Invoke 상태의 경우 상태 이름 Process CSV 설정합니다(Choice 함수 11단계 에서 설정한 대로 확장자가 .csv인 모든 파일에 대해 이 Lambda를 호출하기 때문입니다).

  14. API 매개변수에서 드롭다운을 사용하여 Lambda 함수 dataeng-random-failure-generator를 선택합니다. 이 실습에서는 파이프라인을 오케스트레이션하는 방법에만 초점을 맞추고 있으므로 Lambda 함수 코드는 단순히 10을 임의의 숫자로 나누도록 설계되어 임의의 숫자가 0일 때 임의의 실패가 발생합니다.

 

  15. 왼쪽에서 흐름 탭으로 다시 전환하고 Pass 상태를 Choice 상태 에서 이어지는 Default 규칙 상자로 드래그합니다. Lambda 함수의 출력이 다른 규칙과 일치하지 않는 경우 기본 규칙이 사용됩니다. 이 경우 유일한 다른 규칙은 .csv 확장자를 가진 파일을 처리하는 것이므로, 파일에 .csv 이외의 다른 확장자가 있는 경우 기본 규칙이 사용됩니다.

  16. 오른쪽에서 Pass 상태 구성 의 경우 상태 이름 Pass – Invalid File Ext로 변경합니다. 그런 다음 출력 탭을 클릭하고 다음을 결과 텍스트 상자에 붙여넣습니다. Pass 상태는 상태 시스템에서 다음 상태로 전달되는 데이터를 수정하는 데 사용됩니다. 이 경우 파일 형식이 유효하지 않다는 오류 메시지를 파이프라인의 다음 상태로 전달하려고 합니다.

{

  "Error": "InvalidFileFormat"

}

ResultPath를 사용하여 출력에 원래 입력 추가를 선택하고 드롭다운이 Combine original input with result로 설정되어 있는지 확인합니다. 텍스트 상자에 $.Payload 입력합니다.

  17. 만약 우리가 InvalidFileFormat 오류가 발생하면 Amazon SNS 서비스를 사용하여 알림을 보내려고 합니다. 이렇게 하려면 왼쪽의 작업 탭에서 Amazon SNS Publish 상태 Pass - Invalid File Ext 상태 아래로 드래그합니다. 오른쪽에 있는 SNS Publish 상태 에 대한 구성 탭의 API 파라미터 아래에서 주제 위에서 생성한 SNS 주제(dataeng-failure-notification)로 설정합니다. 이제 상태 시스템은 아래와 같이 표시됩니다.

Step Function 상태 머신의 현재 상태

 

  18. 이제 Process CSV 상태에 대한 오류 처리를 추가합니다. Process CSV 상태를 클릭하고 오른쪽에서 오류 처리 탭을 클릭합니다. Catch 오 아래에서 + 새 catcher 추가 버튼을 클릭합니다. Errors에 대해 States.ALL을 선택하고 Fallback state에 대해 SNS Publish 상태를 선택하고 ResultPath$.Payload 입력합니다. 이 구성은 어떤 이유로든 Lambda 함수가 실패하면(States.ALL) Payload 아래 JSON에 오류 메시지를 추가함을 의미합니다. 키를 누르고 이를 SNS 알림 상태로 전달합니다.

  19. 왼쪽에서 흐름 탭을 클릭하고 Success 상태를Process CSV 상태 아래로 드래그합니다. 그런 다음 SNS Publish 상태 아래에 Fail 상태 드래그합니다. 어떤 이유로든 실패하고 결국 SNS를 사용하여 실패 알림을 보내는 경우 Step Function이 실패한 것으로 표시되기를 원하기 때문에 이 작업을 수행하고 있습니다. 최종 상태는 아래와 같아야 합니다.

Step Function 상태 머신의 최종 상태

  20. 오른쪽 상단에서 다음을 클릭합니다. 이 화면에는 상태 시스템에 대해 생성된 JSON Amazon States 언어 코드가 표시됩니다. 다음을 클릭합니다.

  21. 상태 시스템 이름 ProcessFilesStateMachine을 입력합니다. 다른 모든 설정은 그대로 두고 상태 머신 생성 클릭합니다.

 

이를 통해 Step Function을 사용하여 파이프라인 오케스트레이션을 만들었습니다. 이제 Step Function을 트리거하기 위한 이벤트 기반 워크플로를 생성하려고 합니다. 다음 섹션에서는 새 파일이 특정 S3 버킷에 업로드될 때마다 상태 머신을 트리거하는 새 EventBridge 규칙을 생성합니다.

 

 

4. AWS CloudTrail 및 Amazon EventBridge 구성

AWS CloudTrail 서비스는 AWS 계정에서 수행되는 활동을 거의 실시간으로 기록하는 데 사용됩니다. Amazon EventBridege는 CloudTrail 로그를 모니터링하여 특정 이벤트를 감지하고 이에 대응할 수 있습니다. 그러나 Amazon S3의 경우 개체 수준 데이터 이벤트는 기본적으로 CloudTrail에 기록되지 않으므로 CloudTrail 데이터 이벤트를 생성하도록 S3 버킷을 구성해야 합니다.

 

① Amazon S3 데이터 이벤트 구성

  1. Amazon CloudTrail 서비스로 이동합니다.
  2. 이 책의 모든 실습에 사용했던 region에 있는지 확인합니다.
  3. 왼쪽 패널을 확장하고 대시보를 클릭합니다.
  4. 추적 아래에서 추척 생성 클릭합니다.
  5. 추적 이름 s3-data-events를 입력합니다.
  6. AWS KMS 별칭으로 s3-data-events-key를 입력합니다.
  7. 다른 모든 옵션은 그대로 두고 다음을 클릭합니다.
  8. 이벤트 유형에 대해 관리 이벤트 선택 취소하고 대신 데이터 이벤트 선택합니다.
  9. 기본 이벤트 선택기로 전환하고 데이터 이벤트 소스: S3 아래에서 현재 및 향후 모든 S3 버킷에 대한 읽기  쓰기 옵션을 선택 취소합니다.
  10. 개별 버킷 선택 아래에서 자신의 클린 영역 버킷의 이름(예: dataeng-clean-zone-<initials> )을 입력하거나 찾습니다. 읽기 선택 해제하고 쓰기 이벤트만 선택된 상태로 둡니다. 그리고 다음 클릭합니다.
  11. 요약 화면을 검토하면서 추적 생성 클릭합니다.

 

이렇게 모든 쓰기 유형 이벤트의 로그를 clean-zone 버킷에 기록하도록 CloudTrail을 구성했습니다.

 

② Step Function 상태 머신을 트리거하기 위한 EventBridge 규칙 생성

파이프라인을 테스트하기 전에 마지막 작업은 Step Function 상태 시스템을 트리거하는 EventBridge 규칙을 구성하는 것입니다.

  1. Amazon EventBridge 서비스로 이동합니다.
  2. 이 책의 모든 실습에 사용했던 region에 있는지 확인합니다.
  3. 왼쪽 패널에서 규칙에 들어가서 규칙생성 클릭합니다.
  4. 규칙 이름으로 dataeng-s3-trigger-rule 입력하고 다음을 클릭합니다.
  5. 이벤트 패턴 작성에서 생성 방법패턴 양식 사용을 선택합니다.
  6. AWS 서비스는 Simple Storage Service(S3)를 선택합니다. 이벤트 유형AWS API Call via CloudTrail을 선택합니다.
  7. 특정 작업 선택하고 PutObjec, CopyObject, CompleteMultipartUpload 작업을 하나씩 직접 추가합니다. 다음을 클릭합니다.

  8. 대상 선택드롭다운 목록에서 Step Functions 상태 머신을 선택합니다.

  9. 상태 머신은 위에서 생성한 ProcessFileStateMachine 선택합니다.

  10. 다른 설정은 그대로 두고 규칙 생성 클릭합니다.

 

이렇게 데이터 파이프라인을 오케스트레이션하는 이벤트 기반 워크플로를 구성했습니다.

 

③ 이벤트 기반 데이터 오케스트레이션 파이프라인 테스트

드디어 마지막 작업입니다! 파이프라인을 테스트하기 위해 clean-zone S3 버킷에 파일을 업로드해야 합니다. 파일이 업로드되면 Amazon EventBridge에서 생성한 규칙에 따라 Step Function 상태 시스템이 트리거됩니다.

 

  1. Amazon S3 서비스로 이동합니다.
  2. 버킷 목록에서 dataeng-clean-zone-<initials> 버킷을 클릭합니다.
  3. 필요에 따라 테스트를 위해 이 버킷에 새 폴더를 만듭니다.
  4. 업로드, 파일 추가 를 차례로 클릭합니다 . 컴퓨터에서 확장자가 CSV인 파일을 찾습니다(파일을 찾을 수 없는 경우 비어 있는 새 파일을 만들고 파일이 CSV 확장자로 저장되었는지 확인).
  5. 다른 설정은 그대로 두고 업로드를 클릭합니다.
  6. AWS Step Function 서비스로 이동합니다.
  7. 이전에 생성한 상태 머신( ProcessFilesStateMachine )을 클릭합니다. Executions 목록에서 상태 시스템이 Succeeded인지 Failed인지 확인합니다. 자세한 내용을 보려면 실행의 이름 속성을 클릭하세요.
  8. 동일한 .csv 파일을 다시 업로드하고(필요한 경우 여러 번) 일부 실행이 어떻게 성공하고 일부는 실패하는지 확인합니다. 난수 생성기는 숫자 1 또는 2를 생성할 확률이 66%이고 숫자 0을 생성할 확률이 33%입니다. 숫자 0이 생성되면 함수가 실패하므로 많은 실행을 통해 약 1/3이 실패해야 합니다.
  9. 실행 실패 후 Amazon SNS 알림 서비스를 구성할 때 지정한 이메일 주소를 확인하세요. 이전에 SNS 구독을 확인한 경우 상태 머신이 실패할 때마다 이메일을 받아야 합니다.
  10. 이제 동일한 Amazon S3 버킷에 다른 파일을 업로드하되 이 파일의 확장자가 .csv가 아닌 다른 파일(예: PDF)인지 확인합니다. 상태 머신에 대한 실행 세부 정보를 볼 때 선택 상태가 Pass – Invalid File Ext 상태로 진행된 후 SNS를 게시하는 것을 볼 수 있습니다.

 


이벤트 패턴 작성 화면이 책의 화면과 구성이 달라서, 파이프라인을 테스트하기 위한 파일을 업로드하는 버킷을 설정해 주지  못했습니다. 그래서 마지막 테스트 때 에러가 발생했습니다. 이 부분을 해결하면 아래로 이어서 블로깅하겠습니다.

댓글