App下載

pytorch怎么實現(xiàn)textCNN?

陪你演戲 2021-08-07 11:58:28 瀏覽數(shù) (2351)
反饋

CNN算法(卷積神經(jīng)網(wǎng)絡)是機器學習中最出名的算法之一,它的應用是比較廣泛的,廣為人知的是利用CNN來進行圖像識別處理,但是CNN也可以用在文本分類上。接下來這篇文章我們就來了解一下pytorch怎么用CNN實現(xiàn)文本分類吧。

1. 原理

2014年的一篇文章,開創(chuàng)cnn用到文本分類的先河。

Convolutional Neural Networks for Sentence Classification

原理說簡單也簡單,其實就是單層CNN加個全連接層:

CNN算法示例

不過與圖像中的cnn相比,改動為將卷積核的寬固定為一個詞向量的維度,而長度一般取2,3,4,5這樣。

上圖中第一幅圖的每個詞對應的一行為一個詞向量,可以使用word2vec或者glove預訓練得到。本例中使用隨機初始化的向量。

2. 數(shù)據(jù)預處理

手中有三個文件,分別為train.txt,valid.txt,test.txt。其中每一行是一個字符串化的字典,格式為{‘type': ‘xx', ‘text':‘xxxxx'}。

2.1 轉換為csv格式

首先將每個文件轉換為csv文件,分為text和label兩列。一共有4種label,可以轉換為數(shù)字表示。代碼如下:

# 獲取文件內容
def getData(file):
    f = open(file,'r')
    raw_data = f.readlines()
    return raw_data
# 轉換文件格式
def d2csv(raw_data,label_map,name):
    texts = []
    labels = []
    i = 0
    for line in raw_data:
        d = eval(line) #將每行字符串轉換為字典
        if len(d['type']) <= 1 or len(d['text']) <= 1: #篩掉無效數(shù)據(jù)
            continue
        y = label_map[d['type']] #根據(jù)label_map將label轉換為數(shù)字表示
        x = d['text']
        texts.append(x)
        labels.append(y)
        i+=1
        if i%1000 == 0:
            print(i)
    df = pd.DataFrame({'text':texts,'label':labels})
    df.to_csv('data/'+name+'.csv',index=False,sep='	') # 保存文件
label_map = {'執(zhí)行':0,'刑事':1,'民事':2,'行政':3}
train_data = getData('data/train.txt') #22000+行
d2csv(train_data,label_map,'train')
valid_data = getData('data/valid.txt') # 2000+行
d2csv(valid_data,label_map,'valid')
test_data = getData('data/test.txt') # 2000+行
d2csv(test_data,label_map,'test')

2.2 觀察數(shù)據(jù)分布

對于本任務來說,需要觀察每個文本分詞之后的長度。因為每個句子是不一樣長的,所以需要設定一個固定的長度給模型,數(shù)據(jù)中不夠長的部分填充,超出部分舍去。

訓練的時候只有訓練數(shù)據(jù),因此觀察訓練數(shù)據(jù)的文本長度分布即可。分詞可以使用jieba分詞等工具。

train_text = []
for line in train_data:
    d = eval(line)
    t = jieba.cut(d['text'])
    train_text.append(t)
sentence_length = [len(x) for x in train_text] #train_text是train.csv中每一行分詞之后的數(shù)據(jù)
%matplotlib notebook
import matplotlib.pyplot as plt
plt.hist(sentence_length,1000,normed=1,cumulative=True)
plt.xlim(0,1000)
plt.show()

得到長度的分布圖:

長度分布圖

可以看到長度小于1000的文本占據(jù)所有訓練數(shù)據(jù)的80%左右,因此訓練時每個文本固定長度為1000個詞。

2.3 由文本得到訓練用的mini-batch數(shù)據(jù)

目前我們手里的數(shù)據(jù)為csv形式的兩列數(shù)據(jù),一列字符串text,一列數(shù)字label。label部分不需要再處理了,不過text部分跟可訓練的數(shù)據(jù)還差得遠。

假設每個詞對應的詞向量維度為 D i m Dim Dim,每一個樣本的分詞后的長度已知設為 W = 1000 W=1000 W=1000,每個mini-batch的大小為 N N N。那么我們希望得到的是一個個維度為 N ? W ? D i m N*W*Dim N?W?Dim的浮點數(shù)數(shù)據(jù)作為mini-batch輸入到模型。

于是還需要以下幾個步驟:

分詞去除停用詞建立詞匯表(詞匯表是詞語到index的映射,index從0到M,M為已知詞匯的個數(shù),形如{'可愛‘:0, ‘美好':1,…})將分詞且去除停用詞之后的數(shù)據(jù)轉換為下標數(shù)據(jù),維度應該為 N a l l ? W N_{all}*W Nall??W, N a l l N_{all} Nall?是所有樣本的數(shù)量。其中長度不足W的樣本在后面補特定字符,長度超過W的樣本截斷。將數(shù)據(jù)分割為一個個 N ? W N*W N?W大小的mini-batch作為模型的輸入。根據(jù)mini-batch數(shù)據(jù)向詞向量中映射得到 N ? W ? D i m N*W*Dim N?W?Dim大小的最終輸入。(這步在模型中)

看起來復雜哭了,手動處理起來確實有些麻煩。不過后來發(fā)現(xiàn)跟pytorch很相關的有個包torchtext能夠很方便的做到這幾步,所以直接來介紹用這個包的做法。

在貼代碼之前先貼兩個torchtext的教程。torchtext入門教程 還是不懂的話看torchtext文檔。 還還是不懂請直接看源碼。對照教程看以下代碼。

首先是分詞函數(shù),寫為有一個參數(shù)的函數(shù):

def tokenizer(x):
    res = [w for w in jieba.cut(x)]
    return res

接著是停用詞表,在網(wǎng)上找的一個停用詞資源(也可以跳過這步):

stop_words = []
print('build stop words set')
with open('data/stopwords.dat') as f:
    for l in f.readlines():
        stop_words.append(l.strip())

然后設定TEXT和LABEL兩個field。定義以及參數(shù)含義看上面的文檔或教程。

TEXT = data.Field(sequential=True, tokenize=tokenizer,fix_length=1000,stop_words=stop_words)
LABEL = data.Field(sequential=False,use_vocab=False)

讀取文件,分詞,去掉停用詞等等。直接一波帶走:

train,valid,test = data.TabularDataset.splits(path='data',train='train.csv',
                                              validation='valid.csv',test='test.csv',
                                              format='csv',
                                              skip_header=True,csv_reader_params={'delimiter':'	'},
                                              fields=[('text',TEXT),('label',LABEL)])

建立詞匯表:

TEXT.build_vocab(train)

生成iterator形式的mini-batch數(shù)據(jù):

train_iter, val_iter, test_iter = data.Iterator.splits((train,valid,test),
                                                             batch_sizes=(args.batch_size,args.batch_size,args.batch_size),
                                                             device=args.device,
                                                             sort_key=lambda x:len(x.text),
                                                             sort_within_batch=False,
                                                             repeat=False)

That's all! 簡單得令人發(fā)指!雖然為了搞懂這幾個函數(shù)整了大半天。最終的這幾個xxx_iter就會生成我們需要的維度為N ? W N*WN?W的數(shù)據(jù)。

3. 模型

模型其實相對很簡單,只有一個embedding映射,加一層cnn加一個激活函數(shù)以及一個全連接。

不過需要注意使用不同大小的卷積核的寫法。

可以選擇使用多個nn.Conv2d然后手動拼起來,這里使用nn.ModuleList模塊。其實本質上還是使用多個Conv2d然后拼起來。

import torch
import torch.nn as nn
import torch.nn.functional as F
class textCNN(nn.Module):
    def __init__(self, args):
        super(textCNN, self).__init__()
        self.args = args
        
        Vocab = args.embed_num ## 已知詞的數(shù)量
        Dim = args.embed_dim ##每個詞向量長度
        Cla = args.class_num ##類別數(shù)
        Ci = 1 ##輸入的channel數(shù)
        Knum = args.kernel_num ## 每種卷積核的數(shù)量
        Ks = args.kernel_sizes ## 卷積核list,形如[2,3,4]
        
        self.embed = nn.Embedding(Vocab,Dim) ## 詞向量,這里直接隨機
        
        self.convs = nn.ModuleList([nn.Conv2d(Ci,Knum,(K,Dim)) for K in Ks]) ## 卷積層
        self.dropout = nn.Dropout(args.dropout) 
        self.fc = nn.Linear(len(Ks)*Knum,Cla) ##全連接層
        
    def forward(self,x):
        x = self.embed(x) #(N,W,D)
        
        x = x.unsqueeze(1) #(N,Ci,W,D)
        x = [F.relu(conv(x)).squeeze(3) for conv in self.convs] # len(Ks)*(N,Knum,W)
        x = [F.max_pool1d(line,line.size(2)).squeeze(2) for line in x]  # len(Ks)*(N,Knum)
        
        x = torch.cat(x,1) #(N,Knum*len(Ks))
        
        x = self.dropout(x)
        logit = self.fc(x)
        return logit

4. 訓練腳本

import os
import sys
import torch
import torch.autograd as autograd
import torch.nn.functional as F
def train(train_iter, dev_iter, model, args):
    if args.cuda:
        model.cuda(args.device)    
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
    
    steps = 0
    best_acc = 0
    last_step = 0
    model.train()
    print('training...')
    for epoch in range(1, args.epochs+1):
        for batch in train_iter:
            feature, target = batch.text, batch.label #(W,N) (N)
            feature.data.t_()
            
            if args.cuda:
                feature, target = feature.cuda(), target.cuda()
            
            optimizer.zero_grad()
            logit = model(feature)
            loss = F.cross_entropy(logit, target)
            loss.backward()
            optimizer.step()
            
            steps += 1
            if steps % args.log_interval == 0:
                result = torch.max(logit,1)[1].view(target.size())
                corrects = (result.data == target.data).sum()
                accuracy = corrects*100.0/batch.batch_size
                sys.stdout.write('
Batch[{}] - loss: {:.6f} acc: {:.4f}$({}/{})'.format(steps,
                                                                                        loss.data.item(),
                                                                                        accuracy,
                                                                                        corrects,
                                                                                        batch.batch_size))
            if steps % args.dev_interval == 0:
                dev_acc = eval(dev_iter, model, args)
                if dev_acc > best_acc:
                    best_acc = dev_acc
                    last_step = steps
                    if args.save_best:
                        save(model,args.save_dir,'best',steps)
                else:
                    if steps - last_step >= args.early_stop:
                        print('early stop by {} steps.'.format(args.early_stop))
            elif steps % args.save_interval == 0:
                save(model,args.save_dir,'snapshot',steps)

訓練腳本中還有設置optimizer以及l(fā)oss的部分。其余部分比較trivial。

模型的保存:

def save(model, save_dir, save_prefix, steps):
    if not os.path.isdir(save_dir):
        os.makedirs(save_dir)
    save_prefix = os.path.join(save_dir,save_prefix)
    save_path = '{}_steps_{}.pt'.format(save_prefix,steps)
    torch.save(model.state_dict(),save_path)

eval函數(shù),用來評估驗證集與測試集合上的準確率acc。

def eval(data_iter, model, args):
    model.eval()
    corrects, avg_loss = 0,0
    for batch in data_iter:
        feature, target = batch.text, batch.label
        feature.data.t_()
        
        if args.cuda:
            feature, target = feature.cuda(), target.cuda()
        
        logit = model(feature)
        loss = F.cross_entropy(logit,target)
        
        avg_loss += loss.data[0]
        result = torch.max(logit,1)[1]
        corrects += (result.view(target.size()).data == target.data).sum()
    
    size = len(data_iter.dataset)
    avg_loss /= size 
    accuracy = 100.0 * corrects/size
    print('
Evaluation - loss: {:.6f} acc: {:.4f}%({}/{}) 
'.format(avg_loss,accuracy,corrects,size))
    return accuracy

5. main函數(shù)

這暫時就不貼了??梢詤⒖枷乱徊糠纸o出的github。

最終在測試集合上accuracy為97%(畢竟只是四分類)。

但是遇到個問題就是隨著accuracy上升,loss也在迅速增大。

測試結果

在一番探究之后大致得出結論就是,這樣是沒問題的。比如在本例中是個四分類,加入全連接層輸出的結果是[-10000,0,0,10000],而正確分類是0。

那么這就是個錯誤的結果。計算一下這個單個樣例的loss。先算softmax,約等于[ e ? 20000 , e ? 10000 , e ? 10000 , 1 e^{-20000},e^{-10000},e^{-10000},1 e?20000,e?10000,e?10000,1]。真實的label為[1,0,0,0],因此交叉熵為20000。

所以我們發(fā)現(xiàn)這一個錯誤樣例的loss就會這么大。最終的loss大一些也是正常的。

不過為什么隨著accuracy接近100%而導致loss迅速增加這個問題還需要進一步研究。大概是因為隨著accuracy升高導致結果接近訓練集的分布,這樣與驗證集或測試集的分布產(chǎn)生比較極端差別的個例會增加。

6.引用

代碼部分參考了很多這位老哥的github,在此感謝。跟他不一樣的地方主要是數(shù)據(jù)處理部分。

以上就是pytorch怎么用CNN實現(xiàn)文本分類的全部內容,希望能給大家一個參考,也希望大家多多支持W3Cschool。


0 人點贊