阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

ActionTrail会将事件压缩之后保存到OSS中。用户使用和分析OSS中的文件比较麻烦。如果能将审计事件写入到SLS,会给用户提供很大的便利。阿里云函数计算是实现这个迁移的好办法,成本低,见效快。本文将介绍如何通过函数计算将ActionTrail事件从OSS导入到SLS中。

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

创建服务

首先进入函数计算控制台创建服务。在高级配置中配置好SLS project,用于保存函数计算运行的信息,方便排查问题。函数计算会往里面写入函数运行的信息,用户也可以用过logger对象写入自定义的信息。

创建服务的过程中涉及到很多授权,需要根据提示授予权限。具体到这个例子,是要给OSS服务授权调用函数计算的触发器。给函数计算服务授权,读取OSS的数据,往SLS写入事件。

函数计算目前支持的Region不多,会限制目标OSS的区域。比如在华东2创建服务,就要求读的OSS要在华东2

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

创建触发器

ActionTrail通过OSS PutObject方法往OSS写入文件,因此触发函数选择PubObject。

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

创建函数

通过Python 2.7实现处理逻辑。在线编辑器还不错,不过没有智能提示功能。先获取ActionTrail的压缩文件,将其解压后,使用SLS客户端将其上传到指定的SLS Project中。需要将OSS client的endpoint和bucket name,SLS client的endpoint、log project、logstore替换成自己的。

# coding=utf-8

import time
from hashlib import md5
import zlib

import json
import logging
import oss2
from aliyun.log.logclient import LogClient
from aliyun.log.putlogsrequest import PutLogsRequest
from aliyun.log.logitem import LogItem

def handler(event, context):
    logger = logging.getLogger()
    
    endpoint = 'oss-cn-shanghai.aliyuncs.com'.format(context.region)
    creds = context.credentials
    auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token)
    
    sls_client = LogClient('cn-shanghai.log.aliyuncs.com', creds.access_key_id, creds.access_key_secret, securityToken=creds.security_token)

    for evt in json.loads(event)['events']:
      
      bucket_name = evt['oss']['bucket']['name']
      bucket = oss2.Bucket(auth, endpoint, bucket_name)
      object_name = evt['oss']['object']['key']
      r = bucket.get_object(object_name)
      zipData = r.read(409600) #ActionTrail一个文件压缩前不会超过8KB,所以超过8KB就OK。
      actionTrailEvents = zlib.decompress(zipData, zlib.MAX_WBITS|16)
      
      logitemList = []
      
      for actionTrailEvent in json.loads(actionTrailEvents):
    
        logItem = LogItem()
        event_time = time.mktime(time.strptime(actionTrailEvent['eventTime'], '%Y-%m-%dT%XZ'))
        logItem.set_time(event_time)
        #这里只配置了'eventName',可以增加更多需要单独展示或者检索的字段。
        logItem.set_contents([('eventName', actionTrailEvent['eventName']), ('event', json.dumps(actionTrailEvent))])
        logitemList.append(logItem)
        
      req = PutLogsRequest('actiontrail-events-from-oss-to-sls', 'actiontrail-events', 'ActionTrail', 'henshao', logitemList)
      res = sls_client.put_logs(req)
      logger.info(res.log_print())
      
    return 'OK'

开发调试

函数计算的调试非常方便。在触发事件中,选择自定义事件,拿一个真实的ActionTrail文件来做测试(需要替换JSON里面的oss.bucket.arn、oss.object.key,以及所有的阿里云id)。每次修改代码之后,点击保存并运行。函数运行的信息都会在Web页上打印出来。

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

验证结果

通过一个登录事件来验证函数计算的运行情况。这里有一点需要注意,SLS开启检索功能之前的日志在查询里面是不能展示的,可以去预览里面翻翻。

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

阿里云函数计算应用:将ActionTrail审计事件从OSS导入到SLS中

参考资料

实践中,我发现函数计算是可以使用阿里云Python SDK的,相关的Python代码直接帖进去都可以使用,所以函数计算可以实现非常复杂的功能。

  1. 操作事件(Event)结构定义
  2. aliyun-log-python-sdk
上一篇:C++、Java、Objective-C、Swift 二进制兼容测试


下一篇:深入理解Spark:核心思想与源码分析. 3.7 创建和启动DAGScheduler