캔들차트 분석 - 주가등락 이진 분류 part1

주가데이터를 이용해 캔들차트를 생성하고 이진분류기를 이용해 일일 주가의 등락을 예측한다. 주가의 등락을 단순하게 0, 1 로만 레이블링한다. 하루차이의 등락을 예측하기 때문에 노이즈가 당연히 많을 것이다. 추후에는 하루 차이의 등락이 아닌 일정기간의 텀을 둔 기간의 트렌드를 예측한다면 좀 더 실제 투자에 적용하기에 적합한 모델이 나올 것이라고 예측한다.

S0. 종목 선택

  • KOSPI200 의 시가총액 1위인 삼성전자 주식

S1. 데이터 찾기

  • FinanceDataReader를 통해 불러온 일일 주가데이터를 사용한다.

불러온 데이터를 디렉토리를 따로 지정하여 저장한다.

def removeOutput(filepath):
    if(Path(filepath)).is_file():
        os.remove(filepath)
    
def csv_initiator(market, ticker, head_date, tail_date):
    df = fdr.DataReader(ticker, head_date, tail_date, exchange=market)
    
    filedir = os.getcwd() + '\\dataset\\raw_data\\'
    filename = "{}_{}.csv".format(market, ticker)    
    filepath = filedir + filename
    
    if not os.path.exists(filedir):
        os.makedirs(filedir)    
    removeOutput(filepath)
    df.to_csv(filepath)
    
    print("csv file saved as : {}".format(filepath))
    
    return filepath ---

S2. 데이터 전처리

  • labeld data 생성

기간이 일정부분 중복되며 연속된 캔들차트들이 데이터가 분류 대상이 된다.

기간은 일단은 임의대로 설정하며 기간의 어떤 기간으로 설정했을때 좋은 결과가 나오는지도 중요한 포인트가 될 것이다.

레이블링하는 함수는 다음과 같다.

레이블링 대상 시퀀스의 길이와 트렌드 판단의 대상 시퀀스의 길이는 20으로 설정하고 둘 사이 거리인 gap은 0으로 설정했다.


def seq_seq_trend(csv_path, seq_len, tre_len, gap=0, seq_mode='close_mean', tre_mode='close_mean'):
    print("Creating label . . .")
    print("type : Sequence to Sequence")
    print("sequence_length : {}, trend_sequence_length : {}, gap : {}".format(seq_len, tre_len, gap))
    
    # 데이터프레임으로 일일주가데이터 불러오기, 결측치 제거
    df = pd.read_csv(csv_path, parse_dates=True, index_col=0)
    df.fillna(0)
    df = df[df.Open != 0]
    df.reset_index(inplace=True)
    df['Date'] = df['Date'].map(mdates.date2num) # Y-M-D 포멧에서 num 포멧으로 변경
        
    # 파일을 저장할 디렉토리 명과 파일이름 지정
    filedir = os.getcwd() + '\\dataset\\labeled_data\\'
    filename = "{}_label_seq{}_tseq{}_gap{}.txt".format(csv_path.split('\\')[-1][0:-4], seq_len, tre_len, gap) # ex) KRX_005930_label_seq30_tseq10_gap0
    filepath = filedir + filename
        
    # 디렉토리가 없을시 생성, 같은이름의 파일 제거
    if not os.path.exists(filedir):
        os.makedirs(filedir)
    removeOutput(filepath)

    # 레이블링
    for i in range(0, len(df) - tre_len - seq_len - gap):
        seq_df = df[i : i + seq_len]
        tre_df = df[i + seq_len + gap : i + seq_len + gap + tre_len]
        seq_sum = 0
        tre_sum = 0
        
        for j in range(seq_len):
            seq_sum = seq_sum + seq_df['Close'].iloc[j]
        seq_mean = seq_sum / seq_len
        
        for j in range(tre_len):
            tre_sum = tre_sum + tre_df['Close'].iloc[j]
        tre_mean = tre_sum / tre_len

        tmp_rtn = tre_mean - seq_mean
        
        if tmp_rtn > 0:
            label = 1
        else:
            label = 0
        # 레이블링한 sequence를 한 라인으로 파일에 입력        
        with open(filepath, 'a') as the_file:
            the_file.write("{}--{},{}".format(filename[0:-4], i, label))
            the_file.write("\n")

    print("Create label finished.")
    return filepath ---

위 함수들의 실행은 다음과 같이 메인코드를 작성한다.

데이터불러오기 - 데이터 전처리, 레이블링 - 캔들차트생성 까지의 과정을 실행한다.


market = 'KRX'
ticker = '005930'
seq_len = 20
tre_len = 20
head_date = '2000-01-01'
tail_date = '2022-01-01'
dimension = 60
use_volume = False

data_csv_path = csv_initiator(market, ticker, head_date, tail_date)
label_set_path = seq_seq_trend(data_csv_path, seq_len=seq_len, tre_len=tre_len)
candlechart_dir = candlechart_generator(data_csv_path, seq_len=seq_len, tre_len=tre_len, dimension=dimension, use_volume=use_volume) ---

이제 머신러닝 모델을선택하여서 적용시켜야한다. 이번에 선택할 모델은 sklearn에 있는 classifier 몇가지를 사용하고, gridsearchCV를 이용해서 하이퍼파라미터를 튜닝 할 것이다.

모델에 적용시키기전에 만들어둔 레이블데이터와 캔들차트 데이터들을 데이터어레이로 전환해야한다.


def image2dataarray(candlechart_dir):
    symbol = candlechart_dir.split('\\')[-3]
    imgs = list([])
    for i in range(len(os.listdir(candlechart_dir))):
        imgname = '{}-{}.png'.format(symbol, i)
        im = Image.open(candlechart_dir+imgname)
        pixels = list(im.getdata())
        img = list([])
        for pixel in pixels:
            for j in range(3): # Excepting alpha value
                img.append(pixel[j])
        imgs.append(img)
        # print(imgs)
    return np.array(imgs)

def label2dataarray(label_set_path):
    label_array = list([])
    with open(label_set_path, 'r') as f:
        lines = f.readlines()
        for line in lines:
            label = int(line.split(',')[-1])
            label_array.append(label)
        return np.array(label_array)

candle_array = image2dataarray(candlechart_dir)
label_array = label2dataarray(label_set_path)
print(len(candle_array), len(label_array))

전체세트의 앞 80% 를 train 세트, 나머지를 test 셋으로 사용하기로 한다.


X_all = candle_array
y_all = label_array

slicer = int(0.8 * len(candle_array))

X_train, y_train = X_all[:slicer], y_all[:slicer]
X_test, y_test = X_all[slicer:], y_all[slicer:] ---

첫번째로 사용할 분류기는 사이킷런의 SGDClassifier이다.

데이터셋 크기 줄이기

  • mpl_finance의 candlestick2_ochl() 넘겨받은 데이터를 가지고 설정한 width를 가진 캔들차트를 생성한다. 픽셀수가 많을수록 연산시간이 늘어나므로 특성은 변하지 않으면서도 픽셀의 수를 줄이는 방법이 필요하다. 하지만 픽셀수를 너무 줄이면 캔들차트가 정상적인 모습을 가지지 않고 뭉게지는 현상이 발생한다. 단위 시퀀스의 길이와 캔들 하나의 width를 적절하게 설정해야만 깔끔하고 일정한 모양의 캔들차트가 만들어진다고 생각했고 적절한 크기가 얼마인지에 대해서 여러 시도를 해 보았다.

  • 연속된 두 캔들의 겹침 : 시퀀스는 길고 width의 크기는 크고 figure의 사이즈는 작으면 연속된 캔들끼리 겹치게된다.

seq_len=20, width=1, 1inch * 1inch, dpi = 109 seq_len = 10, width=0.5, 1inch * 1inch, dpi = 109
20 10

캔들차트끼리 겹치지 않더라도 open-close 막대와 high-low 선의 상대적 위치가 캔들마다 다르게 되는 경우가 많은것을 확인 할 수 있었다. 사람이 봤을때 형태가 깔끔하지 않고 구분하기 힘들다면 머신러닝 모델의 학습 정확도 또한 낮아질 것이라는 가정을 전제로 하고 캔들차트 생성 함수의 파라미터를 조정해 보기로 했다. 그러나 mplfinance의 캔들차트 생성함수를 사용해서 캔들차트를 생성하고 matplotlib로 그래프를 생성했을 때 캔들차트 에서 차트부분 이외에도 상하좌우로 여백이 생기기떄문에 필요없는 부분이 생기게 된다. 그래서 직접 캔들차트를 생성해 보기로 했다.

머신러닝 모델에 최적화된 비트맵 캔들차트 생성

  • 캔들차트 생성

mpl_finance의 candlestick2_ochl 대신에 candlechart4ML 의 candlechart_generator을 사용했다.


def candlechart_generator(csv_path, seq_len, tre_len, dimension, use_volume):
    print("Converting ohlc to candlestick")
    
    # 데이터프레임으로 일일주가데이터 불러오기, 결측치 제거
    df = pd.read_csv(csv_path, parse_dates=True, index_col=0)
    df.fillna(0)
    df = df[df.Open !=0]
    df.reset_index(inplace=True)
    df['Date'] = df['Date'].map(mdates.date2num) # Y-M-D 포멧에서 num 포멧으로 변경
    
    # 파일을 저장할 디렉토리 명과 파일이름 지정
    symbol = csv_path.split('\\')[-1][0:-4]
    filedir = os.getcwd() + '\\dataset\\candle_chart\\{}\\seq{}_dim{}_vol{}\\'.format(
                                symbol, seq_len, dimension, use_volume) # ex) seq30_dim536_volFalse
    
    # 디렉토리가 없을시 생성, 이미 디렉토리가 있다면 내용물 삭제하고 다시 생성
    if not os.path.exists(filedir):
        os.makedirs(filedir)
    else:
        shutil.rmtree(filedir)
        os.makedirs(filedir)
    
    for i in range(0, len(df)-seq_len-tre_len):
        pix_data = np.zeros([dimension, dimension, 3], dtype=np.uint8)
        tmp_df = df.iloc[i:i + seq_len]
        seq_upper = np.max([tmp_df['Open'].max(), tmp_df['Close'].max(), tmp_df['High'].max(), tmp_df['Low'].max()]) # 구간 최대 경계값
        seq_lower = np.min([tmp_df['Open'].min(), tmp_df['Close'].min(), tmp_df['High'].min(), tmp_df['Low'].min()]) # 구간 최소 경계값
        seq_differ = seq_upper - seq_lower
        
        for j in range(0, seq_len):
            if tmp_df.iloc[j]['Open'] < tmp_df.iloc[j]['Close']:
                pix_color = [255, 0, 0] # red-color
            else:
                pix_color = [0, 0, 255] # blue-color
                
            high_pos = int(dimension * (tmp_df.iloc[j]['High'] - seq_lower)  / seq_differ)
            low_pos = int(dimension * (tmp_df.iloc[j]['Low'] - seq_lower) / seq_differ)
            pix_data[low_pos:high_pos, 3*j + 1] = pix_color
            
            open_pos = int(dimension * (tmp_df.iloc[j]['Open'] - seq_lower)  / seq_differ)
            close_pos = int(dimension * (tmp_df.iloc[j]['Close'] - seq_lower)  / seq_differ)
            pix_data[np.min([close_pos, open_pos]):np.max([close_pos, open_pos]), [3*j, 3*j+2]] = pix_color
        
        filename = '{}-{}'.format(symbol, i)
        filepath = filedir + filename
            
        image = Image.fromarray(pix_data, 'RGB')
        image.save(filepath+'.png', 'png')
    print("Converting finished")
    
    return filedir ---

다음 포스트에서 계속->

Updated: