Source code for ww.tools.iterables

# coding: utf-8

"""
    :doc:`g() </iterable_wrapper>` is very convenient, but it's only a
    thin wrapper on top of the tools from this module.

    So if you want to apply some of the goodies from it without having to
    turn your iterables into IterableWrapper objects, you can use the functions
    from this module directly.

    Example:

        >>> from ww.tools.iterables import chunks  # same as g().chunks()
        >>> list(chunks(range(10), 3))
        [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]

    You'll find below the detailed documentation for each function. Remember
    they all take an iterable as input, and most often output a generator.

    Go have a look, there is some great stuff here!
"""

from __future__ import division, absolute_import, print_function

import itertools

from future.utils import raise_from

import ww

from ww.types import Union, Callable, Iterable, Any, T  # noqa
from ww.utils import renamed_argument

from collections import deque

# TODO: implement all https://docs.python.org/3/library/itertools.html
# which means backports and recipes
# TODO: cycle, but accept a max repeat
# TODO: filter() but:
# if the first element is an iterable, lambda x: x in first_element
# if the first element is a non-callable scalar,
# lambda x: x == first_element
# a 3rd param to take an Exception or a list of exceptions to ignore so you
# can filter out stuff raising exceptions
# TODO: map, but a 3rd param to take an Exception or a list of exceptions
# to ignore so you can filter out stuff raising exceptions


def starts_when(iterable, condition):
    # type: (Iterable, Union[Callable, Any]) -> Iterable
    """Skip items until a condition is met, then yield all remaining items.

    Args:
        iterable: the iterable to filter.
        condition: a callable applied to each item. The first item for
                   which it returns a truthy value is yielded, along with
                   every item after it. If it's not a callable, each item
                   is compared to it with `item == condition` instead.

    Example:

        >>> list(starts_when(range(10), lambda x: x > 5))
        [6, 7, 8, 9]
        >>> list(starts_when(range(10), 7))
        [7, 8, 9]
    """
    if callable(condition):
        predicate = condition
    else:
        # Plain values are matched by equality.
        def predicate(item):
            return item == condition

    # dropwhile() discards items as long as its test is true, so the
    # predicate is negated to drop everything *before* the first match.
    return itertools.dropwhile(lambda item: not predicate(item), iterable)
def stops_when(iterable, condition):
    # type: (Iterable, Union[Callable, Any]) -> Iterable
    """Yield items until a condition is met, then stop.

    Args:
        iterable: the iterable to filter.
        condition: a callable applied to each item. Iteration ends at the
                   first item for which it returns a truthy value; that
                   item is not yielded. If it's not a callable, each item
                   is compared to it with `item == condition` instead.

    Example:

        >>> list(stops_when(range(10), lambda x: x > 5))
        [0, 1, 2, 3, 4, 5]
        >>> list(stops_when(range(10), 7))
        [0, 1, 2, 3, 4, 5, 6]
    """
    if callable(condition):
        predicate = condition
    else:
        # Plain values are matched by equality.
        def predicate(item):
            return item == condition

    # takewhile() keeps items as long as its test is true, so the
    # predicate is negated to keep everything *before* the first match.
    return itertools.takewhile(lambda item: not predicate(item), iterable)
def skip_duplicates(iterable, key=None, fingerprints=()):
    # type: (Iterable, Callable, Any) -> Iterable
    """
        Returns a generator that will yield all objects from iterable,
        skipping duplicates.

        Duplicates are identified using the `key` function to calculate a
        unique fingerprint. Fingerprints are stored in, and looked up from,
        a set, so they MUST be hashable.

        By default the fingerprint is the object itself, which ensures the
        function works as-is with an iterable of primitives such as int,
        str or tuple.

        :Example:

            >>> list(skip_duplicates([1, 2, 3, 4, 4, 2, 1, 3 , 4]))
            [1, 2, 3, 4]

        For non hashable objects such as dict, set or list, you need to
        specify a function that returns a hashable fingerprint.

        :Example:

            >>> list(skip_duplicates(([], [], (), [1, 2], (1, 2)),
            ...                      lambda x: tuple(x)))
            [[], [1, 2]]
            >>> list(skip_duplicates(([], [], (), [1, 2], (1, 2)),
            ...                      lambda x: (type(x), tuple(x))))
            [[], (), [1, 2], (1, 2)]

        For more complex types, such as custom classes, the default behavior
        is to remove nothing. You MUST provide a `key` function if you wish
        to filter those.

        :Example:

            >>> class Test(object):
            ...     def __init__(self, foo='bar'):
            ...         self.foo = foo
            ...     def __repr__(self):
            ...         return "Test('%s')" % self.foo
            ...
            >>> list(skip_duplicates([Test(), Test(), Test('other')]))
            [Test('bar'), Test('bar'), Test('other')]
            >>> list(skip_duplicates([Test(), Test(), Test('other')],
            ...                      lambda x: x.foo))
            [Test('bar'), Test('other')]

        Args:
            iterable: the iterable to filter.
            key: optional callable returning the fingerprint of an item.
            fingerprints: fingerprints to consider as already seen. If it
                          is a set-like object (anything with an `add`
                          method) it is used and updated in place, even
                          when empty, so it can be shared between calls.
                          Any other iterable (tuple, list...) is copied
                          into a new set.

        Raises:
            TypeError: if `key` returns a non hashable object, or for any
                       other TypeError raised while iterating.
    """

    # Keep the caller's container whenever it can record fingerprints.
    # Testing its truthiness instead would silently replace an empty set
    # provided by the caller and break sharing it across calls.
    if not hasattr(fingerprints, 'add'):
        fingerprints = set(fingerprints or ())

    fingerprint = None  # needed on type errors unrelated to hashing

    try:
        # duplicate some code to gain perf in the most common case
        if key is None:
            for x in iterable:
                if x not in fingerprints:
                    yield x
                    fingerprints.add(x)
        else:
            for x in iterable:
                fingerprint = key(x)
                if fingerprint not in fingerprints:
                    yield x
                    fingerprints.add(fingerprint)
    except TypeError:
        # Find out whether the error comes from an unhashable fingerprint
        # (in which case we give a helpful message) or from somewhere else
        # (in which case we let the original error go through).
        try:
            hash(fingerprint)
        except TypeError:
            raise TypeError(
                "The 'key' function returned a non hashable object of type "
                "'%s' when receiving '%s'. Make sure this function always "
                "returns a hashable object. Hint: immutable primitives like "
                "int, str or tuple, are hashable while dict, set and list are "
                "not." % (type(fingerprint), x))
        else:
            raise
# TODO: test that on big iterators to check for recursion limit
def chunks(iterable, chunksize, cast=tuple):
    # type: (Iterable, int, Callable) -> Iterable
    """
        Yields items from an iterator in iterable chunks.

        Args:
            iterable: the iterable to split.
            chunksize: the maximum number of items in each chunk. The last
                       chunk may hold fewer items.
            cast: callable receiving an iterator over the items of a chunk
                  and returning the object to yield. It must consume that
                  iterator before the next chunk is requested.

        Example:

            >>> list(chunks(range(10), 3))
            [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
            >>> list(chunks([], 3))
            []
    """
    it = iter(iterable)
    while True:
        # Fetch the head of the chunk explicitly: letting next() raise
        # StopIteration inside a generator is turned into a RuntimeError
        # since Python 3.7 (PEP 479), so exhaustion must end the generator
        # with a plain return.
        try:
            head = next(it)
        except StopIteration:
            return
        yield cast(itertools.chain([head],
                                   itertools.islice(it, chunksize - 1)))
def window(iterable, size=2, cast=tuple):
    # type: (Iterable, int, Callable) -> Iterable
    """
        Yields groups of `size` consecutive items, sliding forward by one
        item at each step.

            >>> list(window([1, 2, 3]))
            [(1, 2), (2, 3)]

        Each window is passed to `cast` (tuple by default) before being
        yielded; any callable accepting an iterable is a valid target.

        If you pass None as a cast value, the underlying deque is yielded
        as-is, which is more performant. However, a single deque is reused
        for the whole iteration, so you get the same reference every time
        and only its content changes. The result might not be what you
        want:

            >>> list(window([1, 2, 3], cast=None))
            [deque([2, 3], maxlen=2), deque([2, 3], maxlen=2)]

        If the iterable holds fewer than `size` items, a single shorter
        window is yielded.
    """
    source = iter(iterable)

    # The bounded deque drops its oldest item on each append, which is
    # what makes the window roll.
    rolling = deque(itertools.islice(source, size), size)

    if not cast:
        yield rolling
        for item in source:
            rolling.append(item)
            yield rolling
        return

    yield cast(rolling)
    for item in source:
        rolling.append(item)
        yield cast(rolling)
def at_index(iterable, index):
    # type: (Iterable[T], int) -> T
    """
        Return the item at the index of this iterable or raises IndexError.

        WARNING: this will consume generators.

        Negative indices are allowed but be aware they will cause n items to
        be held in memory, where n = abs(index)

        Args:
            iterable: the iterable to pick the item from.
            index: position of the item. Negative values count from the
                   end, -1 being the last item.

        Raises:
            IndexError: if the iterable has no item at this position.

        Example:

            >>> at_index(range(10), 3)
            3
            >>> at_index(range(10), -2)
            8
    """
    if index < 0:
        wanted = abs(index)
        # Only the last `wanted` items are kept. The requested item is the
        # oldest of them, provided the iterable was long enough.
        tail = deque(iterable, maxlen=wanted)
        if len(tail) < wanted:
            # Without this check, a too short iterable would silently
            # return its first item instead of failing.
            raise IndexError('Index "%d" out of range' % index)
        return tail[0]

    # A unique sentinel tells "no item at this index" apart from any
    # legitimate value, None included.
    missing = object()
    item = next(itertools.islice(iterable, index, index + 1), missing)
    if item is missing:
        raise IndexError('Index "%d" out of range' % index)
    return item
# TODO: accept a default value if no value is found
def first_true(iterable, func):
    # type: (Iterable[T], Callable) -> T
    """
        Return the first item of the iterable for which func(item) is
        truthy, or raise IndexError if there is none.

        WARNING: this will consume generators.

        Args:
            iterable: the iterable to search.
            func: callable applied to each item until one matches.

        Raises:
            IndexError: if no item matches.
    """
    matching = (item for item in iterable if func(item))
    try:
        return next(matching)
    except StopIteration as exc:
        # TODO: Find a better error message
        raise_from(IndexError('No match for %s' % func), exc)
def iterslice(iterable, start=0, stop=None, step=1):
    # type: (Iterable[T], int, int, int) -> Iterable[T]
    """
        Like itertools.islice, but accept int and callables.

        Each bound is handled according to its type:

        - int `start`, int or None `stop`: plain itertools.islice().
        - int `start`, callable `stop`: islice() from `start` with `step`,
          cut with stops_when() on `stop`.
        - callable `start`, int or None `stop`: islice() up to `stop` with
          `step`, then starts_when() on `start`.
        - callable `start`, callable `stop`: starts_when() on `start`, then
          stops_when() on `stop`. `step` is not applied in this case.

        Any `start` that is not an int is handed to starts_when(), and any
        truthy `stop` that is not an int is handed to stops_when().

        Raises:
            ValueError: if `step` is negative.
    """
    if step < 0:
        raise ValueError("The step can not be negative: '%s' given" % step)

    start_is_index = isinstance(start, int)
    # None (or any falsy non-int) means "no upper bound", not a predicate.
    stop_is_predicate = not isinstance(stop, int) and bool(stop)

    if start_is_index:
        if stop_is_predicate:
            # [int:Callable]
            bounded = itertools.islice(iterable, start, None, step)
            return stops_when(bounded, stop)
        # [int:int]
        return itertools.islice(iterable, start, stop, step)

    if stop_is_predicate:
        # [Callable:Callable]
        return stops_when(starts_when(iterable, start), stop)

    # [Callable:int]
    return starts_when(itertools.islice(iterable, None, stop, step), start)
# TODO: allow to disable auto sorting. Document how to make it behave
# like the original groupby
# TODO: allow cast to be None, which set cast to lambda x: x
# NOTE(review): renamed_argument presumably keeps accepting the former
# `key` keyword as an alias of `keyfunc` -- confirm in ww.utils.
@renamed_argument('key', 'keyfunc')
def groupby(iterable, keyfunc=None, reverse=False, cast=tuple):
    # type: (Iterable, Callable, bool, Callable) -> Iterable
    """
        Sort the iterable, then yield (key, group) pairs of consecutive
        items sharing the same key.

        Unlike itertools.groupby(), the input is sorted first (with the
        same key function), so equal keys always end up in a single group.
        The whole iterable is therefore loaded in memory.

        Args:
            iterable: the iterable to group.
            keyfunc: callable computing the grouping key of an item. With
                     None, items are sorted and grouped on their own value.
            reverse: if True, sort in descending order.
            cast: callable applied to each group before yielding it.

        Example:

            >>> list(groupby([1, 2, 3, 4], lambda x: x % 2))
            [(0, (2, 4)), (1, (1, 3))]
    """
    ordered = sorted(iterable, key=keyfunc, reverse=reverse)
    grouped = itertools.groupby(ordered, keyfunc)
    for group_key, members in grouped:
        yield group_key, cast(members)


# TODO: make the same things than in matrix, where the default value
# can be a callable, a non string iterable, or a value
def firsts(iterable, items=1, default=None):
    # type: (Iterable[T], int, T) -> Iterable[T]
    """
        Lazily return the first x items from this iterable or default.

        Exactly `items` values are yielded: if the iterable is too short
        (or empty), the output is padded at the end with `default`.

        Args:
            iterable: the iterable to take the items from.
            items: number of values to yield. Must be usable as an int.
            default: value used for padding.

        Raises:
            ValueError: if `items` can't be converted to an int or is
                        negative. As this is a generator, the error is
                        raised when the iteration starts.

        Example:

            >>> list(firsts(range(10), 3))
            [0, 1, 2]
            >>> list(firsts([1], 3))
            [1, None, None]
            >>> list(firsts([], 2, default=0))
            [0, 0]
    """
    try:
        items = int(items)
    except (ValueError, TypeError):
        raise ValueError("items should be usable as an int but is currently "
                         "'{}' of type '{}'".format(items, type(items)))

    # TODO: replace this so that it returns lasts()
    if items < 0:
        # NOTE(review): ww.f presumably interpolates {items} from the
        # local scope -- confirm.
        raise ValueError(ww.f("items is {items} but should "
                              "be greater than 0. If you wish to get the last "
                              "items, use the lasts() function."))

    # Count what is actually yielded. Deriving the padding from the last
    # loop index can't tell an empty iterable from a single item one, and
    # used to drop one default value for empty inputs.
    yielded = 0
    for item in itertools.islice(iterable, items):
        yielded += 1
        yield item

    for _ in range(items - yielded):
        yield default
def lasts(iterable, items=1, default=None):
    # type: (Iterable[T], int, T) -> Iterable[T]
    """
        Return the last x items from this iterable or default.

        Exactly `items` values are yielded: if the iterable is too short,
        the output is padded at the beginning with `default`.

        The iterable is entirely consumed when the iteration starts, but
        only the last `items` values are kept in memory.

        Example:

            >>> list(lasts(range(10), 3))
            [7, 8, 9]
            >>> list(lasts([1], 3))
            [None, None, 1]
    """
    # A bounded deque only retains the most recent `items` values.
    tail = deque(iterable, maxlen=items)
    padding = itertools.repeat(default, items - len(tail))
    for value in itertools.chain(padding, tail):
        yield value
# reduce is technically the last value of accumulate
# use ww.utils.EMPTY instead of EMPTY
# Put in the doc that scan=fold=accumulate and reduce=accumulate
# replace https://docs.python.org/3/library/itertools.html#itertools.accumulate
# that works only on Python 3.3 and doesn't have echo_start
# def accumulate(func, iterable, start=ww.utils.EMPTY, *, echo_start=True):
#     """
#     Scan higher-order function.
#     The first 3 positional arguments are alike to the ``functools.reduce``
#     signature. This function accepts an extra optional ``echo_start``
#     parameter that controls whether the first value should be in the output.
#     """
#     it = iter(iterable)
#     if start is ww.utils._EMPTY:
#         start = next(it)
#     if echo_start:
#         yield start
#     for item in it:
#         start = func(start, item)
#         yield start