[Home-K8S] #22 FluxCD 계층과 분리 / 다중 클러스터 리소스 공유와 설정 분리
FluxCD - yaml 앞서 fluxcd 를 이용해서 helm chart 를 구성했습니다. 그 외에 일반적인 yaml
유전자 프로그래밍(Genetic Programming, GP)는 진화 알고리즘을 기반으로 학습, 추론, 문제 해결을 실현하는 방식입니다. 돌연변이 및 교차에 따른 유전자 선택을 적용해서 새로운 세대의 프로그램에 적용하는 방법으로, 일반적으로 각 세대의 구성원들은 이전 세대의 구성원들보다 평균적으로 더 적합합니다.
말은 어렵지만, 결국은 가장 적합한 파라미터를 찾는 과정입니다. 그 과정중에 초기 파라미터부터 좋은 건 남기고, 아닌 건 돌연변이해서 더 좋은 게 나올 때까지 실행하는 겁니다.
Python에서는 이 GP 알고리즘을 사용할 수 있게 deap 패키지가 있습니다.
ttrade는 자동 매매를 원하니까, 간단한 전략을 사용해 봅시다.
스토캐스틱 시그널로 매매를 진행하되, 가장 최적의 매매 포인트를 잡아 봅시다.
기본적인 구조는 다음과 같습니다.
여기서 k_window, d_window, k, d 의 최적 값을 구하는 알고리즘을 구현합니다.
최적 값을 구할 파라미터를 선언합니다. 파라미터의 최대/최소는 나중에 정합니다.
from deap import base,creator
import numpy as np
class Individual:
def __init__(self, k_win, d_win, buy_p, sell_p):
self.k_win = k_win
self.d_win = d_win
self.buy_p = buy_p
self.sell_p = sell_p
def __str__(self):
return f"k_win({self.k_win}), d_win({self.d_win}), Buy/Sell {self.buy_p}/{self.sell_p}"
def __len__(self):
return 4
def __getitem__(self, key):
return [self.k_win, self.d_win, self.buy_p, self.sell_p][key]
def __setitem__(self, key, value):
if key == 0:
self.k_win = value
elif key == 1:
self.d_win = value
elif key == 2:
self.buy_p = value
elif key == 3:
self.sell_p = value
else:
raise IndexError("Index out of range")
creator.create("FitnessMax",base.Fitness,weights=(1.0,))
creator.create("Individual",Individual,fitness=creator.FitnessMax)
def create_individual(): # 초기값 생성 함수
return creator.Individual(
np.random.randint(4,24), # k_win
np.random.randint(1,10), # d_win
np.random.randint(5,35), # buy_p
np.random.randint(60,100) # sell_p
)creator.create("FitnessMax", ... ) 는 최댓값을 구하겠다는 뜻입니다.
weights=(-1.0,) 값은 최솟값을 구하겠다는 뜻입니다.
weights=(1.0,-1.0) 으로 여러 값을 최대/최소 로 구할 수 있습니다.
변수를 이용해서 모의 거래를 진행하고, 수익률을 나타낼 함수를 구성합니다.
def stochastic_signal(df,k_window=14,d_window=3,buy=20,sell=80):
k,d = stochastic(df,k_window,d_window)
signal = np.where((k > d)&(k > buy),1,0)
signal = pd.Series(np.where((k < d)&(k > sell),-1,signal),index=df.index)
return signal
def fitness_function(individual,df):
sum = 1 # 수익률
signal = stochastic_signal(df, # 스토캐스틱 시그널
individual.k_win, # individual의 값으로 구하기
individual.d_win,
individual.buy_p,
individual.sell_p
)
mask = signal != 0 # signal = 0 이면 무시하고
signal = signal.loc[mask]
df_filtered = df.loc[mask] # df index 맞추기
mask = signal != signal.shift() # signal 연속이면 무시하고
signal = signal.where(mask,0)
mask = signal != 0
signal = signal.loc[mask]
df_filtered = df_filtered.loc[mask]
if len(signal) < 5: # 거래가 너무 적으면 다
return (-1,)
if signal.iloc[-1] == 1: # 살 때의 값 가져오기
buy = df['close'][signal == 1].iloc[:-1].reset_index(drop=True)
else: # 마지막 사고 안판 건 무시
buy = df['close'][signal == 1].reset_index(drop=True)
if signal.iloc[0] == -1: # 팔 때의 값 가져오기
sell = df['close'][signal == -1].iloc[1:].reset_index(drop=True)
else: # 안사고 판 건 무시
sell = df['close'][signal == -1].reset_index(drop=True)
gain = ((sell - buy) / buy) # 각 거래의 수익률 계산
performance = (1 + gain - 0.0025).cumprod() # 수수료 떼고 수익률 곱
sum *= performance.iloc[-1] # 수익률 확인
return (sum,) 벡터 연산 만세입니다. for문으로 했다가 내 나이를 매수하는 게 수익률이 좋겠어요.
GP 알고리즘은 교배와 돌연변이로 최적의 값을 찾아냅니다.
교배는 서로 다른 변수들의 일정 부분을 섞는 것입니다.
# cxTwoPoint의 경우 index가 ind1 ~ ind2 사이를 전부 섞습니다.
Before:
ind1 = [1, 2, 3, 4, 5]
ind2 = [10,20,30,40,50]
After cxTwoPoint(ind1, ind2):
ind1 = [1, 20, 30, 4, 5]
ind2 = [10, 2, 3, 40, 50]랜덤을 위해 교배함수를 직접 만들어 줍시다. 각각 50% 확률로 교환합니다.
def custom_crossover(ind1, ind2):
child1, child2 = creator.Individual(ind1.k_win, ind1.d_win, ind1.buy_p, ind1.sell_p), creator.Individual(ind2.k_win, ind2.d_win, ind2.buy_p, ind2.sell_p)
if random.random() < 0.5:
child1.k_win, child2.k_win = child2.k_win, child1.k_win
if random.random() < 0.5:
child1.d_win, child2.d_win = child2.d_win, child1.d_win
if random.random() < 0.5:
child1.buy_p, child2.buy_p = child2.buy_p, child1.buy_p
if random.random() < 0.5:
child1.sell_p, child2.sell_p = child2.sell_p, child1.sell_p
return child1, child2돌연변이 함수도 만들어 줍니다. 변수의 최소/최대 값을 주어주고 일정 확률로 변이가 일어나게 합니다. 확률과 최소/최대값은 설정할 때 넣어줍니다.
def custom_mutation(individual, indpb, low, up):
for i in range(len(individual)):
if random.random() < indpb:
individual[i] = random.randint(low[i], up[i])
return individual,내가 어떤 조건으로 거래를 할 지 조건을 설정하고, 조건에 대한 환경변수를 설정했습니다. 이제 알고리즘을 실행할 환경을 만들어 줍니다.
from deap import tools,algorithms
toolbox = base.Toolbox()이제 환경에서 살아갈 개체들을 생성해 줍니다. 얘네들이 교배하고 변이할 녀석들 입니다.
toolbox.register("individual",create_individual)
toolbox.register("population",tools.initRepeat,list,toolbox.individual)toolbox.register("mate",custom_crossover)
toolbox.register("mutate",custom_mutation,low=[4,1,5,60],up=[24,10,35,100], indpb=0.2)
toolbox.register("select",tools.selTournament,tournsize=3)toolbox.register("evaluate",fitness_function,df=df)population = toolbox.population(n=50)
hof = tools.HallOfFame(1)stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg",np.mean)
stats.register("max",np.max)population,logbook = algorithms.eaSimple(
population,toolbox,
cxpb=0.7,mutpb=0.2,
ngen=generations,
stats=stats,halloffame=hof,verbose=True
)
best_individual = tools.selBest(population,k=1)[0]최종 개체와 그 값을 확인합니다.
print(f"Best individual: {best_individual}")
print(f"Best fitness: {best_individual.fitness.values[0]}")
print(f"Final performance: {fitness_function(best_individual,[data])}")ML과 다르게 유전 알고리즘은 학습으로 파라미터를 수정하거나 하지 않기 때문에 CPU로 돌려도 충분할 뿐더러 결과도 잘 나옵니다.

Comments