카테고리 보관물: Python

numba guvectorize 활용하기

numba의 guvectorize를 사용하면 array 형태의 값 처리의 속도를 매우 빠르게 할 수 있다.

우선 실행 결과를 먼저 보겠다. aaa는 guvectorize를 사용해서 연산한 경우이고, bbb는 그냥 python 코드로 실행한 결과이다. ccc는 numpy의 벡터 연산을 실시한 결과다. guvectorize를 적용했을 때 단순 for loop를 사용했을 때에 비해 약 300배 가까이 빠른 실행을 보였다. numpy의 벡터 연산을 활용하면 for loop을 사용했을 때 보다 1400배 가까이 빠르게 실행되었다.

aaa: elapsed=0.0275056362s
bbb: elapsed=9.2325780392s
ccc: elapsed=0.0066127777s
>>> 9.2325780392 / 0.0275056362
335.6613158142476
>>> 9.2325780392 / 0.0066127777
1396.1724494685493

결론부터 말하자면, Array의 단순 연산은 numpy의 벡터연산이 가장 빠르다. 특정 로직에 따른 연산이 필요할 땐 guvectorize를 활용하자.

guvectorize의 사용법을 간단히 살펴보자. guvectorize의 경우에는 값을 return하지 않는다. 대신에 파라미터에 대한 선언을 정확히 해 주면 된다.

@guvectorize(['void(int64[:], int64[:])'], '(n) -> (n)')
def function_name(x, y):

순서대로 int64[:], int64[:]는 뒤의 (n) -> (n)과 매칭된다.

Array가 아닌 단순 값을 파라미터로 추가하는 경우에는 아래와 같이 처리하면 된다. 데이터 타입의 정의 개수와 입력, 출력 파라미터의 형식, 그리고 함수의 정의까지 모두 일치해야 한다.

@guvectorize(['void(int64[:], boolean, int64[:])'], '(n),() -> (n)')
def function_name(x, twist, y):

아래에는 위의 결과를 만든 소스 코드를 붙인다.

import time
import numpy as np
from numba import guvectorize


def timer(func):
    def measure(*args, **kwargs):
        begin = time.time()
        val = func(*args, **kwargs)
        end = time.time()
        print(f">> {func.__name__}: elapsed={'%.10f' % (end - begin)}s")
        return val

    return measure

@timer
@guvectorize(['void(int64[:], int64[:])'], '(n) -> (n)')
def aaa(x, y):
    for i, val in enumerate(x):
        y[i] = val + 3 * 3 / 5 * 1.23 / 6.53

@timer
def bbb(x):
    y = np.empty([len(x), 1])
    for i, val in enumerate(x):
        y[i] = (val + 3 * 3 / 5 * 1.23 / 6.53)
    return y

@timer
def ccc(x):
    return x + 3 * 3 / 5 * 1.23 / 6.53

a = np.int32(np.round(np.random.rand(5000000) * 1000))
out = aaa(a)
out = bbb(a)
out = ccc(a)

매매로직 재사용

빠른 배포가 중요하다는 포스팅에서 이야기 한 방법은 사실 안정적이지 않다. 더 좋은 방법은 백테스트할 때 활용한 매매 시스템의 로직을 트레이딩 시스템에서 동일하게 활용하는 방법이다. 1호 시스템을 만들고 나서, 앞으로 2호 이후의 시스템들을 만들 생각을 하니 매번 모든 코드를 작성해야 하는 것은 비효율적이라는 생각이 들었다. 매매로직을 모듈화해서 백테스트와 트레이딩 시스템에 모두 동일하게 활용할 수 있게 한다면, 백테스트에서 실전 매매 시스템 개발까지의 기간을 최소화 할 수 있을 것이라는 생각이 들었고, 이 작업에 이제 곧 착수할 예정이다.

백테스트의 성능을 저하시키지 않으면서, 트레이딩 시스템에 어떻게 장착할 수 있을지에 대한 설계가 필요하다. 특히 키움의 QEventLoop 사용방식은 코드의 복잡도를 증가시키기 때문에 ES7의 async, await 처럼 비동기를 동기식으로 사용할 수 있는 방법을 찾아보려고 한다.

그리고 앞으로는 TDD를 사용하여 시스템을 개발하려고 한다. 과거의 경험을 비추어봐도 TDD로 개발할 때 생산성이 가장 좋았고, 오류가 적었다. 1호 시스템을 개발하면서 매매로직 테스트하는 부분만 TDD를 활용하긴 했는데, case에 대한 정리가 부족한 탓에 오류 상황들이 발생했다. 그럼에도 테스트코드가 있어서 빠르게 문제를 발견하고 해결할 수 있었다.

쨌든 Do not Repeat Yourself(DRY 원칙)를 시스템 구축에도 활용해서 시스템 제작의 생산성을 높여야겠다.

Python에서 용량이 큰 테이블의 insert, update 속도 문제

테이블 하나에 약 800만개의 레코드를 가진 테이블(약 1.2GB)을 만들고 분석할 기회가 생겼는데, 매일의 데이터 변경사항에 대해 추가하거나 업데이트를 해야 한다.

약 1,000개의 레코드를 업데이트하는데 25초 정도 걸렸다. 분명 뭔가 잘못됐다는 신호다. 초당 40개 정도의 업데이트밖에 되지 않았다.

해결책은 그리 복잡하지 않았다. 필자가 했던 적용법들은 다음과 같다.

1. select query를 사용하여 레코드 하나씩 1,000번씩 쿼리를 던져서 필요한 값들을 확인하던 것을 한 번의 쿼리로 작업 단위에 필요한 만큼 불러온 후, python에서 관련 값들을 확인하고 처리하도록 변경했다.

2. insert시 1,000개의 insert query를 실행하던 것을 다음과 같이 한 개의 insert query로 변경했다. 이 방법을 사용하는 경우 query문이 너무 길어지는 경우 실행되지 않을 수 있다. my.cnf에 관련 값을 얼마로 설정했느냐에 따라 달라지는 것으로 보이는데, key값이 지금은 기억나지 않는다. 추후에 확인해 보고 추가해 놓으려 한다.

insert into 테이블명 (필드1, 필드2, ...) values (값1a, 값2a, ...), (값1b, 값2b, ...), ..., (값na, 값 nb, ...)

3. update시 1,000개의 query를 실행하는 것은 동일하지만, where절에 키 두 개를 사용해서 update 하던 것을 primary key 하나만 사용하도록 변경하였다.

위의 3가지 방법을 모두 적용한 후 1,000개 레코드 처리에 25초 정도 걸리던 것은 평균 0.6초 이내로 처리시간이 단축되었다.

적용하기 전에는 Mysqld 프로세스의 CPU 점유율이 단일코어기준으로 100%이었는데, 변경 후에는 약6% 언저리에서 작동했다. 작업 환경은 다음과 같다. CPU는 인텔 G4400, 메모리 8G, SSD 128G, OS는 Windows 10, MariaDB 버전은 10.2를 사용하였다. 그리고 프로그래밍 언어는 Python3에서 MySQLdb를 사용하였다.

테이블 용량이 조금 크다고 해서 테이블을 여러개로 쪼개는 것이 우선이 아니라는 것을 이번에 배웠다. 특히 테이블 크기가 큰 경우에는 DB의 부하를 줄이는 방향으로 최적화할 필요가 있음을 느꼈다. 재미있는 경험이었으며, 기본적인 원칙이 얼마나 중요한지 다시금 생각해 보는 계기가 됐다.