• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import sys
4import time
5import unittest
6import xmlrpclib
7import SimpleXMLRPCServer
8import mimetools
9import httplib
10import socket
11import StringIO
12import os
13import re
14from test import test_support
15
16try:
17    import threading
18except ImportError:
19    threading = None
20
21try:
22    import gzip
23except ImportError:
24    gzip = None
25
26alist = [{'astring': 'foo@bar.baz.spam',
27          'afloat': 7283.43,
28          'anint': 2**20,
29          'ashortlong': 2L,
30          'anotherlist': ['.zyx.41'],
31          'abase64': xmlrpclib.Binary("my dog has fleas"),
32          'boolean': xmlrpclib.False,
33          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
34          'datetime2': xmlrpclib.DateTime(
35                        (2005, 02, 10, 11, 41, 23, 0, 1, -1)),
36          'datetime3': xmlrpclib.DateTime(
37                        datetime.datetime(2005, 02, 10, 11, 41, 23)),
38          }]
39
40if test_support.have_unicode:
41    alist[0].update({
42          'unicode': test_support.u(r'\u4000\u6000\u8000'),
43          test_support.u(r'ukey\u4000'): 'regular value',
44    })
45
46class XMLRPCTestCase(unittest.TestCase):
47
48    def test_dump_load(self):
49        self.assertEqual(alist,
50                         xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0])
51
52    def test_dump_bare_datetime(self):
53        # This checks that an unwrapped datetime.date object can be handled
54        # by the marshalling code.  This can't be done via test_dump_load()
55        # since with use_datetime set to 1 the unmarshaller would create
56        # datetime objects for the 'datetime[123]' keys as well
57        dt = datetime.datetime(2005, 02, 10, 11, 41, 23)
58        s = xmlrpclib.dumps((dt,))
59        (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
60        self.assertEqual(newdt, dt)
61        self.assertEqual(m, None)
62
63        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
64        self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
65
66    def test_datetime_before_1900(self):
67        # same as before but with a date before 1900
68        dt = datetime.datetime(1, 02, 10, 11, 41, 23)
69        s = xmlrpclib.dumps((dt,))
70        (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
71        self.assertEqual(newdt, dt)
72        self.assertEqual(m, None)
73
74        (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
75        self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
76
77    def test_cmp_datetime_DateTime(self):
78        now = datetime.datetime.now()
79        dt = xmlrpclib.DateTime(now.timetuple())
80        self.assertTrue(dt == now)
81        self.assertTrue(now == dt)
82        then = now + datetime.timedelta(seconds=4)
83        self.assertTrue(then >= dt)
84        self.assertTrue(dt < then)
85
86    def test_bug_1164912 (self):
87        d = xmlrpclib.DateTime()
88        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
89                                            methodresponse=True))
90        self.assertIsInstance(new_d.value, str)
91
92        # Check that the output of dumps() is still an 8-bit string
93        s = xmlrpclib.dumps((new_d,), methodresponse=True)
94        self.assertIsInstance(s, str)
95
96    def test_newstyle_class(self):
97        class T(object):
98            pass
99        t = T()
100        t.x = 100
101        t.y = "Hello"
102        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
103        self.assertEqual(t2, t.__dict__)
104
105    def test_dump_big_long(self):
106        self.assertRaises(OverflowError, xmlrpclib.dumps, (2L**99,))
107
108    def test_dump_bad_dict(self):
109        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
110
111    def test_dump_recursive_seq(self):
112        l = [1,2,3]
113        t = [3,4,5,l]
114        l.append(t)
115        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
116
117    def test_dump_recursive_dict(self):
118        d = {'1':1, '2':1}
119        t = {'3':3, 'd':d}
120        d['t'] = t
121        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
122
123    def test_dump_big_int(self):
124        if sys.maxint > 2L**31-1:
125            self.assertRaises(OverflowError, xmlrpclib.dumps,
126                              (int(2L**34),))
127
128        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
129        self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,))
130        self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,))
131
132        def dummy_write(s):
133            pass
134
135        m = xmlrpclib.Marshaller()
136        m.dump_int(xmlrpclib.MAXINT, dummy_write)
137        m.dump_int(xmlrpclib.MININT, dummy_write)
138        self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write)
139        self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write)
140
141
142    def test_dump_none(self):
143        value = alist + [None]
144        arg1 = (alist + [None],)
145        strg = xmlrpclib.dumps(arg1, allow_none=True)
146        self.assertEqual(value,
147                         xmlrpclib.loads(strg)[0][0])
148        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
149
150    @test_support.requires_unicode
151    def test_dump_encoding(self):
152        value = {test_support.u(r'key\u20ac\xa4'):
153                 test_support.u(r'value\u20ac\xa4')}
154        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
155        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
156        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
157
158        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
159                               methodresponse=True)
160        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
161
162        methodname = test_support.u(r'method\u20ac\xa4')
163        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
164                               methodname=methodname)
165        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
166        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
167
168    @test_support.requires_unicode
169    def test_default_encoding_issues(self):
170        # SF bug #1115989: wrong decoding in '_stringify'
171        utf8 = """<?xml version='1.0' encoding='iso-8859-1'?>
172                  <params>
173                    <param><value>
174                      <string>abc \x95</string>
175                      </value></param>
176                    <param><value>
177                      <struct>
178                        <member>
179                          <name>def \x96</name>
180                          <value><string>ghi \x97</string></value>
181                          </member>
182                        </struct>
183                      </value></param>
184                  </params>
185                  """
186
187        # sys.setdefaultencoding() normally doesn't exist after site.py is
188        # loaded.  Import a temporary fresh copy to get access to it
189        # but then restore the original copy to avoid messing with
190        # other potentially modified sys module attributes
191        old_encoding = sys.getdefaultencoding()
192        with test_support.CleanImport('sys'):
193            import sys as temp_sys
194            temp_sys.setdefaultencoding("iso-8859-1")
195            try:
196                (s, d), m = xmlrpclib.loads(utf8)
197            finally:
198                temp_sys.setdefaultencoding(old_encoding)
199
200        items = d.items()
201        if test_support.have_unicode:
202            self.assertEqual(s, u"abc \x95")
203            self.assertIsInstance(s, unicode)
204            self.assertEqual(items, [(u"def \x96", u"ghi \x97")])
205            self.assertIsInstance(items[0][0], unicode)
206            self.assertIsInstance(items[0][1], unicode)
207        else:
208            self.assertEqual(s, "abc \xc2\x95")
209            self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")])
210
211    def test_loads_unsupported(self):
212        ResponseError = xmlrpclib.ResponseError
213        data = '<params><param><value><spam/></value></param></params>'
214        self.assertRaises(ResponseError, xmlrpclib.loads, data)
215        data = ('<params><param><value><array>'
216                '<value><spam/></value>'
217                '</array></value></param></params>')
218        self.assertRaises(ResponseError, xmlrpclib.loads, data)
219        data = ('<params><param><value><struct>'
220                '<member><name>a</name><value><spam/></value></member>'
221                '<member><name>b</name><value><spam/></value></member>'
222                '</struct></value></param></params>')
223        self.assertRaises(ResponseError, xmlrpclib.loads, data)
224
225
class HelperTestCase(unittest.TestCase):
    """Tests for the module-level helper functions of xmlrpclib."""

    def test_escape(self):
        # escape() must replace each of &, < and > by its XML entity.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
231
class FaultTestCase(unittest.TestCase):
    """Tests for the representation and marshalling of xmlrpclib.Fault."""

    def test_repr(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(fault), "<Fault 42: 'Test Fault'>")
        # str() and repr() are expected to agree
        self.assertEqual(repr(fault), str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as an ordinary parameter, the fault loads back as a struct.
        payload = xmlrpclib.dumps((fault,))
        (restored,), methodname = xmlrpclib.loads(payload)
        self.assertEqual(restored,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(methodname, None)

        # Dumped through a Marshaller directly, loading the result raises.
        payload = xmlrpclib.Marshaller().dumps(fault)
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, payload)
247
248
249class DateTimeTestCase(unittest.TestCase):
250    def test_default(self):
251        t = xmlrpclib.DateTime()
252
253    def test_time(self):
254        d = 1181399930.036952
255        t = xmlrpclib.DateTime(d)
256        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
257
258    def test_time_tuple(self):
259        d = (2007,6,9,10,38,50,5,160,0)
260        t = xmlrpclib.DateTime(d)
261        self.assertEqual(str(t), '20070609T10:38:50')
262
263    def test_time_struct(self):
264        d = time.localtime(1181399930.036952)
265        t = xmlrpclib.DateTime(d)
266        self.assertEqual(str(t),  time.strftime("%Y%m%dT%H:%M:%S", d))
267
268    def test_datetime_datetime(self):
269        d = datetime.datetime(2007,1,2,3,4,5)
270        t = xmlrpclib.DateTime(d)
271        self.assertEqual(str(t), '20070102T03:04:05')
272
273    def test_repr(self):
274        d = datetime.datetime(2007,1,2,3,4,5)
275        t = xmlrpclib.DateTime(d)
276        val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
277        self.assertEqual(repr(t), val)
278
279    def test_decode(self):
280        d = ' 20070908T07:11:13  '
281        t1 = xmlrpclib.DateTime()
282        t1.decode(d)
283        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
284        self.assertEqual(t1, tref)
285
286        t2 = xmlrpclib._datetime(d)
287        self.assertEqual(t1, tref)
288
289class BinaryTestCase(unittest.TestCase):
290    def test_default(self):
291        t = xmlrpclib.Binary()
292        self.assertEqual(str(t), '')
293
294    def test_string(self):
295        d = '\x01\x02\x03abc123\xff\xfe'
296        t = xmlrpclib.Binary(d)
297        self.assertEqual(str(t), d)
298
299    def test_decode(self):
300        d = '\x01\x02\x03abc123\xff\xfe'
301        de = base64.encodestring(d)
302        t1 = xmlrpclib.Binary()
303        t1.decode(de)
304        self.assertEqual(str(t1), d)
305
306        t2 = xmlrpclib._binary(de)
307        self.assertEqual(str(t2), d)
308
309
# Location of the running test server.  http_server() and
# http_multi_server() fill these in once the listening socket is bound;
# PORT is reset to None when the server shuts down.
ADDR = None
PORT = None
URL = None
311
312# The evt is set twice.  First when the server is ready to serve.
313# Second when the server has been shutdown.  The user must clear
314# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run a throw-away XML-RPC server in the calling thread.

    The server binds to an ephemeral port on localhost and publishes its
    location through the module globals ADDR, PORT and URL.

    evt            -- event set once the server is ready, and set again
                      after the listening socket has been closed
    numrequests    -- maximum number of requests to handle before exiting
    requestHandler -- request handler class; defaults to
                      SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    encoding       -- passed through to the server constructor

    A socket.timeout raised while setting up or serving is swallowed; the
    socket is closed and PORT reset to None in every case.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Force every accepted connection into blocking mode.  The
            # listening socket carries a timeout, and whether an accepted
            # socket inherits such attributes differs between Linux and
            # *BSD/Windows.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    requestHandler = (requestHandler or
                      SimpleXMLRPCServer.SimpleXMLRPCRequestHandler)
    serv = MyXMLRPCServer(("localhost", 0), requestHandler,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        # publish the functions and the instance the tests call
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
        serv.register_function(lambda x: x, test_support.u(r't\xea\u0161t'))
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # handle up to 'numrequests' requests
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
372
def http_multi_server(evt, numrequests, requestHandler=None):
    """Run a throw-away multi-path XML-RPC server in the calling thread.

    Two dispatchers are installed: "/foo" (offering pow) and "/foo/bar"
    (offering add); both also get the introspection and multicall functions.
    The server binds to an ephemeral port on localhost and publishes its
    location through the module globals ADDR, PORT and URL.

    evt            -- event set once the server is ready, and set again
                      after the listening socket has been closed
    numrequests    -- maximum number of requests to handle before exiting
    requestHandler -- base request handler class; defaults to
                      SimpleXMLRPCServer.SimpleXMLRPCRequestHandler

    A socket.timeout raised while setting up or serving is swallowed; the
    socket is closed, PORT reset to None and evt set in every case,
    including a failure to bind.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer):
        def get_request(self):
            # Force every accepted connection into blocking mode.  The
            # listening socket carries a timeout, and whether an accepted
            # socket inherits such attributes differs between Linux and
            # *BSD/Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler

    class MyRequestHandler(requestHandler):
        # empty list: no path restriction is imposed by the handler itself
        rpc_paths = []

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Configure and bind inside the try block (as http_server does) so
        # that a failure here still closes the socket and wakes the waiter.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
        #trying to connect to "localhost" using all address families, which
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
432
433# This function prevents errors like:
434#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Tell whether exception e stems from a "temporarily unavailable" error.

    Returns True when e carries an errcode of -1 or has no headers, or when
    its 'X-exception' header mentions "temporarily unavailable" (in any
    letter case).  For exceptions lacking the errcode/headers attributes
    (socket.error, for instance) str(e) is inspected instead.  Returns False
    otherwise.'''

    try:
        # sometimes we get a -1 error code and/or empty headers
        lacks_details = e.errcode == -1 or e.headers is None
        message = None if lacks_details else e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError-like object; fall back to its text.
        lacks_details = False
        message = str(e)

    if lacks_details:
        return True

    return bool(message) and 'temporarily unavailable' in message.lower()
453
@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
    """Base class for tests that talk to a server running in a thread.

    Subclasses may override:
      requestHandler -- request handler class handed to the server function
      request_count  -- number of requests the server handles before exiting
      threadFunc     -- server function, called in the server thread as
                        threadFunc(evt, request_count, requestHandler)
    """
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        self.evt = threading.Event()
        # start server thread to handle requests; keep a reference so that
        # tearDown() can join it instead of leaving it dangling
        serv_args = (self.evt, self.request_count, self.requestHandler)
        self._server_thread = threading.Thread(target=self.threadFunc,
                                               args=serv_args)
        self._server_thread.start()

        # wait for the server to be ready
        self.evt.wait(10)
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait(10)

        # Reap the thread itself.  The timeout keeps a stuck server from
        # hanging the test run; getattr() tolerates subclasses whose setUp()
        # does not go through this class.
        server_thread = getattr(self, '_server_thread', None)
        if server_thread is not None:
            server_thread.join(10)

        # disable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
479
# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
486
class SimpleServerTestCase(BaseServerTestCase):
    """Client/server tests against the single-path test server.

    Failures caused by "temporarily unavailable" errors (see
    is_unavailable_exception) are deliberately ignored by the tests that
    route their ProtocolError/socket.error through
    _fail_unless_unavailable().
    """

    def _fail_unless_unavailable(self, e):
        # Shared handler for ProtocolError/socket.error raised by a request.
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @test_support.requires_unicode
    def test_nonascii(self):
        start_string = test_support.u(r'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t')
        end_string = test_support.u(r'h\N{LATIN SMALL LETTER O WITH HORN}n')

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @test_support.requires_unicode
    def test_unicode_host(self):
        # the server URL itself may be a unicode string
        server = xmlrpclib.ServerProxy(u"http://%s:%d/RPC2"%(ADDR, PORT))
        self.assertEqual(server.add("a", u"\xe9"), u"a\xe9")

    @test_support.requires_unicode
    def test_client_encoding(self):
        start_string = unichr(0x20ac)
        end_string = unichr(0xa4)

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @test_support.requires_unicode
    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            # method name given as a byte string in the proxy's encoding
            m = getattr(p, 't\xea\xa8t')
            self.assertEqual(m(42), 42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    # [ch] The test 404 is causing lots of false alarms.
    def XXXtest_404(self):
        # send POST with httplib, it should return 404 header and
        # 'Not Found' message.
        conn = httplib.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/this-is-not-valid')
        response = conn.getresponse()
        conn.close()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            expected_methods = set(['pow', 'div', 'my_function', 'add',
                                    test_support.u(r't\xea\u0161t'),
                                    'system.listMethods', 'system.methodHelp',
                                    'system.methodSignature', 'system.multicall'])
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @unittest.skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<type \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<type \'exceptions.Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')

        self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        conn = httplib.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
        conn.close()
664
class SimpleServerEncodingTestCase(BaseServerTestCase):
    """Runs the test server with the 'iso-8859-15' encoding."""

    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # the encoding argument is ignored: the server always gets
        # 'iso-8859-15'
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    @test_support.requires_unicode
    def test_server_encoding(self):
        first = unichr(0x20ac)
        second = unichr(0xa4)
        expected = first + second

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(first, second), expected)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if is_unavailable_exception(e):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
684
685
class MultiPathServerTestCase(BaseServerTestCase):
    """Tests against the multi-path server started by http_multi_server."""
    threadFunc = staticmethod(http_multi_server)
    # every test below issues two requests
    request_count = 2

    def test_path1(self):
        # "/foo" offers pow; calling add there must yield a Fault
        proxy = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(proxy.pow(6,8), 6**8)
        self.assertRaises(xmlrpclib.Fault, proxy.add, 6, 8)

    def test_path2(self):
        # "/foo/bar" offers add; calling pow there must yield a Fault
        proxy = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(proxy.add(6,8), 6+8)
        self.assertRaises(xmlrpclib.Fault, proxy.pow, 6, 8)
697
#Base class for the test cases that verify that a server using the HTTP/1.1
#keep-alive mechanism does indeed serve subsequent requests on the same
#connection
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    """Base for the keep-alive tests; supplies a request-logging handler."""

    #a request handler that supports keep-alive and logs requests into a
    #class variable
    class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
        parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        #one inner list per handler instance (i.e. per handle() call), each
        #holding the raw request lines that instance processed
        myRequests = []

        def handle(self):
            #open a new log entry and remember its index
            log = self.myRequests
            log.append([])
            self.reqidx = len(log)-1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            #record the request line once the parent has processed it
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)
721
722#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
723#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    """Several calls through one proxy must share a single connection."""

    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do three requests.
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)

        request_log = self.RequestHandler.myRequests

        #they should have all been handled by a single request handler
        self.assertEqual(len(request_log), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(request_log[-1]), 2)
738
739#test special attribute access on the serverproxy, through the __call__
740#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    """Closing the proxy's connection must start a new keep-alive session."""

    #ask for two keepalive requests to be handled.
    request_count=2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")() #this should trigger a new keep-alive request
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)

        request_log = self.RequestHandler.myRequests

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(request_log), 2)
        self.assertGreaterEqual(len(request_log[-1]), 2)
        self.assertGreaterEqual(len(request_log[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #one request, close the transport, one more request
        self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("transport").close() #same as above, really.
        self.assertEqual(proxy.pow(6,8), 6**8)
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
769
# Verifies that gzip encoding works in both directions: for the request sent
# by the client and for the response sent by the server.
@unittest.skipUnless(gzip, 'gzip not available')
class GzipServerTestCase(BaseServerTestCase):
    # A request handler that speaks HTTP/1.1 and stores the Content-Length of
    # the most recent POST on the handler class, so that the tests can compare
    # request sizes with and without gzip encoding.
    class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
        parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            # Record the size of the last request on the class (not on the
            # instance); tests read it as self.RequestHandler.content_length.
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        # Custom transport: stores the response length for our perusal and
        # can optionally announce a gzip body that is not really gzipped.
        fake_gzip = False

        def parse_response(self, response):
            self.response_length = int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                # Add a lone gzip header to induce a decode error remotely.
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def setUp(self):
        BaseServerTestCase.setUp(self)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None  # no request encoding
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6, 8), 6**8)
        plain_length = self.RequestHandler.content_length
        t.encode_threshold = 0  # turn on request encoding
        self.assertEqual(p.pow(6, 8), 6**8)
        gzipped_length = self.RequestHandler.content_length
        # assertGreater reports both lengths when the comparison fails.
        self.assertGreater(plain_length, gzipped_length)

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        with self.assertRaisesRegexp(xmlrpclib.ProtocolError,
                                     re.compile(r"\b400\b")):
            p.pow(6, 8)

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        handler = self.requestHandler
        old = handler.encode_threshold
        # encode_threshold lives on the handler class and is therefore shared
        # with later tests; restore it even if an assertion below fails.
        try:
            handler.encode_threshold = None  # no encoding
            self.assertEqual(p.pow(6, 8), 6**8)
            plain_length = t.response_length
            handler.encode_threshold = 0  # always encode
            self.assertEqual(p.pow(6, 8), 6**8)
            gzipped_length = t.response_length
        finally:
            handler.encode_threshold = old
        self.assertGreater(plain_length, gzipped_length)

    def test_gzip_decode_limit(self):
        max_gzip_decode = 20 * 1024 * 1024

        # A payload of exactly the limit must decode completely.
        data = '\0' * max_gzip_decode
        encoded = xmlrpclib.gzip_encode(data)
        decoded = xmlrpclib.gzip_decode(encoded)
        self.assertEqual(len(decoded), max_gzip_decode)

        # One byte over the limit must be rejected ...
        data = '\0' * (max_gzip_decode + 1)
        encoded = xmlrpclib.gzip_encode(data)

        with self.assertRaisesRegexp(ValueError,
                                     "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(encoded)

        # ... unless the call passes max_decode=-1.
        xmlrpclib.gzip_decode(encoded, max_decode=-1)
851
852
# Tests the special attributes reachable by calling a ServerProxy object.
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        unittest.TestCase.setUp(self)
        # Any string in the correct URL format will do; its actual value
        # doesn't matter.
        self.url = 'http://fake.localhost'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        close = proxy('close')
        self.assertEqual(close(), None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
869
# A contrived way to provoke a failure on the server side, used to test the
# server's _send_traceback_header flag.
class FailingMessageClass(mimetools.Message):
    def __getitem__(self, key):
        # Header names are looked up in lower case; the content-length
        # header is answered with a value that is not a number.
        lowered = key.lower()
        if lowered != 'content-length':
            return mimetools.Message.__getitem__(self, lowered)
        return 'I am broken'
878
879
@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # Start a server thread to handle requests.
        server = threading.Thread(target=http_server, args=(self.evt, 1))
        server.start()

        # Wait for the server to be ready, then re-arm the event so that
        # tearDown can wait on it again.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Wait on the server thread to terminate.
        self.evt.wait()
        # Put back the class-level settings the tests may have changed:
        # the traceback flag and the message class.
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message

    def _failing_pow(self):
        """Call pow(6, 8) on the server and return the error it raises.

        Fails the test when the call unexpectedly succeeds.
        """
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as exc:
            return exc
        self.fail('ProtocolError not raised')

    def test_basic(self):
        # The flag must be false by default.
        server_class = SimpleXMLRPCServer.SimpleXMLRPCServer
        self.assertEqual(server_class._send_traceback_header, False)

        # Enable traceback reporting.
        server_class._send_traceback_header = True

        # Smoke test: a call that shouldn't fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as exc:
            # Ignore failures due to non-blocking socket 'unavailable' errors.
            if is_unavailable_exception(exc):
                return
            # Protocol error; provide additional information in test output.
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_fail_no_info(self):
        # Use the broken message class.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        exc = self._failing_pow()
        # Ignore failures due to non-blocking socket 'unavailable' errors,
        # and errors that carry no headers to inspect.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # The two server-side error headers shouldn't be sent back in this case.
        self.assertTrue(exc.headers.get("X-exception") is None)
        self.assertTrue(exc.headers.get("X-traceback") is None)

    def test_fail_with_info(self):
        # Use the broken message class.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Errors in the server must send back exception/traceback info
        # when the flag is set.
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        exc = self._failing_pow()
        # Ignore failures due to non-blocking socket 'unavailable' errors,
        # and errors that carry no headers to inspect.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # We should get error info in the response.
        expected_err = "invalid literal for int() with base 10: 'I am broken'"
        self.assertEqual(exc.headers.get("x-exception"), expected_err)
        self.assertTrue(exc.headers.get("x-traceback") is not None)
954
class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
        self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with test_support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # If the method is GET and no request_text is given, it runs
            # handle_get; capture what that writes to stdout.
            with test_support.captured_stdout() as data_out:
                self.cgi.handle_request()

            # Parse the Status header.
            data_out.seek(0)
            words = data_out.read().split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with test_support.EnvironmentVarGuard() as env, \
             test_support.captured_stdout() as data_out, \
             test_support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # The handler will respond with a fault; if so, our goal is achieved ;)
        handle = data_out.read()

        # Skip the HTTP header by locating the start of the XML document
        # instead of assuming a fixed header length: the header grows or
        # shrinks with the number of digits in Content-Length.
        xml_start = handle.find("<?xml")
        self.assertNotEqual(xml_start, -1, "no XML document in CGI response")
        content = handle[xml_start:]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request.
        # The same test method is used in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        match = re.search(r'Content-Length: (\d+)', handle)
        self.assertIsNotNone(match, "no Content-Length header in CGI response")
        self.assertEqual(int(match.group(1)), len(content))
1020
1021
class FakeSocket:
    """A socket stand-in that records what is sent instead of sending it."""

    def __init__(self):
        # Everything passed to send() or sendall() accumulates here.
        self.data = StringIO.StringIO()

    def send(self, buf):
        sent = len(buf)
        self.data.write(buf)
        return sent

    def sendall(self, buf):
        self.data.write(buf)

    def getvalue(self):
        recorded = self.data.getvalue()
        return recorded

    def makefile(self, x='r', y=-1):
        # No response can be read back from this socket.
        raise RuntimeError()

    def close(self):
        return None
1042
class FakeTransport(xmlrpclib.Transport):
    """A Transport that records a request instead of sending it.

    The real socket that httplib would use is swapped for a FakeSocket
    object, which keeps the request.  No response is provided.
    """

    def make_connection(self, host):
        connection = xmlrpclib.Transport.make_connection(self, host)
        recorder = FakeSocket()
        # Hand the recording socket to the connection and keep a reference
        # in self.fake_socket so the recorded request can be read later.
        connection.sock = recorder
        self.fake_socket = recorder
        return connection
1055
class TransportSubclassTestCase(unittest.TestCase):

    def issue_request(self, transport_class):
        """Return an HTTP request made via transport_class.

        The result is None when the call does not raise RuntimeError.
        """
        recorder = transport_class()
        client = xmlrpclib.ServerProxy("http://example.com/",
                                       transport=recorder)
        captured = None
        try:
            client.pow(6, 8)
        except RuntimeError:
            captured = recorder.fake_socket.getvalue()
        return captured

    def test_custom_user_agent(self):
        class UserAgentTransport(FakeTransport):

            def send_user_agent(self, conn):
                xmlrpclib.Transport.send_user_agent(self, conn)
                conn.putheader("X-Test", "test_custom_user_agent")

        request = self.issue_request(UserAgentTransport)
        self.assertIn("X-Test: test_custom_user_agent\r\n", request)

    def test_send_host(self):
        class HostTransport(FakeTransport):

            def send_host(self, conn, host):
                xmlrpclib.Transport.send_host(self, conn, host)
                conn.putheader("X-Test", "test_send_host")

        request = self.issue_request(HostTransport)
        self.assertIn("X-Test: test_send_host\r\n", request)

    def test_send_request(self):
        class RequestTransport(FakeTransport):

            def send_request(self, conn, url, body):
                xmlrpclib.Transport.send_request(self, conn, url, body)
                conn.putheader("X-Test", "test_send_request")

        request = self.issue_request(RequestTransport)
        self.assertIn("X-Test: test_send_request\r\n", request)

    def test_send_content(self):
        class ContentTransport(FakeTransport):

            def send_content(self, conn, body):
                # The extra header goes in before the parent sends the
                # content.
                conn.putheader("X-Test", "test_send_content")
                xmlrpclib.Transport.send_content(self, conn, body)

        request = self.issue_request(ContentTransport)
        self.assertIn("X-Test: test_send_content\r\n", request)
1108
@test_support.reap_threads
def test_main():
    """Run every test case class of this module through test_support."""
    test_support.run_unittest(
        XMLRPCTestCase,
        HelperTestCase,
        DateTimeTestCase,
        BinaryTestCase,
        FaultTestCase,
        TransportSubclassTestCase,
        SimpleServerTestCase,
        SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1,
        KeepaliveServerTestCase2,
        GzipServerTestCase,
        MultiPathServerTestCase,
        ServerProxyTestCase,
        FailingServerTestCase,
        CGIHandlerTestCase,
    )
1124
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
1127