# Source code for dataprocessor.pipe

# -*- coding: utf-8 -*-

"""
This module provides wrapper functions
which make it easy to define a SIMD-like pipe,
i.e. a pipe that operates independently on each node.

The following two definitions are equivalent, except for error handling:

>>> @wrap
... def pipe1(node, arg1, kwd1=""):
...     print(node)
...     node["attr1"] = "val"
...     return node

>>> def pipe1(node_list, arg1, kwd1=""):
...     for node in node_list:
...         print(node)
...         node["attr1"] = "val"
...     return node_list

The error handling policy is as follows:

- If ``DataProcessorError`` or one of its subclasses is raised
  in the decorated function while processing a node,
  processing of that node is aborted and the remaining nodes
  are still processed.

- If any other exception is raised,
  the whole process is aborted.

In short, this corresponds to the following conceptual code:

>>> for node in node_list:  # doctest:+SKIP
...     try:
...         decorated(node)
...     except DataProcessorError:
...         continue

"""

import sys
import os.path as op
from functools import wraps

from .nodes import node_types
from .exception import DataProcessorError as dpError


class PipeImplementationError(Exception):
    """
    Raised when the implementation of a pipe is invalid
    (for example, when a pipe function does not return a node).

    Attributes
    ----------
    name : str
        The name of the pipe in which the error occurred.
    msg : str
        A message describing the error.
    """

    def __init__(self, name, msg):
        # Forward the arguments to the base class so that ``self.args`` is
        # populated.  Without this, pickling or copying the exception fails,
        # because ``BaseException.__reduce__`` rebuilds it as
        # ``cls(*self.args)``, i.e. with no arguments.
        # The explicit two-argument ``super`` keeps Python 2 compatibility.
        super(PipeImplementationError, self).__init__(name, msg)
        self.name = name
        self.msg = msg

    def __str__(self):
        return "[%s]: %s" % (self.name, self.msg)
def _wrap(filter_func):
    """
    Build a decorator that turns a per-node function into a pipe.

    Parameters
    ----------
    filter_func : callable
        Predicate ``filter_func(node)``; nodes for which it returns a falsy
        value are skipped and left untouched.

    Returns
    -------
    callable
        A decorator.  Applied to ``func(node, *args, **kwds)`` it returns
        ``wrapper(node_list, *args, **kwds)``, which calls ``func`` on every
        selected node and then returns ``node_list`` itself.

    Notes
    -----
    - ``func`` must return a dict; otherwise ``PipeImplementationError``
      is raised and the remaining nodes are not processed.
    - If ``func`` returns a non-empty dict that differs from the node, the
      node is updated *in place* to match it, so ``node_list`` keeps the
      same dict objects.  An empty dict leaves the node unchanged.
    - If ``func`` raises ``DataProcessorError`` (or a subclass), the error
      is written to stderr and processing continues with the next node.
      Any other exception propagates to the caller.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(node_list, *args, **kwds):
            for node in node_list:
                if not filter_func(node):
                    continue
                try:
                    new_node = func(node, *args, **kwds)
                except dpError as e:
                    # Per-node failure: report it and continue with the next
                    # node.  ``sys.stderr.write`` works on Python 2 and 3;
                    # the former ``print >>sys.stderr, e`` statement raises
                    # TypeError on Python 3, aborting the whole pipe.
                    sys.stderr.write("{}\n".format(e))
                    continue
                if not isinstance(new_node, dict):
                    raise PipeImplementationError(func.__name__,
                                                  "Pipe must return node")
                # Skip when ``func`` returned the node itself (or an equal
                # dict): clearing ``node`` first would also empty
                # ``new_node`` if they are the same object.
                if new_node and new_node != node:
                    node.clear()
                    node.update(new_node)
            return node_list
        return wrapper
    return decorator

wrap = _wrap(lambda _: True)
wrap.__doc__ = """
Create a pipe from a function operates on a node.

This decorator makes easy to define a SIMD-like pipe,
i.e. operates independently on each nodes.

Examples
--------
The following two codes are equal without error handling:

>>> @wrap
... def pipe1(node, arg1, kwd1=""):
...     print(node)
...     node["attr1"] = "val"
...     return node

>>> def pipe1(node_list, arg1, kwd1=""):
...     for node in node_list:
...         print(node)
...         node["attr1"] = "val"
...     return node_list
"""

# NOTE: ``file`` shadows the Python 2 builtin of the same name; the name is
# kept because it is part of this module's public API.
file = _wrap(lambda n: op.isfile(n["path"]))
file.__doc__ = "Create a pipe which operates on file nodes"

directory = _wrap(lambda n: op.isdir(n["path"]))
directory.__doc__ = "Create a pipe which operates on directory nodes"
def type(typename):
    """
    Create a pipe which operates only on nodes of a specific type.

    The returned decorator behaves like ``wrap``, except that nodes whose
    ``"type"`` entry is not ``typename`` are skipped and left unchanged.

    Note that this function shadows the builtin ``type`` inside this module.

    Parameters
    ----------
    typename : str
        The node type to select; must be a member of ``node_types``.

    Returns
    -------
    callable
        A decorator turning a per-node function into a pipe.

    Raises
    ------
    DataProcessorError
        If ``typename`` is not in ``node_types``.

    Examples
    --------
    >>> nl = [{
    ...     "path": "/path/1",
    ...     "type": "project",
    ...     "children": ["/path/2"],
    ...     "parents": [],
    ... }, {
    ...     "path": "/path/2",
    ...     "type": "run",
    ...     "children": [],
    ...     "parents": ["/path/1"],
    ... }]
    >>> @type("run")
    ... def run_pipe(node):
    ...     node["comment"] = "RUN"
    ...     return node
    >>> @type("project")
    ... def project_pipe(node):
    ...     node["comment"] = "PROJECT"
    ...     return node
    >>> nl = run_pipe(nl)
    >>> nl = project_pipe(nl)
    >>> nl == [{
    ...     "path": "/path/1",
    ...     "type": "project",
    ...     "children": ["/path/2"],
    ...     "parents": [],
    ...     "comment": "PROJECT",
    ... }, {
    ...     "path": "/path/2",
    ...     "type": "run",
    ...     "children": [],
    ...     "parents": ["/path/1"],
    ...     "comment": "RUN",
    ... }]
    True
    """
    if typename in node_types:
        def _has_requested_type(node):
            # A node without a "type" key raises KeyError here, which
            # propagates out of the pipe.
            return node["type"] == typename
        return _wrap(_has_requested_type)
    raise dpError("Invalid type name: {}".format(typename))