Garbage Classification on Small Cards: a TensorFlow 2.0 Series

    The garbage-classification card images come from some early-education products; cropping the garbage items from these small cards is much more convenient than collecting a dataset from scratch. Five detection cases are used: alcohol, basketball, bread, hat, and the "cross" mark shown by the detection platform when nothing is placed on it. Dataset download link: https://download.csdn.net/download/YANGJINCHAHAHA/19867998

    Shuffle the raw data and split it into three parts following a train/valid/test directory structure with split_dataset.py. The code assumes your source dataset sits in the folder original_dataset and writes the splits under the dataset folder.

import os
import random
import shutil


class SplitDataset():
    def __init__(self, dataset_dir, saved_dataset_dir, train_ratio=0.6, test_ratio=0.2, show_progress=False):
        self.dataset_dir = dataset_dir
        self.saved_dataset_dir = saved_dataset_dir
        self.saved_train_dir = saved_dataset_dir + "/train/"
        self.saved_valid_dir = saved_dataset_dir + "/valid/"
        self.saved_test_dir = saved_dataset_dir + "/test/"


        self.train_ratio = train_ratio
        self.test_ratio = test_ratio
        self.valid_ratio = 1 - train_ratio - test_ratio

        self.train_file_path = []
        self.valid_file_path = []
        self.test_file_path = []

        self.index_label_dict = {}

        self.show_progress = show_progress

        # create the output directories (including the dataset root) if they do not exist
        if not os.path.exists(self.saved_train_dir):
            os.makedirs(self.saved_train_dir)
        if not os.path.exists(self.saved_test_dir):
            os.makedirs(self.saved_test_dir)
        if not os.path.exists(self.saved_valid_dir):
            os.makedirs(self.saved_valid_dir)


    def __get_label_names(self):
        label_names = []
        for item in os.listdir(self.dataset_dir):
            item_path = os.path.join(self.dataset_dir, item)
            if os.path.isdir(item_path):
                label_names.append(item)
        return label_names

    def __get_all_file_path(self):
        all_file_path = []
        index = 0
        for file_type in self.__get_label_names():
            self.index_label_dict[index] = file_type
            index += 1
            type_file_path = os.path.join(self.dataset_dir, file_type)
            file_path = []
            for file in os.listdir(type_file_path):
                single_file_path = os.path.join(type_file_path, file)
                file_path.append(single_file_path)
            all_file_path.append(file_path)
        return all_file_path

    def __copy_files(self, type_path, type_saved_dir):
        for item in type_path:
            src_path_list = item[1]
            dst_path = type_saved_dir + "%s/" % (item[0])
            if not os.path.exists(dst_path):
                os.mkdir(dst_path)
            for src_path in src_path_list:
                shutil.copy(src_path, dst_path)
                if self.show_progress:
                    print("Copying file "+src_path+" to "+dst_path)

    def __split_dataset(self):
        all_file_paths = self.__get_all_file_path()
        for index in range(len(all_file_paths)):
            file_path_list = all_file_paths[index]
            file_path_list_length = len(file_path_list)
            random.shuffle(file_path_list)

            train_num = int(file_path_list_length * self.train_ratio)
            test_num = int(file_path_list_length * self.test_ratio)

            self.train_file_path.append([self.index_label_dict[index], file_path_list[: train_num]])
            self.test_file_path.append([self.index_label_dict[index], file_path_list[train_num:train_num + test_num]])
            self.valid_file_path.append([self.index_label_dict[index], file_path_list[train_num + test_num:]])

    def start_splitting(self):
        self.__split_dataset()
        self.__copy_files(type_path=self.train_file_path, type_saved_dir=self.saved_train_dir)
        self.__copy_files(type_path=self.valid_file_path, type_saved_dir=self.saved_valid_dir)
        self.__copy_files(type_path=self.test_file_path, type_saved_dir=self.saved_test_dir)


if __name__ == '__main__':
    split_dataset = SplitDataset(dataset_dir="original_dataset",
                                 saved_dataset_dir="dataset",
                                 show_progress=True)
    split_dataset.start_splitting()
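
After the script has run, it is worth a quick sanity check that the three splits roughly follow the 0.6 / 0.2 / 0.2 train/test/valid ratios. A minimal sketch, assuming the default dataset output directory used above (the script name check_split.py is just a suggestion):

import os

dataset_dir = "dataset"  # output directory created by split_dataset.py
for split in ["train", "valid", "test"]:
    split_dir = os.path.join(dataset_dir, split)
    counts = {}
    for label in sorted(os.listdir(split_dir)):
        label_dir = os.path.join(split_dir, label)
        if os.path.isdir(label_dir):
            counts[label] = len(os.listdir(label_dir))
    print(split, counts, "total:", sum(counts.values()))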

 After the dataset has been prepared, the next step is loading and reading the data with prepare_data.py. Many of the hyperparameters used here live in config.py, which is introduced at the end.

import tensorflow as tf
import config
import pathlib
from config import image_height, image_width, channels


def load_and_preprocess_image(img_path):
    # read pictures
    img_raw = tf.io.read_file(img_path)
    # decode pictures
    img_tensor = tf.image.decode_jpeg(img_raw, channels=channels)

    # resize
    img_tensor = tf.image.resize(img_tensor, [image_height, image_width])
    img_tensor = tf.cast(img_tensor, tf.float32)
    # normalization
    img = img_tensor / 255.0
    return img

def get_images_and_labels(data_root_dir):
    # get all images' paths (format: string)
    data_root = pathlib.Path(data_root_dir)
    all_image_path = [str(path) for path in list(data_root.glob('*/*'))]
    # get labels' names
    label_names = sorted(item.name for item in data_root.glob('*/'))
    # dict: {label : index}
    label_to_index = dict((label, index) for index, label in enumerate(label_names))
    # get all images' labels
    all_image_label = [label_to_index[pathlib.Path(single_image_path).parent.name] for single_image_path in all_image_path]

    return all_image_path, all_image_label


def get_dataset(dataset_root_dir):
    all_image_path, all_image_label = get_images_and_labels(data_root_dir=dataset_root_dir)
    # load the dataset and preprocess images
    image_dataset = tf.data.Dataset.from_tensor_slices(all_image_path).map(load_and_preprocess_image)
    label_dataset = tf.data.Dataset.from_tensor_slices(all_image_label)
    dataset = tf.data.Dataset.zip((image_dataset, label_dataset))
    image_count = len(all_image_path)

    return dataset, image_count


def generate_datasets():
    train_dataset, train_count = get_dataset(dataset_root_dir=config.train_dir)
    valid_dataset, valid_count = get_dataset(dataset_root_dir=config.valid_dir)
    test_dataset, test_count = get_dataset(dataset_root_dir=config.test_dir)


    # batch the splits; only the training set is shuffled
    train_dataset = train_dataset.shuffle(buffer_size=train_count).batch(batch_size=config.BATCH_SIZE)
    valid_dataset = valid_dataset.batch(batch_size=config.BATCH_SIZE)
    test_dataset = test_dataset.batch(batch_size=config.BATCH_SIZE)
    return train_dataset, valid_dataset, test_dataset, train_count, valid_count, test_count


def generate_test_dataset(path):
    # build a dataset for single-image evaluation;
    # path is a list of image file paths, the labels are dummy placeholders and are never used
    label = [0] * len(path)
    image_dataset = tf.data.Dataset.from_tensor_slices(path).map(load_and_preprocess_image)
    label_dataset = tf.data.Dataset.from_tensor_slices(label)
    dataset = tf.data.Dataset.zip((image_dataset, label_dataset))
    test_dataset = dataset.batch(batch_size=1)
    return test_dataset


if __name__ == "__main__":
    img = load_and_preprocess_image("pic/1/wait.png")
    print(img)
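
Before moving on, it is worth confirming that the pipeline produces what the model expects. A small sketch (assuming the dataset/ splits from the previous step already exist) that pulls one batch and prints its shape; with the config given at the end, the image batch should be (BATCH_SIZE, 224, 224, 3) with values in [0, 1]:

import tensorflow as tf
from prepare_data import generate_datasets

train_dataset, _, _, train_count, valid_count, test_count = generate_datasets()
print("images per split:", train_count, valid_count, test_count)
for images, labels in train_dataset.take(1):
    # one preprocessed batch and its integer labels
    print("image batch:", images.shape, float(tf.reduce_min(images)), float(tf.reduce_max(images)))
    print("label batch:", labels.numpy())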

Next comes building the network model; TensorFlow 2.0 makes this very convenient. I chose VGG16 (the version below is slightly simplified).

import tensorflow as tf
from tensorflow.keras import layers, Model

def vgg16(im_height=224, im_width=224, class_num=1000):
    input_image = tf.keras.Input(shape=(im_height, im_width, 3), dtype="float32")
    x = layers.Conv2D(64, kernel_size=3, strides=1, padding="SAME", activation='relu', name="conv2d_1")(input_image)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_1")(x)
    x = layers.Conv2D(128, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_2")(x)
    x = layers.Conv2D(256, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_3.1')(x)
    x = layers.Conv2D(256, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_3.2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_3")(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_4.1')(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_4.2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_4")(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_5.1')(x)
    x = layers.Conv2D(512, kernel_size=3, strides=1, padding="SAME", activation='relu', name='conv2d_5.2')(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_5")(x)
    x = layers.Flatten(name="output_flatten")(x)
    x = layers.Dense(4096, name="output_dense_1")(x)
    x = layers.Dense(4096, name="output_dense_2")(x)
    x = layers.Dense(class_num, name="output_dense_3", activation=tf.keras.activations.softmax)(x)
    model = Model(inputs=input_image, outputs=x)
    return model
if __name__ == "__main__":
    model = vgg16()
    model.summary()
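
Beyond model.summary(), pushing a dummy batch through the network is a quick wiring check: with class_num=5 the output should have shape (1, 5) and each row should sum to about 1 because of the final softmax. A minimal sketch (it assumes the code above is saved as models/vgg.py, which is how train.py later imports it):

import tensorflow as tf
from models import vgg  # assumed file location, matching the import in train.py

model = vgg.vgg16(class_num=5)
dummy = tf.random.normal([1, 224, 224, 3])   # fake image batch
out = model(dummy, training=False)
print(out.shape)                             # (1, 5)
print(tf.reduce_sum(out, axis=1).numpy())    # ~[1.], the last layer is a softmax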

Finally comes training. train.py contains the loss-function and optimizer definitions, the training and validation steps, and exporting the trained model weights.

from __future__ import absolute_import, division, print_function
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

from models import vgg
import config
from prepare_data import generate_datasets
import math


if __name__ == '__main__':
    # load the train / valid / test splits prepared by split_dataset.py
    train_dataset, valid_dataset, test_dataset, train_count, valid_count, test_count = generate_datasets()


    # create model
    model = vgg.vgg16(class_num=5)
    # define loss and optimizer
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
    optimizer = tf.keras.optimizers.Adadelta()

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

    valid_loss = tf.keras.metrics.Mean(name='valid_loss')
    valid_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='valid_accuracy')

    @tf.function
    def train_step(images, labels):
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_object(y_true=labels, y_pred=predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars=zip(gradients, model.trainable_variables))

        train_loss(loss)
        train_accuracy(labels, predictions)

    @tf.function
    def valid_step(images, labels):
        predictions = model(images, training=False)
        v_loss = loss_object(labels, predictions)
        valid_loss(v_loss)
        valid_accuracy(labels, predictions)
    # start training
    for epoch in range(config.EPOCHS):
        train_loss.reset_states()
        train_accuracy.reset_states()
        valid_loss.reset_states()
        valid_accuracy.reset_states()
        step = 0
        for images, labels in train_dataset:
            step += 1
            train_step(images, labels)
            print("Epoch: {}/{}, step: {}/{}, loss: {:.5f}, accuracy: {:.5f}".format(epoch + 1,
                                                                                     config.EPOCHS,
                                                                                     step,
                                                                                     math.ceil(train_count / config.BATCH_SIZE),
                                                                                     train_loss.result(),
                                                                                     train_accuracy.result()))
        for valid_images, valid_labels in valid_dataset:
            valid_step(valid_images, valid_labels)

        print("Epoch: {}/{}, train loss: {:.5f}, train accuracy: {:.5f}, "
              "valid loss: {:.5f}, valid accuracy: {:.5f}".format(epoch + 1,
                                                                  config.EPOCHS,
                                                                  train_loss.result(),
                                                                  train_accuracy.result(),
                                                                  valid_loss.result(),
                                                                  valid_accuracy.result()))

    model.save_weights(filepath=config.save_model_dir, save_format='tf')
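
Note that train.py builds test_dataset but never evaluates on it. If you want a final accuracy on the held-out test split, a minimal standalone sketch (assuming the weights were saved to config.save_model_dir by the training script above; the file name test.py is only a suggestion) could look like this:

import tensorflow as tf
import config
from models import vgg
from prepare_data import generate_datasets

# rebuild the datasets and reload the trained weights
_, _, test_dataset, _, _, test_count = generate_datasets()
model = vgg.vgg16(class_num=config.NUM_CLASSES)
model.load_weights(filepath=config.save_model_dir)

loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

for test_images, test_labels in test_dataset:
    predictions = model(test_images, training=False)
    test_loss(loss_object(y_true=test_labels, y_pred=predictions))
    test_accuracy(test_labels, predictions)

print("test loss: {:.5f}, test accuracy: {:.5f}".format(test_loss.result(), test_accuracy.result()))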

Last, here is config.py, which holds the parameters; the directory settings can be changed to match your own setup.

# some training parameters
EPOCHS = 100
BATCH_SIZE = 8
NUM_CLASSES = 5
image_height = 224
image_width = 224
channels = 3
save_model_dir = "saved_model/model"
dataset_dir = "dataset/"
train_dir = dataset_dir + "train"
valid_dir = dataset_dir + "valid"
test_dir = dataset_dir + "test"
test_our_dir = r"E:\garbage\Crop\test"

A download link for the trained model is also provided. If you want to test the effect on a single image, you can do so with evalute.py; the Num_CLASSES5.txt used here lists the five class names and is bundled together with the model file download.

import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
import config
from prepare_data import generate_test_dataset
import matplotlib
import matplotlib.pyplot as plt
from models import vgg

# use a font that can display Chinese labels on the plot
matplotlib.rcParams['font.sans-serif'] = ['KaiTi']
with open("dataset/Num_CLASSES5.txt", "r") as f:
    # one class name per line, in the same order as the training label indices
    label_list = [line.strip() for line in f.readlines()]
print(label_list)

if __name__ == '__main__':
    # build a one-image dataset for the picture to classify
    test_dataset = generate_test_dataset(path=[r"dataset/test/hat/254.jpg"])
    model = vgg.vgg16(class_num=5)
    model.load_weights(filepath=config.save_model_dir)

    # only a forward pass is needed to classify a single image
    def test_step(images):
        predictions = model(images, training=False)
        return predictions

    for test_images, test_labels in test_dataset:
        pr = test_step(test_images)
        print(pr)
        # map the predicted index back to its class name
        label = label_list[int(tf.argmax(pr, axis=1))]
        print(label)
        plt.imshow(test_images[0])
        plt.text(1, 1, label, fontsize=30)
        plt.show()
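
evalute.py expects Num_CLASSES5.txt to contain one class name per line, in the same order as the label indices used during training, i.e. the sorted class-folder names (see label_to_index in prepare_data.py). If you ever need to regenerate the file yourself, a hypothetical helper along these lines should work (it is not part of the original project, and it assumes the class names are the folder names under dataset/train):

import pathlib
import config

# class indices follow the sorted folder names, matching prepare_data.get_images_and_labels
label_names = sorted(item.name for item in pathlib.Path(config.train_dir).iterdir() if item.is_dir())
with open("dataset/Num_CLASSES5.txt", "w") as f:
    f.write("\n".join(label_names) + "\n")
print(label_names)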
