一篇读懂Flink闭包清除源码:深度全面剖析机制、原理与实现细节详解

2026-06-16阅读 0热度 0
其他

0x1 摘要

本文聚焦两个核心问题:为何 Flink 需要对闭包进行清理?以及 Flink 底层是如何实现闭包清理的?

Flink 闭包清除源码分析

0x2 Flink 为什么要做闭包清除

Flink 的所有算子都需要通过序列化机制分发到集群各节点执行。因此算子对象必须支持序列化,这是硬性约束。不少开发者习惯用匿名内部类定义算子——匿名内部类一旦捕获外部对象,便会自动形成闭包引用。若该外部对象未实现 Serializable 接口,序列化阶段就会直接抛异常。简而言之,Flink 框架必须在运行时将这些闭包引用“清除”,才能确保作业顺利运行。

0x3 Flink 闭包清除实现

先看一个最简单的 Map 算子示例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource source = env.addSource(new SourceFunction() {
    @Override
    public void run(SourceContext ctx) throws Exception {}
    @Override
    public void cancel() {}
});

source.map(new MapFunction() {
    @Override
    public String map(String value) throws Exception {
        return null;
    }
});

接下来深入 map 方法的源码:

public  SingleOutputStreamOperator map(MapFunction mapper) {
    TypeInformation outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

注意这里调用了 clean(mapper)。继续追踪链路,最终会进入 StreamExecutionEnvironment 类的如下方法:

@Internal
public  F clean(F f) {
    if (getConfig().isClosureCleanerEnabled()) {
        ClosureCleaner.clean(f, true);
    }
    ClosureCleaner.ensureSerializable(f);
    return f;
}

至此已经清晰:闭包清理的核心工具就是 ClosureCleaner。下面详细拆解这个类。

先看它的 clean 方法:

public static void clean(Object func, boolean checkSerializable) {
    if (func == null) {
        return;
    }
    final Class cls = func.getClass();
    // First find the field name of the "this$0" field, this can
    // be "this$x" depending on the nesting
    boolean closureAccessed = false;
    for (Field f: cls.getDeclaredFields()) {
        if (f.getName().startsWith("this$")) {
            // found a closure referencing field - now try to clean
            closureAccessed |= cleanThis0(func, cls, f.getName());
        }
    }
    if (checkSerializable) {
        try {
            InstantiationUtil.serializeObject(func);
        }
        catch (Exception e) {
            String functionType = getSuperClassOrInterfaceName(func.getClass());
            String msg = functionType == null ?
                (func + " is not serializable.") :
                ("The implementation of the " + functionType + " is not serializable.");
            if (closureAccessed) {
                msg += " The implementation accesses fields of its enclosing class, which is "
                    + "a common reason for non-serializability. "
                    + "A common solution is to make the function a proper (non-inner) class, or "
                    + "a static inner class.";
            } else {
                msg += " The object probably contains or references non serializable fields.";
            }
            throw new InvalidProgramException(msg, e);
        }
    }
}

该方法接收两个参数:func 是需要清理的算子对象,checkSerializable 表示清理完成后是否执行序列化验证。

第一步通过反射找出所有以 this$ 开头的成员变量——这些正是闭包引用的字段,代码片段如下:

for (Field f: cls.getDeclaredFields()) {
    if (f.getName().startsWith("this$")) {
        // found a closure referencing field - now try to clean
        closureAccessed |= cleanThis0(func, cls, f.getName());
    }
}

定位到这些字段后,调用内部私有方法 cleanThis0 进行处理。来看它的实现:

private static boolean cleanThis0(Object func, Class cls, String this0Name) {
    This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
    getClassReader(cls).accept(this0Finder, 0);
    final boolean accessesClosure = this0Finder.isThis0Accessed();
    if (LOG.isDebugEnabled()) {
        LOG.debug(this0Name + " is accessed: " + accessesClosure);
    }
    if (!accessesClosure) {
        Field this0;
        try {
            this0 = func.getClass().getDeclaredField(this0Name);
        } catch (NoSuchFieldException e) {
            // has no this$0, just return
            throw new RuntimeException("Could not set " + this0Name + ": " + e);
        }
        try {
            this0.setAccessible(true);
            this0.set(func, null);
        }
        catch (Exception e) {
            // should not happen, since we use setAccessible
            throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
        }
    }
    return accessesClosure;
}

核心操作只有一行:this0.set(func, null); —— 直接将闭包引用置空。另外该方法借助了 ASM 字节码工具来探测闭包是否被访问,具体细节不再展开,感兴趣的同学可以自行查阅。

免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策