tensorflow的Parameter server架构(PS架构),集群中的节点被分为两类:参数服务器(parameter server)和工作服务器(worker)。其中参数服务器存放模型的参数,而工作服务器负责计算参数的梯度。在每个迭代过程,工作服务器从参数服务器中获得参数,然后将计算的梯度返回给参数服务器,参数服务器聚合从工作服务器传回的梯度,然后更新参数,并将新的参数广播给工作服务器。下面给一个简单的例子来说明,在一台机器上构建ps和worker(ip地址相同,端口号不同即可实现,和两台机器实际上是一样的),ps端和worker端代码如下:
ps:
#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
#coding=utf-8
#多台机器,每台机器有一个显卡、或者多个显卡,这种训练叫做分布式训练
from time import sleep
import tensorflow.compat.v1 as tf
#现在假设我们有A、B、C、D四台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
"worker": [
"127.0.0.1:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
],
"ps": [
"127.0.0.1:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
]})
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
isps=True
if isps:
server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
server.join()
else:
server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
addwb=w+b
mutwb=w*b
divwb=w/b
saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(init_op=init_op, summary_op=summary_op, saver=saver)
with sv.managed_session(server.target) as sess:
while 1:
print(sess.run([addwb,mutwb,divwb]))
worker:
#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
from time import sleep
import tensorflow.compat.v1 as tf
#现在假设我们有A、B台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
"worker": [
"127.0.0.1:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
],
"ps": [
"127.0.0.1:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
]})
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
isps=False
if isps:
server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
server.join()
else:
server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
addwb=w+b
mutwb=w*b
divwb=w/b
saver = tf.train.Saver()
summary_op = tf.summary.merge_all()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(logdir="./simple_log", init_op=init_op, summary_op=summary_op, saver=saver)
# Supervisor会自动判断在logdir模型是否存在,去找checkpoint,如果存在的话会自动读取模型,不用显示地调用restore。
with sv.managed_session(server.target) as sess:
count = 0
while 1:
count = count + 1
print(str(count) + "th compute result is :")
print(sess.run([addwb,mutwb,divwb]))
print("***********************************")
sleep(1)
if count >= 5:
break
先执行ps,ps会在server.join()处等待worker传回参数,之后执行worker打印结果如下:
1th compute result is :
[array([[7., 7.],
[7., 7.]], dtype=float32), array([[10., 10.],
[10., 10.]], dtype=float32), array([[0.4, 0.4],
[0.4, 0.4]], dtype=float32)]
***********************************
2th compute result is :
[array([[7., 7.],
[7., 7.]], dtype=float32), array([[10., 10.],
[10., 10.]], dtype=float32), array([[0.4, 0.4],
[0.4, 0.4]], dtype=float32)]
***********************************
3th compute result is :
[array([[7., 7.],
[7., 7.]], dtype=float32), array([[10., 10.],
[10., 10.]], dtype=float32), array([[0.4, 0.4],
[0.4, 0.4]], dtype=float32)]
***********************************
4th compute result is :
[array([[7., 7.],
[7., 7.]], dtype=float32), array([[10., 10.],
[10., 10.]], dtype=float32), array([[0.4, 0.4],
[0.4, 0.4]], dtype=float32)]
***********************************
5th compute result is :
[array([[7., 7.],
[7., 7.]], dtype=float32), array([[10., 10.],
[10., 10.]], dtype=float32), array([[0.4, 0.4],
[0.4, 0.4]], dtype=float32)]
***********************************