> Author: xidianwangtao@gmail.com
Current performance problems
- Increasing the number of workers improves training performance within a certain range, but beyond that range, adding more workers yields no obvious further improvement.
- Increasing the number of ps instances likewise helps within a certain range, but beyond that range, adding more ps instances yields no obvious further improvement.

Possible causes:
- Strongly related to how ps and workers are distributed across servers: the current scheduling policy balances pods mainly by each server's CPU and memory usage, trying to keep utilization roughly even across the cluster. Under this policy, the placement of ps and workers is somewhat random. If the scheduler instead guaranteed that every server hosting workers also hosted a corresponding ps, would training performance be higher? If so, by how much?
- An IO bottleneck when workers in K8S read training data from the HDFS cluster, caused either by the network or by the HDFS configuration itself; this needs further investigation through HDFS cluster monitoring.

Below are the test scenarios and cases designed for the first possible cause, "strongly related to how ps and workers are distributed":
Scenario 1: every server hosting workers also hosts a corresponding ps.

Test cases

| Case ID | Servers | Workers | ps | Description |
|---|---|---|---|---|
| 1 | 1 | 10 | 1 | One server hosts 10 workers and 1 ps |
| 2 | 5 | 50 | 5 | Each of 5 servers hosts 10 workers and 1 ps |
| 3 | 10 | 100 | 10 | Each of 10 servers hosts 10 workers and 1 ps |
| 4 | 20 | 200 | 20 | Each of 20 servers hosts 10 workers and 1 ps |
  
TensorFlow task scheduling design diagram
 
Scheduling implementation
- Scenario 1 TensorFlow object template: ***scene1.jinja***
```
# scene1.jinja: Kubernetes object template for scenario 1
{%- set name = "##NAME##" -%}
{%- set worker_replicas = ##WN## -%}
{%- set ps_replicas = ##PN## -%}
{%- set script = "##SCRIPT##" -%}
{%- set case = "##CASE##" -%}
{%- set port = 2222 -%}
{%- set log_host_dir = "/var/log/tensorflow" -%}
{%- set log_container_dir = "/var/log" -%}
{%- set image = "registry.vivo.xyz:4443/bigdata_release/tensorflow1.3.0" -%}
{%- set replicas = {"worker": worker_replicas, "ps": ps_replicas} -%}

{%- macro worker_hosts() -%}
  {%- for i in range(worker_replicas) -%}
    {%- if not loop.first -%},{%- endif -%}
    {{ name }}-worker-{{ i }}:{{ port }}
  {%- endfor -%}
{%- endmacro -%}

{%- macro ps_hosts() -%}
  {%- for i in range(ps_replicas) -%}
    {%- if not loop.first -%},{%- endif -%}
    {{ name }}-ps-{{ i }}:{{ port }}
  {%- endfor -%}
{%- endmacro -%}

{# task_type, begin_index and end_index are supplied when the template is rendered #}
{%- for i in range( begin_index, end_index ) -%}
{%- if task_type == "worker" %}
---
kind: Service
apiVersion: v1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  clusterIP: None
  selector:
    name: {{ name }}
    job: {{ task_type }}
    task: "{{ i }}"
  ports:
  - port: {{ port }}
    targetPort: 2222
---
kind: Job
apiVersion: batch/v1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  template:
    metadata:
      labels:
        name: {{ name }}
        job: {{ task_type }}
        task: "{{ i }}"
    spec:
      imagePullSecrets:
      - name: harborsecret
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: "CASE"
                operator: In
                values:
                - "{{ case }}"
              {# INDEX = i // 10 packs 10 workers onto the node labeled with that index #}
              - key: "INDEX"
                operator: In
                values:
                - "{{ i // 10 }}"
              - key: "SCENCE"
                operator: In
                values:
                - "1"
      containers:
      - name: {{ name }}-{{ task_type }}-{{ i }}
        image: {{ image }}
        resources:
          requests:
            memory: "4Gi"
            cpu: "300m"
        ports:
        - containerPort: 2222
        command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dirs=1 -R 'index.html*,*gif' {{ script }}; cd ./{{ name }}; sh ./run.sh {{ ps_hosts() }} {{ worker_hosts() }} {{ task_type }} {{ i }} {{ ps_replicas }} {{ worker_replicas }}"]
      restartPolicy: OnFailure
{%- endif -%}
{%- if task_type == "ps" -%}
---
kind: Service
apiVersion: v1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  clusterIP: None
  selector:
    name: {{ name }}
    job: {{ task_type }}
    task: "{{ i }}"
  ports:
  - port: {{ port }}
    targetPort: 2222
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: {{ name }}
        job: {{ task_type }}
        task: "{{ i }}"
    spec:
      imagePullSecrets:
      - name: harborsecret
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: "CASE"
                operator: In
                values:
                - "{{ case }}"
              {# INDEX = i puts ps i on the same node as workers 10*i .. 10*i+9 #}
              - key: "INDEX"
                operator: In
                values:
                - "{{ i }}"
              - key: "SCENCE"
                operator: In
                values:
                - "1"
      containers:
      - name: {{ name }}-{{ task_type }}-{{ i }}
        image: {{ image }}
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
        ports:
        - containerPort: 2222
        command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dirs=1 -R 'index.html*,*gif' {{ script }}; cd ./{{ name }}; sh ./run.sh {{ ps_hosts() }} {{ worker_hosts() }} {{ task_type }} {{ i }} {{ ps_replicas }} {{ worker_replicas }}"]
      restartPolicy: Always
{%- endif -%}
{%- endfor -%}
```
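For completeness, here is a minimal sketch of one way such a template can be turned into manifests. The template defines only the `##...##` placeholders plus the render-time variables `task_type`, `begin_index` and `end_index`; the driver below is hypothetical (file name, placeholder values and script URL are all illustrative assumptions, not the original tooling). It substitutes the placeholders, renders the template once per task type, and applies the result:

```python
# render_tf_objects.py: hypothetical driver for scene1.jinja (a sketch, not
# the original tooling). It fills in the ##...## placeholders, renders the
# template per task type, and applies the manifests with kubectl.
import subprocess

from jinja2 import Template

# Values for the ##...## placeholders; these are illustrative assumptions.
placeholders = {
    "##NAME##": "mnist",
    "##WN##": "50",                                  # worker_replicas
    "##PN##": "5",                                   # ps_replicas
    "##SCRIPT##": "http://fileserver/mnist/run.sh",  # hypothetical URL
    "##CASE##": "2",
}

with open("scene1.jinja") as f:
    raw = f.read()
for key, value in placeholders.items():
    raw = raw.replace(key, value)

template = Template(raw)

# Render workers 0..49 and ps 0..4, matching worker_replicas and ps_replicas.
for task_type, count in (("worker", 50), ("ps", 5)):
    manifests = template.render(task_type=task_type,
                                begin_index=0, end_index=count)
    subprocess.run(["kubectl", "apply", "-f", "-"],
                   input=manifests.encode(), check=True)
```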
Label the selected nodes accordingly (CASE and INDEX must match the values the template's nodeAffinity expects):

```
kubectl label node $node_name SCENCE=1 CASE=? INDEX=?
```
Test results

Test screenshot for case 2:
 
Scenario 2: all ps and all workers are forcibly placed on physically separate servers.

Test cases

| Case ID | Servers | Workers | ps | Description |
|---|---|---|---|---|
| 1 | 2 | 10 | 1 | One server hosts 10 workers; another hosts 1 ps |
| 2 | 10 | 50 | 5 | Each of 5 servers hosts 10 workers; each of 5 other servers hosts 1 ps |
| 3 | 20 | 100 | 10 | Each of 10 servers hosts 10 workers; each of 10 other servers hosts 1 ps |
| 4 | 40 | 200 | 20 | Each of 20 servers hosts 10 workers; each of 20 other servers hosts 1 ps |
  
TensorFlow task scheduling design diagram
 
Scheduling implementation
- Scenario 2 TensorFlow object template: ***scene2.jinja***
```
# scene2.jinja: Kubernetes object template for scenario 2
{%- set name = "##NAME##" -%}
{%- set worker_replicas = ##WN## -%}
{%- set ps_replicas = ##PN## -%}
{%- set script = "##SCRIPT##" -%}
{%- set case = "##CASE##" -%}
{%- set port = 2222 -%}
{%- set log_host_dir = "/var/log/tensorflow" -%}
{%- set log_container_dir = "/var/log" -%}
{%- set image = "registry.vivo.xyz:4443/bigdata_release/tensorflow1.3.0" -%}
{%- set replicas = {"worker": worker_replicas, "ps": ps_replicas} -%}

{%- macro worker_hosts() -%}
  {%- for i in range(worker_replicas) -%}
    {%- if not loop.first -%},{%- endif -%}
    {{ name }}-worker-{{ i }}:{{ port }}
  {%- endfor -%}
{%- endmacro -%}

{%- macro ps_hosts() -%}
  {%- for i in range(ps_replicas) -%}
    {%- if not loop.first -%},{%- endif -%}
    {{ name }}-ps-{{ i }}:{{ port }}
  {%- endfor -%}
{%- endmacro -%}

{# task_type, begin_index and end_index are supplied when the template is rendered #}
{%- for i in range( begin_index, end_index ) -%}
{%- if task_type == "worker" %}
---
kind: Service
apiVersion: v1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  clusterIP: None
  selector:
    name: {{ name }}
    job: {{ task_type }}
    task: "{{ i }}"
  ports:
  - port: {{ port }}
    targetPort: 2222
---
kind: Job
apiVersion: batch/v1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  template:
    metadata:
      labels:
        name: {{ name }}
        job: {{ task_type }}
        task: "{{ i }}"
    spec:
      imagePullSecrets:
      - name: harborsecret
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: "CASE"
                operator: In
                values:
                - "{{ case }}"
              {# INDEX = i // 10 packs 10 workers onto the worker node with that index #}
              - key: "INDEX"
                operator: In
                values:
                - "{{ i // 10 }}"
              - key: "SCENCE"
                operator: In
                values:
                - "2"
              {# TYPE separates worker nodes from ps nodes, enforcing physical isolation #}
              - key: "TYPE"
                operator: In
                values:
                - "worker"
      containers:
      - name: {{ name }}-{{ task_type }}-{{ i }}
        image: {{ image }}
        resources:
          requests:
            memory: "4Gi"
            cpu: "300m"
        ports:
        - containerPort: 2222
        command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dirs=1 -R 'index.html*,*gif' {{ script }}; cd ./{{ name }}; sh ./run.sh {{ ps_hosts() }} {{ worker_hosts() }} {{ task_type }} {{ i }} {{ ps_replicas }} {{ worker_replicas }}"]
      restartPolicy: OnFailure
{%- endif -%}
{%- if task_type == "ps" -%}
---
kind: Service
apiVersion: v1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  clusterIP: None
  selector:
    name: {{ name }}
    job: {{ task_type }}
    task: "{{ i }}"
  ports:
  - port: {{ port }}
    targetPort: 2222
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: {{ name }}-{{ task_type }}-{{ i }}
  namespace: {{ name }}
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: {{ name }}
        job: {{ task_type }}
        task: "{{ i }}"
    spec:
      imagePullSecrets:
      - name: harborsecret
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: "CASE"
                operator: In
                values:
                - "{{ case }}"
              {# INDEX = i puts ps i on the ps node with that index #}
              - key: "INDEX"
                operator: In
                values:
                - "{{ i }}"
              - key: "SCENCE"
                operator: In
                values:
                - "2"
              - key: "TYPE"
                operator: In
                values:
                - "ps"
      containers:
      - name: {{ name }}-{{ task_type }}-{{ i }}
        image: {{ image }}
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
        ports:
        - containerPort: 2222
        command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH -np --cut-dirs=1 -R 'index.html*,*gif' {{ script }}; cd ./{{ name }}; sh ./run.sh {{ ps_hosts() }} {{ worker_hosts() }} {{ task_type }} {{ i }} {{ ps_replicas }} {{ worker_replicas }}"]
      restartPolicy: Always
{%- endif -%}
{%- endfor -%}
```
Label the selected nodes accordingly (note SCENCE=2 here, matching the scenario 2 templates):

```
kubectl label node $node_name SCENCE=2 CASE=? INDEX=? TYPE=?
```
Test results

Test screenshot for case 2:
 
Test conclusions and further thoughts

Comparing the monitoring data for test case 2 (5 ps, 50 workers) under the two scenarios leads to the findings below.

Test conclusion
- In distributed TensorFlow, which ps each worker uses as its parameter server is unrelated to how we force the placement of ps and workers; it is decided internally by distributed TensorFlow itself (specifically, by the strategy set through tf.train.replica_device_setter()).
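To make this concrete, here is a minimal sketch of that placement logic. The host names follow the headless-Service naming from the templates, but the two-task cluster and the variables are made up for illustration. Without an explicit `ps_strategy`, `tf.train.replica_device_setter()` in TensorFlow 1.3 falls back to an internal round-robin strategy that assigns each newly created variable to the next ps task in creation order, no matter which physical node each ps pod landed on:

```python
import tensorflow as tf

# Host names follow the Service naming in the templates: <name>-<job>-<task>:2222.
cluster = tf.train.ClusterSpec({
    "ps": ["mnist-ps-0:2222", "mnist-ps-1:2222"],
    "worker": ["mnist-worker-0:2222", "mnist-worker-1:2222"],
})

# With no ps_strategy given, variables are assigned to ps tasks round-robin,
# in creation order, regardless of where the ps pods are scheduled.
with tf.device(tf.train.replica_device_setter(cluster=cluster)):
    w = tf.get_variable("w", shape=[784, 10])
    b = tf.get_variable("b", shape=[10])

print(w.device)  # /job:ps/task:0
print(b.device)  # /job:ps/task:1
```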
Open questions
- Why is only one of the several ps doing any work in this training job? Does the algorithm have just one big parameter? If so, the default round-robin strategy would place it on a single ps, which would explain the behavior. This needs confirmation from the algorithm team.
- If that big parameter were split into many small parameters, then any of the RR, LB, or Partition strategies should be able to spread parameter updates across multiple ps and noticeably improve training performance; see the sketch after this list.
- This exercise was not wasted effort: at the very least it showed that we do not yet understand the internals of Distributed TensorFlow well, and that a deep dive into its source code is well worth doing.
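As a sketch of the second point: TensorFlow 1.3 already ships a load-balancing placement strategy in tf.contrib.training, and a single big variable can be sharded across ps tasks with a partitioner. The cluster layout and the variable name and shape below are hypothetical:

```python
import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps": ["mnist-ps-%d:2222" % i for i in range(5)],
    "worker": ["mnist-worker-%d:2222" % i for i in range(50)],
})

# LB strategy: place each new variable on the ps task that currently holds
# the fewest bytes, instead of blind round-robin.
strategy = tf.contrib.training.GreedyLoadBalancingStrategy(
    num_tasks=5, load_fn=tf.contrib.training.byte_size_load_fn)

with tf.device(tf.train.replica_device_setter(cluster=cluster,
                                              ps_strategy=strategy)):
    # Partition strategy: shard one big variable into 5 pieces, one per ps,
    # so every ps task participates in updating it.
    embedding = tf.get_variable(
        "embedding", shape=[10000000, 64],
        partitioner=tf.fixed_size_partitioner(5))
```

With either approach, parameter update traffic should be spread across all 5 ps tasks instead of saturating a single one, which is exactly the behavior missing from the tests above.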