我正在尝试构建一个函数,我可以将其用作我正在映射的RxPy流的处理程序.我所需要的函数需要访问定义该变量的范围之外的变量,对我而言,这意味着我需要使用某种闭包.所以我到达functools.partial来关闭一个变量并返回一个部分函数,我可以作为观察者传递给我的流.
但是,这样做会导致以下结果:
Traceback (most recent call last):
File "retry/example.py", line 46, in <module>
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
selector = adapt_call(selector)
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
argnames, varargs, kwargs = getargspec(func)[:3]
File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function
以下是一些重现问题的示例代码:
from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools
logger = logging.basicConfig()
def puts(thing):
print thing
def message_stream(consumer):
def thing(observer):
for message in consumer:
observer.on_next(message)
return Observable.create(thing)
def message_handler(message, context=None):
def req():
return requests.get('http://httpbin.org/get')
return Observable.start(req)
def handle_response(message, response, context=None):
consumer = context['consumer']
producer = context['producer']
t = 'even' if message % 2 == 0 else 'odd'
return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer
consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
'consumer': consumer,
'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)
问题似乎是我的部分函数在调用inspect.isfunction时返回False.
如何使我的部分功能通过此检查?有没有办法轻松将部分功能转换为“真正的”功能类型?
解决方法:
你问它是否真的是一个函数,并告诉你它不是一个函数.这是一个方法包装器.
你想要duck-type.
>>> def printargs(*args):
... print args
>>> import inspect
>>> from functools import partial
>>> inspect.isfunction(printargs)
True
>>> f = partial(printargs, 1)
>>> inspect.isfunction(f)
False
# try duck-typing, see if the variable is callable
# check does it work for a method-wrapper?
>>> hasattr(f, "__call__")
True
# check an integer, which should be false
>>> hasattr(1, "__call__")
False
# ensure it works on an actual function
>>> hasattr(printargs, "__call__")
True
这就是你鸭子式的原因.你不在乎它是否是一个功能.你关心它是否像一个函数.
编辑:
如果足够绝望,你可以编写一个类并传递对类中函数的引用.
class A():
def __init__(self, frozen, *args, **kwds):
self.frozen = frozen
self.args = args
self.kwds = kwds
def call(self):
self.frozen(*self.args, **self.kwds)
然后使用A(f).call作为你的包装器.
>>> f_ = A(f)
>>> inspect.ismethod(f_.call)
True
>>> f_.call()
(1,)
只要ismethod工作,这就有效.
如果没有,你真的需要一个装饰.
最终编辑:
如果你真的非常绝望并且不想编写自定义装饰器,你可以使用带有元组的lambda函数来传递以制作类似部分的函数.
例:
>>> import inspect
>>> def printargs(*args):
... print args
>>> a = (1,2,3)
>>> f = lambda x: printargs(*x)
>>> f(a)
(1, 2, 3)
>>> inspect.isfunction(f)
True