2021SC@SDUSC
OSSIM-agent源代码分析(四)
OSSIM-agent-config.py下
简述
OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式(standardized way)有序的发送给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行标准化处理,这样Server就可以依一种统一的方式来处理这些信息,并且也简化了Server的处理过程
agent的配置文件是config.py,作为大量数据的标准化处理位置,配置文件的对各种数据、文件的处理方式的
相关方法
在plugin.cfg文件中显示,各种常量的设置
_NEEDED_CONFIG_ENTRIES = {
'config': ['type', 'source', 'enable']
}
_EXIT_IF_MALFORMED_CONFIG = False
TRANSLATION_SECTION = 'translation'
TRANSLATION_FUNCTION = 'translate'
TRANSLATION_DEFAULT = '_DEFAULT_'
SECTIONS_NOT_RULES = ["config", "info", TRANSLATION_SECTION]
CONCAT_FUNCTION = 'CONCAT'
_MAP_REPLACE_TRANSLATIONS = 4
_MAP_REPLACE_USER_FUNCTIONS = 8
_MAP_REPLACE_CUSTOM_USER_FUNCTIONS = 16
_MAP_REPLACE_CONCAT = 32 #00100000
_MAP_REPLACE_TRANSLATE2 = 64 #1000000
检查所选部分是否为可以的翻译部分
__regexIsTranslationSection = re.compile("translation-\S+", re.UNICODE)
__regex_check_for_translate2_function = re.compile("\{translate2\((?P<variable>\$[^\)]+),(?P<translate_section>\$[^\)]+)\)\}", re.UNICODE)
设置配置、翻译等具体的规则
def rules(self):
rules = {}
for section in sorted(self.sections()):
regexp = self.get(section,"regexp")
if self.get("config","source") == "log" and (regexp is None or regexp == ""):
continue
if Plugin.__regexIsTranslationSection.match(section.lower()) is not None:
if section.lower() not in Plugin.SECTIONS_NOT_RULES:
logger.info("Loading new translation section... %s" % section.lower())
Plugin.SECTIONS_NOT_RULES.append(section.lower())
if section.lower() not in Plugin.SECTIONS_NOT_RULES:
rules[section] = self.hitems(section,True)
return rules
通过正则表达式和位置变换进行变量替换
def _replace_array_variables(self, value, groups):
for i in range(2):
search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
rvalue = value
if search != []:
for string in search:
var = string[2:-1]
var_position = 0
try:
var_position= int(var)
if var_position < len(groups):
rvalue = rvalue.replace(string, str(groups[var_position]))
except ValueError,e:
rvalue = value
return rvalue
替换变量、用户功能和自定义用户功能
def get_replace_array_value(self, value, groups):
rvalue = ""
rvalue = self._replace_array_variables(value, groups)
if rvalue==value:
rvalue = self._replace_user_array_functions(value, groups)
if rvalue == value:
rvalue = self._replace_custom_user_function_array(value,groups)
return rvalue
使用位置参数替换自定义用户函数,并且检查所有变量是否有替换,以及调用函数在ParserUtil.py
def _replace_custom_user_function_array(self,value,groups):
search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE)
if search != []:
for string in search:
(string_matched, func, variables) = string
vars = split_variables(variables)
for var in vars:
var_pos =0
try:
var_pos = int(var)
except TypeError:
logger.warning("Can not replace '%s'" % var)
return value
if var_pos >= len(groups):
logger.warning("Can not replace '%s' " % var)
return value
func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id"))
if func_name != Plugin.TRANSLATION_FUNCTION and \
hasattr(Plugin, func_name):
args = ",".join(["groups[%s]"%s for s in vars])
try:
cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args)
exec cmd
except TypeError, e:
logger.error(e)
else:
logger.warning( "Function '%s' is not implemented" % (func))
value = value.replace(string_matched, str(groups[var]))
return value
替换位置函数,检查所有变量是否有替换,然后调用ParserUtil.py函数、getattr等函数执行替换功能
def _replace_user_array_functions(self, value, groups):
if value is None:
return None
search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
if search != []:
for string in search:
(string_matched, func, variables) = string
vars = split_variables(variables)
for var in vars:
if len(groups) - 1 < int(var):
return value
if func != Plugin.TRANSLATION_FUNCTION and \
hasattr(ParserUtil, func):
f = getattr(ParserUtil, func)
args = ""
for v in vars:
args += "str(groups[%s])," % (str(v))
try:
exec "value = value.replace(string_matched," + \
"str(f(" + args + ")))"
except TypeError, e:
logger.error(e)
else:
logger.debug("Tranlation fucntion...")
for v in vars:
.
if int(v) > (len(groups) - 1):
logger.debug("Error var:%d, is greatter than groups size. (%d)" % (int(v), len(groups)))
else:
var_value = groups[int(v)]
if self.has_section(Plugin.TRANSLATION_SECTION):
if self.has_option(Plugin.TRANSLATION_SECTION, var_value):
value = self.get(Plugin.TRANSLATION_SECTION, var_value)
return value
通过正则表达式,查找配置参数中的_CFG(section,option)值,并将其替换为全局配置文件中的值
def replace_config(self, conf):
for section in sorted(self.sections()):
for option in self.options(section):
regexp = self.get(section, option)
search = re.findall("(\\\\_CFG\(([\w-]+),([\w-]+)\))", regexp, re.UNICODE)
if search != []:
for string in search:
(all, arg1, arg2) = string
if conf.has_option(arg1, arg2):
value = conf.get(arg1, arg2)
regexp = regexp.replace(all, value)
self.set(section, option, regexp)
总规则下,regexp条目中替换处理每一个反斜线
def replace_aliases(self, aliases)
for rule in self.rules().iterkeys():
regexp = self.get(rule, 'regexp')
search = re.findall("\\\\\w\w+", regexp, re.UNICODE)
if search != []:
for string in search:
repl = string[1:]
if aliases.has_option("regexp", repl):
value = aliases.get("regexp", repl)
regexp = regexp.replace(string, value)
self.set(rule, "regexp", regexp)
从get_replace_value()调用,通过正则替换变量
def _replace_variables(self, value, groups, rounds=2):
for i in range(rounds):
search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
if search != []:
for string in search:
var = string[2:-1]
if groups.has_key(var):
value = value.replace(string, str(groups[var]))
return value
确定替换变量是否成功,如果失败,则跳过get_replace_value()
def _replace_variables_assess(self, value):
ret = 0
for i in range(2):
search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE)
if search != []:
ret = i
return ret
通过搜索字符串方式,替换指定插件及其对应插件
def _replace_translations(self, value, groups):
regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})"
search = re.findall(regexp, value, re.UNICODE)
if search != []:
for string in search:
(string_matched, func, var) = string
if groups.has_key(var):
if self.has_section(Plugin.TRANSLATION_SECTION):
if self.has_option(Plugin.TRANSLATION_SECTION,
groups[var]):
value = self.get(Plugin.TRANSLATION_SECTION,
groups[var])
else:
logger.warning("Can not translate '%s' value" % \
(groups[var]))
if self.has_option(Plugin.TRANSLATION_SECTION,
Plugin.TRANSLATION_DEFAULT):
value = self.get(Plugin.TRANSLATION_SECTION,
Plugin.TRANSLATION_DEFAULT)
else:
value = groups[var]
else:
logger.warning("There is no translation section")
value = groups[var]
return value
同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()
def _replace_translations_assess(self, value):
regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})"
search = re.findall(regexp, value, re.UNICODE)
if search != []:
return self._MAP_REPLACE_TRANSLATIONS
return 0
函数被指定为{f($v)},并且从get_replace_value()调用,执行替换功能
def _replace_user_functions(self, value, groups):
search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
if search != []:
for string in search:
(string_matched, func, variables) = string
vars = split_variables(variables)
for var in vars:
if not groups.has_key(var):
logger.warning("Can not replace '%s'" % (value))
return value
if func != Plugin.TRANSLATION_FUNCTION and \
hasattr(ParserUtil, func):
f = getattr(ParserUtil, func)
args = ""
for i in (range(len(vars))):
args += "groups[vars[%s]]," % (str(i))
try:
cmd = "value = value.replace(string_matched," + \
"str(f(" + args + ")))"
exec cmd
except TypeError, e:
logger.error(e)
else:
logger.warning(
"Function '%s' is not implemented" % (func))
value = value.replace(string_matched, \
str(groups[var]))
return value
同上,确定替换变量是否成功,如果失败,则跳过get_replace_value()
def _replace_user_functions_assess(self, value):
search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE)
if search != []:
return self._MAP_REPLACE_USER_FUNCTIONS
return 0
检查所有变量是否有替换,如果没有一直替换到全部替换为止
def _replace_custom_user_functions(self,value,groups):
search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE)
if search != []:
for string in search:
(string_matched, func, variables) = string
vars = split_variables(variables)
for var in vars:
if not groups.has_key(var):
logger.warning("Can not replace '%s'" % (var))
return value
func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id"))
if func_name != Plugin.TRANSLATION_FUNCTION and \
hasattr(Plugin, func_name):
args = ""
for i in (range(len(vars))):
args += "groups[vars[%s]]," % (str(i))
try:
cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args)
exec cmd
except TypeError, e:
logger.error(e)
else:
logger.warning(
"Function '%s' is not implemented" % (func))
value = value.replace(string_matched, \
str(groups[var]))
return value
替换数组变量连接的值
def __replaceConcatFunction(self,value,groups):
concat =""
m = re.match("\$CONCAT\((?P<params>.*)\)",value)
if m:
mdict = m.groupdict()
if mdict.has_key('params'):
paramlist = mdict['params'].split(',')
for param in paramlist:
if param.startswith('$'):#assume variable
if groups.has_key(param[1:]):
concat+=groups[param[1:]]
else:
concat+=param
else:
concat+=param
return concat
检查是否有必要进行连接函数替换
def __checkReplaceConcatFunction(self,value):
ret = 0
if re.match("\$CONCAT\((.*)\)",value):
ret = self._MAP_REPLACE_CONCAT
return ret
替换传递过来的对象中给定的值
def __replace_translate2_function(self,value,groups):
replaced_value =""
m = self.__regex_check_for_translate2_function.match(value)
if m:
mdict = m.groupdict()
if 'variable' in mdict and 'translate_section' in mdict:
variable = mdict['variable'].replace('$','')
translate_section = mdict['translate_section'].replace('$','')
if variable in groups:
if self.has_section(translate_section):
if self.has_option(translate_section,groups[variable]):
replaced_value = self.get(translate_section, groups[variable])
return replaced_value
else:
logger.warning("Translate2 variable(%s) not found on the translate section %s" % (groups[variable],translate_section))
else:
logger.warning("Translate2 translation_section(%s) not found" % translate_section)
replaced_value = value
检查是否需要换translate2
def __check_for_translate2_function(self,value):
"""Check whether we need to make a translate2 replacement or not.
@param value: the string in the rule
"""
ret = 0
if Plugin.__regex_check_for_translate2_function.match(value) is not None:
ret = self._MAP_REPLACE_TRANSLATE2
return ret
替换concat函数、自定义用户函数、用户函数、翻译、变量等
def get_replace_value(self, value, groups, replace=15):
if replace > 0:
if replace & self._MAP_REPLACE_TRANSLATE2:
value = self.__replace_translate2_function(value, groups)
if replace & 3:
value = self._replace_variables(value, groups, (replace & 3))
if replace & 4:
value = self._replace_translations(value, groups)
if replace & 8:
value = self._replace_user_functions(value, groups)
if replace & self._MAP_REPLACE_CUSTOM_USER_FUNCTIONS:
value = self._replace_custom_user_functions(value,groups)
if replace & self._MAP_REPLACE_CONCAT:
value = self.__replaceConcatFunction(value,groups)
return value
类CommandLineOptions,用于配置各种数据
class CommandLineOptions:
def __init__(self):
self.__options = None
parser = OptionParser(
usage="%prog [-v] [-q] [-d] [-f] [-g] [-c config_file]",
version="OSSIM (Open Source Security Information Management) " + \
"- Agent ")
parser.add_option("-v", "--verbose", dest="verbose",
action="count",
help="verbose mode, makes lot of noise")
parser.add_option("-d", "--daemon", dest="daemon", action="store_true",
help="Run agent in daemon mode")
parser.add_option("-f", "--force", dest="force", action="store_true",
help="Force startup overriding pidfile")
parser.add_option("-s", "--stats", dest="stats", type='choice', choices=['all', 'clients', 'plugins'], default=None,
help="Get stats about the agent")
parser.add_option("-c", "--config", dest="config_file", action="store",
help="read config from FILE", metavar="FILE")
(self.__options, args) = parser.parse_args()
if len(args) > 1:
parser.error("incorrect number of arguments")
if self.__options.verbose and self.__options.daemon:
parser.error("incompatible options -v -d")
def get_options(self):
return self.__options
进行数组等变量创建
def split_variables(string):
return re.findall("(?:\$([^,\s]+))+", string)
def split_sids(string, separator=','):
list = list_tmp = []
list = string.split(separator)
for sid in list:
a = sid.split('-')
if len(a) == 2:
list.remove(sid)
for i in range(int(a[0]), int(a[1]) + 1):
list_tmp.append(str(i))
list.extend(list_tmp)
return list