
Simple Sentiment Classification

^ _ ^

Step 1: Vocabulary Mapping

First, we need to map natural-language tokens to integers, so we write a Vocab class to record the mapping between tokens and indices.

from collections import defaultdict

class Vocab:
    def __init__(self, tokens=None):
        self.idx_to_token = []
        self.token_to_idx = {}
        if tokens is not None:
            if "<unk>" not in tokens:
                tokens = tokens + ["<unk>"]
            for token in tokens:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1
            # index of the unknown token, used as the fallback in __getitem__
            self.unk = self.token_to_idx["<unk>"]

    @classmethod
    def build(cls, text, min_freq=1, reserved_tokens=None):
        token_freqs = defaultdict(int)
        for sentence in text:
            for token in sentence:
                token_freqs[token] += 1
        uniq_tokens = ["<unk>"] + (reserved_tokens if reserved_tokens else [])
        uniq_tokens += [token for token, freq in token_freqs.items()
                        if freq >= min_freq and token != "<unk>"]
        # build is a classmethod: it only collects the token list and then calls
        # cls(uniq_tokens) to construct and return a new Vocab instance
        return cls(uniq_tokens)

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, token):
        # unknown tokens fall back to the index of "<unk>"
        return self.token_to_idx.get(token, self.unk)

    def convert_tokens_to_ids(self, tokens):
        return [self[token] for token in tokens]

    def convert_ids_to_tokens(self, indices):
        return [self.idx_to_token[index] for index in indices]
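For example, on a tiny toy corpus of my own (not the real data), building a vocabulary and converting tokens round-trip works like this:

# toy corpus: a list of tokenized sentences (hypothetical example)
corpus = [["i", "like", "this", "movie"], ["i", "hate", "this", "movie"]]
vocab = Vocab.build(corpus)

ids = vocab.convert_tokens_to_ids(["i", "like", "cats"])
# "cats" is out-of-vocabulary, so it maps to the index of "<unk>" (0)
print(ids)                               # [1, 2, 0]
print(vocab.convert_ids_to_tokens(ids))  # ['i', 'like', '<unk>']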

Step 2: Word Embedding Layer

Mapping a word to a low-dimensional, dense, continuous vector is called embedding.

import torch
from torch import nn

# vocabulary size: 8; embedding vector dimension: 3
embedding = nn.Embedding(8, 3)

# input size: [2, 4]
input = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 7]], dtype=torch.long)
# output size: [2, 4, 3]
output = embedding(input)

Step 3: Word Embedding

Usually, we first map each word to its embedding vector and then feed the result into an MLP. But a sequence contains many word vectors, so the question is: how can we turn them into a single MLP input?
One way is to concatenate the $n$ vectors into a new vector of dimension $n \times d$, where $d$ is the dimension of each word vector.
But this causes a problem: the prediction becomes tightly coupled to the positions of tokens in the sequence. If a new token is inserted at the head of the sequence, every position shifts, all network parameters see different inputs, and the result can change drastically.
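As a minimal sketch of this concatenation idea (my own illustration, not part of the original post), flattening the embedding output turns $n$ vectors of dimension $d$ into one $n \times d$ input:

import torch
from torch import nn

# hypothetical setup: batch of 2 sequences, n = 4 tokens each, d = 3
embedding = nn.Embedding(8, 3)
inputs = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 7]], dtype=torch.long)
embeddings = embedding(inputs)                        # size: [2, 4, 3]
concat = embeddings.reshape(embeddings.shape[0], -1)  # size: [2, 12], i.e. n * d
# each token position now owns a fixed slice of the MLP input, so shifting
# the tokens by one position changes every slice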

To solve this problem, we can use a Bag of Words (BOW) representation. BOW ignores the order of words and simply treats the sequence as a set of words, so we can aggregate the word vectors of one sequence with an operation such as average, sum, or max.

import torch
from torch import nn
from torch.nn import functional as F

class MLP(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super(MLP, self).__init__()
        # word vector layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # word vector layer --> linear layer
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)
        # activation layer
        self.activate = F.relu
        # activation layer --> output layer
        self.linear2 = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs):
        embeddings = self.embedding(inputs)
        # aggregation: average the word vectors of each sequence
        embedding = embeddings.mean(dim=1)
        hidden = self.activate(self.linear1(embedding))
        outputs = self.linear2(hidden)
        probs = F.log_softmax(outputs, dim=1)
        return probs

mlp = MLP(vocab_size=8, embedding_dim=3, hidden_dim=5, num_class=2)
# input size: [2, 4]
input = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 7]], dtype=torch.long)
# output size: [2, 2]
output = mlp(input)

However, in real situations, the input sequences in one batch often have different lengths. In other words, the batch sometimes cannot be represented by a single matrix.
A solution to this problem is nn.EmbeddingBag:

  1. First, concatenate all the sequences into one long sequence.
  2. Then, use an offsets tensor to record the start position of each sequence.
# suppose every input_i is a 1-D tensor of token ids with a different length
inputs = [input1, input2, input3, input4]
offsets = [0] + [i.shape[0] for i in inputs]
offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
inputs = torch.cat(inputs)

embedding_bag = nn.EmbeddingBag(num_embeddings=8, embedding_dim=3)
# output size: [sequence_num, embedding_dim]
embeddings = embedding_bag(inputs, offsets)

Moreover, we can use n-grams as tokens, which captures some local word-order information but increases data sparsity.
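A minimal sketch of this (my own hypothetical helper, not from the original post) turns a sentence into bigram tokens:

def to_ngrams(tokens, n=2):
    # slide a window of size n over the token list; each window becomes one token
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

to_ngrams(["not", "a", "good", "movie"])
# [('not', 'a'), ('a', 'good'), ('good', 'movie')]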

Step 4: Data Processing

create dataset

def load_sentence_polarity():
    from nltk.corpus import sentence_polarity
    # create vocabulary
    vocab = Vocab.build(sentence_polarity.sents())
    # create train dataset: the first 4000 sentences of each polarity
    train_data = ([(vocab.convert_tokens_to_ids(sentence), 0) for sentence in sentence_polarity.sents(categories='pos')][:4000]
                  + [(vocab.convert_tokens_to_ids(sentence), 1) for sentence in sentence_polarity.sents(categories='neg')][:4000])
    # create test dataset: the remaining sentences
    test_data = ([(vocab.convert_tokens_to_ids(sentence), 0) for sentence in sentence_polarity.sents(categories='pos')][4000:]
                 + [(vocab.convert_tokens_to_ids(sentence), 1) for sentence in sentence_polarity.sents(categories='neg')][4000:])
    return train_data, test_data, vocab
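Each example is a (token-id list, label) pair. A quick check (assuming NLTK is installed and its sentence_polarity corpus can be downloaded; the ids shown are hypothetical):

import nltk
nltk.download('sentence_polarity')  # one-off download if the corpus is missing

train_data, test_data, vocab = load_sentence_polarity()
print(len(train_data))   # 8000 training examples (4000 per class); the rest are for testing
print(train_data[0])     # e.g. ([14, 52, 7, ...], 0)  -> (token ids, polarity label)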

create data loader

from torch.utils.data import Dataset, DataLoader
'''
DataLoader(dataset, batch_size, collate_fn, shuffle)
- dataset: a subclass of torch.utils.data.Dataset
- collate_fn: transform function applied to one mini-batch, e.g. turning raw data into tensors
'''

# dataset example
class BowDataset(Dataset):
    def __init__(self, data):
        # data is the original data
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        return self.data[i]

# collate_fn example
'''
examples is a mini-batch of the dataset;
suppose each item has the structure (sentence, polarity);
collate_fn turns the mini-batch into the input tensors expected by EmbeddingBag
'''
def collate_fn(examples):
    inputs = [torch.tensor(ex[0]) for ex in examples]
    targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
    offsets = [0] + [i.shape[0] for i in inputs]
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
    inputs = torch.cat(inputs)
    return inputs, offsets, targets
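For example, a toy mini-batch of two variable-length sentences (hypothetical ids, not from the corpus) is flattened into one inputs tensor plus offsets:

examples = [([1, 2, 3], 0), ([4, 5], 1)]   # (token ids, polarity)
inputs, offsets, targets = collate_fn(examples)
print(inputs)    # tensor([1, 2, 3, 4, 5])
print(offsets)   # tensor([0, 3])  -> the sequences start at positions 0 and 3
print(targets)   # tensor([0, 1])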

Step 5: Create Models

MLP

class MLP(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super(MLP, self).__init__()
        # word vector layer
        self.embedding_bag = nn.EmbeddingBag(vocab_size, embedding_dim)
        # word vector layer --> linear layer
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)
        # activation layer
        self.activate = F.relu
        # activation layer --> output layer
        self.linear2 = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs, offsets):
        embedding = self.embedding_bag(inputs, offsets)
        hidden = self.activate(self.linear1(embedding))
        outputs = self.linear2(hidden)
        probs = F.log_softmax(outputs, dim=1)
        return probs

CNN

In the BOW (bag-of-words) mechanism, we lose phrase-level meaning, e.g. "喜欢" (like) vs. "不喜欢" (dislike). To mitigate this defect, we can use a CNN to extract local information.

class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, filter_size, num_filter, num_class):
        # hidden_dim is kept for a uniform constructor signature but is unused here
        super(CNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.conv1d = nn.Conv1d(embedding_dim, num_filter, filter_size, padding=1)
        self.activate = F.relu
        self.linear = nn.Linear(num_filter, num_class)

    def forward(self, inputs):
        # embedding size: [batch_size, num_seq, embedding_dim]
        embedding = self.embedding(inputs)
        # after permute(0, 2, 1), size: [batch_size, embedding_dim, num_seq]
        # after conv1d, size: [batch_size, num_filter, num_seq + 2*padding - filter_size + 1]
        convolution = self.activate(self.conv1d(embedding.permute(0, 2, 1)))
        # after pooling, size: [batch_size, num_filter, 1]
        pooling = F.max_pool1d(convolution, kernel_size=convolution.shape[2])
        # after squeeze, size: [batch_size, num_filter]
        # output size: [batch_size, num_class]
        outputs = self.linear(pooling.squeeze(dim=2))
        # log_probs size: [batch_size, num_class]
        log_probs = F.log_softmax(outputs, dim=1)
        return log_probs
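A quick shape check with a toy padded batch (my own example, hypothetical token ids):

# padded batch: 2 sequences of length 4
inputs = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 0]], dtype=torch.long)
cnn = CNN(vocab_size=8, embedding_dim=3, hidden_dim=5,   # hidden_dim unused by this model
          filter_size=3, num_filter=10, num_class=2)
log_probs = cnn(inputs)
print(log_probs.shape)   # torch.Size([2, 2])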

The data processing function needs a small modification:

from torch.nn.utils.rnn import pad_sequence

def collate_fn(examples):
    inputs = [torch.tensor(ex[0]) for ex in examples]
    targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
    # pad every sequence to the length of the longest one; the default padding value is 0
    inputs = pad_sequence(inputs, batch_first=True)
    return inputs, targets

LSTM

In the BOW (bag-of-words) mechanism, we also ignore the order of the sequence. For example, the phrase "张三打李四" (Zhang San hits Li Si) is treated the same as "李四打张三" (Li Si hits Zhang San), which is clearly unreasonable in some situations. To avoid this defect, we can use an RNN, especially an LSTM.

from torch.nn.utils.rnn import pack_padded_sequence

class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super(LSTM, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.output = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs, lengths):
        embeddings = self.embeddings(inputs)
        # pack the padded batch so the LSTM skips the padding positions
        x_pack = pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
        hidden, (hn, cn) = self.lstm(x_pack)
        # hn[-1] is the final hidden state of each sequence: [batch_size, hidden_dim]
        outputs = self.output(hn[-1])
        log_probs = F.log_softmax(outputs, dim=1)
        return log_probs
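As with the CNN, a toy padded batch (my own example) confirms the output shape; lengths tells the model where the padding begins:

inputs = torch.tensor([[0, 1, 2, 1], [4, 6, 0, 0]], dtype=torch.long)  # second row padded with 0
lengths = torch.tensor([4, 2])
lstm = LSTM(vocab_size=8, embedding_dim=3, hidden_dim=5, num_class=2)
log_probs = lstm(inputs, lengths)
print(log_probs.shape)   # torch.Size([2, 2])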

The new data processing function:

from torch.nn.utils.rnn import pad_sequence

def collate_fn(examples):
    lengths = torch.tensor([len(ex[0]) for ex in examples])
    inputs = [torch.tensor(ex[0]) for ex in examples]
    targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
    inputs = pad_sequence(inputs, batch_first=True)
    return inputs, lengths, targets

Transformer

import math

class Transformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class,
                 dim_feedforward=512, num_head=2, num_layers=2, dropout=0.1,
                 max_len=128, activation="relu"):
        super(Transformer, self).__init__()
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = PositionalEncoding(embedding_dim, dropout, max_len)
        # note: this model assumes embedding_dim == hidden_dim, since the encoder
        # consumes the position-encoded embeddings directly
        encoder_layer = nn.TransformerEncoderLayer(hidden_dim, num_head, dim_feedforward, dropout, activation)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.output = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs, lengths):
        # [batch_size, num_seq] --> [num_seq, batch_size]
        inputs = torch.transpose(inputs, 0, 1)
        hidden_states = self.embeddings(inputs)
        hidden_states = self.position_embedding(hidden_states)
        # padding positions are True in the mask and receive no attention
        attention_mask = length_to_mask(lengths) == False
        hidden_states = self.transformer(hidden_states, src_key_padding_mask=attention_mask)
        # use the hidden state of the first token as the sequence representation
        hidden_states = hidden_states[0, :, :]
        output = self.output(hidden_states)
        log_probs = F.log_softmax(output, dim=1)
        return log_probs

def length_to_mask(lengths):
    '''transfer sequence lengths to a mask matrix
    @param lengths: [batch, ]
    @return [batch, max_len], True for valid positions
    '''
    max_len = torch.max(lengths)
    mask = torch.arange(max_len).expand(lengths.shape[0], max_len) < lengths.unsqueeze(1)
    return mask

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=512):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # position encoding for even dimensions
        pe[:, 0::2] = torch.sin(position * div_term)
        # position encoding for odd dimensions
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        # the positional encoding is a buffer, not a learnable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x
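A quick shape check (my own toy example), remembering that this model assumes embedding_dim == hidden_dim and that hidden_dim must be divisible by num_head:

inputs = torch.tensor([[0, 1, 2, 1], [4, 6, 0, 0]], dtype=torch.long)
lengths = torch.tensor([4, 2])
transformer = Transformer(vocab_size=8, embedding_dim=16, hidden_dim=16, num_class=2)
log_probs = transformer(inputs, lengths)
print(log_probs.shape)   # torch.Size([2, 2])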

Step 6: Training

from torch import optim
from tqdm.auto import tqdm

# hyper-parameters
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 5

# load data
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = BowDataset(train_data)
test_dataset = BowDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)

# create model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MLP(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device)

# training
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
model.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, offsets, targets = [x.to(device) for x in batch]
        log_probs = model(inputs, offsets)
        loss = nll_loss(log_probs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Loss: {total_loss:.2f}")

Step 7: Testing

model.eval()
acc = 0
for batch in tqdm(test_data_loader, desc="Testing"):
    inputs, offsets, targets = [x.to(device) for x in batch]
    with torch.no_grad():
        output = model(inputs, offsets)
        acc += (output.argmax(dim=1) == targets).sum().item()
print(f"Acc: {acc / len(test_data_loader):.2f}")