воскресенье, 30 сентября 2018 г.

TensorFlow: импорт данных - предобработка данных

Предобработка данных с Dataset.map()

Dataset.map(f) преобразование производит новый набор данных применением переданной функции f к каждому элементу набора данных ввода. Оно основано на map() функции, которая в основном применяется к листам (и другим структурам) в функциональных языках программирования. Функция f принимает tf.Tensor объекты, которые представляют единичный элемент ввода, и возвращает tf.Tensor объекты, которые представляют единичный элемент в новом датасете. Эта реализация использует стандартные TensorFlow операции для преобразования одного элемента в другой.

Этот раздел описывает общие примеры того как использовать Dataset.map().

Парсинг сообщений буфера tf.Example протокола

Многие пайплайны ввода извлекают сообщения буфера протокола tf.train.Example из файла TFRecord формата (записанный, например, используя tf.python_io.TFRecordWriter). Каждая tf.train.Example запись содержит одно или несколько "свойств", и пайплайн ввода обычно преобразует эти свойства в тензоры.

# Преобразуем скалярную строку `example_proto` в пару скалярной строки
# и скалярное число, представляющую изображение и метку, соответственно.
def _parse_function(example_proto):
  features = 
        {"image": tf.FixedLenFeature((), tf.string, default_value=""),
         "label": tf.FixedLenFeature((), tf.int64, default_value=0)}
  parsed_features = tf.parse_single_example(example_proto, features)
  return parsed_features["image"], parsed_features["label"]

# Создаем датасет, который читает все примеры из двух файлов,
# и извлекает изображение и метку.
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(_parse_function)

Декодирование данных изображения и изменение его размера

При тренировке нейронной сети на данных настоящих изображений часто необходимо преобразовать изображения разных размеров к общему размеру, таким образом они могут быть объединены в пакеты в фиксированном размере.

# Читаем изображение из файла, декодируем его в тесносвязанный тензор
# и меняем размер к фиксированной форме
def _parse_function(filename, label):
  image_string = tf.read_file(filename)
  image_decoded = tf.image.decode_jpeg(image_string)
  image_resized = tf.image.resize_images(image_decoded, [28, 28])
  return image_resized, label

# Вектор имен файлов.
filenames = tf.constant(["/var/data/image1.jpg", 
                         "/var/data/image2.jpg", ...])

# `labels[i]` метка для изображения в `filenames[i].
labels = tf.constant([0, 37, ...])

dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(_parse_function)

Применение произвольной Python логики с tf.py_func()

По причинам производительности рекомендуется использовать TensorFlow операции для предобработки данных когда это возможно. Однако, иногда удобно вызывать внешние Python библиотеки при парсинге вводных данных. Чтобы выполнить это используем tf.py_func() операцию в Dataset.map() трансформации.

import cv2

# Используем кастомную OpenCV функцию для чтения изображения
# вместо стандартной TensorFlow `tf.read_file()` операции
def _read_py_function(filename, label):
  image_decoded = cv2.imread(filename.decode(), cv2.IMREAD_GRAYSCALE)
  return image_decoded, label

# Используем стандартные TensorFlow операции 
# для изменения размера изображения к фиксированной форме
def _resize_function(image_decoded, label):
  image_decoded.set_shape([None, None, None])
  image_resized = tf.image.resize_images(image_decoded, [28, 28])
  return image_resized, label

filenames = ["/var/data/image1.jpg", "/var/data/image2.jpg", ...]
labels = [0, 37, 29, 1, ...]

dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(
    lambda filename, label: tuple(tf.py_func(
        _read_py_function, [filename, label], [tf.uint8, label.dtype])))
dataset = dataset.map(_resize_function)

Пакетизация элементов датасета

Простая пакетизация

Простейшая пакетизация складывает n последовательных элементов датасета в один элемент. Dataset.batch() трансформация делает именно это, с теми ограничениями что и tf.stack() оператор, применяемый к каждому компоненту элементов: то есть для каждого компонента i все элементы должны иметь тензор точно такой же формы.

inc_dataset = tf.data.Dataset.range(100)
dec_dataset = tf.data.Dataset.range(0, -100, -1)
dataset = tf.data.Dataset.zip((inc_dataset, dec_dataset))
batched_dataset = dataset.batch(4)

iterator = batched_dataset.make_one_shot_iterator()
next_element = iterator.get_next()

print(sess.run(next_element))  
# ==> ([0, 1, 2,   3],   [ 0, -1,  -2,  -3])

print(sess.run(next_element))  
# ==> ([4, 5, 6,   7],   [-4, -5,  -6,  -7])

print(sess.run(next_element))  
# ==> ([8, 9, 10, 11],   [-8, -9, -10, -11])

Пакетизация тензоров с отступом

Приведенный выше пример работает для тензоров, которые имеют одинаковый размер. Однако, многие модели (например, последовательные модели) работают с вводными данными, которые могут иметь разный размер (например, последовательности различной длины). Для обработки таких случаев Dataset.padded_batch() трансформация позволяет пакетизировать тензоры различной формы определяя одно или несколько измерений, в которых может быть выставлен отступ.

dataset = tf.data.Dataset.range(100)
dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))
dataset = dataset.padded_batch(4, padded_shapes=[None])

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

print(sess.run(next_element))  
# ==> [[0, 0, 0], [1, 0, 0], [2, 2, 0], [3, 3, 3]]

print(sess.run(next_element))  # ==> [[4, 4, 4, 4, 0, 0, 0],
                               #      [5, 5, 5, 5, 5, 0, 0],
                               #      [6, 6, 6, 6, 6, 6, 0],
                               #      [7, 7, 7, 7, 7, 7, 7]]

Dataset.padded_batch() трансформация позволяет устанавливать различный отступ для каждого измерения каждого компонента, и он может быть переменной длины (означено None в примере выше) или постоянной длины. Также возможно переопределять значение в отступе (по умолчанию оно равно нулю).

Рабочие потоки тренировки

Обработка нескольких эпох

tf.data API предлагает два основных пути для обработки нескольких эпох тех же данных.

Постейший путь итерации датасета в нескольких эпохах - это использование Dataset.repeat() трансформации. Например, чтобы создать датасет, который повторяет его ввод в течение 10 эпох:

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.repeat(10)
dataset = dataset.batch(32)

Применение Dataset.repeat() трансформации без аргументов будет повторять ввод. Dataset.repeat() трансформация соединяет аргументы без обозначения конца каждой эпохи и начала следующей эпохи.

Если необходимо получать сигнал в конце каждой эпохи можно написать тренировочный цикл, который отлавливает tf.errors.OutOfRangeError исключение в конце датасета. В этой точке можно собирать статистику (например, валидационную ошибку) для эпохи.

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

# Вычисления для 100 эпох
for _ in range(100):
  sess.run(iterator.initializer)
  while True:
    try:
      sess.run(next_element)
    except tf.errors.OutOfRangeError:
      break

  # [Выполняем вычисления в конце эпохи здесь]

Перемешивание данных ввода в случайном порядке

Dataset.shuffle() преобразование перемешивает в случайном порядке вводный датасет, используя схожий алгоритм для tf.RandomShuffleQueue: он содержит буфер фиксированного размера и выбирает следующий элемент равномерно в случайном порядке из этого буфера.

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.repeat()

Использование высокоуровневых API

tf.train.MonitoredTrainingSession API упрощает многие аспекты исполнения TensorFlow в распределенной установке. MonitoredTrainingSession использует tf.errors.OutOfRangeError для обозначения окончания тренировки, таким образом для его использования с tf.data API рекомендуется использовать Dataset.make_one_shot_iterator(). Например:

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()

next_example, next_label = iterator.get_next()
loss = model_function(next_example, next_label)

training_op = tf.train.AdagradOptimizer(...).minimize(loss)

with tf.train.MonitoredTrainingSession(...) as sess:
  while not sess.should_stop():
    sess.run(training_op)

Для использования Dataset в input_fn tf.estimator.Estimator просто возвращаем Dataset и фреймворк позаботится о создании итератора и его инициализировании. Например:

def dataset_input_fn():
  filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
  dataset = tf.data.TFRecordDataset(filenames)

  # Используем `tf.parse_single_example()` для извлечения данных из 
  # буфера протокола `tf.Example`, и выполнения любой дополнительной
  # предобработки для каждой записи.
  def parser(record):
    keys_to_features = {
        "image_data": tf.FixedLenFeature(
                          (), tf.string, default_value=""),
        "date_time": tf.FixedLenFeature((), tf.int64, default_value=""),
        "label": tf.FixedLenFeature((), tf.int64,
                           default_value=tf.zeros([], dtype=tf.int64)),
    }
    parsed = tf.parse_single_example(record, keys_to_features)

    # Выполняем дополнительную предобработку для полученных данных
    image = tf.image.decode_jpeg(parsed["image_data"])
    image = tf.reshape(image, [299, 299, 1])
    label = tf.cast(parsed["label"], tf.int32)

    return {"image_data": image, "date_time": parsed["date_time"]},\
           label

  # Используем `Dataset.map()`для построения пары словаря свойства 
  # и метки тензора для каждого примера.
  dataset = dataset.map(parser)
  dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.batch(32)
  dataset = dataset.repeat(num_epochs)

  # Каждый элемент `dataset` это тапл(tuple), содержащий словарь свойств
  # (в котором каждое значение - это пакет значений для этого свойства)
  # и пакет меток.
  return dataset