# coding: utf-8
import sys
from collections import Counter
import numpy as np
import tensorflow.keras as kr
import os

if sys.version_info[0] > 2:
    is_py3 = True
else:
    reload(sys)
    sys.setdefaultencoding("utf-8")
    is_py3 = False


def open_file(filename, mode='r'):
    """
    Common file helper that works under both Python 2 and Python 3.
    mode: 'r' or 'w' for read or write
    """
    if is_py3:
        return open(filename, mode, encoding='utf-8', errors='ignore')
    else:
        return open(filename, mode)
def read_file(filename):
    """Read a single file that contains samples from multiple categories."""
    contents = []
    labels = []
    with open_file(filename) as f:
        for line in f:
            try:
                raw = line.strip().split("\t")
                content = raw[1].split(' ')
                if content:
                    contents.append(content)
                    labels.append(raw[0])
            except:
                pass
    return contents, labels
def read_single_file(filename):
    """Read a single file that belongs to one category (the file name is the label)."""
    contents = []
    label = filename.split('/')[-1].split('.')[0]
    with open_file(filename) as f:
        for line in f:
            try:
                content = line.strip().split(' ')
                if content:
                    contents.append(content)
            except:
                pass
    return contents, label
def read_files(dirname):
    """Read every .txt file in a directory; one file per category."""
    contents = []
    labels = []
    files = [f for f in os.listdir(dirname) if f.endswith(".txt")]
    for filename in files:
        content, label = read_single_file(os.path.join(dirname, filename))
        contents.extend(content)
        labels.extend([label] * len(content))
    return contents, labels
def build_vocab(train_dir, vocab_file, vocab_size=5000):
    """Build the vocabulary from the training set and store it on disk."""
    data_train, _ = read_files(train_dir)
    all_data = []
    for content in data_train:
        all_data.extend(content)
    counter = Counter(all_data)
    count_pairs = counter.most_common(vocab_size - 1)
    words, _ = list(zip(*count_pairs))
    # Prepend <PAD> so every text can be padded to the same length
    words = ['<PAD>'] + list(words)
    with open_file(vocab_file, mode='w') as f:
        f.write('\n'.join(words) + '\n')
def read_vocab(vocab_file):
    """Read the vocabulary file into a word list and a word-to-id mapping."""
    # words = open_file(vocab_dir).read().strip().split('\n')
    with open_file(vocab_file) as fp:
        # Under Python 2 every value would need to be converted to unicode
        words = [_.strip() for _ in fp.readlines()]
    word_to_id = dict(zip(words, range(len(words))))
    return words, word_to_id
def read_category():
    """Return the category list and its category-to-id mapping."""
    categories = ['car', 'entertainment', 'military', 'sports', 'technology']
    cat_to_id = dict(zip(categories, range(len(categories))))
    return categories, cat_to_id
def encode_cate(content, words):
    """Map each item to its id using the given mapping; unknown items map to 40000."""
    return [(words[x] if x in words else 40000) for x in content]


def encode_sentences(contents, words):
    """Map every sentence (a list of tokens) to a list of ids."""
    return [encode_cate(x, words) for x in contents]
def process_file(filename, word_to_id, cat_to_id, max_length=600):
    """Convert a labelled file into padded id sequences and one-hot labels."""
    contents, labels = read_file(filename)
    data_id, label_id = [], []
    for i in range(len(contents)):
        data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id])
        label_id.append(cat_to_id[labels[i]])
    # Use keras' pad_sequences to pad every text to a fixed length
    x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length)
    y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id))  # one-hot labels
    return x_pad, y_pad
def batch_iter(x, y, batch_size=64):
    """Yield shuffled mini-batches of (x, y)."""
    data_len = len(x)
    num_batch = int((data_len - 1) / batch_size) + 1
    indices = np.random.permutation(np.arange(data_len))
    x_shuffle = x[indices]
    y_shuffle = y[indices]
    for i in range(num_batch):
        start_id = i * batch_size
        end_id = min((i + 1) * batch_size, data_len)
        yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id]
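# A minimal usage sketch (assumption, not part of the original utilities): tie the
# helpers above together for a tab-separated "label\tsegmented text" training file.
# words, word_to_id = read_vocab('./vocab/vocab.txt')
# categories, cat_to_id = read_category()
# x, y = process_file('train.txt', word_to_id, cat_to_id, max_length=600)  # 'train.txt' is illustrative
# for x_batch, y_batch in batch_iter(x, y, batch_size=64):
#     pass  # feed each shuffled mini-batch into one training step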
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Dense, Conv1D, GlobalMaxPooling1D, Concatenate, Dropout


class TextCNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        input = Input((self.maxlen,))
        embedding = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)(input)
        convs = []
        for kernel_size in [3, 4, 5]:
            # One branch per kernel size: convolution followed by max-over-time pooling
            c = Conv1D(128, kernel_size, activation='relu')(embedding)
            c = GlobalMaxPooling1D()(c)
            convs.append(c)
        x = Concatenate()(convs)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=input, outputs=output)
        return model
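# A minimal usage sketch of the class above (hyperparameters are assumptions,
# mirroring the training script further down):
# model = TextCNN(maxlen=100, max_features=40001, embedding_dims=50).get_model()
# model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
# model.summary()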
from tensorflow.keras.preprocessing import sequence
import random
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
from utils import *

# Paths and related configuration
data_dir = "./processed_data"
vocab_file = "./vocab/vocab.txt"
vocab_size = 40000

# Network configuration
max_features = 40001
maxlen = 100
batch_size = 256
embedding_dims = 50
epochs = 8
print('Preprocessing and loading data...')
# Rebuild the vocabulary if it does not exist yet
if not os.path.exists(vocab_file):
    build_vocab(data_dir, vocab_file, vocab_size)
# Build the word/category to id mappings
categories, cat_to_id = read_category()
words, word_to_id = read_vocab(vocab_file)
# Load the full dataset
x, y = read_files(data_dir)
data = list(zip(x, y))
del x, y
# Shuffle
random.shuffle(data)
# Split into training and test sets
train_data, test_data = train_test_split(data)
# Encode the texts as word ids and the labels as category ids
x_train = encode_sentences([content[0] for content in train_data], word_to_id)
y_train = to_categorical(encode_cate([content[1] for content in train_data], cat_to_id))
x_test = encode_sentences([content[0] for content in test_data], word_to_id)
y_test = to_categorical(encode_cate([content[1] for content in test_data], cat_to_id))
print('Padding the sequences so the data is samples * timesteps')
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)
print('Building the model...')
model = TextCNN(maxlen, max_features, embedding_dims).get_model()
model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
print('Training...')
# Set up the callbacks
my_callbacks = [
    ModelCheckpoint('./cnn_model.h5', verbose=1),
    EarlyStopping(monitor='val_accuracy', patience=2, mode='max')
]
# Fit the model
history = model.fit(x_train, y_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    callbacks=my_callbacks,
                    validation_data=(x_test, y_test))
# print('Predicting on the test set...')
# result = model.predict(x_test)
import matplotlib.pyplot as plt
plt.switch_backend('agg')
%matplotlib inline
fig1 = plt.figure()
plt.plot(history.history['loss'], 'r', linewidth=3.0)
plt.plot(history.history['val_loss'], 'b', linewidth=3.0)
plt.legend(['Training loss', 'Validation Loss'], fontsize=18)
plt.xlabel('Epochs ', fontsize=16)
plt.ylabel('Loss', fontsize=16)
plt.title('Loss Curves :CNN', fontsize=16)
fig1.savefig('loss_cnn.png')
plt.show()
fig2 = plt.figure()
plt.plot(history.history['accuracy'], 'r', linewidth=3.0)
plt.plot(history.history['val_accuracy'], 'b', linewidth=3.0)
plt.legend(['Training Accuracy', 'Validation Accuracy'], fontsize=18)
plt.xlabel('Epochs ', fontsize=16)
plt.ylabel('Accuracy', fontsize=16)
plt.title('Accuracy Curves : CNN', fontsize=16)
fig2.savefig('accuracy_cnn.png')
plt.show()
from tensorflow.keras.utils import plot_model
plot_model(model, show_shapes=True, show_layer_names=True)
import tensorflow as tf
import shutil
model = tf.keras.models.load_model('./cnn_model.h5')
# Target export path
if os.path.exists('./Models/CNN/1'):
    shutil.rmtree('./Models/CNN/1')
export_path = './Models/CNN/1'
# Export a TensorFlow SavedModel for deployment
tf.saved_model.save(model, export_path)
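# One way to serve the exported model (an assumption, not part of the original script):
# TensorFlow Serving can watch ./Models/CNN and expose the REST endpoint used below.
# The model_base_path must be an absolute path.
#   tensorflow_model_server --rest_api_port=8505 --model_name=default \
#       --model_base_path=/absolute/path/to/Models/CNN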
import json
import numpy
import requests
import jieba

text = "这是该国史上最大的一次军事演习"
text_seg = encode_sentences([jieba.lcut(text)], word_to_id)
text_input = sequence.pad_sequences(text_seg, maxlen=maxlen)
data = json.dumps({"signature_name": "serving_default",
                   "instances": text_input.reshape(1, 100).tolist()})
headers = {"content-type": "application/json"}
json_response = requests.post('http://localhost:8505/v1/models/default:predict',
                              data=data, headers=headers)
# print(json.loads(json_response.text))
print(json_response.text)
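# A small follow-up sketch (assumption, not in the original): map the returned class
# probabilities back to a category name, using TF Serving's standard REST response layout.
# pred = json.loads(json_response.text)['predictions'][0]
# print(categories[int(np.argmax(pred))])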
from tensorflow.keras import backend as K
# from tensorflow.python.keras import backend as K
from tensorflow.keras import initializers, regularizers, constraints
from tensorflow.keras.layers import Layer
# from keras.engine.topology import Layer


class Attention(Layer):
    def __init__(self, step_dim,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        """
        Keras Layer that implements an Attention mechanism for temporal data.
        Supports Masking.
        Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
        # Input shape
            3D tensor with shape: (samples, steps, features).
        # Output shape
            2D tensor with shape: (samples, features).
        :param kwargs:
        Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
        The dimensions are inferred based on the output shape of the RNN.
        Example:
            # 1
            model.add(LSTM(64, return_sequences=True))
            model.add(Attention())
            # next add a Dense layer (for classification/regression) or whatever...
            # 2
            hidden = LSTM(64, return_sequences=True)(words)
            sentence = Attention()(hidden)
            # next add a Dense layer (for classification/regression) or whatever...
        """
        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')
        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)
        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)
        self.bias = bias
        self.step_dim = step_dim
        self.features_dim = 0
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3
        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        self.features_dim = input_shape[-1]
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None
        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        features_dim = self.features_dim
        step_dim = self.step_dim
        e = K.reshape(K.dot(K.reshape(x, (-1, features_dim)), K.reshape(self.W, (features_dim, 1))), (-1, step_dim))  # e = K.dot(x, self.W)
        if self.bias:
            e += self.b
        e = K.tanh(e)
        a = K.exp(e)
        # apply mask after the exp; will be re-normalized next
        if mask is not None:
            # cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())
        # in some cases, especially early in training, the sum may be almost zero
        # and this results in NaNs. A workaround is to add a very small positive epsilon to the sum.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())
        a = K.expand_dims(a)
        c = K.sum(a * x, axis=1)
        return c

    def compute_output_shape(self, input_shape):
        return input_shape[0], self.features_dim
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Dense, Dropout, Bidirectional, LSTM, TimeDistributed


class HAN(object):
    def __init__(self, maxlen_sentence, maxlen_word, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen_sentence = maxlen_sentence
        self.maxlen_word = maxlen_word
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        # Word part
        input_word = Input(shape=(self.maxlen_word,))
        x_word = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen_word)(input_word)
        x_word = Bidirectional(LSTM(128, return_sequences=True))(x_word)  # LSTM or GRU
        x_word = Attention(self.maxlen_word)(x_word)
        model_word = Model(input_word, x_word)
        # Sentence part
        input = Input(shape=(self.maxlen_sentence, self.maxlen_word))
        x_sentence = TimeDistributed(model_word)(input)
        x_sentence = Bidirectional(LSTM(128, return_sequences=True))(x_sentence)  # LSTM or GRU
        x_sentence = Attention(self.maxlen_sentence)(x_sentence)
        output = Dense(self.class_num, activation=self.last_activation)(x_sentence)
        model = Model(inputs=input, outputs=output)
        return model
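# A minimal usage sketch (assumption, mirroring the training script further down): the HAN
# model expects inputs of shape (batch, maxlen_sentence, maxlen_word), i.e. documents cut
# into sentences and sentences cut into word ids.
# model = HAN(maxlen_sentence=16, maxlen_word=25, max_features=40001, embedding_dims=50).get_model()
# model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
# model.summary()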
from tensorflow.keras.preprocessing import sequence
import random
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from utils import *

# Paths and related configuration
data_dir = "./processed_data"
vocab_file = "./vocab/vocab.txt"
vocab_size = 40000

# Network configuration
max_features = 40001
maxlen_sentence = 16
maxlen_word = 25
batch_size = 32
embedding_dims = 50
epochs = 10
print('Preprocessing and loading data...')
# Rebuild the vocabulary if it does not exist yet
if not os.path.exists(vocab_file):
    build_vocab(data_dir, vocab_file, vocab_size)
# Build the word/category to id mappings
categories, cat_to_id = read_category()
words, word_to_id = read_vocab(vocab_file)
# Load the full dataset
x, y = read_files(data_dir)
data = list(zip(x, y))
del x, y
# Shuffle
random.shuffle(data)
# Split into training and test sets
train_data, test_data = train_test_split(data)
# Encode the texts as word ids and the labels as category ids
x_train = encode_sentences([content[0] for content in train_data], word_to_id)
y_train = to_categorical(encode_cate([content[1] for content in train_data], cat_to_id))
x_test = encode_sentences([content[0] for content in test_data], word_to_id)
y_test = to_categorical(encode_cate([content[1] for content in test_data], cat_to_id))
print('Padding the sequences')
x_train = sequence.pad_sequences(x_train, maxlen=maxlen_sentence * maxlen_word)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen_sentence * maxlen_word)
x_train = x_train.reshape((len(x_train), maxlen_sentence, maxlen_word))
x_test = x_test.reshape((len(x_test), maxlen_sentence, maxlen_word))
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)
print('Building the model...')
model = HAN(maxlen_sentence, maxlen_word, max_features, embedding_dims).get_model()
model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
print('Train...')
early_stopping = EarlyStopping(monitor='val_accuracy', patience=2, mode='max')
history = model.fit(x_train, y_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    callbacks=[early_stopping],
                    validation_data=(x_test, y_test))
print('Test...')
result = model.predict(x_test)
from tensorflow.keras import backend as K
# from tensorflow.python.keras import backend as K
from tensorflow.keras import initializers, regularizers, constraints
from tensorflow.keras.layers import Layer
# from keras.engine.topology import Layer


class Attention(Layer):
    def __init__(self, step_dim,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        """
        Keras Layer that implements an Attention mechanism for temporal data.
        Supports Masking.
        Follows the work of Raffel et al. [https://arxiv.org/abs/1512.08756]
        # Input shape
            3D tensor with shape: (samples, steps, features).
        # Output shape
            2D tensor with shape: (samples, features).
        :param kwargs:
        Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
        The dimensions are inferred based on the output shape of the RNN.
        Example:
            # 1
            model.add(LSTM(64, return_sequences=True))
            model.add(Attention())
            # next add a Dense layer (for classification/regression) or whatever...
            # 2
            hidden = LSTM(64, return_sequences=True)(words)
            sentence = Attention()(hidden)
            # next add a Dense layer (for classification/regression) or whatever...
        """
        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')
        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)
        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)
        self.bias = bias
        self.step_dim = step_dim
        self.features_dim = 0
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3
        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        self.features_dim = input_shape[-1]
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None
        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        features_dim = self.features_dim
        step_dim = self.step_dim
        e = K.reshape(K.dot(K.reshape(x, (-1, features_dim)), K.reshape(self.W, (features_dim, 1))), (-1, step_dim))  # e = K.dot(x, self.W)
        if self.bias:
            e += self.b
        e = K.tanh(e)
        a = K.exp(e)
        # apply mask after the exp; will be re-normalized next
        if mask is not None:
            # cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())
        # in some cases, especially early in training, the sum may be almost zero
        # and this results in NaNs. A workaround is to add a very small positive epsilon to the sum.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())
        a = K.expand_dims(a)
        c = K.sum(a * x, axis=1)
        return c

    def compute_output_shape(self, input_shape):
        return input_shape[0], self.features_dim
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Dense, Dropout, Bidirectional, LSTM


class TextAttBiRNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        input = Input((self.maxlen,))
        embedding = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)(input)
        x = Bidirectional(LSTM(128, return_sequences=True))(embedding)  # LSTM or GRU
        x = Attention(self.maxlen)(x)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=input, outputs=output)
        return model
from tensorflow.keras import Input, Model
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Embedding, Dense, SimpleRNN, Lambda, Concatenate, Conv1D, GlobalMaxPooling1D


class RCNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        input_current = Input((self.maxlen,))
        input_left = Input((self.maxlen,))
        input_right = Input((self.maxlen,))
        embedder = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)
        embedding_current = embedder(input_current)
        embedding_left = embedder(input_left)
        embedding_right = embedder(input_right)
        x_left = SimpleRNN(128, return_sequences=True)(embedding_left)
        x_right = SimpleRNN(128, return_sequences=True, go_backwards=True)(embedding_right)
        x_right = Lambda(lambda x: K.reverse(x, axes=1))(x_right)
        x = Concatenate(axis=2)([x_left, embedding_current, x_right])
        x = Conv1D(64, kernel_size=1, activation='tanh')(x)
        x = GlobalMaxPooling1D()(x)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=[input_current, input_left, input_right], outputs=output)
        return model
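# A usage sketch (assumption, not from the original): the RCNN takes three aligned inputs,
# the sequence itself plus left- and right-shifted copies that supply the context words.
# Assuming x is an (N, maxlen) int array of padded word ids and 0 is the <PAD> id:
# import numpy as np
# x_left = np.hstack([np.zeros((len(x), 1), dtype=x.dtype), x[:, :-1]])    # shift right
# x_right = np.hstack([x[:, 1:], np.zeros((len(x), 1), dtype=x.dtype)])    # shift left
# model = RCNN(maxlen, max_features, embedding_dims).get_model()
# model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
# model.fit([x, x_left, x_right], y, batch_size=64, epochs=3)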
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Dense, Dropout, Bidirectional, LSTM


class TextBiRNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        input = Input((self.maxlen,))
        embedding = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)(input)
        # x = Bidirectional(CuDNNLSTM(128))(embedding)
        x = Bidirectional(LSTM(128))(embedding)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=input, outputs=output)
        return model
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Dense, Dropout, LSTM


class TextRNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        input = Input((self.maxlen,))
        embedding = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)(input)
        x = LSTM(128)(embedding)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=input, outputs=output)
        return model
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Dense, Conv1D, GlobalMaxPooling1D, Concatenate, Dropout


class TextCNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        input = Input((self.maxlen,))
        embedding = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)(input)
        convs = []
        for kernel_size in [3, 4, 5]:
            c = Conv1D(128, kernel_size, activation='relu')(embedding)
            c = GlobalMaxPooling1D()(c)
            convs.append(c)
        x = Concatenate()(convs)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=input, outputs=output)
        return model
import jieba
import pandas as pd
import random

cate_dic = {'technology': 1, 'car': 2, 'entertainment': 3, 'military': 4, 'sports': 5}
df_technology = pd.read_csv("./origin_data/technology_news.csv", encoding='utf-8')
df_technology = df_technology.dropna()
df_car = pd.read_csv("./origin_data/car_news.csv", encoding='utf-8')
df_car = df_car.dropna()
df_entertainment = pd.read_csv("./origin_data/entertainment_news.csv", encoding='utf-8')
df_entertainment = df_entertainment.dropna()
df_military = pd.read_csv("./origin_data/military_news.csv", encoding='utf-8')
df_military = df_military.dropna()
df_sports = pd.read_csv("./origin_data/sports_news.csv", encoding='utf-8')
df_sports = df_sports.dropna()
technology = df_technology.content.values.tolist()[1000:21000]
car = df_car.content.values.tolist()[1000:21000]
entertainment = df_entertainment.content.values.tolist()[:20000]
military = df_military.content.values.tolist()[:20000]
sports = df_sports.content.values.tolist()[:20000]
stopwords = pd.read_csv("origin_data/stopwords.txt", index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')
stopwords = stopwords['stopword'].values
def preprocess_text(content_lines, sentences, category):
    for line in content_lines:
        try:
            segs = jieba.lcut(line)
            segs = list(filter(lambda x: len(x) > 1, segs))
            segs = list(filter(lambda x: x not in stopwords, segs))
            sentences.append("label" + str(category) + " , " + " ".join(segs))
        except Exception as e:
            print(line)
            continue
# Generate the training data
sentences = []
preprocess_text(technology, sentences, cate_dic['technology'])
preprocess_text(car, sentences, cate_dic['car'])
preprocess_text(entertainment, sentences, cate_dic['entertainment'])
preprocess_text(military, sentences, cate_dic['military'])
preprocess_text(sports, sentences, cate_dic['sports'])
random.shuffle(sentences)
print("writing data to fasttext format...")
out = open('train_data.txt', 'w', encoding='utf8')
for sentence in sentences:
    out.write(sentence + "\n")
out.close()  # flush the file before fastText reads it
print("done!")
import fasttext

# Note: the official fasttext binding's train_supervised takes the label prefix via the
# `label` argument, and test() returns (number of examples, precision, recall).
classifier = fasttext.train_supervised('train_data.txt', label='label')
result = classifier.test('train_data.txt')
print('Number of examples:', result[0])
print('P@1:', result[1])
print('R@1:', result[2])
label_to_cate = {1: 'technology', 2: 'car', 3: 'entertainment', 4: 'military', 5: 'sports'}
# texts = ['中新网 日电 2018 预赛 亚洲区 强赛 中国队 韩国队 较量 比赛 上半场 分钟 主场 作战 中国队 率先 打破 场上 僵局 利用 角球 机会 大宝 前点 攻门 得手 中国队 领先']
texts = ['这 是 中国 第 一 次 军舰 演习']
# predict() in the official binding returns the labels together with their probabilities
# (which covers what the older wrapper exposed as predict_proba)
labels, probs = classifier.predict(texts)
print(labels, probs)
print(label_to_cate[int(labels[0][0].replace('label', ''))])
# top-3 predictions per text
labels, probs = classifier.predict(texts, k=3)
print(labels, probs)
def preprocess_text_unsupervised(content_lines, sentences, category):
    for line in content_lines:
        try:
            segs = jieba.lcut(line)
            segs = list(filter(lambda x: len(x) > 1, segs))
            segs = list(filter(lambda x: x not in stopwords, segs))
            sentences.append(" ".join(segs))
        except Exception as e:
            print(line)
            continue


# Generate the unsupervised training data (no labels this time)
sentences = []
preprocess_text_unsupervised(technology, sentences, cate_dic['technology'])
preprocess_text_unsupervised(car, sentences, cate_dic['car'])
preprocess_text_unsupervised(entertainment, sentences, cate_dic['entertainment'])
preprocess_text_unsupervised(military, sentences, cate_dic['military'])
preprocess_text_unsupervised(sports, sentences, cate_dic['sports'])
print("writing data to fasttext unsupervised learning format...")
out = open('unsupervised_train_data.txt', 'wb')
for sentence in sentences:
    out.write(sentence.encode('utf8') + b'\n')
out.close()
print("done!")
import fasttext

# Skipgram model (the official binding trains word vectors via train_unsupervised)
model = fasttext.train_unsupervised('unsupervised_train_data.txt', model='skipgram')
print(model.words)  # list of words in dictionary

# CBOW model
model = fasttext.train_unsupervised('unsupervised_train_data.txt', model='cbow')
print(model.words)  # list of words in dictionary

print(model.words)
print(model['赛季'])
def preprocess_text_unsupervised(content_lines, sentences):
    for line in content_lines:
        try:
            segs = jieba.lcut(line)
            segs = filter(lambda x: len(x) > 1, segs)
            segs = filter(lambda x: x not in stopwords, segs)
            sentences.append(" ".join(segs))
        except Exception as e:
            print(line)
            continue


# Generate the unsupervised training data
sentences = []
preprocess_text_unsupervised(technology, sentences)
preprocess_text_unsupervised(car, sentences)
preprocess_text_unsupervised(entertainment, sentences)
preprocess_text_unsupervised(military, sentences)
preprocess_text_unsupervised(sports, sentences)

# gensim's Word2Vec expects token lists, so split the space-joined sentences again
# (on gensim >= 4 the `size` argument is called `vector_size`)
from gensim.models import Word2Vec
model = Word2Vec([sentence.split() for sentence in sentences], size=100, window=5, min_count=5, workers=4)
model.save("gensim_word2vec.model")
model.wv['赛季']
model.wv.most_similar('赛季')
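# A small sketch for reloading the persisted vectors later (not in the original):
# from gensim.models import Word2Vec
# w2v = Word2Vec.load("gensim_word2vec.model")
# w2v.wv.most_similar('赛季', topn=10)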
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler


def parse_lines(p):
    """Turn one (path, file content) pair into a list of Rows, one per sentence."""
    lines = p[1].split('\n')
    category = p[0].split('/')[-1].split('.')[0]
    return [Row(cate=category, sentence=sent) for sent in lines]
def words_classify_main(spark):
    sc = spark.sparkContext
    # Tokenizer lower-cases the input string and splits it on whitespace
    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    # Hash the tokens into numFeatures buckets
    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=8000)
    # Inverse Document Frequency
    idf = IDF(inputCol="rawFeatures", outputCol="features")
    # Load the input files into a dataframe (one Row per sentence)
    srcdf = sc.wholeTextFiles("file://" + './processed_data').map(parse_lines).flatMap(lambda x: x).toDF()
    # Split into 80% training and 20% testing
    training, testing = srcdf.randomSplit([0.8, 0.2])
    # Tokenize the training set
    wordsData = tokenizer.transform(training)
    # Turn the token lists into feature vectors
    featurizedData = hashingTF.transform(wordsData)
    # Fit the IDF model on the feature vectors
    idfModel = idf.fit(featurizedData)
    # Get the TF-IDF weight of every token
    rescaledData = idfModel.transform(featurizedData)
    # Encode the categories as numeric labels
    label_stringIdx = StringIndexer(inputCol="cate", outputCol="label")
    pipeline = Pipeline(stages=[label_stringIdx])
    pipelineFit = pipeline.fit(rescaledData)
    trainData = pipelineFit.transform(rescaledData)
    # Persist to avoid recomputation
    trainData.persist()
    # Convert the dataset into NaiveBayes input format
    trainDF = trainData.select("features", "label").rdd.map(
        lambda x: Row(label=x['label'], features=Vectors.dense(x['features']))
    ).toDF()
    # NaiveBayes classifier
    naivebayes = NaiveBayes(smoothing=1.0, modelType="multinomial")
    # Fit the NaiveBayesModel on the training set
    model = naivebayes.fit(trainDF)
    # Tokenize the test set
    testWordsData = tokenizer.transform(testing)
    # Turn the token lists into feature vectors
    testFeaturizedData = hashingTF.transform(testWordsData)
    # Fit an IDF model on the test feature vectors
    testIDFModel = idf.fit(testFeaturizedData)
    # Get the TF-IDF weight of every token
    testRescaledData = testIDFModel.transform(testFeaturizedData)
    # Test set with numeric labels
    testData = pipelineFit.transform(testRescaledData)
    # Persist to avoid recomputation
    testData.persist()
    testDF = testData.select("features", "label").rdd.map(
        lambda x: Row(label=x['label'], features=Vectors.dense(x['features']))
    ).toDF()
    # Predict on the test set with the trained model
    predictions = model.transform(testDF)
    predictions.show()
    # Evaluate the model's accuracy on the test set
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy on the test set: " + str(accuracy))
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("spark_naivebayes_classify") \
        .getOrCreate()
    words_classify_main(spark)
    spark.stop()
import jieba
import pandas as pd

df_technology = pd.read_csv("./origin_data/technology_news.csv", encoding='utf-8')
df_technology = df_technology.dropna()
df_car = pd.read_csv("./origin_data/car_news.csv", encoding='utf-8')
df_car = df_car.dropna()
df_entertainment = pd.read_csv("./origin_data/entertainment_news.csv", encoding='utf-8')
df_entertainment = df_entertainment.dropna()
df_military = pd.read_csv("./origin_data/military_news.csv", encoding='utf-8')
df_military = df_military.dropna()
df_sports = pd.read_csv("./origin_data/sports_news.csv", encoding='utf-8')
df_sports = df_sports.dropna()
technology = df_technology.content.values.tolist()[1000:21000]
car = df_car.content.values.tolist()[1000:21000]
entertainment = df_entertainment.content.values.tolist()[:20000]
military = df_military.content.values.tolist()[:20000]
sports = df_sports.content.values.tolist()[:20000]
print(technology[12])
stopwords = pd.read_csv("origin_data/stopwords.txt", index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')
stopwords = stopwords['stopword'].values
def preprocess_text(content_lines, sentences, category, target_path):
    out_f = open(target_path + "/" + category + ".txt", 'w')
    for line in content_lines:
        try:
            segs = jieba.lcut(line)
            segs = list(filter(lambda x: len(x) > 1, segs))  # drop single-character tokens (badly parsed pieces)
            segs = list(filter(lambda x: x not in stopwords, segs))  # drop stopwords
            sentences.append((" ".join(segs), category))
            out_f.write(" ".join(segs) + "\n")
        except Exception as e:
            print(line)
            continue
    out_f.close()


# Generate the training data
sentences = []
preprocess_text(technology, sentences, 'technology', 'processed_data')
preprocess_text(car, sentences, 'car', 'processed_data')
preprocess_text(entertainment, sentences, 'entertainment', 'processed_data')
preprocess_text(military, sentences, 'military', 'processed_data')
preprocess_text(sports, sentences, 'sports', 'processed_data')
import random
random.shuffle(sentences)
sentences[:10]
from sklearn.model_selection import train_test_split
x, y = zip(*sentences)
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=1234)

from sklearn.feature_extraction.text import CountVectorizer
vec = CountVectorizer(
    analyzer='word',     # tokenise at word level (the texts are already space-separated)
    max_features=4000,   # keep the 4000 most frequent terms
)
vec.fit(x_train)


def get_features(x):
    return vec.transform(x)


from sklearn.naive_bayes import MultinomialNB
classifier = MultinomialNB()
classifier.fit(vec.transform(x_train), y_train)
classifier.score(vec.transform(x_test), y_test)
from sklearn.feature_extraction.text import CountVectorizer
vec = CountVectorizer(
    analyzer='word',      # tokenise at word level
    ngram_range=(1, 4),   # use ngrams of size 1, 2, 3, 4
    max_features=20000,   # keep the 20000 most frequent ngrams
)
vec.fit(x_train)


def get_features(x):
    return vec.transform(x)


from sklearn.naive_bayes import MultinomialNB
classifier = MultinomialNB()
classifier.fit(vec.transform(x_train), y_train)
classifier.score(vec.transform(x_test), y_test)
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score
import numpy as np


def stratifiedkfold_cv(x, y, clf_class, shuffle=True, n_folds=5, **kwargs):
    stratifiedk_fold = StratifiedKFold(n_splits=n_folds, shuffle=shuffle)
    y_pred = y.copy()
    for train_index, test_index in stratifiedk_fold.split(x, y):
        X_train, X_test = x[train_index], x[test_index]
        y_train = y[train_index]
        clf = clf_class(**kwargs)
        clf.fit(X_train, y_train)
        y_pred[test_index] = clf.predict(X_test)
    return y_pred


NB = MultinomialNB
print(precision_score(y, stratifiedkfold_cv(vec.transform(x), np.array(y), NB), average='macro'))
import re
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
# save_model/load_model below assume joblib-style dump/load
from joblib import dump, load


class TextClassifier():

    def __init__(self, classifier=MultinomialNB()):
        self.classifier = classifier
        self.vectorizer = CountVectorizer(analyzer='word', ngram_range=(1, 4), max_features=20000)

    def features(self, X):
        return self.vectorizer.transform(X)

    def fit(self, X, y):
        self.vectorizer.fit(X)
        self.classifier.fit(self.features(X), y)

    def predict(self, x):
        return self.classifier.predict(self.features([x]))

    def score(self, X, y):
        return self.classifier.score(self.features(X), y)

    def save_model(self, path):
        dump((self.classifier, self.vectorizer), path)

    def load_model(self, path):
        self.classifier, self.vectorizer = load(path)


text_classifier = TextClassifier()
text_classifier.fit(x_train, y_train)
print(text_classifier.predict('这 是 有史以来 最 大 的 一 次 军舰 演习'))
print(text_classifier.score(x_test, y_test))
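# A small sketch of persisting and restoring the trained classifier (assumes the
# joblib-backed dump/load imported above; the file name is illustrative):
# text_classifier.save_model('bayes_text_classifier.joblib')
# restored = TextClassifier()
# restored.load_model('bayes_text_classifier.joblib')
# print(restored.score(x_test, y_test))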
from sklearn.svm import SVC
svm = SVC(kernel='linear')
svm.fit(vec.transform(x_train), y_train)
svm.score(vec.transform(x_test), y_test)

from sklearn.svm import SVC
svm = SVC()
svm.fit(vec.transform(x_train), y_train)
svm.score(vec.transform(x_test), y_test)
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC


class TextClassifier():

    def __init__(self, classifier=SVC(kernel='linear')):
        self.classifier = classifier
        self.vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 3), max_features=12000)

    def features(self, X):
        return self.vectorizer.transform(X)

    def fit(self, X, y):
        self.vectorizer.fit(X)
        self.classifier.fit(self.features(X), y)

    def predict(self, x):
        return self.classifier.predict(self.features([x]))

    def score(self, X, y):
        return self.classifier.score(self.features(X), y)


text_classifier = TextClassifier()
text_classifier.fit(x_train, y_train)
print(text_classifier.predict('这 是 有史以来 最 大 的 一 次 军舰 演习'))
print(text_classifier.score(x_test, y_test))
import jieba.analyse as analyse
import pandas as pd
df = pd.read_csv("./origin_data/technology_news.csv", encoding='utf-8')
df = df.dropna()
lines = df.content.values.tolist()
content = "".join(lines)
print(" ".join(analyse.extract_tags(content, topK=30, withWeight=False, allowPOS=())))

import jieba.analyse as analyse
import pandas as pd
df = pd.read_csv("./origin_data/military_news.csv", encoding='utf-8')
df = df.dropna()
lines = df.content.values.tolist()
content = "".join(lines)
print(" ".join(analyse.extract_tags(content, topK=30, withWeight=False, allowPOS=())))

import jieba.analyse as analyse
import pandas as pd
df = pd.read_csv("./origin_data/military_news.csv", encoding='utf-8')
df = df.dropna()
lines = df.content.values.tolist()
content = "".join(lines)
print(" ".join(analyse.textrank(content, topK=20, withWeight=False, allowPOS=('ns', 'n', 'vn', 'v'))))
print("---------------------separator----------------")
print(" ".join(analyse.textrank(content, topK=20, withWeight=False, allowPOS=('ns', 'n'))))
from gensim import corpora, models, similarities
import gensim

stopwords = pd.read_csv("origin_data/stopwords.txt", index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')
stopwords = stopwords['stopword'].values

import jieba
import pandas as pd
df = pd.read_csv("./origin_data/technology_news.csv", encoding='utf-8')
df = df.dropna()
lines = df.content.values.tolist()
sentences = []
for line in lines:
    try:
        segs = jieba.lcut(line)
        segs = list(filter(lambda x: len(x) > 1, segs))
        segs = list(filter(lambda x: x not in stopwords, segs))
        sentences.append(list(segs))
    except Exception as e:
        print(line)
        continue

# Build a bag-of-words corpus and fit a 20-topic LDA model
dictionary = corpora.Dictionary(sentences)
corpus = [dictionary.doc2bow(sentence) for sentence in sentences]
lda = gensim.models.ldamodel.LdaModel(corpus=corpus, id2word=dictionary, num_topics=20)
print(lda.print_topic(3, topn=5))
for topic in lda.print_topics(num_topics=20, num_words=8):
    print(topic[1])
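# A small follow-up sketch (assumption, not in the original): infer the topic mixture of a
# new, already-segmented document with the trained LDA model.
# new_doc = [w for w in jieba.lcut('小米 发布 新款 智能 手机') if len(w) > 1 and w not in stopwords]
# bow = dictionary.doc2bow(new_doc)
# print(lda.get_document_topics(bow))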
# coding: utf-8
__author__ = 'Hanxiaoyang'
import warnings
warnings.filterwarnings("ignore")
import jieba                     # word segmentation
import numpy                     # numerical computing
import codecs                    # codecs.open lets us specify the file encoding and decode to unicode on read
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib
matplotlib.rcParams['figure.figsize'] = (10.0, 5.0)
from wordcloud import WordCloud  # word cloud package

df = pd.read_csv("./origin_data/entertainment_news.csv", encoding='utf-8')
sen = '我爱学习自然语言处理'
jieba.lcut(sen)

df = pd.read_csv("./origin_data/entertainment_news.csv", encoding='utf-8')
df = df.dropna()
content = df["content"].values.tolist()
segment = []
for line in content:
    try:
        segs = jieba.lcut(line)
        for seg in segs:
            if len(seg) > 1 and seg != '\r\n':
                segment.append(seg)
    except:
        print(line)
        continue

words_df = pd.DataFrame({'segment': segment})
# words_df.head()
stopwords = pd.read_csv("origin_data/stopwords.txt", index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')  # quoting=3: no quoting at all
# stopwords.head()
words_df = words_df[~words_df.segment.isin(stopwords.stopword)]
# words_stat=words_df.groupby(by=['segment'])['segment'].agg({"计数":numpy.size})
# words_stat=words_stat.reset_index().sort_values(by=["计数"],ascending=False)
words_stat = words_df.groupby('segment').agg(计数=pd.NamedAgg(column='segment', aggfunc='size')).reset_index().sort_values(
    by='计数', ascending=False)
words_stat.head()

matplotlib.rcParams['figure.figsize'] = (12.0, 12.0)
wordcloud = WordCloud(font_path="origin_data/simhei.ttf", background_color="white", max_font_size=80)
word_frequence = {x[0]: x[1] for x in words_stat.head(1000).values}
wordcloud = wordcloud.fit_words(word_frequence)
plt.imshow(wordcloud)
# from scipy.misc import imread
from imageio import imread
matplotlib.rcParams['figure.figsize'] = (12.0, 12.0)
from wordcloud import WordCloud, ImageColorGenerator
bimg = imread('img/entertainment.jpeg')
wordcloud = WordCloud(background_color="white", mask=bimg, font_path='origin_data/simhei.ttf', max_font_size=200)
word_frequence = {x[0]: x[1] for x in words_stat.head(1000).values}
wordcloud = wordcloud.fit_words(word_frequence)
bimgColors = ImageColorGenerator(bimg)
plt.axis("off")
plt.imshow(wordcloud.recolor(color_func=bimgColors))
df = pd.read_csv("./origin_data/sports_news.csv", encoding=‘utf-8’)
df = df.dropna()
content=df.content.values.tolist()
#jieba.load_userdict(u"data/user_dic.txt")
segment=[]
for line in content:
try:
segs=jieba.lcut(line)
for seg in segs:
if len(seg)>1 and seg!=’\r\n’:
segment.append(seg)
except:
print(line)
continue
matplotlib.rcParams[‘figure.figsize’] = (10.0, 8.0)
words_df=pd.DataFrame({‘segment’:segment})
#words_df.head()
stopwords=pd.read_csv(“origin_data/stopwords.txt”,index_col=False,quoting=3,sep="\t",names=[‘stopword’], encoding=‘utf-8’)#quoting=3全不引用
#stopwords.head()
words_df=words_df[~words_df.segment.isin(stopwords.stopword)]
#words_stat=words_df.groupby(by=[‘segment’])[‘segment’].agg({“计数”:numpy.size})
#words_stat=words_stat.reset_index().sort_values(by=[“计数”],ascending=False)
words_stat = words_df.groupby(‘segment’).agg(计数=pd.NamedAgg(column=‘segment’, aggfunc=‘size’)).reset_index().sort_values(
by=‘计数’, ascending=False)
words_stat.head()
wordcloud=WordCloud(font_path=“origin_data/simhei.ttf”,background_color=“black”,max_font_size=80)
word_frequence = {x[0]:x[1] for x in words_stat.head(1000).values}
wordcloud=wordcloud.fit_words(word_frequence)
plt.imshow(wordcloud)
# Visualize the word cloud over an image mask
# from scipy.misc import imread
from imageio import imread
matplotlib.rcParams['figure.figsize'] = (12.0, 10.0)
from wordcloud import WordCloud, ImageColorGenerator
bimg = imread('img/sports.jpeg')
wordcloud = WordCloud(background_color="white", mask=bimg, font_path='origin_data/simhei.ttf', max_font_size=200)
word_frequence = {x[0]: x[1] for x in words_stat.head(1000).values}
wordcloud = wordcloud.fit_words(word_frequence)
bimgColors = ImageColorGenerator(bimg)
plt.axis("off")
plt.imshow(wordcloud.recolor(color_func=bimgColors))