diff options
author | Steven Silvester | 2014-08-09 07:35:45 -0500 |
---|---|---|
committer | Steven Silvester | 2014-08-09 07:35:45 -0500 |
commit | 1b464668edaf8e5d60c9e8e2e1eb963caab4bf43 (patch) | |
tree | 10085c6043dc7f2e67ca68dc7840cede50082576 /octave_kernel.py | |
parent | 79ed6181e1b61fc4397511b7beb3f1968d82df7c (diff) | |
download | scilab_kernel-1b464668edaf8e5d60c9e8e2e1eb963caab4bf43.tar.gz scilab_kernel-1b464668edaf8e5d60c9e8e2e1eb963caab4bf43.tar.bz2 scilab_kernel-1b464668edaf8e5d60c9e8e2e1eb963caab4bf43.zip |
Port from octave to scilab
Diffstat (limited to 'octave_kernel.py')
-rw-r--r-- | octave_kernel.py | 353 |
1 files changed, 0 insertions, 353 deletions
diff --git a/octave_kernel.py b/octave_kernel.py deleted file mode 100644 index 86708d6..0000000 --- a/octave_kernel.py +++ /dev/null @@ -1,353 +0,0 @@ -from IPython.kernel.zmq.kernelbase import Kernel -from IPython.utils.path import locate_profile -from IPython.core.oinspect import Inspector, cast_unicode -from oct2py import Oct2PyError, octave - -import os -import signal -from subprocess import check_output -import re -import logging - -__version__ = '0.3' - -version_pat = re.compile(r'version (\d+(\.\d+)+)') - - -class OctaveKernel(Kernel): - implementation = 'octave_kernel' - implementation_version = __version__ - language = 'octave' - - @property - def language_version(self): - m = version_pat.search(self.banner) - return m.group(1) - - _banner = None - - @property - def banner(self): - if self._banner is None: - self._banner = check_output(['octave', - '--version']).decode('utf-8') - return self._banner - - def __init__(self, **kwargs): - Kernel.__init__(self, **kwargs) - # Signal handlers are inherited by forked processes, - # and we can't easily reset it from the subprocess. - # Since kernelapp ignores SIGINT except in message handlers, - # we need to temporarily reset the SIGINT handler here - # so that octave and its children are interruptible. - sig = signal.signal(signal.SIGINT, signal.SIG_DFL) - try: - self.octavewrapper = octave - octave.restart() - finally: - signal.signal(signal.SIGINT, sig) - - self.inspector = Inspector() - self.inspector.set_active_scheme("Linux") - - self.log.setLevel(logging.CRITICAL) - - try: - self.hist_file = os.path.join(locate_profile(), - 'octave_kernel.hist') - except IOError: - self.hist_file = None - self.log.warn('No default profile found, history unavailable') - - self.max_hist_cache = 1000 - self.hist_cache = [] - self.docstring_cache = {} - self.help_cache = {} - - def do_execute(self, code, silent, store_history=True, - user_expressions=None, allow_stdin=False): - """Execute a line of code in Octave.""" - code = code.strip() - abort_msg = {'status': 'abort', - 'execution_count': self.execution_count} - - if code and store_history: - self.hist_cache.append(code) - - if not code or code == 'keyboard' or code.startswith('keyboard('): - return {'status': 'ok', 'execution_count': self.execution_count, - 'payload': [], 'user_expressions': {}} - - elif (code == 'exit' or code.startswith('exit(') - or code == 'quit' or code.startswith('quit(')): - # TODO: exit gracefully here - self.do_shutdown(False) - return abort_msg - - elif code == 'restart': - self.octavewrapper.restart() - return abort_msg - - elif code.endswith('?') or code.startswith('?'): - self._get_help(code) - return abort_msg - - interrupted = False - try: - output = self.octavewrapper._eval([code]) - - except KeyboardInterrupt: - self.octavewrapper._session.proc.send_signal(signal.SIGINT) - interrupted = True - output = 'Octave Session Interrupted' - - except Oct2PyError as e: - return self._handle_error(str(e)) - - except Exception: - self.octavewrapper.restart() - output = 'Uncaught Exception, Restarting Octave' - - else: - if output is None: - output = '' - elif output == 'Octave Session Interrupted': - interrupted = True - - if not silent: - stream_content = {'name': 'stdout', 'data': output} - self.send_response(self.iopub_socket, 'stream', stream_content) - - if interrupted: - return abort_msg - - return {'status': 'ok', 'execution_count': self.execution_count, - 'payload': [], 'user_expressions': {}} - - def do_complete(self, code, cursor_pos): - """Get code completions using Octave's 'completion_matches'""" - code = code[:cursor_pos] - default = {'matches': [], 'cursor_start': 0, - 'cursor_end': cursor_pos, 'metadata': dict(), - 'status': 'ok'} - - if code[-1] == ' ': - return default - - tokens = code.replace(';', ' ').split() - if not tokens: - return default - token = tokens[-1] - - if os.sep in token: - dname = os.path.dirname(token) - rest = os.path.basename(token) - - if os.path.exists(dname): - files = os.listdir(dname) - matches = [f for f in files if f.startswith(rest)] - start = cursor_pos - len(rest) - - else: - return default - - else: - start = cursor_pos - len(token) - cmd = 'completion_matches("%s")' % token - output = self.octavewrapper._eval([cmd]) - matches = output.split() - - for item in dir(self.octavewrapper): - if item.startswith(token) and not item in matches: - matches.append(item) - - return {'matches': matches, 'cursor_start': start, - 'cursor_end': cursor_pos, 'metadata': dict(), - 'status': 'ok'} - - def do_inspect(self, code, cursor_pos, detail_level=0): - """If the code ends with a (, try to return a calltip docstring""" - default = {'status': 'aborted', 'data': dict(), 'metadata': dict()} - if (not code or not len(code) >= cursor_pos or - not code[cursor_pos - 1] == '('): - return default - - else: - token = code[:cursor_pos - 1].replace(';', '').split()[-1] - if token in self.docstring_cache: - docstring = self.docstring_cache[token] - - elif token in self.help_cache: - docstring = self.help_cache[token]['docstring'] - - else: - docstring = self._get_octave_info(token, - detail_level)['docstring'] - self.docstring_cache[token] = docstring - - if docstring: - data = {'text/plain': docstring} - return {'status': 'ok', 'data': data, 'metadata': dict()} - - return default - - def do_history(self, hist_access_type, output, raw, session=None, - start=None, stop=None, n=None, pattern=None, unique=False): - """Access history at startup. - """ - if not self.hist_file: - return {'history': []} - - if not os.path.exists(self.hist_file): - with open(self.hist_file, 'wb') as fid: - fid.write('') - - with open(self.hist_file, 'rb') as fid: - history = fid.readlines() - - history = history[:self.max_hist_cache] - self.hist_cache = history - self.log.debug('**HISTORY:') - self.log.debug(history) - history = [(None, None, h) for h in history] - - return {'history': history} - - def do_shutdown(self, restart): - """Shut down the app gracefully, saving history. - """ - self.log.debug("**Shutting down") - - if restart: - self.octavewrapper.restart() - - else: - self.octavewrapper.close() - - if self.hist_file: - with open(self.hist_file, 'wb') as fid: - fid.write('\n'.join(self.hist_cache[-self.max_hist_cache:])) - - return {'status': 'ok', 'restart': restart} - - def _get_help(self, code): - if code.startswith('??') or code.endswith('??'): - detail_level = 1 - else: - detail_level = 0 - - code = code.replace('?', '') - tokens = code.replace(';', ' ').split() - if not tokens: - return - token = tokens[-1] - - if token in self.help_cache: - info = self.help_cache[token] - - else: - info = self._get_octave_info(token, detail_level) - self.help_cache[token] = info - if token in self.docstring_cache: - del self.docstring_cache[token] - - output = self._get_printable_info(info, detail_level) - stream_content = {'name': 'stdout', 'data': output} - self.send_response(self.iopub_socket, 'stream', stream_content) - - def _handle_error(self, err): - if 'parse error:' in err: - err = 'Parse Error' - - elif 'Octave returned:' in err: - err = err[err.index('Octave returned:'):] - err = err[len('Octave returned:'):].lstrip() - - elif 'Syntax Error' in err: - err = 'Syntax Error' - - stream_content = {'name': 'stdout', 'data': err.strip()} - self.send_response(self.iopub_socket, 'stream', stream_content) - - return {'status': 'error', 'execution_count': self.execution_count, - 'ename': '', 'evalue': err, 'traceback': []} - - def _get_printable_info(self, info, detail_level=0): - inspector = self.inspector - displayfields = [] - - def add_fields(fields): - for title, key in fields: - field = info[key] - if field is not None: - displayfields.append((title, field.rstrip())) - - add_fields(inspector.pinfo_fields1) - add_fields(inspector.pinfo_fields2) - add_fields(inspector.pinfo_fields3) - - # Source or docstring, depending on detail level and whether - # source found. - if detail_level > 0 and info['source'] is not None: - source = cast_unicode(info['source']) - displayfields.append(("Source", source)) - - elif info['docstring'] is not None: - displayfields.append(("Docstring", info["docstring"])) - - # Info for objects: - else: - add_fields(inspector.pinfo_fields_obj) - - # Finally send to printer/pager: - if displayfields: - return inspector._format_fields(displayfields) - - def _get_octave_info(self, obj, detail_level): - info = dict(argspec=None, base_class=None, call_def=None, - call_docstring=None, class_docstring=None, - definition=None, docstring=None, file=None, - found=False, init_definition=None, - init_docstring=None, isalias=0, isclass=None, - ismagic=0, length=None, name='', namespace=None, - source=None, string_form=None, type_name='') - - oc = self.octavewrapper - - if obj in dir(oc): - obj = getattr(oc, obj) - return self.inspector.info(obj, detail_level=detail_level) - - exist = oc.run('exist "%s"' % obj) - if exist.endswith('0'): - return info - - try: - help_str = oc.run('help %s' % obj) - except Oct2PyError: - help_str = None - type_str = oc.type(obj)[0].strip() - cls_str = oc.run("class %s" % obj)[6:] - - type_first_line = type_str.splitlines()[0] - type_str = '\n'.join(type_str.splitlines()[1:]) - is_var = 'is a variable' in type_first_line - - info['found'] = True - info['docstring'] = help_str or type_first_line - info['type_name'] = cls_str if is_var else 'built-in function' - info['source'] = help_str - info['string_form'] = obj if not is_var else type_str.rstrip() - - if type_first_line.rstrip().endswith('.m'): - info['file'] = type_first_line.split()[-1] - info['type_name'] = 'function' - info['source'] = type_str - if not help_str: - info['docstring'] = None - - return info - -if __name__ == '__main__': - from IPython.kernel.zmq.kernelapp import IPKernelApp - IPKernelApp.launch_instance(kernel_class=OctaveKernel) |