[Home-K8S] #22 FluxCD 계층과 분리 / 다중 클러스터 리소스 공유와 설정 분리
FluxCD - yaml 앞서 fluxcd 를 이용해서 helm chart 를 구성했습니다. 그 외에 일반적인 yaml
학습은 어디서 데이터를 구해서 한다고 해도, 매매에 들어가게 되면 실시간으로 데이터를 받아와야 합니다.
국내의 거래소는 API를 제공해서 그냥 값을 가져오면 됩니다.
소켓으로 실시간 거래 데이터를 가져오는 방법도 있고, 분/시간 데이터로 ohlcv를 가져오는 경우도 있습니다.
interval_len=int(60/interval)
while q.empty(): continue
price,_,_=q.get()
interval_data=[[price]*4+[0]]*interval_len
now=time.time()
start=time.localtime(now)
print(f'Start at {start.tm_year}-{start.tm_mon}-{start.tm_mday} {start.tm_hour+9}:{start.tm_min}:{start.tm_sec} KST')
now_interval=now-now%interval+interval
try:
while True:
if time.time()<now_interval:
continue
index=int(now_interval%60/interval)
if not q.empty():
price,ttms,volume=q.get()
interval_data[index]=[price]*4+[volume]
while not q.empty():
price,ttms,volume=q.get()
if interval_data[index][1]>price: interval_data[index][1]=price
if interval_data[index][2]<price: interval_data[index][2]=price
interval_data[index][3]=price
else:
interval_data[index]=[interval_data[index-1][3]]*4+[0]
open=interval_data[(index+1)%interval_len][3]
close=interval_data[index][3]
low=min(row[1] for row in interval_data)
high=max(row[2] for row in interval_data)
volume=sum(row[4] for row in interval_data)
kf_message(topic,message={'tick':tick,'timestamp':now_interval,'open':open,'low':low,'high':high,'close':close,'value':volume})
now_interval+=interval
left_time=now_interval-time.time()
if left_time<0: print("interval is too short")
time.sleep(left_time-0.035)
except Exception as e:
print(f"Error message: {str(e)}")
finally:
kf.close()
분 단위의 초단기 트레이딩을 할 생각은 없지만, 소켓으로 진행하려고 합니다.
ohlcv를 직접 가져오게 되면 바로 바로바로 반영되지 않는가 봐요. 가끔 요청 시각의 데이터가 없거나 안들어 오기도 하더라구요.
Pod를 한번에 여러개 띄워 두고, 한 Pod만 데이터를 가져오려고 합니다.
다른 Pod는 대기하고 있다가 리더가 죽으면 시작하는 방식으로 사용합니다.
from kazoo.client import KazooClient
from kazoo.recipe.election import Election
from candle import candle_interval
import os
election_path="/ttrade/"+os.environ['TICK']
zookeeper_host=os.environ.get('ZK_SERVICE','zk-cs.zookeeper.svc:2181')
zk=KazooClient(hosts=zookeeper_host)
if __name__=="__main__":
try:
zk.start()
election = Election(zk,election_path)
print("Zookeeper Election")
election.run(candle_interval)
except KeyboardInterrupt:
print("Shutting down...")
finally:
zk.stop()
zk.close()다양한 종류의 가상화폐를 사용할 수 있으니 해당 Tick의 이름으로 Path를 설정합니다.
kafka에 메시지를 보내는 코드를 작성합니다. strimzi-kafka에서 topic을 설정해줘도 되지만, 안해도 메시지가 등록되긴 하더라구요.
from kafka import KafkaProducer
import json
import os
tick=os.environ['TICK']
topic=os.environ['TOPIC']
kafka_host=os.environ.get('KAFKA_SERVICE','my-cluster-kafka-bootstrap.kafka.svc:9092')
kf=KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=[kafka_host],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
def kf_message(topic,message):
future=kf.send(topic,value=message)
try:
record_metadata = future.get(timeout=5)
except Exception as e:
print(f"Failed to send message: {str(e)}")kafka에 Producer를 생성하고, message를 보내는 함수를 작성합니다.
거래소의 공식 문서를 보고 코드를 작성해 줍니다. 빗썸, 업비트 다 비슷합니다.
import websockets
import asyncio
import json
async def upbit_ws_client(q, ticker):
uri="wss://api.web.com/websocket/v1"
while True:
try:
async with websockets.connect(uri,ping_interval=1440) as websocket:
subscribe_fmt=[
{"ticket": "test1234"},
{
"type": "ticker",
"codes": [ticker],
"isOnlyRealtime":True
},
{"format": "SIMPLE"}
]
subscribe_data=json.dumps(subscribe_fmt)
await websocket.send(subscribe_data)
while True:
data=await websocket.recv()
data=json.loads(data)
data=(data['tp'],data['ttms']/1000.0,data['tv'])
q.put(data)
except websockets.exceptions.ConnectionClosedError as e:
print(f"WebSocket connection closed: {e}. Retrying...")
await asyncio.sleep(5)
except Exception as e:
print(f"An error occurred: {e}. Retrying...")
await asyncio.sleep(5)
async def main(q,ticker):
await upbit_ws_client(q,ticker=ticker)
def update_data(q,ticker):
asyncio.run(main(q,ticker))웹소켓을 연결해서 q에 데이터를 넣는 방식입니다.
q에 쌓은 데이터를 가져와서 사용할 수 있는 저장할 데이터로 처리해 줍니다.
import multiprocessing as mp
from bitsocket import update_data
import time
def candle_interval():
print('Working pod')
q=mp.Queue()
p=mp.Process(name="Producer",target=update_data,args=(q,tick,),daemon=True)
p.start() while q.empty(): continue # 값이 들어오면
price,_,_=q.get()
interval_len=int(60/interval)
interval_data=[[price]*4+[0]]*interval_len now=time.time()
start=time.localtime(now)
print(f'Start at {start.tm_year}-{start.tm_mon}-{start.tm_mday} {start.tm_hour+9}:{start.tm_min}:{start.tm_sec} KST')
now_interval=now-now%interval+interval
try:
while True:
if time.time()<now_interval:
continue index=int(now_interval%60/interval)
if not q.empty():
price,ttms,volume=q.get()
interval_data[index]=[price]*4+[volume]
while not q.empty():
price,ttms,volume=q.get()
if interval_data[index][1]>price: interval_data[index][1]=price
if interval_data[index][2]<price: interval_data[index][2]=price
interval_data[index][3]=price
else:
interval_data[index]=[interval_data[index-1][3]]*4+[0] open=interval_data[(index+1)%interval_len][3]
close=interval_data[index][3]
low=min(row[1] for row in interval_data)
high=max(row[2] for row in interval_data)
volume=sum(row[4] for row in interval_data)
kf_message(topic,message={'tick':tick,'timestamp':now_interval,'open':open,'low':low,'high':high,'close':close,'value':volume})
now_interval+=interval
left_time=now_interval-time.time()
if left_time<0: print("interval is too short")
time.sleep(left_time-0.035) except Exception as e:
print(f"Error message: {str(e)}")
finally:
kf.close()이제 값이 잘 들어가고 있는 지 확인 해 봅니다.
from kafka import KafkaConsumer, TopicPartition
import json
topic = 'krw-btc'
kafka_host='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092'
consumer=KafkaConsumer(
topic,
bootstrap_servers=[kafka_host],
group_id='group1',
auto_offset_reset='earliest',
enable_auto_commit=False,
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
consumer_timeout_ms=30000
)
try:
for message in consumer:
print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
finally:
consumer.close()짧은 시간동안 많은 data가 쌓였습니다.

이제 이 data를 가공해서 지표로 바꾸고 torchserve로 보내면 예측값이 나옵니다.
Comments