Flink on YARN Security 浅析(Flink Part)

在上篇关于 YARN 系统 Security 的博客,我们解析了通过 YARN 提供的 Security API,Application 已经在 RM 注册并且可以顺利地申请到 container,但 YARN 对 container 后续的凭证刷新(reacquire)并不能作用到已经在运行的 Application 进程,因此对于长期运行的 Application 而言要开发者自己实现认证和后续凭证刷新的逻辑。本文将接着分析 Flink 如何在申请到的 container 启动 jobmanger 和 taskmanager 并完成认证,也就是 Flink Application 自身的认证,以及后续的凭证刷新方法,最后再讲述最近社区对于 Flink on YARN Security 改进提案。

长时间运行的 YARN Application 的四种认证方法

在 YARN 集群机器上预安装 keytab

将 Application 可能会用到的 keytab 预先安装到 YARN 集群的所有机器的本地文件系统并设置好目录权限,然后将相关路径作为作为 Application 的配置提供给 AM 和其他普通 container。用户进程启动时通过 UserGroupInformation.loginUserFromKeytab() 来加载凭证和认证,并且后续通过 keytab 来刷新 kerberos 凭证。具体的安全凭证分布如图一所示,其中实线表示永久凭证 keytab 的传递,虚线表示临时凭证 token 的传递。

图一. pre-installed-keytab

这种方式直接使用 Kerberos keytab ,绕开了 Hadoop delegation token,相当于 后者只用于 container 的申请。因此这样的优点是避开了 token 最大生命周期的问题,而缺点在于没有了 delegation token 的优点,即每次 TGT 刷新需要请求 KDC,而且 keytab 也需要比较高的运维成本。

通过 YARN 分发 keytab 给 AM 和其他 container

首先将 Application 客户端将 keytab 上传至 HDFS,并在提交 Application 时将其作为AM 需要本地化的资源。AM container 初始化时 NodeManager 会负责将 keytab 拷贝至 container 的资源目录,AM 启动时通过 UserGroupInformation.loginUserFromKeytab() 来重新认证。当 AM 需要申请 container 时,也将 HDFS 上的 keytab 列为需要本地化的资源,因此 container 也可以仿照 AM 进行认证。此外 AM 和 container 都必须额外实现一个线程来定时刷新 Kerberos TGT。

图二. distribute-via-yarn

Apache Flink 目前使用的正是这种方法。比起第一种方式,优点在于 keytab 只需要被安装在 client 端,YARN 集群上的机器只在有一个用户的作业在运行时才会有该用户的 keytab,作业完成后 keytab 也会随 container 被清理掉。

通过 YARN 分发 keytab 给 AM;AM 为 container 生成 delegation token

从 client 上传 keytab 到 AM 获得 keytab 的流程都与第二种方法相同,区别在于后续 AM 申请 container 时并不是将 keytab 列本地化资源,而是请求 container 需要的 delegation token(比如最基础的 HDFS_DELEGATION_TOKEN)并将这些 token 作为 ContainerLaunchContext 的安全凭证。由于 token 的最大生命周期问题,container 后续需要继续从 AM 获取新的 token,实现方式通常是 AM 和 container 通过 IPC 来定时更新 token。

图三. am-generates-token

这种方式比起前两种方式的好处在于 keytab 只存在于 AM 机器上似乎更加安全,不过由于 container 和 AM 相同的 HDFS 访问权限,实际上它们还是可以访问到 keytab,除非 AM 启动后将 keytab 从 HDFS 删除,不过这样在 AM 崩溃的情况下 YARN 就没有办法进行重试了。目前 Apache Spark 是使用这种方式认证。

Client 端推送 token 给 AM;AM 推送 token 给 container

如果将 YARN 集群视为不受信任的环境,严格限制将 keytab 分发到集群上,在 client 端推送 token 会是唯一的方式,即通过 Hadoop delegation token 启动 AM 和 container,随后 client 定时用 keytab 认证并重新获取 AM 的 token 并通过 IPC 的方式传递给它,AM 再通过 IPC 将 token 传递给 container。

图四. client-generates-token

这种方式把 keytab 限制在 client 机器上最为安全,但是 client 端的实现比较重,对于作业数成千上万的部署规模来说并不合适,而且 client 会成为一个故障单点。如果要一条路走到黑,部署多个 client 和高可用,那就相当于又一个安全的小分布式集群。

如上文所说,Flink 使用通过 YARN 分发 keytab 给 AM 和其他 container 的方式来认证,下面从源码角度来解析具体的实现,这会分为 Flink client 、AM(YarnClusterEntrypoint) 和 container(YarnTaskExecutorRunner)三个部分,源码以 1.6.2 版本为例。

篇幅起见这里只给出核心代码和以注释形式说明,完整代码见 AbstractYarnClusterDescriptor#startAppMaster。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 从配置读取 keytab 路径,若不为空则注册为 container 的 LocalResource
Path remotePathKeytab = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab = setupSingleLocalResource(
Utils.KEYTAB_FILE_NAME,
fs,
appId,
new Path(keytab),
localResources,
homeDir,
"");
}
// 根据配置和环境变量生成 AM 的启动命令,并设置到 AM ContainerLaunchContext 里
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasLogback,
hasLog4j,
hasKrb5,
clusterSpecification.getMasterMemoryMB());
// 如果是 Kerberized 集群,向 NameNode 申请 HDFS_DELEGATION_TOKEN,并设置到 AM ContainerLaunchContext 里
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, yarnConfiguration);
}
amContainer.setLocalResources(localResources);

其中 Utils 类包含了许多 helper method,获取 token 的方法 #setTokensFor 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token " + id + " with " + token);
credentials.addToken(id, token);
}
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
if (LOG.isDebugEnabled()) {
LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
}
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
}

YarnClusterEntrypoint

YarnClusterEntrypoint 是 Flink 的 AM 主类,它在启动时会先初始化 SecurityContext。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void startCluster() {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
configureFileSystems(configuration);
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration);
return null;
});
} catch (Throwable t) {
LOG.error("Cluster initialization failed.", t);
shutDownAndTerminate(
STARTUP_FAILURE_RETURN_CODE,
ApplicationStatus.FAILED,
t.getMessage(),
false);
}
}

而 #installSecurityContext 方法通过 SecurityUtils 逐个调用了安全模块的认证,其中最重要的一个就是 HadoopModule 。HadoopModule 的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
// 用 keytab 认证
UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
loginUser = UserGroupInformation.getLoginUser();
// supplement with any available tokens
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
try {
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred =
(Credentials) readTokenStorageFileMethod.invoke(
null,
new File(fileLocation),
hadoopConfiguration);
// 由于 Hadoop 认证机制更偏好 delegation token,使用 kerberos keytab 认证时需要过滤掉 delegation token
// 以下代码从 container 的本地 token 文件读取 token ,过滤掉 HDFS_DELEGATION_TOKEN 后覆盖掉原本的 container credentail
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
//If UGI use keytab for login, do not load HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
if (!token.getKind().equals(hdfsDelegationTokenKind)) {
final Text id = new Text(token.getIdentifier());
credentials.addToken(id, token);
}
}
Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
Credentials.class);
addCredentialsMethod.invoke(loginUser, credentials);
} catch (NoSuchMethodException e) {
LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}

此外 AM 还要负责申请 container 时设置好 container 的安全凭证,具体可见 Utils#createTaskExecutorContext,以下是核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 从 AM container 的本地 token 文件读取 token 并设置给 taskmanager container
// 其中这里有个问题是连同 AMRMToken 一齐传递给了 taskmanager,而 AMRMToken 顾名思义应该只有 AM 有权限使用,详见 [FLINK-11126](https://issues.apache.org/jira/browse/FLINK-11126)
final String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
log.debug("Adding security tokens to TaskExecutor's container launch context.");
try (DataOutputBuffer dob = new DataOutputBuffer()) {
Method readTokenStorageFileMethod = Credentials.class.getMethod(
"readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred =
(Credentials) readTokenStorageFileMethod.invoke(
null,
new File(fileLocation),
HadoopUtils.getHadoopConfiguration(flinkConfig));
cred.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ctx.setTokens(securityTokens);
} catch (Throwable t) {
log.error("Failed to add Hadoop's security tokens.", t);
}
} else {
log.info("Could not set security tokens because Hadoop's token file location is unknown.");
}

YarnTaskExecutorRunner

YarnTaskExecutorRunner 是 Flink TaskManager 的主类,它初始化时的认证过程和 YarnClusterEntrypoint 相似,都是调用各个 SecurityModule 来认证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
String keytabPath = null;
if (remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
keytabPath = f.getAbsolutePath();
LOG.info("keytab path: {}", keytabPath);
}
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
currentUser.getShortUserName(), yarnClientUsername);
if (keytabPath != null && remoteKeytabPrincipal != null) {
configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}
SecurityConfiguration sc = new SecurityConfiguration(configuration);
final String containerId = ENV.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
Preconditions.checkArgument(containerId != null,
"ContainerId variable %s not set", YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID);
// use the hostname passed by job manager
final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
if (taskExecutorHostname != null) {
configuration.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
}
SecurityUtils.install(sc);
SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
@Override
public Void call() throws Exception {
TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
return null;
}
});

Uber 是 Apache Flink 的重度用户,而且由于技术栈和架构的关系 Uber 对于 Security 这块的积累比较多,最近(2108 年年底) Uber 的工程师向社区提出了 Security 的新改进方案[3]。虽然这个改进方案目前还没有得到广泛的关注(毕竟没有很多人熟悉安全模块)并且部分列举的场景比较小众,但其中的一些设计还是很有参考意义。

这个提案包含两个部分: 1.支持多种 YARN Application 认证方式;2.支持超级用户伪装为普通用户(impersonation)。下面将分点讲述。

支持多种 YARN Application 认证方式

该提案建议将 client 端、AM 端和普通 container 端的认证方法通过 CredentialFactory 来解耦,因此用户可以灵活定制三者的认证方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Kerberos Credential Factory
*/
public interface CredentialFactory {
/**
* Run on client side. Setup environment properties which will be shipped
* to the cluster side.
*/
Map<String, String> createEnvCredentialProperties(Configuration conf);
/**
* Run on client side. Setup environment credential by creating file cache or
* validate provided credentials are accessible on cluster side.
*/
void prepareEnvCrediential(Configuration conf, FileSystem fs);
/**
* Run on application master. Derive {@link SecurityConfiguration}, used by
* {@link SecureUtils} and {@link SecureModule} to install credentials.
*/
SecurityConfiguration prepareApplicationMasterCredentials(Map<String, String> envProperties);
/**
* Run on task executor. Derive {@link SecurityConfiguration}, used by
* {@link SecureUtils} and {@link SecureModule} to install credentials.
*/
SecurityConfiguration prepareTaskExecutorCredentials(Map<String, String> envProperties);
/**
* Checker function to ensure proper credentials are configured within the
* current cluster environment.
*/
boolean isCredentialEnvConfigured(Map<String, String> envProperties);
}

默认情况下提供除了 client 生成 token 外的三种常见认证策略。

支持超级用户 Impersonation

目前来说其实 Flink 的 Impersonation 需要通过 client 操作系统来实现(主要是因为 keytab 的权限是 600),比如 root 用户 su 为 joe 用户再使用 joe 用户的名义提交作业,这种方法会造成潜在的权限滥用。该提案提出我们可以使用 Hadoop 的 proxy user API[4]来实现 Flink 层面的 Impersonation 以避免这个问题。

要实现超级用户 Impersonation 首先要设置一个用户,比如 flink,为 Hadoop 的超级用户,这样他就可以申请到其他用户的 delagtion token,因此 flink 用户不需要访问一般用户joe 的 keytab 就可以通过 delegation token 来提交作业。后续 Flink 需要使用上述 Application 认证方式的第三或者第四种来持续刷新 delegation tokem,所以提案第一点支持多种 YARN Application 认证方式也是第二点 Impersonation 的前置条件。

总结

YARN 原本不是为长时间运行的实时应用设计的,尤其是在安全认证这块尤为明显,因此各种实时计算框架的 on YARN 认证都是神仙过海各显神通,没有统一的方式。Flink on YARN Security 目前已经比较稳定但是仍然不够灵活,导致在其上构建平台的企业级用户多少有些束手束脚的感觉。Uber 的提案是目前看上去是比较合理的,期待 Uber 这个提案可以顺利实现并被社区接受。

参考文献

1.Hadoop 官方文档: YARN Security
2.Flink 官方文档: Kerberos Authentication Setup and Configuration
3.Flink Kerberos Improvement Design
4.Hadoop 官方文档: Proxy User
5.Flink Security Improvements