# !pip install yfinance   # IPython-only cell magic; in a plain script, run `pip install yfinance` in a shell first
import yfinance as yf
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import RobustScaler, MinMaxScaler
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from torch.utils.data import TensorDataset, DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Download daily OHLCV data for KRX ticker 139130 from Yahoo Finance.
ticker = "139130.KS"
df = yf.download(ticker, start="2015-01-01", end="2024-01-01")
df = df.reset_index()
print(df.head(1))  # `display` exists only inside IPython/Jupyter; print works everywhere

# NOTE(review): both scalers are fit on the FULL series (train + validation),
# which leaks validation statistics into training. For a rigorous backtest,
# fit on the training window only and transform the rest — TODO confirm.
X_scaler = MinMaxScaler()
df[['Open', 'High', 'Low', 'Volume']] = X_scaler.fit_transform(df[['Open', 'High', 'Low', 'Volume']])

# Scale the target with its own scaler so predictions can be
# inverse-transformed back to prices without the feature columns.
y_scaler = MinMaxScaler()
df[['Close']] = y_scaler.fit_transform(df[['Close']])
print(df[['Close']].shape)
X = df[['Open', 'High', 'Low', 'Volume']].values  # features, shape (n_days, 4)
y = df['Close'].values                            # target, shape (n_days,)
def seq_data(x, y, sequence_length, device=None):
  """Build sliding-window sequences for many-to-one RNN training.

  Window i covers x[i : i + sequence_length]; its label is
  y[i + sequence_length] — the step right after the window.

  Args:
    x: array-like, shape (n, n_features) — input features.
    y: array-like, shape (n,) — targets aligned with x.
    sequence_length: number of time steps per window.
    device: optional torch device for the returned tensors. Defaults to the
      module-level `device` when one exists, else CPU (backward compatible
      with the previous behavior of always using the global).

  Returns:
    (x_seq, y_seq): float32 tensors of shape
    (n - sequence_length, sequence_length, n_features) and
    (n - sequence_length, 1).
  """
  if device is None:
    device = globals().get("device", torch.device("cpu"))
  # Stack into one ndarray first: torch.FloatTensor(list_of_ndarrays) is very
  # slow and emits a UserWarning on recent PyTorch versions.
  windows = np.array([x[i: i + sequence_length]
                      for i in range(len(x) - sequence_length)])
  labels = np.array([y[i + sequence_length]
                     for i in range(len(x) - sequence_length)])
  x_seq = torch.as_tensor(windows, dtype=torch.float32, device=device)
  # Targets as a column vector to match the model's (batch, 1) output.
  y_seq = torch.as_tensor(labels, dtype=torch.float32, device=device).view(-1, 1)
  return x_seq, y_seq
# Train/validation split point: the first 200 windows go to training,
# the remainder to validation.
split = 200

# RNN unroll length: 5 days of features predict the next (6th) day's close.
sequence_length = 5 # (5 days)

# Build RNN-ready windows from the scaled arrays: (features, targets, 5).
x_seq, y_seq = seq_data(X, y, sequence_length)

# Chronological split — no shuffling, since this is a time series.
x_train_seq = x_seq[:split]
y_train_seq = y_seq[:split]
x_test_seq = x_seq[split:]
y_test_seq = y_seq[split:]
print(x_train_seq.size(), y_train_seq.size())
print(x_test_seq.size(), y_test_seq.size())
train = TensorDataset(x_train_seq, y_train_seq)
test = TensorDataset(x_test_seq, y_test_seq)

batch_size = 16
# shuffle=False keeps chronological order, which the evaluation/plotting
# code below relies on when concatenating batch predictions.
train_loader = DataLoader(dataset=train, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test, batch_size=batch_size, shuffle=False)
# number of input features per time step
input_size = x_seq.size(2)

# stacked RNN with 2 layers
num_layers = 2

# 4 hidden units per RNN layer
hidden_size = 4
class VanillaRNN(nn.Module):
  """Many-to-one vanilla RNN regressor.

  Takes (batch, sequence_length, input_size) windows, flattens the hidden
  states of every time step, and maps them to one scalar prediction.
  """

  def __init__(self, input_size, hidden_size, sequence_length, num_layers, device):
    super(VanillaRNN, self).__init__()

    self.hidden_size = hidden_size
    self.num_layers = num_layers
    # Bug fix: the `device` argument was previously accepted but ignored
    # (forward() reached for a module-level global instead). Kept on the
    # instance for backward compatibility, though forward() now follows
    # the input tensor's device.
    self.device = device
    self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
    # Regression head: flattened (sequence_length * hidden_size) -> 1 value.
    self.fc = nn.Sequential(nn.Linear(hidden_size * sequence_length, 1))

  def forward(self, x):
    """Map x of shape (batch, seq_len, input_size) to (batch, 1)."""
    # Allocate the initial hidden state on the same device as the input,
    # so the model works wherever it (and its data) is moved.
    h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=x.device)
    out, _ = self.rnn(x, h0)             # out: (batch, seq_len, hidden_size)
    out = out.reshape(out.shape[0], -1)  # flatten all time steps
    out = self.fc(out)
    return out

model = VanillaRNN(input_size=input_size,
                   hidden_size=hidden_size,
                   sequence_length=sequence_length,
                   num_layers=num_layers,
                   device=device).to(device)
criterion = nn.MSELoss()  # regression loss on the scaled close price

lr = 1e-3
num_epochs = 200
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_graph = [] # per-epoch average losses, kept for plotting below
n = len(train_loader)  # number of batches per epoch

for epoch in range(num_epochs):
  running_loss = 0.0

  for data in train_loader:

    seq, target = data # one mini-batch of (windows, next-day targets)
    #print(seq.shape) # 16, 5, 4

    out = model(seq)   # forward pass
    loss = criterion(out, target) # MSE against the scaled close price

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    running_loss += loss.item() # accumulate this batch's loss

  loss_graph.append(running_loss / n) # average loss over all batches this epoch
  if epoch % 10 == 0:
    print('[epoch: %d] loss: %.4f'%(epoch, running_loss/n))
plt.figure(figsize=(20,10))
plt.plot(loss_graph)
plt.show()
with torch.no_grad():
  model.eval()  # no dropout/batch-norm here, but eval mode is the safe default
  train_pred = []
  test_pred = []

  # Predictions over the training windows, in original order
  # (both loaders were built with shuffle=False).
  for data in train_loader:
    seq, target = data
    out = model(seq)
    train_pred += out.cpu().numpy().tolist()

  # Predictions over the validation windows.
  for data in test_loader:
    seq, target = data
    out = model(seq)
    test_pred += out.cpu().numpy().tolist()

# Concatenate so the whole horizon plots as one curve.
total = train_pred + test_pred

# Undo the Close-price scaling; `total` is a list of [value] rows, i.e. (n, 1).
total = y_scaler.inverse_transform(np.array(total))
print(total.shape)

# Actual close prices aligned with the predictions (the first sequence_length
# days have no prediction). reshape(-1, 1) because inverse_transform requires
# a 2-D array and df['Close'] selects a 1-D Series — the original 1-D call
# raised "Expected 2D array" in scikit-learn.
new_actual = y_scaler.inverse_transform(
    df['Close'][sequence_length:].values.reshape(-1, 1))
print(new_actual.shape)
plt.figure(figsize=(20,10))
# Train/validation boundary. axvline spans the full y-range automatically;
# the previous np.linspace(0, 1, ...) line only covered y in [0, 1] and was
# invisible once prices were inverse-transformed back to their real scale.
plt.axvline(x=len(train_pred), linestyle='--', linewidth=0.6)
# actual close prices
plt.plot(new_actual, '--')
# model predictions
plt.plot(total, 'b', linewidth=0.6)
plt.legend(['train boundary', 'actual', 'prediction'])
plt.show()

# (blog-export residue — "+ Recent posts" was page navigation, not code)