import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 삼성전자
ticker = '005930.KS'
data = yf.download(ticker, start="2010-01-01", end="2023-01-01")
# 데이터에서 사용할 피처 (종가, 거래량, 오픈가, 고가, 저가)
data = data[['Close', 'Volume', 'Open', 'High', 'Low']]
# 검증용 데이터 생성 (2023-01-01 ~ 2023-12-31)
valid_data = yf.download(ticker, start="2023-01-01", end="2023-12-31")
valid_data = valid_data[['Close', 'Volume', 'Open', 'High', 'Low']]
# 데이터 정규화
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
valid_scaled_data = scaler.transform(valid_data)
# 시퀀스 데이터 생성
def create_sequences(data, seq_length):
xs = []
ys = []
for i in range(len(data) - seq_length):
x = data[i:i + seq_length]
y = data[i + seq_length][0] # 하루 뒤 종가(Close)를 예측 대상으로 사용
xs.append(x)
ys.append(y)
return np.array(xs), np.array(ys)
seq_length = 60
X, y = create_sequences(scaled_data, seq_length)
X_train = torch.tensor(X, dtype=torch.float32).to(device)
y_train = torch.tensor(y, dtype=torch.float32).to(device)
X_valid, y_valid = create_sequences(valid_scaled_data, seq_length)
X_valid = torch.tensor(X_valid, dtype=torch.float32).to(device)
y_valid = torch.tensor(y_valid, dtype=torch.float32).to(device)
batch_size = 32
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
# 0부터 max_len-1까지의 정수를 포함하는 1차원 텐서를 생성후 (max_len, 1) 차원으로 만든다.
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
# 0부터 d_model-1까지의 정수 중 짝수 인덱스만 포함하는 1차원 텐서를 생성
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
# 포지셔널 인코딩에서 각 차원의 주파수를 조절하는 스케일링 인자
pe[:, 0::2] = torch.sin(position * div_term) # 짝수 위치에 사인 함수 값.
pe[:, 1::2] = torch.cos(position * div_term) # 홀수 위치에 코사인 함수 값.
pe = pe.unsqueeze(1) # (max_len, d_model) -> (max_len, 1, d_model) 형태로 해서 나중에 x 랑 더할 것임.
# Transformer는 (sequence_length, batch_size, d_model) 형태의 입력을 받기 때문!
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:x.size(0), :]
class Transformer(nn.Module):
def __init__(self, input_dim, nhead, num_layers, hidden_dim):
super(Transformer, self).__init__()
self.input_fc = nn.Linear(input_dim, hidden_dim) # 입력 차원을 hidden_dim으로 변환하기 위한 식.
self.positional_encoding = PositionalEncoding(hidden_dim) # 포지셔널 인코딩 모듈.
self.transformer = nn.Transformer(
d_model=hidden_dim,
nhead=nhead,
num_encoder_layers=num_layers,
num_decoder_layers=num_layers
)
self.fc = nn.Linear(hidden_dim, 1)
def forward(self, src):
src = self.input_fc(src) # 입력 차원을 hidden_dim으로 반환
src = self.positional_encoding(src.permute(1, 0, 2))
# src의 차원 순서 (batch_size, seq_len, feature_dim) 이기 때문에.
# (seq_len, batch_size, feature_dim)로 src의 차원 순서를 바꾸어,
# 포지셔널 인코딩 적용 (입력의 순서 정보를 추가)
# 트랜스포머의 인풋 쉐잎이 (seq_len, batch_size, feature_dim) 이기 때문.
output = self.transformer(src, src) # # Transformer 블록을 통해 시퀀스 데이터를 인코딩하고 디코딩.
output = self.fc(output[-1]) # 마지막 타임스텝만 사용
return output
input_dim = X_train.shape[2] # 피처의 수로 디멘전 설정
nhead = 2
num_layers = 2
hidden_dim = 64 # nhead의 배수로 설정해야함.
model = Transformer(input_dim, nhead, num_layers, hidden_dim).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
num_epochs = 10
# 학습 루프
for epoch in range(num_epochs):
model.train()
epoch_loss = 0
with tqdm(train_loader, unit="batch") as tepoch:
for batch_X, batch_y in tepoch:
tepoch.set_description(f"Epoch {epoch + 1}/{num_epochs}")
optimizer.zero_grad()
output = model(batch_X)
loss = criterion(output.squeeze(), batch_y)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
tepoch.set_postfix(loss=loss.item())
print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_loss / len(train_loader):.4f}")
# 모델 평가 및 예측
model.eval()
with torch.no_grad():
predictions = model(X_train).squeeze().cpu().numpy()
# 예측값 역정규화 # 2차원 배열 전
predicted_prices = scaler.inverse_transform(np.concatenate((predictions.reshape(-1, 1), # 원래 스케일로 역정규화하려면 입력 데이터와 같은 모양의 데이터가 필요
# 예측된 데이터의 수, #Volume, Open, High, Low 네 가지 피처 (종가는 제외해야 한다) ,
np.zeros((predictions.shape[0], scaled_data.shape[1] - 1))), axis=1))[:, 0]
# np.concatenate를 사용하여 predictions와 0으로 채운 배열을 함께 합치고 0 번째 컬럼만 가져옴
# 시각화
plt.figure(figsize=(10, 6))
plt.plot(data.index[seq_length:], predicted_prices, label='Predicted Prices')
plt.plot(data.index, data['Close'], label='Actual Prices')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.title('Samsung Electronics Stock Price Prediction 2010~2023')
plt.show()
# 모델 평가 및 예측 (검증 데이터 사용)
model.eval()
with torch.no_grad():
valid_predictions = model(X_valid).squeeze().cpu().numpy()
# 예측값 역정규화
valid_predicted_prices = scaler.inverse_transform(np.concatenate((valid_predictions.reshape(-1, 1),
np.zeros((valid_predictions.shape[0], valid_scaled_data.shape[1] - 1))), axis=1))[:, 0]
# 시각화 (2023년 데이터)
plt.figure(figsize=(10, 6))
plt.plot(valid_data.index[seq_length:], valid_predicted_prices, label='Predicted Prices')
plt.plot(valid_data.index[seq_length:], valid_data['Close'][seq_length:], label='Actual Prices')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.title('Samsung Electronics Stock Price Prediction 2023')
plt.show()