f = pipe | list | foreach("{0}=={0}")
print f("1112342snfjfbjeryehbsa")
print (pipe | list | foreach("{0}=={0}"))("1112342snfjfbjeryehbsa")
print "1112342snfjfbjeryehbsa" > pipe | list | foreach("{0}=={0}")
func = lambda x,y: x + y
print func(3,4)
def func(x, y):
return x + y
def test(f, a, b):
print 'test'
print f(a, b)
test(func, 3, 5)
test((lambda x,y: x**2 + y), 6, 9)
def line_conf():
def line(x):
return 2*x+1
return line # return a function object
my_line = line_conf()
def hello(x,y,z,w):
return 12
print partial(hello)(1,2,3,4) == 12
print partial(hello,1)(2,3,4) == 12
print partial(hello,1,2)(3,4) == 12
print partial(hello1,2,3,4)() == 12
def partial(func, *args, **keywords):
def newfunc(*fargs, **fkeywords):
newkeywords = keywords.copy()
return func(*(args + fargs), **newkeywords)
newfunc.func = func
newfunc.args = args
newfunc.keywords = keywords
return newfunc
# encoding=utf-8
from itertools import imap, chain
def set_name(name, f):
:param name:
:param f:
>>> set_name("hello",lambda x:x).__pipetools__name__
f.__pipetools__name__ = name
except (AttributeError, UnicodeEncodeError):
return f
def get_name(f):
:param f:
from pipetools.main import Pipe
pipetools_name = getattr(f, '__pipetools__name__', None)
if pipetools_name: #如果__pipetools__name__属性存在,那么f是函数对象或者函数
return pipetools_name() if callable(pipetools_name) else pipetools_name
if isinstance(f, Pipe):
return repr(f)
return f.__name__ if hasattr(f, '__name__') else repr(f)
def repr_args(*args, **kwargs):
:param args:
:param kwargs:
>>> repr_args(2, 3, 21, 4, 5, name=2234, ii='ffff')
"2, 3, 21, 4, 5, ii='ffff', name=2234"
return ', '.join(chain( #chain连接多个迭代器为一个
imap('{0!r}'.format, args),
imap('{0[0]}={0[1]!r}'.format, kwargs.iteritems())))
# encoding=utf-8
from functools import partial
from pipetools.main import XObject, StringFormatter
class NoBuilder(ValueError):
def DSBuilder(definition):
:param definition:
builder = select_builder(definition)
if builder:
return builder(definition)
raise NoBuilder("Don't know how to build %s" % type(definition))
def SequenceBuilder(cls, definition):
:param cls:
:param definition:
return lambda x: cls(ds_item(d, x) for d in definition)
def DictBuilder(definition):
return lambda x: dict(
(ds_item(key_def, x), ds_item(val_def, x))
for key_def, val_def in definition.iteritems())
根据前面partial的介绍简单理解一下下面字典的作用,SequenceBuilder是一个有两个参数的函数,partial(SequenceBuilder, tuple)(data)得到计算结果,相当于tuple(data),也就是说partial(SequenceBuilder, tuple)是对tuple的封装,作用和tuple是一样的,封装的目的是处理data中复杂的数据结构
builders = {
tuple: partial(SequenceBuilder, tuple),
list: partial(SequenceBuilder, list),
dict: DictBuilder,
def select_builder(definition):
:param definition:
for cls, builder in builders.iteritems():
if isinstance(definition, cls):
return builder
def ds_item(definition, data):
:param definition:
:param data:
if isinstance(definition, XObject):
return (~definition)(data)
if isinstance(definition, basestring):
return StringFormatter(definition)(data)
if callable(definition):
return definition(data)
return DSBuilder(definition)(data)
except NoBuilder:
# static item
return definition
# encoding=utf-8
import re
from functools import partial, wraps
from pipetools.debug import repr_args, set_name, get_name
from pipetools.ds_builder import DSBuilder, NoBuilder
from pipetools.main import pipe, XObject, StringFormatter, xpartial
def pipe_util(func):
def pipe_util_wrapper(function, *args, **kwargs):
if isinstance(function, XObject):
function = ~function
original_function = function
if args or kwargs:
function = xpartial(function, *args, **kwargs)
name = lambda: '%s(%s)' % (get_name(func), ', '.join(
filter(None, (get_name(original_function), repr_args(*args, **kwargs)))))
f = func(function) #被装饰者的返回值
result = pipe | set_name(name, f) #把被装饰者的返回值封装成一个Pipe对象
# if the util defines an 'attrs' mapping, copy it as attributes
# to the result
attrs = getattr(f, 'attrs', {})
for k, v in attrs.iteritems():
setattr(result, k, v)
# result是一个拥有f函数的Pipe对象
return result
return pipe_util_wrapper
def auto_string_formatter(func):
def auto_string_formatter_wrapper(function, *args, **kwargs):
if isinstance(function, basestring):
function = StringFormatter(function) #StringFormatter的返回值类似于"{0}".format
return func(function, *args, **kwargs)
return auto_string_formatter_wrapper
def data_structure_builder(func):
def ds_builder_wrapper(function, *args, **kwargs):
function = DSBuilder(function)
except NoBuilder:
return func(function, *args, **kwargs)
return ds_builder_wrapper
def regex_condition(func):
If a condition is given as string instead of a function, it is turned
into a regex-matching function.
def regex_condition_wrapper(condition, *args, **kwargs):
if isinstance(condition, basestring):
condition = partial(re.match, condition)
return func(condition, *args, **kwargs)
return regex_condition_wrapper
from functools import partial
from itertools import imap, ifilter, islice, takewhile, dropwhile
import operator
from pipetools.debug import set_name, repr_args, get_name
from pipetools.decorators import data_structure_builder, regex_condition
from pipetools.decorators import pipe_util, auto_string_formatter
from pipetools.main import pipe, X, _iterable
KEY, VALUE = X[0], X[1]
def foreach(function):
Returns a function that takes an iterable and returns an iterator over the
results of calling `function` on each item of the iterable.
>>> xrange(5) > foreach(factorial) | list
[1, 1, 2, 6, 24]
>>> "1234567assfsd" > foreach("{0}") | list
[u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'a', u's', u's', u'f', u's', u'd']
>>> "123455dHHJBHJgg" > foreach(X.lower()) | list
['1', '2', '3', '4', '5', '5', 'd', 'h', 'h', 'j', 'b', 'h', 'j', 'g', 'g']
>>> "12BHJgg" > foreach(("{0}","{0}==")) | list
[(u'1', u'1=='), (u'2', u'2=='), (u'B', u'B=='), (u'H', u'H=='), (u'J', u'J=='), (u'g', u'g=='), (u'g', u'g==')]
>>> "12BHJgg" > foreach([X,X.upper()]) | list
[['1', '1'], ['2', '2'], ['B', 'B'], ['H', 'H'], ['J', 'J'], ['g', 'G'], ['g', 'G']]
>>> "12BHJgg" > foreach({X:X.upper()}) | list
[{'1': '1'}, {'2': '2'}, {'B': 'B'}, {'H': 'H'}, {'J': 'J'}, {'g': 'G'}, {'g': 'G'}]
return partial(imap, function)
def foreach_do(function):
>>> xrange(5) > foreach_do(factorial) | foreach_do(X+3)
[4, 4, 5, 9, 27]
>>> "12332dffrrr" > foreach_do(X.upper())
['1', '2', '3', '3', '2', 'D', 'F', 'F', 'R', 'R', 'R']
return lambda iterable: [function(item) for item in iterable]
def where(condition):
Pipe-able lazy filter.
>>> odd_range = xrange | where(X % 2) | list
>>> odd_range(10)
[1, 3, 5, 7, 9]
return partial(ifilter, condition)
def where_not(condition):
Inverted :func:`where`.
return partial(ifilter, pipe | condition | operator.not_)
def sort_by(function):
Sorts an incoming sequence by using the given `function` as key.
>>> xrange(10) > sort_by(-X)
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Supports automatic data-structure creation::
users > sort_by([X.last_name, X.first_name])
There is also a shortcut for ``sort_by(X)`` called ``sort``:
>>> [4, 5, 8, -3, 0] > sort
[-3, 0, 4, 5, 8]
And (as of ``0.2.3``) a shortcut for reversing the sort:
>>> 'asdfaSfa' > sort_by(X.lower()).descending
['s', 'S', 'f', 'f', 'd', 'a', 'a', 'a']
f = partial(sorted, key=function)
f.attrs = {'descending': _descending_sort_by(function)}
return f
def _descending_sort_by(function):
return partial(sorted, key=function, reverse=True)
sort = sort_by(X)
def debug_print(function):
Prints function applied on input and returns the input.
foo = (pipe
| something
| debug_print(X.get_status())
| something_else
| foreach(debug_print("attr is: {0.attr}"))
| etc)
def debug(thing):
print function(thing)
return thing
return debug
def as_args(function):
Applies the sequence in the input as positional arguments to `function`.
some_lists > as_args(izip)
return lambda x: function(*x)
def as_kwargs(function):
Applies the dictionary in the input as keyword arguments to `function`.
return lambda x: function(**x)
def take_first(count):
Assumes an iterable on the input, returns an iterable with first `count`
items from the input (or possibly less, if there isn't that many).
>>> xrange(9000) > where(X % 100 == 0) | take_first(5) | tuple
(0, 100, 200, 300, 400)
def _take_first(iterable):
return islice(iterable, count)
return pipe | set_name('take_first(%s)' % count, _take_first)
def drop_first(count):
Assumes an iterable on the input, returns an iterable with identical items
except for the first `count`.
>>> xrange(10) > drop_first(5) | tuple
(5, 6, 7, 8, 9)
def _drop_first(iterable):
g = (x for x in xrange(1, count + 1))
return dropwhile(lambda i: unless(StopIteration, g.next)(), iterable)
return pipe | set_name('drop_first(%s)' % count, _drop_first)
def unless(exception_class_or_tuple, func, *args, **kwargs):
When `exception_class_or_tuple` occurs while executing `func`, it will
be caught and ``None`` will be returned.
>>> f = where(X > 10) | list | unless(IndexError, X[0])
>>> f([5, 8, 12, 4])
>>> f([1, 2, 3])
def construct_unless(function):
# a wrapper so we can re-use the decorators
def _unless(*args, **kwargs):
return function(*args, **kwargs)
except exception_class_or_tuple:
return _unless
name = lambda: 'unless(%s, %s)' % (exception_class_or_tuple, ', '.join(
filter(None, (get_name(func), repr_args(*args, **kwargs)))))
return set_name(name, construct_unless(func, *args, **kwargs))
def select_first(condition):
Returns first item from input sequence that satisfies `condition`. Or
``None`` if none does.
>>> ['py', 'pie', 'pi'] > select_first(X.startswith('pi'))
As of ``0.2.1`` you can also
:ref:`directly use regular expressions <auto-regex>` and write the above
>>> ['py', 'pie', 'pi'] > select_first('^pi')
There is also a shortcut for ``select_first(X)`` called ``first_of``:
>>> first_of(['', None, 0, 3, 'something'])
>>> first_of([])
return where(condition) | unless(StopIteration, X.next())
first_of = select_first(X)
def group_by(function):
Groups input sequence by `function`.
Returns an iterator over a sequence of tuples where the first item is a
result of `function` and the second one a list of items matching this
Ordering of the resulting iterator is undefined, but ordering of the items
in the groups is preserved.
>>> [1, 2, 3, 4, 5, 6] > group_by(X % 2) | list
[(0, [2, 4, 6]), (1, [1, 3, 5])]
def _group_by(seq):
result = {}
for item in seq:
result.setdefault(function(item), []).append(item)
return result.iteritems()
return _group_by
def _flatten(x):
if not _iterable(x):
yield x
for y in x:
for z in _flatten(y):
yield z
def flatten(*args):
Flattens an arbitrarily deep nested iterable(s).
return _flatten(args)
flatten = pipe | flatten
def count(iterable):
Returns the number of items in `iterable`.
return sum(1 for whatever in iterable)
count = pipe | count
def take_until(condition):
>>> [1, 4, 6, 4, 1] > take_until(X > 5) | list
[1, 4]
return partial(takewhile, pipe | condition | operator.not_)
# encoding=utf-8
from collections import Iterable
from functools import partial, wraps
from pipetools.debug import get_name, set_name, repr_args
class Pipe(object):
Pipe-style combinator.
p = pipe | F | G | H
p(x) == H(G(F(x)))
def __init__(self, func=None):
self.func = func
self.__name__ = 'Pipe'
def __str__(self):
return get_name(self.func)
__repr__ = __str__
def compose(first, second):
name = lambda: '{0} | {1}'.format(get_name(first), get_name(second))
def composite(*args, **kwargs):
return second(first(*args, **kwargs))
return set_name(name, composite)
def bind(cls, first, second):
return cls(cls.compose(first, second) if all([first, second]) else first or second)
def __or__(self, next_func):
return self.bind(self.func, prepare_function_for_pipe(next_func))
def __ror__(self, prev_func):
return self.bind(prepare_function_for_pipe(prev_func), self.func)
def __lt__(self, thing):
return self.func(thing) if self.func else thing
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
def __get__(self, instance, owner):
return partial(self, instance) if instance else self
pipe = Pipe()
class Maybe(Pipe):
def compose(first, second):
name = lambda: '{0} ?| {1}'.format(get_name(first), get_name(second))
def composite(*args, **kwargs):
result = first(*args, **kwargs)
return None if result is None else second(result)
return set_name(name, composite)
def __call__(self, *args, **kwargs):
if len(args) == 1 and args[0] is None and not kwargs:
return None
return self.func(*args, **kwargs)
def __lt__(self, thing):
return (
None if thing is None else
self.func(thing) if self.func else
maybe = Maybe()
def prepare_function_for_pipe(thing):
if isinstance(thing, XObject):
return ~thing
if isinstance(thing, tuple):
return xpartial(*thing)
if isinstance(thing, basestring):
return StringFormatter(thing)
if callable(thing):
return thing
raise ValueError('Cannot pipe %s' % thing)
def StringFormatter(template):
:param template:
f = unicode(template).format
def _format(content):
if isinstance(content, dict):
return f(**content)
if _iterable(content):
return f(*content)
return f(content)
return set_name(lambda: "format('%s')" % template[:20], _format)
def _iterable(obj):
"Iterable but not a string"
return isinstance(obj, Iterable) and not isinstance(obj, basestring)
class XObject(object):
def __init__(self, func=None):
self._func = func
set_name(lambda: get_name(func) if func else 'X', self)
def __repr__(self):
return get_name(self)
def __invert__(self):
return self._func or set_name('X', lambda x: x)
def bind(self, name, func):
set_name(name, func)
return XObject((self._func | func) if self._func else (pipe | func))
def __call__(self, *args, **kwargs):
name = lambda: 'X(%s)' % repr_args(*args, **kwargs)
return self.bind(name, lambda x: x(*args, **kwargs))
def __eq__(self, other):
return self.bind(lambda: 'X == {0!r}'.format(other), lambda x: x == other)
def __getattr__(self, name):
return self.bind(lambda: 'X.{0}'.format(name), lambda x: getattr(x, name))
def __getitem__(self, item):
return self.bind(lambda: 'X[{0!r}]'.format(item), lambda x: x[item])
def __gt__(self, other):
return self.bind(lambda: 'X > {0!r}'.format(other), lambda x: x > other)
def __ge__(self, other):
return self.bind(lambda: 'X >= {0!r}'.format(other), lambda x: x >= other)
def __lt__(self, other):
return self.bind(lambda: 'X < {0!r}'.format(other), lambda x: x < other)
def __le__(self, other):
return self.bind(lambda: 'X <= {0!r}'.format(other), lambda x: x <= other)
def __mod__(self, y):
return self.bind(lambda: 'X % {0!r}'.format(y), lambda x: x % y)
def __ne__(self, other):
return self.bind(lambda: 'X != {0!r}'.format(other), lambda x: x != other)
def __neg__(self):
return self.bind(lambda: '-X', lambda x: -x)
def __mul__(self, other):
return self.bind(lambda: 'X * {0!r}'.format(other), lambda x: x * other)
def __div__(self, other):
return self.bind(lambda: 'X / {0!r}'.format(other), lambda x: x / other)
def __add__(self, other):
return self.bind(lambda: 'X + {0!r}'.format(other), lambda x: x + other)
def __sub__(self, other):
return self.bind(lambda: 'X - {0!r}'.format(other), lambda x: x - other)
def __pow__(self, other):
return self.bind(lambda: 'X ** {0!r}'.format(other), lambda x: x ** other)
def __ror__(self, func):
return pipe | func | self
def __or__(self, func):
if isinstance(func, Pipe):
return func.__ror__(self)
return pipe | self | func
def _in_(self, y):
return self.bind(lambda: 'X._in_({0!r})'.format(y), lambda x: x in y)
X = XObject()
def xpartial(func, *xargs, **xkwargs):
Like :func:`functools.partial`, but can take an :class:`XObject`
placeholder that will be replaced with the first positional argument
when the partially applied function is called.
Useful when the function's positional arguments' order doesn't fit your
situation, e.g.:
>>> reverse_range = xpartial(range, X, 0, -1)
>>> reverse_range(5)
[5, 4, 3, 2, 1]
It can also be used to transform the positional argument to a keyword
argument, which can come in handy inside a *pipe*::
xpartial(objects.get, id=X)
Also the XObjects are evaluated, which can be used for some sort of
destructuring of the argument::
xpartial(somefunc, name=X.name, number=X.contacts['number'])
Lastly, unlike :func:`functools.partial`, this creates a regular function
which will bind to classes (like the ``curry`` function from
any_x = any(isinstance(a, XObject)
for a in xargs + tuple(xkwargs.values()))
use = lambda x, value: (~x)(value) if isinstance(x, XObject) else x
def xpartially_applied(*func_args, **func_kwargs):
if any_x:
if not func_args:
raise ValueError('Function "%s" partially applied with an '
'X placeholder but called with no positional arguments.'
% get_name(func))
first = func_args[0]
rest = func_args[1:]
args = tuple(use(x, first) for x in xargs) + rest
kwargs = dict((k, use(x, first)) for k, x in xkwargs.iteritems())
args = xargs + func_args
kwargs = dict(xkwargs, **func_kwargs)
return func(*args, **kwargs)
name = lambda: '%s(%s)' % (get_name(func), repr_args(*xargs, **xkwargs))
return set_name(name, xpartially_applied)
if __name__ == '__main__':
f = 'ggggcdwecfwefhweejhjwq' > pipe | list | ','.join
print f