"""Minimal WSGI routing framework built on webob, plus a small demo app.

A ``Router`` owns a URL prefix and a table of (methods, regex, converters,
handler) entries.  ``Application`` is the WSGI callable: it keeps a
class-level list of routers and asks each one in turn to handle the request.
"""
from wsgiref.simple_server import make_server
from typing import Optional
import logging
import re
import traceback

from webob import Response, Request, exc, dec


class Dict2Obj(object):
    """Read-only attribute-style view over a dict.

    Anything that is not a dict is replaced by an empty dict.
    """

    def __init__(self, d: dict):
        # Write through ``__dict__`` directly: a normal attribute assignment
        # (even ``self.__dict__ = {...}``) would go through ``__setattr__``,
        # which always raises.
        if not isinstance(d, (dict,)):
            self.__dict__['_dict'] = {}
        else:
            self.__dict__['_dict'] = d

    def __getattr__(self, item):
        try:
            return self._dict[item]
        except KeyError:
            raise AttributeError('Attribute {} not found!'.format(item))

    def __setattr__(self, key, value):
        # Instances are deliberately immutable through attribute access.
        raise NotImplementedError


class Context(dict):
    """Dict whose keys can also be read and written as attributes."""

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError('Attribute {} not found!'.format(item))

    def __setattr__(self, key, value):
        self[key] = value


class NestedContext(Context):
    """Context that falls back to a related (global) context on a miss."""

    def relate(self, globalcontext: Context = None):
        """Attach the context consulted when a key is missing locally."""
        # Goes through Context.__setattr__, i.e. stored as self['globalcontext'].
        self.globalcontext = globalcontext

    def __getattr__(self, item):
        if item in self.keys():
            return self[item]
        # Use dict.get rather than ``self.globalcontext``: when relate() has
        # not been called the attribute lookup would re-enter __getattr__
        # and recurse until RecursionError.
        parent = self.get('globalcontext')
        if parent is None:
            raise AttributeError('Attribute {} not found!'.format(item))
        return getattr(parent, item)


class Router:
    """Route table for one URL prefix.

    Rules may be raw regular expressions or use ``{name:type}`` placeholders,
    where ``type`` is one of the keys of ``TYPEPATTERNS``.
    """

    # Matches one "/{name:type}" (or "/{name}") segment of a rule.
    mold = re.compile('/({[^{}:]+:?[^{}:]*})')

    # Placeholder type -> regex fragment.
    TYPEPATTERNS = {
        'str': r'[^/]+',  # anything except the path separator
        'word': r'\w+',
        'int': r'[+-]?\d+\b',
        'float': r'[+-]?\d+\.\d+\b',
        'any': r'.+'
    }

    # Placeholder type -> converter applied to the captured text.
    TYPECAST = {
        'str': str,
        'word': str,
        'int': int,
        'float': float,
        'any': str
    }

    def transform(self, tmp: str):
        """Convert one ``/{name:type}`` segment.

        Returns a ``(regex_fragment, name, converter)`` triple.  An unknown
        or missing type falls back to ``\\w+`` and ``str``.
        """
        designation, _, genre = tmp.strip('/{}').partition(':')
        fragment = '/(?P<{}>{})'.format(designation, self.TYPEPATTERNS.get(genre, r'\w+'))
        return fragment, designation, self.TYPECAST.get(genre, str)

    def parse(self, src: str):
        """Translate a rule into ``(regex_source, {group_name: converter})``.

        A rule without placeholders is returned unchanged with an empty
        converter mapping.
        """
        start = 0
        result = ''
        translator = {}
        while True:
            matcher = self.mold.search(src, start)
            if not matcher:
                break
            # Literal text between the previous placeholder and this one.
            result += matcher.string[start:matcher.start(0)]
            # Hand the whole matched segment to transform().
            tmp = self.transform(matcher.string[matcher.start(0):matcher.end(0)])
            result += tmp[0]
            translator.setdefault(tmp[1], tmp[2])
            start = matcher.end(0)
            print('result =', result)
            print('tmp =', tmp)
            print('matcher.pos =', matcher.pos)
            print('matcher.endpos =', matcher.endpos)
        # Keep the literal text after the last placeholder; with no
        # placeholder at all this is the whole rule.
        result += src[start:]
        return result, translator

    def __init__(self, prefix: str = ''):
        self._prefix = prefix.rstrip('/\\')  # such as /product
        self._routetable = []  # (methods, compiled pattern, translator, handler)
        # Per-router interceptors.
        self.pre_interceptors = []
        self.post_interceptors = []
        # Per-router context; the global context and the ``router`` entry
        # are injected by Application.register().
        self.ctx = NestedContext({'Router': 'I\'m router NestedContext'})

    def register_pre_interceptor(self, fn):
        """Register ``fn(ctx, request) -> request``; usable as a decorator."""
        self.pre_interceptors.append(fn)
        return fn

    def register_post_interceptor(self, fn):
        """Register ``fn(ctx, request, response) -> response``; usable as a decorator."""
        self.post_interceptors.append(fn)
        return fn

    def route(self, rule, *methods):
        """Return a decorator registering a handler for ``rule``.

        With no ``methods`` the route accepts every HTTP method.  Decorating
        the same handler again with the same pattern merges the method lists
        into a single entry.
        """

        def wrapper(handler):
            pattern, translator = self.parse(rule)
            print('parse pattern translator', pattern, translator)
            for r_methods, r_pattern, r_translator, r_handler in self._routetable:
                print(r_methods, r_pattern, r_translator, r_handler)
                if handler == r_handler and re.compile(pattern) == r_pattern:
                    translator.update(r_translator)
                    self._routetable.append((r_methods + methods, r_pattern, translator, r_handler))
                    # Drop the entry that was just merged; safe because the
                    # loop is left immediately afterwards.
                    self._routetable.remove((r_methods, r_pattern, r_translator, r_handler))
                    break
            else:
                print('else')
                self._routetable.append((methods, re.compile(pattern), translator, handler))
            print(self._routetable)
            return handler

        return wrapper

    @property
    def prefix(self):
        return self._prefix

    def get(self, pattern):
        return self.route(pattern, 'GET')

    def post(self, pattern):
        return self.route(pattern, 'POST')

    def head(self, pattern):
        return self.route(pattern, 'HEAD')

    def match(self, request: Request) -> Optional[Response]:
        """Dispatch ``request`` to the first matching route.

        Returns ``None`` when the request path does not belong to this
        router's prefix.  Raises ``exc.HTTPBadRequest`` when a route matches
        the path but not the method, and ``exc.HTTPNotFound`` when the prefix
        matches but no route does.
        """
        print('request.method = {}, request.path = {}, self.prefix = {!r}'.format(request.method, request.path,
                                                                                  self.prefix))
        # Prefix check.
        if self._prefix == '':
            # The prefix-less router only serves the site root.
            if request.path != '/':
                return None
        elif request.path != self.prefix and not request.path.startswith(self._prefix + '/'):
            # "/uranus" and "/uranus/xxx" belong here, "/uranuss" does not.
            return None

        # Run the request interceptors in registration order.
        for fn in self.pre_interceptors:
            logging.critical('in router interceptor')
            request = fn(self.ctx, request)

        for methods, pattern, translator, handler in self._routetable:
            print('prefix {} is right, methods = {}, pattern = {}, translator = {} handler = {}'.format(
                self._prefix, methods, pattern.pattern, translator, handler))
            matcher = pattern.match(request.path.replace(self._prefix, '', 1))
            # An empty remainder means the path is exactly the prefix
            # (e.g. "/uranus"); that is dispatched even though the pattern
            # itself did not match.
            if not (matcher or request.path.replace(self.prefix, '', 1) == ''):
                print('request.path {} mismatched {}'.format(request.path.replace(self.prefix, '', 1),
                                                             pattern.pattern))
                continue

            print('request.path = {} matched {}'.format(request.path, pattern))
            if methods and request.method.upper() not in methods:
                print('{} is illegal'.format(request.method))
                raise exc.HTTPBadRequest('request {} is illegal'.format(request.method))

            if request.path != self.prefix:
                request.args = matcher.groups()
                _dict = {}
                logging.error(matcher.groupdict())
                # Apply the converters declared in the rule, e.g. name -> str
                # and id -> int for '/{name:str}/{id:int}'.
                for k, v in matcher.groupdict().items():
                    # Named groups from a raw-regex rule have no declared
                    # converter, and an optional group may not have matched.
                    cast = translator.get(k, str)
                    _dict.setdefault(k, v if v is None else cast(v))
                request.kwargs = Dict2Obj(matcher.groupdict())
                request.vars = Dict2Obj(_dict)
                logging.error(request.args)
                logging.warning(request.kwargs.__dict__)
            else:
                # Path equals the prefix: there may be no match object.
                request.args = ()
                request.kwargs = Dict2Obj({})
                request.vars = Dict2Obj({})
                logging.error('matcher is None,set default value in case of error')

            response = handler(self.ctx, request)
            # Run the response interceptors in registration order.
            for fn in self.post_interceptors:
                response = fn(self.ctx, request, response)
            return response

        raise exc.HTTPNotFound('prefix {} is right, request.path {} not found'.format(self._prefix, request.path))


class Application:
    """WSGI application dispatching requests to the registered routers.

    Routers, interceptors and the context are class attributes, so they are
    shared by every ``Application`` instance.
    """

    ctx = Context({'application': 'I\'m application Context'})  # global context object

    @classmethod
    def extend(cls, name, ext):
        """Publish ``ext`` in the global context under ``name``."""
        cls.ctx[name] = ext

    def __init__(self, **kwargs):
        # Populate the shared context.
        self.ctx.app = self
        for k, v in kwargs.items():
            self.ctx[k] = v

    # Global interceptors.
    PRE_INTERCEPTORS = []
    POST_INTERCEPTORS = []

    @classmethod
    def register_pre_interceptors(cls, fn):
        """Register ``fn(ctx, request) -> request``; usable as a decorator."""
        cls.PRE_INTERCEPTORS.append(fn)
        return fn

    @classmethod
    def register_post_interceptors(cls, fn):
        """Register ``fn(ctx, request, response) -> response``; usable as a decorator."""
        cls.POST_INTERCEPTORS.append(fn)
        return fn

    ROUTERS = []  # registered Router objects

    @classmethod
    def register(cls, router: Router):
        """Add ``router`` and link its context to the global one."""
        router.ctx.relate(cls.ctx)
        router.ctx.router = router  # lets handlers reach their router via ctx
        cls.ROUTERS.append(router)

    @dec.wsgify
    def __call__(self, request: Request) -> Response:
        """Handle one request; raises ``exc.HTTPNotFound`` if no router answers."""
        # Global request interceptors.
        for fn in self.PRE_INTERCEPTORS:
            logging.critical('in Application interceptors')
            request = fn(self.ctx, request)

        for router in self.ROUTERS:
            response = router.match(request)
            print('class Application response =', response)
            if response is None:
                # Prefix mismatch: there is no response to post-process yet.
                continue
            # Global response interceptors.
            for fn in self.POST_INTERCEPTORS:
                response = fn(self.ctx, request, response)
            if response:
                return response

        raise exc.HTTPNotFound('prefix not found, request.path = {}'.format(request.path))


idx = Router()  # prefix = ''
uranus = Router('/uranus')  # prefix = /uranus

Application.register(idx)
Application.register(uranus)


@idx.get('^/$')
@idx.post('^/$')
def index(ctx: NestedContext, request: Request) -> Response:
    """Demo handler for the site root; echoes the request details."""
    response = Response()
    response.status_code = 303
    response.content_type = 'application/json'
    response.charset = 'utf8'
    response.body = 'request.path = {} \n{} \n{} \n{} \nctx = {}'.format(request.path, request.args,
                                                                         request.kwargs.__dict__,
                                                                         request.vars.__dict__, ctx).encode()
    return response


# Raw regular expressions work as rules too, e.g.
# '/(?P<uranus>[a-zA-Z]+)' or r'/(\w+)'.
@uranus.route('/{name:str}/{id:int}')
def neptune(ctx: NestedContext, request: Request) -> Response:
    """Demo handler under /uranus; echoes the request details."""
    response = Response()
    response.status_code = 304
    response.content_type = 'text/xml'
    response.charset = 'ascii'
    response.body = 'request.path = {} \n{} \n{} \n{} \nctx = {}'.format(request.path, request.args,
                                                                         request.kwargs.__dict__,
                                                                         request.vars.__dict__, ctx).encode()
    return response


# Interceptor examples.
@Application.register_pre_interceptors
def show_headers(ctx: Context, request: Request) -> Request:
    print('Application interceptor request.path = {}'.format(request.path))
    print('Application interceptor request.user_agent = {}'.format(request.user_agent))
    return request


@uranus.register_pre_interceptor
def show_prefix(ctx: NestedContext, request: Request) -> Request:
    # Each router passes its own ctx to its interceptors.
    print('router = {}, prefix = {}'.format(ctx.router, ctx.router.prefix))
    return request


if __name__ == '__main__':
    server = make_server('', 9999, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the demo server.
        pass
    except Exception:
        traceback.print_exc(limit=None)
    finally:
        server.shutdown()
        server.server_close()