parse_single_example converts the earlier result, a scalar string Tensor holding a single serialized Example, into tensors. It returns a dict mapping feature keys to Tensor and SparseTensor values.
FixedLenFeature is the configuration for parsing a fixed-length input feature; I read it as specifying the format in which data is read out of the protocol buffer.
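A minimal round-trip sketch (the feature names and values here are made up purely for illustration and are not taken from the code that follows): build one tf.train.Example, serialize it, and parse it back with parse_single_example.

import tensorflow as tf

# Build and serialize one Example (hypothetical features, illustration only).
example = tf.train.Example(features=tf.train.Features(feature={
    "input_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=[1, 2, 3])),
    "label_ids": tf.train.Feature(int64_list=tf.train.Int64List(value=[0])),
}))
serialized = example.SerializeToString()  # a single serialized Example (bytes)

# Parse it back: the result is a dict mapping feature keys to tensors.
parsed = tf.parse_single_example(serialized, features={
    "input_ids": tf.FixedLenFeature([3], tf.int64),
    "label_ids": tf.FixedLenFeature([], tf.int64),
})
with tf.Session() as sess:
    print(sess.run(parsed))  # {'input_ids': array([1, 2, 3]), 'label_ids': 0}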
def file_based_input_fn_builder(input_file, seq_length, is_training,
                                drop_remainder):
    """Creates an `input_fn` closure to be passed to TPUEstimator."""

    name_to_features = {
        "input_ids": tf.FixedLenFeature([seq_length], tf.int64),
        "input_mask": tf.FixedLenFeature([seq_length], tf.int64),
        "segment_ids": tf.FixedLenFeature([seq_length], tf.int64),
        "label_ids": tf.FixedLenFeature([], tf.int64),
    }

    def _decode_record(record, name_to_features):
        """Decodes a record to a TensorFlow example."""
        example = tf.parse_single_example(record, name_to_features)

        # tf.Example only supports tf.int64, but the TPU only supports tf.int32.
        # So cast all int64 to int32.
        for name in list(example.keys()):
            t = example[name]
            if t.dtype == tf.int64:
                t = tf.to_int32(t)
            example[name] = t

        return example

    def input_fn(params):
        """The actual input function."""
        batch_size = params["batch_size"]

        # For training, we want a lot of parallel reading and shuffling.
        # For eval, we want no shuffling and parallel reading doesn't matter.
        d = tf.data.TFRecordDataset(input_file)
        if is_training:
            d = d.repeat()
            d = d.shuffle(buffer_size=100)

        d = d.apply(
            tf.contrib.data.map_and_batch(
                lambda record: _decode_record(record, name_to_features),
                batch_size=batch_size,
                drop_remainder=drop_remainder))

        return d

    return input_fn
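A hedged sketch of how the returned closure is typically consumed (estimator, train_file, and num_train_steps are placeholders assumed here, not defined above):

train_input_fn = file_based_input_fn_builder(
    input_file=train_file,      # a TFRecord file written in advance
    seq_length=128,
    is_training=True,
    drop_remainder=True)
# `estimator` is assumed to be a tf.estimator.Estimator (or TPUEstimator) built
# elsewhere; it calls input_fn(params) and supplies params["batch_size"] itself.
estimator.train(input_fn=train_input_fn, max_steps=num_train_steps)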
Warning: This section discusses implementing input pipelines using the queue-based APIs which can be cleanly replaced by the tf.data API.
A typical queue-based pipeline for reading records from files has the following stages:
The list of filenames
Optional filename shuffling
Optional epoch limit
Filename queue
A Reader for the file format
A decoder for a record read by the reader
Optional preprocessing
Example queue
Filenames, shuffling, and epoch limits
For the list of filenames, use either a constant string Tensor (like ["file0", "file1"] or [("file%d" % i) for i in range(2)]) or the tf.train.match_filenames_once function.
import os

def get_files_name(path):
    """Returns the full paths of all regular files directly under `path`."""
    entries = os.listdir(path)
    result = []
    for entry in entries:
        file_path = os.path.join(path, entry)
        if os.path.isfile(file_path):
            result.append(file_path)
    return result
Pass the list of filenames to the tf.train.string_input_producer function. string_input_producer creates a FIFO queue for holding the filenames until the reader needs them.
def inputs(path, batch_size, num_epochs, allow_smaller_final_batch=False):
    """Reads input data num_epochs times.

    Args:
        path: List of filenames of the TFRecord files to read.
        batch_size: Number of examples per returned batch.
        num_epochs: Number of times to read the input data, or 0/None to
            train forever.

    Returns:
        A tuple (images, labels), where:
        * images is a float tensor with shape [batch_size, mnist.IMAGE_PIXELS]
          in the range [-0.5, 0.5].
        * labels is an int32 tensor with shape [batch_size] with the true label,
          a number in the range [0, mnist.NUM_CLASSES).

        Note that a tf.train.QueueRunner is added to the graph, which
        must be run using e.g. tf.train.start_queue_runners().
    """
    if not num_epochs:
        num_epochs = None
    # filename = os.path.join(FLAGS.train_dir,
    #                         TRAIN_FILE if train else VALIDATION_FILE)
    with tf.name_scope('input'):
        filename_queue = tf.train.string_input_producer(
            path, num_epochs=num_epochs)
Reader
Select the reader that matches your input file format and pass the filename queue to the reader's read method. The read method outputs a key identifying the file and record (useful for debugging if you have some weird records), and a scalar string value. Use one (or more) of the decoder and conversion ops to decode this string into the tensors that make up an example.
Another approach is to convert whatever data you have into a supported format. This approach makes it easier to mix and match data sets and network architectures. The recommended format for TensorFlow is a TFRecords file containing tf.train.Example protocol buffers (which contain Features as a field). You write a little program that gets your data, stuffs it in an Example protocol buffer, serializes the protocol buffer to a string, and then writes the string to a TFRecords file using the tf.python_io.TFRecordWriter. For example, tensorflow/examples/how_tos/reading_data/convert_to_records.py converts MNIST data to this format.
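A hedged sketch of such a conversion program (the feature keys mirror the age/gender example used later in this post; image_bytes, age, gender and file_name are assumed to come from your own data-loading code):

import tensorflow as tf

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

writer = tf.python_io.TFRecordWriter("train.tfrecords")
example = tf.train.Example(features=tf.train.Features(feature={
    'image_raw': _bytes_feature(image_bytes),      # e.g. a uint8 array's .tobytes()
    'age': _int64_feature(age),
    'gender': _int64_feature(gender),
    'file_name': _bytes_feature(file_name.encode()),
}))
writer.write(example.SerializeToString())
writer.close()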
dataset = tf.data.TFRecordDataset(filename)
dataset = dataset.repeat(num_epochs)
# map takes a python function and applies it to every sample
dataset = dataset.map(decode)
Accomplishing the same task with a queue-based input pipeline requires code along the lines of the sketch given after the notes below (using the decode function defined later in this post):
The reader's read method returns the next record (key, value) pair produced by the reader.
It will dequeue a work unit from the queue if necessary (e.g. when the Reader needs to start reading from a new file because it has finished with the previous one).
In other words, each call to read yields one new example, not a whole new TFRecord file; see the original text: "The read method outputs a key identifying the file and record (useful for debugging if you have some weird records), and a scalar string value."
As noted earlier, parse_single_example converts that scalar string Tensor (a single serialized Example) into tensors and returns a dict mapping feature keys to Tensor and SparseTensor values, with FixedLenFeature configuring how a fixed-length input feature is parsed from the protocol buffer.
decode_raw then reinterprets the bytes of a string as a vector of numbers, i.e. it parses the string into numeric values; my guess is that the previous step only reads the data according to the specified format but still yields a string, which therefore needs this conversion.
To read a file of TFRecords, use tf.TFRecordReader with the tf.parse_single_example decoder. The parse_single_example op decodes the example protocol buffers into tensors.
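A hedged sketch of the queue-based counterpart to the tf.data snippet above, reusing the decode function shown later in this post (filenames and num_epochs are assumed to be defined):

# Queue-based reading of TFRecords (TF 1.x queue API).
filename_queue = tf.train.string_input_producer(filenames, num_epochs=num_epochs)
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)   # one serialized Example per call
image, age, gender, file_path = decode(serialized_example)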
tf.image.per_image_standardization: Linearly scales an image to have zero mean and unit norm.
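For example (a trivial sketch; image is assumed to be a [height, width, channels] tensor):

image = tf.image.per_image_standardization(image)   # float32 output with zero mean and unit norm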
Batching
At the end of the pipeline we use another queue to batch together examples for training, evaluation, or inference. For this we use a queue that randomizes the order of examples, using the tf.train.shuffle_batch function.
def read_my_file_format(filename_queue):
    reader = tf.SomeReader()
    key, record_string = reader.read(filename_queue)
    example, label = tf.some_decoder(record_string)
    processed_example = some_processing(example)
    return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=True)
    example, label = read_my_file_format(filename_queue)
    # min_after_dequeue defines how big a buffer we will randomly sample
    # from -- bigger means better shuffling but slower start up and more
    # memory used.
    # capacity must be larger than min_after_dequeue and the amount larger
    # determines the maximum we will prefetch. Recommendation:
    #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch
Creating threads to prefetch using QueueRunner objects
All of the tf.train functions above add QueueRunner objects to the graph; a QueueRunner holds a list of enqueue operations for a queue, each to be run in a thread. You have to call start_queue_runners to launch these QueueRunner threads before the queues do anything, and a QueueRunner should be used together with a Coordinator to handle any errors the queues may raise.
Example code:
# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()
A concrete example from a training script:
# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
    step = sess.run(global_step)  # read the step once so the first summary has a value
    start_time = time.time()
    while not coord.should_stop():
        # Loop here until a thread tied to one of the queues signals a stop. We keep
        # iterating over batches (the earlier allow_smaller_final_batch=False serves
        # the same purpose with respect to the final batch); only when the data is
        # exhausted does an OutOfRangeError fire and break out of this while loop.
        _, summary = sess.run([train_op, merged], {train_mode: True})
        train_writer.add_summary(summary, step)
        if step % 100 == 0:  # step starts at 0, so the very first iteration is timed too
            duration = time.time() - start_time
            print('%.3f sec' % duration)
            start_time = time.time()
        if step % 1000 == 0:
            save_path = new_saver.save(sess, os.path.join(model_path, "model.ckpt"),
                                       global_step=global_step)
            print("Model saved in file: %s" % save_path)
        step = sess.run(global_step)
except tf.errors.OutOfRangeError:
    # Raised once the end of the data (epoch limit) is reached.
    print('Done training for %d epochs, %d steps.' % (epoch, step))
finally:
    # Executed on the way out of the try block: save a final checkpoint and
    # ask the threads to stop.
    save_path = new_saver.save(sess, os.path.join(model_path, "model.ckpt"),
                               global_step=global_step)
    print("Model saved in file: %s" % save_path)
    coord.request_stop()  # ask all queue threads to stop

# Wait for threads to finish.
coord.join(threads)
sess.close()
def decode(serialized_example):
    """Parses an image and label from the given `serialized_example`."""
    features = tf.parse_single_example(
        serialized_example,
        # Defaults are not specified since both keys are required.
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'age': tf.FixedLenFeature([], tf.int64),
            'gender': tf.FixedLenFeature([], tf.int64),
            'file_name': tf.FixedLenFeature([], tf.string),
        })

    # Convert from a scalar string tensor (whose single string has
    # length mnist.IMAGE_PIXELS) to a uint8 tensor with shape
    # [mnist.IMAGE_PIXELS].
    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image.set_shape([160 * 160 * 3])
    image = tf.reshape(image, [160, 160, 3])
    image = tf.reverse_v2(image, [-1])
    image = tf.image.per_image_standardization(image)

    # Convert label from a scalar uint8 tensor to an int32 scalar.
    age = features['age']
    gender = features['gender']
    file_path = features['file_name']
    return image, age, gender, file_path
def inputs(path, batch_size, num_epochs):
    """Reads input data.

    Args:
        path: the directory where the tfrecord files are stored.

    Returns:
        A tuple (images, age_labels, gender_labels, file_paths).

    This function creates a one_shot_iterator, meaning that it will only iterate
    over the dataset once. On the other hand there is no special initialization
    required.
    """
    filename = glob.glob(path + "/*.tfrecords")
    with tf.name_scope('input'):
        # TFRecordDataset opens a binary file and reads one record at a time.
        # `filename` could also be a list of filenames, which will be read in order.
        dataset = tf.data.TFRecordDataset(filename)
        # The map transformation takes a function and applies it to every element
        # of the dataset.
        dataset = dataset.map(decode)
        # The shuffle transformation uses a finite-sized buffer to shuffle elements
        # in memory. The parameter is the number of elements in the buffer. For
        # completely uniform shuffling, set the parameter to be the same as the
        # number of elements in the dataset.
        dataset = dataset.shuffle(1000 + 3 * batch_size)
        dataset = dataset.repeat(num_epochs)
        dataset = dataset.batch(batch_size)
        # dataset = dataset.apply(tf.contrib.data.batch_and_drop_remainder(batch_size))
        # With plain batch(), the final batch may contain smaller tensors with shape
        # N % batch_size in the batch dimension. If your program depends on the batches
        # having the same shape, consider using the
        # tf.contrib.data.batch_and_drop_remainder transformation instead.
        iterator = dataset.make_one_shot_iterator()
        return iterator.get_next()
def test_once(image_path, batch_size, model_checkpoint_path):
    with tf.Graph().as_default():
        sess = tf.Session()
        images, age_labels, gender_labels, file_paths = inputs(
            path=image_path,
            batch_size=batch_size,
            num_epochs=1)
        train_mode = tf.placeholder(tf.bool)
        age_logits, gender_logits, _ = inception_resnet_v1.inference(images, keep_probability=0.8,
                                                                     phase_train=train_mode,
                                                                     weight_decay=1e-5)
        age_cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=age_labels, logits=age_logits)
        age_cross_entropy_mean = tf.reduce_mean(age_cross_entropy)
        gender_cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=gender_labels,
                                                                              logits=gender_logits)
        gender_cross_entropy_mean = tf.reduce_mean(gender_cross_entropy)
        total_loss = tf.add_n(
            [gender_cross_entropy_mean, age_cross_entropy_mean] + tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES),
            name="total_loss")
        age_ = tf.cast(tf.constant([i for i in range(0, 101)]), tf.float32)
        prob_age = tf.reduce_sum(tf.multiply(tf.nn.softmax(age_logits), age_), axis=1)
        abs_age_error = tf.losses.absolute_difference(prob_age, age_labels)
        prob_gender = tf.argmax(tf.nn.softmax(gender_logits), 1)
        gender_acc = tf.reduce_mean(tf.cast(tf.nn.in_top_k(gender_logits, gender_labels, 1), tf.float32))
        init_op = tf.group(tf.global_variables_initializer(),
                           tf.local_variables_initializer())
        sess.run(init_op)
        saver = tf.train.Saver()
        saver.restore(sess, model_checkpoint_path)
        mean_error_age, mean_gender_acc, mean_loss = [], [], []
        try:
            while True:  # Evaluate until the one-shot iterator raises OutOfRangeError.
                prob_gender_val, real_gender, prob_age_val, real_age, image_val, gender_acc_val, abs_age_error_val, cross_entropy_mean_val, file_names = sess.run(
                    [prob_gender, gender_labels, prob_age, age_labels, images, gender_acc, abs_age_error, total_loss,
                     file_paths], {train_mode: False})
                mean_error_age.append(abs_age_error_val)
                mean_gender_acc.append(gender_acc_val)
                mean_loss.append(cross_entropy_mean_val)
                print("Age_MAE:%.2f,Gender_Acc:%.2f%%,Loss:%.2f" % (
                    abs_age_error_val, gender_acc_val * 100, cross_entropy_mean_val))
        except tf.errors.OutOfRangeError:
            print('!!!TESTING DONE!!!')
        sess.close()
        return prob_age_val, real_age, prob_gender_val, real_gender, image_val, np.mean(
            mean_error_age), np.mean(mean_gender_acc), np.mean(mean_loss), file_names
Data augmentation
Basic code
def decode(serialized_example):
    """Parses an image and label from the given `serialized_example`."""
    features = tf.parse_single_example(
        serialized_example,
        # Defaults are not specified since both keys are required.
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'age': tf.FixedLenFeature([], tf.int64),
            'gender': tf.FixedLenFeature([], tf.int64),
            'file_name': tf.FixedLenFeature([], tf.string),
        })

    # Convert from a scalar string tensor (whose single string has
    # length mnist.IMAGE_PIXELS) to a uint8 tensor with shape
    # [mnist.IMAGE_PIXELS].
    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image.set_shape([160 * 160 * 3])
    image = tf.reshape(image, [160, 160, 3])
    image = tf.reverse_v2(image, [-1])
    # image = tf.image.per_image_standardization(image)

    # Convert label from a scalar uint8 tensor to an int32 scalar.
    age = features['age']
    gender = features['gender']
    file_path = features['file_name']
    return image, age, gender, file_path

def augment(image, age, gender, file_path):
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, max_delta=32.0 / 255.0)
    image = tf.image.random_saturation(image, lower=0.5, upper=1.5)
    return image, age, gender, file_path

def normalize(image, age, gender, file_path):
    image = tf.image.per_image_standardization(image)
    return image, age, gender, file_path
def inputs(path, batch_size, num_epochs, allow_smaller_final_batch=False):
    """Reads input data num_epochs times.

    Args:
        path: Path(s) of the TFRecord file(s) to read.
        batch_size: Number of examples per returned batch.
        num_epochs: Number of times to read the input data, or 0/None to
            train forever.

    Returns:
        A tuple (images, labels), where:
        * images is a float tensor with shape [batch_size, mnist.IMAGE_PIXELS]
          in the range [-0.5, 0.5].
        * labels is an int32 tensor with shape [batch_size] with the true label,
          a number in the range [0, mnist.NUM_CLASSES).
    """
    if not num_epochs:
        num_epochs = None
    # filename = os.path.join(FLAGS.train_dir,
    #                         TRAIN_FILE if train else VALIDATION_FILE)
    with tf.name_scope('input'):
        # TFRecordDataset opens a binary file and reads one record at a time.
        # `path` could also be a list of filenames, which will be read in order.
        dataset = tf.data.TFRecordDataset(path)
        # The map transformation takes a function and applies it to every element
        # of the dataset.
        dataset = dataset.map(decode)
        dataset = dataset.map(augment)
        dataset = dataset.map(normalize)
        # The shuffle transformation uses a finite-sized buffer to shuffle elements
        # in memory. The parameter is the number of elements in the buffer. For
        # completely uniform shuffling, set the parameter to be the same as the
        # number of elements in the dataset.
        dataset = dataset.shuffle(1000 + 3 * batch_size)
        dataset = dataset.repeat(num_epochs)
        dataset = dataset.apply(tf.contrib.data.batch_and_drop_remainder(batch_size))
        # With plain batch(), the final batch may contain smaller tensors with shape
        # N % batch_size in the batch dimension. If your program depends on the batches
        # having the same shape, consider using the
        # tf.contrib.data.batch_and_drop_remainder transformation instead.
        iterator = dataset.make_one_shot_iterator()
        return iterator.get_next()
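A hedged sketch of how the tensors returned by this tf.data-based inputs() might be consumed (train_op and the "./train.tfrecords" path are assumptions; note that no queue runners or Coordinator are needed here, since the one-shot iterator simply raises OutOfRangeError once the epochs are exhausted):

images, age_labels, gender_labels, file_paths = inputs(
    path="./train.tfrecords",   # assumed location of the TFRecords written earlier
    batch_size=128, num_epochs=10)
# ... build the model and a train_op on top of these tensors ...

with tf.Session() as sess:
    sess.run(tf.group(tf.global_variables_initializer(),
                      tf.local_variables_initializer()))
    try:
        while True:
            sess.run(train_op)   # each run pulls the next batch from the iterator
    except tf.errors.OutOfRangeError:
        print('Done training -- epoch limit reached')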