• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import sys
4import time
5import unittest
6import xmlrpclib
7import SimpleXMLRPCServer
8import mimetools
9import httplib
10import socket
11import StringIO
12import os
13import re
14from test import test_support
15
16try:
17    import threading
18except ImportError:
19    threading = None
20
21try:
22    import gzip
23except ImportError:
24    gzip = None
25
26alist = [{'astring': 'foo@bar.baz.spam',
27          'afloat': 7283.43,
28          'anint': 2**20,
29          'ashortlong': 2L,
30          'anotherlist': ['.zyx.41'],
31          'abase64': xmlrpclib.Binary("my dog has fleas"),
32          'boolean': xmlrpclib.False,
33          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
34          'datetime2': xmlrpclib.DateTime(
35                        (2005, 02, 10, 11, 41, 23, 0, 1, -1)),
36          'datetime3': xmlrpclib.DateTime(
37                        datetime.datetime(2005, 02, 10, 11, 41, 23)),
38          }]
39
40if test_support.have_unicode:
41    alist[0].update({
42          'unicode': test_support.u(r'\u4000\u6000\u8000'),
43          test_support.u(r'ukey\u4000'): 'regular value',
44    })
45
46class XMLRPCTestCase(unittest.TestCase):
47
48    def test_dump_load(self):
49        self.assertEqual(alist,
50                         xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0])
51
52    def test_dump_bare_datetime(self):
53        # This checks that an unwrapped datetime.date object can be handled
54        # by the marshalling code.  This can't be done via test_dump_load()
55        # since with use_datetime set to 1 the unmarshaller would create
56        # datetime objects for the 'datetime[123]' keys as well
57        dt = datetime.datetime(2005, 02, 10, 11, 41, 23)
58        s = xmlrpclib.dumps((dt,))
59        (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
60        self.assertEqual(newdt, dt)
61        self.assertEqual(m, None)
62
63        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
64        self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
65
66    def test_datetime_before_1900(self):
67        # same as before but with a date before 1900
68        dt = datetime.datetime(1, 02, 10, 11, 41, 23)
69        s = xmlrpclib.dumps((dt,))
70        (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
71        self.assertEqual(newdt, dt)
72        self.assertEqual(m, None)
73
74        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
75        self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
76
77    def test_cmp_datetime_DateTime(self):
78        now = datetime.datetime.now()
79        dt = xmlrpclib.DateTime(now.timetuple())
80        self.assertTrue(dt == now)
81        self.assertTrue(now == dt)
82        then = now + datetime.timedelta(seconds=4)
83        self.assertTrue(then >= dt)
84        self.assertTrue(dt < then)
85
86    def test_bug_1164912 (self):
87        d = xmlrpclib.DateTime()
88        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
89                                            methodresponse=True))
90        self.assertIsInstance(new_d.value, str)
91
92        # Check that the output of dumps() is still an 8-bit string
93        s = xmlrpclib.dumps((new_d,), methodresponse=True)
94        self.assertIsInstance(s, str)
95
96    def test_newstyle_class(self):
97        class T(object):
98            pass
99        t = T()
100        t.x = 100
101        t.y = "Hello"
102        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
103        self.assertEqual(t2, t.__dict__)
104
105    def test_dump_big_long(self):
106        self.assertRaises(OverflowError, xmlrpclib.dumps, (2L**99,))
107
108    def test_dump_bad_dict(self):
109        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
110
111    def test_dump_recursive_seq(self):
112        l = [1,2,3]
113        t = [3,4,5,l]
114        l.append(t)
115        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
116
117    def test_dump_recursive_dict(self):
118        d = {'1':1, '2':1}
119        t = {'3':3, 'd':d}
120        d['t'] = t
121        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
122
123    def test_dump_big_int(self):
124        if sys.maxint > 2L**31-1:
125            self.assertRaises(OverflowError, xmlrpclib.dumps,
126                              (int(2L**34),))
127
128        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
129        self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,))
130        self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,))
131
132        def dummy_write(s):
133            pass
134
135        m = xmlrpclib.Marshaller()
136        m.dump_int(xmlrpclib.MAXINT, dummy_write)
137        m.dump_int(xmlrpclib.MININT, dummy_write)
138        self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write)
139        self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write)
140
141
142    def test_dump_none(self):
143        value = alist + [None]
144        arg1 = (alist + [None],)
145        strg = xmlrpclib.dumps(arg1, allow_none=True)
146        self.assertEqual(value,
147                         xmlrpclib.loads(strg)[0][0])
148        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
149
150    @test_support.requires_unicode
151    def test_dump_encoding(self):
152        value = {test_support.u(r'key\u20ac\xa4'):
153                 test_support.u(r'value\u20ac\xa4')}
154        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
155        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
156        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
157
158        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
159                               methodresponse=True)
160        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
161
162        methodname = test_support.u(r'method\u20ac\xa4')
163        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
164                               methodname=methodname)
165        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
166        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
167
168    @test_support.requires_unicode
169    def test_default_encoding_issues(self):
170        # SF bug #1115989: wrong decoding in '_stringify'
171        utf8 = """<?xml version='1.0' encoding='iso-8859-1'?>
172                  <params>
173                    <param><value>
174                      <string>abc \x95</string>
175                      </value></param>
176                    <param><value>
177                      <struct>
178                        <member>
179                          <name>def \x96</name>
180                          <value><string>ghi \x97</string></value>
181                          </member>
182                        </struct>
183                      </value></param>
184                  </params>
185                  """
186
187        # sys.setdefaultencoding() normally doesn't exist after site.py is
188        # loaded.  Import a temporary fresh copy to get access to it
189        # but then restore the original copy to avoid messing with
190        # other potentially modified sys module attributes
191        old_encoding = sys.getdefaultencoding()
192        with test_support.CleanImport('sys'):
193            import sys as temp_sys
194            temp_sys.setdefaultencoding("iso-8859-1")
195            try:
196                (s, d), m = xmlrpclib.loads(utf8)
197            finally:
198                temp_sys.setdefaultencoding(old_encoding)
199
200        items = d.items()
201        if test_support.have_unicode:
202            self.assertEqual(s, u"abc \x95")
203            self.assertIsInstance(s, unicode)
204            self.assertEqual(items, [(u"def \x96", u"ghi \x97")])
205            self.assertIsInstance(items[0][0], unicode)
206            self.assertIsInstance(items[0][1], unicode)
207        else:
208            self.assertEqual(s, "abc \xc2\x95")
209            self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")])
210
211    def test_loads_unsupported(self):
212        ResponseError = xmlrpclib.ResponseError
213        data = '<params><param><value><spam/></value></param></params>'
214        self.assertRaises(ResponseError, xmlrpclib.loads, data)
215        data = ('<params><param><value><array>'
216                '<value><spam/></value>'
217                '</array></value></param></params>')
218        self.assertRaises(ResponseError, xmlrpclib.loads, data)
219        data = ('<params><param><value><struct>'
220                '<member><name>a</name><value><spam/></value></member>'
221                '<member><name>b</name><value><spam/></value></member>'
222                '</struct></value></param></params>')
223        self.assertRaises(ResponseError, xmlrpclib.loads, data)
224
225
class HelperTestCase(unittest.TestCase):
    """Checks for xmlrpclib's module-level helper functions."""

    def test_escape(self):
        # Each XML-special character must be replaced by its entity.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
231
class FaultTestCase(unittest.TestCase):
    """Checks for the representation and marshalling of xmlrpclib.Fault."""

    def test_repr(self):
        # repr() and str() of a Fault must agree and show code and message.
        fault = xmlrpclib.Fault(42, 'Test Fault')
        text = repr(fault)
        self.assertEqual(text, "<Fault 42: 'Test Fault'>")
        self.assertEqual(text, str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as an ordinary parameter, the Fault comes back as a struct.
        params, method = xmlrpclib.loads(xmlrpclib.dumps((fault,)))
        (decoded,) = params
        self.assertEqual(decoded,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(method, None)

        # Marshalled on its own, loading the result raises the Fault.
        fault_xml = xmlrpclib.Marshaller().dumps(fault)
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, fault_xml)
247
248
249class DateTimeTestCase(unittest.TestCase):
250    def test_default(self):
251        t = xmlrpclib.DateTime()
252
253    def test_time(self):
254        d = 1181399930.036952
255        t = xmlrpclib.DateTime(d)
256        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
257
258    def test_time_tuple(self):
259        d = (2007,6,9,10,38,50,5,160,0)
260        t = xmlrpclib.DateTime(d)
261        self.assertEqual(str(t), '20070609T10:38:50')
262
263    def test_time_struct(self):
264        d = time.localtime(1181399930.036952)
265        t = xmlrpclib.DateTime(d)
266        self.assertEqual(str(t),  time.strftime("%Y%m%dT%H:%M:%S", d))
267
268    def test_datetime_datetime(self):
269        d = datetime.datetime(2007,1,2,3,4,5)
270        t = xmlrpclib.DateTime(d)
271        self.assertEqual(str(t), '20070102T03:04:05')
272
273    def test_repr(self):
274        d = datetime.datetime(2007,1,2,3,4,5)
275        t = xmlrpclib.DateTime(d)
276        val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
277        self.assertEqual(repr(t), val)
278
279    def test_decode(self):
280        d = ' 20070908T07:11:13  '
281        t1 = xmlrpclib.DateTime()
282        t1.decode(d)
283        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
284        self.assertEqual(t1, tref)
285
286        t2 = xmlrpclib._datetime(d)
287        self.assertEqual(t1, tref)
288
289class BinaryTestCase(unittest.TestCase):
290    def test_default(self):
291        t = xmlrpclib.Binary()
292        self.assertEqual(str(t), '')
293
294    def test_string(self):
295        d = '\x01\x02\x03abc123\xff\xfe'
296        t = xmlrpclib.Binary(d)
297        self.assertEqual(str(t), d)
298
299    def test_decode(self):
300        d = '\x01\x02\x03abc123\xff\xfe'
301        de = base64.encodestring(d)
302        t1 = xmlrpclib.Binary()
303        t1.decode(de)
304        self.assertEqual(str(t1), d)
305
306        t2 = xmlrpclib._binary(de)
307        self.assertEqual(str(t2), d)
308
309
# Address, port and base URL of the currently running test server.  They are
# assigned by the server thread functions below and start out unset.
ADDR = None
PORT = None
URL = None
311
312# The evt is set twice.  First when the server is ready to serve.
313# Second when the server has been shutdown.  The user must clear
314# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Serve up to `numrequests` XML-RPC requests on an ephemeral port.

    The bound address is published through the module globals ADDR, PORT
    and URL.  `evt` is set once the server is ready and once more when it
    has shut down.  `requestHandler` defaults to SimpleXMLRPCRequestHandler
    and `encoding` is passed through to the server constructor.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Explicitly put every accepted connection into blocking mode
            # (the listening socket carries a timeout).  On Linux, socket
            # attributes are not inherited like they are on *BSD and Windows.
            conn, client = self.socket.accept()
            conn.setblocking(True)
            return conn, client

    handler_class = (requestHandler or
                     SimpleXMLRPCServer.SimpleXMLRPCRequestHandler)
    serv = MyXMLRPCServer(("localhost", 0), handler_class,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Clients connect to the IP address directly.  This avoids
        # socket.create_connection() trying "localhost" with all address
        # families, which causes slowdown e.g. on vista which supports
        # AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        # Everything the client-side tests call is registered here.
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
        serv.register_function(lambda x: x, test_support.u(r't\xea\u0161t'))
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # handle up to 'numrequests' requests
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1

    except socket.timeout:
        pass
    finally:
        # Second evt.set(): tells the test that the server is gone.
        serv.socket.close()
        PORT = None
        evt.set()
372
def http_multi_server(evt, numrequests, requestHandler=None):
    """Serve up to `numrequests` requests from a MultiPathXMLRPCServer.

    Two dispatchers are installed: "/foo" (which offers pow) and "/foo/bar"
    (which offers add).  The bound address is published through the module
    globals ADDR, PORT and URL.  `evt` is set once the server is ready and
    once more when it has shut down.  `requestHandler` defaults to
    SimpleXMLRPCRequestHandler.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer):
        def get_request(self):
            # Explicitly put every accepted connection into blocking mode
            # (the listening socket carries a timeout).  On Linux, socket
            # attributes are not inherited like they are on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    class MyRequestHandler(requestHandler):
        # NOTE(review): an empty rpc_paths presumably lets the handler accept
        # every request path -- confirm against SimpleXMLRPCRequestHandler.
        rpc_paths = []

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Binding happens inside the try block (as in http_server) so that a
        # failure still closes the socket and sets evt in the finally clause.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
        #trying to connect to "localhost" using all address families, which
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        # Second evt.set(): tells the test that the server is gone.
        serv.socket.close()
        PORT = None
        evt.set()
432
433# This function prevents errors like:
434#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Tell whether `e` stems from a 'temporarily unavailable' condition.

    Returns True when `e` has an errcode of -1 or no headers at all, or when
    its X-exception header (or, for exceptions lacking those attributes,
    its string form) mentions 'temporarily unavailable'.  Returns False
    otherwise.'''

    try:
        # sometimes we get a -1 error code and/or empty headers
        lacks_details = e.errcode == -1 or e.headers is None
        if not lacks_details:
            message = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError-like object (e.g. a socket.error): fall back
        # to its text.
        lacks_details = False
        message = str(e)

    if lacks_details:
        return True
    return bool(message) and 'temporarily unavailable' in message.lower()
453
@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
    """Fixture that runs `threadFunc` in a background thread for each test.

    Subclasses may override `threadFunc`, `request_count` (passed to the
    thread function as its request limit) and `requestHandler`.
    """
    threadFunc = staticmethod(http_server)
    request_count = 1
    requestHandler = None

    def setUp(self):
        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        # start server thread to handle requests
        self.evt = threading.Event()
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(self.evt, self.request_count, self.requestHandler))
        server_thread.start()

        # wait for the server to be ready, then re-arm the event so that
        # tearDown() can wait for the shutdown signal
        self.evt.wait(10)
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait(10)

        # disable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
479
480# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
481# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
482# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
484# If the server class is updated at some point in the future to handle this
485# situation more gracefully, these tests should be modified appropriately.
486
class SimpleServerTestCase(BaseServerTestCase):
    """Client-side tests run against the server started by http_server.

    Failures caused by "temporarily unavailable" errors are ignored; any
    other protocol or socket error fails the test.
    """

    def _fail_unless_unavailable(self, e):
        # Shared error handler for the tests below.
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @test_support.requires_unicode
    def test_nonascii(self):
        start_string = test_support.u(r'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t')
        end_string = test_support.u(r'h\N{LATIN SMALL LETTER O WITH HORN}n')

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @test_support.requires_unicode
    def test_unicode_host(self):
        # The server URL itself is given as a unicode string here.
        server = xmlrpclib.ServerProxy(u"http://%s:%d/RPC2"%(ADDR, PORT))
        self.assertEqual(server.add("a", u"\xe9"), u"a\xe9")

    @test_support.requires_unicode
    def test_client_encoding(self):
        start_string = unichr(0x20ac)
        end_string = unichr(0xa4)

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @test_support.requires_unicode
    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            # The method name is given as iso-8859-15 encoded bytes.
            m = getattr(p, 't\xea\xa8t')
            self.assertEqual(m(42), 42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    # [ch] The test 404 is causing lots of false alarms.
    def XXXtest_404(self):
        # send POST with httplib, it should return 404 header and
        # 'Not Found' message.
        conn = httplib.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/this-is-not-valid')
        response = conn.getresponse()
        conn.close()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            expected_methods = set(['pow', 'div', 'my_function', 'add',
                                    test_support.u(r't\xea\u0161t'),
                                    'system.listMethods', 'system.methodHelp',
                                    'system.methodSignature', 'system.multicall'])
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @unittest.skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<type \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<type \'exceptions.Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')

        self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        conn = httplib.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
        conn.close()
664
class SimpleServerEncodingTestCase(BaseServerTestCase):
    """Runs the test server with its encoding fixed to iso-8859-15."""

    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The `encoding` argument is ignored; iso-8859-15 is always used.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    @test_support.requires_unicode
    def test_server_encoding(self):
        first, second = unichr(0x20ac), unichr(0xa4)
        tolerated = (xmlrpclib.ProtocolError, socket.error)

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(first, second), first + second)
        except tolerated as err:
            # ignore failures due to non-blocking socket unavailable errors.
            if is_unavailable_exception(err):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
684
685
class MultiPathServerTestCase(BaseServerTestCase):
    """Checks that each path of the multi-path server has its own methods.

    Every test issues two requests, matching request_count.
    """
    request_count = 2
    threadFunc = staticmethod(http_multi_server)

    def test_path1(self):
        # "/foo" offers pow but not add.
        foo = xmlrpclib.ServerProxy(URL + "/foo")
        self.assertEqual(foo.pow(6, 8), 6 ** 8)
        self.assertRaises(xmlrpclib.Fault, foo.add, 6, 8)

    def test_path2(self):
        # "/foo/bar" offers add but not pow.
        foo_bar = xmlrpclib.ServerProxy(URL + "/foo/bar")
        self.assertEqual(foo_bar.add(6, 8), 6 + 8)
        self.assertRaises(xmlrpclib.Fault, foo_bar.pow, 6, 8)
697
698#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
699#does indeed serve subsequent requests on the same connection
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    """Fixture whose request handler speaks HTTP/1.1 and logs its requests."""

    class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
        """Handler that records each request line in a class-level list.

        myRequests holds one inner list per handle() call; every request
        processed during that call appends its raw request line to it.
        """
        parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            # Open a fresh log entry and remember its index.
            log = self.myRequests
            self.reqidx = len(log)
            log.append([])
            return self.parentClass.handle(self)

        def handle_one_request(self):
            # The request line is logged after the parent has handled it.
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)
721
722#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
723#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    """Several calls through one proxy must share a single handler."""

    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do three requests.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        #they should have all been handled by a single request handler
        logged = self.RequestHandler.myRequests
        self.assertEqual(len(logged), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(logged[-1]), 2)
738
739#test special attribute access on the serverproxy, through the __call__
740#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    """Closing the proxy's connection must lead to a second handler."""

    #ask for two keepalive requests to be handled.
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")() #this should trigger a new keep-alive request
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        #there should have been two request handlers, each having logged
        #at least two complete requests
        logged = self.RequestHandler.myRequests
        self.assertEqual(len(logged), 2)
        self.assertGreaterEqual(len(logged[-1]), 2)
        self.assertGreaterEqual(len(logged[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("transport").close() #same as above, really.
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
769
770#A test case that verifies that gzip encoding works in both directions
771#(for a request and the response)
@unittest.skipUnless(gzip, 'gzip not available')
class GzipServerTestCase(BaseServerTestCase):
    #a request handler that speaks HTTP/1.1 and stores the Content-Length
    #of the most recent POST in a class variable
    class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
        parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content length of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def setUp(self):
        BaseServerTestCase.setUp(self)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        #the gzipped request must be smaller than the plain one
        self.assertGreater(a, b)

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        #the body is not really gzipped, so the server must answer 400
        with self.assertRaisesRegexp(xmlrpclib.ProtocolError,
                                     re.compile(r"\b400\b")):
            p.pow(6, 8)

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        #encode_threshold is changed on the handler class, so restore it
        #even when an assertion fails; otherwise it leaks into later tests
        try:
            self.requestHandler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            b = t.response_length
        finally:
            self.requestHandler.encode_threshold = old
        #the gzipped response must be smaller than the plain one
        self.assertGreater(a, b)

    def test_gzip_decode_limit(self):
        max_gzip_decode = 20 * 1024 * 1024
        #a payload of exactly the limit must still decode
        data = '\0' * max_gzip_decode
        encoded = xmlrpclib.gzip_encode(data)
        decoded = xmlrpclib.gzip_decode(encoded)
        self.assertEqual(len(decoded), max_gzip_decode)

        #one byte over the limit must be rejected ...
        data = '\0' * (max_gzip_decode + 1)
        encoded = xmlrpclib.gzip_encode(data)

        with self.assertRaisesRegexp(ValueError,
                                     "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(encoded)

        #... unless the limit is disabled with a negative max_decode
        decoded = xmlrpclib.gzip_decode(encoded, max_decode=-1)
        self.assertEqual(len(decoded), max_gzip_decode + 1)
851
852
853#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        unittest.TestCase.setUp(self)
        # Without threading, http_server() and http_multi_server() will not
        # be executed and URL is still equal to None. 'http://' is just
        # enough to choose the scheme (HTTP)
        self.url = URL if threading else 'http://'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        close_result = proxy('close')()
        self.assertEqual(close_result, None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
873
874# This is a contrived way to make a failure occur on the server side
875# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(mimetools.Message):
    """Message class that reports a non-numeric Content-Length header."""

    def __getitem__(self, key):
        lowered = key.lower()
        if lowered != 'content-length':
            # every other header is looked up normally (lower-cased key)
            return mimetools.Message.__getitem__(self, lowered)
        return 'I am broken'
882
883
@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        threading.Thread(target=http_server, args=serv_args).start()

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message

    def test_basic(self):
        # check that flag is false by default
        flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("x-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("x-traceback"))
        else:
            self.fail('ProtocolError not raised')
958
class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
        self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with test_support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with test_support.captured_stdout() as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            status = handle.split()[1]
            message = ' '.join(handle.split()[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with test_support.EnvironmentVarGuard() as env, \
             test_support.captured_stdout() as data_out, \
             test_support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # Locate the XML body by its declaration instead of a fixed byte
        # offset, so the test does not depend on the exact header length.
        self.assertIn("<?xml", handle)
        content = handle[handle.find("<?xml"):]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned  by handle_request
        # Using the same test method inorder to avoid all the datapassing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
1024
1025
class FakeSocket:
    """Socket stand-in that records everything written to it in memory."""

    def __init__(self):
        self.data = StringIO.StringIO()

    def send(self, buf):
        # record the bytes and report all of them as sent
        self.data.write(buf)
        sent = len(buf)
        return sent

    def sendall(self, buf):
        self.data.write(buf)

    def getvalue(self):
        recorded = self.data.getvalue()
        return recorded

    def makefile(self, x='r', y=-1):
        # reading is not supported; there is never a response to hand out
        raise RuntimeError()

    def close(self):
        return None
1046
class FakeTransport(xmlrpclib.Transport):
    """Transport that captures the outgoing request instead of sending it.

    The connection built by the base class gets a FakeSocket in place of
    a real socket, so the request is recorded and no response is ever
    produced.
    """

    def make_connection(self, host):
        conn = xmlrpclib.Transport.make_connection(self, host)
        recorder = FakeSocket()
        conn.sock = recorder
        # keep a handle on the recorder so callers can inspect the request
        self.fake_socket = recorder
        return conn
1059
class TransportSubclassTestCase(unittest.TestCase):

    def issue_request(self, transport_class):
        """Return an HTTP request made via transport_class."""
        transport = transport_class()
        proxy = xmlrpclib.ServerProxy("http://example.com/",
                                      transport=transport)
        try:
            proxy.pow(6, 8)
        except RuntimeError:
            # FakeSocket.makefile() raises RuntimeError; by then the request
            # text has been captured by the fake socket
            recorded = transport.fake_socket.getvalue()
            return recorded
        else:
            return None

    def test_custom_user_agent(self):
        class UserAgentTransport(FakeTransport):

            def send_user_agent(self, conn):
                xmlrpclib.Transport.send_user_agent(self, conn)
                conn.putheader("X-Test", "test_custom_user_agent")

        request_text = self.issue_request(UserAgentTransport)
        self.assertIn("X-Test: test_custom_user_agent\r\n", request_text)

    def test_send_host(self):
        class HostTransport(FakeTransport):

            def send_host(self, conn, host):
                xmlrpclib.Transport.send_host(self, conn, host)
                conn.putheader("X-Test", "test_send_host")

        request_text = self.issue_request(HostTransport)
        self.assertIn("X-Test: test_send_host\r\n", request_text)

    def test_send_request(self):
        class RequestTransport(FakeTransport):

            def send_request(self, conn, url, body):
                xmlrpclib.Transport.send_request(self, conn, url, body)
                conn.putheader("X-Test", "test_send_request")

        request_text = self.issue_request(RequestTransport)
        self.assertIn("X-Test: test_send_request\r\n", request_text)

    def test_send_content(self):
        class ContentTransport(FakeTransport):

            def send_content(self, conn, body):
                # the header goes first: the base send_content is called last
                conn.putheader("X-Test", "test_send_content")
                xmlrpclib.Transport.send_content(self, conn, body)

        request_text = self.issue_request(ContentTransport)
        self.assertIn("X-Test: test_send_content\r\n", request_text)
1112
@test_support.reap_threads
def test_main():
    # Every test case class of this module, in the order they are run.
    suite_classes = [
        XMLRPCTestCase,
        HelperTestCase,
        DateTimeTestCase,
        BinaryTestCase,
        FaultTestCase,
        TransportSubclassTestCase,
        SimpleServerTestCase,
        SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1,
        KeepaliveServerTestCase2,
        GzipServerTestCase,
        MultiPathServerTestCase,
        ServerProxyTestCase,
        FailingServerTestCase,
        CGIHandlerTestCase,
    ]

    test_support.run_unittest(*suite_classes)
1128
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":
    test_main()
1131