Import the packages and open the data to take a look.

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

with codecs.open(train_file, 'r', 'utf-8') as f:
    lines = f.readlines()

# print sample content
label, content = lines[0].strip('\r\n').split('\t')
print(content)

Output:
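For reference, each line of cnews.train.txt is assumed to be a tab-separated pair of label and news text, which is why the snippet above splits on '\t'. A minimal sketch with a made-up line:

sample_line = '体育\t某场比赛的新闻正文……\n'  # hypothetical line; the real file contains full news articles
label, content = sample_line.strip('\r\n').split('\t')
print(label)    # -> 体育
print(content)  # -> the news body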
Building on the previous step, add jieba word segmentation and join the segmented sentence with '/'.

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

with codecs.open(train_file, 'r', 'utf-8') as f:
    lines = f.readlines()

# print sample content
label, content = lines[0].strip('\r\n').split('\t')
#print(content)

# print word segmentation results
segment = jieba.cut(content)
print('/'.join(segment))

Output:
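As an aside, jieba.cut returns a generator, so it can only be consumed once; jieba.lcut returns a list, which is handier when the tokens are needed more than once. A small sketch:

segment = jieba.cut(content)
tokens = list(segment)   # materialize the generator; joining `segment` a second time would yield nothing
print('/'.join(tokens))
# jieba.lcut(content) is equivalent to list(jieba.cut(content))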
Time to tidy this up a bit: load the training and test data and run word segmentation.

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

# cut one line into (label, segmented content); return None for malformed lines
def process_line(idx, line):
    data = tuple(line.strip('\r\n').split('\t'))
    if len(data) != 2:
        return None
    content_segged = list(jieba.cut(data[1]))
    if idx % 1000 == 0:
        print('line number: {}'.format(idx))
    return (data[0], content_segged)

# data loading method
def load_data(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()
    data_records = [process_line(idx, line) for idx, line in enumerate(lines)]
    data_records = [data for data in data_records if data is not None]
    return data_records

# load and process training data
train_data = load_data(train_file)
print('first training data: label {} segment {}'.format(train_data[0][0], '/'.join(train_data[0][1])))

# load and process testing data
test_data = load_data(test_file)
print('first testing data: label {} segment {}'.format(test_data[0][0], '/'.join(test_data[0][1])))

Output:
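To see why load_data filters out None values, here is a tiny sketch of process_line on a well-formed line and on a malformed one (the example strings are made up):

print(process_line(0, '体育\t一段新闻正文'))  # -> ('体育', [...segmented tokens...]); also prints the line number since 0 % 1000 == 0
print(process_line(1, '缺少制表符的坏行'))    # -> None, so load_data drops it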
After segmentation, build the vocabulary.

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

#with codecs.open(train_file, 'r', 'utf-8') as f:
#    lines = f.readlines()
#
## print sample content
#label, content = lines[0].strip('\r\n').split('\t')
##print(content)
#
## print word segmentation results
#segment = jieba.cut(content)
##print('/'.join(segment))

# cut one line into (label, segmented content); return None for malformed lines
def process_line(idx, line):
    data = tuple(line.strip('\r\n').split('\t'))
    if len(data) != 2:
        return None
    content_segged = list(jieba.cut(data[1]))
    if idx % 1000 == 0:
        print('line number: {}'.format(idx))
    return (data[0], content_segged)

# data loading method
def load_data(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()
    data_records = [process_line(idx, line) for idx, line in enumerate(lines)]
    data_records = [data for data in data_records if data is not None]
    return data_records

# load and process training data
train_data = load_data(train_file)
#print('first training data: label {} segment {}'.format(train_data[0][0], '/'.join(train_data[0][1])))

# load and process testing data
test_data = load_data(test_file)
#print('first testing data: label {} segment {}'.format(test_data[0][0], '/'.join(test_data[0][1])))

# after segmentation, build the vocabulary
def build_vocab(train_data, thresh):
    vocab = {'<UNK>': 0}
    word_count = {}  # word frequency
    for idx, data in enumerate(train_data):
        content = data[1]
        for word in content:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1
    word_list = [(k, v) for k, v in word_count.items()]
    print('word list length: {}'.format(len(word_list)))
    word_list.sort(key=lambda x: x[1], reverse=True)  # sort by word frequency
    word_list_filtered = [word for word in word_list if word[1] > thresh]
    print('word list length after filtering: {}'.format(len(word_list_filtered)))
    # construct vocab
    for word in word_list_filtered:
        vocab[word[0]] = len(vocab)
    print('vocab size: {}'.format(len(vocab)))  # vocab size is the filtered word list size + 1 due to the <UNK> token
    return vocab

vocab = build_vocab(train_data, 1)
print(vocab)

Output:
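The '<UNK>' entry at id 0 is what unknown words will fall back to later via vocab.get(token, 0). A minimal sketch of the lookup behaviour (the concrete ids are made up):

print(vocab['<UNK>'])             # -> 0
print(vocab.get('比赛', 0))        # -> some id > 0, assuming '比赛' survived the frequency filter
print(vocab.get('没见过的词', 0))   # -> 0, i.e. mapped to <UNK>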
The labels also need a vocabulary of their own (a label-to-id mapping).

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

#with codecs.open(train_file, 'r', 'utf-8') as f:
#    lines = f.readlines()
#
## print sample content
#label, content = lines[0].strip('\r\n').split('\t')
##print(content)
#
## print word segmentation results
#segment = jieba.cut(content)
##print('/'.join(segment))

# cut one line into (label, segmented content); return None for malformed lines
def process_line(idx, line):
    data = tuple(line.strip('\r\n').split('\t'))
    if len(data) != 2:
        return None
    content_segged = list(jieba.cut(data[1]))
    if idx % 1000 == 0:
        print('line number: {}'.format(idx))
    return (data[0], content_segged)

# data loading method
def load_data(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()
    data_records = [process_line(idx, line) for idx, line in enumerate(lines)]
    data_records = [data for data in data_records if data is not None]
    return data_records

# load and process training data
train_data = load_data(train_file)
#print('first training data: label {} segment {}'.format(train_data[0][0], '/'.join(train_data[0][1])))

# load and process testing data
test_data = load_data(test_file)
#print('first testing data: label {} segment {}'.format(test_data[0][0], '/'.join(test_data[0][1])))

# after segmentation, build the vocabulary
def build_vocab(train_data, thresh):
    vocab = {'<UNK>': 0}
    word_count = {}  # word frequency
    for idx, data in enumerate(train_data):
        content = data[1]
        for word in content:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1
    word_list = [(k, v) for k, v in word_count.items()]
    print('word list length: {}'.format(len(word_list)))
    word_list.sort(key=lambda x: x[1], reverse=True)  # sort by word frequency
    word_list_filtered = [word for word in word_list if word[1] > thresh]
    print('word list length after filtering: {}'.format(len(word_list_filtered)))
    # construct vocab
    for word in word_list_filtered:
        vocab[word[0]] = len(vocab)
    print('vocab size: {}'.format(len(vocab)))  # vocab size is the filtered word list size + 1 due to the <UNK> token
    return vocab

vocab = build_vocab(train_data, 1)
#print(vocab)

# the labels also need a vocabulary of their own
def build_label_vocab(cate_file):
    label_vocab = {}
    with codecs.open(cate_file, 'r', 'utf-8') as f:
        for line in f:
            fields = line.strip().split('\t')
            label_vocab[fields[0]] = int(fields[1])
    return label_vocab

label_vocab = build_label_vocab('cnews.category.txt')
print('label vocab: {}'.format(label_vocab))

Output:
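build_label_vocab assumes cnews.category.txt contains one tab-separated "category<TAB>id" pair per line, for example (the contents below are purely illustrative):

体育	0
财经	1
科技	2

so label_vocab ends up as a dict such as {'体育': 0, '财经': 1, '科技': 2, ...}.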
Construct the data in the format the model expects (libsvm format).

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

#with codecs.open(train_file, 'r', 'utf-8') as f:
#    lines = f.readlines()
#
## print sample content
#label, content = lines[0].strip('\r\n').split('\t')
##print(content)
#
## print word segmentation results
#segment = jieba.cut(content)
##print('/'.join(segment))

# cut one line into (label, segmented content); return None for malformed lines
def process_line(idx, line):
    data = tuple(line.strip('\r\n').split('\t'))
    if len(data) != 2:
        return None
    content_segged = list(jieba.cut(data[1]))
    if idx % 1000 == 0:
        print('line number: {}'.format(idx))
    return (data[0], content_segged)

# data loading method
def load_data(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()
    data_records = [process_line(idx, line) for idx, line in enumerate(lines)]
    data_records = [data for data in data_records if data is not None]
    return data_records

# load and process training data
train_data = load_data(train_file)
#print('first training data: label {} segment {}'.format(train_data[0][0], '/'.join(train_data[0][1])))

# load and process testing data
test_data = load_data(test_file)
#print('first testing data: label {} segment {}'.format(test_data[0][0], '/'.join(test_data[0][1])))

# after segmentation, build the vocabulary
def build_vocab(train_data, thresh):
    vocab = {'<UNK>': 0}
    word_count = {}  # word frequency
    for idx, data in enumerate(train_data):
        content = data[1]
        for word in content:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1
    word_list = [(k, v) for k, v in word_count.items()]
    print('word list length: {}'.format(len(word_list)))
    word_list.sort(key=lambda x: x[1], reverse=True)  # sort by word frequency
    word_list_filtered = [word for word in word_list if word[1] > thresh]
    print('word list length after filtering: {}'.format(len(word_list_filtered)))
    # construct vocab
    for word in word_list_filtered:
        vocab[word[0]] = len(vocab)
    print('vocab size: {}'.format(len(vocab)))  # vocab size is the filtered word list size + 1 due to the <UNK> token
    return vocab

vocab = build_vocab(train_data, 1)
#print(vocab)

# the labels also need a vocabulary of their own
def build_label_vocab(cate_file):
    label_vocab = {}
    with codecs.open(cate_file, 'r', 'utf-8') as f:
        for line in f:
            fields = line.strip().split('\t')
            label_vocab[fields[0]] = int(fields[1])
    return label_vocab

label_vocab = build_label_vocab('cnews.category.txt')
#print('label vocab: {}'.format(label_vocab))

# only bag-of-words features are used, so word order is discarded
def construct_trainable_matrix(corpus, vocab, label_vocab, out_file):
    records = []
    for idx, data in enumerate(corpus):
        if idx % 1000 == 0:
            print('process {} data'.format(idx))
        label = str(label_vocab[data[0]])  # label id
        token_dict = {}  # token id -> count within this document
        for token in data[1]:
            token_id = vocab.get(token, 0)  # unknown words map to <UNK> (id 0)
            if token_id in token_dict:
                token_dict[token_id] += 1
            else:
                token_dict[token_id] = 1
        # libsvm feature indices must start at 1, hence the +1
        feature = [str(int(k) + 1) + ':' + str(v) for k, v in token_dict.items()]
        feature_text = ' '.join(feature)
        records.append(label + ' ' + feature_text)
    with open(out_file, 'w') as f:
        f.write('\n'.join(records))

construct_trainable_matrix(train_data, vocab, label_vocab, 'train.svm.txt')
construct_trainable_matrix(test_data, vocab, label_vocab, 'test.svm.txt')
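Each line written to train.svm.txt / test.svm.txt follows the sparse libsvm format: a label id followed by space-separated <feature_index>:<count> pairs, with indices shifted by +1 because libsvm feature indices start at 1. A hypothetical line might look like:

3 1:5 28:2 473:1 1024:3

where 3 is the label id and, for instance, 1:5 means the <UNK> token (vocab id 0, libsvm index 1) occurred 5 times in that document.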
Now start the training.

# SVM text classification
import codecs
import os
import jieba

train_file = 'cnews.train.txt'  # training data file name
test_file = 'cnews.test.txt'    # test data file name
vocab = 'cnews_dict.txt'        # dictionary file name

#with codecs.open(train_file, 'r', 'utf-8') as f:
#    lines = f.readlines()
#
## print sample content
#label, content = lines[0].strip('\r\n').split('\t')
##print(content)
#
## print word segmentation results
#segment = jieba.cut(content)
##print('/'.join(segment))

# cut one line into (label, segmented content); return None for malformed lines
def process_line(idx, line):
    data = tuple(line.strip('\r\n').split('\t'))
    if len(data) != 2:
        return None
    content_segged = list(jieba.cut(data[1]))
    if idx % 1000 == 0:
        print('line number: {}'.format(idx))
    return (data[0], content_segged)

# data loading method
def load_data(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        lines = f.readlines()
    data_records = [process_line(idx, line) for idx, line in enumerate(lines)]
    data_records = [data for data in data_records if data is not None]
    return data_records

# load and process training data
train_data = load_data(train_file)
#print('first training data: label {} segment {}'.format(train_data[0][0], '/'.join(train_data[0][1])))

# load and process testing data
test_data = load_data(test_file)
#print('first testing data: label {} segment {}'.format(test_data[0][0], '/'.join(test_data[0][1])))

# after segmentation, build the vocabulary
def build_vocab(train_data, thresh):
    vocab = {'<UNK>': 0}
    word_count = {}  # word frequency
    for idx, data in enumerate(train_data):
        content = data[1]
        for word in content:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1
    word_list = [(k, v) for k, v in word_count.items()]
    print('word list length: {}'.format(len(word_list)))
    word_list.sort(key=lambda x: x[1], reverse=True)  # sort by word frequency
    word_list_filtered = [word for word in word_list if word[1] > thresh]
    print('word list length after filtering: {}'.format(len(word_list_filtered)))
    # construct vocab
    for word in word_list_filtered:
        vocab[word[0]] = len(vocab)
    print('vocab size: {}'.format(len(vocab)))  # vocab size is the filtered word list size + 1 due to the <UNK> token
    return vocab

vocab = build_vocab(train_data, 1)
#print(vocab)

# the labels also need a vocabulary of their own
def build_label_vocab(cate_file):
    label_vocab = {}
    with codecs.open(cate_file, 'r', 'utf-8') as f:
        for line in f:
            fields = line.strip().split('\t')
            label_vocab[fields[0]] = int(fields[1])
    return label_vocab

label_vocab = build_label_vocab('cnews.category.txt')
#print('label vocab: {}'.format(label_vocab))

# only bag-of-words features are used, so word order is discarded
def construct_trainable_matrix(corpus, vocab, label_vocab, out_file):
    records = []
    for idx, data in enumerate(corpus):
        if idx % 1000 == 0:
            print('process {} data'.format(idx))
        label = str(label_vocab[data[0]])  # label id
        token_dict = {}  # token id -> count within this document
        for token in data[1]:
            token_id = vocab.get(token, 0)  # unknown words map to <UNK> (id 0)
            if token_id in token_dict:
                token_dict[token_id] += 1
            else:
                token_dict[token_id] = 1
        # libsvm feature indices must start at 1, hence the +1
        feature = [str(int(k) + 1) + ':' + str(v) for k, v in token_dict.items()]
        feature_text = ' '.join(feature)
        records.append(label + ' ' + feature_text)
    with open(out_file, 'w') as f:
        f.write('\n'.join(records))

construct_trainable_matrix(train_data, vocab, label_vocab, 'train.svm.txt')
construct_trainable_matrix(test_data, vocab, label_vocab, 'test.svm.txt')

# training procedure
from libsvm import svm
from libsvm.svmutil import svm_read_problem, svm_train, svm_predict, svm_save_model, svm_load_model

# train svm
train_label, train_feature = svm_read_problem('train.svm.txt')
print(train_label[0], train_feature[0])
# -s 0: C-SVC, -c 5: cost, -t 0: linear kernel, -g 0.5: gamma (not used by the linear kernel), -e 0.1: stopping tolerance
model = svm_train(train_label, train_feature, '-s 0 -c 5 -t 0 -g 0.5 -e 0.1')

# predict
test_label, test_feature = svm_read_problem('test.svm.txt')
print(test_label[0], test_feature[0])
p_labs, p_acc, p_vals = svm_predict(test_label, test_feature, model)
print('accuracy: {}'.format(p_acc))  # p_acc = (accuracy, mean squared error, squared correlation coefficient)

Output:
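svm_save_model and svm_load_model are imported above but never used; here is a small sketch of how they could persist the trained model for later prediction (the file name is arbitrary):

# save the trained model to disk, then reload it and predict again
svm_save_model('cnews_svm.model', model)
reloaded = svm_load_model('cnews_svm.model')
p_labs2, p_acc2, p_vals2 = svm_predict(test_label, test_feature, reloaded)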