# NOTE(review): this training loop appears BEFORE the definitions it uses
# (train/evaluate/model/optimizer/loaders) — a blog-paste ordering artifact.
# It can only run after the setup code below has executed.
# BUGFIX: removed an invisible zero-width space (U+200B) that trailed the
# print line and made this block a syntax error.
EPOCHS = 10
for e in range(1, EPOCHS + 1):
    train(model, optimizer, train_loader)
    val_loss, val_accuracy = evaluate(model, valid_loader)
    print("[EPOCH: %d], Validation Loss: %5.2f | Validation Accuracy: %5.2f" % (e, val_loss, val_accuracy))
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F

import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import random
import requests
import os
import tarfile
import time
import math
import urllib.request

# Download and extract the IMDB Large Movie Review Dataset (aclImdb).
url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
dataset_path = "./data"
file_path = os.path.join(dataset_path, "aclImdb_v1.tar.gz")

# Create the data directory (no-op if it already exists).
os.makedirs(dataset_path, exist_ok=True)

# Download the archive only if it is not already present locally.
if not os.path.isfile(file_path):
    urllib.request.urlretrieve(url, file_path)

# Extract the archive once; skip if the extracted tree already exists.
extracted_path = os.path.join(dataset_path, "aclImdb")
if not os.path.exists(extracted_path):
    with tarfile.open(file_path, "r:gz") as tar:
        tar.extractall(path=dataset_path)

# 데이터 로드
def load_imdb_data(split='train', root=None):
    """Load the extracted IMDB reviews for one split as (label, text) pairs.

    Parameters
    ----------
    split : str
        Split directory name under ``aclImdb``: 'train' or 'test'.
    root : str | None
        Base directory containing the extracted ``aclImdb`` tree. Defaults
        to the module-level ``dataset_path`` so existing call sites keep
        working unchanged.

    Returns
    -------
    list[tuple[str, str]]
        ``(label, text)`` pairs where label is 'pos' or 'neg'.
    """
    base = dataset_path if root is None else root
    data_path = os.path.join(base, "aclImdb", split)
    data = []
    for label in ['pos', 'neg']:
        dir_path = os.path.join(data_path, label)
        for fname in os.listdir(dir_path):
            # Each review is stored as one UTF-8 .txt file per example.
            if fname.endswith(".txt"):
                with open(os.path.join(dir_path, fname), "r", encoding="utf-8") as f:
                    data.append((label, f.read()))
    return data

# Load raw (label, text) pairs for both splits.
# BUGFIX: dropped the redundant re-assignment ``dataset_path = "./data"``
# that merely duplicated the value already set in the download section.
train_data = load_imdb_data('train')
test_data = load_imdb_data('test')

# basic_english tokenizer: lowercases and splits on punctuation/whitespace.
tokenizer = get_tokenizer("basic_english")

# 텍스트 전처리
# Text preprocessing
def preprocess_text(text, max_len=200):
    """Tokenize ``text`` and force the token list to exactly ``max_len`` items.

    Generalized: the previously hard-coded length 200 is now the ``max_len``
    keyword (default 200, so existing callers are unaffected). Longer texts
    are truncated; shorter ones are right-padded with '<pad>'. When
    len(tokens) >= max_len the pad multiplier is <= 0 and Python's
    negative-repeat-is-empty semantics append nothing.
    """
    tokens = tokenizer(text.lower())
    return tokens[:max_len] + ['<pad>'] * (max_len - len(tokens))

# 레이블 전처리
# Map a string label onto a binary class id: 'pos' -> 1, anything else -> 0.
def preprocess_label(label):
    return int(label == "pos")

# Preprocess both splits: (label, text) -> (token_list, 0/1).
train_data = [(preprocess_text(t), preprocess_label(l)) for l, t in train_data]
test_data = [(preprocess_text(t), preprocess_label(l)) for l, t in test_data]

# Reproducibly shuffle, then carve 20% of the training set off for validation.
random.seed(0)
random.shuffle(train_data)
split_at = int(0.8 * len(train_data))
train_data, valid_data = train_data[:split_at], train_data[split_at:]

print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

# 어휘집 생성 함수
def yield_tokens(data_iter):
    """Yield just the token list from each (tokens, label) pair."""
    for tokens, _label in data_iter:
        yield tokens


# Build the text vocabulary from TRAINING tokens only (no test-set leakage).
# Tokens seen fewer than 10 times map to '<unk>' via the default index.
TEXT_vocab = build_vocab_from_iterator(yield_tokens(train_data), specials=["<unk>", "<pad>"], min_freq=10, max_tokens=10000)
TEXT_vocab.set_default_index(TEXT_vocab["<unk>"])

# Binary label "vocabulary" is just a fixed id -> name mapping.
LABEL_vocab = {0: "neg", 1: "pos"}

print(f"Unique tokens in TEXT vocabulary: {len(TEXT_vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL_vocab)}")

print(LABEL_vocab)

# Show the first preprocessed example: (token list, label id).
print(train_data[0])

class IMDBDataset(Dataset):
    """Map-style dataset over preprocessed (token_list, label_id) pairs."""

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        tokens, label = self.data[idx]
        # Vocab lookup converts the token list to integer ids before tensorizing.
        ids = TEXT_vocab(tokens)
        return torch.tensor(ids), torch.tensor(label)


# Wrap each split in a Dataset so DataLoader can batch it.
train_dataset = IMDBDataset(train_data)
valid_dataset = IMDBDataset(valid_data)
test_dataset = IMDBDataset(test_data)

# Use the first GPU when available, otherwise fall back to CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE = 256

# Shuffle only the training loader; evaluation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
class GRUCell(nn.Module):
    """A single GRU cell implemented from scratch with two fused linear layers.

    forward(x, hidden) consumes one timestep:
      x:      (batch, input_size)
      hidden: (batch, hidden_size)
    and returns the new hidden state of shape (batch, hidden_size).
    """

    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # One linear layer produces both gate pre-activations (update z,
        # reset r) at once from the concatenated [x, h] vector.
        self.gates_linear = nn.Linear(input_size + hidden_size, 2 * hidden_size)

        # Separate linear layer for the candidate hidden state.
        self.candidate_linear = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, x, hidden):
        # Concatenate the input with the previous hidden state.
        combined = torch.cat((x, hidden), 1)

        # Compute both gate pre-activations, then split in half.
        gate_values = self.gates_linear(combined)
        zr, rr = gate_values.chunk(2, 1)

        # BUGFIX: F.sigmoid / F.tanh are deprecated (removed in newer torch);
        # use the torch.* equivalents.
        resetgate = torch.sigmoid(rr)
        updategate = torch.sigmoid(zr)

        # Reset gate scales how much of the old state feeds the candidate.
        reset_combined = torch.cat((x, resetgate * hidden), 1)

        # Candidate hidden state.
        new_h_candidate = torch.tanh(self.candidate_linear(reset_combined))

        # Convex blend of the old state and the candidate.
        # NOTE(review): textbook GRU uses h' = z*h + (1-z)*candidate; this
        # code uses the complementary convention — functionally a relabeling
        # of the update gate, kept as-is to preserve behavior.
        hy = (1 - updategate) * hidden + updategate * new_h_candidate

        return hy
class GRUModel(nn.Module):
    """Sentiment classifier: embedding -> manual GRU unroll -> linear -> sigmoid.

    forward(x): LongTensor of token ids, shape (batch, seq_len);
    returns probabilities of shape (batch, 1).
    """

    def __init__(self, hidden_dim, n_vocab, embed_dim=128):
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.gru = GRUCell(embed_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embed(x)  # (batch, seq_len, embed_dim)

        # Initial hidden state of zeros, created on the input's device.
        hn = torch.zeros(x.size(0), self.hidden_dim).to(x.device)

        # Unroll the GRU cell over time. Only the final hidden state is used
        # for classification, so we no longer accumulate every timestep's
        # state in a list (the original kept them all alive for no reason).
        for t in range(x.size(1)):
            hn = self.gru(x[:, t, :], hn)

        out = self.fc(hn)
        return self.sigmoid(out)
# Hyperparameters for the hand-rolled GRU classifier.
hidden_dim = 128
n_vocab = len(TEXT_vocab)
embed_dim = 256

# Instantiate the GRU model on the selected device.
model = GRUModel(hidden_dim, n_vocab, embed_dim).to(device)

# Binary cross-entropy on sigmoid probabilities, optimized with Adam.
loss_fn = nn.BCELoss()
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
def train(model, optimizer, train_iter):
    """Run one training epoch; print and return (avg_loss, accuracy).

    Relies on the module-level ``device`` and ``loss_fn``.
    """
    model.train()
    total_loss = 0          # running sum of per-batch losses
    corrects, total = 0, 0  # running counts for accuracy

    for idx, batch in enumerate(train_iter):
        # y -> (batch, 1) float to match the sigmoid output for BCELoss.
        x, y = batch[0].to(device), batch[1].to(device).unsqueeze(1).float()
        optimizer.zero_grad()
        logit = model(x)
        loss = loss_fn(logit, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Threshold the probabilities at 0.5 for the accuracy count.
        pred = (logit > 0.5).float()
        corrects += (pred == y).sum().item()
        total += y.size(0)

    avg_loss = total_loss / len(train_iter)
    accuracy = corrects / total
    # BUGFIX: the original formatted the *global* loop variable ``e`` here,
    # which raises NameError whenever train() is called outside that exact
    # top-level loop. Print only locally-known values instead.
    print("Train \tAverage Loss: {:.6f} \tAccuracy: {:.4f}".format(avg_loss, accuracy))
    return avg_loss, accuracy
def evaluate(model, val_iter):
    """Compute (average loss, accuracy) over ``val_iter`` without gradients.

    Relies on the module-level ``device`` and ``loss_fn``.
    """
    model.eval()
    corrects, total_loss, total = 0, 0, 0

    with torch.no_grad():
        for x, y in val_iter:
            x = x.to(device)
            # (batch,) int labels -> (batch, 1) float to match BCELoss input.
            y = y.to(device).unsqueeze(1).float()
            probs = model(x)
            total_loss += loss_fn(probs, y).item()
            total += y.size(0)
            corrects += ((probs > 0.5).float() == y).sum().item()

    return total_loss / len(val_iter), corrects / total

 

# --- GRU, simplified version (using the built-in nn.GRU instead of the manual cell) ---

class GRUModel(nn.Module):
    """Sentiment classifier using the built-in multi-layer nn.GRU.

    forward(x): LongTensor token ids (batch, seq_len) -> probabilities (batch, 1).
    """

    def __init__(self, hidden_dim, layer_dim, n_vocab, embed_dim=128, bias=True):
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim  # number of stacked GRU layers
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.gru = nn.GRU(embed_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embed(x)
        # BUGFIX: build h0 on the input's device instead of the module-level
        # ``device`` global, so the model works wherever its inputs live and
        # has no hidden dependence on file-level state.
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        out, _ = self.gru(x, h0)
        # Classify from the last timestep's output of the top layer.
        out = self.fc(out[:, -1, :])
        out = self.sigmoid(out)
        return out
# Hyperparameters for the nn.GRU-based classifier.
hidden_dim = 128
n_vocab = len(TEXT_vocab)
embed_dim = 256
layer_dim = 2
# Instantiate the stacked-GRU model on the selected device.
model = GRUModel(hidden_dim, layer_dim, n_vocab, embed_dim).to(device)

 

# --- Naver movie-review (NSMC) sentiment prediction ---

# !pip install konlpy   <- Jupyter shell command; run it before importing konlpy
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
from torchtext.vocab import build_vocab_from_iterator
from konlpy.tag import Okt
import random

# Okt morphological analyzer for Korean tokenization.
tokenizer = Okt()

# Load the NSMC ratings file (tab-separated columns: id, document, label)
# and keep only the first 3000 rows to keep the demo fast.
train = pd.read_csv('https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt',
                    header=0, delimiter='\t', quoting=3).iloc[:3000,:]

# Drop rows with missing review text.
train = train.dropna()

# Text preprocessing function for Korean reviews.
def preprocess_text(text, max_len=200):
    """Morph-tokenize Korean ``text`` and fix the token list at ``max_len``.

    Generalized: the previously hard-coded length 200 is now the ``max_len``
    keyword (default 200, so existing callers are unaffected). Longer texts
    are truncated; shorter ones are right-padded with '<pad>' (a negative
    repeat count simply yields no padding).
    """
    tokens = tokenizer.morphs(text)  # Korean morpheme segmentation via Okt
    return tokens[:max_len] + ['<pad>'] * (max_len - len(tokens))


# Build (token_list, label) pairs from the dataframe rows
# (row layout: [id, document, label]).
records = train.values
train_data = [(preprocess_text(rec[1]), rec[2]) for rec in records]

# Reproducible shuffle, then an 80/20 train/validation split.
random.seed(0)
random.shuffle(train_data)
split_at = int(0.8 * len(train_data))
train_data, valid_data = train_data[:split_at], train_data[split_at:]

def yield_tokens(data_iter):
    """Yield only the token list from each (tokens, label) pair."""
    for tokens, _unused_label in data_iter:
        yield tokens

# Build the Korean text vocabulary from the training tokens only;
# tokens seen fewer than 10 times fall back to '<unk>'.
TEXT_vocab = build_vocab_from_iterator(yield_tokens(train_data), specials=["<unk>", "<pad>"], min_freq=10, max_tokens=10000)
TEXT_vocab.set_default_index(TEXT_vocab["<unk>"])

# Fixed binary label mapping: 0 = negative, 1 = positive.
LABEL_vocab = {0: "neg", 1: "pos"}

print(f"Unique tokens in TEXT vocabulary: {len(TEXT_vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL_vocab)}")

class nsmcDataset(Dataset):
    """Map-style dataset over preprocessed NSMC (token_list, label) pairs."""

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        tokens, label = self.data[idx]
        # Vocab lookup turns tokens into integer ids before tensorizing.
        ids = TEXT_vocab(tokens)
        return torch.tensor(ids), torch.tensor(label)

# Wrap the NSMC splits in Datasets so DataLoader can batch them.
train_dataset = nsmcDataset(train_data)
valid_dataset = nsmcDataset(valid_data)

BATCH_SIZE = 256
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Shuffle only the training loader; validation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)


class GRUModel(nn.Module):
    """Single-layer GRU classifier that reads the final hidden state.

    NOTE(review): in the original file this definition is immediately
    shadowed by the next ``GRUModel`` class and is never instantiated.
    """

    def __init__(self, hidden_dim, n_vocab, embed_dim=128):
        super(GRUModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.gru = nn.GRU(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.embed(x)
        # hn: (num_layers, batch, hidden); take the top layer's final state.
        _outputs, hn = self.gru(embedded)
        return self.sigmoid(self.fc(hn[-1]))

class GRUModel(nn.Module):
    """Two-layer bidirectional GRU classifier with a dropout-regularized head.

    forward(x): token ids (batch, seq_len) -> probabilities (batch, 1).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size):
        super(GRUModel, self).__init__()
        self.em = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_size, batch_first=True, bidirectional=True, num_layers=2)
        # Bidirectional => forward+backward outputs are concatenated (2x width).
        self.fc1 = nn.Linear(hidden_size*2, 128)
        self.relu = nn.ReLU()
        self.dout = nn.Dropout(0.4)
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.em(x)
        seq_out, _hidden = self.gru(embedded)
        # Use the last timestep's (fwd+bwd) features as the sentence vector.
        feats = self.fc1(seq_out[:, -1, :])
        feats = self.dout(self.relu(feats))
        return self.sigmoid(self.fc2(feats))

# Hyperparameters for the bidirectional GRU classifier.
hidden_dim = 128
n_vocab = len(TEXT_vocab)
embed_dim = 256

# Instantiate the model on the selected device.
model = GRUModel(n_vocab, embed_dim, hidden_dim).to(device)
# Binary cross-entropy on sigmoid outputs, optimized with Adam.
loss_fn = nn.BCELoss()
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

def train(model, optimizer, train_loader):
    """Run one training epoch; print and return (avg_loss, accuracy).

    Relies on the module-level ``device`` and ``loss_fn``. Returning the
    metrics makes this symmetric with evaluate() so callers can log curves;
    existing callers that ignore the return value are unaffected.
    """
    model.train()
    total_loss = 0
    corrects, total = 0, 0

    for x, y in train_loader:
        # y -> (batch, 1) float to match the sigmoid output for BCELoss.
        x, y = x.to(device), y.to(device).float().unsqueeze(1)
        optimizer.zero_grad()
        logit = model(x)
        loss = loss_fn(logit, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        # Threshold probabilities at 0.5 for the accuracy count.
        pred = (logit > 0.5).float()
        corrects += (pred == y).sum().item()
        total += y.size(0)

    avg_loss = total_loss / len(train_loader)
    accuracy = corrects / total
    print(f"Train Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
    return avg_loss, accuracy

def evaluate(model, valid_loader):
    """Return (average_loss, accuracy) over ``valid_loader`` without grads.

    Relies on the module-level ``device`` and ``loss_fn``.
    """
    model.eval()
    total_loss = 0
    corrects, total = 0, 0

    with torch.no_grad():
        for x, y in valid_loader:
            x = x.to(device)
            # (batch,) labels -> (batch, 1) float to match BCELoss input.
            y = y.to(device).float().unsqueeze(1)
            probs = model(x)
            total_loss += loss_fn(probs, y).item()
            corrects += ((probs > 0.5).float() == y).sum().item()
            total += y.size(0)

    return total_loss / len(valid_loader), corrects / total

# Train for a fixed number of epochs, validating after each one.
EPOCHS = 10
epoch = 0
while epoch < EPOCHS:
    epoch += 1
    print(f"Epoch {epoch}")
    train(model, optimizer, train_loader)
    val_loss, val_accuracy = evaluate(model, valid_loader)
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}")

# + Recent posts   (blog navigation footer captured by the scrape; not code)