# Postgres Related regression tests # # Type the following command to launch start the tests: # $ test/run_tests -P "load_contrib('postgres')" -t test/contrib/postgres.uts + postgres = postgres initialization from scapy.contrib.postgres import * ssl_request = "\x00\x00\x00\x08\x04\xd2\x16\x2f" startup = Startup( b"\x00\x00\x00\x57\x00\x03\x00\x00\x75\x73\x65\x72\x00\x70\x6f\x73" b"\x74\x67\x72\x65\x73\x00\x64\x61\x74\x61\x62\x61\x73\x65\x00\x70" b"\x6f\x73\x74\x67\x72\x65\x73\x00\x61\x70\x70\x6c\x69\x63\x61\x74" b"\x69\x6f\x6e\x5f\x6e\x61\x6d\x65\x00\x70\x73\x71\x6c\x00\x63\x6c" b"\x69\x65\x6e\x74\x5f\x65\x6e\x63\x6f\x64\x69\x6e\x67\x00\x57\x49" b"\x4e\x31\x32\x35\x32\x00\x00" ) assert startup.len == 87 assert startup.protocol_version_major == 3 assert startup.protocol_version_minor == 0 assert ( startup.options == b"user\x00postgres\x00database\x00postgres\x00application_name\x00psql\x00client_encoding\x00WIN1252\x00\x00" ) init_packet = ( b"\x52\x00\x00\x00\x08\x00\x00\x00\x00\x53\x00\x00\x00\x1a\x61\x70" b"\x70\x6c\x69\x63\x61\x74\x69\x6f\x6e\x5f\x6e\x61\x6d\x65\x00\x70" b"\x73\x71\x6c\x00\x53\x00\x00\x00\x1c\x63\x6c\x69\x65\x6e\x74\x5f" b"\x65\x6e\x63\x6f\x64\x69\x6e\x67\x00\x57\x49\x4e\x31\x32\x35\x32" b"\x00\x53\x00\x00\x00\x17\x44\x61\x74\x65\x53\x74\x79\x6c\x65\x00" b"\x49\x53\x4f\x2c\x20\x4d\x44\x59\x00\x53\x00\x00\x00\x26\x64\x65" b"\x66\x61\x75\x6c\x74\x5f\x74\x72\x61\x6e\x73\x61\x63\x74\x69\x6f" b"\x6e\x5f\x72\x65\x61\x64\x5f\x6f\x6e\x6c\x79\x00\x6f\x66\x66\x00" b"\x53\x00\x00\x00\x17\x69\x6e\x5f\x68\x6f\x74\x5f\x73\x74\x61\x6e" b"\x64\x62\x79\x00\x6f\x66\x66\x00\x53\x00\x00\x00\x19\x69\x6e\x74" b"\x65\x67\x65\x72\x5f\x64\x61\x74\x65\x74\x69\x6d\x65\x73\x00\x6f" b"\x6e\x00\x53\x00\x00\x00\x1b\x49\x6e\x74\x65\x72\x76\x61\x6c\x53" b"\x74\x79\x6c\x65\x00\x70\x6f\x73\x74\x67\x72\x65\x73\x00\x53\x00" b"\x00\x00\x14\x69\x73\x5f\x73\x75\x70\x65\x72\x75\x73\x65\x72\x00" b"\x6f\x6e\x00\x53\x00\x00\x00\x19\x73\x65\x72\x76\x65\x72\x5f\x65" b"\x6e\x63\x6f\x64\x69\x6e\x67\x00\x55\x54\x46\x38\x00\x53\x00\x00" b"\x00\x32\x73\x65\x72\x76\x65\x72\x5f\x76\x65\x72\x73\x69\x6f\x6e" b"\x00\x31\x34\x2e\x32\x20\x28\x44\x65\x62\x69\x61\x6e\x20\x31\x34" b"\x2e\x32\x2d\x31\x2e\x70\x67\x64\x67\x31\x31\x30\x2b\x31\x29\x00" b"\x53\x00\x00\x00\x23\x73\x65\x73\x73\x69\x6f\x6e\x5f\x61\x75\x74" b"\x68\x6f\x72\x69\x7a\x61\x74\x69\x6f\x6e\x00\x70\x6f\x73\x74\x67" b"\x72\x65\x73\x00\x53\x00\x00\x00\x23\x73\x74\x61\x6e\x64\x61\x72" b"\x64\x5f\x63\x6f\x6e\x66\x6f\x72\x6d\x69\x6e\x67\x5f\x73\x74\x72" b"\x69\x6e\x67\x73\x00\x6f\x6e\x00\x53\x00\x00\x00\x15\x54\x69\x6d" b"\x65\x5a\x6f\x6e\x65\x00\x45\x74\x63\x2f\x55\x54\x43\x00\x4b\x00" b"\x00\x00\x0c\x00\x00\x01\x7f\x43\x4c\x36\xa5\x5a\x00\x00\x00\x05\x49" ) = postgres backend sequence init = PostgresBackend(init_packet) assert isinstance(init.contents[0], Authentication) assert init.contents[0].len == 8 assert init.contents[0].method == 0 assert len(init.contents) == 16 assert isinstance(init.contents[1], ParameterStatus) assert init.contents[1].len == 26 assert init.contents[1].parameter == b"application_name" assert init.contents[1].value == b"psql" = simple queries simple_query_packet = ( b"\x51\x00\x00\x00\x15\x53\x45\x4c\x45\x43\x54\x20\x56\x45\x52\x53" b"\x49\x4f\x4e\x28\x29\x00" ) simple_query = PostgresFrontend(simple_query_packet) assert isinstance(simple_query.contents[0], Query) assert simple_query.contents[0].len == 21 assert simple_query.contents[0].query == b"SELECT VERSION()" pair = SignedIntStrPair(b"\x00\x00\x00\x04\x01\x02\x03\x04") assert pair.len == 4 assert pair.data == b"\x01\x02\x03\x04" command_response_packet = ( b"\x54\x00\x00\x00\x20\x00\x01\x76\x65\x72\x73\x69\x6f\x6e\x00\x00" b"\x00\x00\x00\x00\x00\x00\x00\x00\x19\xff\xff\xff\xff\xff\xff\x00" b"\x00\x44\x00\x00\x00\x85\x00\x01\x00\x00\x00\x7b\x50\x6f\x73\x74" b"\x67\x72\x65\x53\x51\x4c\x20\x31\x34\x2e\x32\x20\x28\x44\x65\x62" b"\x69\x61\x6e\x20\x31\x34\x2e\x32\x2d\x31\x2e\x70\x67\x64\x67\x31" b"\x31\x30\x2b\x31\x29\x20\x6f\x6e\x20\x78\x38\x36\x5f\x36\x34\x2d" b"\x70\x63\x2d\x6c\x69\x6e\x75\x78\x2d\x67\x6e\x75\x2c\x20\x63\x6f" b"\x6d\x70\x69\x6c\x65\x64\x20\x62\x79\x20\x67\x63\x63\x20\x28\x44" b"\x65\x62\x69\x61\x6e\x20\x31\x30\x2e\x32\x2e\x31\x2d\x36\x29\x20" b"\x31\x30\x2e\x32\x2e\x31\x20\x32\x30\x32\x31\x30\x31\x31\x30\x2c" b"\x20\x36\x34\x2d\x62\x69\x74\x43\x00\x00\x00\x0d\x53\x45\x4c\x45" b"\x43\x54\x20\x31\x00\x5a\x00\x00\x00\x05\x49" ) = row data response command_response = PostgresBackend(command_response_packet) assert len(command_response.contents) == 4 assert isinstance(command_response.contents[0], RowDescription) rd = command_response.contents[0] assert rd.len == 32 assert rd.numfields == 1 assert rd.cols[0].col == b"version" assert rd.cols[0].tableoid == 0 assert rd.cols[0].colno == 0 assert rd.cols[0].typeoid == 25 assert rd.cols[0].typelen == -1 assert rd.cols[0].format == 0 assert rd.cols[0].typemod == -1 assert isinstance(command_response.contents[1], DataRow) assert command_response.contents[1].len == 133 assert command_response.contents[1].numfields == 1 assert len(command_response.contents[1].data) == 1 assert isinstance(command_response.contents[1].data[0], SignedIntStrPair) assert command_response.contents[1].data[0].len == 123 assert ( command_response.contents[1].data[0].data == b"PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit" ) assert isinstance(command_response.contents[2], CommandComplete) assert isinstance(command_response.contents[3], ReadyForQuery) three_col_rd = RowDescription( b"\x54\x00\x00\x00\x55\x00\x03\x6e\x61\x6d\x65\x00\x00\x00\x00\x00" b"\x00\x00\x00\x00\x00\x19\xff\xff\xff\xff\xff\xff\x00\x00\x73\x65" b"\x74\x74\x69\x6e\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19" b"\xff\xff\xff\xff\xff\xff\x00\x00\x64\x65\x73\x63\x72\x69\x70\x74" b"\x69\x6f\x6e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\xff\xff" b"\xff\xff\xff\xff\x00\x00" ) assert three_col_rd.len == 85 assert three_col_rd.numfields == 3 assert len(three_col_rd.cols) == 3 three_col_dr = DataRow( b"\x44\x00\x00\x00\x63\x00\x03\x00\x00\x00\x17\x61\x6c\x6c\x6f\x77" b"\x5f\x73\x79\x73\x74\x65\x6d\x5f\x74\x61\x62\x6c\x65\x5f\x6d\x6f" b"\x64\x73\x00\x00\x00\x03\x6f\x66\x66\x00\x00\x00\x37\x41\x6c\x6c" b"\x6f\x77\x73\x20\x6d\x6f\x64\x69\x66\x69\x63\x61\x74\x69\x6f\x6e" b"\x73\x20\x6f\x66\x20\x74\x68\x65\x20\x73\x74\x72\x75\x63\x74\x75" b"\x72\x65\x20\x6f\x66\x20\x73\x79\x73\x74\x65\x6d\x20\x74\x61\x62" b"\x6c\x65\x73\x2e" ) assert three_col_dr.numfields == 3 assert len(three_col_dr.data) == 3 assert three_col_dr.data[0].len == 23 assert three_col_dr.data[0].data == b"allow_system_table_mods" assert three_col_dr.data[1].len == 3 assert three_col_dr.data[1].data == b"off" assert three_col_dr.data[2].len == 55 assert ( three_col_dr.data[2].data == b"Allows modifications of the structure of system tables." ) = errors error_response = ErrorResponse( b"\x45\x00\x00\x00\x69\x53\x45\x52\x52\x4f\x52\x00\x56\x45\x52\x52" b"\x4f\x52\x00\x43\x34\x32\x50\x30\x31\x00\x4d\x72\x65\x6c\x61\x74" b"\x69\x6f\x6e\x20\x22\x66\x6f\x6f\x62\x61\x72\x22\x20\x64\x6f\x65" b"\x73\x20\x6e\x6f\x74\x20\x65\x78\x69\x73\x74\x00\x50\x31\x35\x00" b"\x46\x70\x61\x72\x73\x65\x5f\x72\x65\x6c\x61\x74\x69\x6f\x6e\x2e" b"\x63\x00\x4c\x31\x33\x38\x31\x00\x52\x70\x61\x72\x73\x65\x72\x4f" b"\x70\x65\x6e\x54\x61\x62\x6c\x65\x00\x00" ) assert len(error_response.error_fields) == 8 assert error_response.error_fields[0] == ("Severity", b"ERROR") assert error_response.error_fields[7] == ("Routine", b"parserOpenTable") = copy data response and request copyin_response = CopyInResponse(b"\x47\x00\x00\x00\x0f\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00") assert copyin_response.len == 15 assert copyin_response.format == 0 # Text assert len(copyin_response.cols) == 4 assert copyin_response.ncols == 4 assert copyin_response.cols[0] == 0 # Text assert copyin_response.cols[1] == 0 # Text assert copyin_response.cols[2] == 0 # Text assert copyin_response.cols[3] == 0 # Text copydata_in = PostgresFrontend(b"\x64\x00\x00\x00\x10\x31\x2c\x42\x6f\x62\x2c\x32\x33\x2c\x31\x0d" \ b"\x0a\x64\x00\x00\x00\x12\x32\x2c\x53\x61\x6c\x6c\x79\x2c\x34\x33" \ b"\x2c\x32\x0d\x0a\x64\x00\x00\x00\x14\x33\x2c\x50\x61\x72\x64\x65" \ b"\x65\x70\x2c\x35\x34\x2c\x33\x0d\x0a\x64\x00\x00\x00\x0f\x34\x2c" \ b"\x53\x75\x2c\x33\x32\x2c\x34\x0d\x0a\x64\x00\x00\x00\x0f\x35\x2c" \ b"\x58\x69\x2c\x34\x33\x2c\x35\x0d\x0a\x64\x00\x00\x00\x0e\x36\x2c" \ b"\x50\x69\x70\x2c\x36\x36\x2c\x36\x63\x00\x00\x00\x04" ) copyout_response = CopyOutResponse(b"\x48\x00\x00\x00\x0f\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00") assert copyout_response.len == 15 # Combined message copydata_out = PostgresBackend(b"\x48\x00\x00\x00\x0f\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00" \ b"\x64\x00\x00\x00\x0f\x31\x09\x42\x6f\x62\x09\x32\x33\x09\x31\x0a" \ b"\x64\x00\x00\x00\x11\x32\x09\x53\x61\x6c\x6c\x79\x09\x34\x33\x09" \ b"\x32\x0a\x64\x00\x00\x00\x13\x33\x09\x50\x61\x72\x64\x65\x65\x70" \ b"\x09\x35\x34\x09\x33\x0a\x64\x00\x00\x00\x0e\x34\x09\x53\x75\x09" \ b"\x33\x32\x09\x34\x0a\x64\x00\x00\x00\x0e\x35\x09\x58\x69\x09\x34" \ b"\x33\x09\x35\x0a\x64\x00\x00\x00\x0f\x36\x09\x50\x69\x70\x09\x36" \ b"\x36\x09\x36\x0a\x63\x00\x00\x00\x04\x43\x00\x00\x00\x0b\x43\x4f" \ b"\x50\x59\x20\x36\x00\x5a\x00\x00\x00\x05\x49") assert len(copydata_out.contents) == 10 assert copydata_out.contents[0].len == 15 assert isinstance(copydata_out.contents[0], CopyOutResponse) assert isinstance(copydata_out.contents[1], CopyData) assert copydata_out.contents[1].len == 15 assert copydata_out.contents[1].data == b'1\tBob\t23\t1\n' assert isinstance(copydata_out.contents[2], CopyData) assert copydata_out.contents[2].data == b'2\tSally\t43\t2\n' assert isinstance(copydata_out.contents[3], CopyData) assert copydata_out.contents[3].data == b'3\tPardeep\t54\t3\n' assert isinstance(copydata_out.contents[4], CopyData) assert copydata_out.contents[4].data == b'4\tSu\t32\t4\n' assert isinstance(copydata_out.contents[5], CopyData) assert copydata_out.contents[5].data == b'5\tXi\t43\t5\n' assert isinstance(copydata_out.contents[6], CopyData) assert copydata_out.contents[6].data == b'6\tPip\t66\t6\n' assert isinstance(copydata_out.contents[7], CopyDone) assert isinstance(copydata_out.contents[8], CommandComplete) assert isinstance(copydata_out.contents[9], ReadyForQuery) = Check example request packet request = PostgresFrontend( b"\x50\x00\x00\x00\x64\x00\x53\x45\x4c\x45\x43\x54\x20\x44\x5f\x4e" b"\x45\x58\x54\x5f\x4f\x5f\x49\x44\x2c\x20\x44\x5f\x54\x41\x58\x20" b"\x20\x20\x46\x52\x4f\x4d\x20\x64\x69\x73\x74\x72\x69\x63\x74\x20" b"\x57\x48\x45\x52\x45\x20\x44\x5f\x57\x5f\x49\x44\x20\x3d\x20\x24" b"\x31\x20\x41\x4e\x44\x20\x44\x5f\x49\x44\x20\x3d\x20\x24\x32\x20" b"\x46\x4f\x52\x20\x55\x50\x44\x41\x54\x45\x00\x00\x02\x00\x00\x00" b"\x17\x00\x00\x00\x17\x42\x00\x00\x00\x20\x00\x00\x00\x02\x00\x01" b"\x00\x01\x00\x02\x00\x00\x00\x04\x00\x00\x00\x14\x00\x00\x00\x04" b"\x00\x00\x00\x0a\x00\x00\x44\x00\x00\x00\x06\x50\x00\x45\x00\x00" b"\x00\x09\x00\x00\x00\x00\x00\x53\x00\x00\x00\x04" ) assert len(request.contents) == 5 assert isinstance(request.contents[0], Parse) assert isinstance(request.contents[1], Bind) assert isinstance(request.contents[2], Describe) assert isinstance(request.contents[3], Execute) assert isinstance(request.contents[4], Sync) = Check parse decoding parse_msg = request.contents[0] assert parse_msg.len == 100 assert parse_msg.destination == b"" assert parse_msg.query == b"SELECT D_NEXT_O_ID, D_TAX FROM district WHERE D_W_ID = $1 AND D_ID = $2 FOR UPDATE" assert parse_msg.num_param_dtypes == 2 assert parse_msg.params[0] == 23 assert parse_msg.params[1] == 23 = Check bind decoding bind_msg = request.contents[1] assert bind_msg.len == 32 assert bind_msg.destination == b"" assert bind_msg.statement == b"" assert bind_msg.codes_count == 2 assert bind_msg.codes[0] == 1 assert bind_msg.codes[1] == 1 assert bind_msg.values_count == 2 assert bind_msg.values[0].len == 4 assert bind_msg.values[0].data == b"\x00\x00\x00\x14" assert bind_msg.values[1].len == 4 assert bind_msg.values[1].data == b"\x00\x00\x00\x0a" assert bind_msg.results_count == 0 = Check describe decoding describe_msg = request.contents[2] assert describe_msg.len == 6 assert describe_msg.close_type == b"P" assert describe_msg.statement == b"" = Check execute decoding exec_msg = request.contents[3] assert exec_msg.len == 9 assert exec_msg.portal == b"" assert exec_msg.rows == 0 = Check sync decoding sync_msg = request.contents[4] assert sync_msg.len == 4