TensorFlow: Importing Data in Detail


Original article: https://www.tensorflow.org/programmers_guide/datasets

The tf.data API lets you build complex input pipelines from simple, reusable pieces. For example, the pipeline for an image model might aggregate data from files in a distributed file system, apply random perturbations to each image, and merge randomly selected images into a batch for training; the pipeline for a text model might involve extracting symbols from raw text data, converting them to embedding identifiers with a lookup table, and batching together sequences of different lengths.

The tf.data API makes it easy to handle large amounts of data, different data formats, and complicated transformations.

The tf.data API introduces two new abstractions to TensorFlow:

  • tf.data.Dataset: represents a sequence of elements, in which each element contains one or more Tensor objects. For example, in an image pipeline, an element might be a single training example, with a pair of tensors representing the image data and its label. There are two distinct ways to create a dataset:
    • Creating a source (e.g. Dataset.from_tensor_slices()) constructs a dataset from one or more tf.Tensor objects.
    • Applying a transformation (e.g. Dataset.batch()) constructs a dataset from one or more tf.data.Dataset objects.
  • tf.data.Iterator: provides the main way to extract elements from a dataset. The operation returned by Iterator.get_next() yields the next element of a Dataset when executed, and typically acts as the interface between the input pipeline and the model. The simplest iterator is the "one-shot iterator", which iterates once through a particular Dataset. For more sophisticated uses, the Iterator.initializer operation lets you reinitialize and parameterize an iterator with different datasets, so that you can, for example, iterate over training and validation data multiple times in the same program.

Basic mechanics

To start an input pipeline, you must first define a source. To construct a Dataset from tensors in memory, you can use tf.data.Dataset.from_tensors() or tf.data.Dataset.from_tensor_slices(). Alternatively, if your input data is stored on disk in the recommended TFRecord format, you can construct a tf.data.TFRecordDataset.

Once you have a Dataset object, you can transform it into a new Dataset, for example with Dataset.map() and Dataset.batch().

The most common way to consume values from a Dataset is through an iterator (for example, one made with Dataset.make_one_shot_iterator()).

tf.data.Iterator provides two operations (a short sketch that puts these pieces together follows the list):

  • Iterator.initializer, which (re)initializes the iterator
  • Iterator.get_next(), which returns tf.Tensor objects corresponding to the next element
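
Putting these pieces together, here is a minimal sketch of a complete pipeline; it assumes, as the rest of this guide does, that a tf.Session named sess has already been created:

# Source: a dataset containing the integers 0..4.
dataset = tf.data.Dataset.range(5)
# Transformation: group consecutive elements into batches of 2.
dataset = dataset.batch(2)
# Iterator: a one-shot iterator that walks through the dataset exactly once.
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
print(sess.run(next_element))  # ==> [0 1]
print(sess.run(next_element))  # ==> [2 3]
print(sess.run(next_element))  # ==> [4]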

Dataset structure

A dataset comprises elements that all have the same structure. An element contains one or more tf.Tensor objects, called components. Each component has a tf.DType representing the type of values in the tensor, and a tf.TensorShape representing its shape. You can inspect these with the Dataset.output_types and Dataset.output_shapes properties:

dataset1 = tf.data.Dataset.from_tensor_slices(tf.random_uniform([4, 10]))
print(dataset1.output_types)  # ==> "tf.float32"
print(dataset1.output_shapes)  # ==> "(10,)"
dataset2 = tf.data.Dataset.from_tensor_slices(
    (tf.random_uniform([4]),
     tf.random_uniform([4, 100], maxval=100, dtype=tf.int32)))
print(dataset2.output_types)  # ==> "(tf.float32, tf.int32)"
print(dataset2.output_shapes)  # ==> "((), (100,))"
dataset3 = tf.data.Dataset.zip((dataset1, dataset2))
print(dataset3.output_types)  # ==> (tf.float32, (tf.float32, tf.int32))
print(dataset3.output_shapes)  # ==> "(10, ((), (100,)))"

It is often convenient to give names to the components of an element:

dataset = tf.data.Dataset.from_tensor_slices(
    {"a": tf.random_uniform([4]),
     "b": tf.random_uniform([4, 100], maxval=100, dtype=tf.int32)})
print(dataset.output_types)  # ==> "{'a': tf.float32, 'b': tf.int32}"
print(dataset.output_shapes)  # ==> "{'a': (), 'b': (100,)}"  

The same holds when applying transformations: the element structure determines the arguments of the function you pass to Dataset.map(), Dataset.flat_map(), and Dataset.filter():

dataset1 = dataset1.map(lambda x: ...)
dataset2 = dataset2.flat_map(lambda x, y: ...)
# Note: Argument destructuring is not available in Python 3.
dataset3 = dataset3.filter(lambda x, (y, z): ...)

Creating an iterator

Once you have built a Dataset to represent your input data, the next step is to create an Iterator to access elements from that dataset. In increasing order of sophistication, the tf.data API provides the following iterators:

  • one-shot
  • initializable
  • reinitializable
  • feedable

A one-shot iterator is the simplest kind: it requires no explicit initialization and iterates once through a dataset, but it does not support parameterization:

dataset = tf.data.Dataset.range(100)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
for i in range(100):
  value = sess.run(next_element)
  assert i == value

At present, one-shot iterators are the only type that can easily be used with an Estimator.

An initializable iterator requires an explicit iterator.initializer operation to be run before it can be used. In exchange for this inconvenience, it supports parameterization: you can use one or more tf.placeholder() tensors in the dataset definition and feed them when you initialize the iterator:

max_value = tf.placeholder(tf.int64, shape=[])
dataset = tf.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
# Initialize an iterator over a dataset with 10 elements.
sess.run(iterator.initializer, feed_dict={max_value: 10})
for i in range(10):
  value = sess.run(next_element)
  assert i == value
# Initialize the same iterator over a dataset with 100 elements.
sess.run(iterator.initializer, feed_dict={max_value: 100})
for i in range(100):
  value = sess.run(next_element)
  assert i == value

A reinitializable iterator can be initialized from multiple different Dataset objects. For example, you might have a training input pipeline that applies random perturbations to the input images to improve generalization, and a validation input pipeline that evaluates predictions on unmodified data. These pipelines typically use different Dataset objects that share the same structure:

# Define training and validation datasets with the same structure.
training_dataset = tf.data.Dataset.range(100).map(
    lambda x: x + tf.random_uniform([], -10, 10, tf.int64))
validation_dataset = tf.data.Dataset.range(50)
# A reinitializable iterator is defined by its structure. We could use the
# `output_types` and `output_shapes` properties of either `training_dataset`
# or `validation_dataset` here, because they are compatible.
iterator = tf.data.Iterator.from_structure(training_dataset.output_types,
                                           training_dataset.output_shapes)
next_element = iterator.get_next()
training_init_op = iterator.make_initializer(training_dataset)
validation_init_op = iterator.make_initializer(validation_dataset)
# Run 20 epochs in which the training dataset is traversed, followed by the
# validation dataset.
for _ in range(20):
  # Initialize an iterator over the training dataset.
  sess.run(training_init_op)
  for _ in range(100):
    sess.run(next_element)
  # Initialize an iterator over the validation dataset.
  sess.run(validation_init_op)
  for _ in range(50):
    sess.run(next_element)

A feedable iterator can be used together with tf.placeholder to select which Iterator to use in each call to tf.Session.run, via the familiar feed_dict mechanism. It offers the same functionality as a reinitializable iterator, but it does not require you to initialize the iterator from the start of its dataset when switching between iterators:

# Define training and validation datasets with the same structure.
training_dataset = tf.data.Dataset.range(100).map(
    lambda x: x + tf.random_uniform([], -10, 10, tf.int64)).repeat()
validation_dataset = tf.data.Dataset.range(50)
# A feedable iterator is defined by a handle placeholder and its structure. We
# could use the `output_types` and `output_shapes` properties of either
# `training_dataset` or `validation_dataset` here, because they have
# identical structure.
handle = tf.placeholder(tf.string, shape=[])
iterator = tf.data.Iterator.from_string_handle(
    handle, training_dataset.output_types, training_dataset.output_shapes)
next_element = iterator.get_next()
# You can use feedable iterators with a variety of different kinds of iterator
# (such as one-shot and initializable iterators).
training_iterator = training_dataset.make_one_shot_iterator()
validation_iterator = validation_dataset.make_initializable_iterator()
# The `Iterator.string_handle()` method returns a tensor that can be evaluated
# and used to feed the `handle` placeholder.
training_handle = sess.run(training_iterator.string_handle())
validation_handle = sess.run(validation_iterator.string_handle())
# Loop forever, alternating between training and validation.
while True:
  # Run 200 steps using the training dataset. Note that the training dataset is
  # infinite, and we resume from where we left off in the previous `while` loop
  # iteration.
  for _ in range(200):
    sess.run(next_element, feed_dict={handle: training_handle})
  # Run one pass over the validation dataset.
  sess.run(validation_iterator.initializer)
  for _ in range(50):
    sess.run(next_element, feed_dict={handle: validation_handle})

Getting data from an iterator

To get data from an iterator, use the Iterator.get_next() method.

When an iterator reaches the end of its dataset, executing Iterator.get_next() raises a tf.errors.OutOfRangeError. At that point the iterator is in an unusable state, and you must initialize it again before you can use it further.

dataset = tf.data.Dataset.range(5)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
# Typically `result` will be the output of a model, or an optimizer's
# training operation.
result = tf.add(next_element, next_element)
sess.run(iterator.initializer)
print(sess.run(result))  # ==> "0"
print(sess.run(result))  # ==> "2"
print(sess.run(result))  # ==> "4"
print(sess.run(result))  # ==> "6"
print(sess.run(result))  # ==> "8"
try:
  sess.run(result)
except tf.errors.OutOfRangeError:
  print("End of dataset")  # ==> "End of dataset"

A common pattern is to wrap the training loop in a try-except block:

sess.run(iterator.initializer)
while True:
  try:
    sess.run(result)
  except tf.errors.OutOfRangeError:
    break

If each element of the dataset has a nested structure, Iterator.get_next() returns one or more tf.Tensor objects in the same nested structure:

dataset1 = tf.data.Dataset.from_tensor_slices(tf.random_uniform([4, 10]))
dataset2 = tf.data.Dataset.from_tensor_slices((tf.random_uniform([4]), tf.random_uniform([4, 100])))
dataset3 = tf.data.Dataset.zip((dataset1, dataset2))
iterator = dataset3.make_initializable_iterator()
sess.run(iterator.initializer)
next1, (next2, next3) = iterator.get_next()

Evaluating any of next1, next2, or next3 advances the iterator for all components, so a typical consumer of an iterator includes all components in a single expression, as in the sketch below.
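
For example, a minimal sketch continuing the snippet above: fetching the whole nested structure in a single sess.run() call advances the iterator exactly once, so the three values stay consistent with each other.

# All three components come from the same element, because the iterator
# advances only once per `sess.run()` call.
value1, (value2, value3) = sess.run((next1, (next2, next3)))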

Reading input data

Using NumPy arrays

If all of your input data fits in memory, the simplest way to create a Dataset from it is to convert it to tf.Tensor objects and use Dataset.from_tensor_slices():

# Load the training data into two NumPy arrays, for example using `np.load()`.
with np.load("/var/data/training_data.npy") as data:
features = data["features"]
labels = data["labels"]
# Assume that each row of `features` corresponds to the same row as `labels`.
assert features.shape[0] == labels.shape[0]
dataset = tf.data.Dataset.from_tensor_slices((features, labels))

The code above embeds the features and labels arrays in the TensorFlow graph as tf.constant() operations. This works well for a small dataset, but it wastes memory, because the contents of the arrays will be copied multiple times, and it can run into the 2GB limit on the tf.GraphDef protocol buffer.

As an alternative, you can define the Dataset in terms of tf.placeholder() tensors and feed the NumPy arrays when you initialize the iterator:

# Load the training data into two NumPy arrays, for example using `np.load()`.
with np.load("/var/data/training_data.npy") as data:
features = data["features"]
labels = data["labels"]
# Assume that each row of `features` corresponds to the same row as `labels`.
assert features.shape[0] == labels.shape[0]
features_placeholder = tf.placeholder(features.dtype, features.shape)
labels_placeholder = tf.placeholder(labels.dtype, labels.shape)
dataset = tf.data.Dataset.from_tensor_slices((features_placeholder, labels_placeholder))
# [Other transformations on `dataset`...]
dataset = ...
iterator = dataset.make_initializable_iterator()
sess.run(iterator.initializer, feed_dict={features_placeholder: features,
                                          labels_placeholder: labels})

Using TFRecord data

The tf.data.TFRecordDataset class lets you stream over the contents of one or more TFRecord files as part of an input pipeline:

# Creates a dataset that reads all of the examples from two files.
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)

The filenames argument can be a string, a list of strings, or a tf.Tensor of strings. Therefore, if you have two sets of files, one for training and one for validation, you can use a tf.placeholder(tf.string) to represent the filenames and initialize the iterator with the appropriate filenames:

filenames = tf.placeholder(tf.string, shape=[None])
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)  # Parse the record into tensors.
dataset = dataset.repeat()  # Repeat the input indefinitely.
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()
# You can feed the initializer with the appropriate filenames for the current
# phase of execution, e.g. training vs. validation.
# Initialize `iterator` with training data.
training_filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
sess.run(iterator.initializer, feed_dict={filenames: training_filenames})
# Initialize `iterator` with validation data.
validation_filenames = ["/var/data/validation1.tfrecord", ...]
sess.run(iterator.initializer, feed_dict={filenames: validation_filenames})

Using text data

Many datasets are distributed as one or more text files. tf.data.TextLineDataset provides an easy way to extract lines from one or more text files.

filenames = ["/var/data/file1.txt", "/var/data/file2.txt"]
dataset = tf.data.TextLineDataset(filenames)  

By default, a TextLineDataset yields every line of every file, which may not be desirable: for example, a file might start with a header line or contain comments. These lines can be removed with Dataset.skip() and Dataset.filter(). To apply these transformations to each file separately, we use Dataset.flat_map() to create a nested dataset for each file.

filenames = ["/var/data/file1.txt", "/var/data/file2.txt"]
dataset = tf.data.Dataset.from_tensor_slices(filenames)
# Use `Dataset.flat_map()` to transform each file as a separate nested dataset,
# and then concatenate their contents sequentially into a single "flat" dataset.
# * Skip the first line (header row).
# * Filter out lines beginning with "#" (comments).
dataset = dataset.flat_map(
    lambda filename: (
        tf.data.TextLineDataset(filename)
        .skip(1)
        .filter(lambda line: tf.not_equal(tf.substr(line, 0, 1), "#"))))

Preprocessing data with Dataset.map()

Dataset.map(f) produces a new dataset by applying the given function f to each element of the input dataset. The function f takes the tf.Tensor objects that represent a single element of the input and returns the tf.Tensor objects that become a single element of the new dataset.
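
As a quick illustration of this signature (a minimal sketch, not taken from the original guide):

dataset = tf.data.Dataset.range(5)
# `x` is a scalar tf.int64 tensor representing one element of the input dataset;
# the tensor returned by the lambda becomes the corresponding element of the new dataset.
dataset = dataset.map(lambda x: x * 2)  # yields 0, 2, 4, 6, 8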

Parsing tf.Example protocol buffer messages

Many input pipelines extract tf.train.Example protocol buffer messages from files in TFRecord format (written, for example, with tf.python_io.TFRecordWriter). Each tf.train.Example record contains one or more "features", and the input pipeline typically converts these features into tensors.

# Transforms a scalar string `example_proto` into a pair of a scalar string and
# a scalar integer, representing an image and its label, respectively.
def _parse_function(example_proto):
  features = {"image": tf.FixedLenFeature((), tf.string, default_value=""),
              "label": tf.FixedLenFeature((), tf.int64, default_value=0)}
  parsed_features = tf.parse_single_example(example_proto, features)
  return parsed_features["image"], parsed_features["label"]
# Creates a dataset that reads all of the examples from two files, and extracts
# the image and label features.
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(_parse_function)

Decoding image data and resizing it

When training a neural network on image data, it is often necessary to convert images of different sizes to a common size so that they can be batched together.

# Reads an image from a file, decodes it into a dense tensor, and resizes it
# to a fixed shape.
def _parse_function(filename, label):
  image_string = tf.read_file(filename)
  image_decoded = tf.image.decode_image(image_string)
  image_resized = tf.image.resize_images(image_decoded, [28, 28])
  return image_resized, label
# A vector of filenames.
filenames = tf.constant(["/var/data/image1.jpg", "/var/data/image2.jpg", ...])
# `labels[i]` is the label for the image in `filenames[i]`.
labels = tf.constant([0, 37, ...])
dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(_parse_function)

Applying arbitrary Python logic with tf.py_func()

Sometimes it is useful to call an external Python library when parsing input data. In that case, invoke tf.py_func() inside a Dataset.map() transformation:

import cv2
# Use a custom OpenCV function to read the image, instead of the standard
# TensorFlow `tf.read_file()` operation.
def _read_py_function(filename, label):
  image_decoded = cv2.imread(filename.decode(), cv2.IMREAD_GRAYSCALE)
  return image_decoded, label
# Use standard TensorFlow operations to resize the image to a fixed shape.
def _resize_function(image_decoded, label):
  image_decoded.set_shape([None, None, None])
  image_resized = tf.image.resize_images(image_decoded, [28, 28])
  return image_resized, label
filenames = ["/var/data/image1.jpg", "/var/data/image2.jpg", ...]
labels = [0, 37, 29, 1, ...]
dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(
    lambda filename, label: tuple(tf.py_func(
        _read_py_function, [filename, label], [tf.uint8, label.dtype])))
dataset = dataset.map(_resize_function)

Batching dataset elements

Simple batching

The simplest form of batching stacks n consecutive elements of a dataset into a single element. Dataset.batch() does exactly this, with the same constraints as the tf.stack() operator applied to each component of the elements: that is, for each component i, all elements must have a tensor of exactly the same shape.

inc_dataset = tf.data.Dataset.range(100)
dec_dataset = tf.data.Dataset.range(0, -100, -1)
dataset = tf.data.Dataset.zip((inc_dataset, dec_dataset))
batched_dataset = dataset.batch(4)
iterator = batched_dataset.make_one_shot_iterator()
next_element = iterator.get_next()
print(sess.run(next_element))  # ==> ([0, 1, 2,   3],   [ 0, -1,  -2,  -3])
print(sess.run(next_element))  # ==> ([4, 5, 6,   7],   [-4, -5,  -6,  -7])
print(sess.run(next_element))  # ==> ([8, 9, 10, 11],   [-8, -9, -10, -11])  

Batching tensors with padding

The recipe above works only for tensors that all have the same size. However, the inputs to many models have varying sizes; to handle this case, the Dataset.padded_batch() transformation lets you batch tensors of different shapes by specifying one or more dimensions in which they may be padded.

dataset = tf.data.Dataset.range(100)
dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))
dataset = dataset.padded_batch(4, padded_shapes=[None])
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
print(sess.run(next_element))  # ==> [[0, 0, 0], [1, 0, 0], [2, 2, 0], [3, 3, 3]]
print(sess.run(next_element))  # ==> [[4, 4, 4, 4, 0, 0, 0],
                               #      [5, 5, 5, 5, 5, 0, 0],
                               #      [6, 6, 6, 6, 6, 6, 0],
                               #      [7, 7, 7, 7, 7, 7, 7]]

Training workflows

Processing multiple epochs

The tf.data API offers two main ways to process multiple epochs of the same data.

The simplest way is to use the Dataset.repeat() transformation; for example, to create a dataset that repeats its input for 10 epochs:

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.repeat(10)
dataset = dataset.batch(32)

Calling Dataset.repeat() with no argument repeats the input indefinitely. The Dataset.repeat() transformation concatenates the epochs without signaling the end of one epoch and the beginning of the next.

If you want to receive a signal at the end of each epoch, you can write a training loop that catches tf.errors.OutOfRangeError:

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
# Compute for 100 epochs.
for _ in range(100):
  sess.run(iterator.initializer)
  while True:
    try:
      sess.run(next_element)
    except tf.errors.OutOfRangeError:
      break
  # [Perform end-of-epoch calculations here.]

Randomly shuffling input data

The Dataset.shuffle() transformation randomly shuffles the input dataset using an algorithm similar to tf.RandomShuffleQueue: it maintains a fixed-size buffer and chooses the next element uniformly at random from that buffer.

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.repeat()

Using high-level APIs

The tf.train.MonitoredTrainingSession API simplifies many aspects of running TensorFlow in a distributed setting. MonitoredTrainingSession uses tf.errors.OutOfRangeError to signal that training has completed, so to use it with the tf.data API we recommend using Dataset.make_one_shot_iterator(). For example:

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()
next_example, next_label = iterator.get_next()
loss = model_function(next_example, next_label)
training_op = tf.train.AdagradOptimizer(...).minimize(loss)
with tf.train.MonitoredTrainingSession(...) as sess:
  while not sess.should_stop():
    sess.run(training_op)

To use a Dataset in the input_fn of a tf.estimator.Estimator, we also recommend using Dataset.make_one_shot_iterator(). For example:

def dataset_input_fn():
  filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
  dataset = tf.data.TFRecordDataset(filenames)
  # Use `tf.parse_single_example()` to extract data from a `tf.Example`
  # protocol buffer, and perform any additional per-record preprocessing.
  def parser(record):
    keys_to_features = {
        "image_data": tf.FixedLenFeature((), tf.string, default_value=""),
        "date_time": tf.FixedLenFeature((), tf.int64, default_value=0),
        "label": tf.FixedLenFeature((), tf.int64,
                                    default_value=tf.zeros([], dtype=tf.int64)),
    }
    parsed = tf.parse_single_example(record, keys_to_features)
    # Perform additional preprocessing on the parsed data.
    image = tf.image.decode_jpeg(parsed["image_data"])
    image = tf.reshape(image, [299, 299, 1])
    label = tf.cast(parsed["label"], tf.int32)
    return {"image_data": image, "date_time": parsed["date_time"]}, label
  # Use `Dataset.map()` to build a pair of a feature dictionary and a label
  # tensor for each example.
  dataset = dataset.map(parser)
  dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.batch(32)
  dataset = dataset.repeat(num_epochs)
  iterator = dataset.make_one_shot_iterator()
  # `features` is a dictionary in which each value is a batch of values for
  # that feature; `labels` is a batch of labels.
  features, labels = iterator.get_next()
  return features, labels