diff --git a/web/pgadmin/tools/sqleditor/__init__.py b/web/pgadmin/tools/sqleditor/__init__.py index 013e4dc..a9460dd 100644 --- a/web/pgadmin/tools/sqleditor/__init__.py +++ b/web/pgadmin/tools/sqleditor/__init__.py @@ -1480,19 +1480,24 @@ def query_tool_status(trans_id): if conn and trans_obj and session_obj: status = conn.transaction_status() - # Check for the asynchronous notifies statements. - conn.check_notifies(True) - notifies = conn.get_notifies() + if status is not None: + # Check for the asynchronous notifies statements. + conn.check_notifies(True) + notifies = conn.get_notifies() - return make_json_response( - data={ - 'status': status, - 'message': gettext( - CONNECTION_STATUS_MESSAGE_MAPPING.get(status), - ), - 'notifies': notifies - } - ) + return make_json_response( + data={ + 'status': status, + 'message': gettext( + CONNECTION_STATUS_MESSAGE_MAPPING.get(status), + ), + 'notifies': notifies + } + ) + else: + return internal_server_error( + errormsg=gettext("Transaction status check failed.") + ) else: return internal_server_error( errormsg=gettext("Transaction status check failed.") diff --git a/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py new file mode 100644 index 0000000..64155bf --- /dev/null +++ b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py @@ -0,0 +1,132 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2018, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +import os +import re + +from flask import Flask, render_template +from jinja2 import FileSystemLoader + +from pgadmin import VersionedTemplateLoader +from pgadmin.utils.route import BaseTestGenerator +from pgadmin.utils.driver import get_driver +from config import PG_DEFAULT_DRIVER +from pgadmin.browser.server_groups.servers.databases.tests import utils as \ + database_utils + +from regression import parent_node_dict +from regression.python_test_utils import test_utils +import json + + +class TestEncodingCharset(BaseTestGenerator): + """ + This class validates character support in pgAdmin4 for + different PostgresDB encodings + """ + scenarios = [ + ( + 'With Encoding UTF8', + dict( + db_encoding='UTF8', + lc_collate='C', + test_str='A', + set_client_encoding=False + )), + ( + 'With Encoding WIN1252', + dict( + db_encoding='WIN1252', + lc_collate='C', + test_str='A', + set_client_encoding=False + )), + ( + 'With Encoding EUC_CN', + dict( + db_encoding='EUC_CN', + lc_collate='C', + test_str='A', + set_client_encoding=False + )), + ( + 'With Encoding SQL_ASCII', + dict( + db_encoding='SQL_ASCII', + lc_collate='C', + test_str='\\255', + set_client_encoding=True + )), + ] + + def setUp(self): + server_details = parent_node_dict['server'][-1] + server = server_details['server'] + self.main_conn = test_utils.get_db_connection( + server['db'], + server['username'], + server['db_password'], + server['host'], + server['port'], + server['sslmode'] + ) + self.encode_db_name = 'encoding_test' + self.encode_sid = server_details['server_id'] + self.encode_did = test_utils.create_database( + self.server, self.encode_db_name, + (self.db_encoding, self.lc_collate)) + + def runTest(self): + + db_con = database_utils.connect_database(self, + test_utils.SERVER_GROUP, + self.encode_sid, + self.encode_did) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to the database.") + + # Initialize query tool + url = '/datagrid/initialize/query_tool/{0}/{1}/{2}'.format( + test_utils.SERVER_GROUP, self.encode_sid, self.encode_did) + response = self.tester.post(url) + self.assertEquals(response.status_code, 200) + + response_data = json.loads(response.data.decode('utf-8')) + self.trans_id = response_data['data']['gridTransId'] + + if self.set_client_encoding: + # Change Client Encoding + url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) + sql = "set client_encoding = '{0}';".format(self.db_encoding) + response = self.tester.post(url, data=json.dumps({"sql": sql}), + content_type='html/json') + self.assertEquals(response.status_code, 200) + url = '/sqleditor/poll/{0}'.format(self.trans_id) + response = self.tester.get(url) + self.assertEquals(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + + # Check character + url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) + sql = "select E'{0}';".format(self.test_str) + response = self.tester.post(url, data=json.dumps({"sql": sql}), + content_type='html/json') + self.assertEquals(response.status_code, 200) + url = '/sqleditor/poll/{0}'.format(self.trans_id) + response = self.tester.get(url) + self.assertEquals(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertEquals(response_data['data']['rows_fetched_to'], 1) + + database_utils.disconnect_database(self, self.encode_sid, + self.encode_did) + + def tearDown(self): + test_utils.drop_database(self.main_conn, self.encode_db_name) + self.main_conn.close() diff --git a/web/pgadmin/utils/driver/psycopg2/connection.py b/web/pgadmin/utils/driver/psycopg2/connection.py index cfd161a..e8ca886 100644 --- a/web/pgadmin/utils/driver/psycopg2/connection.py +++ b/web/pgadmin/utils/driver/psycopg2/connection.py @@ -51,6 +51,12 @@ else: _ = gettext +# Replace default ascii encoder with unicode-escape +# which translates characters to unicode format. +# Escape special characters to ASCII based on unicode +encodings['SQL_ASCII'] = 'unicode-escape' +encodings['SQLASCII'] = 'unicode-escape' + # Register global type caster which will be applicable to all connections. register_global_typecasters() diff --git a/web/pgadmin/utils/driver/psycopg2/typecast.py b/web/pgadmin/utils/driver/psycopg2/typecast.py index f136604..ccebb2f 100644 --- a/web/pgadmin/utils/driver/psycopg2/typecast.py +++ b/web/pgadmin/utils/driver/psycopg2/typecast.py @@ -164,7 +164,7 @@ def register_global_typecasters(): def register_string_typecasters(connection): - if connection.encoding != 'UTF8': + if connection.encoding not in ('UTF8', 'SQLASCII', 'SQL_ASCII'): # In python3 when database encoding is other than utf-8 and client # encoding is set to UNICODE then we need to map data from database # encoding to utf-8. diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 3e517b6..464a09e 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -116,7 +116,7 @@ def clear_node_info_dict(): del node_info_dict[node][:] -def create_database(server, db_name): +def create_database(server, db_name, encoding=None): """This function used to create database and returns the database id""" try: connection = get_db_connection( @@ -130,8 +130,14 @@ def create_database(server, db_name): old_isolation_level = connection.isolation_level connection.set_isolation_level(0) pg_cursor = connection.cursor() - pg_cursor.execute( - '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) + if encoding is None: + pg_cursor.execute( + '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) + else: + pg_cursor.execute( + '''CREATE DATABASE "%s" TEMPLATE template0 + ENCODING='%s' LC_COLLATE='%s' LC_CTYPE='%s' ''' % + (db_name, encoding[0], encoding[1], encoding[1])) connection.set_isolation_level(old_isolation_level) connection.commit()