Source code for dataprocessor.ipynb
# -*- coding: utf-8 -*-
import os.path as op
import psutil
import json
from .utility import check_file
from .exception import DataProcessorError as dpError
[docs]def gather_notebooks():
""" Gather processes of IPython Notebook
Return
------
notes : list of dict
each dict has following keys: "pid", "cwd", and "port"
Raises
------
DataProcessorError
- No IPython Notebook found
"""
notes = []
for p in psutil.process_iter():
if not p.name().lower() in ["ipython", "python"]:
continue
if "notebook" not in p.cmdline():
continue
for net in p.connections(kind="inet4"):
if net.status != "LISTEN":
continue
_, port = net.laddr
break
notes.append({
"pid": p.pid,
"cwd": p.cwd(),
"port": port,
})
if not notes:
raise dpError("No IPython Notebook found")
return notes
[docs]def resolve_url(ipynb_path, notebooks=None):
"""
Return valid URL for .ipynb
Parameters
----------
ipynb_path : str
path of existing .ipynb file
Raises
------
DataProcessorError
- Existing notebook servers do not start
on the parent directory of .ipynb file.
"""
ipynb_path = check_file(ipynb_path)
if not notebooks:
notebooks = gather_notebooks()
for note in notebooks:
cwd = note["cwd"]
if cwd.endswith("/"):
cwd = cwd[:-1]
if not ipynb_path.startswith(cwd):
continue
note["postfix"] = ipynb_path[len(cwd) + 1:] # remove '/'
return "http://localhost:{port}/notebooks/{postfix}".format(**note)
raise dpError("No valid Notebook found. "
"Please stand notebook server on the parent directory.")
[docs]def resolve_name(ipynb_path):
ipynb_path = check_file(ipynb_path)
try:
with open(ipynb_path, "r") as f:
name = json.load(f)["metdata"]["name"]
except KeyError:
name = ""
if not name:
name = op.basename(ipynb_path)
return name