import yfinance as yf
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
# Pick the compute device: CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Download one year of daily price data for the Samsung Electronics ticker.
ticker = "005930.KS"
df = yf.download(ticker, start="2023-01-01", end="2023-12-31")

# Model inputs are Open/High/Low/Volume; the regression target is Close.
X = df[['Open', 'High', 'Low', 'Volume']].values
y = df['Close'].values

# Standardise features and target with separate scalers so that predictions
# can later be mapped back to prices through ``scaler_y`` alone.
# NOTE(review): both scalers are fit on the full series, i.e. before the
# train/test split performed further down - confirm this is acceptable.
scaler_X = StandardScaler()
X_normalized = scaler_X.fit_transform(X)

scaler_y = StandardScaler()
# The target is handed to the scaler as a column vector.
y_normalized = scaler_y.fit_transform(y.reshape(-1, 1))
def create_sequences(data_X, data_y, seq_length, predict_length):
    """Slice aligned feature/target series into sliding-window samples.

    Window ``i`` pairs the inputs ``data_X[i:i + seq_length]`` with the
    targets that immediately follow them,
    ``data_y[i + seq_length:i + seq_length + predict_length]``.

    Args:
        data_X: Feature array indexed by time along its first axis.
        data_y: Target array indexed by time along its first axis; expected
            to be as long as ``data_X``.
        seq_length: Number of time steps in each input window.
        predict_length: Number of future time steps in each target window.

    Returns:
        A tuple ``(xs, ys)`` of NumPy arrays holding the stacked input
        windows and target windows. Both are empty when the series is
        shorter than ``seq_length + predict_length``.
    """
    # The last valid start index is len - seq_length - predict_length: its
    # target slice ends exactly at the end of the series. The "+ 1" keeps
    # that final window, which a plain range(len - seq - pred) would drop.
    n_windows = len(data_X) - seq_length - predict_length + 1

    xs = [data_X[i:i + seq_length] for i in range(n_windows)]
    ys = [
        data_y[i + seq_length:i + seq_length + predict_length]
        for i in range(n_windows)
    ]
    return np.array(xs), np.array(ys)
seq_length = 200      # length of each input window (200 days)
predict_length = 30   # number of days predicted per window (30 days)

X_seq, y_seq = create_sequences(X_normalized, y_normalized, seq_length, predict_length)

# Fail early with a readable message: with fewer than two windows the
# chronological split below would leave the training set empty.
if len(X_seq) < 2:
    raise ValueError(
        f"Not enough data to build a train/test split: got {len(X_seq)} "
        f"window(s) from {len(X_normalized)} rows with "
        f"seq_length={seq_length} and predict_length={predict_length}."
    )

# NumPy arrays -> float32 tensors. The targets carry a trailing singleton
# axis (the scaler's column vector), which is dropped here.
X_seq = torch.as_tensor(X_seq, dtype=torch.float32)
y_seq = torch.as_tensor(y_seq, dtype=torch.float32).squeeze(-1)
# A bare ``X_seq.shape, y_seq.shape`` expression shows nothing when run as a
# script, so the shapes are printed explicitly.
print(X_seq.shape, y_seq.shape)

# Chronological 80/20 split: the earliest windows train, the latest test.
split = int(len(X_seq) * 0.8)
train_X, test_X = X_seq[:split], X_seq[split:]
train_y, test_y = y_seq[:split], y_seq[split:]

train_dataset = TensorDataset(train_X, train_y)
test_dataset = TensorDataset(test_X, test_y)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
class LSTMModel(nn.Module):
    """Unidirectional LSTM whose flattened per-step outputs feed a linear head.

    The LSTM output for every time step is flattened into one vector per
    sample and projected to ``predict_length`` values.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        predict_length: Number of values produced per sample.
        seq_length: Number of time steps in each input sequence. It fixes the
            input width of the linear head; defaults to 200, the value that
            was previously hard-coded.
    """

    def __init__(self, input_size, hidden_size, num_layers, predict_length, seq_length=200):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.predict_length = predict_length
        self.seq_length = seq_length
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=False)
        # The head consumes the hidden states of all time steps at once.
        self.fc = nn.Linear(hidden_size * seq_length, predict_length)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_length, input_size) to (batch, predict_length).

        Raises:
            RuntimeError: If the sequence length of ``x`` differs from the
                ``seq_length`` the model was built with (the linear head
                rejects the flattened width).
        """
        # Allocate the zero initial states next to the input so the model
        # works on whichever device/dtype it has been moved to.
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=x.device, dtype=x.dtype)
        c0 = torch.zeros_like(h0)
        out, _ = self.lstm(x, (h0, c0))
        # (batch, seq, hidden) -> (batch, seq * hidden)
        out = out.reshape(out.shape[0], -1)
        # Project to the forecast horizon; without this step the raw
        # flattened LSTM output would be returned.
        return self.fc(out)
class BiLSTMModel(nn.Module):
    """Bidirectional LSTM followed by a linear head on the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of each direction.
        num_layers: Number of stacked LSTM layers.
        predict_length: Number of values produced per sample.
    """

    def __init__(self, input_size, hidden_size, num_layers, predict_length):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.predict_length = predict_length
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        # Forward and backward hidden states are concatenated, hence "* 2".
        self.fc = nn.Linear(hidden_size * 2, predict_length)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq, input_size) to (batch, predict_length)."""
        # One initial state per layer and direction, allocated next to the
        # input so the model does not depend on a module-level device.
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size, device=x.device, dtype=x.dtype)
        c0 = torch.zeros_like(h0)
        out, _ = self.lstm(x, (h0, c0))
        # out has shape (batch, seq, 2 * hidden_size).
        # Use the features at the last time step of every sample in the batch.
        # NOTE(review): at that step the reverse direction has only processed
        # the final input element - confirm this is the intended summary.
        out = self.fc(out[:, -1, :])
        return out
# Hyper-parameters; the feature count is read from the prepared input tensor.
input_size = X_seq.size(2)
hidden_size = 50
num_layers = 1

# The unidirectional variant can be swapped in here instead:
# model = LSTMModel(input_size, hidden_size, num_layers, predict_length).to(device)
model = BiLSTMModel(input_size, hidden_size, num_layers, predict_length).to(device)

# Mean-squared-error objective optimised with Adam.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training: one pass over the training loader per epoch.
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Only every tenth epoch is reported, as the mean loss per batch.
    if (epoch + 1) % 10 != 0:
        continue
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
# Evaluate on the held-out windows and plot the most recent forecast.
model.eval()
with torch.no_grad():
    # test_X is (n_test, seq_length, n_features); the model returns
    # (n_test, predict_length).
    test_predictions = model(test_X.to(device)).cpu().numpy()
    # Ground-truth targets, shape (n_test, predict_length).
    test_actual = test_y.cpu().numpy()

# Undo the target standardisation. ``scaler_y`` was fit on a single column,
# so the values are passed as one column and then restored to their
# original (n_test, predict_length) layout.
test_predictions = scaler_y.inverse_transform(
    test_predictions.reshape(-1, 1)
).reshape(test_predictions.shape)
test_actual = scaler_y.inverse_transform(
    test_actual.reshape(-1, 1)
).reshape(test_actual.shape)

# Full series of actual closing prices (blue line).
plt.figure(figsize=(14,5))
plt.plot(y, label='Actual Prices', color='blue')

# Forecast of the last window (orange line). The last test window is also
# the last window overall, i.e. index len(X_seq) - 1, and its targets start
# right after its seq_length inputs. Deriving the x positions from that
# index keeps the forecast aligned with the samples it actually predicts.
forecast_start = (len(X_seq) - 1) + seq_length
forecast_steps = range(forecast_start, forecast_start + predict_length)
plt.plot(forecast_steps, test_predictions[-1], label='Predicted Prices', color='orange')

plt.title('Samsung Electronics Stock Price Prediction')
plt.xlabel('Time Steps')
plt.ylabel('Price (KRW)')
plt.legend()
plt.show()