- 목적/배경
- https://blog.eomsh.com/187 에서 AWS SES를 사용해서 이메일 발송 서비스를 구축했다면 bounce 관리가 필요하다고 간단히만 코멘트
- 그런데, 메일함이 꽉 차서 유저에게 메시지를 띄워주거나, 반송되는 이메일 주소에게 본인의 어플리케이션이 메일을 다시 보내지 않게 하려면 메일 주소를 어플리케이션에 DB로 구축해야함
- 수신거부 등의 이벤트도 수집 가능
- 방법
- AWS SES 웹 콘솔에서 Notification 설정 -> SNS로 전송 -> Lambda 호출 -> 작성한 파이썬 코드 호출
- 추가 로직이 필요 없으면 SNS에서 바로 http api호출해도 될 것 같기는 함
- 참고 링크
- 간단히 작성한 파이썬 소스코드
- AWS Lambda runtime 파이썬 레이어에 request 사용을 위해 추가 후 아래 코드 실행되게 함(참고 링크)
- 해당 API가 호출되어 저장되는 DB DDL과 Java 어플리케이션도 있는데 그건 추후 로그에 작성해보겠음
import json
import os
import requests
AWS_ACCOUNT_ID = "입력필요"
AWS_REGION_CD = "입력필요"
REQ_AUTH_KEY = "API호출할 때 보안을 위해서 사용하는 인증키"
SES_NOTIFICATION_SAVE_API_URL = "https://블라블라/" + AWS_ACCOUNT_ID + "/" + AWS_REGION_CD + "/블라블라"
def lambda_handler(event, context):
ses_notification = event['Records'][0]['Sns']['Message']
data_json = json.loads(ses_notification)
notification_type = data_json['notificationType']
if notification_type == "AmazonSnsSubscriptionSucceeded":
print(f"AmazonSnsSubscriptionSucceeded는 저장하지 않습니다.(SNS로 구독 성공시 이벤트) data_json: {data_json}")
return
data_json_str = json.dumps(data_json, ensure_ascii=False).encode('utf-8')
result_save_api = request_http_save(SES_NOTIFICATION_SAVE_API_URL, 10, data_json_str)
print(f"result_save_api: {result_save_api}", end="\n\n")
def test_save_from_sample_file():
file_name = "test-bounce-exist-dns-data.json"
current_directory = os.getcwd()
test_json_file_path = os.path.join(current_directory, file_name)
with open(test_json_file_path, 'r', encoding='UTF8') as f:
data_json = json.load(f)
print(f"테스트 파일 {file_name}의 내용 ===>\n\t", json.dumps(data_json, indent=4, ensure_ascii=False), end="\n\n")
notification_type = data_json['notificationType']
if notification_type == "AmazonSnsSubscriptionSucceeded":
print(f"AmazonSnsSubscriptionSucceeded는 저장하지 않습니다. data_json: {data_json}")
return
data_json_str = json.dumps(data_json, ensure_ascii=False).encode('utf-8')
result_save_api = request_http_save(SES_NOTIFICATION_SAVE_API_URL, 10, data_json_str)
print(f"result_save_api: {result_save_api}", end="\n\n")
def request_http_save(uri: str, timeout_seconds: int, req_json_str: str) -> object:
"""
외부 API에 SES 알림 데이터 전송하는 HTTP통신용 함수
:param uri: 요청 URI
:param timeout_seconds: 타임아웃(초)
:param req_json_str: 요청할 json(문자가 아니라 json타입)
:return:
"""
print("\n === Start request_http_save. uri: " + uri)
headers = {
'reqAuthKey': REQ_AUTH_KEY,
'Content-Type': 'application/json; charset=utf-8'
}
try:
response = requests.request("POST", uri, headers=headers, timeout=timeout_seconds, data=req_json_str)
return {
'statusCode': response.status_code,
'body': response.text
}
except requests.exceptions.Timeout as errd:
print("Timeout Error : ", errd)
except requests.exceptions.ConnectionError as errc:
print("Error Connecting : ", errc)
except requests.exceptions.HTTPError as errb:
print("Http Error : ", errb)
except requests.exceptions.RequestException as erra:
print("AnyException : ", erra)