@bergus
2015-12-18T11:13:37.000000Z
字数 17610
阅读 2014
pipetools
代码分析
heyhapqqnqepzquj
最近研究了一下pipetools的代码,附上作者的github:https://github.com/0101/pipetools
pipetools是一个python类库,他能够让你通过管道符链式的方式给函数传递参数,比如:
f = pipe | list | foreach("{0}=={0}")
print f("1112342snfjfbjeryehbsa")
#再简单一点的方式是:
print (pipe | list | foreach("{0}=={0}"))("1112342snfjfbjeryehbsa")
#又或者是:
print "1112342snfjfbjeryehbsa" > pipe | list | foreach("{0}=={0}")
#这种传参数方式比传统的要直观,方便,容易书写
copy一点小东西
在python当中一切皆对象,函数也不例外,也具有属性(可用dir()查询)。作为对象,它还可以赋值给其它对象名,或者作为参数传递。
搬砖地址是:http://www.cnblogs.com/cnshen/p/3684863.html
#lambda函数
func = lambda x,y: x + y
print func(3,4)
#lambda生成一个函数对象。该函数参数为x,y,返回值为x+y。函数对象赋给func。func的调用与正常函数无异。
#以上定义可以写成以下形式:
def func(x, y):
return x + y
#函数作为参数传递
#函数可以作为一个对象,进行参数传递。函数名(比如func)即该对象。比如说:
def test(f, a, b):
print 'test'
print f(a, b)
test(func, 3, 5)
#test函数的第一个参数f就是一个函数对象。将func传递给f,test中的f()就拥有了func()的功能。
#我们因此可以提高程序的灵活性。可以使用上面的test函数,带入不同的函数参数。比如:
test((lambda x,y: x**2 + y), 6, 9)
#函数作为返回结果
#函数是一个对象,所以可以作为某个函数的返回结果。
def line_conf():
def line(x):
return 2*x+1
return line # return a function object
my_line = line_conf()
print(my_line(5))
#其实,这也就是传说中的闭包
#闭包与并行运算
'''
闭包有效的减少了函数所需定义的参数数目。这对于并行运算来说有重要的意义。在并行运算的环境下,我们可以让每台电脑负责一个函数,然后将一台电脑的输出和下一台电脑的输入串联起来。最终,我们像流水线一样工作,从串联的电脑集群一端输入数据,从另一端输出数据。这样的情境最适合只有一个参数输入的函数。闭包就可以实现这一目的。
并行运算正称为一个热点。这也是函数式编程又热起来的一个重要原因。函数式编程早在1950年代就已经存在,但应用并不广泛。然而,我们上面描述的流水线式的工作并行集群过程,正适合函数式编程。由于函数式编程这一天然优势,越来越多的语言也开始加入对函数式编程范式的支持。
'''
#局部函数应用,又称偏函数
#我的简单理解是functools.partial函数通过两次传递参数的方式计算被传递函数对象的结果,比如:
def hello(x,y,z,w):
return 12
print partial(hello)(1,2,3,4) == 12
print partial(hello,1)(2,3,4) == 12
print partial(hello,1,2)(3,4) == 12
print partial(hello1,2,3,4)() == 12
#具体的实现搬个砖:http://blog.csdn.net/largetalk/article/details/6612342
def partial(func, *args, **keywords):
def newfunc(*fargs, **fkeywords):
newkeywords = keywords.copy()
newkeywords.update(fkeywords)
return func(*(args + fargs), **newkeywords)
newfunc.func = func
newfunc.args = args
newfunc.keywords = keywords
return newfunc
在pipetools文件夹中有五个主要的python文件,咱们一个一个的分析起作用
# encoding=utf-8
from itertools import imap, chain
def set_name(name, f):
'''
给f函数对象增加一个名字为__pipetools__name__值为name的属性
:param name:
:param f:
:return:
>>> set_name("hello",lambda x:x).__pipetools__name__
'hello'
'''
try:
f.__pipetools__name__ = name
except (AttributeError, UnicodeEncodeError):
pass
return f
def get_name(f):
'''
返回f函数对象的标识名
返回值是字符串
:param f:
:return:
'''
from pipetools.main import Pipe
pipetools_name = getattr(f, '__pipetools__name__', None)
if pipetools_name: #如果__pipetools__name__属性存在,那么f是函数对象或者函数
return pipetools_name() if callable(pipetools_name) else pipetools_name
if isinstance(f, Pipe):
return repr(f)
return f.__name__ if hasattr(f, '__name__') else repr(f)
def repr_args(*args, **kwargs):
'''
返回列表和字典参数的字符串形式
:param args:
:param kwargs:
:return:
>>> repr_args(2, 3, 21, 4, 5, name=2234, ii='ffff')
"2, 3, 21, 4, 5, ii='ffff', name=2234"
'''
return ', '.join(chain( #chain连接多个迭代器为一个
imap('{0!r}'.format, args),
imap('{0[0]}={0[1]!r}'.format, kwargs.iteritems())))
# encoding=utf-8
from functools import partial
from pipetools.main import XObject, StringFormatter
class NoBuilder(ValueError):
pass
def DSBuilder(definition):
'''
definition是一个可转换迭代器
可转换迭代器是让一个序列或者字典经过其他函数处理后可迭代
:param definition:
:return:
'''
builder = select_builder(definition)
if builder:
return builder(definition)
raise NoBuilder("Don't know how to build %s" % type(definition))
def SequenceBuilder(cls, definition):
'''
把definition里面的可求值元素转换成其对应的函数
可求值元素是能够让里面的元素经过转换,变成一个能够传递参数的函数
而ds_item返回的是一个值,故而整体返回值是一个lambda包含着的cls类型的序列
:param cls:
:param definition:
:return:
'''
return lambda x: cls(ds_item(d, x) for d in definition)
def DictBuilder(definition):
return lambda x: dict(
(ds_item(key_def, x), ds_item(val_def, x))
for key_def, val_def in definition.iteritems())
'''
定义了三种构造器,列表,元组,字典构造器
定义构造器的目的是让列表,元组,字典里面的特殊元素转化成正常的元素
根据前面partial的介绍简单理解一下下面字典的作用,SequenceBuilder是一个有两个参数的函数,partial(SequenceBuilder, tuple)(data)得到计算结果,相当于tuple(data),也就是说partial(SequenceBuilder, tuple)是对tuple的封装,作用和tuple是一样的,封装的目的是处理data中复杂的数据结构
'''
builders = {
tuple: partial(SequenceBuilder, tuple),
list: partial(SequenceBuilder, list),
dict: DictBuilder,
}
def select_builder(definition):
'''
根据definition的类型,返回其对应的构造转换器,也就是上面builders中的value
:param definition:
:return:
'''
for cls, builder in builders.iteritems():
if isinstance(definition, cls):
return builder
def ds_item(definition, data):
'''
根据definition的类型转换成其对应的函数,然后根据data参数值返回处理后的结果
简而言之,该函数的结果是值,不是函数对象
:param definition:
:param data:
:return:
'''
if isinstance(definition, XObject):
return (~definition)(data)
if isinstance(definition, basestring):
return StringFormatter(definition)(data)
if callable(definition):
return definition(data)
try:
return DSBuilder(definition)(data)
except NoBuilder:
# static item
return definition
# encoding=utf-8
import re
from functools import partial, wraps
from pipetools.debug import repr_args, set_name, get_name
from pipetools.ds_builder import DSBuilder, NoBuilder
from pipetools.main import pipe, XObject, StringFormatter, xpartial
def pipe_util(func):
"""
装饰器装饰函数func之后,然后让其返回一个函数对象pipe_util_wrapper
pipe_util_wrapper函数的首参数为函数,返回值是一个Pipe对象
被装饰者的首参数就是pipe_util_wrapper的首参数
"""
@wraps(func)
def pipe_util_wrapper(function, *args, **kwargs):
if isinstance(function, XObject):
function = ~function
original_function = function
if args or kwargs:
function = xpartial(function, *args, **kwargs)
name = lambda: '%s(%s)' % (get_name(func), ', '.join(
filter(None, (get_name(original_function), repr_args(*args, **kwargs)))))
f = func(function) #被装饰者的返回值
result = pipe | set_name(name, f) #把被装饰者的返回值封装成一个Pipe对象
# if the util defines an 'attrs' mapping, copy it as attributes
# to the result
attrs = getattr(f, 'attrs', {})
for k, v in attrs.iteritems():
setattr(result, k, v)
# result是一个拥有f函数的Pipe对象
return result
return pipe_util_wrapper
def auto_string_formatter(func):
"""
把含有格式化的字符串转化成该字符串的格式化函数形式
"""
@wraps(func)
def auto_string_formatter_wrapper(function, *args, **kwargs):
if isinstance(function, basestring):
function = StringFormatter(function) #StringFormatter的返回值类似于"{0}".format
return func(function, *args, **kwargs)
return auto_string_formatter_wrapper
def data_structure_builder(func):
"""
装饰器处理元素中包含特殊数据结构的元组,列表,字典等
"""
@wraps(func)
def ds_builder_wrapper(function, *args, **kwargs):
try:
function = DSBuilder(function)
except NoBuilder:
pass
return func(function, *args, **kwargs)
return ds_builder_wrapper
def regex_condition(func):
"""
If a condition is given as string instead of a function, it is turned
into a regex-matching function.
"""
@wraps(func)
def regex_condition_wrapper(condition, *args, **kwargs):
if isinstance(condition, basestring):
condition = partial(re.match, condition)
return func(condition, *args, **kwargs)
return regex_condition_wrapper
from functools import partial
from itertools import imap, ifilter, islice, takewhile, dropwhile
import operator
from pipetools.debug import set_name, repr_args, get_name
from pipetools.decorators import data_structure_builder, regex_condition
from pipetools.decorators import pipe_util, auto_string_formatter
from pipetools.main import pipe, X, _iterable
KEY, VALUE = X[0], X[1]
@pipe_util
@auto_string_formatter
@data_structure_builder
def foreach(function):
"""
Returns a function that takes an iterable and returns an iterator over the
results of calling `function` on each item of the iterable.
>>> xrange(5) > foreach(factorial) | list
[1, 1, 2, 6, 24]
>>> "1234567assfsd" > foreach("{0}") | list
[u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'a', u's', u's', u'f', u's', u'd']
>>> "123455dHHJBHJgg" > foreach(X.lower()) | list
['1', '2', '3', '4', '5', '5', 'd', 'h', 'h', 'j', 'b', 'h', 'j', 'g', 'g']
>>> "12BHJgg" > foreach(("{0}","{0}==")) | list
[(u'1', u'1=='), (u'2', u'2=='), (u'B', u'B=='), (u'H', u'H=='), (u'J', u'J=='), (u'g', u'g=='), (u'g', u'g==')]
>>> "12BHJgg" > foreach([X,X.upper()]) | list
[['1', '1'], ['2', '2'], ['B', 'B'], ['H', 'H'], ['J', 'J'], ['g', 'G'], ['g', 'G']]
>>> "12BHJgg" > foreach({X:X.upper()}) | list
[{'1': '1'}, {'2': '2'}, {'B': 'B'}, {'H': 'H'}, {'J': 'J'}, {'g': 'G'}, {'g': 'G'}]
"""
return partial(imap, function)
@pipe_util
def foreach_do(function):
"""
foreach除了返回一个迭代器什么都不会执行
foreach_do马上就会被执行,如果你只是想马上看到结果,不考虑性能等副作用的话,它很适合你
>>> xrange(5) > foreach_do(factorial) | foreach_do(X+3)
[4, 4, 5, 9, 27]
>>> "12332dffrrr" > foreach_do(X.upper())
['1', '2', '3', '3', '2', 'D', 'F', 'F', 'R', 'R', 'R']
"""
return lambda iterable: [function(item) for item in iterable]
@pipe_util
@regex_condition
def where(condition):
"""
Pipe-able lazy filter.
>>> odd_range = xrange | where(X % 2) | list
>>> odd_range(10)
[1, 3, 5, 7, 9]
"""
return partial(ifilter, condition)
@pipe_util
@regex_condition
def where_not(condition):
"""
Inverted :func:`where`.
"""
return partial(ifilter, pipe | condition | operator.not_)
@pipe_util
@data_structure_builder
def sort_by(function):
"""
Sorts an incoming sequence by using the given `function` as key.
>>> xrange(10) > sort_by(-X)
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Supports automatic data-structure creation::
users > sort_by([X.last_name, X.first_name])
There is also a shortcut for ``sort_by(X)`` called ``sort``:
>>> [4, 5, 8, -3, 0] > sort
[-3, 0, 4, 5, 8]
And (as of ``0.2.3``) a shortcut for reversing the sort:
>>> 'asdfaSfa' > sort_by(X.lower()).descending
['s', 'S', 'f', 'f', 'd', 'a', 'a', 'a']
"""
f = partial(sorted, key=function)
f.attrs = {'descending': _descending_sort_by(function)}
return f
@pipe_util
def _descending_sort_by(function):
return partial(sorted, key=function, reverse=True)
sort = sort_by(X)
@pipe_util
@auto_string_formatter
@data_structure_builder
def debug_print(function):
"""
Prints function applied on input and returns the input.
::
foo = (pipe
| something
| debug_print(X.get_status())
| something_else
| foreach(debug_print("attr is: {0.attr}"))
| etc)
"""
def debug(thing):
print function(thing)
return thing
return debug
@pipe_util
def as_args(function):
"""
Applies the sequence in the input as positional arguments to `function`.
::
some_lists > as_args(izip)
"""
return lambda x: function(*x)
@pipe_util
def as_kwargs(function):
"""
Applies the dictionary in the input as keyword arguments to `function`.
"""
return lambda x: function(**x)
def take_first(count):
"""
Assumes an iterable on the input, returns an iterable with first `count`
items from the input (or possibly less, if there isn't that many).
>>> xrange(9000) > where(X % 100 == 0) | take_first(5) | tuple
(0, 100, 200, 300, 400)
"""
def _take_first(iterable):
return islice(iterable, count)
return pipe | set_name('take_first(%s)' % count, _take_first)
def drop_first(count):
"""
Assumes an iterable on the input, returns an iterable with identical items
except for the first `count`.
>>> xrange(10) > drop_first(5) | tuple
(5, 6, 7, 8, 9)
"""
def _drop_first(iterable):
g = (x for x in xrange(1, count + 1))
return dropwhile(lambda i: unless(StopIteration, g.next)(), iterable)
return pipe | set_name('drop_first(%s)' % count, _drop_first)
def unless(exception_class_or_tuple, func, *args, **kwargs):
"""
When `exception_class_or_tuple` occurs while executing `func`, it will
be caught and ``None`` will be returned.
>>> f = where(X > 10) | list | unless(IndexError, X[0])
>>> f([5, 8, 12, 4])
12
>>> f([1, 2, 3])
None
"""
@pipe_util
@auto_string_formatter
@data_structure_builder
def construct_unless(function):
# a wrapper so we can re-use the decorators
def _unless(*args, **kwargs):
try:
return function(*args, **kwargs)
except exception_class_or_tuple:
pass
return _unless
name = lambda: 'unless(%s, %s)' % (exception_class_or_tuple, ', '.join(
filter(None, (get_name(func), repr_args(*args, **kwargs)))))
return set_name(name, construct_unless(func, *args, **kwargs))
@pipe_util
@regex_condition
def select_first(condition):
"""
Returns first item from input sequence that satisfies `condition`. Or
``None`` if none does.
>>> ['py', 'pie', 'pi'] > select_first(X.startswith('pi'))
'pie'
As of ``0.2.1`` you can also
:ref:`directly use regular expressions <auto-regex>` and write the above
as:
>>> ['py', 'pie', 'pi'] > select_first('^pi')
'pie'
There is also a shortcut for ``select_first(X)`` called ``first_of``:
>>> first_of(['', None, 0, 3, 'something'])
3
>>> first_of([])
None
"""
return where(condition) | unless(StopIteration, X.next())
first_of = select_first(X)
@pipe_util
@auto_string_formatter
@data_structure_builder
def group_by(function):
"""
Groups input sequence by `function`.
Returns an iterator over a sequence of tuples where the first item is a
result of `function` and the second one a list of items matching this
result.
Ordering of the resulting iterator is undefined, but ordering of the items
in the groups is preserved.
>>> [1, 2, 3, 4, 5, 6] > group_by(X % 2) | list
[(0, [2, 4, 6]), (1, [1, 3, 5])]
"""
def _group_by(seq):
result = {}
for item in seq:
result.setdefault(function(item), []).append(item)
return result.iteritems()
return _group_by
def _flatten(x):
if not _iterable(x):
yield x
else:
for y in x:
for z in _flatten(y):
yield z
def flatten(*args):
"""
Flattens an arbitrarily deep nested iterable(s).
"""
return _flatten(args)
flatten = pipe | flatten
def count(iterable):
"""
Returns the number of items in `iterable`.
"""
return sum(1 for whatever in iterable)
count = pipe | count
@pipe_util
@regex_condition
def take_until(condition):
"""
>>> [1, 4, 6, 4, 1] > take_until(X > 5) | list
[1, 4]
"""
return partial(takewhile, pipe | condition | operator.not_)
# encoding=utf-8
from collections import Iterable
from functools import partial, wraps
from pipetools.debug import get_name, set_name, repr_args
class Pipe(object):
"""
Pipe-style combinator.
Example::
p = pipe | F | G | H
p(x) == H(G(F(x)))
"""
def __init__(self, func=None):
self.func = func
self.__name__ = 'Pipe'
def __str__(self):
return get_name(self.func)
__repr__ = __str__
@staticmethod
def compose(first, second):
name = lambda: '{0} | {1}'.format(get_name(first), get_name(second))
def composite(*args, **kwargs):
return second(first(*args, **kwargs))
return set_name(name, composite)
@classmethod
def bind(cls, first, second):
return cls(cls.compose(first, second) if all([first, second]) else first or second)
def __or__(self, next_func):
return self.bind(self.func, prepare_function_for_pipe(next_func))
def __ror__(self, prev_func):
return self.bind(prepare_function_for_pipe(prev_func), self.func)
def __lt__(self, thing):
return self.func(thing) if self.func else thing
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
def __get__(self, instance, owner):
return partial(self, instance) if instance else self
pipe = Pipe()
class Maybe(Pipe):
@staticmethod
def compose(first, second):
name = lambda: '{0} ?| {1}'.format(get_name(first), get_name(second))
def composite(*args, **kwargs):
result = first(*args, **kwargs)
return None if result is None else second(result)
return set_name(name, composite)
def __call__(self, *args, **kwargs):
if len(args) == 1 and args[0] is None and not kwargs:
return None
return self.func(*args, **kwargs)
def __lt__(self, thing):
return (
None if thing is None else
self.func(thing) if self.func else
thing)
maybe = Maybe()
def prepare_function_for_pipe(thing):
if isinstance(thing, XObject):
return ~thing
if isinstance(thing, tuple):
return xpartial(*thing)
if isinstance(thing, basestring):
return StringFormatter(thing)
if callable(thing):
return thing
raise ValueError('Cannot pipe %s' % thing)
def StringFormatter(template):
'''
把一个字符串转化为一个带有format格式的函数
content是格式化字符串的参数
:param template:
:return:
'''
f = unicode(template).format
def _format(content):
if isinstance(content, dict):
return f(**content)
if _iterable(content):
return f(*content)
return f(content)
return set_name(lambda: "format('%s')" % template[:20], _format)
def _iterable(obj):
"Iterable but not a string"
return isinstance(obj, Iterable) and not isinstance(obj, basestring)
class XObject(object):
def __init__(self, func=None):
self._func = func
set_name(lambda: get_name(func) if func else 'X', self)
def __repr__(self):
return get_name(self)
def __invert__(self):
return self._func or set_name('X', lambda x: x)
def bind(self, name, func):
set_name(name, func)
return XObject((self._func | func) if self._func else (pipe | func))
def __call__(self, *args, **kwargs):
name = lambda: 'X(%s)' % repr_args(*args, **kwargs)
return self.bind(name, lambda x: x(*args, **kwargs))
def __eq__(self, other):
return self.bind(lambda: 'X == {0!r}'.format(other), lambda x: x == other)
def __getattr__(self, name):
return self.bind(lambda: 'X.{0}'.format(name), lambda x: getattr(x, name))
def __getitem__(self, item):
return self.bind(lambda: 'X[{0!r}]'.format(item), lambda x: x[item])
def __gt__(self, other):
return self.bind(lambda: 'X > {0!r}'.format(other), lambda x: x > other)
def __ge__(self, other):
return self.bind(lambda: 'X >= {0!r}'.format(other), lambda x: x >= other)
def __lt__(self, other):
return self.bind(lambda: 'X < {0!r}'.format(other), lambda x: x < other)
def __le__(self, other):
return self.bind(lambda: 'X <= {0!r}'.format(other), lambda x: x <= other)
def __mod__(self, y):
return self.bind(lambda: 'X % {0!r}'.format(y), lambda x: x % y)
def __ne__(self, other):
return self.bind(lambda: 'X != {0!r}'.format(other), lambda x: x != other)
def __neg__(self):
return self.bind(lambda: '-X', lambda x: -x)
def __mul__(self, other):
return self.bind(lambda: 'X * {0!r}'.format(other), lambda x: x * other)
def __div__(self, other):
return self.bind(lambda: 'X / {0!r}'.format(other), lambda x: x / other)
def __add__(self, other):
return self.bind(lambda: 'X + {0!r}'.format(other), lambda x: x + other)
def __sub__(self, other):
return self.bind(lambda: 'X - {0!r}'.format(other), lambda x: x - other)
def __pow__(self, other):
return self.bind(lambda: 'X ** {0!r}'.format(other), lambda x: x ** other)
def __ror__(self, func):
return pipe | func | self
def __or__(self, func):
if isinstance(func, Pipe):
return func.__ror__(self)
return pipe | self | func
def _in_(self, y):
return self.bind(lambda: 'X._in_({0!r})'.format(y), lambda x: x in y)
X = XObject()
def xpartial(func, *xargs, **xkwargs):
"""
Like :func:`functools.partial`, but can take an :class:`XObject`
placeholder that will be replaced with the first positional argument
when the partially applied function is called.
Useful when the function's positional arguments' order doesn't fit your
situation, e.g.:
>>> reverse_range = xpartial(range, X, 0, -1)
>>> reverse_range(5)
[5, 4, 3, 2, 1]
It can also be used to transform the positional argument to a keyword
argument, which can come in handy inside a *pipe*::
xpartial(objects.get, id=X)
Also the XObjects are evaluated, which can be used for some sort of
destructuring of the argument::
xpartial(somefunc, name=X.name, number=X.contacts['number'])
Lastly, unlike :func:`functools.partial`, this creates a regular function
which will bind to classes (like the ``curry`` function from
``django.utils.functional``).
"""
any_x = any(isinstance(a, XObject)
for a in xargs + tuple(xkwargs.values()))
use = lambda x, value: (~x)(value) if isinstance(x, XObject) else x
@wraps(func)
def xpartially_applied(*func_args, **func_kwargs):
if any_x:
if not func_args:
raise ValueError('Function "%s" partially applied with an '
'X placeholder but called with no positional arguments.'
% get_name(func))
first = func_args[0]
rest = func_args[1:]
args = tuple(use(x, first) for x in xargs) + rest
kwargs = dict((k, use(x, first)) for k, x in xkwargs.iteritems())
kwargs.update(func_kwargs)
else:
args = xargs + func_args
kwargs = dict(xkwargs, **func_kwargs)
return func(*args, **kwargs)
name = lambda: '%s(%s)' % (get_name(func), repr_args(*xargs, **xkwargs))
return set_name(name, xpartially_applied)
if __name__ == '__main__':
f = 'ggggcdwecfwefhweejhjwq' > pipe | list | ','.join
print f