일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Round
- seaborn
- having
- 머신러닝
- airflow.cfg
- 데이터리안 웨비나
- 데이터시각화
- PostgreSQL
- SQL
- 데이터분석
- Limit
- 결측값
- join
- 파이썬
- SUM
- solvesql
- MySQL
- GROUPBY
- airflow 설치
- 전처리
- Oracle
- matplotlib
- hackerrank
- SQLite
- TRUNCATE
- 프로그래머스
- 다중 JOIN
- 그로스해킹
- not in
- pandas
- Today
- Total
Milky's note
[Python] AWS Glue에서 비동기 처리 (concurrent.futures) 본문
◎ concurrent.furue 모듈
기존에 파이썬에서는 스레드를 구현하려면 threading 모듈을 사용하고 멀티 프로세스 프로그램을 구현하려면 multiprocessing 모듈을 사용해야 했었다. 하지만 3.2 버전부터는 concurrent.futures 모듈이 도입되었다.
concurrent.futures 모듈은 비동기적으로 callable을 실행하는 고수준 인터페이스를 제공하기 때문에
이 모듈을 사용하면 같은 규칙으로 스레드와 멀티 프로세스 코드를 더 쉽게 작성할 수 있다.
- Executor
ThreadPoolExecutor
ProcessPoolExecutor - Future
· Executor 클래스
Executor 클래스는 풀 기반으로 작업을 관리하고 비동기적으로 호출을 실행하는 메서드를 제공하는
추상 클래스다.
초기화 시에 몇 개의 worker가 사용될 것 인지를 정해주면 전달되는 작업들을 큐에 넣고 worker pool에서
사용 가능한 worker로 하여금 작업을 처리하게 한다. 직접 사용해서는 안되며, 구체적인 하위 클래스를 통해 사용해야 한다.
Executor의 서브 클래스는 스레드를 사용해서 비동기 호출을 실행할 것인가(ThreadPoolExecutor),
별도의 프로세스를 사용해서 비동기 호출을 실행할 것인가(ProcessPoolExecutor)에 따라 나뉜다.
1) ThreadPoolExecutor
- 스레드를 사용하여 작업을 비동기적으로 실행
- 쓰레드는 경량이므로 작업 전환 비용이 낮음
- 주로 I/O 바운드 작업(예: 데이터베이스 쿼리, 파일 I/O, 네트워크 호출)에 적합
- 동일한 프로세스 내에서 스레드를 사용하기 때문에 데이터 공유가 쉽고 오버헤드가 적음
- 대기 시간을 줄이고 리소스 사용을 줄일 수 있음
2) ProcessPoolExecutor
- 프로세스를 사용하여 작업을 비동기적으로 실행
- 각 프로세스는 독립적인 메모리 공간을 가지므로 CPU 및 메모리 자원을 더 많이 사용
- CPU 바운드 작업(예: 계산 집약적인 작업)에 적합
- 여러 CPU 코어에서 병렬로 작업을 수행할 수 있음
- 메모리를 공유하지 않으므로 데이터 공유가 어렵고 통신 비용이 더 듦
[코드 예시]
queries = [Query1, Query2]
start = time.time()
# Query Execution
def execute_query(query_string, database, output_location):
client = boto3.client('athena', region_name='ap-northeast-2')
query_start = client.start_query_execution(
QueryString=query_string,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': output_location}
)
return query_start
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
for query in queries:
executor.submit(execute_query, query, 'db', 's3://temp/')
end = time.time()
AWS Glue에서 수행한 코드 중의 일부인데 DynamoDB에서 2개의 쿼리를 불러와서 ThreadPool, ProcessPool로 각각 비동기로 처리한 코드이다.
데이터의 건수는 약 10만건, 테이블의 컬럼 수는 30개 정도이고
쿼리는 left join, full join이 있는 조금 복잡한 쿼리이다.
[수행 결과]
ThreadPoolExecutor 수행시간 : 132.19272017s
ProcessPoolExecutor 수행시간 : 163.20456743s
단순 테스트 결과는 위와 같이 나왔다.
I/O가 많은 쿼리에는 ThreadPool을 사용해서 Glue Job를 만들어야겠다!!
추가로 Executor 관련 함수와 future 클래스도 곧 내용 정리해서 올릴 예정이다.
'Python > 요약 정리' 카테고리의 다른 글
[pandas] Grand Total 구하기 (0) | 2024.12.09 |
---|---|
[Python] URL encoding (request 파라미터 ASCII 변환) (0) | 2024.03.26 |
[Pandas] 데이터프레임 컬럼 SHA256으로 암호화하기 (0) | 2024.01.08 |
[Pandas] 데이터프레임 컬럼 순서 변경 (1) | 2023.11.13 |
[Pandas] 데이터프레임 groupby로 연산된 컬럼 추가 (transform) (0) | 2023.11.13 |