Source code for malaya_speech.pipeline

"""
Copyright (c) 2017, Continuum Analytics, Inc. and contributors
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

Neither the name of Continuum Analytics nor the names of any contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.

https://github.com/python-streamz/streamz/blob/master/LICENSE.txt
"""

# This pipeline fundamentally follows streamz, but we added checkpointing as a
# core feature, so we had to extend the baseline code.

import functools
import logging
from collections import deque
from tornado.locks import Condition
from .orderset import OrderedWeakrefSet

_global_sinks = set()
logger = logging.getLogger(__name__)
zipping = zip  # keep a reference to the builtin before the `zip` class below shadows it


def identity(x):
    return x


class Pipeline(object):
    _graphviz_shape = 'ellipse'
    _graphviz_style = 'rounded,filled'
    _graphviz_fillcolor = 'white'
    _graphviz_orientation = 0

    str_list = ['func', 'predicate', 'n', 'interval']

    __name__ = 'pipeline'

    def __init__(self, upstream=None, upstreams=None, name=None):
        self.downstreams = OrderedWeakrefSet()
        if upstreams is not None:
            self.upstreams = list(upstreams)
        else:
            self.upstreams = [upstream]
        for upstream in self.upstreams:
            if upstream:
                upstream.downstreams.add(self)
        self.name = name
        self.checkpoint = {}

    def __str__(self):
        s_list = []
        if self.name:
            s_list.append('{}; {}'.format(self.name, self.__class__.__name__))
        else:
            s_list.append(self.__class__.__name__)

        for m in self.str_list:
            s = ''
            at = getattr(self, m, None)
            if at:
                if not callable(at):
                    s = str(at)
                elif hasattr(at, '__name__'):
                    s = getattr(self, m).__name__
                elif hasattr(at.__class__, '__name__'):
                    s = getattr(self, m).__class__.__name__
                else:
                    s = None
            if s:
                s_list.append('{}={}'.format(m, s))
        if len(s_list) <= 2:
            s_list = [term.split('=')[-1] for term in s_list]

        text = '<'
        text += s_list[0]
        if len(s_list) > 1:
            text += ': '
            text += ', '.join(s_list[1:])
        text += '>'
        return text

    __repr__ = __str__

    @classmethod
    def register_api(cls, modifier=identity):
        def _(func):
            @functools.wraps(func)
            def wrapped(*args, **kwargs):
                return func(*args, **kwargs)

            setattr(cls, func.__name__, modifier(wrapped))
            return func

        return _

    def _climb(self, source, name, x):
        for upstream in list(source.upstreams):
            if not upstream:
                source.checkpoint[name] = x
            else:
                self._climb(upstream, name, x)

    def _emit(self, x):
        name = type(self).__name__
        if hasattr(self, 'func'):
            if 'lambda' in self.func.__name__:
                if not self.name:
                    name = self.func.__name__
                else:
                    name = ''
            else:
                name = self.func.__name__
        if self.name:
            if len(name):
                name = f'{name}_{self.name}'
            else:
                name = self.name
        self._climb(self, name, x)

        result = []
        for downstream in list(self.downstreams):
            r = downstream.update(x, who=self)
            if isinstance(r, list):
                result.extend(r)
            else:
                result.append(r)
        return [element for element in result if element is not None]

    def update(self, x, who=None):
        self._emit(x)

    @property
    def upstream(self):
        if len(self.upstreams) != 1:
            raise ValueError('Pipeline has multiple upstreams')
        else:
            return self.upstreams[0]

    def visualize(self, filename='pipeline.png', **kwargs):
        """
        Render the computation of this object's task graph using graphviz.
        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str, optional
            The name of the file to write to disk.
        kwargs:
            Graph attributes to pass to graphviz like ``rankdir="LR"``.
        """
        from .graph import visualize

        return visualize(self, filename, **kwargs)

    def emit(self, x):
        result = self._emit(x)
        self.checkpoint.pop('Pipeline', None)
        return self.checkpoint

    def __call__(self, x):
        return self.emit(x)
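
# A minimal usage sketch of the checkpoint feature (an illustration, not part
# of the original source; assumes module-level execution). ``emit`` pushes a
# value through the graph and returns the checkpoint dict accumulated on the
# root node, keyed by each node's function name (suffixed with ``name`` when
# given), so every intermediate result stays inspectable:
#
# p = Pipeline()
# p.map(lambda x: x + 1, name='increment').map(str, name='stringify')
# checkpoint = p.emit(1)
# # checkpoint is roughly {'increment': 2, 'str_stringify': '2'}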


@Pipeline.register_api()
class map(Pipeline):
    """
    Apply a function / method to each element flowing through the pipeline.

    Examples
    --------
    >>> source = Pipeline()
    >>> source.map(lambda x: x + 1).map(print)
    >>> source.emit(1)
    2
    """

    def __init__(self, upstream, func, *args, **kwargs):
        self.func = func
        name = kwargs.pop('name', None)
        self.kwargs = kwargs
        self.args = args

        Pipeline.__init__(self, upstream, name=name)
        _global_sinks.add(self)

    def update(self, x, who=None):
        try:
            result = self.func(x, *self.args, **self.kwargs)
        except Exception as e:
            logger.exception(e)
            raise
        else:
            return self._emit(result)


@Pipeline.register_api()
class batching(Pipeline):
    """
    Batch a sequence in the stream into a tuple of chunks, each of size at most ``n``.

    Examples
    --------
    >>> source = Pipeline()
    >>> source.batching(2).map(print)
    >>> source.emit([1, 2, 3, 4, 5])
    ([1, 2], [3, 4], [5])
    """

    _graphviz_shape = 'diamond'

    def __init__(self, upstream, n, **kwargs):
        self.n = n
        Pipeline.__init__(self, upstream, **kwargs)
        _global_sinks.add(self)

    def update(self, x, who=None):
        result = []
        for i in range(0, len(x), self.n):
            index = min(i + self.n, len(x))
            result.append(x[i:index])
        return self._emit(tuple(result))


@Pipeline.register_api()
class partition(Pipeline):
    """
    Partition the stream into tuples of equal size.

    Examples
    --------
    >>> source = Pipeline()
    >>> source.partition(3).map(print)
    >>> for i in range(10):
    ...     source.emit(i)
    (0, 1, 2)
    (3, 4, 5)
    (6, 7, 8)
    """

    _graphviz_shape = 'diamond'

    def __init__(self, upstream, n, **kwargs):
        self.n = n
        self.buffer = []
        Pipeline.__init__(self, upstream, **kwargs)
        _global_sinks.add(self)

    def update(self, x, who=None):
        self.buffer.append(x)
        if len(self.buffer) == self.n:
            result, self.buffer = self.buffer, []
            return self._emit(tuple(result))
        else:
            return []


@Pipeline.register_api()
class sliding_window(Pipeline):
    """
    Produce overlapping tuples of size ``n``.

    Parameters
    ----------
    return_partial : bool
        If True, yield tuples as soon as any events come in, each tuple
        being smaller than or equal to the window size. If False, only
        start yielding tuples once a full window has accrued.

    Examples
    --------
    >>> source = Pipeline()
    >>> source.sliding_window(3, return_partial=False).map(print)
    >>> for i in range(8):
    ...     source.emit(i)
    (0, 1, 2)
    (1, 2, 3)
    (2, 3, 4)
    (3, 4, 5)
    (4, 5, 6)
    (5, 6, 7)
    """

    _graphviz_shape = 'diamond'

    def __init__(self, upstream, n, return_partial=True, **kwargs):
        self.n = n
        self.buffer = deque(maxlen=n)
        self.partial = return_partial
        Pipeline.__init__(self, upstream, **kwargs)
        _global_sinks.add(self)

    def update(self, x, who=None):
        self.buffer.append(x)
        if self.partial or len(self.buffer) == self.n:
            return self._emit(tuple(self.buffer))
        else:
            return []
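
# A short note on ``return_partial=True`` (the default), as a hedged sketch
# rather than part of the original source: windows are emitted immediately
# and grow until they reach size ``n``.
#
# p = Pipeline()
# p.sliding_window(3).map(print)
# for i in range(4):
#     p.emit(i)
# # prints (0,), (0, 1), (0, 1, 2), (1, 2, 3)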


@Pipeline.register_api()
class foreach_map(Pipeline):
    """
    Apply a function to every element in a tuple in the stream.

    Parameters
    ----------
    func: callable
    method: str, optional (default='sync')
        Method used to process each element.

        * ``'sync'`` - loop one-by-one over the elements.
        * ``'async'`` - process all elements asynchronously at the same time.
        * ``'thread'`` - process all elements in parallel using threads.
          Defaults to 1 worker; override ``worker_size=n`` to increase.
        * ``'process'`` - process all elements in parallel using processes.
          Defaults to 1 worker; override ``worker_size=n`` to increase.
    *args :
        The arguments to pass to the function.
    **kwargs:
        Keyword arguments to pass to func.

    Examples
    --------
    >>> source = Pipeline()
    >>> source.foreach_map(lambda x: 2 * x).map(print)
    >>> for i in range(3):
    ...     source.emit((i, i))
    (0, 0)
    (2, 2)
    (4, 4)
    """

    def __init__(self, upstream, func, method='sync', *args, **kwargs):
        method = method.lower()
        if method not in ['sync', 'async', 'thread', 'process']:
            raise ValueError(
                'method only supported [`sync`, `async`, `thread`, `process`]'
            )
        self.func = func
        name = kwargs.pop('name', None)
        worker_size = kwargs.pop('worker_size', 1)
        self.worker_size = worker_size
        self.kwargs = kwargs
        self.args = args
        self.method = method

        if self.method == 'async':
            try:
                from tornado import gen
            except BaseException:
                raise ValueError(
                    'tornado not installed. Please install it by `pip install tornado` and try again.'
                )
        if self.method in ['thread', 'process']:
            try:
                import dask.bag as db
            except BaseException:
                raise ValueError(
                    'dask not installed. Please install it by `pip install dask` and try again.'
                )

        Pipeline.__init__(self, upstream, name=name)
        _global_sinks.add(self)

    def update(self, x, who=None):
        try:
            if self.method == 'async':
                from tornado import gen

                @gen.coroutine
                def function(e, *args, **kwargs):
                    return self.func(e, *args, **kwargs)

                @gen.coroutine
                def loop():
                    r = yield [
                        function(e, *self.args, **self.kwargs) for e in x
                    ]
                    return r

                result = loop().result()

            if self.method in ['thread', 'process']:
                import dask.bag as db

                bags = db.from_sequence(x)
                mapped = bags.map(self.func, *self.args, **self.kwargs)
                if self.method == 'thread':
                    scheduler = 'threads'
                if self.method == 'process':
                    scheduler = 'processes'
                result = mapped.compute(
                    scheduler=scheduler, num_workers=self.worker_size
                )

            if self.method == 'sync':
                result = [self.func(e, *self.args, **self.kwargs) for e in x]
        except Exception as e:
            logger.exception(e)
            raise
        else:
            return self._emit(result)
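
# A hedged sketch of the parallel path (assumptions: ``dask`` is installed and
# ``slow_fn`` is a hypothetical function that benefits from threading, e.g.
# I/O-bound work; neither is part of the original source). Each element of the
# emitted tuple is mapped via ``dask.bag`` across ``worker_size`` threads:
#
# p = Pipeline()
# p.foreach_map(slow_fn, method='thread', worker_size=4).map(print)
# p.emit((1, 2, 3, 4))  # slow_fn runs across 4 threads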


@Pipeline.register_api()
class flatten(Pipeline):
    """
    Flatten streams of lists or iterables into a stream of elements.

    Examples
    --------
    >>> source = Pipeline()
    >>> source.flatten().map(print)
    >>> source.emit([[1, 2, 3], [4, 5], [6, 7, 7]])
    [1, 2, 3, 4, 5, 6, 7, 7]
    """

    def __init__(self, upstream, *args, **kwargs):
        name = kwargs.pop('name', None)
        self.kwargs = kwargs
        self.args = args

        Pipeline.__init__(self, upstream, name=name)
        _global_sinks.add(self)

    def update(self, x, who=None):
        L = []
        for item in x:
            if isinstance(item, (list, tuple)):
                L.extend(item)
            else:
                L.append(item)
        return self._emit(L)


@Pipeline.register_api()
class zip(Pipeline):
    """
    Combine two or more branches into a single branch of tuples.

    Examples
    --------
    >>> source = Pipeline()
    >>> left = source.map(lambda x: x + 1, name='left')
    >>> right = source.map(lambda x: x + 10, name='right')
    >>> left.zip(right).map(sum).map(print)
    >>> source.emit(2)
    15
    """

    _graphviz_orientation = 270
    _graphviz_shape = 'triangle'

    def __init__(self, *upstreams, **kwargs):
        self.maxsize = kwargs.pop('maxsize', 10)
        self.condition = Condition()
        self.literals = [
            (i, val)
            for i, val in enumerate(upstreams)
            if not isinstance(val, Pipeline)
        ]
        self.buffers = {
            upstream: deque()
            for upstream in upstreams
            if isinstance(upstream, Pipeline)
        }
        upstreams2 = [
            upstream for upstream in upstreams if isinstance(upstream, Pipeline)
        ]

        Pipeline.__init__(self, upstreams=upstreams2, **kwargs)
        _global_sinks.add(self)

    def _add_upstream(self, upstream):
        # Override method to handle setup of buffer for new stream
        self.buffers[upstream] = deque()
        super(zip, self)._add_upstream(upstream)

    def _remove_upstream(self, upstream):
        # Override method to handle removal of buffer for stream
        self.buffers.pop(upstream)
        super(zip, self)._remove_upstream(upstream)

    def pack_literals(self, tup):
        """Fill buffers for literals whenever we empty them."""
        inp = list(tup)[::-1]
        out = []
        for i, val in self.literals:
            while len(out) < i:
                out.append(inp.pop())
            out.append(val)

        while inp:
            out.append(inp.pop())

        return out
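
    # For illustration only (hypothetical values, not part of the original
    # source): with ``self.literals == [(1, 'x')]`` and ``tup == ('a', 'b')``,
    # ``pack_literals`` re-inserts the literal at its original argument
    # position and returns ``['a', 'x', 'b']``.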

    def update(self, x, who=None):
        L = self.buffers[who]  # get buffer for stream
        L.append(x)
        if len(L) == 1 and all(self.buffers.values()):
            tup = tuple(self.buffers[up][0] for up in self.upstreams)
            for buf in self.buffers.values():
                buf.popleft()
            self.condition.notify_all()
            if self.literals:
                tup = self.pack_literals(tup)
            return self._emit(tup)
        elif len(L) > self.maxsize:
            return self.condition.wait()


@Pipeline.register_api()
class foreach_zip(Pipeline):
    _graphviz_orientation = 270
    _graphviz_shape = 'triangle'

    def __init__(self, *upstreams, **kwargs):
        self.maxsize = kwargs.pop('maxsize', 10)
        self.condition = Condition()
        self.literals = [
            (i, val)
            for i, val in enumerate(upstreams)
            if not isinstance(val, Pipeline)
        ]
        self.buffers = {
            upstream: deque()
            for upstream in upstreams
            if isinstance(upstream, Pipeline)
        }
        upstreams2 = [
            upstream for upstream in upstreams if isinstance(upstream, Pipeline)
        ]

        Pipeline.__init__(self, upstreams=upstreams2, **kwargs)
        _global_sinks.add(self)

    def _add_upstream(self, upstream):
        # Override method to handle setup of buffer for new stream
        self.buffers[upstream] = deque()
        super(foreach_zip, self)._add_upstream(upstream)

    def _remove_upstream(self, upstream):
        # Override method to handle removal of buffer for stream
        self.buffers.pop(upstream)
        super(foreach_zip, self)._remove_upstream(upstream)

    def pack_literals(self, tup):
        """Fill buffers for literals whenever we empty them."""
        inp = list(tup)[::-1]
        out = []
        for i, val in self.literals:
            while len(out) < i:
                out.append(inp.pop())
            out.append(val)

        while inp:
            out.append(inp.pop())

        return out

    def update(self, x, who=None):
        L = self.buffers[who]  # get buffer for stream
        L.append(x)
        if len(L) == 1 and all(self.buffers.values()):
            tup = tuple(self.buffers[up][0] for up in self.upstreams)
            for buf in self.buffers.values():
                buf.popleft()
            self.condition.notify_all()
            if self.literals:
                tup = self.pack_literals(tup)
            tup = tuple(zipping(*tup))
            return self._emit(tup)
        elif len(L) > self.maxsize:
            return self.condition.wait()
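
# ``foreach_zip`` behaves like ``zip`` but additionally zips the buffered
# branch outputs element-wise (via the builtin ``zipping`` alias above). A
# minimal sketch (an illustration, not part of the original source):
#
# p = Pipeline()
# left = p.foreach_map(lambda x: x + 1)
# right = p.foreach_map(lambda x: x + 10)
# left.foreach_zip(right).map(print)
# p.emit((0, 1))  # prints ((1, 10), (2, 11))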