[实时计算flink]安全访问最佳实践
由于Flink无法提前预知您要使用的上下游系统,当作业需要访问不同的上下游系统来读写数据时,可能需要使用您的AccessKey信息作为访问凭证。主账号的AccessKey具备云账号下资源的全部权限,一旦泄露可能会造成严重后果。本文通过对RAM用户授予相关上下游更小化权限,并结合Flink变量功能对AccessKey进行加密,以进一步提升访问安全性。
方案说明
本方案通过减少暴露面积原则来实现更安全的AccessKey访问。即不使用主账号的AccessKey,通过主账号授权子账号的方式,让子账号具备访问某些上下游的权限或更小化权限,然后利用Flink变量功能使用AccessKey,降低AccessKey明文泄漏的风险,减少意外情况下AccessKey的泄漏导致您主账号下所有的云资源受到的相关安全风险。
本文创建了一个RAM用户,并为RAM用户授予日志服务SLS Project下指定Logstore权限,并利用变量功能,实现Flink作业读写SLS数据。
示例教程
-
使用阿里云账号(主账号)或RAM管理员创建RAM用户,具体操作请参见创建RAM用户。
访问方式必须勾选OpenAPI调用访问,启用后,会自动为RAM用户生成一个AccessKey ID和AccessKey Secret。
重要
RAM用户的AccessKey Secret只在创建时显示,不支持查看,请务必妥善保管。
-
为RAM用户授予SLS相关权限。
-
创建自定义权限策略。
-
在左侧导航栏,选择权限管理 > 权限策略。
-
在权限策略页面,单击创建权限策略。
-
在脚本编辑页签,将配置框中的原有脚本替换为如下内容,然后单击继续编辑基本信息。
Project和Logstore需要根据您的实际情况替换,更多权限策略请参见RAM自定义授权示例。
指定Logstore只读权限
指定Logstore写入权限
{"Version": "1","Statement": [{"Action": "log:ListProject","Resource": "acs:log:*:*:project/*","Effect": "Allow"},{"Action": "log:List*","Resource": "acs:log:*:*:project/<指定的Project名称>/logstore/*","Effect": "Allow"},{"Action": ["log:Get*","log:List*"],"Resource": "acs:log:*:*:project/<指定的Project名称>/logstore/<指定的Logstore名称>","Effect": "Allow"}] }
-
输入权限策略名称和备注,单击确定。
-
-
-
为RAM用户添加上一步创建的自定义权限策略。具体操作,请参见为RAM用户授权。
-
配置变量,降低AccessKey明文泄漏的风险。
为步骤1创建RAM用户时得到的AccessKey ID和AccessKey Secret创建变量,创建后只需调用变量名使用,无需填写具体值,具体操作步骤请参见新增变量。本文针对AccessKey ID和AccessKey Secret分别创建了名为slslak和slsaks的变量。
-
创建Flink作业,读取SLS数据。
在SQL作业开发时以
${secret_values.变量名}
格式使用变量,避免明文AccessKey带来的安全风险,示例如下。CREATE TEMPORARY TABLE sls_input(`__source__` STRING METADATA VIRTUAL,__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,`__topic__` STRING METADATA VIRTUAL,deploymentName STRING,`level`STRING,`location` STRING,message STRING,thread STRING,`time`STRING ) WITH ('connector' = 'sls','endpoint' ='cn-beijing-intranet.log.aliyuncs.com','accessId' = '${secret_values.slsak}','accessKey' = '${secret_values.slsaks}','starttime' = '2024-08-30 15:39:00','project' ='test','logstore' ='flinktest' );CREATE TEMPORARY TABLE blackhole_sink(`__source__` STRING,`__topic__` STRING,deploymentName STRING,`level` STRING,`location` STRING,message STRING,thread STRING,`time` STRING,receive_time BIGINT ) WITH ('connector' = 'blackhole' );INSERT INTO blackhole_sink SELECT `__source__`,`__topic__`,deploymentName,`level`,`location`,message,thread,`time`,cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;