Distributed Computing Study Notes (4): Ray in Practice -- Rewriting a CNN with Ray

Overview

This post first walks through the structure of a CNN for MNIST (since I am still learning it myself), and then rewrites it with the Ray API so that the deep learning workload runs on a distributed framework.

Implementation

CNN structure

tf.reshape(tensor, shape, name=None)

Reshapes tensor into the form given by the shape argument.

xs = tf.placeholder(tf.float32, [None, 784])
x_image = tf.reshape(xs, [-1, 28, 28, 1])

Here the two 28s are the height and width of the image in pixels. The depth is 1 because the images are grayscale (it would be 3 for RGB). The leading -1 lets TensorFlow infer the number of images from the batch, so x_image[0] is one complete image.
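As a quick check of those dimensions (a minimal, self-contained sketch; the zero-filled batch is only for illustration and is not part of the original script), feeding 100 flattened images through the reshape yields a tensor of shape [100, 28, 28, 1]:

import numpy as np
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

xs = tf.placeholder(tf.float32, [None, 784])
x_image = tf.reshape(xs, [-1, 28, 28, 1])

batch = np.zeros((100, 784), dtype=np.float32)   # 100 fake flattened 28x28 images
with tf.Session() as sess:
    print(sess.run(tf.shape(x_image), feed_dict={xs: batch}))  # [100  28  28   1]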

tf.nn.conv2d(input, filter, strides, padding, use_cudnn_on_gpu=None, data_format=None, name=None)

The convolution operation; filter is the patch (the sliding kernel).
input: [batch, in_height, in_width, in_channels]
filter: [filter_height, filter_width, in_channels, out_channels]
Here in_channels is the depth of the input image (1 for these grayscale inputs). Convolution increases the depth of the feature map, so out_channels is larger than in_channels.
strides: [1, x_movement, y_movement, 1]. The first and last entries must be 1; the second and third are the step sizes along the x and y axes. If both are 2, the window moves 2 pixels at a time, so one pixel is skipped on every move.
padding: with padding='SAME' (and stride 1, as used here), the output has the same spatial size as the input.
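A minimal sketch of these shapes (the variable names img, patch and feat are my own, for illustration only): a 5x5 patch that maps 1 input channel to 32 output channels, applied with stride 1 and 'SAME' padding, keeps the 28x28 spatial size and widens the depth to 32.

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

img = tf.placeholder(tf.float32, [None, 28, 28, 1])                   # [batch, in_height, in_width, in_channels]
patch = tf.Variable(tf.truncated_normal([5, 5, 1, 32], stddev=0.1))   # [filter_height, filter_width, in_channels, out_channels]
feat = tf.nn.conv2d(img, patch, strides=[1, 1, 1, 1], padding='SAME')
print(feat.shape)   # (?, 28, 28, 32): same spatial size, depth 1 -> 32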

tf.nn.max_pool(value, ksize, strides, padding, name=None)

The pooling operation.
value: [batch, in_height, in_width, in_channels], the input to be pooled.
ksize: [1, height, width, 1], the size of the pooling window.
strides: [1, x_movement, y_movement, 1]. The first and last entries must be 1; the second and third are the step sizes along the x and y axes; with both set to 2, the window moves 2 pixels each step.
padding: with padding='SAME', the input is padded so that the output spatial size is ceil(input_size / stride); with a stride of 2 this halves the height and width (28x28 -> 14x14 in the network below).
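The corresponding pooling sketch (again with made-up variable names): a 2x2 window with stride 2 halves the height and width while keeping the depth.

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

feat = tf.placeholder(tf.float32, [None, 28, 28, 32])   # e.g. the conv output from the sketch above
pooled = tf.nn.max_pool(feat, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
print(pooled.shape)   # (?, 14, 14, 32): height and width halved, depth unchanged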

Image classification network for MNIST

2D + CNN

from __future__ import print_function
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
from tensorflow.examples.tutorials.mnist import input_data
import time
# number 1 to 10 data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)


def compute_accuracy(v_xs, v_ys):
    # global prediction
    y_pre = sess.run(prediction, feed_dict={xs: v_xs, keep_prob: 1})
    correct_prediction = tf.equal(tf.argmax(y_pre, 1), tf.argmax(v_ys, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    result = sess.run(accuracy, feed_dict={xs: v_xs, ys: v_ys, keep_prob: 1})
    return result

def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)

def conv2d(x, W):
    # stride [1, x_movement, y_movement, 1]
    # Must have strides[0] = strides[3] = 1
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
    # stride [1, x_movement, y_movement, 1]
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

# define placeholder for inputs to network
xs = tf.placeholder(tf.float32, [None, 784])  # 28x28
ys = tf.placeholder(tf.float32, [None, 10])
keep_prob = tf.placeholder(tf.float32)
x_image = tf.reshape(xs, [-1, 28, 28, 1])

## conv1 layer ##
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)  # x_image: 28*28*1 output size: 28*28*32
h_pool1 = max_pool_2x2(h_conv1)  # output size: 14*14*32

## conv2 layer ##
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)  # 14*14*32 -> 14*14*64
h_pool2 = max_pool_2x2(h_conv2)  # 14*14*64 -> 7*7*64

## func1 layer ##
W_fc1 = weight_variable([7*7*64, 1024])
b_fc1 = bias_variable([1024])
h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

## func2 layer ##

W_fc2 = weight_variable([1024, 10])
b_fc2 = bias_variable([10])
prediction = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

# the error between prediction and real data
cross_entropy = tf.reduce_mean(-tf.reduce_sum(ys * tf.log(prediction),
                                              reduction_indices=[1]))       # loss
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

sess = tf.Session()
# important step
# tf.initialize_all_variables() no longer valid from
# 2017-03-02 if using tensorflow >= 0.12
if int((tf.__version__).split('.')[1]) < 12 and int((tf.__version__).split('.')[0]) < 1:
    init = tf.initialize_all_variables()
else:
    init = tf.global_variables_initializer()
sess.run(init)
start = time.time()
for i in range(1000):
    batch_xs, batch_ys = mnist.train.next_batch(100)
    sess.run(train_step, feed_dict={
             xs: batch_xs, ys: batch_ys, keep_prob: 0.5})
    if i % 50 == 0:
        print(compute_accuracy(
            mnist.test.images[:1000], mnist.test.labels[:1000]))
end = time.time()
print('execution time: ' + str(end-start) + 's')

2D + CNN + Ray

from tensorflow.examples.tutorials.mnist import input_data
import os
import ray
import time
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

ray.init(num_gpus=8, include_webui=True)
# construct the neural network

def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)

def conv2d(x, W):
    # stride [1, x_movement, y_movement, 1]
    # Must have strides[0] = strides[3] = 1
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
    # stride [1, x_movement, y_movement, 1]
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

def construct_network():

    # define placeholder for inputs to network
    xs = tf.placeholder(tf.float32, [None, 784])  # 28x28
    ys = tf.placeholder(tf.float32, [None, 10])
    keep_prob = tf.placeholder(tf.float32)
    x_image = tf.reshape(xs, [-1, 28, 28, 1])

    ## conv1 layer ##
    W_conv1 = weight_variable([5, 5, 1, 32])
    b_conv1 = bias_variable([32])
    # x_image: 28*28*1 output size: 28*28*32
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)  # output size: 14*14*32

    ## conv2 layer ##
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) +
                         b_conv2)  # 14*14*32 -> 14*14*64
    h_pool2 = max_pool_2x2(h_conv2)  # 14*14*64 -> 7*7*64

    ## func1 layer ##
    W_fc1 = weight_variable([7*7*64, 1024])
    b_fc1 = bias_variable([1024])
    h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    ## func2 layer ##

    W_fc2 = weight_variable([1024, 10])
    b_fc2 = bias_variable([10])
    prediction = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

    # the error between prediction and real data
    cross_entropy = tf.reduce_mean(-tf.reduce_sum(ys * tf.log(prediction),
                                                  reduction_indices=[1]))       # loss
    train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

    correct_prediction = tf.equal(tf.argmax(prediction, 1), tf.argmax(ys, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    return xs, ys, train_step, keep_prob, accuracy

@ray.remote(num_gpus=1)
class CNN_ON_RAY(object):
    def __init__(self, mnist_data):
        self.mnist = mnist_data
        # Set an environment variable to tell TensorFlow which GPUs to use. Note
        # that this must be done before the call to tf.Session.
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
            [str(i) for i in ray.get_gpu_ids()])
        with tf.Graph().as_default():
            with tf.device("/gpu:0"):
                self.xs, self.ys, self.train_step, self.keep_prob, self.accuracy = construct_network()
                # Allow this to run on CPUs if there aren't any GPUs.
                config = tf.ConfigProto(allow_soft_placement=True)
                #### normal network
                # init = tf.initialize_all_variables()
                # sess = tf.Session()
                # sess.run(init)
                ####
                self.sess = tf.Session(config=config)
                # Initialize the network.
                init = tf.global_variables_initializer()
                self.sess.run(init)

    def train(self, num_steps):
        for i in range(num_steps):
            # load dataset by batch
            batch_xs, batch_ys = self.mnist.train.next_batch(100)
            # train
            self.sess.run(self.train_step, feed_dict={
                          self.xs: batch_xs, self.ys: batch_ys, self.keep_prob: 0.5})

            if i % 50 == 0:
                # print(compute_accuracy(
                #     mnist.test.images[:1000], mnist.test.labels[:1000]))
                print(self.get_accuracy())

    def get_accuracy(self):
        return self.sess.run(self.accuracy, feed_dict={
            self.xs: self.mnist.test.images[:1000], self.ys: self.mnist.test.labels[:1000], self.keep_prob: 1})

# Load the MNIST dataset; Ray serializes this object when it is passed to the actor.
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)
start = time.time()
# Create the actor: this instantiates it and runs the constructor.
nn = CNN_ON_RAY.remote(mnist)

# Run a few steps of training and print the accuracy.
train_id = nn.train.remote(1000)
ray.get(train_id)
end = time.time()
print('execution time: ' + str(end-start) + 's')

Results

No Ray

0.087
0.749
0.854
0.887
0.902
0.926
0.919
0.941
0.946
0.949
0.954
0.953
0.958
0.956
0.96
0.965
0.962
0.963
0.965
0.968
execution time: 52.58758211135864s

Ray

(pid=11597) 0.077
(pid=11597) 0.722
(pid=11597) 0.853
(pid=11597) 0.877
(pid=11597) 0.904
(pid=11597) 0.912
(pid=11597) 0.921
(pid=11597) 0.932
(pid=11597) 0.94
(pid=11597) 0.938
(pid=11597) 0.944
(pid=11597) 0.941
(pid=11597) 0.944
(pid=11597) 0.952
(pid=11597) 0.953
(pid=11597) 0.951
(pid=11597) 0.957
(pid=11597) 0.959
(pid=11597) 0.958
(pid=11597) 0.966
execution time: 59.315696001052856s

Looking at these numbers, Ray does not bring much of a performance gain here. This is because the run uses only a single worker, so the distributed nature of Ray is never actually exercised. Since this is an early experiment I kept things simple; training with multiple workers in parallel would require synchronizing the weights and biases across workers and keeping the network structure consistent, which is left for later work (a rough sketch of one possible scheme follows the dashboard screenshot below).
(screenshot: Ray dashboard)
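As a rough illustration of what that synchronization might look like (a minimal sketch under my own assumptions: the get_weights/set_weights actor methods and the simple parameter-averaging scheme are hypothetical and not part of the code above), each worker trains locally for a few steps, ships its variables back to the driver, and receives the averaged values before the next round:

import numpy as np
import ray

# Assumption: CNN_ON_RAY has been extended with two extra methods, e.g.
#   get_weights(self): return self.sess.run(tf.trainable_variables())
#   set_weights(self, values): assign the values back via tf.assign / placeholders
# Neither method exists in the version shown above.

workers = [CNN_ON_RAY.remote(mnist) for _ in range(4)]   # one actor per GPU

for round_idx in range(10):
    # Each worker trains on its own batches for a short while.
    ray.get([w.train.remote(50) for w in workers])

    # Collect every worker's variables and average them elementwise.
    all_weights = ray.get([w.get_weights.remote() for w in workers])
    mean_weights = [np.mean(layer, axis=0) for layer in zip(*all_weights)]

    # Push the averaged variables back so all workers stay in sync.
    ray.get([w.set_weights.remote(mean_weights) for w in workers])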

Issues

After discussing with classmates, the plan going forward is to start Ray from the terminal first and then attach to it from ray.init(), which avoids Ray shutting itself down after every job finishes. Some problems showed up while testing this setup, recorded below.
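A minimal sketch of that workflow (the address and password come from the output below; the exact ray.init() keywords vary slightly across Ray versions, so treat this as an assumption rather than the exact commands I ran):

# In a terminal, start the head node once; it keeps running between jobs:
#   ray start --head
#
# In the training script, attach to the already-running cluster instead of
# starting a new one:
import ray
ray.init(address='auto', redis_password='5241590000000000')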

  1. After starting Ray in the terminal, it prints the following output:

    Started Ray on this node. You can add additional nodes to the cluster by calling
    ray start --address='172.17.78.111:21907' --redis-password='5241590000000000'

    This command is meant for adding new nodes to the cluster. During the test I ran it twice, so logically there should be one head (master) node and two child (slave) nodes, but the dashboard output does not show any relationship between the three nodes (see the ray.nodes() sketch after this list for one way to inspect what the cluster actually contains).
    (screenshots: ray-test1, ray-test2)

  2. Running python3 cnn-ray.py from the terminal, the dashboard indeed shows only one worker process executing the task.
    (screenshot: ray-test3)
    However, the terminal output shows the same results printed three times.
    (screenshot: ray-test4)
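Regarding the first issue, one way to check which nodes have actually joined the cluster (my own suggestion rather than something from the original test; ray.nodes() is a standard Ray API that lists the nodes registered with the head node):

import ray
ray.init(address='auto')

# ray.nodes() returns one dict per node registered with the cluster
# (address, resources, alive/dead status). With one head node plus two
# additional nodes, three entries should show up here.
for node in ray.nodes():
    print(node)
print('number of nodes:', len(ray.nodes()))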


