1# -*- coding: utf-8 -*- 2# Protocol Buffers - Google's data interchange format 3# Copyright 2008 Google Inc. All rights reserved. 4# 5# Use of this source code is governed by a BSD-style 6# license that can be found in the LICENSE file or at 7# https://developers.google.com/open-source/licenses/bsd 8 9"""Test decoder.""" 10 11import io 12import unittest 13 14from google.protobuf.internal import decoder 15from google.protobuf.internal import testing_refleaks 16 17 18_INPUT_BYTES = b'\x84r\x12' 19_EXPECTED = (14596, 18) 20 21 22@testing_refleaks.TestCase 23class DecoderTest(unittest.TestCase): 24 25 def test_decode_varint_bytes(self): 26 (size, pos) = decoder._DecodeVarint(_INPUT_BYTES, 0) 27 self.assertEqual(size, _EXPECTED[0]) 28 self.assertEqual(pos, 2) 29 30 (size, pos) = decoder._DecodeVarint(_INPUT_BYTES, 2) 31 self.assertEqual(size, _EXPECTED[1]) 32 self.assertEqual(pos, 3) 33 34 def test_decode_varint_bytes_empty(self): 35 with self.assertRaises(IndexError) as context: 36 (size, pos) = decoder._DecodeVarint(b'', 0) 37 self.assertIn('index out of range', str(context.exception)) 38 39 def test_decode_varint_bytesio(self): 40 index = 0 41 input_io = io.BytesIO(_INPUT_BYTES) 42 while True: 43 size = decoder._DecodeVarint(input_io) 44 if size is None: 45 break 46 self.assertEqual(size, _EXPECTED[index]) 47 index += 1 48 self.assertEqual(index, len(_EXPECTED)) 49 50 def test_decode_varint_bytesio_empty(self): 51 input_io = io.BytesIO(b'') 52 size = decoder._DecodeVarint(input_io) 53 self.assertEqual(size, None) 54 55 def test_decode_unknown_group_field_too_many_levels(self): 56 data = memoryview(b'\023' * 5_000_000) 57 self.assertRaisesRegex( 58 message.DecodeError, 59 'Error parsing message', 60 decoder._DecodeUnknownField, 61 data, 62 1, 63 len(data), 64 1, 65 wire_format.WIRETYPE_START_GROUP, 66 ) 67 68 69 70if __name__ == '__main__': 71 unittest.main() 72