diff --git a/web/pgadmin/utils/driver/abstract.py b/web/pgadmin/utils/driver/abstract.py index 8a23ccb..aa48f25 100644 --- a/web/pgadmin/utils/driver/abstract.py +++ b/web/pgadmin/utils/driver/abstract.py @@ -147,6 +147,7 @@ class BaseConnection(object): ASYNC_READ_TIMEOUT = 2 ASYNC_WRITE_TIMEOUT = 3 ASYNC_NOT_CONNECTED = 4 + ASYNC_EXECUTION_ABORTED = 5 @abstractmethod def connect(self, **kwargs): diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py index db938bc..8f5933a 100644 --- a/web/pgadmin/utils/driver/psycopg2/__init__.py +++ b/web/pgadmin/utils/driver/psycopg2/__init__.py @@ -35,7 +35,7 @@ from .keywords import ScanKeyword _ = gettext -ASYNC_WAIT_TIMEOUT = 0.1 # in seconds or 100 milliseconds +ASYNC_WAIT_TIMEOUT = 0.01 # in seconds or 10 milliseconds class Connection(BaseConnection): @@ -102,6 +102,11 @@ class Connection(BaseConnection): * messages() - Returns the list of messages/notices sends from the PostgreSQL database server. + + * _formatted_error_msg(err_obj) + - This method is used to parse the psycopg2.Error object and returns the + formatted error message. + """ def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0): assert(manager is not None) @@ -116,6 +121,8 @@ class Connection(BaseConnection): self.__async_cursor = None self.__async_query_id = None self.__backend_pid = None + self.formatted_error = None + self.execution_aborted = False super(Connection, self).__init__() @@ -224,6 +231,7 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id} self.conn = pg_conn self.__backend_pid = pg_conn.get_backend_pid() + self.execution_aborted = False # autocommit flag does not work with asynchronous connections. # By default asynchronous connection runs in autocommit mode. @@ -447,15 +455,17 @@ Attempt to reconnect it failed with the below error: return True, None - def execute_async(self, query, params=None): + def execute_async(self, query, params=None, formatted_error=False): """ This function executes the given query asynchronously and returns result. Args: query: SQL query to run. params: extra parameters to the function + formatted_error: if True then function return the formatted error """ status, cur = self.__cursor() + self.formatted_error = formatted_error if not status: return False, str(cur) @@ -472,10 +482,21 @@ Execute (async) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{qu ) try: + self.execution_aborted = False cur.execute(query, params) res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT) except psycopg2.Error as pe: - errmsg = str(pe) + if self.formatted_error: + # Get the formatted error message + errmsg = self._formatted_error_msg(pe) + else: + if pe.pgerror: + errmsg = pe.pgerror + elif pe.diag.message_detail: + errmsg = pe.diag.message_detail + else: + errmsg = str(pe) + current_app.logger.error(""" Failed to execute query (execute_async) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg} @@ -728,11 +749,15 @@ Failed to reset the connection of the server due to following error: if state == psycopg2.extensions.POLL_OK: return self.ASYNC_OK elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [conn.fileno()], [], time) - return self.ASYNC_WRITE_TIMEOUT + if select.select([], [conn.fileno()], [], time) == ([], [], []): + return self.ASYNC_WRITE_TIMEOUT + # Call recursively if no timeout + self._wait_timeout(conn, time) elif state == psycopg2.extensions.POLL_READ: - select.select([conn.fileno()], [], [], time) - return self.ASYNC_READ_TIMEOUT + if select.select([conn.fileno()], [], [], time) == ([], [], []): + return self.ASYNC_READ_TIMEOUT + + self._wait_timeout(conn, time) else: raise psycopg2.OperationalError("poll() returned %s from _wait_timeout function" % state) @@ -751,19 +776,46 @@ Failed to reset the connection of the server due to following error: current_app.logger.log(25, """ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_id)) - status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT) + try: + status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT) + except psycopg2.Error as pe: + if self.formatted_error: + # Get the formatted error message + errmsg = self._formatted_error_msg(pe) + else: + if pe.pgerror: + errmsg = pe.pgerror + elif pe.diag.message_detail: + errmsg = pe.diag.message_detail + else: + errmsg = str(pe) + return False, errmsg, None + colinfo = None if status == self.ASYNC_OK: + + # if user has cancelled the transaction then changed the status + if self.execution_aborted: + status = self.ASYNC_CANCEL_TRANSACTION + self.execution_aborted = False + return status, None, colinfo + # Fetch the column information - colinfo = [desc for desc in cur.description] + if cur.description is not None: + colinfo = [desc for desc in cur.description] + result = cur.statusmessage if cur.rowcount > 0: result = [] - # Fetch the data rows. - for row in cur: - result.append(dict(row)) - self.__async_cursor = None - return status, result, colinfo + + try: + for row in cur: + result.append(dict(row)) + except psycopg2.ProgrammingError as e: + result = cur.statusmessage + self.__async_cursor = None + return status, result, colinfo + return status, None, colinfo def cancel_transaction(self, conn_id, did=None): @@ -823,6 +875,9 @@ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_ else: if self.connected(): status, msg = self.execute_void(query) + + if status: + cancel_conn.execution_aborted = True else: status = False msg = gettext("Not connected to the database server.") @@ -835,6 +890,64 @@ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_ """ return self.conn.notices if self.conn else [] + def _formatted_error_msg(self, err_obj): + """ + This method is used to parse the psycopg2.Error object + and returns the formatted error message. + + Args: + err_obj: psycopg2.Error object + + Returns: Formatted error message + """ + + if err_obj.pgerror: + errmsg = err_obj.pgerror + elif err_obj.diag.message_detail: + errmsg = err_obj.diag.message_detail + else: + errmsg = str(err_obj) + + errmsg += '********** Error **********\n\n' + + if err_obj.diag.severity is not None \ + and err_obj.diag.message_primary is not None: + errmsg += err_obj.diag.severity + ": " + err_obj.diag.message_primary + elif err_obj.diag.message_primary is not None: + errmsg += err_obj.diag.message_primary + + if err_obj.diag.sqlstate is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('SQL state: ') + errmsg += err_obj.diag.sqlstate + + if err_obj.diag.message_detail is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Detail: ') + errmsg += err_obj.diag.message_detail + + if err_obj.diag.message_hint is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Hint: ') + errmsg += err_obj.diag.message_hint + + if err_obj.diag.statement_position is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Character: ') + errmsg += err_obj.diag.statement_position + + if err_obj.diag.context is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Context: ') + errmsg += err_obj.diag.context + + return errmsg + class ServerManager(object): """