Python 代码实践小结

Python 代码实践小结

最近写了较多的 Python 脚本,将最近自己写的脚本进行一个总结,其中有些是 Python 独有的,有些是所有程序设计*有的:

考虑使用 Logger(logger 怎么配置,需要输出哪些信息 — 可以反向考虑,自己看到这个 logger 的时候想了解什么信息)

传递的数据结构如何考虑(是否对调用方有先验知识的要求,比如返回一个 Tuple,则需要用户了解 tuple 中元素的顺序,这样情况是否应该进行封装;),数据结构定义清楚了,很多东西也就清楚了。

如何操作数据库(可以学习 sqlalchemy,包括 core 和 orm 两种 api)

异常如何处理(异常应该分开捕获 — 可以清楚的知道什么情况下导致的,异常之后应该打印日志说明出现什么问题,如果情况恶劣需要进行异常再次抛出或者报警)

所有获取资源的地方都应该做 check(a. 没有获取到会怎么办;b.获取到异常的怎么办)

所有操作资源的地方都应该检查是否操作成功

每个函数都应该简短,如果函数过长应该进行拆分(有个建议值,函数包含的行数应该在 20-30 行之间,具体按照这个规范做过一次之后就会发现这样真好)

使用 class 之后,考虑重构 __str__ 函数,用户打印输出(如果不实现 __str__,会调用 __repr__ ),如果对象放到 collection 中之后,需要实现 __repr__ 函数,用于打印整个 collection 的时候,直观显示

如果有些资源会发生变化,可以单独抽取出来,做成函数,这样后续调用就可以不用改变了

附上一份 Python2.7 代码(将一些私有的东西进行了修改)


  1. # -*- coding:utf-8 -*- 
  2.   
  3. from sqlalchemy import create_engine 
  4. import logging 
  5. from logging.config import fileConfig 
  6. import requests 
  7. import Clinet # 私有的模块 
  8.   
  9. fileConfig("logging_config.ini"
  10. logger = logging.getLogger("killduplicatedjob"
  11.   
  12. #配置可以单独放到一个模块中 
  13. DB_USER = "xxxxxxx" 
  14. DB_PASSWORD = "xxxxxxxx" 
  15. DB_PORT = 111111 
  16. DB_HOST_PORT = "xxxxxxxxxx" 
  17. DB_DATA_BASE = "xxxxxxxxxxx" 
  18.   
  19. REST_API_URL = "http://sample.com" 
  20.   
  21. engine = create_engine("mysql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST_PORT, DB_PORT, DB_DATA_BASE)) 
  22.   
  23. # 这个 class 是为了在函数间传递时,不需要使用方了解属性的具体顺序而写的,也可以放到一个单独的模块中 
  24. class DuplicatedJobs(object): 
  25.  def __init__(self, app_id, app_name, user): 
  26.  self.app_id = app_id 
  27.  self.app_name = app_name 
  28.  self.user = user 
  29.   
  30. def __repr__(self): 
  31.  return '[appid:%s, app_name:%s, user:%s]' % (self.app_id, self.app_name, self.user
  32.   
  33. def find_duplicated_jobs(): 
  34.  logger.info("starting find duplicated jobs"
  35.  (running_apps, app_name_to_user) = get_all_running_jobs() 
  36.  all_apps_on_yarn = get_apps_from_yarn_with_queue(get_resource_queue()) 
  37.   
  38. duplicated_jobs = [] 
  39.  for app in all_apps_on_yarn: 
  40.  (app_id, app_name) = app 
  41.   
  42. if app_id not in running_apps: 
  43.  if not app_name.startswith("test"): 
  44.  logger.info("find a duplicated job, prefixed_name[%s] with appid[%s]" % (app_name, app_id)) 
  45.  user = app_name_to_user[app_name] 
  46.  duplicated_jobs.append(DuplicatedJobs(app_id, app_name, user)) 
  47.  else
  48.  logger.info("Job[%s] is a test job, would not kill it" % app_name) 
  49.   
  50. logger.info("Find duplicated jobs [%s]" % duplicated_jobs) 
  51.   
  52. return duplicated_jobs 
  53.   
  54. def get_apps_from_yarn_with_queue(queue): 
  55.  param = {"queue": queue} 
  56.  r = requests.get(REST_API_URL, params=param) 
  57.  apps_on_yarn = [] 
  58.  try: 
  59.  jobs = r.json().get("apps"
  60.  app_list = jobs.get("app", []) 
  61.  for app in app_list: 
  62.  app_id = app.get("id"
  63.  name = app.get("name"
  64.  apps_on_yarn.append((app_id, name)) 
  65.   
  66. except Exception as e: #Exception 最好进行单独的分开,针对每一种 Exception 进行不同的处理 
  67.  logger.error("Get apps from Yarn Error, message[%s]" % e.message) 
  68.   
  69. logger.info("Fetch all apps from Yarn [%s]" % apps_on_yarn) 
  70.   
  71. return apps_on_yarn 
  72.   
  73. def get_all_running_jobs(): 
  74.  job_infos = get_result_from_mysql("select * from xxxx where xx=yy"
  75.   
  76. app_ids = [] 
  77.  app_name_to_user = {} 
  78.  for (topology_id, topology_name) in job_infos: 
  79.  status_set = get_result_from_mysql("select * from xxxx where xx=yy"
  80.  application_id = status_set[0][0] 
  81.  if "" != application_id: 
  82.  configed_resource_queue = get_result_from_mysql( 
  83.  "select * from xxxx where xx=yy"
  84.  app_ids.append(application_id) 
  85.  app_name_to_user[topology_name] = configed_resource_queue[0][0].split(".")[1] 
  86.   
  87. logger.info("All running jobs appids[%s] topology_name2user[%s]" % (app_ids, app_name_to_user)) 
  88.  return app_ids, app_name_to_user 
  89.   
  90. def kill_duplicated_jobs(duplicated_jobs): 
  91.  for job in duplicated_jobs: 
  92.  app_id = job.app_id 
  93.  app_name = job.app_name 
  94.  user = job.user 
  95.  logger.info("try to kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user)) 
  96.  try: 
  97.  Client.kill_job(app_id, user
  98.  logger.info("Job[%s] with appid[%s] for user[%s] has been killed" % (app_name, app_id, user)) 
  99.  except Exception as e: 
  100.  logger.error("Can't kill job[%s] with appid[%s] for user[%s]" % (app_name, app_id, user)) 
  101.   
  102. def get_result_from_mysql(sql): 
  103.  a = engine.execute(sql) 
  104.  return a.fetchall() 
  105.   
  106. # 因为下面的资源可能发生变化,而且可能包含一些具体的逻辑,因此单独抽取出来,独立成一个函数 
  107. def get_resource_queue(): 
  108.  return "xxxxxxxxxxxxx" 
  109.   
  110. if __name__ == "__main__"
  111.  kill_duplicated_jobs(find_duplicated_jobs()) 
  112.   

其中 logger 配置文件如下(对于 Python 的 logger,官方文档写的非常好,建议读一次,并且实践一次)


  1. [loggers] 
  2. keys=root, simpleLogger 
  3.   
  4. [handlers] 
  5. keys=consoleHandler, logger_handler 
  6.   
  7. [formatters] 
  8. keys=formatter 
  9.   
  10. [logger_root] 
  11. level=WARN 
  12. handlers=consoleHandler 
  13.   
  14. [logger_simpleLogger] 
  15. level=INFO 
  16. handlers=logger_handler 
  17. propagate=0 
  18. qualname=killduplicatedjob 
  19.   
  20. [handler_consoleHandler] 
  21. class=StreamHandler 
  22. level=WARN 
  23. formatter=formatter 
  24. args=(sys.stdout,) 
  25.   
  26. [handler_logger_handler] 
  27. class=logging.handlers.RotatingFileHandler 
  28. level=INFO 
  29. formatter=formatter 
  30. args=("kill_duplicated_streaming.log""a", 52428800, 3,) 
  31.   
  32. [formatter_formatter] 
  33. format=%(asctime)s %(name)-12s %(levelname)-5s %(message)s 


本文作者:佚名

来源:51CTO

上一篇:打脸NSA:法国前情报官员证实NSA曾入侵法国总统府网络


下一篇:python循环语句