Source code for dataprocessor.pipes.ipynb
# -*- coding: utf-8 -*-
import os.path as op
import webbrowser
from glob import glob
from .. import nodes
from .. import rc
from ..utility import path_expand, check_file
from ..ipynb import resolve_url, resolve_name
from ..exception import DataProcessorError as dpError
[docs]def start(nl, ipynb_path):
"""
Start existing .ipynb file
at standing ipython notebook server
Parameters
----------
ipynb_path : str
path of existing .ipynb file
Raise
-----
DataProcessorError
- No IPython Notebook found
- Existing notebook servers do not start
on the parent directory of .ipynb file.
- Cannot open browser
"""
url = resolve_url(ipynb_path)
try:
webbrowser.open(url)
except webbrowser.Error:
raise dpError("Unknown error while opening notebook")
return nl
[docs]def gather(node_list, pattern="*.ipynb"):
"""
Search ipynb file by its extention
"""
for node in node_list:
path = node["path"]
if not op.isdir(path):
continue
for fn in glob(op.join(path, pattern)):
fn = path_expand(fn)
node = {
"path": fn,
"type": "ipynb",
"name": resolve_name(fn),
"parents": [path, ],
"children": [],
}
nodes.add(node_list, node, strategy="update")
return node_list
[docs]def add(node_list, path, parents=[]):
"""
Add ipynb file manually
"""
p = check_file(path)
if isinstance(parents, str):
parents = [parents]
if not isinstance(parents, list):
raise dpError("Parents must be a list")
parents = map(lambda p: rc.resolve_project_path(p, False), parents)
nodes.add(node_list, {
"path": p,
"type": "ipynb",
"name": resolve_name(p),
"parents": parents,
"children": []
})
return node_list
[docs]def register(pipes_dics):
pipes_dics["start_ipynb"] = {
"func": start,
"args": ["ipynb_path"],
"desc": "start .ipynb in standing notebook",
}
pipes_dics["gather_ipynb"] = {
"func": gather,
"args": [],
"desc": "gather ipynb files",
}
pipes_dics["add_ipynb"] = {
"func": add,
"args": [("path", {"help": "path of .ipynb file"})],
"kwds": [("parents", {
"help": "parents of new ipynb",
"nargs": "+"
})],
"desc": "register .ipynb file"
}