The garbage-classification card images come from some early-education products; cropping the garbage items from these printed cards is far easier than collecting a dataset from scratch. Five cases are covered: alcohol, basketball, bread, hat, and the "cross" marker shown by the detection platform when nothing is placed on it. Dataset download link: https://download.csdn.net/download/YANGJINCHAHAHA/19867998
split_dataset.py shuffles the raw data and splits it into three parts following a train/valid/test folder structure. The code assumes your source dataset sits in the original_dataset folder and writes the result to the dataset folder.
import os
import random
import shutil


class SplitDataset():
    def __init__(self, dataset_dir, saved_dataset_dir, train_ratio=0.6, test_ratio=0.2, show_progress=False):
        self.dataset_dir = dataset_dir
        self.saved_dataset_dir = saved_dataset_dir
        self.saved_train_dir = saved_dataset_dir + "/train/"
        self.saved_valid_dir = saved_dataset_dir + "/valid/"
        self.saved_test_dir = saved_dataset_dir + "/test/"

        self.train_ratio = train_ratio
        self.test_ratio = test_ratio
        self.valid_ratio = 1 - train_ratio - test_ratio

        self.train_file_path = []
        self.valid_file_path = []
        self.test_file_path = []

        self.index_label_dict = {}

        self.show_progress = show_progress

        # create the train/valid/test directories if they do not exist yet
        if not os.path.exists(self.saved_train_dir):
            os.makedirs(self.saved_train_dir)
        if not os.path.exists(self.saved_test_dir):
            os.makedirs(self.saved_test_dir)
        if not os.path.exists(self.saved_valid_dir):
            os.makedirs(self.saved_valid_dir)

    def __get_label_names(self):
        # every sub-directory of dataset_dir is treated as one class
        label_names = []
        for item in os.listdir(self.dataset_dir):
            item_path = os.path.join(self.dataset_dir, item)
            if os.path.isdir(item_path):
                label_names.append(item)
        return label_names

    def __get_all_file_path(self):
        all_file_path = []
        index = 0
        for file_type in self.__get_label_names():
            self.index_label_dict[index] = file_type
            index += 1
            type_file_path = os.path.join(self.dataset_dir, file_type)
            file_path = []
            for file in os.listdir(type_file_path):
                single_file_path = os.path.join(type_file_path, file)
                file_path.append(single_file_path)
            all_file_path.append(file_path)
        return all_file_path

    def __copy_files(self, type_path, type_saved_dir):
        for item in type_path:
            src_path_list = item[1]
            dst_path = type_saved_dir + "%s/" % (item[0])
            if not os.path.exists(dst_path):
                os.mkdir(dst_path)
            for src_path in src_path_list:
                shutil.copy(src_path, dst_path)
                if self.show_progress:
                    print("Copying file " + src_path + " to " + dst_path)

    def __split_dataset(self):
        all_file_paths = self.__get_all_file_path()
        for index in range(len(all_file_paths)):
            file_path_list = all_file_paths[index]
            file_path_list_length = len(file_path_list)
            random.shuffle(file_path_list)

            train_num = int(file_path_list_length * self.train_ratio)
            test_num = int(file_path_list_length * self.test_ratio)

            self.train_file_path.append([self.index_label_dict[index], file_path_list[: train_num]])
            self.test_file_path.append([self.index_label_dict[index], file_path_list[train_num:train_num + test_num]])
            self.valid_file_path.append([self.index_label_dict[index], file_path_list[train_num + test_num:]])

    def start_splitting(self):
        self.__split_dataset()
        self.__copy_files(type_path=self.train_file_path, type_saved_dir=self.saved_train_dir)
        self.__copy_files(type_path=self.valid_file_path, type_saved_dir=self.saved_valid_dir)
        self.__copy_files(type_path=self.test_file_path, type_saved_dir=self.saved_test_dir)


if __name__ == '__main__':
    split_dataset = SplitDataset(dataset_dir="original_dataset",
                                 saved_dataset_dir="dataset",
                                 show_progress=True)
    split_dataset.start_splitting()
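After the script finishes, a quick way to confirm the 60/20/20 split landed as expected is to count the copied files per class. This is only an optional sanity-check sketch, not part of the original scripts; it assumes nothing beyond the dataset/ layout produced above.
import os

def count_split(saved_dataset_dir="dataset"):
    # walk dataset/{train,valid,test}/<class>/ and report how many images ended up in each split
    for split in ("train", "valid", "test"):
        split_dir = os.path.join(saved_dataset_dir, split)
        total = 0
        for label in sorted(os.listdir(split_dir)):
            n = len(os.listdir(os.path.join(split_dir, label)))
            total += n
            print("%s/%s: %d images" % (split, label, n))
        print("%s total: %d" % (split, total))

if __name__ == "__main__":
    count_split()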
Once the dataset is prepared, prepare_data.py loads and reads the data. Many of its hyperparameters live in config.py, which is covered at the end.
import tensorflow as tf
import config
import pathlib
from config import image_height, image_width, channels


def load_and_preprocess_image(img_path):
    # read pictures
    img_raw = tf.io.read_file(img_path)
    # decode pictures
    img_tensor = tf.image.decode_jpeg(img_raw, channels=channels)
    # resize
    img_tensor = tf.image.resize(img_tensor, [image_height, image_width])
    img_tensor = tf.cast(img_tensor, tf.float32)
    # normalization
    img = img_tensor / 255.0
    return img


def get_images_and_labels(data_root_dir):
    # get all images' paths (format: string)
    data_root = pathlib.Path(data_root_dir)
    all_image_path = [str(path) for path in list(data_root.glob('*/*'))]
    # get labels' names
    label_names = sorted(item.name for item in data_root.glob('*/'))
    # dict: {label : index}
    label_to_index = dict((label, index) for index, label in enumerate(label_names))
    # get all images' labels
    all_image_label = [label_to_index[pathlib.Path(single_image_path).parent.name] for single_image_path in all_image_path]
    return all_image_path, all_image_label


def get_dataset(dataset_root_dir):
    all_image_path, all_image_label = get_images_and_labels(data_root_dir=dataset_root_dir)
    # load the dataset and preprocess images
    image_dataset = tf.data.Dataset.from_tensor_slices(all_image_path).map(load_and_preprocess_image)
    label_dataset = tf.data.Dataset.from_tensor_slices(all_image_label)
    dataset = tf.data.Dataset.zip((image_dataset, label_dataset))
    image_count = len(all_image_path)
    return dataset, image_count


def generate_datasets():
    train_dataset, train_count = get_dataset(dataset_root_dir=config.train_dir)
    valid_dataset, valid_count = get_dataset(dataset_root_dir=config.valid_dir)
    test_dataset, test_count = get_dataset(dataset_root_dir=config.test_dir)

    # read the original_dataset in the form of batch
    train_dataset = train_dataset.shuffle(buffer_size=train_count).batch(batch_size=config.BATCH_SIZE)
    valid_dataset = valid_dataset.batch(batch_size=config.BATCH_SIZE)
    test_dataset = test_dataset.batch(batch_size=config.BATCH_SIZE)

    return train_dataset, valid_dataset, test_dataset, train_count, valid_count, test_count
def generate_test_dataset(path):
    # path is a list holding a single image path, e.g. [r"E:\garbage\Crop\test\188.jpg"]
    label = [18]  # placeholder label; only the prediction matters when testing a single image
    image_dataset = tf.data.Dataset.from_tensor_slices(path).map(load_and_preprocess_image)
    label_dataset = tf.data.Dataset.from_tensor_slices(label)
    dataset = tf.data.Dataset.zip((image_dataset, label_dataset))
    test_dataset = dataset.batch(batch_size=1)
    return test_dataset


if __name__ == "__main__":
    img = load_and_preprocess_image("pic/1/wait.png")
    print(img)
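As a quick optional check that the pipeline works (assuming the dataset/ folder from the previous step is in place), pulling one batch and printing its shape should give a (BATCH_SIZE, 224, 224, 3) float tensor plus a vector of integer labels:
from prepare_data import generate_datasets

train_dataset, valid_dataset, test_dataset, train_count, valid_count, test_count = generate_datasets()
print("train/valid/test image counts:", train_count, valid_count, test_count)
for images, labels in train_dataset.take(1):
    print(images.shape)    # expected: (BATCH_SIZE, 224, 224, 3)
    print(labels.numpy())  # integer class indices, one per image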
Next is the network model itself; TensorFlow 2.0 makes building networks very convenient. I chose VGG16, implemented below as a slimmed-down VGG-style stack (models/vgg.py, as imported by train.py).
import tensorflow as tf
from tensorflow.keras import layers, Model


def vgg16(im_height=224, im_width=224, class_num=1000):
    input_image = tf.keras.Input(shape=(im_height, im_width, 3), dtype="float32")
    # feature extractor: stacked 3x3 convolutions with max-pooling
    x = layers.Conv2D(64, kernel_size=3, strides=1, padding="SAME", activation='relu', name="conv2d_1")(input_image)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_1")(x)
    x = layers.Conv2D(128, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_2")(x)
    x = layers.Conv2D(256, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_3.1')(x)
    x = layers.Conv2D(256, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_3.2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_3")(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_4.1')(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_4.2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_4")(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_5.1')(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_5.2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_5")(x)
    # classifier head: two fully connected layers plus a softmax output
    x = layers.Flatten(name="output_flatten")(x)
    x = layers.Dense(4096, name="output_dense_1")(x)
    x = layers.Dense(4096, name="output_dense_2")(x)
    x = layers.Dense(class_num, name="output_dense_3", activation=tf.keras.activations.softmax)(x)
    model = Model(inputs=input_image, outputs=x)
    return model


if __name__ == "__main__":
    model = vgg16()
    model.summary()
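Note that this is a lighter VGG-style stack (8 convolutional layers rather than the 13 of the original VGG16), which keeps training cheap for only five classes. To confirm the forward pass and output shape before training, a dummy batch can be pushed through the network; the snippet below is only a sketch and assumes the file is saved as models/vgg.py, matching the import used in train.py.
import tensorflow as tf
from models import vgg  # assumes this file lives at models/vgg.py, as train.py imports it

model = vgg.vgg16(im_height=224, im_width=224, class_num=5)
dummy = tf.random.uniform((2, 224, 224, 3))
probs = model(dummy, training=False)
print(probs.shape)                           # (2, 5)
print(tf.reduce_sum(probs, axis=1).numpy())  # each row sums to ~1 thanks to the softmax output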
Finally, the training itself. train.py covers the loss and optimizer definitions, the training and validation steps, and exporting the model weights.
from __future__ import absolute_import, division, print_function
import tensorflow as tf

# GPU settings: enable memory growth so TensorFlow does not grab all GPU memory at once
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

from models import vgg
import config
from prepare_data import generate_datasets
import math


if __name__ == '__main__':
    # get the original_dataset
    train_dataset, valid_dataset, test_dataset, train_count, valid_count, test_count = generate_datasets()

    # create model
    model = vgg.vgg16(class_num=5)

    # define loss and optimizer
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
    optimizer = tf.keras.optimizers.Adadelta()

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    valid_loss = tf.keras.metrics.Mean(name='valid_loss')
    valid_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='valid_accuracy')

    @tf.function
    def train_step(images, labels):
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_object(y_true=labels, y_pred=predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars=zip(gradients, model.trainable_variables))

        train_loss(loss)
        train_accuracy(labels, predictions)

    @tf.function
    def valid_step(images, labels):
        predictions = model(images, training=False)
        v_loss = loss_object(labels, predictions)

        valid_loss(v_loss)
        valid_accuracy(labels, predictions)

    # start training
    for epoch in range(config.EPOCHS):
        train_loss.reset_states()
        train_accuracy.reset_states()
        valid_loss.reset_states()
        valid_accuracy.reset_states()
        step = 0

        for images, labels in train_dataset:
            step += 1
            train_step(images, labels)
            print("Epoch: {}/{}, step: {}/{}, loss: {:.5f}, accuracy: {:.5f}".format(epoch + 1,
                                                                                     config.EPOCHS,
                                                                                     step,
                                                                                     math.ceil(train_count / config.BATCH_SIZE),
                                                                                     train_loss.result(),
                                                                                     train_accuracy.result()))

        for valid_images, valid_labels in valid_dataset:
            valid_step(valid_images, valid_labels)

        print("Epoch: {}/{}, train loss: {:.5f}, train accuracy: {:.5f}, "
              "valid loss: {:.5f}, valid accuracy: {:.5f}".format(epoch + 1,
                                                                  config.EPOCHS,
                                                                  train_loss.result(),
                                                                  train_accuracy.result(),
                                                                  valid_loss.result(),
                                                                  valid_accuracy.result()))

    model.save_weights(filepath=config.save_model_dir, save_format='tf')
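train.py builds test_dataset but never uses it. If you also want a final test-set score, a minimal sketch that could be appended after model.save_weights (reusing the model, loss_object, and test_dataset defined above) looks like this:
    # evaluate on the held-out test split once training has finished
    test_loss = tf.keras.metrics.Mean(name='test_loss')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
    for test_images, test_labels in test_dataset:
        predictions = model(test_images, training=False)
        t_loss = loss_object(y_true=test_labels, y_pred=predictions)
        test_loss(t_loss)
        test_accuracy(test_labels, predictions)
    print("test loss: {:.5f}, test accuracy: {:.5f}".format(test_loss.result(), test_accuracy.result()))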
Finally, here is config.py, which holds the training parameters; the directory paths can be changed to match your own setup.
# some training parameters
EPOCHS = 100
BATCH_SIZE = 8
NUM_CLASSES = 5
image_height = 224
image_width = 224
channels = 3
save_model_dir = "saved_model/model"
dataset_dir = "dataset/"
train_dir = dataset_dir + "train"
valid_dir = dataset_dir + "valid"
test_dir = dataset_dir + "test"
test_our_dir = r"E:\garbage\Crop\test"
The trained model is also available for download. If you want to test a single image, you can do so with evalute.py; Num_CLASSES5.txt lists the five class names and is bundled with the model file download.
import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

import config
from prepare_data import generate_test_dataset
import matplotlib.pyplot as plt
import matplotlib
from models import vgg

matplotlib.rcParams['font.sans-serif'] = ['KaiTi']  # make sure Chinese class names render in the plot

# read the five class names, one per line
f = open("dataset/Num_CLASSES5.txt", "r")
label_list = []
for i in f.readlines():
    i = i.strip()
    label_list.append(i)
print(label_list)


if __name__ == '__main__':
    # build a one-image test dataset
    test_dataset = generate_test_dataset(path=[r"dataset/test/hat/254.jpg"])

    # rebuild the network and load the trained weights
    model = vgg.vgg16(class_num=5)
    model.load_weights(filepath=config.save_model_dir)

    def test_step(images):
        predictions = model(images, training=False)
        return predictions

    for test_images, test_labels in test_dataset:
        pr = test_step(test_images)
        print(pr)
        label = label_list[int(tf.argmax(pr, 1))]
        print(label)
        plt.imshow(test_images[0])
        plt.text(1, 1, label, fontsize=30)
        plt.show()