May 25, 2025

[Project-ttrade] #4 GP 알고리즘 (Python deap)

[Project-ttrade] #4  GP 알고리즘 (Python deap)

유전자 프로그래밍(Genetic Programming, GP)는 진화 알고리즘을 기반으로 학습, 추론, 문제 해결을 실현하는 방식입니다. 돌연변이 및 교차에 따른 유전자 선택을 적용해서 새로운 세대의 프로그램에 적용하는 방법으로, 일반적으로 각 세대의 구성원들은 이전 세대의 구성원들보다 평균적으로 더 적합합니다.

말은 어렵지만, 결국은 가장 적합한 파라미터를 찾는 과정입니다. 그 과정중에 초기 파라미터부터 좋은 건 남기고, 아닌 건 돌연변이해서 더 좋은 게 나올 때까지 실행하는 겁니다.

Python에서는 이 GP 알고리즘을 사용할 수 있게 deap 패키지가 있습니다.

Python - GP알고리즘 자동매매

ttrade는 자동 매매를 원하니까, 간단한 전략을 사용해 봅시다.
스토캐스틱 시그널로 매매를 진행하되, 가장 최적의 매매 포인트를 잡아 봅시다.

기본적인 구조는 다음과 같습니다.

  1. k, d = stochastic(k_window(14),d_window(3) )
  2. k > d & k > buy_point(20) : 구매
  3. k < d & k > sell_point(80) : 판매

여기서 k_window, d_window, k, d 의 최적 값을 구하는 알고리즘을 구현합니다.

변수 선언

최적 값을 구할 파라미터를 선언합니다. 파라미터의 최대/최소는 나중에 정합니다.

from deap import base,creator
import numpy as np

class Individual:
    def __init__(self, k_win, d_win, buy_p, sell_p):
        self.k_win = k_win
        self.d_win = d_win
        self.buy_p = buy_p
        self.sell_p = sell_p
    
    def __str__(self):
        return f"k_win({self.k_win}), d_win({self.d_win}), Buy/Sell {self.buy_p}/{self.sell_p}"
    
    def __len__(self):
        return 4
    
    def __getitem__(self, key):
        return [self.k_win, self.d_win, self.buy_p, self.sell_p][key]
    
    def __setitem__(self, key, value):
        if key == 0:
            self.k_win = value
        elif key == 1:
            self.d_win = value
        elif key == 2:
            self.buy_p = value
        elif key == 3:
            self.sell_p = value
        else:
            raise IndexError("Index out of range")

creator.create("FitnessMax",base.Fitness,weights=(1.0,))
creator.create("Individual",Individual,fitness=creator.FitnessMax)

def create_individual():            # 초기값 생성 함수
    return creator.Individual(
        np.random.randint(4,24),    # k_win
        np.random.randint(1,10),    # d_win
        np.random.randint(5,35),    # buy_p
        np.random.randint(60,100)   # sell_p
    )

creator.create("FitnessMax", ... ) 는 최댓값을 구하겠다는 뜻입니다.
weights=(-1.0,) 값은 최솟값을 구하겠다는 뜻입니다.
weights=(1.0,-1.0) 으로 여러 값을 최대/최소 로 구할 수 있습니다.

거래 환경 선언

변수를 이용해서 모의 거래를 진행하고, 수익률을 나타낼 함수를 구성합니다.

def stochastic_signal(df,k_window=14,d_window=3,buy=20,sell=80):
    k,d = stochastic(df,k_window,d_window)
    signal = np.where((k > d)&(k > buy),1,0)
    signal = pd.Series(np.where((k < d)&(k > sell),-1,signal),index=df.index)
    return signal

def fitness_function(individual,df):
    sum = 1                                         # 수익률
    
    signal = stochastic_signal(df,                  # 스토캐스틱 시그널
        individual.k_win,                           # individual의 값으로 구하기
        individual.d_win,
        individual.buy_p,
        individual.sell_p
    )
    
    mask = signal != 0                               # signal = 0 이면 무시하고
    signal = signal.loc[mask]
    df_filtered = df.loc[mask]                       # df index 맞추기

    mask = signal != signal.shift()                  # signal 연속이면 무시하고
    signal = signal.where(mask,0)
    mask = signal != 0
    signal = signal.loc[mask]
    df_filtered = df_filtered.loc[mask]
    
    if len(signal) < 5:                              # 거래가 너무 적으면 다
        return (-1,)
    
    if signal.iloc[-1] == 1:                         # 살 때의 값 가져오기
        buy = df['close'][signal == 1].iloc[:-1].reset_index(drop=True)
    else:                                            # 마지막 사고 안판 건 무시
        buy = df['close'][signal == 1].reset_index(drop=True)
    
    if signal.iloc[0] == -1:                         # 팔 때의 값 가져오기
        sell = df['close'][signal == -1].iloc[1:].reset_index(drop=True)
    else:                                            # 안사고 판 건 무시
        sell = df['close'][signal == -1].reset_index(drop=True)
    
    gain = ((sell - buy) / buy)                      # 각 거래의 수익률 계산
    performance = (1 + gain - 0.0025).cumprod()      # 수수료 떼고 수익률 곱
    sum *= performance.iloc[-1]                      # 수익률 확인
    return (sum,) 

벡터 연산 만세입니다. for문으로 했다가 내 나이를 매수하는 게 수익률이 좋겠어요.

Crossover 와 Mutation 함수

GP 알고리즘은 교배와 돌연변이로 최적의 값을 찾아냅니다.
교배는 서로 다른 변수들의 일정 부분을 섞는 것입니다.

# cxTwoPoint의 경우 index가 ind1 ~ ind2 사이를 전부 섞습니다. 
Before:
  ind1 = [1, 2, 3, 4, 5]
  ind2 = [10,20,30,40,50]

After cxTwoPoint(ind1, ind2):
  ind1 = [1, 20, 30, 4, 5]
  ind2 = [10, 2, 3, 40, 50]

랜덤을 위해 교배함수를 직접 만들어 줍시다. 각각 50% 확률로 교환합니다.

def custom_crossover(ind1, ind2):
    child1, child2 = creator.Individual(ind1.k_win, ind1.d_win, ind1.buy_p, ind1.sell_p), creator.Individual(ind2.k_win, ind2.d_win, ind2.buy_p, ind2.sell_p)
    if random.random() < 0.5:
        child1.k_win, child2.k_win = child2.k_win, child1.k_win
    if random.random() < 0.5:
        child1.d_win, child2.d_win = child2.d_win, child1.d_win
    if random.random() < 0.5:
        child1.buy_p, child2.buy_p = child2.buy_p, child1.buy_p
    if random.random() < 0.5:
        child1.sell_p, child2.sell_p = child2.sell_p, child1.sell_p
    return child1, child2

돌연변이 함수도 만들어 줍니다. 변수의 최소/최대 값을 주어주고 일정 확률로 변이가 일어나게 합니다. 확률과 최소/최대값은 설정할 때 넣어줍니다.

def custom_mutation(individual, indpb, low, up):
    for i in range(len(individual)):
        if random.random() < indpb:
            individual[i] = random.randint(low[i], up[i])
    return individual,

GP 알고리즘 함수

내가 어떤 조건으로 거래를 할 지 조건을 설정하고, 조건에 대한 환경변수를 설정했습니다. 이제 알고리즘을 실행할 환경을 만들어 줍니다.

from deap import tools,algorithms
toolbox = base.Toolbox()

개체 설정

이제 환경에서 살아갈 개체들을 생성해 줍니다. 얘네들이 교배하고 변이할 녀석들 입니다.

toolbox.register("individual",create_individual)
toolbox.register("population",tools.initRepeat,list,toolbox.individual)
  1. individual 이란 개체와 초기 함수를 설정해 줍니다.
  2. individual 이란 개체를 여러개 생성해 초기 집단을 만들어 줍니다.
toolbox.register("mate",custom_crossover)
toolbox.register("mutate",custom_mutation,low=[4,1,5,60],up=[24,10,35,100], indpb=0.2)
toolbox.register("select",tools.selTournament,tournsize=3)
  1. 교배할 방법으로 custom_crossover를 선택합니다.
  2. 변이할 방법으로 custom_mutation을 선택하고, 최소/최대/변이확률을 설정합니_다. (k_win :4~24, d_win : 1~10, buy_p : 5~35, sell_p: 60~100)
  3. 3개 개체(tournsize) 중에서 토너먼트(selTournament) 방식으로 가장 적합한 개체를 선택하는 방법을 사용합니다.
toolbox.register("evaluate",fitness_function,df=df)
  1. 평가를 할 함수를 설정합니다. fitness_function의 값으로 평가를 진행합니다.
population = toolbox.population(n=50)
hof = tools.HallOfFame(1)
  1. population 함수로 초기 집단 50개체를 생성합니다.
  2. 최고 개체 "1"개만 저장합니다.
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg",np.mean)
stats.register("max",np.max)
  1. 통계 기록용 객체를 생성합니다. (적합도 값만 추출합니다. ind.fitness.values)
  2. 각 세대의 평균 적합도를 기록합니다.
  3. 각 세대의 최대 적합도를 기록합니다.
population,logbook = algorithms.eaSimple(
    population,toolbox,
    cxpb=0.7,mutpb=0.2,
    ngen=generations,
    stats=stats,halloffame=hof,verbose=True
)
best_individual = tools.selBest(population,k=1)[0]
  1. 유전 알고리즘을 실행합니다.
    (cxpb: 교배 확률, mutpb: 변이 확률, ngen: 전체 세대 수, verbose: 로그 출력)
  2. 최종 집단에서 적합도가 가장 높은 개체 1명을 선택합니다.

실행

최종 개체와 그 값을 확인합니다.

print(f"Best individual: {best_individual}")
print(f"Best fitness: {best_individual.fitness.values[0]}")
print(f"Final performance: {fitness_function(best_individual,[data])}")

결과

ML과 다르게 유전 알고리즘은 학습으로 파라미터를 수정하거나 하지 않기 때문에 CPU로 돌려도 충분할 뿐더러 결과도 잘 나옵니다.

[Project-ttrade] #5 GP 알고리즘 사용 결과
[Project-ttrade] #4 GP 알고리즘 (Python deap)유전자 프로그래밍(Genetic Programming, GP)는 진화 알고리즘을 기반으로 학습, 추론, 문제 해결을 실현하는 방식입니다. 돌연변이 및 교차에 따른 유전자 선택을 적용해서 새로운 세대의 프로그램에 적용하는 방법으로, 일반적으로 각 세대의 구성원들은 이전 세대의 구성원들보다 평균적으로 더 적합합니다. 말은 어렵지만, 결국은 가장 적합한 파라미터를 찾는

Comments