Tensorflow数据读取指南

栏目: 编程工具 · 发布时间: 7年前

内容简介:tensorflow的灵活性带来的学习成本是很多人头疼的问题,在tf中,读取数据基本有四种方法:1. tf.data (官方推荐):方便地构建复杂的输入管道2. Feeding:通过input_fn来yield数据

tensorflow的灵活性带来的学习成本是很多人头疼的问题,在tf中,读取数据基本有四种方法:

1. tf.data (官方推荐):方便地构建复杂的输入管道

2. Feeding:通过input_fn来yield数据

3. QueueRunner:基于队列的输入管道

4. 预加载数据。用constant或variable在内存中存储所有的数据

其实从tf的历史发展轨迹可以看出,tf.data是在feedintg技术和QueueRunner的基础上发展而来。

Feeding技术

with tf.Session():
   input = tf.placeholder(tf.float32)
   classifier = ...
   print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))

简单教程中的数据加载,都是一行伪代码:(x, y) = load_data() 就搞定了

但现实中,令训练数据的使用变得复杂的主要原因有很多:

一、数据集不一致,不同的数据源的数据格式不一样,为了统一,我们需要一个转接层,我们称之为统一化过程

二、数据集过大,不适合一次性导入,需要分批次导入,边训练边导入

三、需要数据增强,在线生成,甚至完全在线生成,这样就需要将训练时的feed和读取的统一化数据之间加入一个动态生成过程

四、当然批次化(batch),train和val的划分也都是重复性的工作

tf.data就是用来解决以上问题的。

两个重要类介绍:tf.data.Dataset和tf.data.Iterator,

– Dataset用来表示一组数据,是一个序列,序列的每个元素包含一个或多个Tensor对象。

– Iterator用户从数据集中抽取元素,Iterator.get_next()可以返回数据集的下一个元素,可以看作是输入管道与模型之间的接口。

Dataset

面对输入的数据集,我们要建立一个清晰的数据流概念,一个原始的数据集开始,后续的每一次转换都是流水线式的操作。因而我们可以看到这样的代码形式:data.TextLineDataset(filenames).map(decode_func).shuffle(buffer_size=10000).batch(batch_size).make_initializable_iterator()。

Dataset的每一个元素一定是相同的结构,每个元素包含一个或多个Tensor对象,每个Tensor都有一个tf.DType来表示tensor中的元素数据类型,还有伊特tf.TensorShape来表示tensor中每个元素的形状。

Dataset的map、flat_map、filter等等方法都是常用的操作链上过滤器方法。过滤器模式是常见的设计模式。

Iterator

如果说Dataset是从源中一步一步创建出一个统一化的数据集的话,Iterator就是负责怎么访问这个统一后的数据集然后喂给你的模型。有四种迭代器

– one-shot

– initializable

– reinitializable

– feedable

one-shot最简单,就是一个元素一个元素的吐,但不能初始化,即参数化,就是遍历一遍数据集就什么也干不了了。

initializable迭代器就可以初始化,可以接受一个placeholder,作为初始化参数

max_value = tf.placeholder(tf.int64, shape=[])
 dataset = tf.data.Dataset.range(max_value)
 iterator = dataset.make_initializable_iterator()
 next_element = iterator.get_next()
 # Initialize an iterator over a dataset with 10 elements.
 sess.run(iterator.initializer, feed_dict={max_value: 10})

initializable更典型的应用方式:

# initializable iterator to switch between dataset
 EPOCHS = 10
 x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
 dataset = tf.data.Dataset.from_tensor_slices((x, y))
 train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
 test_data = (np.array([[1,2]]), np.array([[0]]))
 iter = dataset.make_initializable_iterator()
 features, labels = iter.get_next()
 with tf.Session() as sess:
 #     initialise iterator with train data
     sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
     for _ in range(EPOCHS):
         sess.run([features, labels])
 #     switch to test data
     sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
     print(sess.run([features, labels]))

Dataset的创建依赖一个placeholder,所以只是表明了数据集的结构信息,具体的数据是什么还得在运行期的时候赋值。

但如果我们不想在运行期使用feed_dict怎么办呢,用reinitializable迭代器支持多个数据集对象的初始化,创建的迭代器不再是dataset.make_xxx_iterator这种形式,而是使用结构直接使用Iterator的静态方法初始化tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes)

# Reinitializable iterator to switch between Datasets
 EPOCHS = 10
 # making fake data using numpy
 train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
 test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
 # create two datasets, one for training and one for test
 train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
 test_dataset = tf.data.Dataset.from_tensor_slices(test_data)
 # create a iterator of the correct shape and type
 iter = tf.data.Iterator.from_structure(train_dataset.output_types,
 train_dataset.output_shapes)
 features, labels = iter.get_next()
 # create the initialisation operations
 train_init_op = iter.make_initializer(train_dataset)
 test_init_op = iter.make_initializer(test_dataset)
 with tf.Session() as sess:
     sess.run(train_init_op) # switch to train dataset
     for _ in range(EPOCHS):
         sess.run([features, labels])
     sess.run(test_init_op) # switch to val dataset
     print(sess.run([features, labels]))

feedable结合了initializable和reinitializable两者,可以使用placeholder和多个数据集一起初始化。如果说reinitializable方便在数据集间切换,那么feedable就是方便在迭代器间切换。

# Define training and validation datasets with the same structure.
 training_dataset = tf.data.Dataset.range(100).map(
     lambda x: x + tf.random_uniform([], -10, 10, tf.int64)).repeat()
 validation_dataset = tf.data.Dataset.range(50)
 #
 # A feedable iterator is defined by a handle placeholder and its structure. We
 # could use the `output_types` and `output_shapes` properties of either
 # `training_dataset` or `validation_dataset` here, because they have
 # identical structure.
 handle = tf.placeholder(tf.string, shape=[])
 iterator = tf.data.Iterator.from_string_handle(
     handle, training_dataset.output_types, training_dataset.output_shapes)
 #
 next_element = iterator.get_next()
 #
 # You can use feedable iterators with a variety of different kinds of iterator
 # (such as one-shot and initializable iterators).
 training_iterator = training_dataset.make_one_shot_iterator()
 validation_iterator = validation_dataset.make_initializable_iterator()
 #
 # The `Iterator.string_handle()` method returns a tensor that can be evaluated
 # and used to feed the `handle` placeholder.
 training_handle = sess.run(training_iterator.string_handle())
 validation_handle = sess.run(validation_iterator.string_handle())
 #
 # Loop forever, alternating between training and validation.
 while True:
   # Run 200 steps using the training dataset. Note that the training dataset is
   # infinite, and we resume from where we left off in the previous `while` loop
   # iteration.
   for _ in range(200):
     sess.run(next_element, feed_dict={handle: training_handle})
   #
   # Run one pass over the validation dataset.
   sess.run(validation_iterator.initializer)
   for _ in range(50):
     sess.run(next_element, feed_dict={handle: validation_handle})

但怎么说呢,tensorflow的设计师一如既往地发挥了他们接口设计纷繁复杂的能力,真的是极其混乱难用,一点也不清晰,要不然tensorflow里也不会引入keras包了。

数据的消费

EPOCHS = 10
 BATCH_SIZE = 16
 # using two numpy arrays
 features, labels = (np.array([np.random.sample((100,2))]),
                     np.array([np.random.sample((100,1))]))
 dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)
 iter = dataset.make_one_shot_iterator()
 x, y = iter.get_next()
 # make a simple model
 net = tf.layers.dense(x, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
 net = tf.layers.dense(net, 8, activation=tf.tanh)
 prediction = tf.layers.dense(net, 1, activation=tf.tanh)
 loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label
 train_op = tf.train.AdamOptimizer().minimize(loss)
 with tf.Session() as sess:
     sess.run(tf.global_variables_initializer())
     for i in range(EPOCHS):
         _, loss_value = sess.run([train_op, loss])
         print("Iter: {}, Loss: {:.4f}".format(i, loss_value))

其中x, y = iter.get_next()表示x,y都是一个获取数据的操作,每次sess.run就是执行一次x(), y()这样就获得一个样本数据。

kera使用数据集就特别简单

dataset = tf.data.Dataset.from_tensor_slices((data, labels))
 dataset = dataset.batch(32).repeat()
 #
 val_dataset = tf.data.Dataset.from_tensor_slices((val_data, val_labels))
 val_dataset = val_dataset.batch(32).repeat()
 #
 model.fit(dataset, epochs=10, steps_per_epoch=30,
           validation_data=val_dataset,
           validation_steps=3)

过滤器函数

从原始数据变成统一的数据格式,最重要的就是map

def input_parser(img_path, label):
     # convert the label to one-hot encoding
     one_hot = tf.one_hot(label, NUM_CLASSES)
     #
     # read the img from file
     img_file = tf.read_file(img_path)
     img_decoded = tf.image.decode_image(img_file, channels=3)
     #
     return img_decoded, one_hot
 
 tr_data = tr_data.map(input_parser)

input_parser接受了数据集每个元素,将转换完的结果返回,数据集就脱胎换骨了。

图像的转换处理

对于图像的转换处理,我们常常用到 tf.image下的decode_bmp decode_jpeg resize_images transpose_image等等函数,具体的参考[2]。

一个image文件,变成一个Tensor一般需要这样几步:

1. 获取文件名列表

2. [Optional]shuffle文件名列表

3. [Optional]根据epoch扔掉一些文件

4. 建立文件名的queue

5. 使用对应文件的Reader,不同的文件格式用不同的reader,比如TFRecordReader解析TFRecord格式,普通文件的话,就用FileReader,但读取普通文件也比较复杂,比如读单个文件可以使用tf.gfile.FastGfile(path, “r”).read(),对于filename_queue,则使用WholeFileReader(),如下面的语句

image_reader = tf.WholeFileReader()
 data_queue = tf.train.string_input_producer([image_dir], shuffle=False)
 image_key, image_value = image_reader.read(data_queue)
 img = tf.image.decode_jpeg(image_value, channels=3)

实际上还有更方便的read_file方法:

image_value = tf.read_file(image_dir)
 img = tf.image.decode_jpeg(image_value, channels=3)

以csv的读取为例,看queue的使用

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
 #
 reader = tf.TextLineReader()
 key, value = reader.read(filename_queue)
 #
 # Default values, in case of empty columns. Also specifies the type of the
 # decoded result.
 record_defaults = [[1], [1], [1], [1], [1]]
 col1, col2, col3, col4, col5 = tf.decode_csv(
 value, record_defaults=record_defaults)
 features = tf.stack([col1, col2, col3, col4])
 #
 with tf.Session() as sess:
   # Start populating the filename queue.
   coord = tf.train.Coordinator()
   threads = tf.train.start_queue_runners(coord=coord)
   #
   for i in range(1200):
     # Retrieve a single instance:
     example, label = sess.run([features, col5])
   #
   coord.request_stop()
   coord.join(threads)

当然,google建议将任何形式的原始数据都转换成TFRecords文件。关于TFRecords文件如何制作,请参考[9]。

dataset = tf.data.TFRecordDataset(filename)
 dataset = dataset.repeat(num_epochs)
 #
 # map takes a python function and applies it to every sample
 dataset = dataset.map(decode)

6. 使用Decoder从Reader中反解成Tensor

7. [Optional] preprocessing预处理

def read_my_file_format(filename_queue):
   reader = tf.SomeReader()
   key, record_string = reader.read(filename_queue)
   example, label = tf.some_decoder(record_string)
   processed_example = some_processing(example)
   return processed_example, label
 #
 def input_pipeline(filenames, batch_size, num_epochs=None):
   filename_queue = tf.train.string_input_producer(
       filenames, num_epochs=num_epochs, shuffle=True)
   example, label = read_my_file_format(filename_queue)
   # min_after_dequeue defines how big a buffer we will randomly sample
   #   from -- bigger means better shuffling but slower start up and more
   #   memory used.
   # capacity must be larger than min_after_dequeue and the amount larger
   #   determines the maximum we will prefetch.  Recommendation:
   #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
   min_after_dequeue = 10000
   capacity = min_after_dequeue + 3 * batch_size
   example_batch, label_batch = tf.train.shuffle_batch(
       [example, label], batch_size=batch_size, capacity=capacity,
       min_after_dequeue=min_after_dequeue)
   return example_batch, label_batch

参考资料

[1] tf.data指引 https://www.tensorflow.org/guide/datasets

[2] tensorflow Images处理指引 https://www.tensorflow.org/api_guides/python/image

[3] 使用coordinator加载图像 https://gist.github.com/eerwitt/518b0c9564e500b4b50f

[4] 使用Tensorlofw读取大数据踩坑之路 https://zhuanlan.zhihu.com/p/28450111

[5] Example of TensorFlows new Input Pipeline https://kratzert.github.io/2017/06/15/example-of-tensorflows-new-input-pipeline.html

[6]load_jpeg_with_tensorflow https://gist.github.com/eerwitt/518b0c9564e500b4b50f

[7] tensorflow Inputs and Readers指引 https://www.tensorflow.org/api_guides/python/io_ops#Readers

[8]tensorflow example mnist 数据集的读取 https://github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/examples/tutorials/mnist/fully_connected_feed.py

[9] http://www.machinelearninguru.com/deep_learning/tensorflow/basics/tfrecord/tfrecord.html


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JavaScript & jQuery

JavaScript & jQuery

David Sawyer McFarland / O Reilly / 2011-10-28 / USD 39.99

You don't need programming experience to add interactive and visual effects to your web pages with JavaScript. This Missing Manual shows you how the jQuery library makes JavaScript programming fun, ea......一起来看看 《JavaScript & jQuery》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具