# NOTE(review): this training loop appears *before* any imports or definitions
# in the dump — it belongs at the end of the script, after `train`/`evaluate`,
# `model`, and the data loaders exist.
EPOCHS = 10
for e in range(1, EPOCHS + 1):
    # NOTE: the first `train()` defined below prints the epoch by reading the
    # *global* `e`, so this loop variable's name is load-bearing — do not rename.
    train(model, optimizer, train_loader)
    val_loss, val_accuracy = evaluate(model, valid_loader)
    print("[EPOCH: %d], Validation Loss: %5.2f | Validation Accuracy: %5.2f" % (e, val_loss, val_accuracy))
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import random
import requests
import os
import tarfile
import time
import math
import urllib.request
# Download and extract the IMDB sentiment dataset (aclImdb, 50k movie reviews).
url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
dataset_path = "./data"
file_path = os.path.join(dataset_path, "aclImdb_v1.tar.gz")
# Create the data directory (no-op if it already exists).
os.makedirs(dataset_path, exist_ok=True)
# Download the archive only once.
if not os.path.isfile(file_path):
    urllib.request.urlretrieve(url, file_path)
# Extract only once.
extracted_path = os.path.join(dataset_path, "aclImdb")
if not os.path.exists(extracted_path):
    with tarfile.open(file_path, "r:gz") as tar:
        # NOTE(review): extractall on a network-fetched archive is vulnerable to
        # path traversal pre-3.12; consider tar.extractall(..., filter="data")
        # on Python 3.12+.
        tar.extractall(path=dataset_path)
# Load the extracted IMDB data from disk.
def load_imdb_data(split='train'):
    """Read the aclImdb `split` ('train' or 'test') directory tree.

    Returns a list of (label, text) pairs, where label is 'pos' or 'neg'.
    Relies on the module-level `dataset_path`.
    """
    root = os.path.join(dataset_path, "aclImdb", split)
    samples = []
    for sentiment in ('pos', 'neg'):
        folder = os.path.join(root, sentiment)
        for name in os.listdir(folder):
            if not name.endswith(".txt"):
                continue
            with open(os.path.join(folder, name), "r", encoding="utf-8") as fh:
                samples.append((sentiment, fh.read()))
    return samples
# NOTE(review): `dataset_path` is re-assigned here to the same value it was
# given above; the line is redundant but harmless.
dataset_path = "./data"
# Load raw (label, text) pairs for both splits.
train_data = load_imdb_data('train')
test_data = load_imdb_data('test')
# Tokenizer: torchtext's simple lower-cased English tokenizer.
tokenizer = get_tokenizer("basic_english")

# Text preprocessing.
def preprocess_text(text, max_len=200):
    """Lower-case and tokenize `text`, then truncate/pad to `max_len` tokens.

    `max_len` generalizes the previously hard-coded 200 (default unchanged,
    so existing callers behave identically). Padding uses the '<pad>' token.
    """
    tokens = tokenizer(text.lower())
    # list * negative-int == [], so this single expression both truncates
    # over-long inputs and pads short ones to exactly max_len.
    return tokens[:max_len] + ['<pad>'] * (max_len - len(tokens))
# Label preprocessing.
def preprocess_label(label):
    """Map a string label to a binary target: 'pos' -> 1, anything else -> 0."""
    return int(label == "pos")
# Preprocess both splits; note the tuple order flips here from (label, text)
# to (token_list, int_label).
train_data = [(preprocess_text(text), preprocess_label(label)) for label, text in train_data]
test_data = [(preprocess_text(text), preprocess_label(label)) for label, text in test_data]
# Deterministic shuffle, then an 80/20 train/validation split.
random.seed(0)
random.shuffle(train_data)
train_size = int(0.8 * len(train_data))
train_data, valid_data = train_data[:train_size], train_data[train_size:]
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')
# Vocabulary-building helper.
def yield_tokens(data_iter):
    """Yield the token list of every (tokens, label) example in `data_iter`."""
    for tokens, _label in data_iter:
        yield tokens
# Build the vocabulary from training tokens only: tokens seen fewer than 10
# times fall back to <unk>; capped at 10k entries; <unk>/<pad> get ids 0/1.
TEXT_vocab = build_vocab_from_iterator(yield_tokens(train_data), specials=["<unk>", "<pad>"], min_freq=10, max_tokens=10000)
TEXT_vocab.set_default_index(TEXT_vocab["<unk>"])
# Label "vocabulary" is just a fixed id -> name map.
LABEL_vocab = {0: "neg", 1: "pos"}
print(f"Unique tokens in TEXT vocabulary: {len(TEXT_vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL_vocab)}")
print(LABEL_vocab)
# Show the first (tokens, label) example.
print(train_data[0])
class IMDBDataset(Dataset):
    """Torch Dataset over preprocessed (token_list, int_label) pairs.

    Tokens are numericalized lazily at access time via the module-level
    TEXT_vocab.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        tokens, label = self.data[idx]
        # TEXT_vocab(tokens) maps each token string to its integer id.
        return torch.tensor(TEXT_vocab(tokens)), torch.tensor(label)
# Wrap the three splits as torch Datasets.
train_dataset = IMDBDataset(train_data)
valid_dataset = IMDBDataset(valid_data)
test_dataset = IMDBDataset(test_data)
# Prefer the first GPU when available.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
BATCH_SIZE = 256
# Shuffle only the training loader; evaluation order is fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
class GRUCell(nn.Module):
    """A single GRU cell implemented from scratch with two Linear layers.

    `gates_linear` produces both gates in one matmul (update z | reset r);
    `candidate_linear` produces the candidate hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # One weight matrix for both gates: output is [z | r], size 2*hidden.
        self.gates_linear = nn.Linear(input_size + hidden_size, 2 * hidden_size)
        # Weight matrix for the new hidden-state candidate.
        self.candidate_linear = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, x, hidden):
        """One GRU step.

        Args:
            x: (batch, input_size) input at this timestep.
            hidden: (batch, hidden_size) previous hidden state.
        Returns:
            (batch, hidden_size) new hidden state.
        """
        # Concatenate input and previous hidden state.
        combined = torch.cat((x, hidden), 1)
        # Compute both gates; first chunk -> update z, second -> reset r.
        gate_values = self.gates_linear(combined)
        zr, rr = gate_values.chunk(2, 1)
        # FIX: F.sigmoid / F.tanh are deprecated; use torch.sigmoid / torch.tanh
        # (numerically identical).
        resetgate = torch.sigmoid(rr)
        updategate = torch.sigmoid(zr)
        # Reset gate scales the previous hidden state before the candidate.
        reset_hidden = resetgate * hidden
        reset_combined = torch.cat((x, reset_hidden), 1)
        # New hidden-state candidate.
        new_h_candidate = torch.tanh(self.candidate_linear(reset_combined))
        # Final hidden state: convex combination of old state and candidate.
        hy = (1 - updategate) * hidden + updategate * new_h_candidate
        return hy
class GRUModel(nn.Module):
    """Sentiment classifier: Embedding -> scratch GRUCell unrolled over time
    -> Linear -> sigmoid probability.
    """

    def __init__(self, hidden_dim, n_vocab, embed_dim=128):
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.gru = GRUCell(embed_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """x: (batch, seq) token ids -> (batch, 1) probability of 'pos'."""
        x = self.embed(x)  # (batch, seq, embed_dim)
        # Initial hidden state, on the same device as the input.
        hn = torch.zeros(x.size(0), self.hidden_dim).to(x.device)
        # Unroll the cell over the time dimension.
        # FIX: the original appended every timestep's hidden state to a list
        # but only ever read the last entry — keep just the running state,
        # saving O(seq_len) retained activations.
        for t in range(x.size(1)):
            hn = self.gru(x[:, t, :], hn)
        # Classify from the final hidden state.
        out = self.fc(hn)
        out = self.sigmoid(out)
        return out
# Hyper-parameters for the scratch-GRU model.
hidden_dim = 128
n_vocab = len(TEXT_vocab)
embed_dim = 256
# Instantiate the scratch-GRU model on the selected device.
model = GRUModel(hidden_dim, n_vocab, embed_dim).to(device)
# BCELoss expects probabilities — the model already applies sigmoid.
loss_fn = nn.BCELoss()
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
def train(model, optimizer, train_iter, epoch=None):
    """Run one training epoch and print average loss and accuracy.

    Args:
        model: classifier returning sigmoid probabilities of shape (B, 1).
        optimizer: optimizer over model.parameters().
        train_iter: iterable of (x, y) batches.
        epoch: optional epoch number for the log line. FIX: the original read
            the *global* loop variable `e`, which raises NameError whenever
            the calling loop names its variable differently; the global is now
            only a backward-compatible fallback.

    Uses the module-level `loss_fn` and `device`.
    """
    model.train()
    total_loss = 0.0        # summed batch losses over the epoch
    corrects, total = 0, 0  # running accuracy counters
    for batch in train_iter:
        x = batch[0].to(device)
        y = batch[1].to(device).unsqueeze(1).float()
        optimizer.zero_grad()
        logit = model(x)
        loss = loss_fn(logit, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        # Threshold the probabilities at 0.5 for the accuracy count.
        pred = (logit > 0.5).float()
        corrects += (pred == y).sum().item()
        total += y.size(0)
    avg_loss = total_loss / len(train_iter)  # mean per-batch loss
    accuracy = corrects / total
    if epoch is None:
        # Backward-compat with the original's implicit read of global `e`.
        epoch = globals().get("e", "?")
    print("Train Epoch: {} \tAverage Loss: {:.6f} \tAccuracy: {:.4f}".format(epoch, avg_loss, accuracy))
def evaluate(model, val_iter):
    """Return (average loss, accuracy) of `model` over `val_iter`, no grads.

    Uses the module-level `loss_fn` and `device`.
    """
    model.eval()
    running_loss = 0.0
    hits, seen = 0, 0
    with torch.no_grad():
        for x, y in val_iter:
            x = x.to(device)
            y = y.to(device).unsqueeze(1).float()
            prob = model(x)
            running_loss += loss_fn(prob, y).item()
            seen += y.size(0)
            # 0.5 threshold on sigmoid probabilities.
            hits += ((prob > 0.5).float() == y).sum().item()
    return running_loss / len(val_iter), hits / seen
# --- GRU, simple version (using torch.nn.GRU instead of the scratch cell) ---
class GRUModel(nn.Module):
    """Sentiment classifier using the built-in multi-layer nn.GRU.

    Note: `bias` is accepted for interface compatibility but was never
    forwarded to nn.GRU in the original; kept unchanged.
    """

    def __init__(self, hidden_dim, layer_dim, n_vocab, embed_dim=128, bias=True):
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim  # number of stacked GRU layers
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.gru = nn.GRU(embed_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """x: (batch, seq) token ids -> (batch, 1) probability."""
        x = self.embed(x)
        # FIX: allocate h0 on the input's device instead of the module-level
        # global `device` (the global coupling made the model unusable when
        # that name wasn't defined or didn't match the input's device).
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=x.device)
        out, _ = self.gru(x, h0)
        # Last timestep's output of the top layer feeds the classifier head.
        out = self.fc(out[:, -1, :])
        out = self.sigmoid(out)
        return out
# Hyper-parameters for the nn.GRU version.
hidden_dim = 128
n_vocab = len(TEXT_vocab)
embed_dim = 256
layer_dim = 2  # two stacked GRU layers
# Instantiate the nn.GRU model (rebinds `model`).
model = GRUModel(hidden_dim, layer_dim, n_vocab, embed_dim).to(device)
# --- Naver movie-review (NSMC) sentiment prediction ---
# !pip install konlpy   # notebook-only shell command: installs KoNLPy (Korean tokenizer)
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
from torchtext.vocab import build_vocab_from_iterator
from konlpy.tag import Okt
import random
# Korean morphological tokenizer (rebinds `tokenizer`).
tokenizer = Okt()
# NOTE(review): this rebinds the name `train` (previously the training
# function) to a DataFrame; a new `train` function is defined further below.
# Only the first 3000 rows are kept for speed.
train = pd.read_csv('https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt',
                    header=0, delimiter='\t', quoting=3).iloc[:3000,:]
# Drop rows with missing review text.
train = train.dropna()
# Text preprocessing for Korean reviews.
def preprocess_text(text, max_len=200):
    """Morpheme-tokenize `text` with the module-level Okt tokenizer and
    truncate/pad the result to `max_len` tokens.

    `max_len` generalizes the previously hard-coded 200 (default unchanged).
    """
    tokens = tokenizer.morphs(text)  # Korean morphological analysis
    # list * negative-int == [], so this both truncates and pads to max_len.
    return tokens[:max_len] + ['<pad>'] * (max_len - len(tokens))
# Build (tokens, label) pairs; presumably row layout is (id, document, label)
# per the NSMC ratings_train.txt header — TODO confirm against the file.
train_data = [(preprocess_text(row[1]), row[2]) for row in train.values]
# Deterministic shuffle, then an 80/20 train/validation split.
random.seed(0)
random.shuffle(train_data)
train_size = int(0.8 * len(train_data))
train_data, valid_data = train_data[:train_size], train_data[train_size:]
def yield_tokens(data_iter):
    """Yield only the token sequence from each (tokens, label) pair."""
    for pair in data_iter:
        yield pair[0]
# Build the Korean vocabulary from training tokens (min freq 10, max 10k;
# <unk>/<pad> get ids 0/1). Rebinds TEXT_vocab.
TEXT_vocab = build_vocab_from_iterator(yield_tokens(train_data), specials=["<unk>", "<pad>"], min_freq=10, max_tokens=10000)
# Out-of-vocabulary tokens map to <unk>.
TEXT_vocab.set_default_index(TEXT_vocab["<unk>"])
LABEL_vocab = {0: "neg", 1: "pos"}
print(f"Unique tokens in TEXT vocabulary: {len(TEXT_vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL_vocab)}")
class nsmcDataset(Dataset):
    """Torch Dataset over NSMC (token_list, label) pairs.

    Numericalizes tokens through the module-level TEXT_vocab at access time.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        tokens, label = self.data[idx]
        return torch.tensor(TEXT_vocab(tokens)), torch.tensor(label)
# Wrap the splits as Datasets and build loaders (training loader shuffled).
train_dataset = nsmcDataset(train_data)
valid_dataset = nsmcDataset(valid_data)
BATCH_SIZE = 256
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
class GRUModel(nn.Module):
    """Single-layer nn.GRU classifier.

    embed -> GRU -> Linear on the final hidden state -> sigmoid probability.
    """

    def __init__(self, hidden_dim, n_vocab, embed_dim=128):
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.gru = nn.GRU(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.embed(x)
        # hn holds the final hidden state per layer; hn[-1] is the top layer.
        _, hn = self.gru(embedded)
        logits = self.fc(hn[-1])
        return self.sigmoid(logits)
class GRUModel(nn.Module):
    """Bidirectional 2-layer GRU classifier with a small MLP head.

    embed -> BiGRU(2 layers) -> last timestep -> Linear(2h -> 128) -> ReLU ->
    Dropout(0.4) -> Linear(128 -> 1) -> sigmoid probability.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size):
        super(GRUModel, self).__init__()
        self.em = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_size, batch_first=True,
                          bidirectional=True, num_layers=2)
        # Bidirectional output concatenates both directions: hidden_size * 2.
        self.fc1 = nn.Linear(hidden_size * 2, 128)
        self.relu = nn.ReLU()
        self.dout = nn.Dropout(0.4)
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        seq_out, _ = self.gru(self.em(x))
        last_step = seq_out[:, -1, :]  # final timestep, both directions
        hidden = self.dout(self.relu(self.fc1(last_step)))
        return self.sigmoid(self.fc2(hidden))
hidden_dim = 128
n_vocab = len(TEXT_vocab)
embed_dim = 256
# NOTE: this constructor signature is (vocab_size, embedding_dim, hidden_size),
# so the argument order differs from the earlier GRUModel definitions.
model = GRUModel(n_vocab, embed_dim, hidden_dim).to(device)
# BCE over sigmoid probabilities.
loss_fn = nn.BCELoss()
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
def train(model, optimizer, train_loader):
    """One training epoch over `train_loader`; prints average loss and accuracy.

    Uses the module-level `loss_fn` and `device`.
    """
    model.train()
    running_loss = 0.0
    hits, seen = 0, 0
    for x, y in train_loader:
        x = x.to(device)
        y = y.to(device).float().unsqueeze(1)
        optimizer.zero_grad()
        prob = model(x)
        batch_loss = loss_fn(prob, y)
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()
        # 0.5 threshold on sigmoid probabilities for accuracy.
        hits += ((prob > 0.5).float() == y).sum().item()
        seen += y.size(0)
    avg_loss = running_loss / len(train_loader)
    accuracy = hits / seen
    print(f"Train Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
def evaluate(model, valid_loader):
    """Return (average loss, accuracy) over `valid_loader` with grads disabled.

    Uses the module-level `loss_fn` and `device`.
    """
    model.eval()
    running_loss = 0.0
    hits, seen = 0, 0
    with torch.no_grad():
        for x, y in valid_loader:
            x = x.to(device)
            y = y.to(device).float().unsqueeze(1)
            prob = model(x)
            running_loss += loss_fn(prob, y).item()
            hits += ((prob > 0.5).float() == y).sum().item()
            seen += y.size(0)
    return running_loss / len(valid_loader), hits / seen
# Final training run for the NSMC sentiment model.
EPOCHS = 10
for epoch in range(1, EPOCHS + 1):
    print(f"Epoch {epoch}")
    train(model, optimizer, train_loader)
    val_loss, val_accuracy = evaluate(model, valid_loader)
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}")
# --- Blog footer (non-code navigation table, preserved as comments) ---
# Category: '알파코 11기 딥러닝 부트 (24.10.14 ~ 25.04.18) > 6. 오프라인 실습' — other posts:
#   82. Seq2Seq-LSTM (한영번역) (0) | 2025.01.23
#   81. GRU로 문장생성 (0) | 2025.01.23
#   79. LSTM으로 감성예측 (0) | 2025.01.22
#   78. RNN 으로 감성예측 (1) | 2025.01.22
#   77. Transformer 로 삼성전자 주 예측하기 (0) | 2025.01.22