diff --git a/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py new file mode 100644 index 00000000..833e471e --- /dev/null +++ b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py @@ -0,0 +1,121 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2018, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from pgadmin.utils.route import BaseTestGenerator +from pgadmin.browser.server_groups.servers.databases.tests import utils as \ + database_utils +from regression import parent_node_dict +from regression.python_test_utils import test_utils +import json + + +class TestEncodingCharset(BaseTestGenerator): + """ + This class validates character support in pgAdmin4 for + different PostgresDB encodings + """ + scenarios = [ + ( + 'With Encoding UTF8', + dict( + db_encoding='UTF8', + lc_collate='C', + test_str='A', + set_client_encoding=False + )), + ( + 'With Encoding WIN1252', + dict( + db_encoding='WIN1252', + lc_collate='C', + test_str='A', + set_client_encoding=False + )), + ( + 'With Encoding EUC_CN', + dict( + db_encoding='EUC_CN', + lc_collate='C', + test_str='A', + set_client_encoding=False + )), + ( + 'With Encoding SQL_ASCII', + dict( + db_encoding='SQL_ASCII', + lc_collate='C', + test_str='\\255', + set_client_encoding=True + )), + ] + + def setUp(self): + self.encode_db_name = 'encoding_' + self.db_encoding + self.encode_sid = self.server_information['server_id'] + self.encode_did = test_utils.create_database( + self.server, self.encode_db_name, + (self.db_encoding, self.lc_collate)) + + def runTest(self): + + db_con = database_utils.connect_database(self, + test_utils.SERVER_GROUP, + self.encode_sid, + self.encode_did) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to the database.") + + # Initialize query tool + url = '/datagrid/initialize/query_tool/{0}/{1}/{2}'.format( + test_utils.SERVER_GROUP, self.encode_sid, self.encode_did) + response = self.tester.post(url) + self.assertEquals(response.status_code, 200) + + response_data = json.loads(response.data.decode('utf-8')) + self.trans_id = response_data['data']['gridTransId'] + + if self.set_client_encoding: + # Change Client Encoding + url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) + sql = "set client_encoding = '{0}';".format(self.db_encoding) + response = self.tester.post(url, data=json.dumps({"sql": sql}), + content_type='html/json') + self.assertEquals(response.status_code, 200) + url = '/sqleditor/poll/{0}'.format(self.trans_id) + response = self.tester.get(url) + self.assertEquals(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + + # Check character + url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) + sql = "select E'{0}';".format(self.test_str) + response = self.tester.post(url, data=json.dumps({"sql": sql}), + content_type='html/json') + self.assertEquals(response.status_code, 200) + url = '/sqleditor/poll/{0}'.format(self.trans_id) + response = self.tester.get(url) + self.assertEquals(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertEquals(response_data['data']['rows_fetched_to'], 1) + + database_utils.disconnect_database(self, self.encode_sid, + self.encode_did) + + def tearDown(self): + server = [server['server'] for server in parent_node_dict['server'] + if server['server_id'] == self.encode_sid][0] + main_conn = test_utils.get_db_connection( + server['db'], + server['username'], + server['db_password'], + server['host'], + server['port'], + server['sslmode'] + ) + test_utils.drop_database(main_conn, self.encode_db_name) diff --git a/web/pgadmin/utils/driver/psycopg2/connection.py b/web/pgadmin/utils/driver/psycopg2/connection.py index cfd161a0..e8ca886e 100644 --- a/web/pgadmin/utils/driver/psycopg2/connection.py +++ b/web/pgadmin/utils/driver/psycopg2/connection.py @@ -51,6 +51,12 @@ else: _ = gettext +# Replace default ascii encoder with unicode-escape +# which translates characters to unicode format. +# Escape special characters to ASCII based on unicode +encodings['SQL_ASCII'] = 'unicode-escape' +encodings['SQLASCII'] = 'unicode-escape' + # Register global type caster which will be applicable to all connections. register_global_typecasters() diff --git a/web/pgadmin/utils/driver/psycopg2/typecast.py b/web/pgadmin/utils/driver/psycopg2/typecast.py index f1366049..a3fa2bc5 100644 --- a/web/pgadmin/utils/driver/psycopg2/typecast.py +++ b/web/pgadmin/utils/driver/psycopg2/typecast.py @@ -164,7 +164,7 @@ def register_global_typecasters(): def register_string_typecasters(connection): - if connection.encoding != 'UTF8': + if connection.encoding not in ('UTF8', 'SQLASCII', 'SQL_ASCII'): # In python3 when database encoding is other than utf-8 and client # encoding is set to UNICODE then we need to map data from database # encoding to utf-8. @@ -202,8 +202,8 @@ def register_string_typecasters(connection): (1002, 1003, 1009, 1014, 1015, 0 ), 'UNICODEARRAY', unicode_type) - psycopg2.extensions.register_type(unicode_type) - psycopg2.extensions.register_type(unicode_array_type) + psycopg2.extensions.register_type(unicode_type, connection) + psycopg2.extensions.register_type(unicode_array_type, connection) def register_binary_typecasters(connection): diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 3e517b61..464a09e1 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -116,7 +116,7 @@ def clear_node_info_dict(): del node_info_dict[node][:] -def create_database(server, db_name): +def create_database(server, db_name, encoding=None): """This function used to create database and returns the database id""" try: connection = get_db_connection( @@ -130,8 +130,14 @@ def create_database(server, db_name): old_isolation_level = connection.isolation_level connection.set_isolation_level(0) pg_cursor = connection.cursor() - pg_cursor.execute( - '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) + if encoding is None: + pg_cursor.execute( + '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) + else: + pg_cursor.execute( + '''CREATE DATABASE "%s" TEMPLATE template0 + ENCODING='%s' LC_COLLATE='%s' LC_CTYPE='%s' ''' % + (db_name, encoding[0], encoding[1], encoding[1])) connection.set_isolation_level(old_isolation_level) connection.commit()