Source code for dataprocessor.execute

# coding=utf-8
import json

from .exception import DataProcessorError, InvalidJSONError, pipe_execute
from . import pipes


[docs]def check_manip(manip): """ check whether maniplation is valid Parameters ---------- manip : list A list defining manipulation Raises ------ InvalidJSONError It is raised if manipulation is invalid. Examples -------- >>> def do_check_manip(manip): ... try: ... check_manip(manip) ... except InvalidJSONError as e: ... print("[%s] %s" % (e.name, e.msg)) >>> do_check_manip([{"nae": "save_json", # typo: "nae" ... "args": ["out.json"], ... "kwds": {"silent" : "True"}}]) [] pipe name is not defined >>> do_check_manip([{"name": "save_json", ... "arg": ["out.json"], # typo: "arg" ... "kwds": {"silent" : "True"}}]) [save_json] attributes of pipes must be in 'name', 'args', 'kwds' >>> do_check_manip([{"name": "save_json", ... "args": ["out.json"], ... "kwd": {"silent" : "True"}}]) # typo: "kwd" [save_json] attributes of pipes must be in 'name', 'args', 'kwds' >>> do_check_manip([{"name": "sav_json", # typo: "sav_json" ... "args": ["out.json"], ... "kwds": {"silent" : "True"}}]) [sav_json] invalid pipe name >>> do_check_manip([{"name": "save_json", ... "args": [], # argument mismatch ... "kwds": {"silent" : "True"}}]) [save_json] the number of arguments mismatches >>> do_check_manip([{"name": "load_json", ... "args": ["in.json"], ... "kwds": {"silent": "True"}}]) ... # `load_json` does not have "kwds" [load_json] pipe does not have 'kwds' >>> do_check_manip([{"name": "save_json", ... "args": ["out.json"], ... "kwds": {"silnt" : "True"}}]) ... # typo in keywords: "silnt" [save_json] keyword argument 'silnt' does not exist """ for mn in manip: if "name" not in mn: raise InvalidJSONError("", "pipe name is not defined") name = mn["name"] for k in mn: if k not in ["name", "args", "kwds"]: msg = "attributes of pipes must be in 'name', 'args', 'kwds'" raise InvalidJSONError(name, msg) if name not in pipes.pipes_dics: raise InvalidJSONError(name, "invalid pipe name") dic = pipes.pipes_dics[name] if len(mn["args"]) != len(dic["args"]): raise InvalidJSONError(name, "the number of arguments mismatches") if "kwds" in mn: if "kwds" not in dic: raise InvalidJSONError(name, "pipe does not have 'kwds'") kwd_names = [kwd for kwd, _ in dic["kwds"]] for kwd in mn["kwds"]: if kwd not in kwd_names: msg = "keyword argument '%s' does not exist" raise InvalidJSONError(name, msg % kwd) else: mn["kwds"] = {}
[docs]def execute(manip, node_list=[]): """execute pipeline defined in `manip` This function does not check whether `manip` is valid. Please check by `.check_manip` Parameters ---------- manip : list A list defining manipulation node_list : list, optional initial node_list (default=[]) Returns ------- node_list : list """ for mn in manip: name = mn["name"] dic = pipes.pipes_dics[name] args = mn["args"] kwds = mn["kwds"] with pipe_execute(name): node_list = dic["func"](node_list, *args, **kwds) return node_list
[docs]def pipe(name, args, kwds={}, node_list=[]): """ Execute a pipe Parameters ---------- name : str Name of pipe args : list arguments of pipe kwds : dict, optional keyword arguments of pipe (default={}) node_list : list, optional initial node_list (default=[]) """ mp = { "name": name, "args": args, } if kwds: mp["kwds"] = kwds manip = [mp, ] check_manip(manip) return execute(manip, node_list)
[docs]def execute_from_json_str(manip_json_str, node_list=[]): """execute pipeline from JSON string This do `check_manip` and `execute`. Raises ------ InvalidJSONError It is raised if manipulation is invalid. See also `.check_manip` Returns ------- node_list : list """ try: manip = json.loads(manip_json_str) except ValueError: raise DataProcessorError("Cannot decode JSON file.") check_manip(manip) return execute(manip, node_list)