summaryrefslogtreecommitdiff
path: root/octave_kernel.py
diff options
context:
space:
mode:
authorSteven Silvester2014-08-09 07:35:45 -0500
committerSteven Silvester2014-08-09 07:35:45 -0500
commit1b464668edaf8e5d60c9e8e2e1eb963caab4bf43 (patch)
tree10085c6043dc7f2e67ca68dc7840cede50082576 /octave_kernel.py
parent79ed6181e1b61fc4397511b7beb3f1968d82df7c (diff)
downloadscilab_kernel-1b464668edaf8e5d60c9e8e2e1eb963caab4bf43.tar.gz
scilab_kernel-1b464668edaf8e5d60c9e8e2e1eb963caab4bf43.tar.bz2
scilab_kernel-1b464668edaf8e5d60c9e8e2e1eb963caab4bf43.zip
Port from octave to scilab
Diffstat (limited to 'octave_kernel.py')
-rw-r--r--octave_kernel.py353
1 files changed, 0 insertions, 353 deletions
diff --git a/octave_kernel.py b/octave_kernel.py
deleted file mode 100644
index 86708d6..0000000
--- a/octave_kernel.py
+++ /dev/null
@@ -1,353 +0,0 @@
-from IPython.kernel.zmq.kernelbase import Kernel
-from IPython.utils.path import locate_profile
-from IPython.core.oinspect import Inspector, cast_unicode
-from oct2py import Oct2PyError, octave
-
-import os
-import signal
-from subprocess import check_output
-import re
-import logging
-
-__version__ = '0.3'
-
-version_pat = re.compile(r'version (\d+(\.\d+)+)')
-
-
-class OctaveKernel(Kernel):
- implementation = 'octave_kernel'
- implementation_version = __version__
- language = 'octave'
-
- @property
- def language_version(self):
- m = version_pat.search(self.banner)
- return m.group(1)
-
- _banner = None
-
- @property
- def banner(self):
- if self._banner is None:
- self._banner = check_output(['octave',
- '--version']).decode('utf-8')
- return self._banner
-
- def __init__(self, **kwargs):
- Kernel.__init__(self, **kwargs)
- # Signal handlers are inherited by forked processes,
- # and we can't easily reset it from the subprocess.
- # Since kernelapp ignores SIGINT except in message handlers,
- # we need to temporarily reset the SIGINT handler here
- # so that octave and its children are interruptible.
- sig = signal.signal(signal.SIGINT, signal.SIG_DFL)
- try:
- self.octavewrapper = octave
- octave.restart()
- finally:
- signal.signal(signal.SIGINT, sig)
-
- self.inspector = Inspector()
- self.inspector.set_active_scheme("Linux")
-
- self.log.setLevel(logging.CRITICAL)
-
- try:
- self.hist_file = os.path.join(locate_profile(),
- 'octave_kernel.hist')
- except IOError:
- self.hist_file = None
- self.log.warn('No default profile found, history unavailable')
-
- self.max_hist_cache = 1000
- self.hist_cache = []
- self.docstring_cache = {}
- self.help_cache = {}
-
- def do_execute(self, code, silent, store_history=True,
- user_expressions=None, allow_stdin=False):
- """Execute a line of code in Octave."""
- code = code.strip()
- abort_msg = {'status': 'abort',
- 'execution_count': self.execution_count}
-
- if code and store_history:
- self.hist_cache.append(code)
-
- if not code or code == 'keyboard' or code.startswith('keyboard('):
- return {'status': 'ok', 'execution_count': self.execution_count,
- 'payload': [], 'user_expressions': {}}
-
- elif (code == 'exit' or code.startswith('exit(')
- or code == 'quit' or code.startswith('quit(')):
- # TODO: exit gracefully here
- self.do_shutdown(False)
- return abort_msg
-
- elif code == 'restart':
- self.octavewrapper.restart()
- return abort_msg
-
- elif code.endswith('?') or code.startswith('?'):
- self._get_help(code)
- return abort_msg
-
- interrupted = False
- try:
- output = self.octavewrapper._eval([code])
-
- except KeyboardInterrupt:
- self.octavewrapper._session.proc.send_signal(signal.SIGINT)
- interrupted = True
- output = 'Octave Session Interrupted'
-
- except Oct2PyError as e:
- return self._handle_error(str(e))
-
- except Exception:
- self.octavewrapper.restart()
- output = 'Uncaught Exception, Restarting Octave'
-
- else:
- if output is None:
- output = ''
- elif output == 'Octave Session Interrupted':
- interrupted = True
-
- if not silent:
- stream_content = {'name': 'stdout', 'data': output}
- self.send_response(self.iopub_socket, 'stream', stream_content)
-
- if interrupted:
- return abort_msg
-
- return {'status': 'ok', 'execution_count': self.execution_count,
- 'payload': [], 'user_expressions': {}}
-
- def do_complete(self, code, cursor_pos):
- """Get code completions using Octave's 'completion_matches'"""
- code = code[:cursor_pos]
- default = {'matches': [], 'cursor_start': 0,
- 'cursor_end': cursor_pos, 'metadata': dict(),
- 'status': 'ok'}
-
- if code[-1] == ' ':
- return default
-
- tokens = code.replace(';', ' ').split()
- if not tokens:
- return default
- token = tokens[-1]
-
- if os.sep in token:
- dname = os.path.dirname(token)
- rest = os.path.basename(token)
-
- if os.path.exists(dname):
- files = os.listdir(dname)
- matches = [f for f in files if f.startswith(rest)]
- start = cursor_pos - len(rest)
-
- else:
- return default
-
- else:
- start = cursor_pos - len(token)
- cmd = 'completion_matches("%s")' % token
- output = self.octavewrapper._eval([cmd])
- matches = output.split()
-
- for item in dir(self.octavewrapper):
- if item.startswith(token) and not item in matches:
- matches.append(item)
-
- return {'matches': matches, 'cursor_start': start,
- 'cursor_end': cursor_pos, 'metadata': dict(),
- 'status': 'ok'}
-
- def do_inspect(self, code, cursor_pos, detail_level=0):
- """If the code ends with a (, try to return a calltip docstring"""
- default = {'status': 'aborted', 'data': dict(), 'metadata': dict()}
- if (not code or not len(code) >= cursor_pos or
- not code[cursor_pos - 1] == '('):
- return default
-
- else:
- token = code[:cursor_pos - 1].replace(';', '').split()[-1]
- if token in self.docstring_cache:
- docstring = self.docstring_cache[token]
-
- elif token in self.help_cache:
- docstring = self.help_cache[token]['docstring']
-
- else:
- docstring = self._get_octave_info(token,
- detail_level)['docstring']
- self.docstring_cache[token] = docstring
-
- if docstring:
- data = {'text/plain': docstring}
- return {'status': 'ok', 'data': data, 'metadata': dict()}
-
- return default
-
- def do_history(self, hist_access_type, output, raw, session=None,
- start=None, stop=None, n=None, pattern=None, unique=False):
- """Access history at startup.
- """
- if not self.hist_file:
- return {'history': []}
-
- if not os.path.exists(self.hist_file):
- with open(self.hist_file, 'wb') as fid:
- fid.write('')
-
- with open(self.hist_file, 'rb') as fid:
- history = fid.readlines()
-
- history = history[:self.max_hist_cache]
- self.hist_cache = history
- self.log.debug('**HISTORY:')
- self.log.debug(history)
- history = [(None, None, h) for h in history]
-
- return {'history': history}
-
- def do_shutdown(self, restart):
- """Shut down the app gracefully, saving history.
- """
- self.log.debug("**Shutting down")
-
- if restart:
- self.octavewrapper.restart()
-
- else:
- self.octavewrapper.close()
-
- if self.hist_file:
- with open(self.hist_file, 'wb') as fid:
- fid.write('\n'.join(self.hist_cache[-self.max_hist_cache:]))
-
- return {'status': 'ok', 'restart': restart}
-
- def _get_help(self, code):
- if code.startswith('??') or code.endswith('??'):
- detail_level = 1
- else:
- detail_level = 0
-
- code = code.replace('?', '')
- tokens = code.replace(';', ' ').split()
- if not tokens:
- return
- token = tokens[-1]
-
- if token in self.help_cache:
- info = self.help_cache[token]
-
- else:
- info = self._get_octave_info(token, detail_level)
- self.help_cache[token] = info
- if token in self.docstring_cache:
- del self.docstring_cache[token]
-
- output = self._get_printable_info(info, detail_level)
- stream_content = {'name': 'stdout', 'data': output}
- self.send_response(self.iopub_socket, 'stream', stream_content)
-
- def _handle_error(self, err):
- if 'parse error:' in err:
- err = 'Parse Error'
-
- elif 'Octave returned:' in err:
- err = err[err.index('Octave returned:'):]
- err = err[len('Octave returned:'):].lstrip()
-
- elif 'Syntax Error' in err:
- err = 'Syntax Error'
-
- stream_content = {'name': 'stdout', 'data': err.strip()}
- self.send_response(self.iopub_socket, 'stream', stream_content)
-
- return {'status': 'error', 'execution_count': self.execution_count,
- 'ename': '', 'evalue': err, 'traceback': []}
-
- def _get_printable_info(self, info, detail_level=0):
- inspector = self.inspector
- displayfields = []
-
- def add_fields(fields):
- for title, key in fields:
- field = info[key]
- if field is not None:
- displayfields.append((title, field.rstrip()))
-
- add_fields(inspector.pinfo_fields1)
- add_fields(inspector.pinfo_fields2)
- add_fields(inspector.pinfo_fields3)
-
- # Source or docstring, depending on detail level and whether
- # source found.
- if detail_level > 0 and info['source'] is not None:
- source = cast_unicode(info['source'])
- displayfields.append(("Source", source))
-
- elif info['docstring'] is not None:
- displayfields.append(("Docstring", info["docstring"]))
-
- # Info for objects:
- else:
- add_fields(inspector.pinfo_fields_obj)
-
- # Finally send to printer/pager:
- if displayfields:
- return inspector._format_fields(displayfields)
-
- def _get_octave_info(self, obj, detail_level):
- info = dict(argspec=None, base_class=None, call_def=None,
- call_docstring=None, class_docstring=None,
- definition=None, docstring=None, file=None,
- found=False, init_definition=None,
- init_docstring=None, isalias=0, isclass=None,
- ismagic=0, length=None, name='', namespace=None,
- source=None, string_form=None, type_name='')
-
- oc = self.octavewrapper
-
- if obj in dir(oc):
- obj = getattr(oc, obj)
- return self.inspector.info(obj, detail_level=detail_level)
-
- exist = oc.run('exist "%s"' % obj)
- if exist.endswith('0'):
- return info
-
- try:
- help_str = oc.run('help %s' % obj)
- except Oct2PyError:
- help_str = None
- type_str = oc.type(obj)[0].strip()
- cls_str = oc.run("class %s" % obj)[6:]
-
- type_first_line = type_str.splitlines()[0]
- type_str = '\n'.join(type_str.splitlines()[1:])
- is_var = 'is a variable' in type_first_line
-
- info['found'] = True
- info['docstring'] = help_str or type_first_line
- info['type_name'] = cls_str if is_var else 'built-in function'
- info['source'] = help_str
- info['string_form'] = obj if not is_var else type_str.rstrip()
-
- if type_first_line.rstrip().endswith('.m'):
- info['file'] = type_first_line.split()[-1]
- info['type_name'] = 'function'
- info['source'] = type_str
- if not help_str:
- info['docstring'] = None
-
- return info
-
-if __name__ == '__main__':
- from IPython.kernel.zmq.kernelapp import IPKernelApp
- IPKernelApp.launch_instance(kernel_class=OctaveKernel)