# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.  All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

"""Tests for google.protobuf.message_factory."""

__author__ = 'matthewtoia@google.com (Matt Toia)'

import unittest
import gc

from google.protobuf import descriptor_pb2
from google.protobuf.internal import api_implementation
from google.protobuf.internal import factory_test1_pb2
from google.protobuf.internal import factory_test2_pb2
from google.protobuf.internal import testing_refleaks
from google.protobuf import descriptor_database
from google.protobuf import descriptor_pool
from google.protobuf import message_factory
from google.protobuf import descriptor


@testing_refleaks.TestCase
class MessageFactoryTest(unittest.TestCase):
  """Exercises message_factory against the factory_test1/2 protos."""

  def setUp(self):
    """Rebuilds FileDescriptorProtos from the generated modules' bytes."""
    self.factory_test1_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test1_pb2.DESCRIPTOR.serialized_pb)
    self.factory_test2_fd = descriptor_pb2.FileDescriptorProto.FromString(
        factory_test2_pb2.DESCRIPTOR.serialized_pb)

  def _ExerciseDynamicClass(self, cls):
    """Round-trips a fully populated `cls` message through Factory2Message.

    Populates an instance of `cls`, serializes it, parses the bytes with the
    generated factory_test2_pb2.Factory2Message, and asserts that both the
    re-serialized bytes and the message parsed back by `cls` match the
    original.

    Args:
      cls: A message class expected to be wire-compatible with
        factory_test2_pb2.Factory2Message.
    """
    msg = cls()
    msg.mandatory = 42
    msg.nested_factory_2_enum = 0
    msg.nested_factory_2_message.value = 'nested message value'
    msg.factory_1_message.factory_1_enum = 1
    msg.factory_1_message.nested_factory_1_enum = 0
    msg.factory_1_message.nested_factory_1_message.value = (
        'nested message value')
    msg.factory_1_message.scalar_value = 22
    msg.factory_1_message.list_value.extend(['one', 'two', 'three'])
    msg.factory_1_message.list_value.append('four')
    msg.factory_1_enum = 1
    msg.nested_factory_1_enum = 0
    msg.nested_factory_1_message.value = 'nested message value'
    msg.circular_message.mandatory = 1
    msg.circular_message.circular_message.mandatory = 2
    msg.circular_message.scalar_value = 'one deep'
    msg.scalar_value = 'zero deep'
    msg.list_value.extend(['four', 'three', 'two'])
    msg.list_value.append('one')
    msg.grouped.add()
    msg.grouped[0].part_1 = 'hello'
    msg.grouped[0].part_2 = 'world'
    msg.grouped.add(part_1='testing', part_2='123')
    msg.loop.loop.mandatory = 2
    msg.loop.loop.loop.loop.mandatory = 4

    serialized = msg.SerializeToString()
    converted = factory_test2_pb2.Factory2Message.FromString(serialized)
    reserialized = converted.SerializeToString()
    self.assertEqual(serialized, reserialized)
    result = cls.FromString(reserialized)
    self.assertEqual(msg, result)

  def _AddFilesBestEffort(self, pool, file_protos):
    """Adds `file_protos` to `pool` in order, ignoring the first failure.

    Mirrors a single try block around consecutive pool.Add() calls: the first
    Add() that raises stops the loop and the error is discarded.

    Args:
      pool: The descriptor pool to add the files to.
      file_protos: FileDescriptorProto messages, dependencies first.
    """
    try:
      for file_proto in file_protos:
        pool.Add(file_proto)
    except Exception:  # pylint: disable=broad-except
      # NOTE(review): presumably tolerates files that are already registered
      # in the shared default pool (e.g. when the test body runs more than
      # once) -- confirm. Narrowed from a bare `except:` so that
      # KeyboardInterrupt/SystemExit still propagate.
      pass

  def testGetMessageClass(self):
    """GetMessageClass builds a new class from a pool and then reuses it."""
    db = descriptor_database.DescriptorDatabase()
    pool = descriptor_pool.DescriptorPool(db)
    db.Add(self.factory_test1_fd)
    db.Add(self.factory_test2_fd)
    cls = message_factory.GetMessageClass(pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory2Message'))
    self.assertIsNot(cls, factory_test2_pb2.Factory2Message)
    self._ExerciseDynamicClass(cls)
    cls2 = message_factory.GetMessageClass(pool.FindMessageTypeByName(
        'google.protobuf.python.internal.Factory2Message'))
    self.assertIs(cls, cls2)

  def testGetExistingPrototype(self):
    """GetMessageClass on a generated descriptor yields a compatible class."""
    # Get Existing Prototype should not create a new class.
    cls = message_factory.GetMessageClass(
        descriptor=factory_test2_pb2.Factory2Message.DESCRIPTOR)
    msg = factory_test2_pb2.Factory2Message()
    self.assertIsInstance(msg, cls)
    self.assertIsInstance(msg.factory_1_message,
                          factory_test1_pb2.Factory1Message)

  def testGetMessages(self):
    """GetMessages handles out-of-order files and registers extensions."""
    # performed twice because multiple calls with the same input must be allowed
    for _ in range(2):
      # GetMessage should work regardless of the order the FileDescriptorProto
      # are provided. In particular, the function should succeed when the files
      # are not in the topological order of dependencies.

      # Assuming factory_test2_fd depends on factory_test1_fd.
      self.assertIn(self.factory_test1_fd.name,
                    self.factory_test2_fd.dependency)
      # Get messages should work when a file comes before its dependencies:
      # factory_test2_fd comes before factory_test1_fd.
      messages = message_factory.GetMessages(
          [self.factory_test2_fd, self.factory_test1_fd],
          descriptor_pool.Default())
      self.assertLessEqual(
          {'google.protobuf.python.internal.Factory2Message',
           'google.protobuf.python.internal.Factory1Message'},
          set(messages))
      self._ExerciseDynamicClass(
          messages['google.protobuf.python.internal.Factory2Message'])

      factory_msg1 = messages['google.protobuf.python.internal.Factory1Message']
      registered_extensions = {
          ext.full_name
          for ext in factory_msg1.DESCRIPTOR.file.pool.FindAllExtensions(
              factory_msg1.DESCRIPTOR)
      }
      self.assertLessEqual(
          {'google.protobuf.python.internal.Factory2Message.one_more_field',
           'google.protobuf.python.internal.another_field'},
          registered_extensions)

      msg1 = factory_msg1()
      ext1 = msg1.Extensions._FindExtensionByName(
          'google.protobuf.python.internal.Factory2Message.one_more_field')
      ext2 = msg1.Extensions._FindExtensionByName(
          'google.protobuf.python.internal.another_field')
      self.assertEqual(0, len(msg1.Extensions))
      msg1.Extensions[ext1] = 'test1'
      msg1.Extensions[ext2] = 'test2'
      self.assertEqual('test1', msg1.Extensions[ext1])
      self.assertEqual('test2', msg1.Extensions[ext2])
      self.assertIsNone(msg1.Extensions._FindExtensionByNumber(12321))
      self.assertEqual(2, len(msg1.Extensions))
      # Wrongly typed lookup keys: the pure-Python implementation is expected
      # to return None, the other implementations to raise TypeError.
      if api_implementation.Type() == 'python':
        self.assertIsNone(msg1.Extensions._FindExtensionByName(0))
        self.assertIsNone(msg1.Extensions._FindExtensionByNumber(''))
      else:
        with self.assertRaises(TypeError):
          msg1.Extensions._FindExtensionByName(0)
        with self.assertRaises(TypeError):
          msg1.Extensions._FindExtensionByNumber('')

  def testDuplicateExtensionNumber(self):
    """Two extensions claiming the same field number must be rejected."""
    pool = descriptor_pool.DescriptorPool()

    # Add Container message.
    f = descriptor_pb2.FileDescriptorProto(
        name='google/protobuf/internal/container.proto',
        package='google.protobuf.python.internal')
    f.message_type.add(name='Container').extension_range.add(start=1, end=10)
    pool.Add(f)
    msgs = message_factory.GetMessageClassesForFiles([f.name], pool)
    self.assertIn('google.protobuf.python.internal.Container', msgs)

    # Extend container.
    f = descriptor_pb2.FileDescriptorProto(
        name='google/protobuf/internal/extension.proto',
        package='google.protobuf.python.internal',
        dependency=['google/protobuf/internal/container.proto'])
    msg = f.message_type.add(name='Extension')
    msg.extension.add(
        name='extension_field',
        number=2,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
        type_name='Extension',
        extendee='Container',
    )
    pool.Add(f)
    msgs = message_factory.GetMessageClassesForFiles([f.name], pool)
    self.assertIn('google.protobuf.python.internal.Extension', msgs)

    # Add Duplicate extending the same field number.
    f = descriptor_pb2.FileDescriptorProto(
        name='google/protobuf/internal/duplicate.proto',
        package='google.protobuf.python.internal',
        dependency=['google/protobuf/internal/container.proto'])
    msg = f.message_type.add(name='Duplicate')
    msg.extension.add(
        name='extension_field',
        number=2,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
        type_name='Duplicate',
        extendee='Container',
    )
    pool.Add(f)

    with self.assertRaises(Exception) as cm:
      message_factory.GetMessageClassesForFiles([f.name], pool)

    # Either of the two known error messages is accepted.
    self.assertIn(str(cm.exception),
                  ['Extensions '
                   '"google.protobuf.python.internal.Duplicate.extension_field" and'
                   ' "google.protobuf.python.internal.Extension.extension_field"'
                   ' both try to extend message type'
                   ' "google.protobuf.python.internal.Container"'
                   ' with field number 2.',
                   'Double registration of Extensions'])

  def testExtensionValueInDifferentFile(self):
    """Extensions resolve when their value type lives in an unlisted file."""
    # Add Container message.
    f1 = descriptor_pb2.FileDescriptorProto(
        name='google/protobuf/internal/container.proto',
        package='google.protobuf.python.internal')
    container_proto = f1.message_type.add(name='Container')
    container_proto.extension_range.add(start=1, end=10)

    # Add ValueType message.
    f2 = descriptor_pb2.FileDescriptorProto(
        name='google/protobuf/internal/value_type.proto',
        package='google.protobuf.python.internal')
    f2.message_type.add(name='ValueType').field.add(
        name='setting',
        number=1,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
        type=descriptor_pb2.FieldDescriptorProto.TYPE_INT32,
        default_value='123')

    # Extend container with field of ValueType.
    f3 = descriptor_pb2.FileDescriptorProto(
        name='google/protobuf/internal/extension.proto',
        package='google.protobuf.python.internal',
        dependency=[f1.name, f2.name])
    f3.extension.add(
        name='top_level_extension_field',
        number=2,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
        type_name='ValueType',
        extendee='Container',
    )
    f3.message_type.add(name='Extension').extension.add(
        name='nested_extension_field',
        number=3,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
        type_name='ValueType',
        extendee='Container',
    )

    pool = descriptor_pool.Default()
    self._AddFilesBestEffort(pool, [f1, f2, f3])

    msgs = message_factory.GetMessageClassesForFiles(
        [f1.name, f3.name], pool)  # Deliberately not f2.
    msg = msgs['google.protobuf.python.internal.Container']
    desc = msgs['google.protobuf.python.internal.Extension'].DESCRIPTOR
    ext1 = desc.file.extensions_by_name['top_level_extension_field']
    ext2 = desc.extensions_by_name['nested_extension_field']
    m = msg()
    m.Extensions[ext1].setting = 234
    m.Extensions[ext2].setting = 345

    serialized = m.SerializeToString()

    # Register the same three files again under a different path and package,
    # then parse the bytes produced above with the new Container class.
    f1.name = 'google/protobuf/internal/another/container.proto'
    f1.package = 'google.protobuf.python.internal.another'
    f2.name = 'google/protobuf/internal/another/value_type.proto'
    f2.package = 'google.protobuf.python.internal.another'
    f3.name = 'google/protobuf/internal/another/extension.proto'
    f3.package = 'google.protobuf.python.internal.another'
    f3.ClearField('dependency')
    f3.dependency.extend([f1.name, f2.name])
    self._AddFilesBestEffort(pool, [f1, f2, f3])

    msgs = message_factory.GetMessageClassesForFiles(
        [f1.name, f3.name], pool)  # Deliberately not f2.
    msg = msgs['google.protobuf.python.internal.another.Container']
    extension_cls = msgs['google.protobuf.python.internal.another.Extension']
    desc = extension_cls.DESCRIPTOR
    ext1 = desc.file.extensions_by_name['top_level_extension_field']
    ext2 = desc.extensions_by_name['nested_extension_field']
    m = msg.FromString(serialized)
    self.assertEqual(2, len(m.ListFields()))
    self.assertEqual(234, m.Extensions[ext1].setting)
    self.assertEqual(345, m.Extensions[ext2].setting)

  def testDescriptorKeepConcreteClass(self):
    """A nested descriptor's concrete class can still be instantiated."""

    def loadFile():
      # The FileDescriptorProto is local to this helper, so only the returned
      # classes keep referencing what was built from it.
      f = descriptor_pb2.FileDescriptorProto(
          name='google/protobuf/internal/meta_class.proto',
          package='google.protobuf.python.internal')
      msg_proto = f.message_type.add(name='Empty')
      msg_proto.nested_type.add(name='Nested')
      msg_proto.field.add(name='nested_field',
                          number=1,
                          label=descriptor.FieldDescriptor.LABEL_REPEATED,
                          type=descriptor.FieldDescriptor.TYPE_MESSAGE,
                          type_name='Nested')
      return message_factory.GetMessages([f])

    messages = loadFile()
    for meta_class in messages.values():
      message = meta_class()
      nested_des = message.DESCRIPTOR.nested_types_by_name['Nested']
      # Instantiating is the check itself: it must not raise.
      nested_des._concrete_class()

  def testOndemandCreateMetaClass(self):
    """Map entry classes are usable even after a garbage collection."""

    def loadFile():
      f = descriptor_pb2.FileDescriptorProto.FromString(
          factory_test1_pb2.DESCRIPTOR.serialized_pb)
      return message_factory.GetMessages([f])

    messages = loadFile()
    data = factory_test1_pb2.Factory1Message()
    data.map_field['hello'] = 'welcome'
    # Force GC to collect. UPB python will clean up the map entry class.
    # cpp extension and pure python will still keep the map entry class.
    gc.collect()
    message = messages['google.protobuf.python.internal.Factory1Message']()
    message.ParseFromString(data.SerializeToString())
    value = message.map_field
    values = [
        # The entry class will be created on demand in upb python.
        value.GetEntryClass()(key=k, value=value[k]) for k in sorted(value)
    ]
    gc.collect()
    self.assertEqual(1, len(values))
    self.assertEqual('hello', values[0].key)
    self.assertEqual('welcome', values[0].value)


if __name__ == '__main__':
  unittest.main()