• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import cPickle
2import cStringIO
3import io
4import functools
5import unittest
6from test.pickletester import (AbstractUnpickleTests,
7                               AbstractPickleTests,
8                               AbstractPickleModuleTests,
9                               AbstractPicklerUnpicklerObjectTests,
10                               BigmemPickleTests)
11from test import test_support
12
13class cStringIOMixin:
14    output = input = cStringIO.StringIO
15
16    def close(self, f):
17        pass
18
class BytesIOMixin:
    """Stream mixin that backs the pickle tests with io.BytesIO."""

    # One callable serves both roles: the test helpers in this file call
    # self.output() for a fresh stream and self.input(data) for a stream
    # pre-loaded with data.
    input = io.BytesIO
    output = input

    def close(self, f):
        """Do nothing; there is no cleanup to perform for stream f."""
        return None
24
class FileIOMixin:
    """Stream mixin that backs the pickle tests with a real file.

    Every stream is the file named by test_support.TESTFN, opened in
    'wb+' mode.
    """

    def output(self):
        """Return the scratch file freshly opened in 'wb+' mode."""
        return open(test_support.TESTFN, 'wb+')

    def input(self, data):
        """Return the scratch file containing data, rewound to offset 0."""
        handle = open(test_support.TESTFN, 'wb+')
        try:
            handle.write(data)
            handle.seek(0)
        except:
            # Deliberately catch everything: the handle must not leak when
            # priming the file fails, and the error is re-raised untouched.
            handle.close()
            raise
        return handle

    def close(self, f):
        """Close f, then remove the scratch file."""
        f.close()
        test_support.unlink(test_support.TESTFN)
43
44
45class cPickleTests(AbstractUnpickleTests, AbstractPickleTests,
46                   AbstractPickleModuleTests):
47
48    def setUp(self):
49        self.dumps = cPickle.dumps
50        self.loads = cPickle.loads
51
52    error = cPickle.BadPickleGet
53    module = cPickle
54    bad_stack_errors = (cPickle.UnpicklingError,)
55    bad_mark_errors = (EOFError,)
56    truncated_errors = (cPickle.UnpicklingError, EOFError,
57                        AttributeError, ValueError)
58
59class cPickleUnpicklerTests(AbstractUnpickleTests):
60
61    def loads(self, buf):
62        f = self.input(buf)
63        try:
64            p = cPickle.Unpickler(f)
65            return p.load()
66        finally:
67            self.close(f)
68
69    error = cPickle.BadPickleGet
70    bad_stack_errors = (cPickle.UnpicklingError,)
71    bad_mark_errors = (EOFError,)
72    truncated_errors = (cPickle.UnpicklingError, EOFError,
73                        AttributeError, ValueError)
74
class cStringIOCUnpicklerTests(cStringIOMixin, cPickleUnpicklerTests):
    """cPickleUnpicklerTests with cStringIO.StringIO as the input stream."""
77
class BytesIOCUnpicklerTests(BytesIOMixin, cPickleUnpicklerTests):
    """cPickleUnpicklerTests with io.BytesIO as the input stream."""
80
class FileIOCUnpicklerTests(FileIOMixin, cPickleUnpicklerTests):
    """cPickleUnpicklerTests with a real file as the input stream."""
83
84
85class cPicklePicklerTests(AbstractPickleTests):
86
87    def dumps(self, arg, proto=0):
88        f = self.output()
89        try:
90            p = cPickle.Pickler(f, proto)
91            p.dump(arg)
92            f.seek(0)
93            return f.read()
94        finally:
95            self.close(f)
96
97    def loads(self, buf):
98        f = self.input(buf)
99        try:
100            p = cPickle.Unpickler(f)
101            return p.load()
102        finally:
103            self.close(f)
104
class cStringIOCPicklerTests(cStringIOMixin, cPicklePicklerTests):
    """cPicklePicklerTests with cStringIO.StringIO streams."""
107
class BytesIOCPicklerTests(BytesIOMixin, cPicklePicklerTests):
    """cPicklePicklerTests with io.BytesIO streams."""
110
class FileIOCPicklerTests(FileIOMixin, cPicklePicklerTests):
    """cPicklePicklerTests with real-file streams."""
113
114
115class cPickleListPicklerTests(AbstractPickleTests):
116
117    def dumps(self, arg, proto=0):
118        p = cPickle.Pickler(proto)
119        p.dump(arg)
120        return p.getvalue()
121
122    def loads(self, *args):
123        f = self.input(args[0])
124        try:
125            p = cPickle.Unpickler(f)
126            return p.load()
127        finally:
128            self.close(f)
129
130    error = cPickle.BadPickleGet
131
class cStringIOCPicklerListTests(cStringIOMixin, cPickleListPicklerTests):
    """cPickleListPicklerTests reading back through cStringIO.StringIO."""
134
class BytesIOCPicklerListTests(BytesIOMixin, cPickleListPicklerTests):
    """cPickleListPicklerTests reading back through io.BytesIO."""
137
class FileIOCPicklerListTests(FileIOMixin, cPickleListPicklerTests):
    """cPickleListPicklerTests reading back through a real file."""
140
141
142class cPickleFastPicklerTests(AbstractPickleTests):
143
144    def dumps(self, arg, proto=0):
145        f = self.output()
146        try:
147            p = cPickle.Pickler(f, proto)
148            p.fast = 1
149            p.dump(arg)
150            f.seek(0)
151            return f.read()
152        finally:
153            self.close(f)
154
155    def loads(self, *args):
156        f = self.input(args[0])
157        try:
158            p = cPickle.Unpickler(f)
159            return p.load()
160        finally:
161            self.close(f)
162
163    def test_nonrecursive_deep(self):
164        # If it's not cyclic, it should pickle OK even if the nesting
165        # depth exceeds PY_CPICKLE_FAST_LIMIT.  That happens to be
166        # 50 today.  Jack Jansen reported stack overflow on Mac OS 9
167        # at 64.
168        a = []
169        for i in range(60):
170            a = [a]
171        b = self.loads(self.dumps(a))
172        self.assertEqual(a, b)
173
# Override each test_recursive_* case from AbstractPickleTests on the fast
# pickler test class: the replacement runs the original test body and
# requires it to fail with the exception type and message chosen below.
for name in dir(AbstractPickleTests):
    if not name.startswith('test_recursive_'):
        continue
    func = getattr(AbstractPickleTests, name)
    if '_subclass' not in name or '_and_inst' in name:
        assert_args = ValueError, "can't pickle cyclic objects"
    else:
        # "_subclass" tests that are not "_and_inst" variants.
        assert_args = RuntimeError, 'maximum recursion depth exceeded'

    def wrapper(self, func=func, assert_args=assert_args):
        # The defaults capture this iteration's values; a plain closure
        # would see only the values from the final iteration.
        with self.assertRaisesRegexp(*assert_args):
            func(self)
    functools.update_wrapper(wrapper, func)
    setattr(cPickleFastPicklerTests, name, wrapper)
186
class cStringIOCPicklerFastTests(cStringIOMixin, cPickleFastPicklerTests):
    """cPickleFastPicklerTests with cStringIO.StringIO streams."""
189
class BytesIOCPicklerFastTests(BytesIOMixin, cPickleFastPicklerTests):
    """cPickleFastPicklerTests with io.BytesIO streams."""
192
class FileIOCPicklerFastTests(FileIOMixin, cPickleFastPicklerTests):
    """cPickleFastPicklerTests with real-file streams."""
195
196
197class cPicklePicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
198
199    pickler_class = cPickle.Pickler
200    unpickler_class = cPickle.Unpickler
201
202class cPickleBigmemPickleTests(BigmemPickleTests):
203
204    def dumps(self, arg, proto=0, fast=0):
205        # Ignore fast
206        return cPickle.dumps(arg, proto)
207
208    def loads(self, buf):
209        # Ignore fast
210        return cPickle.loads(buf)
211
212
class Node(object):
    """Empty attribute holder; test_issue2702 builds its object graph from it."""
215
class cPickleDeepRecursive(unittest.TestCase):
    """Regression tests that feed large object graphs to cPickle.dumps."""

    def test_issue2702(self):
        # Pickling this graph is expected to hit the recursion limit, but
        # on some platforms (FreeBSD, win32) it sometimes raises KeyError
        # instead, or silently terminates the interpreter (i.e. crashes).
        nodes = [Node() for i in range(500)]
        for n in nodes:
            # Connect every node to all the others, but not to itself.
            others = list(nodes)
            others.remove(n)
            n.connections = others
        # After the loop, n is the last node in the list.
        self.assertRaises((AttributeError, RuntimeError), cPickle.dumps, n)

    def test_issue3179(self):
        # Safety check: this case was broken while fixing the behaviour
        # exercised by test_issue2702.  It only has to pickle without error.
        res = [dict(doc=x, similar=[]) for x in range(1, 2000)]
        cPickle.dumps(res)
234
235
def test_main():
    """Run every test class in this module via test_support.run_unittest."""
    test_classes = (
        cPickleTests,
        # Unpickler object tests, one per stream type.
        cStringIOCUnpicklerTests,
        BytesIOCUnpicklerTests,
        FileIOCUnpicklerTests,
        # Pickler object tests, one per stream type.
        cStringIOCPicklerTests,
        BytesIOCPicklerTests,
        FileIOCPicklerTests,
        # Pickler created without a file, one per stream type.
        cStringIOCPicklerListTests,
        BytesIOCPicklerListTests,
        FileIOCPicklerListTests,
        # Fast-mode pickler tests, one per stream type.
        cStringIOCPicklerFastTests,
        BytesIOCPicklerFastTests,
        FileIOCPicklerFastTests,
        cPickleDeepRecursive,
        cPicklePicklerUnpicklerObjectTests,
        cPickleBigmemPickleTests,
    )
    test_support.run_unittest(*test_classes)
255
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
258