May 25, 2025

[Project-ttrade] #7 zk, Kafka - Python 실시간 거래소 데이터 가져오기

[Project-ttrade] #7  zk, Kafka - Python 실시간 거래소 데이터 가져오기

학습은 어디서 데이터를 구해서 한다고 해도, 매매에 들어가게 되면 실시간으로 데이터를 받아와야 합니다.
국내의 거래소는 API를 제공해서 그냥 값을 가져오면 됩니다.

소켓으로 실시간 거래 데이터를 가져오는 방법도 있고, 분/시간 데이터로 ohlcv를 가져오는 경우도 있습니다.


    interval_len=int(60/interval)
    while q.empty(): continue 
    price,_,_=q.get()
    interval_data=[[price]*4+[0]]*interval_len
    now=time.time()
    start=time.localtime(now)
    print(f'Start at {start.tm_year}-{start.tm_mon}-{start.tm_mday} {start.tm_hour+9}:{start.tm_min}:{start.tm_sec} KST')
    now_interval=now-now%interval+interval
    try:
        while True:
            if time.time()<now_interval:
                continue
            index=int(now_interval%60/interval)
            if not q.empty():
                price,ttms,volume=q.get()
                interval_data[index]=[price]*4+[volume]
                while not q.empty():
                    price,ttms,volume=q.get()
                    if interval_data[index][1]>price: interval_data[index][1]=price
                    if interval_data[index][2]<price: interval_data[index][2]=price
                    interval_data[index][3]=price
            else:
                interval_data[index]=[interval_data[index-1][3]]*4+[0]
            open=interval_data[(index+1)%interval_len][3]
            close=interval_data[index][3]
            low=min(row[1] for row in interval_data)
            high=max(row[2] for row in interval_data)
            volume=sum(row[4] for row in interval_data)
            kf_message(topic,message={'tick':tick,'timestamp':now_interval,'open':open,'low':low,'high':high,'close':close,'value':volume})
            now_interval+=interval
            left_time=now_interval-time.time()
            if left_time<0: print("interval is too short")
            time.sleep(left_time-0.035)
    except Exception as e:
        print(f"Error message: {str(e)}")
    finally:
        kf.close()

분 단위의 초단기 트레이딩을 할 생각은 없지만, 소켓으로 진행하려고 합니다.
ohlcv를 직접 가져오게 되면 바로 바로바로 반영되지 않는가 봐요. 가끔 요청 시각의 데이터가 없거나 안들어 오기도 하더라구요.

Zookeeper

Pod를 한번에 여러개 띄워 두고, 한 Pod만 데이터를 가져오려고 합니다.
다른 Pod는 대기하고 있다가 리더가 죽으면 시작하는 방식으로 사용합니다.

from kazoo.client import KazooClient
from kazoo.recipe.election import Election
from candle import candle_interval
import os

election_path="/ttrade/"+os.environ['TICK']
zookeeper_host=os.environ.get('ZK_SERVICE','zk-cs.zookeeper.svc:2181')
zk=KazooClient(hosts=zookeeper_host)

if __name__=="__main__":
    try:
        zk.start()
        election = Election(zk,election_path)
        print("Zookeeper Election")
        election.run(candle_interval)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        zk.stop()
        zk.close()

다양한 종류의 가상화폐를 사용할 수 있으니 해당 Tick의 이름으로 Path를 설정합니다.

Kafka

kafka에 메시지를 보내는 코드를 작성합니다. strimzi-kafka에서 topic을 설정해줘도 되지만, 안해도 메시지가 등록되긴 하더라구요.

from kafka import KafkaProducer
import json
import os

tick=os.environ['TICK']
topic=os.environ['TOPIC']
kafka_host=os.environ.get('KAFKA_SERVICE','my-cluster-kafka-bootstrap.kafka.svc:9092')
kf=KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=[kafka_host],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
def kf_message(topic,message):
    future=kf.send(topic,value=message)
    try:
        record_metadata = future.get(timeout=5)
    except Exception as e:
        print(f"Failed to send message: {str(e)}")

kafka에 Producer를 생성하고, message를 보내는 함수를 작성합니다.

캔들 수집

거래소의 공식 문서를 보고 코드를 작성해 줍니다. 빗썸, 업비트 다 비슷합니다.

import websockets
import asyncio
import json

async def upbit_ws_client(q, ticker):
    uri="wss://api.web.com/websocket/v1"
    while True:
        try:
            async with websockets.connect(uri,ping_interval=1440) as websocket:
                subscribe_fmt=[
                    {"ticket": "test1234"},
                    {
                        "type": "ticker",
                        "codes": [ticker],
                        "isOnlyRealtime":True
                    },
                    {"format": "SIMPLE"}
                ]
                subscribe_data=json.dumps(subscribe_fmt)
                await websocket.send(subscribe_data)
                while True:
                    data=await websocket.recv()
                    data=json.loads(data)
                    data=(data['tp'],data['ttms']/1000.0,data['tv'])
                    q.put(data)
        except websockets.exceptions.ConnectionClosedError as e:
            print(f"WebSocket connection closed: {e}. Retrying...")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"An error occurred: {e}. Retrying...")
            await asyncio.sleep(5)
async def main(q,ticker):
    await upbit_ws_client(q,ticker=ticker)
def update_data(q,ticker):
    asyncio.run(main(q,ticker))

웹소켓을 연결해서 q에 데이터를 넣는 방식입니다.

캔들 처리

q에 쌓은 데이터를 가져와서 사용할 수 있는 저장할 데이터로 처리해 줍니다.

  1. q를 만들어서 캔들을 수집하는 Process를 실행합니다.
    이제 q에 데이터가 쌓입니다.
import multiprocessing as mp
from bitsocket import update_data
import time

def candle_interval():
    print('Working pod')
    q=mp.Queue()
    p=mp.Process(name="Producer",target=update_data,args=(q,tick,),daemon=True)
    p.start()
  1. q의 값을 하나 가져와 현재의 ohlcv를 채웁니다. (volume: 0)
    (interval_len 은 1분에 쌓이는 데이터의 갯수입니다. )
    (interval_data 는 1분간의 행 입니다. )
    while q.empty(): continue                       # 값이 들어오면
    price,_,_=q.get()
    interval_len=int(60/interval)                   
    interval_data=[[price]*4+[0]]*interval_len
  1. 수집 시간을 정하고 그 때까지 q를 쌓고 기다립니다.
    (30초마다 수집하는데 현재 시각이 1분15초일 경우 1분 30초부터 수집 시작)
    now=time.time()
    start=time.localtime(now)
    print(f'Start at {start.tm_year}-{start.tm_mon}-{start.tm_mday} {start.tm_hour+9}:{start.tm_min}:{start.tm_sec} KST')
    now_interval=now-now%interval+interval
    try:
        while True:
            if time.time()<now_interval:
                continue
  1. 시간이 되면 q에 있는 값들을 채워 넣습니다. 초기값으로 채우고, 값을 조정합니다.
    (interval이 30초면 row[0] 에 0~30초의 데이터, row[1]에 30~1분의 데이터)
            index=int(now_interval%60/interval)
            if not q.empty():
                price,ttms,volume=q.get()
                interval_data[index]=[price]*4+[volume]
                while not q.empty():
                    price,ttms,volume=q.get()
                    if interval_data[index][1]>price: interval_data[index][1]=price
                    if interval_data[index][2]<price: interval_data[index][2]=price
                    interval_data[index][3]=price
            else:
                interval_data[index]=[interval_data[index-1][3]]*4+[0]
  1. raw에 있는 값들을 가지고 ohlcv를 구해서 kafka message에 넣습니다.
            open=interval_data[(index+1)%interval_len][3]
            close=interval_data[index][3]
            low=min(row[1] for row in interval_data)
            high=max(row[2] for row in interval_data)
            volume=sum(row[4] for row in interval_data)
            kf_message(topic,message={'tick':tick,'timestamp':now_interval,'open':open,'low':low,'high':high,'close':close,'value':volume})
  1. 다음 interval 시각을 구하고 시작 전까지 좀 쉽니다.
            now_interval+=interval
            left_time=now_interval-time.time()
            if left_time<0: print("interval is too short")
            time.sleep(left_time-0.035)
  1. 쉬고 난 후에 while 문에 따라 interval 시각까지 기다리고 시작합니다.
  2. except와 finally 설정을 해줍니다.
    except Exception as e:
        print(f"Error message: {str(e)}")
    finally:
        kf.close()

값 확인

이제 값이 잘 들어가고 있는 지 확인 해 봅니다.

from kafka import KafkaConsumer, TopicPartition
import json

topic = 'krw-btc'
kafka_host='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092'

consumer=KafkaConsumer(
    topic,
    bootstrap_servers=[kafka_host],
    group_id='group1',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=30000
)

try:
    for message in consumer:
        print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
finally:
    consumer.close()

짧은 시간동안 많은 data가 쌓였습니다.

이제 이 data를 가공해서 지표로 바꾸고 torchserve로 보내면 예측값이 나옵니다.

k8s-TorchTrade/trade/kafka at main · DogRing/k8s-TorchTrade
Contribute to DogRing/k8s-TorchTrade development by creating an account on GitHub.

Comments