diff --git a/web/pgadmin/misc/bgprocess/__init__.py b/web/pgadmin/misc/bgprocess/__init__.py index 2897329..900e74b 100644 --- a/web/pgadmin/misc/bgprocess/__init__.py +++ b/web/pgadmin/misc/bgprocess/__init__.py @@ -114,7 +114,7 @@ def acknowledge(pid): Positive status """ try: - BatchProcess.acknowledge(pid, True) + BatchProcess.acknowledge(pid) return success_return() except LookupError as lerr: diff --git a/web/pgadmin/misc/bgprocess/process_executor.py b/web/pgadmin/misc/bgprocess/process_executor.py index d4fcd52..0eb46bb 100644 --- a/web/pgadmin/misc/bgprocess/process_executor.py +++ b/web/pgadmin/misc/bgprocess/process_executor.py @@ -34,55 +34,88 @@ import sys import os import argparse import sqlite3 -from datetime import datetime +from datetime import datetime, timedelta, tzinfo from subprocess import Popen, PIPE from threading import Thread import csv -import pytz import codecs +import signal + +IS_WIN = (os.name == 'nt') +IS_PY2 = sys.version_info < (3,) + # SQLite3 needs all string as UTF-8 # We need to make string for Python2/3 compatible -if sys.version_info < (3,): +if IS_PY2: from cStringIO import StringIO - def u(x): return x else: from io import StringIO - def u(x): if hasattr(x, 'decode'): return x.decode() return x +ZERO = timedelta(0) -def usage(): - """ - This function will display usage message. - Args: - None +# Copied the 'UTC' class from the 'pytz' package to allow to run this script +# without any external dependent library, and can be used with any python +# version. +class UTC(tzinfo): + """UTC - Returns: - Displays help message + Optimized UTC implementation. It unpickles using the single module global + instance defined beneath this class declaration. """ + zone = "UTC" - help_msg = """ -Usage: + _utcoffset = ZERO + _dst = ZERO + _tzname = zone -executer.py [-h|--help] - [-p|--process] Process ID - [-d|--db_file] SQLite3 database file path -""" - print(help_msg) + def fromutc(self, dt): + if dt.tzinfo is None: + return self.localize(dt) + return super(UTC.__class__, self).fromutc(dt) + + def utcoffset(self, dt): + return ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return ZERO + + def localize(self, dt, is_dst=False): + '''Convert naive time to local time''' + if dt.tzinfo is not None: + raise ValueError('Not naive datetime (tzinfo is already set)') + return dt.replace(tzinfo=self) + + def normalize(self, dt, is_dst=False): + '''Correct the timezone information on the given datetime''' + if dt.tzinfo is self: + return dt + if dt.tzinfo is None: + raise ValueError('Naive time - no tzinfo set') + return dt.astimezone(self) + + def __repr__(self): + return "" + + def __str__(self): + return "UTC" def get_current_time(format='%Y-%m-%d %H:%M:%S.%f %z'): return datetime.utcnow().replace( - tzinfo=pytz.utc + tzinfo=UTC() ).strftime(format) @@ -93,7 +126,7 @@ class ProcessLogger(Thread): Methods: -------- - * __init__(stream_type, configs) + * __init__(stream_type) - This method is use to initlize the ProcessLogger class object * logging(msg) @@ -103,25 +136,24 @@ class ProcessLogger(Thread): - Reads the stdout/stderr for messages and sent them to logger """ - def __init__(self, stream_type, configs): + def __init__(self, stream_type): """ This method is use to initialize the ProcessLogger class object Args: stream_type: Type of STD (std) - configs: Process details dict Returns: None """ Thread.__init__(self) - self.configs = configs self.process = None self.stream = None + self.encoding = ((sys.stdout and sys.stdout.encoding) or "utf-8") self.logger = codecs.open( os.path.join( - configs['output_directory'], stream_type - ), 'w', "utf-8" + os.environ['OUTDIR'], stream_type + ), 'w', self.encoding, "ignore" ) def attach_process_stream(self, process, stream): @@ -136,6 +168,7 @@ class ProcessLogger(Thread): None """ self.process = process + self.encoding = ((stream and stream.encoding) or self.encoding) self.stream = stream def log(self, msg): @@ -153,12 +186,19 @@ class ProcessLogger(Thread): if msg: self.logger.write( str('{0},{1}').format( - get_current_time(format='%Y%m%d%H%M%S%f'), u(msg) + ProcessLogger.lognum(), msg.decode(self.encoding, 'replace') ) ) return True return False + log_num = 0 + + @classmethod + def lognum(cls): + ProcessLogger.log_num += 1 + return ProcessLogger.log_num + def run(self): if self.process and self.stream: while True: @@ -176,7 +216,7 @@ class ProcessLogger(Thread): self.logger = None -def read_configs(data): +def update_configs(args): """ This reads SQLite3 database and fetches process details @@ -186,34 +226,30 @@ def read_configs(data): Returns: Process details fetched from database as a dict """ - if data.db_file is not None and data.process_id is not None: - conn = sqlite3.connect(data.db_file) - c = conn.cursor() - t = (data.process_id,) - - c.execute('SELECT command, arguments FROM process WHERE \ - exit_code is NULL \ - AND pid=?', t) - - row = c.fetchone() - conn.close() - - if row and len(row) > 1: - configs = { - 'pid': data.process_id, - 'cmd': row[0], - 'args': row[1], - 'output_directory': data.output_directory, - 'db_file': data.db_file - } - return configs - else: - return None + if 'PROCID' in os.environ and 'PRG' in os.environ: + return + + conn = sqlite3.connect(args.db_file) + c = conn.cursor() + t = (args.process_id,) + + c.execute('SELECT command, arguments FROM process WHERE \ + exit_code is NULL \ + AND pid=?', t) + + row = c.fetchone() + conn.close() + + if row and len(row) > 1: + os.environ['PROCID'] = args.process_id + os.environ['PRG'] = row[0] + os.environ['ARGS'] = row[1] + os.environ['OUTDIR'] = args.output_directory + os.environ['DBFILE'] = args.db_file else: - raise ValueError("Please verify pid and db_file arguments.") - + raise ValueError("Please verify the given details.") -def update_configs(kwargs): +def update_status(kwargs): """ This function will updates process stats @@ -223,8 +259,8 @@ def update_configs(kwargs): Returns: None """ - if 'db_file' in kwargs and 'pid' in kwargs: - conn = sqlite3.connect(kwargs['db_file']) + if os.environ['DBFILE'] and os.environ['PROCID']: + conn = sqlite3.connect(os.environ['DBFILE']) sql = 'UPDATE process SET ' params = list() @@ -237,7 +273,7 @@ def update_configs(kwargs): return sql += 'WHERE pid=?' - params.append(kwargs['pid']) + params.append(os.environ['PROCID']) with conn: c = conn.cursor() @@ -250,119 +286,112 @@ def update_configs(kwargs): raise ValueError("Please verify pid and db_file arguments.") -def execute(configs): +def execute(): """ This function will execute the background process - Args: - configs: Process configuration details - Returns: None """ - if configs is not None: - command = [configs['cmd']] - args_csv = StringIO(configs['args']) - args_reader = csv.reader(args_csv, delimiter=str(',')) - for args in args_reader: - command = command + args - args = { - 'pid': configs['pid'], - 'db_file': configs['db_file'] - } - - try: - reload(sys) - sys.setdefaultencoding('utf8') - except: - pass - - # Create seprate thread for stdout and stderr - process_stdout = ProcessLogger('out', configs) - process_stderr = ProcessLogger('err', configs) - - try: - # update start_time - args.update({ - 'start_time': get_current_time(), - 'stdout': process_stdout.log, - 'stderr': process_stderr.log - }) - - # Update start time - update_configs(args) - - if args['pid'] in os.environ: - os.environ['PGPASSWORD'] = os.environ[args['pid']] - - process = Popen( - command, stdout=PIPE, stderr=PIPE, stdin=PIPE, - shell=(os.name == 'nt'), close_fds=(os.name != 'nt') - ) - try: - del (os.environ['PGPASSWORD']) - except: - pass - - # Attach the stream to the process logger, and start logging. - process_stdout.attach_process_stream(process, process.stdout) - process_stdout.start() - process_stderr.attach_process_stream(process, process.stderr) - process_stderr.start() - - # Join both threads together - process_stdout.join() - process_stderr.join() - - # Child process return code - exitCode = process.wait() - - if exitCode is None: - exitCode = process.poll() - - args.update({'exit_code': exitCode}) - - # Add end_time - args.update({'end_time': get_current_time()}) - - # Fetch last output, and error from process if it has missed. - data = process.communicate() - if data: - if data[0]: - process_stdout.log(data[0]) - if data[1]: - process_stderr.log(data[1]) - - # If executable not found or invalid arguments passed - except OSError as e: - if process_stderr: - process_stderr.log(e.strerror) - else: - print("WARNING: ", e.strerror, file=sys.stderr) - args.update({'end_time': get_current_time()}) - args.update({'exit_code': e.errno}) - - # Unknown errors - except Exception as e: - if process_stderr: - process_stderr.log(str(e)) - else: - print("WARNING: ", str(e), file=sys.stderr) - args.update({'end_time': get_current_time()}) - args.update({'exit_code': -1}) - finally: - # Update the execution end_time, and exit-code. - update_configs(args) - if process_stderr: - process_stderr.release() - process_stderr = None - if process_stdout: - process_stdout.release() - process_stdout = None + command = [ + os.environ['PRG'].encode( + (sys.stdout and sys.stdout.encoding) or 'utf-8' + ) if IS_PY2 else os.environ['PRG'] + ] + args_csv = StringIO(os.environ['ARGS']) + args_reader = csv.reader(args_csv, delimiter=str(',')) + for args in args_reader: + command = command + args + + args = dict() + + # Create seprate thread for stdout and stderr + process_stdout = ProcessLogger('out') + process_stderr = ProcessLogger('err') + process = None + + try: + # update start_time + args.update({ + 'start_time': get_current_time(), + 'stdout': process_stdout.log, + 'stderr': process_stderr.log, + 'PID': os.getpid() + }) + + # Update start time + update_status(args) + + if os.environ['PROCID'] in os.environ: + os.environ['PGPASSWORD'] = os.environ[os.environ['PROCID']] + + kwargs = dict() + kwargs['close_fds'] = False + kwargs['shell'] = True if IS_WIN else False + kwargs['env']=os.environ.copy() + + process = Popen( + command, stdout=PIPE, stderr=PIPE, stdin=None, **kwargs + ) - else: - raise ValueError("Please verify process configs.") + # Attach the stream to the process logger, and start logging. + process_stdout.attach_process_stream(process, process.stdout) + process_stdout.start() + process_stderr.attach_process_stream(process, process.stderr) + process_stderr.start() + + # Join both threads together + process_stdout.join() + process_stderr.join() + + # Child process return code + exitCode = process.wait() + + if exitCode is None: + exitCode = process.poll() + args.update({'exit_code': exitCode}) + + # Add end_time + args.update({'end_time': get_current_time()}) + + # Fetch last output, and error from process if it has missed. + data = process.communicate() + if data: + if data[0]: + process_stdout.log(data[0]) + if data[1]: + process_stderr.log(data[1]) + + # If executable not found or invalid arguments passed + except OSError as e: + if process_stderr: + process_stderr.log(e.strerror) + else: + print("WARNING: ", e.strerror, file=sys.stderr) + args.update({'end_time': get_current_time()}) + args.update({'exit_code': e.errno}) + + # Unknown errors + except Exception as e: + if process_stderr: + process_stderr.log(str(e)) + else: + print("WARNING: ", str(e), file=sys.stderr) + args.update({'end_time': get_current_time()}) + args.update({'exit_code': -1}) + finally: + # Update the execution end_time, and exit-code. + update_status(args) + if process_stderr: + process_stderr.release() + if process_stdout: + process_stdout.release() + + +# Let's ignore all the signal comming to us. +def signal_handler(signal, msg): + pass if __name__ == '__main__': # Read command line arguments @@ -370,19 +399,114 @@ if __name__ == '__main__': description='Process executor for pgAdmin 4' ) parser.add_argument( - '-p', '--process_id', help='Process ID', required=True + '-p', '--process_id', help='Process ID', required=False ) parser.add_argument( - '-d', '--db_file', help='Configuration Database', required=True + '-d', '--db_file', help='Configuration Database', required=False ) parser.add_argument( '-o', '--output_directory', - help='Location where the logs will be created', required=True + help='Location where the logs will be created', required=False ) + if IS_WIN: + parser.add_argument( + '-f', '--foreground', action="store_true", + help='Run the process in foreground', required=False + ) args = parser.parse_args() - # Fetch bakcground process details from SQLite3 database file - configs = read_configs(args) + # Ignore any signals + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + if ( + args.db_file is None or args.process_id is None or + args.output_directory is None + ) and ( + not IS_WIN or not args.foreground + ): + parser.print_usage() + sys.exit(1) + + # Fetch background process details from SQLite3 database file + update_configs(args) + + if IS_WIN: + signal.signal(signal.SIGBREAK, signal_handler) + + # For windows: + # We would run the process_executor in the detached mode again to make + # the child process to run as a daemon. And, it would run without + # depending on the status of the web-server. + if args.foreground: + # This is a child process running as the daemon process. + # Let's do the job assing to it. + execute() + else: + if args.db_file is None or args.process_id is None or \ + args.output_directory is None: + parser.print_usage() + raise ValueError( + "Please verify process-id, output-directory and db_file arguments." + ) + + from subprocess import CREATE_NEW_PROCESS_GROUP + DETACHED_PROCESS = 0x00000008 + + # Forward the standard input, output, and error stream to the + # 'devnull'. + stdin = open(os.devnull, "r") + stdout = open(os.devnull, "a") + stderr = open(os.devnull, "a") + + kwargs = { + 'stdin': stdin.fileno(), + 'stdout': stdout.fileno(), + 'stderr': stderr.fileno(), + 'creationflags': CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS, + 'close_fds': True if not IS_WIN else False, + 'cwd': os.environ['OUTDIR'], + 'env': os.environ.copy() + } + + cmd = [ + sys.executable, sys.argv[0], '-p', args.process_id, + '-o', args.output_directory, '-d', args.db_file, '-f' + ] + + p = Popen(cmd, **kwargs) + + # Question: Should we wait for sometime? + sys.exit(0) + else: + r, w = os.pipe() + + # For POSIX: + # We will fork the process, and run the child process as daemon, and + # let it do the job. + if os.fork() == 0: + # Hmm... So - I need to do the job now... + try: + os.close(r) + + # Let me be the process leader first. + os.setsid() + os.umask(0) + + w = os.fdopen(w, 'w') + # Let me inform my parent - I will do the job, do not worry + # now, and die peacefully. + w.write('1') + w.close() + + execute() + except Exception: + sys.exit(1) + else: + os.close(w) + r = os.fdopen(r) + # I do not care, what the child send. + r.read() + r.close() - # Execute the background process - execute(configs) + sys.exit(0) diff --git a/web/pgadmin/misc/bgprocess/processes.py b/web/pgadmin/misc/bgprocess/processes.py index 32aa967..997669d 100644 --- a/web/pgadmin/misc/bgprocess/processes.py +++ b/web/pgadmin/misc/bgprocess/processes.py @@ -19,11 +19,10 @@ import sys from abc import ABCMeta, abstractproperty, abstractmethod from datetime import datetime from pickle import dumps, loads -from subprocess import Popen, PIPE +from subprocess import Popen import pytz from dateutil import parser -from flask import current_app as app from flask_babel import gettext as _ from flask_security import current_user @@ -169,6 +168,19 @@ class BatchProcess(object): db.session.commit() def start(self): + + def which(program, paths): + def is_exe(fpath): + return os.path.exists(fpath) and os.access(fpath, os.X_OK) + + for path in paths: + if not os.path.isdir(path): + continue + exe_file = os.path.join(path, program) + if is_exe(exe_file): + return exe_file + return None + if self.stime is not None: if self.etime is None: raise Exception(_('The process has already been started.')) @@ -179,11 +191,23 @@ class BatchProcess(object): executor = os.path.join( os.path.dirname(__file__), 'process_executor.py' ) + paths = sys.path[:] + interpreter = None + + if os.name == 'nt': + paths.insert(0, os.path.join(sys.prefix, 'Scripts')) + paths.insert(0, os.path.join(sys.prefix)) + + interpreter = which('pythonw.exe', paths) + if interpreter is None: + interpreter = which('python.exe', paths) + else: + paths.insert(0, os.path.join(sys.prefix, 'bin')) + interpreter = which('python', paths) p = None cmd = [ - (sys.executable if not app.PGADMIN_RUNTIME else - 'pythonw.exe' if os.name == 'nt' else 'python'), + interpreter if interpreter is not None else 'python', executor, '-p', self.id, '-o', self.log_dir, @@ -191,9 +215,19 @@ class BatchProcess(object): ] if os.name == 'nt': + DETACHED_PROCESS = 0x00000008 + from subprocess import CREATE_NEW_PROCESS_GROUP + + stdout = os.devnull + stderr = stdout + stdin = open(os.devnull, "r") + stdout = open(stdout, "a") + stderr = open(stderr, "a") + p = Popen( - cmd, stdout=None, stderr=None, stdin=None, close_fds=True, - shell=False, creationflags=0x00000008 + cmd, close_fds=False, stdout=stdout.fileno(), + stderr=stderr.fileno(), stdin=stdin.fileno(), + creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS) ) else: def preexec_function(): @@ -204,14 +238,13 @@ class BatchProcess(object): signal.signal(signal.SIGINT, signal.SIG_IGN) p = Popen( - cmd, stdout=PIPE, stderr=None, stdin=None, close_fds=True, - shell=False, preexec_fn=preexec_function + cmd, close_fds=True, stdout=None, stderr=None, stdin=None, + preexec_fn=preexec_function ) self.ecode = p.poll() if self.ecode is not None and self.ecode != 0: # TODO:// Find a way to read error from detached failed process - # Couldn't start execution p = Process.query.filter_by( pid=self.id, user_id=current_user.id @@ -319,7 +352,9 @@ class BatchProcess(object): res = [] for p in processes: - if p.start_time is None or p.acknowledge is not None: + if p.start_time is None or ( + p.acknowledge is not None and p.end_time is None + ): continue execution_time = None @@ -353,7 +388,14 @@ class BatchProcess(object): return res @staticmethod - def acknowledge(_pid, _release): + def acknowledge(_pid): + """ + Acknowledge from the user, he/she has alredy watched the status. + + Update the acknowledgement status, if the process is still running. + And, delete the process information from the configuration, and the log + files related to the process, if it has already been completed. + """ p = Process.query.filter_by( user_id=current_user.id, pid=_pid ).first() @@ -363,33 +405,12 @@ class BatchProcess(object): _("Could not find a process with the specified ID.") ) - if _release: - import shutil - shutil.rmtree(p.logdir, True) + if p.end_time is not None: + logdir = p.logdir db.session.delete(p) + import shutil + shutil.rmtree(logdir, True) else: p.acknowledge = get_current_time() db.session.commit() - - @staticmethod - def release(pid=None): - import shutil - processes = None - - if pid is not None: - processes = Process.query.filter_by( - user_id=current_user.id, pid=pid - ) - else: - processes = Process.query.filter_by( - user_id=current_user.id, - acknowledge=None - ) - - if processes: - for p in processes: - shutil.rmtree(p.logdir, True) - - db.session.delete(p) - db.session.commit() diff --git a/web/pgadmin/misc/bgprocess/static/css/bgprocess.css b/web/pgadmin/misc/bgprocess/static/css/bgprocess.css index 2767772..6f90e3a 100644 --- a/web/pgadmin/misc/bgprocess/static/css/bgprocess.css +++ b/web/pgadmin/misc/bgprocess/static/css/bgprocess.css @@ -165,3 +165,28 @@ ol.pg-bg-process-logs { .bg-process-footer .bg-process-exec-time { padding-right: 0; } + +.pg-bg-bgprocess .ajs-commands { + right: -13px; + top: 2px; + opacity: 0.5; +} + +.pg-bg-bgprocess .bg-close { + display: inline-block; + position: absolute; + height: 25px; + width: 25px; + right: -12px; + top: 3px; + padding: 2px; + border: 2px solid #1f5fa6; + border-radius: 4px; + opacity: 0.5; + background-color: white; + color: red; +} + +.pg-bg-bgprocess:hover .bg-close { + opacity: 0.95; +} diff --git a/web/pgadmin/misc/bgprocess/static/js/bgprocess.js b/web/pgadmin/misc/bgprocess/static/js/bgprocess.js index 122a306..ab205c3 100644 --- a/web/pgadmin/misc/bgprocess/static/js/bgprocess.js +++ b/web/pgadmin/misc/bgprocess/static/js/bgprocess.js @@ -197,7 +197,7 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { setTimeout(function() {self.show.apply(self)}, 10); } - if (self.state != 2 || (self.details && !self.completed)) { + if (self.details && !self.acknowledge && !self.completed) { setTimeout( function() { self.status.apply(self); @@ -232,12 +232,11 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { if (self.notify && !self.details) { if (!self.notifier) { - var content = $('
').append( - $('
', { - class: "h5 pg-bg-notify-header" - }).text( - self.desc - ) + var header = $('
', { + class: "h5 pg-bg-notify-header" + }).append($('').text(self.desc)), + content = $('
').append( + header ).append( $('
', {class: 'pg-bg-notify-body h6' }).append( $('
', {class: 'pg-bg-start col-xs-12' }).append( @@ -249,12 +248,17 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { ), for_details = $('
', { class: "col-xs-12 text-center pg-bg-click h6" - }).text(pgMessages.CLICK_FOR_DETAILED_MSG).appendTo(content), + }).append( + $('').text(pgMessages.CLICK_FOR_DETAILED_MSG) + ).appendTo(content), status = $('
', { class: "pg-bg-status col-xs-12 h5 " + ((self.exit_code === 0) ? 'bg-success': (self.exit_code == 1) ? 'bg-failed' : '') - }).appendTo(content); + }).appendTo(content), + close_me = $( + '
' + ).appendTo(header); self.container = content; self.notifier = alertify.notify( @@ -272,6 +276,12 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { this.show_detailed_view.apply(this); }.bind(self)); + close_me.on('click', function(ev) { + this.notifier.dismiss(); + this.notifier = null; + this.acknowledge_server.apply(this); + }.bind(this)); + // Do not close the notifier, when clicked on the container, which // is a default behaviour. content.on('click', function(ev) { @@ -419,14 +429,14 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { function() { setTimeout( function() { - pgBrowser.BackgroundProcessObsorver.update_process_list(); + pgBrowser.BackgroundProcessObsorver.update_process_list(true); }, 1000 ); } ) }, - update_process_list: function() { + update_process_list: function(recheck) { var observer = this; $.ajax({ @@ -437,10 +447,8 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { async: true, contentType: "application/json", success: function(res) { - if (!res) { - // FIXME:: - // Do you think - we should call the list agains after some - // interval? + var cnt = 0; + if (!res || !_.isArray(res)) { return; } for (idx in res) { @@ -451,6 +459,14 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { } } } + if (recheck && res.length == 0) { + // Recheck after some more time + setTimeout( + function() { + observer.update_process_list(false); + }, 3000 + ); + } }, error: function(res) { // FIXME:: What to do now?