python 멀티프로세싱 - process간 데이터 교환 디자인패턴
2023-03-12
.
Data_Engineering & Python_TIL(20230312)
인프런 “고수가 되는 파이썬 : 동시성과 병렬성 문법 배우기 Feat. 멀티스레딩 vs 멀티프로세싱 (Inflearn Original)” 강의를 공부하고 정리한 내용입니다.
** URL : https://www.inflearn.com/course/프로그래밍-파이썬-완성-인프런-오리지널
[공부한 내용]
** 파이썬 참고문서 : https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes
디자인패턴 예시 1. Queue를 이용한 멀티프로세스간 데이터 교환 예시코드
# 프로세스 통신 구현 - Queue
from multiprocessing import Process, Queue, current_process
import time
import os
# 실행 함수
def worker(id, baseNum, q):
process_id = os.getpid()
process_name = current_process().name
# 누적
sub_total = 0
# 계산
for i in range(baseNum):
sub_total += 1
# Produce
q.put(sub_total)
# 정보 출력
print(f"Process ID: {process_id}, Process Name: {process_name}")
print(f"Result : {sub_total}")
def main():
# 부모 프로세스 아이디
parent_process_id = os.getpid()
# 출력
print(f"Parent process ID {parent_process_id}")
# 프로세스 리스트 선언
processes = list()
# 시작 시간
start_time = time.time()
# Queue 선언
q = Queue()
# 프로세스 생성 및 실행
for i in range(5): # 1 ~ 100 적절히 조절
# 생성
t = Process(name=str(i), target=worker, args=(1, 100000000, q))
# 배열에 담기
processes.append(t)
# 시작
t.start()
# Join
for process in processes:
process.join()
# 순수 계산 시간
print("--- %s seconds ---" % (time.time() - start_time))
# 종료 플래그
q.put('exit')
total = 0
# 대기
while True:
tmp = q.get()
if tmp == 'exit':
break
else:
total += tmp
print()
print("Main-Processing Total_count={}".format(total))
print("Main-Processing Done!")
if __name__ == "__main__":
main()
위에 코드를 실행하면 아래와 같이 출력됨
Parent process ID 77373
Process ID: 77376, Process Name: 1
Result : 100000000
Process ID: 77375, Process Name: 0
Result : 100000000
Process ID: 77377, Process Name: 2
Result : 100000000
Process ID: 77379, Process Name: 4
Result : 100000000
Process ID: 77378, Process Name: 3
Result : 100000000
--- 3.417874813079834 seconds ---
Main-Processing Total_count=500000000
Main-Processing Done!
디자인패턴 예시 2. Pipe를 이용한 멀티프로세스간 데이터 교환 예시코드
# 프로세스 통신 구현 - Pipe
from multiprocessing import Process, Pipe, current_process
import time
import os
# 실행 함수
def worker(id, baseNum, conn):
process_id = os.getpid()
process_name = current_process().name
# 누적
sub_total = 0
# 계산
for _ in range(baseNum):
sub_total += 1
# Produce
conn.send(sub_total)
conn.close()
# 정보 출력
print(f"Process ID: {process_id}, Process Name: {process_name}")
print(f"Result : {sub_total}")
return True
def main():
# 부모 프로세스 아이디
parent_process_id = os.getpid()
# 출력
print(f"Parent process ID {parent_process_id}")
# 시작 시간
start_time = time.time()
# Pipe 선언
parent_conn, child_conn = Pipe()
processes = list()
shared_result = 0
# 프로세스 생성 및 실행
for i in range(5):
# 생성
t = Process(target=worker, args=(1, 100000000, child_conn))
# 배열에 담기
processes.append(t)
# 시작
t.start()
# Join
for process in processes:
shared_result += parent_conn.recv()
process.join()
# 순수 계산 시간
print("--- %s seconds ---" % (time.time() - start_time))
print("Main-Processing : {}".format(shared_result))
print("Main-Processing Done!")
if __name__ == "__main__":
main()
Parent process ID 77328
Process ID: 77332, Process Name: Process-3
Result : 100000000
Process ID: 77334, Process Name: Process-5
Result : 100000000
Process ID: 77333, Process Name: Process-4
Result : 100000000
Process ID: 77330, Process Name: Process-1
Result : 100000000
Process ID: 77331, Process Name: Process-2
Result : 100000000
--- 3.3975470066070557 seconds ---
Main-Processing : 500000000
Main-Processing Done!