函数计算配置中心

一. 新建OSS配置文件存储仓库

1. 进入OSS控制台,新建bucket【fc-config】

函数计算配置中心

2. 创建一个子账户,用于读取OSS上的文件

  • 进入【RAM访问控制】-【用户】-【新建用户】
    函数计算配置中心
  • 为子账户分配权限
    函数计算配置中心
  • 为子账户创建AccessKey
    函数计算配置中心

3. 编写测试文件

在本地创建一个【fc-test-config.json】文件,写入一些测试内容:

{
    "host": "127.0.0.1",
    "port": 3306,
    "username": "test",
    "password": "123456"
}

4. 上传测试文件

函数计算配置中心

二、编写配置中心代码

1. 使用模板生成

fun init -n fc-config https://github.com/l616769490/python3-http-example.git

函数计算配置中心

2. 编写代码

import json
import oss2

def handler(environ, start_response):
    # 获取请求体
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
    except (ValueError):
        request_body_size = 0

    request_body = environ['wsgi.input'].read(request_body_size)

    status, response = getConfigByName(str(request_body, encoding = "utf-8"))

    response_headers = [('Content-type', 'application/json')]
    start_response(status, response_headers)

    # 返回数据
    return [json.dumps(response).encode()]

def getConfigByName(fileName):
    utils = OSSUtils('这里替换成你的AccessKeyId', '这里替换成你的AccessKeySecret')
    return utils.getConfigByName(fileName, 'fc-config')

_ENDPOINT = 'https://oss-cn-shanghai.aliyuncs.com'

class OSSUtils:
    """ 封装OSS中的常用操作
    """

    def __init__(self, accessKeyId, accessKeySecret):
        """ 
        :param accessKeyId 阿里云accessKeyId
        :param accessKeySecret 阿里云accessKeySecret
        """
        self.accessKeyId = accessKeyId
        self.accessKeySecret = accessKeySecret
    
    def getConfigList(self, bucketName, dirName = ''):
        """ 获取配置列表 
        :param bucketName bucket名
        :param dirName 文件夹名
        """
        auth=oss2.Auth(self.accessKeyId, self.accessKeySecret)
        bucket = oss2.Bucket(auth, _ENDPOINT, bucketName)

        files = []
        for obj in oss2.ObjectIterator(bucket, dirName, ''):
            if obj.key != dirName:
                files.append(obj.key)
        return '200', files
    
    def getConfigByName(self, fileName, bucketName, dirName = ''):
        """ 获取配置
        :param fileName:配置文件名
        :param bucketName bucket名
        :param dirName 文件夹名
        :return: status, data status:成功返回200,失败返回404; data:成功返回数据,失败返回错误信息
        """
        auth=oss2.Auth(self.accessKeyId, self.accessKeySecret)
        bucket = oss2.Bucket(auth, _ENDPOINT, bucketName)

        objectName = dirName + fileName

        status = '200'
        data = ''
        try:
            remote_stream = bucket.get_object(objectName)
            data = str(remote_stream.read(), encoding = 'utf-8')
        except Exception as err:
            return err.status, err.message

        return status, data
    
    def updateConfig(self, data, bucketName, dirName = ''):
        """ 修改或者新增配置文件
        :param data{fileName, data}: fileName:文件名; data:文件内容
        :param bucketName bucket名
        :param dirName 文件夹名
        :return: status, data status:成功返回200,失败返回500; data:成功返回数据,失败返回错误信息
        """
        status = '200'
        data = ''
        if (data == None):
            data = '新增或修改失败'
            return status, data
        
        fileName = data['fileName']
        configData = data['data']

        try:
            auth=oss2.Auth(self.accessKeyId, self.accessKeySecret)
            bucket = oss2.Bucket(auth, _ENDPOINT, bucketName)
            objectName = dirName + fileName
            bucket.put_object(objectName, configData)

            data = '操作成功'
        except Exception as err:
            return err.status, err.message

        return status, data

三、测试

函数计算配置中心

四、其他函数计算中调用

1. 新建一个无触发器的普通函数计算【fc-config-test】,代码如下:

# -*- coding: utf-8 -*-
import requests
import datetime
import json

_HEADER = {
        'Content-Type' : 'application/json; charset=utf-8',
        'Date' : (datetime.datetime.now()-datetime.timedelta(hours=8)).strftime("%a, %d %b %Y %H:%M:%S GMT") 
    }

_CONF_HOST = '修改为你的配置中心地址'

def getDataForStr(host, data):
    """ 获取配置文件
    :param data:请求内容,字符串
    """
    r = requests.post(host, headers = _HEADER, data = data.encode())
    return r

def handler(event, context):
  conf = json.loads(getDataForStr(_CONF_HOST, 'fc-test-config.json').text)
  return conf

2. 执行

函数计算配置中心

上一篇:寻找网页设计灵感的27个最佳网站推荐


下一篇:库克婉拒网络安全听证会邀请 *麦凯恩:我本可不用这么客气