@bergus
2015-12-18T03:13:37.000000Z
字数 17610
阅读 2311
pipetools 代码分析
heyhapqqnqepzquj
最近研究了一下pipetools的代码,附上作者的github:https://github.com/0101/pipetools
pipetools是一个python类库,它能够让你通过管道符以链式的方式给函数传递参数,比如:
# Usage demo: build a pipeline with `|`, then feed it a value.
# `pipe` and `foreach` are expected to come from pipetools.
f = pipe | list | foreach("{0}=={0}")
print(f("1112342snfjfbjeryehbsa"))

# An even shorter form: build the pipeline and call it in one expression.
# The extra parentheses make sure the *result of the call* is printed.
print((pipe | list | foreach("{0}=={0}"))("1112342snfjfbjeryehbsa"))

# Or feed the value in from the left with `>`. `|` binds tighter than `>`,
# so the whole pipeline is built first and then receives the string.
print("1112342snfjfbjeryehbsa" > pipe | list | foreach("{0}=={0}"))

# Passing the argument this way is more intuitive, more convenient and
# easier to write than the traditional nested-call style.
copy一点小东西
在python当中一切皆对象,函数也不例外,也具有属性(可用dir()查询)。作为对象,它还可以赋值给其它对象名,或者作为参数传递。
搬砖地址是:http://www.cnblogs.com/cnshen/p/3684863.html
# --- lambda functions ---
func = lambda x, y: x + y
print(func(3, 4))
# `lambda` creates a function object that takes x and y and returns x + y.
# The function object is bound to the name `func`; calling `func` is no
# different from calling an ordinary function.

# The definition above can equally be written as:
def func(x, y):
    return x + y


# --- functions as arguments ---
# A function is an object and can be passed as an argument; the function
# name (e.g. `func`) refers to that object. For example:
def test(f, a, b):
    print('test')
    print(f(a, b))


test(func, 3, 5)
# The first parameter `f` of `test` is a function object. Passing `func`
# as `f` means that `f()` inside `test` does whatever `func()` does.
# This makes programs more flexible: the same `test` function can be
# reused with different function arguments, for example:
test((lambda x, y: x**2 + y), 6, 9)


# --- functions as return values ---
# Because a function is an object, it can be the return value of
# another function.
def line_conf():
    def line(x):
        return 2*x+1
    return line  # return a function object


my_line = line_conf()
print(my_line(5))
# This is essentially what is known as a closure. (Strictly speaking a
# closure also captures variables from the enclosing scope; `line` above
# happens to capture none.)

# --- closures and parallel computing ---
# Closures effectively reduce the number of parameters a function has to
# declare, which matters a lot for parallel computing. In a parallel
# environment each machine can be made responsible for one function, with
# the output of one machine chained to the input of the next. In the end
# the cluster works like an assembly line: data goes in at one end and
# comes out at the other. That setting suits functions with exactly one
# input parameter best, and closures are a way to get there.
# Parallel computing is becoming a hot topic, which is one important
# reason functional programming is popular again. Functional programming
# has existed since the 1950s but was not widely used; the pipeline-style
# parallel cluster workflow described above, however, fits it very well.
# Because of this natural advantage, more and more languages are adding
# support for the functional paradigm.
# --- partial function application ---
# My simple understanding: functools.partial computes the result of the
# wrapped function by receiving its arguments in two steps, for example:
from functools import partial  # needed by the demo calls below


def hello(x, y, z, w):
    return 12


print(partial(hello)(1, 2, 3, 4) == 12)
print(partial(hello, 1)(2, 3, 4) == 12)
print(partial(hello, 1, 2)(3, 4) == 12)
# All four arguments are supplied up front, none at call time.
print(partial(hello, 1, 2, 3, 4)() == 12)


# A reference implementation, borrowed from:
# http://blog.csdn.net/largetalk/article/details/6612342
# NOTE: from here on this definition shadows functools.partial.
def partial(func, *args, **keywords):
    """Return a callable that calls `func` with pre-filled arguments.

    :param func: the callable to wrap
    :param args: positional arguments placed before the call-time ones
    :param keywords: keyword arguments; call-time keywords override them
    :return: a function exposing `func`, `args` and `keywords` attributes
    """
    def newfunc(*fargs, **fkeywords):
        # Copy so that call-time keywords never mutate the stored defaults.
        newkeywords = keywords.copy()
        newkeywords.update(fkeywords)
        return func(*(args + fargs), **newkeywords)
    newfunc.func = func
    newfunc.args = args
    newfunc.keywords = keywords
    return newfunc
在pipetools文件夹中有五个主要的python文件,咱们一个一个地分析其作用
# encoding=utf-8
try:
    from itertools import imap
except ImportError:
    # Interpreters without itertools.imap already have a lazy builtin map.
    imap = map
from itertools import chain


def set_name(name, f):
    '''
    Attach `name` to `f` as the attribute ``__pipetools__name__``.

    :param name: a string, or a zero-argument callable returning the name
    :param f: the object to label
    :return: `f` itself, so the call can be used inline

    >>> set_name("hello", lambda x: x).__pipetools__name__
    'hello'
    '''
    try:
        f.__pipetools__name__ = name
    except (AttributeError, UnicodeEncodeError):
        # Best effort: objects that refuse the attribute stay unlabeled.
        pass
    return f


def get_name(f):
    '''
    Return a display name for `f` as a string.

    :param f: any object, typically a function or a Pipe
    :return: the name set by `set_name` if present, else ``repr(f)`` for a
        Pipe, else ``f.__name__`` when it exists, else ``repr(f)``
    '''
    # Imported here rather than at module level; pipetools.main itself
    # imports this module.
    from pipetools.main import Pipe
    pipetools_name = getattr(f, '__pipetools__name__', None)
    if pipetools_name:
        # The stored name may be a lazy callable or a plain string.
        return pipetools_name() if callable(pipetools_name) else pipetools_name
    if isinstance(f, Pipe):
        return repr(f)
    return f.__name__ if hasattr(f, '__name__') else repr(f)


def repr_args(*args, **kwargs):
    '''
    Render positional and keyword arguments as they would appear in a call.

    Keyword arguments are emitted sorted by key so the result does not
    depend on dict iteration order.

    :param args: positional arguments, rendered with ``repr``
    :param kwargs: keyword arguments, rendered as ``key=repr(value)``
    :return: a single comma-separated string

    >>> repr_args(2, 3, 21, 4, 5, name=2234, ii='ffff')
    "2, 3, 21, 4, 5, ii='ffff', name=2234"
    '''
    return ', '.join(chain(  # chain joins the two iterators into one
        imap('{0!r}'.format, args),
        imap('{0[0]}={0[1]!r}'.format, sorted(kwargs.items()))))
# encoding=utf-8from functools import partialfrom pipetools.main import XObject, StringFormatterclass NoBuilder(ValueError):passdef DSBuilder(definition):'''definition是一个可转换迭代器可转换迭代器是让一个序列或者字典经过其他函数处理后可迭代:param definition::return:'''builder = select_builder(definition)if builder:return builder(definition)raise NoBuilder("Don't know how to build %s" % type(definition))def SequenceBuilder(cls, definition):'''把definition里面的可求值元素转换成其对应的函数可求值元素是能够让里面的元素经过转换,变成一个能够传递参数的函数而ds_item返回的是一个值,故而整体返回值是一个lambda包含着的cls类型的序列:param cls::param definition::return:'''return lambda x: cls(ds_item(d, x) for d in definition)def DictBuilder(definition):return lambda x: dict((ds_item(key_def, x), ds_item(val_def, x))for key_def, val_def in definition.iteritems())'''定义了三种构造器,列表,元组,字典构造器定义构造器的目的是让列表,元组,字典里面的特殊元素转化成正常的元素根据前面partial的介绍简单理解一下下面字典的作用,SequenceBuilder是一个有两个参数的函数,partial(SequenceBuilder, tuple)(data)得到计算结果,相当于tuple(data),也就是说partial(SequenceBuilder, tuple)是对tuple的封装,作用和tuple是一样的,封装的目的是处理data中复杂的数据结构'''builders = {tuple: partial(SequenceBuilder, tuple),list: partial(SequenceBuilder, list),dict: DictBuilder,}def select_builder(definition):'''根据definition的类型,返回其对应的构造转换器,也就是上面builders中的value:param definition::return:'''for cls, builder in builders.iteritems():if isinstance(definition, cls):return builderdef ds_item(definition, data):'''根据definition的类型转换成其对应的函数,然后根据data参数值返回处理后的结果简而言之,该函数的结果是值,不是函数对象:param definition::param data::return:'''if isinstance(definition, XObject):return (~definition)(data)if isinstance(definition, basestring):return StringFormatter(definition)(data)if callable(definition):return definition(data)try:return DSBuilder(definition)(data)except NoBuilder:# static itemreturn definition
# encoding=utf-8import refrom functools import partial, wrapsfrom pipetools.debug import repr_args, set_name, get_namefrom pipetools.ds_builder import DSBuilder, NoBuilderfrom pipetools.main import pipe, XObject, StringFormatter, xpartialdef pipe_util(func):"""装饰器装饰函数func之后,然后让其返回一个函数对象pipe_util_wrapperpipe_util_wrapper函数的首参数为函数,返回值是一个Pipe对象被装饰者的首参数就是pipe_util_wrapper的首参数"""@wraps(func)def pipe_util_wrapper(function, *args, **kwargs):if isinstance(function, XObject):function = ~functionoriginal_function = functionif args or kwargs:function = xpartial(function, *args, **kwargs)name = lambda: '%s(%s)' % (get_name(func), ', '.join(filter(None, (get_name(original_function), repr_args(*args, **kwargs)))))f = func(function) #被装饰者的返回值result = pipe | set_name(name, f) #把被装饰者的返回值封装成一个Pipe对象# if the util defines an 'attrs' mapping, copy it as attributes# to the resultattrs = getattr(f, 'attrs', {})for k, v in attrs.iteritems():setattr(result, k, v)# result是一个拥有f函数的Pipe对象return resultreturn pipe_util_wrapperdef auto_string_formatter(func):"""把含有格式化的字符串转化成该字符串的格式化函数形式"""@wraps(func)def auto_string_formatter_wrapper(function, *args, **kwargs):if isinstance(function, basestring):function = StringFormatter(function) #StringFormatter的返回值类似于"{0}".formatreturn func(function, *args, **kwargs)return auto_string_formatter_wrapperdef data_structure_builder(func):"""装饰器处理元素中包含特殊数据结构的元组,列表,字典等"""@wraps(func)def ds_builder_wrapper(function, *args, **kwargs):try:function = DSBuilder(function)except NoBuilder:passreturn func(function, *args, **kwargs)return ds_builder_wrapperdef regex_condition(func):"""If a condition is given as string instead of a function, it is turnedinto a regex-matching function."""@wraps(func)def regex_condition_wrapper(condition, *args, **kwargs):if isinstance(condition, basestring):condition = partial(re.match, condition)return func(condition, *args, **kwargs)return regex_condition_wrapper
from functools import partial
from itertools import imap, ifilter, islice, takewhile, dropwhile
import operator

from pipetools.debug import set_name, repr_args, get_name
from pipetools.decorators import data_structure_builder, regex_condition
from pipetools.decorators import pipe_util, auto_string_formatter
from pipetools.main import pipe, X, _iterable


# Placeholders for the first and second item of the input.
KEY, VALUE = X[0], X[1]


@pipe_util
@auto_string_formatter
@data_structure_builder
def foreach(function):
    """
    Returns a function that takes an iterable and returns an iterator over the
    results of calling `function` on each item of the iterable.

    >>> xrange(5) > foreach(factorial) | list
    [1, 1, 2, 6, 24]

    >>> "1234567assfsd" > foreach("{0}") | list
    [u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'a', u's', u's', u'f', u's', u'd']

    >>> "123455dHHJBHJgg" > foreach(X.lower()) | list
    ['1', '2', '3', '4', '5', '5', 'd', 'h', 'h', 'j', 'b', 'h', 'j', 'g', 'g']

    >>> "12BHJgg" > foreach(("{0}","{0}==")) | list
    [(u'1', u'1=='), (u'2', u'2=='), (u'B', u'B=='), (u'H', u'H=='), (u'J', u'J=='), (u'g', u'g=='), (u'g', u'g==')]

    >>> "12BHJgg" > foreach([X,X.upper()]) | list
    [['1', '1'], ['2', '2'], ['B', 'B'], ['H', 'H'], ['J', 'J'], ['g', 'G'], ['g', 'G']]

    >>> "12BHJgg" > foreach({X:X.upper()}) | list
    [{'1': '1'}, {'2': '2'}, {'B': 'B'}, {'H': 'H'}, {'J': 'J'}, {'g': 'G'}, {'g': 'G'}]
    """
    return partial(imap, function)


@pipe_util
def foreach_do(function):
    """
    `foreach` only returns an iterator and executes nothing by itself.
    `foreach_do` runs immediately and returns a list; it is a good fit
    when you just want to see the results right away and do not care
    about laziness or performance.

    >>> xrange(5) > foreach_do(factorial) | foreach_do(X+3)
    [4, 4, 5, 9, 27]

    >>> "12332dffrrr" > foreach_do(X.upper())
    ['1', '2', '3', '3', '2', 'D', 'F', 'F', 'R', 'R', 'R']
    """
    return lambda iterable: [function(item) for item in iterable]


@pipe_util
@regex_condition
def where(condition):
    """
    Pipe-able lazy filter.

    >>> odd_range = xrange | where(X % 2) | list
    >>> odd_range(10)
    [1, 3, 5, 7, 9]

    """
    return partial(ifilter, condition)


@pipe_util
@regex_condition
def where_not(condition):
    """
    Inverted :func:`where`.
    """
    return partial(ifilter, pipe | condition | operator.not_)


@pipe_util
@data_structure_builder
def sort_by(function):
    """
    Sorts an incoming sequence by using the given `function` as key.

    >>> xrange(10) > sort_by(-X)
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    Supports automatic data-structure creation::

        users > sort_by([X.last_name, X.first_name])

    There is also a shortcut for ``sort_by(X)`` called ``sort``:

    >>> [4, 5, 8, -3, 0] > sort
    [-3, 0, 4, 5, 8]

    And (as of ``0.2.3``) a shortcut for reversing the sort:

    >>> 'asdfaSfa' > sort_by(X.lower()).descending
    ['s', 'S', 'f', 'f', 'd', 'a', 'a', 'a']
    """
    f = partial(sorted, key=function)
    # pipe_util copies the entries of `attrs` onto the resulting Pipe,
    # which is how `.descending` becomes available.
    f.attrs = {'descending': _descending_sort_by(function)}
    return f


@pipe_util
def _descending_sort_by(function):
    # Reverse-order counterpart used for `sort_by(...).descending`.
    return partial(sorted, key=function, reverse=True)


sort = sort_by(X)


@pipe_util
@auto_string_formatter
@data_structure_builder
def debug_print(function):
    """
    Prints function applied on input and returns the input.

    ::

        foo = (pipe
            | something
            | debug_print(X.get_status())
            | something_else
            | foreach(debug_print("attr is: {0.attr}"))
            | etc)
    """
    def debug(thing):
        print function(thing)
        return thing
    return debug


@pipe_util
def as_args(function):
    """
    Applies the sequence in the input as positional arguments to `function`.

    ::

        some_lists > as_args(izip)
    """
    return lambda x: function(*x)


@pipe_util
def as_kwargs(function):
    """
    Applies the dictionary in the input as keyword arguments to `function`.
    """
    return lambda x: function(**x)


def take_first(count):
    """
    Assumes an iterable on the input, returns an iterable with first `count`
    items from the input (or possibly less, if there isn't that many).

    >>> xrange(9000) > where(X % 100 == 0) | take_first(5) | tuple
    (0, 100, 200, 300, 400)

    """
    def _take_first(iterable):
        return islice(iterable, count)
    return pipe | set_name('take_first(%s)' % count, _take_first)


def drop_first(count):
    """
    Assumes an iterable on the input, returns an iterable with identical items
    except for the first `count`.

    >>> xrange(10) > drop_first(5) | tuple
    (5, 6, 7, 8, 9)
    """
    def _drop_first(iterable):
        # `g` yields `count` values starting at 1, so each one is truthy.
        # The predicate ignores the item and advances `g`; once `g` is
        # exhausted, `unless` swallows StopIteration and yields None,
        # which makes dropwhile stop dropping.
        g = (x for x in xrange(1, count + 1))
        return dropwhile(
            lambda i: unless(StopIteration, g.next)(), iterable)
    return pipe | set_name('drop_first(%s)' % count, _drop_first)


def unless(exception_class_or_tuple, func, *args, **kwargs):
    """
    When `exception_class_or_tuple` occurs while executing `func`, it will
    be caught and ``None`` will be returned.

    >>> f = where(X > 10) | list | unless(IndexError, X[0])
    >>> f([5, 8, 12, 4])
    12
    >>> f([1, 2, 3])
    None
    """
    @pipe_util
    @auto_string_formatter
    @data_structure_builder
    def construct_unless(function):
        # a wrapper so we can re-use the decorators
        def _unless(*args, **kwargs):
            try:
                return function(*args, **kwargs)
            except exception_class_or_tuple:
                # Falling through returns None implicitly.
                pass
        return _unless

    name = lambda: 'unless(%s, %s)' % (exception_class_or_tuple, ', '.join(
        filter(None, (get_name(func), repr_args(*args, **kwargs)))))

    return set_name(name, construct_unless(func, *args, **kwargs))


@pipe_util
@regex_condition
def select_first(condition):
    """
    Returns first item from input sequence that satisfies `condition`. Or
    ``None`` if none does.

    >>> ['py', 'pie', 'pi'] > select_first(X.startswith('pi'))
    'pie'

    As of ``0.2.1`` you can also
    :ref:`directly use regular expressions <auto-regex>` and write the above
    as:

    >>> ['py', 'pie', 'pi'] > select_first('^pi')
    'pie'

    There is also a shortcut for ``select_first(X)`` called ``first_of``:

    >>> first_of(['', None, 0, 3, 'something'])
    3
    >>> first_of([])
    None
    """
    return where(condition) | unless(StopIteration, X.next())


first_of = select_first(X)


@pipe_util
@auto_string_formatter
@data_structure_builder
def group_by(function):
    """
    Groups input sequence by `function`.

    Returns an iterator over a sequence of tuples where the first item is a
    result of `function` and the second one a list of items matching this
    result.

    Ordering of the resulting iterator is undefined, but ordering of the items
    in the groups is preserved.

    >>> [1, 2, 3, 4, 5, 6] > group_by(X % 2) | list
    [(0, [2, 4, 6]), (1, [1, 3, 5])]
    """
    def _group_by(seq):
        result = {}
        for item in seq:
            result.setdefault(function(item), []).append(item)
        return result.iteritems()

    return _group_by


def _flatten(x):
    # Recursive generator: yields non-iterables as they are and descends
    # into anything `_iterable` accepts.
    if not _iterable(x):
        yield x
    else:
        for y in x:
            for z in _flatten(y):
                yield z


def flatten(*args):
    """
    Flattens an arbitrarily deep nested iterable(s).
    """
    return _flatten(args)
# Rebind the name to a Pipe wrapping the function defined just above.
flatten = pipe | flatten


def count(iterable):
    """
    Returns the number of items in `iterable`.
    """
    return sum(1 for whatever in iterable)
# Rebind the name to a Pipe wrapping the function defined just above.
count = pipe | count


@pipe_util
@regex_condition
def take_until(condition):
    """
    >>> [1, 4, 6, 4, 1] > take_until(X > 5) | list
    [1, 4]
    """
    return partial(takewhile, pipe | condition | operator.not_)
# encoding=utf-8
from collections import Iterable
from functools import partial, wraps

from pipetools.debug import get_name, set_name, repr_args


class Pipe(object):
    """
    Pipe-style combinator.

    Example::

        p = pipe | F | G | H

        p(x) == H(G(F(x)))

    """
    def __init__(self, func=None):
        # `func` is the composed callable so far; None for an empty pipe.
        self.func = func
        self.__name__ = 'Pipe'

    def __str__(self):
        return get_name(self.func)

    __repr__ = __str__

    @staticmethod
    def compose(first, second):
        # Name is computed lazily from the two parts.
        name = lambda: '{0} | {1}'.format(get_name(first), get_name(second))

        def composite(*args, **kwargs):
            return second(first(*args, **kwargs))
        return set_name(name, composite)

    @classmethod
    def bind(cls, first, second):
        # Compose only when both sides are set; otherwise keep whichever
        # one is truthy (an empty pipe contributes nothing).
        return cls(
            cls.compose(first, second) if all([first, second]) else
            first or second)

    def __or__(self, next_func):
        return self.bind(self.func, prepare_function_for_pipe(next_func))

    def __ror__(self, prev_func):
        return self.bind(prepare_function_for_pipe(prev_func), self.func)

    def __lt__(self, thing):
        # Supports the `value > pipe` input syntax used in the examples:
        # the value is fed through `func`, or returned unchanged when the
        # pipe is empty.
        return self.func(thing) if self.func else thing

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __get__(self, instance, owner):
        # NOTE(review): this is a truthiness test, so a falsy instance
        # would get the unbound pipe back -- consider `is not None`.
        return partial(self, instance) if instance else self


pipe = Pipe()


class Maybe(Pipe):
    # Pipe variant that stops and yields None as soon as a None shows up.

    @staticmethod
    def compose(first, second):
        name = lambda: '{0} ?| {1}'.format(get_name(first), get_name(second))

        def composite(*args, **kwargs):
            result = first(*args, **kwargs)
            # Skip `second` when the intermediate result is None.
            return None if result is None else second(result)
        return set_name(name, composite)

    def __call__(self, *args, **kwargs):
        # A single positional None (and nothing else) short-circuits.
        if len(args) == 1 and args[0] is None and not kwargs:
            return None
        return self.func(*args, **kwargs)

    def __lt__(self, thing):
        return (
            None if thing is None else
            self.func(thing) if self.func else
            thing)


maybe = Maybe()


def prepare_function_for_pipe(thing):
    # Normalize whatever was piped in into a callable:
    # X placeholder -> ~X, tuple -> xpartial(*tuple),
    # string -> StringFormatter, callable -> itself.
    if isinstance(thing, XObject):
        return ~thing
    if isinstance(thing, tuple):
        return xpartial(*thing)
    if isinstance(thing, basestring):
        return StringFormatter(thing)
    if callable(thing):
        return thing
    raise ValueError('Cannot pipe %s' % thing)


def StringFormatter(template):
    '''
    Turn a template string into a function that applies ``format`` to it.

    `content` (the argument of the returned function) supplies the
    format arguments.

    :param template: the format template
    :return: a one-argument formatting function
    '''
    f = unicode(template).format

    def _format(content):
        # dict -> keyword arguments, other non-string iterable ->
        # positional arguments, anything else -> single argument.
        if isinstance(content, dict):
            return f(**content)
        if _iterable(content):
            return f(*content)
        return f(content)

    # Only the first 20 characters of the template go into the name.
    return set_name(lambda: "format('%s')" % template[:20], _format)


def _iterable(obj):
    "Iterable but not a string"
    return isinstance(obj, Iterable) and not isinstance(obj, basestring)


class XObject(object):
    # Placeholder object: every operation on it records a function via
    # `bind` and returns a new XObject; `~X` yields the recorded callable.

    def __init__(self, func=None):
        self._func = func
        set_name(lambda: get_name(func) if func else 'X', self)

    def __repr__(self):
        return get_name(self)

    def __invert__(self):
        # Without a recorded function, ~X is the identity function.
        return self._func or set_name('X', lambda x: x)

    def bind(self, name, func):
        set_name(name, func)
        # Chain onto what was recorded so far, or start a new pipe.
        return XObject((self._func | func) if self._func else (pipe | func))

    def __call__(self, *args, **kwargs):
        name = lambda: 'X(%s)' % repr_args(*args, **kwargs)
        return self.bind(name, lambda x: x(*args, **kwargs))

    def __eq__(self, other):
        return self.bind(lambda: 'X == {0!r}'.format(other),
                         lambda x: x == other)

    def __getattr__(self, name):
        return self.bind(lambda: 'X.{0}'.format(name),
                         lambda x: getattr(x, name))

    def __getitem__(self, item):
        return self.bind(lambda: 'X[{0!r}]'.format(item), lambda x: x[item])

    def __gt__(self, other):
        return self.bind(lambda: 'X > {0!r}'.format(other),
                         lambda x: x > other)

    def __ge__(self, other):
        return self.bind(lambda: 'X >= {0!r}'.format(other),
                         lambda x: x >= other)

    def __lt__(self, other):
        return self.bind(lambda: 'X < {0!r}'.format(other),
                         lambda x: x < other)

    def __le__(self, other):
        return self.bind(lambda: 'X <= {0!r}'.format(other),
                         lambda x: x <= other)

    def __mod__(self, y):
        return self.bind(lambda: 'X % {0!r}'.format(y), lambda x: x % y)

    def __ne__(self, other):
        return self.bind(lambda: 'X != {0!r}'.format(other),
                         lambda x: x != other)

    def __neg__(self):
        return self.bind(lambda: '-X', lambda x: -x)

    def __mul__(self, other):
        return self.bind(lambda: 'X * {0!r}'.format(other),
                         lambda x: x * other)

    def __div__(self, other):
        return self.bind(lambda: 'X / {0!r}'.format(other),
                         lambda x: x / other)

    def __add__(self, other):
        return self.bind(lambda: 'X + {0!r}'.format(other),
                         lambda x: x + other)

    def __sub__(self, other):
        return self.bind(lambda: 'X - {0!r}'.format(other),
                         lambda x: x - other)

    def __pow__(self, other):
        return self.bind(lambda: 'X ** {0!r}'.format(other),
                         lambda x: x ** other)

    def __ror__(self, func):
        return pipe | func | self

    def __or__(self, func):
        # Let a Pipe on the right-hand side do the composition itself.
        if isinstance(func, Pipe):
            return func.__ror__(self)
        return pipe | self | func

    def _in_(self, y):
        # Explicit method form of the `x in y` membership test.
        return self.bind(lambda: 'X._in_({0!r})'.format(y),
                         lambda x: x in y)


X = XObject()


def xpartial(func, *xargs, **xkwargs):
    """
    Like :func:`functools.partial`, but can take an :class:`XObject`
    placeholder that will be replaced with the first positional argument
    when the partially applied function is called.

    Useful when the function's positional arguments' order doesn't fit your
    situation, e.g.:

    >>> reverse_range = xpartial(range, X, 0, -1)
    >>> reverse_range(5)
    [5, 4, 3, 2, 1]

    It can also be used to transform the positional argument to a keyword
    argument, which can come in handy inside a *pipe*::

        xpartial(objects.get, id=X)

    Also the XObjects are evaluated, which can be used for some sort of
    destructuring of the argument::

        xpartial(somefunc, name=X.name, number=X.contacts['number'])

    Lastly, unlike :func:`functools.partial`, this creates a regular function
    which will bind to classes (like the ``curry`` function from
    ``django.utils.functional``).
    """
    # Decided once, when the partial is created.
    any_x = any(isinstance(a, XObject)
                for a in xargs + tuple(xkwargs.values()))
    # Evaluate placeholders against `value`; pass other arguments through.
    use = lambda x, value: (~x)(value) if isinstance(x, XObject) else x

    @wraps(func)
    def xpartially_applied(*func_args, **func_kwargs):
        if any_x:
            if not func_args:
                raise ValueError('Function "%s" partially applied with an '
                    'X placeholder but called with no positional arguments.'
                    % get_name(func))
            # The first call-time argument feeds every placeholder; the
            # remaining ones are appended after the stored positionals.
            first = func_args[0]
            rest = func_args[1:]
            args = tuple(use(x, first) for x in xargs) + rest
            kwargs = dict((k, use(x, first)) for k, x in xkwargs.iteritems())
            kwargs.update(func_kwargs)
        else:
            # No placeholder: behaves like a plain partial application.
            args = xargs + func_args
            kwargs = dict(xkwargs, **func_kwargs)
        return func(*args, **kwargs)

    name = lambda: '%s(%s)' % (get_name(func), repr_args(*xargs, **xkwargs))
    return set_name(name, xpartially_applied)


if __name__ == '__main__':
    # Small demo: split a string into characters and join them with commas.
    f = 'ggggcdwecfwefhweejhjwq' > pipe | list | ','.join
    print f