Python은 기본적으로 single-process, single-thread로 동작한다. GIL (Global Interpreter Lock) 이라는 것이 있어서 다중 스레드로 인한 데이터 오염을 막는 동시에 다중 프로세싱까지 막아버린다. 그렇잖아도 스크립트 언어라 느린데 멀티프로세싱이 쉽지많은 않으니 죽을 맛이다. GIL을 버린다 버린다 토론은 하는 모양인데 동시 접근으로 인한 문제를 고려하기가 어려우니 지지부진한가 보다. 애초에 다중 프로세싱 환경을 고려한 Rust 같은 언어로 갈아타면 되겠지만 아직 Python이 좋다...

한편 멀티프로세싱 또는 멀티스레딩이 필요한 경우는 크게 둘로 나눌 수 있다.

  1. 병렬화가 작업 레벨이든, 데이터 레벨이든 가능하긴 한 상황을 상정한다.
  2. I/O 작업이 오래 걸리는 경우, I/O 기다리는 동안 다른 작업을 수행해 보면 좋다. 이 때는 굳이 비싼 새 프로세스를 만들 것 없이 스레드를 하나 만들고 GIL을 새 스레드에 넘겨주도록 한다고 한다. ''concurrent.future.ThreadPoolExecutor''로 편리하게 할 수 있으나 안 써봤다.

1에 대해서는 3.5부터 도입된 AsyncIO를 공부하여 써먹어야 하겠다. 좋아보이는 정리글

  1. CPU 작업이 오래 걸리는 경우, 코어 하나를 공유하는 스레드로는 안 되고 여러 프로세스를 만들어서 최신 CPU의 다코어를 활발히 활용해야 한다. 문제는 비용 값싼 프로세스간 데이터 공유 방법을 적용하기.

이 중 두 번째 문제에 대해 Py3.2부터 도입된 ProcessPoolExecutor와 NumPy 행렬 공유를 위한 공유 메모리 multiprocessing.sharedctypes 를 사용해 보았다.

ProcessPoolExecutor는 프로세스간 통신을 Pickle을 통해 수행하는 다중 프로세스 관리 툴이다. Pickle을 쓰니 성능이 그리 좋지는 않을 것 같다. 연산 결과를 취합하는 가장 일반적인 방침은 메인 프로세스에서 Queue를 사용해서 결과를 모으는 것이 되겠다. 그러나 매우 큰 행렬을 대상으로 연산을 수행하는 경우, 데이터를 주고 받느라 (특히 여기서는 느린 Pickle로) 시간을 잡아먹게 되겠다. 공유 메모리를 사용하는 것이 각 프로세스에게 행렬을 보내거나 새로 만들고, 연산 결과를 받아오느라 불필요한 시간이 들지는 않으니 좋을 테지만, 공유 메모리에 질서를 지켜 접근하느라 병렬 처리의 매력이 떨어질 것이다.

그 전에 ProcessPoolExecutor만 사용해서, 각 프로세스에서 연산한 결과를 메인 프로세스에서 결합하는 예제이다.

공식문서/ProcessPoolExecutor 참고 https://www.reddit.com/r/learnpython/comments/4vpj1w/populate_numpy_array_through_concurrentfutures/

import concurrent.futures
squares = np.zeros((20, 2))
def make_square(i):
    print('iteration', i)
    # compute expensive data here ...
    # return row number and the computed data
    return i, ([i, i**2])
with concurrent.futures.ProcessPoolExecutor(2) as executor:
    for row, result in executor.map(make_square, range(20)):
        squares[row] = result
squares

다음으로 NumPy array를 공유 메모리에 담아서, 각 프로세스가 연산 결과를 바로 공유 메모리에 저장하는 경우이다.

공유 메모리 NumPy array

S_ctypes = None
def sq_prc(i, size, shape):
#     print('iteration', i)
    if i == 0:  # preserve first row
        return None
    from numpy import ctypeslib
    S = ctypeslib.as_array(S_ctypes)
    S.shape = shape
    S[i] = [i, i**2]
    return None

def run_conc(p_num):
    global S_ctypes  # very ugly...

    import numpy as np
    import concurrent.futures
    from numpy import ctypeslib
    S = np.zeros((20, 2), dtype=np.int32)
    S[0, 0] = 10; S[0, 1] = 20
    # copy NumPy array into shared memory
    from multiprocessing import sharedctypes
    size, shape = S.size, S.shape
    S.shape = size
    S_ctypes = sharedctypes.RawArray('i', S)  # double type 'd'; int type 'i'
    S = np.frombuffer(S_ctypes, dtype=np.int32, count=size)
    S.shape = shape
    with concurrent.futures.ProcessPoolExecutor(p_num) as executor:
        fs = []
        for i in range(20):
            f = executor.submit(sq_prc, i, size, shape)
            fs.append(f)
        concurrent.futures.wait(fs)  # has to wait each process finish

run_conc(8)    # or any number up to the number of your cpu core

https://stackoverflow.com/questions/8804830/python-multiprocessing-pickling-error 공식문서/Pickleable objects Pickle 가능한 객체 제한 때문에 worker function을 최상위 레벨로 끌어올린 뒤, 무식하게 공유 메모리 주소 변수를 전역 변수로(...) 두고는 메인 함수 run_conc()에서 같은 변수를 가져다 썼다. 부끄럽다...

반응형

+ Recent posts