# 分布式计算框架:从 MapReduce 到 Ray,我们到底在聊什么?
如果你问 MapReduce 和 Spark 有什么关系,大多数人会脱口而出:它们都是大数据处理引擎。但要是接着问 Spark 和 TensorFlow 呢?不少人可能就有点含糊了——这俩关注的领域好像不太一样吧?再往下问,Spark 和 MPI 又是什么关系?这就更远了。
虽然这么问多少有些不严谨,但仔细想想,它们确实共享着同一个内核,也就是我们今天要聊的一个大话题:**分布式计算框架**。
无论是 MapReduce、Spark 还是 TensorFlow,本质上都是在利用分布式能力,运行某些计算,解决特定问题。从这个层面看,它们都定义了一套“分布式计算模型”——也就是提出一种计算方法,通过这种方法来解决海量数据的分布式计算问题。区别在于,它们各自提出的计算模型不同。MapReduce 如其名,是一个非常基础的 map-reduce 模型(似乎说了等于没说)。Spark 定义了 RDD 模型,本质上是由一系列 map/reduce 操作组成的 DAG 图。TensorFlow 的计算模型也是一张图,但比 Spark 的图要“复杂”得多——你需要为图中的每个节点和每条边作出明确定义,这些定义会指导 TensorFlow 如何执行这张图。这种具体化的定义,让 TensorFlow 特别适合处理特定类型的计算,也就是神经网络。而 Spark 的 RDD 模型则更适合那些没有相互依赖关系的数据并行任务。
那么,有没有一种既通用、又简单、同时性能还高的分布式计算模型呢?从实践来看,这很难。通用往往意味着无法针对具体场景做极致优化;而为专门任务定制的分布式方案又做不到通用,当然也做不到简单。
插一句题外话:分布式计算模型有个常被忽视的伴随内容——调度。虽然不怎么受关注,但它是分布式引擎的必备组件。MapReduce 的调度依靠 YARN,Spark 有自己的内嵌调度器,TensorFlow 也一样。而 MPI 呢?它的调度几乎等于没有——假设集群有资源,直接用 ssh 把所有任务拉起来。其实调度应该分为资源调度器和任务调度器:前者向资源管理者申请硬件资源,后者把计算图中的任务下发到这些远程资源上去执行,也就是所谓的两阶段调度。近年来出现的 TensorFlowOnSpark 这类项目,本质就是用 Spark 的资源调度能力,加上 TensorFlow 的计算模型。
## 当单机程序遇上分布式:一个自然的想法
写完一个单机程序后,如果遇到数据量上的瓶颈,一个很自然的念头就是:能不能让它跑在分布式环境里?如果能不加改动或者稍加改动就能实现分布式化,那就太理想了。可惜现实往往很残酷。通常情况下,对于一般性的程序,开发者需要自己手动编写分布式版本——借助 MPI 之类的框架,自己控制数据的分发和汇总,自己处理任务的失败容灾(通常也谈不上容灾)。如果恰好要对一批数据做批量化处理,那可以直接用 MapReduce 或 Spark 预定义的 API。这类任务中,计算框架已经帮你把业务之外的“脚手架代码”全部搞定了。同理,如果要训练一个神经网络,用 TensorFlow 或 PyTorch 就好。简单说,你的问题如果已经有对应的框架,直接拿来用就行。但如果没有呢?除了自己从头实现,还有别的路吗?
## 一个叫 Ray 的项目:改几行代码就让程序变成分布式?
今天注意到一个项目叫 Ray,它声称只需要稍微改一下你的代码,就能让它跑在分布式环境里——当然,这个项目其实早就发布了,只是之前没太关注。它只限于 Python,来看这个例子:
------------------------------------------------ ----------------------------------------------------
| **Basic Python** | **Distributed with Ray** |
------------------------------------------------ ----------------------------------------------------
| | |
| # Execute f serially. | # Execute f in parallel. |
| | |
| def f(): | @ray.remote |
| time.sleep(1) | def f(): |
| return 1 | time.sleep(1) |
| | return 1 |
| | |
| results = [f() for i in range(4)] | ray.init() |
| | results = ray.get([f.remote() for i in range(4)]) |
------------------------------------------------ ----------------------------------------------------
这么简单?这让笔者想到了 OpenMP(注意不是 OpenMPI)。看看 C++ 的例子:
#include
#include"omp.h"
using namespace std;
void main() {
#pragma omp parallel for
for(int i = 0; i < 10; i++) {
cout << "Test" << endl;
}
system("pause");
}
引入头文件,加一行预处理指令,代码立刻变成并行执行。当然 OpenMP 不是分布式,它只是借助编译器把需要并行化的部分编译成多线程运行,本身还是一个进程,所以并行度受限于 CPU 线程数。如果 CPU 是双线程,只能 2 倍加速;有些服务器上单核有 32 线程,那就能享受到 32 倍加速(被并行化的部分)。不过这些细节不重要——在用户看来,Ray 的做法和 OpenMP 是不是有几分相似?你不需要做太多代码改动,就能让代码变成分布式执行(当然 OpenMP 更绝,因为对于不支持 OpenMP 的编译器,那一行预处理指令就是一行注释)。
## Ray 的原理:API 注入,隐式定义计算图
那么 Ray 是怎么做到的呢?说起来也比较简单:它定义了一些 API,类似于 MPI 中的通信原语。使用时,把这些 API “注入”到代码的合适位置,代码就变成了用户代码夹杂着 Ray 框架层的 API 调用,整个代码实际上就形成了一张计算图。接下来就等 Ray 把这图计算完,返回结果。Ray 的论文给了一个例子:
@ray.remote
def create_policy():
# Initialize the policy randomly.
return policy
@ray.remote(num_gpus=1)
class Simulator(object):
def __init__(self):
# Initialize the environment.
self.env = Environment()
def rollout(self, policy, num_steps):
observations = []
observation = self.env.current_state()
for _ in range(num_steps):
action = policy(observation)
observation = self.env.step(action)
observations.append(observation)
return observations
@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
# Update the policy.
return policy
@ray.remote
def train_policy():
# Create a policy.
policy_id = create_policy.remote()
# Create 10 actors.
simulators = [Simulator.remote() for _ in range(10)]
# Do 100 steps of training.
for _ in range(100):
# Perform one rollout on each actor.
rollout_ids = [s.rollout.remote(policy_id)
for s in simulators]
# Update the policy with the rollouts.
policy_id = update_policy.remote(policy_id, *rollout_ids)
return ray.get(policy_id)
生成的计算图为:

所以,用户要做的事情就是在自己的代码里加入合适的 Ray API 调用,然后代码就变成了一张分布式计算图。作为对比,再来看 TensorFlow 对图的定义:
import tensorflow as tf
# 创建数据流图:y = W * x + b,其中W和b为存储节点,x为数据节点。
x = tf.placeholder(tf.float32)
W = tf.Variable(1.0)
b = tf.Variable(1.0)
y = W * x + b
with tf.Session() as sess:
tf.global_variables_initializer().run() # Operation.run
fetch = y.eval(feed_dict={x: 3.0}) # Tensor.eval
print(fetch) # fetch = 1.0 * 3.0 + 1.0
'''输出:4.0'''
可以看出,TensorFlow 需要你显式地、明确地定义出图的节点(`placeholder`、`Variable` 等等,这些都是具体的图节点类型),而 Ray 中的图是以一种隐式的方式定义的。从开发者的角度看,后者更自然——它站在你的视角,而不是让你去适配框架的轮子。
## Ray 是那个“通用、简单、灵活”的分布式框架吗?
那么,Ray 是不是我们一直在找的那个既通用、又简单、还灵活的分布式计算框架?坦白说,因为没有太多的 Ray 使用经验,这个问题不太好下定论。从官方介绍来看,有限的几个 API 确实足够简单。但仅靠这几个 API 能不能实现通用且灵活的目标,还不好讲。本质上,TensorFlow 对图的定义也足够通用,但它并不是一个通用的分布式计算框架。有些问题不在于框架本身,而在于问题本身的分布式化就存在困难。试图寻求一个通用分布式计算框架来解决所有单机问题,可能本身就是一个伪命题。
话扯远了。假设 Ray 真的能让我们用一种比较轻松的方式让程序分布式执行,那会带来什么?前不久 Databricks 开源了一个新项目叫 Koalas,试图用 RDD 的框架来并行化 Pandas。因为 Pandas 的场景与 Spark 类似,两者的底层存储结构和概念也很相近,所以用 RDD 来分布式化 Pandas 是可行的。换个角度想,如果 Ray 足够简单好用,在 Pandas 里加一些 Ray 的 API 调用,所需要花费的时间和精力可能会远远少于开发一套 Koalas。但问题是,在 Pandas 里加 Ray 就把 Pandas 绑定到了 Ray 上,即使是单机场景也一样——因为 Ray 做不到像 OpenMP 那样:如果支持,很好;不支持,也不影响代码运行。
## 跳出细节:分布式计算框架的本质
啰嗦了这么多,其实是想从这些引擎的细节中跳出来,思考一下到底什么是分布式计算框架,每种框架是如何设计的,解决了什么问题,又有哪些优缺点。最后用一位大佬的观点来收尾。David Patterson 在演讲《New Golden Age For Computer Architecture》中提到:通用硬件正在逼近极限,要想达到更高的效率,我们需要设计面向领域的架构(Domain Specific Architectures)。如今正是一个计算架构层出不穷的时代,每种架构都是为了解决其面对的领域问题而出现的,必然包含对特定问题的特殊优化。通用性不是用户解决问题的出发点,而更多是框架设计者的“一厢情愿”——用户关注的永远是领域问题。从这个意义上讲,面向领域的计算架构才是正确的方向。
*声明:限于本人水平有限,文中陈述内容可能有误。欢迎批评指正。*