深度有趣 | 27 服饰关键点定位

栏目: Python · 发布时间: 6年前

内容简介:关键点定位是一类常见而有用的任务,某种意义上可以理解为一种特征工程输入是一张图片,输出是每个关键点的x、y坐标,一般会归一化到0~1区间中,所以可以理解为回归问题但是直接对坐标值进行回归会导致较大误差,更好的做法是输出一个低分辨率的热图,使得关键点所在位置输出较高响应,而其他位置则输出较低响应

关键点定位是一类常见而有用的任务,某种意义上可以理解为一种特征工程

  • 人脸关键点定位,可用于人脸识别、表情识别
  • 人体骨骼关键点定位,可用于姿态估计
  • 手部关键点定位,可用于手势识别

输入是一张图片,输出是每个关键点的x、y坐标,一般会归一化到0~1区间中,所以可以理解为回归问题

但是直接对坐标值进行回归会导致较大误差,更好的做法是输出一个低分辨率的热图,使得关键点所在位置输出较高响应,而其他位置则输出较低响应

深度有趣 | 27 服饰关键点定位

CPM(Convolutional Pose Machines)的基本思想是使用多个级联的stage,每个stage包含多个CNN并且都输出热图

通过最小化每个stage的热图和ground truth之间的差距,从而得到越来越准确的关键点定位结果

深度有趣 | 27 服饰关键点定位

数据

使用天池FashionAI全球挑战赛提供的数据, fashionai.alibaba.com/

其中服饰关键点定位赛题提供的训练集包括4W多张图片,测试集包括将近1W张图片

每张图片都指定了对应的服饰类别,共5类:上衣(blouse)、外套(outwear)、连身裙(dress)、半身裙(skirt)、裤子(trousers)

深度有趣 | 27 服饰关键点定位

训练集还提供了每张图片对应的24个关键点的标注,包括x坐标、y坐标、是否可见三项信息,但并不是每类服饰都有24个关键点

关于以上数据的更多介绍可以参考以下文章, zhuanlan.zhihu.com/p/34928763

为了简化问题,以下仅使用dress类别的训练集数据训练CPM模型

实现

加载库

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import cv2
import matplotlib.pyplot as plt
%matplotlib inline
from imageio import imread, imsave
import os
import glob
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
复制代码

加载训练集和测试集

train = pd.read_csv(os.path.join('data', 'train', 'train.csv'))
train_warm = pd.read_csv(os.path.join('data', 'train_warm', 'train_warm.csv'))
test = pd.read_csv(os.path.join('data', 'test', 'test.csv'))
print(len(train), len(train_warm), len(test))

columns = train.columns
print(len(columns), columns)

train['image_id'] = train['image_id'].apply(lambda x:os.path.join('train', x))
train_warm['image_id'] = train_warm['image_id'].apply(lambda x:os.path.join('train_warm', x))
train = pd.concat([train, train_warm])
del train_warm
train.head()
复制代码

仅保留dress类别的数据

train = train[train.image_category == 'dress']
test = test[test.image_category == 'dress']
print(len(train), len(test))
复制代码

拆分标注信息中的x坐标、y坐标、是否可见

for col in columns:
    if col in ['image_id', 'image_category']:
        continue
    train[col + '_x'] = train[col].apply(lambda x:float(x.split('_')[0]))
    train[col + '_y'] = train[col].apply(lambda x:float(x.split('_')[1]))
    train[col + '_s'] = train[col].apply(lambda x:float(x.split('_')[2]))
    train.drop([col], axis=1, inplace=True)
train.head()
复制代码

将x坐标和y坐标进行归一化

features = [
    'neckline_left', 'neckline_right', 'center_front', 'shoulder_left', 'shoulder_right', 
    'armpit_left', 'armpit_right', 'waistline_left', 'waistline_right', 
    'cuff_left_in', 'cuff_left_out', 'cuff_right_in', 'cuff_right_out', 'hemline_left', 'hemline_right']

train = train.to_dict('records')
for i in tqdm(range(len(train))):
    record = train[i]
    img = imread(os.path.join('data', record['image_id']))
    h = img.shape[0]
    w = img.shape[1]
    for col in features:
        if record[col + '_s'] >= 0:
            train[i][col + '_x'] /= w
            train[i][col + '_y'] /= h
        else:
            train[i][col + '_x'] = 0
            train[i][col + '_y'] = 0
复制代码

随机选一些训练数据并绘图查看

img_size = 256
r = 10
c = 10
puzzle = np.ones((img_size * r, img_size * c, 3))
random_indexs = np.random.choice(len(train), 100)
for i in range(100):
    record = train[random_indexs[i]]
    img = imread(os.path.join('data', record['image_id']))
    img = cv2.resize(img, (img_size, img_size))
    for col in features:
        if record[col + '_s'] >= 0:
            cv2.circle(img, (int(img_size * record[col + '_x']), int(img_size * record[col + '_y'])), 3, (120, 240, 120), 2)
    img = img / 255.
    r = i // 10
    c = i % 10
    puzzle[r * img_size: (r + 1) * img_size, c * img_size: (c + 1) * img_size, :] = img
plt.figure(figsize=(15, 15))
plt.imshow(puzzle)
plt.show()
复制代码

整理数据并分割训练集和验证集

X_train = []
Y_train = []
for i in tqdm(range(len(train))):
    record = train[i]
    img = imread(os.path.join('data', record['image_id']))
    img = cv2.resize(img, (img_size, img_size))
    
    y = []
    for col in features:
        y.append([record[col + '_x'], record[col + '_y']])

    X_train.append(img)
    Y_train.append(y)

X_train = np.array(X_train)
Y_train = np.array(Y_train)
print(X_train.shape)
print(Y_train.shape)

X_train, X_valid, Y_train, Y_valid = train_test_split(X_train, Y_train, test_size=0.1)
print(X_train.shape, Y_train.shape)
print(X_valid.shape, Y_valid.shape)
复制代码

定义一些参数和模型输入

batch_size = 16
heatmap_size = 32
stages = 6
y_dim = Y_train.shape[1]

X = tf.placeholder(tf.float32, [None, img_size, img_size, 3], name='X')
Y = tf.placeholder(tf.float32, [None, heatmap_size, heatmap_size, y_dim + 1], name='Y')

def conv2d(inputs, filters, kernel_size, padding='same', activation=tf.nn.relu, name=''):
    if name:
        return tf.layers.conv2d(inputs, filters=filters, kernel_size=kernel_size, strides=1, padding=padding, 
                                activation=activation, name=name, kernel_initializer=tf.contrib.layers.xavier_initializer())
    else:
        return tf.layers.conv2d(inputs, filters=filters, kernel_size=kernel_size, strides=1, padding=padding, 
                                activation=activation, kernel_initializer=tf.contrib.layers.xavier_initializer())

def maxpool2d(inputs):
    return tf.layers.max_pooling2d(inputs, pool_size=2, strides=2, padding='valid')
复制代码

定义CPM模型,使用6个stage

stage_heatmaps = []

# sub_stage
h0 = maxpool2d(conv2d(conv2d(X, 64, 3), 64, 3))
h0 = maxpool2d(conv2d(conv2d(h0, 128, 3), 128, 3))
h0 = maxpool2d(conv2d(conv2d(conv2d(conv2d(h0, 256, 3), 256, 3), 256, 3), 256, 3))
for i in range(6):
    h0 = conv2d(h0, 512, 3)
sub_stage = conv2d(h0, 128, 3) # batch_size, 32, 32, 128

# stage_1
h0 = conv2d(sub_stage, 512, 1, padding='valid')
h0 = conv2d(h0, y_dim + 1, 1, padding='valid', activation=None, name='stage_1')
stage_heatmaps.append(h0)

# other stages
for stage in range(2, stages + 1):
    h0 = tf.concat([stage_heatmaps[-1], sub_stage], axis=3)
    for i in range(5):
        h0 = conv2d(h0, 128, 7)
    h0 = conv2d(h0, 128, 1, padding='valid')
    h0 = conv2d(h0, y_dim + 1, 1, padding='valid', activation=None, name='stage_%d' % stage)
    stage_heatmaps.append(h0)
复制代码

定义损失函数和优化器

global_step = tf.Variable(0, trainable=False)
learning_rate = tf.train.exponential_decay(0.001, global_step=global_step, decay_steps=1000, decay_rate=0.9)

losses = [0 for _ in range(stages)]
total_loss = 0
for stage in range(stages):
    losses[stage] = tf.losses.mean_squared_error(Y, stage_heatmaps[stage])
    total_loss += losses[stage]

total_loss_with_reg = total_loss + tf.contrib.layers.apply_regularization(tf.contrib.layers.l2_regularizer(1e-10), tf.trainable_variables())
total_loss = total_loss / stages
total_loss_with_reg = total_loss_with_reg / stages

optimizer = tf.contrib.layers.optimize_loss(total_loss_with_reg, global_step=global_step, learning_rate=learning_rate, 
                                            optimizer='Adam', increment_global_step=True)
复制代码

由于训练数据较少,因此定义一个数据增强的函数

def transform(X_batch, Y_batch):
    X_data = []
    Y_data = []
    
    offset = 20
    for i in range(X_batch.shape[0]):
        img = X_batch[i]
        # random rotation
        degree = int(np.random.random() * offset - offset / 2)
        rad = degree / 180 * np.pi
        mat = cv2.getRotationMatrix2D((img_size / 2, img_size / 2), degree, 1)
        img_ = cv2.warpAffine(img, mat, (img_size, img_size), borderValue=(255, 255, 255))
        # random translation
        x0 = int(np.random.random() * offset - offset / 2)
        y0 = int(np.random.random() * offset - offset / 2)
        mat = np.float32([[1, 0, x0], [0, 1, y0]])
        img_ = cv2.warpAffine(img_, mat, (img_size, img_size), borderValue=(255, 255, 255))
        # random flip
        if np.random.random() > 0.5:
            img_ = np.fliplr(img_)
            flip = True
        else:
            flip = False
    
        X_data.append(img_)
        
        points = []
        for j in range(y_dim):
            x = Y_batch[i, j, 0] * img_size
            y = Y_batch[i, j, 1] * img_size
            # random rotation
            dx = x - img_size / 2  
            dy = y - img_size / 2
            x = int(dx * np.cos(rad) + dy * np.sin(rad) + img_size / 2)  
            y = int(-dx * np.sin(rad) + dy * np.cos(rad) + img_size / 2)
            # random translation
            x += x0
            y += y0

            x = x / img_size
            y = y / img_size
            points.append([x, y])
        # random flip
        if flip:
            data = {features[j]: points[j] for j in range(y_dim)}
            points = []
            for j in range(y_dim):
                col = features[j]
                if col.find('left') >= 0:
                    col = col.replace('left', 'right')
                elif col.find('right') >= 0:
                    col = col.replace('right', 'left')
                [x, y] = data[col]
                x = 1 - x
                points.append([x, y])
            
        Y_data.append(points)
        
    X_data = np.array(X_data)
    Y_data = np.array(Y_data)
    
    # preprocess
    X_data = (X_data / 255. - 0.5) * 2
    Y_heatmap = []
    for i in range(Y_data.shape[0]):
        heatmaps = []
        invert_heatmap = np.ones((heatmap_size, heatmap_size))
        for j in range(Y_data.shape[1]):
            x0 = int(Y_data[i, j, 0] * heatmap_size)
            y0 = int(Y_data[i, j, 1] * heatmap_size)
            x = np.arange(0, heatmap_size, 1, float)
            y = x[:, np.newaxis]
            cur_heatmap = np.exp(-((x - x0) ** 2 + (y - y0) ** 2) / (2.0 * 1.0 ** 2))
            heatmaps.append(cur_heatmap)
            invert_heatmap -= cur_heatmap
        heatmaps.append(invert_heatmap)
        Y_heatmap.append(heatmaps)
    Y_heatmap = np.array(Y_heatmap)
    Y_heatmap = np.transpose(Y_heatmap, (0, 2, 3, 1)) # batch_size, heatmap_size, heatmap_size, y_dim + 1
    
    return X_data, Y_data, Y_heatmap
复制代码

查看数据增强之后的图片和关键点是否正确

X_batch = X_train[:batch_size]
Y_batch = Y_train[:batch_size]
X_data, Y_data, Y_heatmap = transform(X_batch, Y_batch)

n = int(np.sqrt(batch_size))
puzzle = np.ones((img_size * n, img_size * n, 3))
for i in range(batch_size):
    img = (X_data[i] + 1) / 2
    for j in range(y_dim):
        cv2.circle(img, (int(img_size * Y_data[i, j, 0]), int(img_size * Y_data[i, j, 1])), 3, (120, 240, 120), 2)
    r = i // n
    c = i % n
    puzzle[r * img_size: (r + 1) * img_size, c * img_size: (c + 1) * img_size, :] = img
plt.figure(figsize=(12, 12))
plt.imshow(puzzle)
plt.show()
复制代码

可以看到,在经过随机旋转、随机平移、随机水平翻转之后,图片和关键点依旧一一对应

深度有趣 | 27 服饰关键点定位

训练模型,使用early stopping

sess = tf.Session()
sess.run(tf.global_variables_initializer())

OUTPUT_DIR = 'samples'
if not os.path.exists(OUTPUT_DIR):
    os.mkdir(OUTPUT_DIR)

for stage in range(stages):
    tf.summary.scalar('loss/loss_stage_%d' % (stage + 1), losses[stage])
tf.summary.scalar('loss/total_loss', total_loss)
summary = tf.summary.merge_all()
writer = tf.summary.FileWriter(OUTPUT_DIR)

loss_valid_min = np.inf
saver = tf.train.Saver()

epochs = 100
patience = 10
for e in range(epochs):
    loss_train = []
    loss_valid = []
    
    X_train, Y_train = shuffle(X_train, Y_train)
    for i in tqdm(range(X_train.shape[0] // batch_size)):
        X_batch = X_train[i * batch_size: (i + 1) * batch_size, :, :, :]
        Y_batch = Y_train[i * batch_size: (i + 1) * batch_size, :, :]
        X_data, Y_data, Y_heatmap = transform(X_batch, Y_batch)
        _, ls, lr, stage_heatmaps_ = sess.run([optimizer, total_loss, learning_rate, stage_heatmaps], 
                                               feed_dict={X: X_data, Y: Y_heatmap})
        loss_train.append(ls)
        
        if i > 0 and i % 100 == 0:
            writer.add_summary(sess.run(summary, feed_dict={X: X_data, Y: Y_heatmap}), 
                               e * X_train.shape[0] // batch_size + i)
            writer.flush()
    loss_train = np.mean(loss_train)
    
    demo_img = (X_data[0] + 1) / 2
    demo_heatmaps = []
    for stage in range(stages):
        demo_heatmap = stage_heatmaps_[stage][0, :, :, :y_dim].reshape((heatmap_size, heatmap_size, y_dim))
        demo_heatmap = cv2.resize(demo_heatmap, (img_size, img_size))
        demo_heatmap = np.amax(demo_heatmap, axis=2)
        demo_heatmap = np.reshape(demo_heatmap, (img_size, img_size, 1))
        demo_heatmap = np.repeat(demo_heatmap, 3, axis=2)
        demo_heatmaps.append(demo_heatmap)

    demo_gt_heatmap = Y_heatmap[0, :, :, :y_dim].reshape((heatmap_size, heatmap_size, y_dim))
    demo_gt_heatmap = cv2.resize(demo_gt_heatmap, (img_size, img_size))
    demo_gt_heatmap = np.amax(demo_gt_heatmap, axis=2)
    demo_gt_heatmap = np.reshape(demo_gt_heatmap, (img_size, img_size, 1))
    demo_gt_heatmap = np.repeat(demo_gt_heatmap, 3, axis=2)

    upper_img = np.concatenate((demo_heatmaps[0], demo_heatmaps[1], demo_heatmaps[2]), axis=1)
    blend_img = 0.5 * demo_img + 0.5 * demo_gt_heatmap
    lower_img = np.concatenate((demo_heatmaps[-1], demo_gt_heatmap, blend_img), axis=1)
    demo_img = np.concatenate((upper_img, lower_img), axis=0)
    imsave(os.path.join(OUTPUT_DIR, 'sample_%d.jpg' % e), demo_img)
    
    X_valid, Y_valid = shuffle(X_valid, Y_valid)
    for i in range(X_valid.shape[0] // batch_size):
        X_batch = X_valid[i * batch_size: (i + 1) * batch_size, :, :, :]
        Y_batch = Y_valid[i * batch_size: (i + 1) * batch_size, :, :]
        X_data, Y_data, Y_heatmap = transform(X_batch, Y_batch)
        ls = sess.run(total_loss, feed_dict={X: X_data, Y: Y_heatmap})
        loss_valid.append(ls)
    loss_valid = np.mean(loss_valid)
    
    print('Epoch %d, lr %.6f, train loss %.6f, valid loss %.6f' % (e, lr, loss_train, loss_valid))
    if loss_valid < loss_valid_min:
        print('Saving model...')
        saver.save(sess, os.path.join(OUTPUT_DIR, 'cpm'))
        loss_valid_min = loss_valid
        patience = 10
    else:
        patience -= 1
        if patience == 0:
            break
复制代码

经过87个epoch之后训练提前停止,训练集损失为0.001228,验证集损失为0.001871,验证集最低损失为0.001821

训练集结果如下,上面三张图依次为stage1、stage2、stage3的结果,下面三张图依次为stage6、ground truth、groud truth和原图

深度有趣 | 27 服饰关键点定位

在本机上使用以下代码,对测试集中的图片进行关键点定位

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
import cv2
import matplotlib.pyplot as plt
from imageio import imread, imsave
import os

test = pd.read_csv(os.path.join('data', 'test', 'test.csv'))
test['image_id'] = test['image_id'].apply(lambda x:os.path.join('test', x))
test = test[test.image_category == 'dress']
test = test['image_id'].values

batch_size = 16
img_size = 256
test = shuffle(test)
test = test[:batch_size]
X_test = []
for i in range(batch_size):
    img = imread(os.path.join('data', test[i]))
    img = cv2.resize(img, (img_size, img_size))
    X_test.append(img)
X_test = np.array(X_test)
print(X_test.shape)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

OUTPUT_DIR = 'samples'
saver = tf.train.import_meta_graph(os.path.join(OUTPUT_DIR, 'cpm.meta'))
saver.restore(sess, tf.train.latest_checkpoint(OUTPUT_DIR))

stages = 6
y_dim = 15
heatmap_size = 32
graph = tf.get_default_graph()
X = graph.get_tensor_by_name('X:0')
stage_heatmap = graph.get_tensor_by_name('stage_%d/BiasAdd:0' % stages)

def visualize_result(imgs, heatmap, joints):
    imgs = imgs.astype(np.int32)
    coords = []
    for i in range(imgs.shape[0]):
        hp = heatmap[i, :, :, :joints].reshape((heatmap_size, heatmap_size, joints))
        hp = cv2.resize(hp, (img_size, img_size))
        coord = np.zeros((joints, 2))

        for j in range(joints):
            xy = np.unravel_index(np.argmax(hp[:, :, j]), (img_size, img_size))
            coord[j, :] = [xy[0], xy[1]]
            cv2.circle(imgs[i], (xy[1], xy[0]), 3, (120, 240, 120), 2)

        coords.append(coord)
    
    return imgs / 255., coords

heatmap = sess.run(stage_heatmap, feed_dict={X: (X_test / 255. - 0.5) * 2})
X_test, coords = visualize_result(X_test, heatmap, y_dim) 

n = int(np.sqrt(batch_size))
puzzle = np.ones((img_size * n, img_size * n, 3))
for i in range(batch_size):
    img = X_test[i]
    r = i // n
    c = i % n
    puzzle[r * img_size: (r + 1) * img_size, c * img_size: (c + 1) * img_size, :] = img
plt.figure(figsize=(12, 12))
plt.imshow(puzzle)
plt.show()
imsave('服饰关键点定位测试集结果.jpg', puzzle)
复制代码

测试集结果如下,对于大多数情况能够得到很准确的关键点定位结果

深度有趣 | 27 服饰关键点定位

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro JavaScript Design Patterns

Pro JavaScript Design Patterns

Dustin Diaz、Ross Harmes / Apress / 2007-12-16 / USD 44.99

As a web developer, you’ll already know that JavaScript™ is a powerful language, allowing you to add an impressive array of dynamic functionality to otherwise static web sites. But there is more power......一起来看看 《Pro JavaScript Design Patterns》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换