• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for proactor_events.py"""
2
3import io
4import socket
5import unittest
6import sys
7from unittest import mock
8
9import asyncio
10from asyncio.proactor_events import BaseProactorEventLoop
11from asyncio.proactor_events import _ProactorSocketTransport
12from asyncio.proactor_events import _ProactorWritePipeTransport
13from asyncio.proactor_events import _ProactorDuplexPipeTransport
14from asyncio.proactor_events import _ProactorDatagramTransport
15from test import support
16from test.test_asyncio import utils as test_utils
17
18
19def tearDownModule():
20    asyncio.set_event_loop_policy(None)
21
22
23def close_transport(transport):
24    # Don't call transport.close() because the event loop and the IOCP proactor
25    # are mocked
26    if transport._sock is None:
27        return
28    transport._sock.close()
29    transport._sock = None
30
31
32class ProactorSocketTransportTests(test_utils.TestCase):
33
34    def setUp(self):
35        super().setUp()
36        self.loop = self.new_test_loop()
37        self.addCleanup(self.loop.close)
38        self.proactor = mock.Mock()
39        self.loop._proactor = self.proactor
40        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
41        self.sock = mock.Mock(socket.socket)
42
43    def socket_transport(self, waiter=None):
44        transport = _ProactorSocketTransport(self.loop, self.sock,
45                                             self.protocol, waiter=waiter)
46        self.addCleanup(close_transport, transport)
47        return transport
48
49    def test_ctor(self):
50        fut = self.loop.create_future()
51        tr = self.socket_transport(waiter=fut)
52        test_utils.run_briefly(self.loop)
53        self.assertIsNone(fut.result())
54        self.protocol.connection_made(tr)
55        self.proactor.recv.assert_called_with(self.sock, 32768)
56
57    def test_loop_reading(self):
58        tr = self.socket_transport()
59        tr._loop_reading()
60        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
61        self.assertFalse(self.protocol.data_received.called)
62        self.assertFalse(self.protocol.eof_received.called)
63
64    def test_loop_reading_data(self):
65        res = self.loop.create_future()
66        res.set_result(b'data')
67
68        tr = self.socket_transport()
69        tr._read_fut = res
70        tr._loop_reading(res)
71        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
72        self.protocol.data_received.assert_called_with(b'data')
73
74    def test_loop_reading_no_data(self):
75        res = self.loop.create_future()
76        res.set_result(b'')
77
78        tr = self.socket_transport()
79        self.assertRaises(AssertionError, tr._loop_reading, res)
80
81        tr.close = mock.Mock()
82        tr._read_fut = res
83        tr._loop_reading(res)
84        self.assertFalse(self.loop._proactor.recv.called)
85        self.assertTrue(self.protocol.eof_received.called)
86        self.assertTrue(tr.close.called)
87
88    def test_loop_reading_aborted(self):
89        err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
90
91        tr = self.socket_transport()
92        tr._fatal_error = mock.Mock()
93        tr._loop_reading()
94        tr._fatal_error.assert_called_with(
95                            err,
96                            'Fatal read error on pipe transport')
97
98    def test_loop_reading_aborted_closing(self):
99        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
100
101        tr = self.socket_transport()
102        tr._closing = True
103        tr._fatal_error = mock.Mock()
104        tr._loop_reading()
105        self.assertFalse(tr._fatal_error.called)
106
107    def test_loop_reading_aborted_is_fatal(self):
108        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
109        tr = self.socket_transport()
110        tr._closing = False
111        tr._fatal_error = mock.Mock()
112        tr._loop_reading()
113        self.assertTrue(tr._fatal_error.called)
114
115    def test_loop_reading_conn_reset_lost(self):
116        err = self.loop._proactor.recv.side_effect = ConnectionResetError()
117
118        tr = self.socket_transport()
119        tr._closing = False
120        tr._fatal_error = mock.Mock()
121        tr._force_close = mock.Mock()
122        tr._loop_reading()
123        self.assertFalse(tr._fatal_error.called)
124        tr._force_close.assert_called_with(err)
125
126    def test_loop_reading_exception(self):
127        err = self.loop._proactor.recv.side_effect = (OSError())
128
129        tr = self.socket_transport()
130        tr._fatal_error = mock.Mock()
131        tr._loop_reading()
132        tr._fatal_error.assert_called_with(
133                            err,
134                            'Fatal read error on pipe transport')
135
136    def test_write(self):
137        tr = self.socket_transport()
138        tr._loop_writing = mock.Mock()
139        tr.write(b'data')
140        self.assertEqual(tr._buffer, None)
141        tr._loop_writing.assert_called_with(data=b'data')
142
143    def test_write_no_data(self):
144        tr = self.socket_transport()
145        tr.write(b'')
146        self.assertFalse(tr._buffer)
147
148    def test_write_more(self):
149        tr = self.socket_transport()
150        tr._write_fut = mock.Mock()
151        tr._loop_writing = mock.Mock()
152        tr.write(b'data')
153        self.assertEqual(tr._buffer, b'data')
154        self.assertFalse(tr._loop_writing.called)
155
156    def test_loop_writing(self):
157        tr = self.socket_transport()
158        tr._buffer = bytearray(b'data')
159        tr._loop_writing()
160        self.loop._proactor.send.assert_called_with(self.sock, b'data')
161        self.loop._proactor.send.return_value.add_done_callback.\
162            assert_called_with(tr._loop_writing)
163
164    @mock.patch('asyncio.proactor_events.logger')
165    def test_loop_writing_err(self, m_log):
166        err = self.loop._proactor.send.side_effect = OSError()
167        tr = self.socket_transport()
168        tr._fatal_error = mock.Mock()
169        tr._buffer = [b'da', b'ta']
170        tr._loop_writing()
171        tr._fatal_error.assert_called_with(
172                            err,
173                            'Fatal write error on pipe transport')
174        tr._conn_lost = 1
175
176        tr.write(b'data')
177        tr.write(b'data')
178        tr.write(b'data')
179        tr.write(b'data')
180        tr.write(b'data')
181        self.assertEqual(tr._buffer, None)
182        m_log.warning.assert_called_with('socket.send() raised exception.')
183
184    def test_loop_writing_stop(self):
185        fut = self.loop.create_future()
186        fut.set_result(b'data')
187
188        tr = self.socket_transport()
189        tr._write_fut = fut
190        tr._loop_writing(fut)
191        self.assertIsNone(tr._write_fut)
192
193    def test_loop_writing_closing(self):
194        fut = self.loop.create_future()
195        fut.set_result(1)
196
197        tr = self.socket_transport()
198        tr._write_fut = fut
199        tr.close()
200        tr._loop_writing(fut)
201        self.assertIsNone(tr._write_fut)
202        test_utils.run_briefly(self.loop)
203        self.protocol.connection_lost.assert_called_with(None)
204
205    def test_abort(self):
206        tr = self.socket_transport()
207        tr._force_close = mock.Mock()
208        tr.abort()
209        tr._force_close.assert_called_with(None)
210
211    def test_close(self):
212        tr = self.socket_transport()
213        tr.close()
214        test_utils.run_briefly(self.loop)
215        self.protocol.connection_lost.assert_called_with(None)
216        self.assertTrue(tr.is_closing())
217        self.assertEqual(tr._conn_lost, 1)
218
219        self.protocol.connection_lost.reset_mock()
220        tr.close()
221        test_utils.run_briefly(self.loop)
222        self.assertFalse(self.protocol.connection_lost.called)
223
224    def test_close_write_fut(self):
225        tr = self.socket_transport()
226        tr._write_fut = mock.Mock()
227        tr.close()
228        test_utils.run_briefly(self.loop)
229        self.assertFalse(self.protocol.connection_lost.called)
230
231    def test_close_buffer(self):
232        tr = self.socket_transport()
233        tr._buffer = [b'data']
234        tr.close()
235        test_utils.run_briefly(self.loop)
236        self.assertFalse(self.protocol.connection_lost.called)
237
238    @mock.patch('asyncio.base_events.logger')
239    def test_fatal_error(self, m_logging):
240        tr = self.socket_transport()
241        tr._force_close = mock.Mock()
242        tr._fatal_error(None)
243        self.assertTrue(tr._force_close.called)
244        self.assertTrue(m_logging.error.called)
245
246    def test_force_close(self):
247        tr = self.socket_transport()
248        tr._buffer = [b'data']
249        read_fut = tr._read_fut = mock.Mock()
250        write_fut = tr._write_fut = mock.Mock()
251        tr._force_close(None)
252
253        read_fut.cancel.assert_called_with()
254        write_fut.cancel.assert_called_with()
255        test_utils.run_briefly(self.loop)
256        self.protocol.connection_lost.assert_called_with(None)
257        self.assertEqual(None, tr._buffer)
258        self.assertEqual(tr._conn_lost, 1)
259
260    def test_loop_writing_force_close(self):
261        exc_handler = mock.Mock()
262        self.loop.set_exception_handler(exc_handler)
263        fut = self.loop.create_future()
264        fut.set_result(1)
265        self.proactor.send.return_value = fut
266
267        tr = self.socket_transport()
268        tr.write(b'data')
269        tr._force_close(None)
270        test_utils.run_briefly(self.loop)
271        exc_handler.assert_not_called()
272
273    def test_force_close_idempotent(self):
274        tr = self.socket_transport()
275        tr._closing = True
276        tr._force_close(None)
277        test_utils.run_briefly(self.loop)
278        self.assertFalse(self.protocol.connection_lost.called)
279
280    def test_fatal_error_2(self):
281        tr = self.socket_transport()
282        tr._buffer = [b'data']
283        tr._force_close(None)
284
285        test_utils.run_briefly(self.loop)
286        self.protocol.connection_lost.assert_called_with(None)
287        self.assertEqual(None, tr._buffer)
288
289    def test_call_connection_lost(self):
290        tr = self.socket_transport()
291        tr._call_connection_lost(None)
292        self.assertTrue(self.protocol.connection_lost.called)
293        self.assertTrue(self.sock.close.called)
294
295    def test_write_eof(self):
296        tr = self.socket_transport()
297        self.assertTrue(tr.can_write_eof())
298        tr.write_eof()
299        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
300        tr.write_eof()
301        self.assertEqual(self.sock.shutdown.call_count, 1)
302        tr.close()
303
304    def test_write_eof_buffer(self):
305        tr = self.socket_transport()
306        f = self.loop.create_future()
307        tr._loop._proactor.send.return_value = f
308        tr.write(b'data')
309        tr.write_eof()
310        self.assertTrue(tr._eof_written)
311        self.assertFalse(self.sock.shutdown.called)
312        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
313        f.set_result(4)
314        self.loop._run_once()
315        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
316        tr.close()
317
318    def test_write_eof_write_pipe(self):
319        tr = _ProactorWritePipeTransport(
320            self.loop, self.sock, self.protocol)
321        self.assertTrue(tr.can_write_eof())
322        tr.write_eof()
323        self.assertTrue(tr.is_closing())
324        self.loop._run_once()
325        self.assertTrue(self.sock.close.called)
326        tr.close()
327
328    def test_write_eof_buffer_write_pipe(self):
329        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
330        f = self.loop.create_future()
331        tr._loop._proactor.send.return_value = f
332        tr.write(b'data')
333        tr.write_eof()
334        self.assertTrue(tr.is_closing())
335        self.assertFalse(self.sock.shutdown.called)
336        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
337        f.set_result(4)
338        self.loop._run_once()
339        self.loop._run_once()
340        self.assertTrue(self.sock.close.called)
341        tr.close()
342
343    def test_write_eof_duplex_pipe(self):
344        tr = _ProactorDuplexPipeTransport(
345            self.loop, self.sock, self.protocol)
346        self.assertFalse(tr.can_write_eof())
347        with self.assertRaises(NotImplementedError):
348            tr.write_eof()
349        close_transport(tr)
350
351    def test_pause_resume_reading(self):
352        tr = self.socket_transport()
353        futures = []
354        for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
355            f = self.loop.create_future()
356            f.set_result(msg)
357            futures.append(f)
358
359        self.loop._proactor.recv.side_effect = futures
360        self.loop._run_once()
361        self.assertFalse(tr._paused)
362        self.assertTrue(tr.is_reading())
363        self.loop._run_once()
364        self.protocol.data_received.assert_called_with(b'data1')
365        self.loop._run_once()
366        self.protocol.data_received.assert_called_with(b'data2')
367
368        tr.pause_reading()
369        tr.pause_reading()
370        self.assertTrue(tr._paused)
371        self.assertFalse(tr.is_reading())
372        for i in range(10):
373            self.loop._run_once()
374        self.protocol.data_received.assert_called_with(b'data2')
375
376        tr.resume_reading()
377        tr.resume_reading()
378        self.assertFalse(tr._paused)
379        self.assertTrue(tr.is_reading())
380        self.loop._run_once()
381        self.protocol.data_received.assert_called_with(b'data3')
382        self.loop._run_once()
383        self.protocol.data_received.assert_called_with(b'data4')
384
385        tr.pause_reading()
386        tr.resume_reading()
387        self.loop.call_exception_handler = mock.Mock()
388        self.loop._run_once()
389        self.loop.call_exception_handler.assert_not_called()
390        self.protocol.data_received.assert_called_with(b'data5')
391        tr.close()
392
393        self.assertFalse(tr.is_reading())
394
395
396    def pause_writing_transport(self, high):
397        tr = self.socket_transport()
398        tr.set_write_buffer_limits(high=high)
399
400        self.assertEqual(tr.get_write_buffer_size(), 0)
401        self.assertFalse(self.protocol.pause_writing.called)
402        self.assertFalse(self.protocol.resume_writing.called)
403        return tr
404
405    def test_pause_resume_writing(self):
406        tr = self.pause_writing_transport(high=4)
407
408        # write a large chunk, must pause writing
409        fut = self.loop.create_future()
410        self.loop._proactor.send.return_value = fut
411        tr.write(b'large data')
412        self.loop._run_once()
413        self.assertTrue(self.protocol.pause_writing.called)
414
415        # flush the buffer
416        fut.set_result(None)
417        self.loop._run_once()
418        self.assertEqual(tr.get_write_buffer_size(), 0)
419        self.assertTrue(self.protocol.resume_writing.called)
420
421    def test_pause_writing_2write(self):
422        tr = self.pause_writing_transport(high=4)
423
424        # first short write, the buffer is not full (3 <= 4)
425        fut1 = self.loop.create_future()
426        self.loop._proactor.send.return_value = fut1
427        tr.write(b'123')
428        self.loop._run_once()
429        self.assertEqual(tr.get_write_buffer_size(), 3)
430        self.assertFalse(self.protocol.pause_writing.called)
431
432        # fill the buffer, must pause writing (6 > 4)
433        tr.write(b'abc')
434        self.loop._run_once()
435        self.assertEqual(tr.get_write_buffer_size(), 6)
436        self.assertTrue(self.protocol.pause_writing.called)
437
438    def test_pause_writing_3write(self):
439        tr = self.pause_writing_transport(high=4)
440
441        # first short write, the buffer is not full (1 <= 4)
442        fut = self.loop.create_future()
443        self.loop._proactor.send.return_value = fut
444        tr.write(b'1')
445        self.loop._run_once()
446        self.assertEqual(tr.get_write_buffer_size(), 1)
447        self.assertFalse(self.protocol.pause_writing.called)
448
449        # second short write, the buffer is not full (3 <= 4)
450        tr.write(b'23')
451        self.loop._run_once()
452        self.assertEqual(tr.get_write_buffer_size(), 3)
453        self.assertFalse(self.protocol.pause_writing.called)
454
455        # fill the buffer, must pause writing (6 > 4)
456        tr.write(b'abc')
457        self.loop._run_once()
458        self.assertEqual(tr.get_write_buffer_size(), 6)
459        self.assertTrue(self.protocol.pause_writing.called)
460
461    def test_dont_pause_writing(self):
462        tr = self.pause_writing_transport(high=4)
463
464        # write a large chunk which completes immediately,
465        # it should not pause writing
466        fut = self.loop.create_future()
467        fut.set_result(None)
468        self.loop._proactor.send.return_value = fut
469        tr.write(b'very large data')
470        self.loop._run_once()
471        self.assertEqual(tr.get_write_buffer_size(), 0)
472        self.assertFalse(self.protocol.pause_writing.called)
473
474
475class ProactorDatagramTransportTests(test_utils.TestCase):
476
477    def setUp(self):
478        super().setUp()
479        self.loop = self.new_test_loop()
480        self.proactor = mock.Mock()
481        self.loop._proactor = self.proactor
482        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
483        self.sock = mock.Mock(spec_set=socket.socket)
484        self.sock.fileno.return_value = 7
485
486    def datagram_transport(self, address=None):
487        self.sock.getpeername.side_effect = None if address else OSError
488        transport = _ProactorDatagramTransport(self.loop, self.sock,
489                                               self.protocol,
490                                               address=address)
491        self.addCleanup(close_transport, transport)
492        return transport
493
494    def test_sendto(self):
495        data = b'data'
496        transport = self.datagram_transport()
497        transport.sendto(data, ('0.0.0.0', 1234))
498        self.assertTrue(self.proactor.sendto.called)
499        self.proactor.sendto.assert_called_with(
500            self.sock, data, addr=('0.0.0.0', 1234))
501
502    def test_sendto_bytearray(self):
503        data = bytearray(b'data')
504        transport = self.datagram_transport()
505        transport.sendto(data, ('0.0.0.0', 1234))
506        self.assertTrue(self.proactor.sendto.called)
507        self.proactor.sendto.assert_called_with(
508            self.sock, b'data', addr=('0.0.0.0', 1234))
509
510    def test_sendto_memoryview(self):
511        data = memoryview(b'data')
512        transport = self.datagram_transport()
513        transport.sendto(data, ('0.0.0.0', 1234))
514        self.assertTrue(self.proactor.sendto.called)
515        self.proactor.sendto.assert_called_with(
516            self.sock, b'data', addr=('0.0.0.0', 1234))
517
518    def test_sendto_no_data(self):
519        transport = self.datagram_transport()
520        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
521        transport.sendto(b'', ())
522        self.assertFalse(self.sock.sendto.called)
523        self.assertEqual(
524            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
525
526    def test_sendto_buffer(self):
527        transport = self.datagram_transport()
528        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
529        transport._write_fut = object()
530        transport.sendto(b'data2', ('0.0.0.0', 12345))
531        self.assertFalse(self.proactor.sendto.called)
532        self.assertEqual(
533            [(b'data1', ('0.0.0.0', 12345)),
534             (b'data2', ('0.0.0.0', 12345))],
535            list(transport._buffer))
536
537    def test_sendto_buffer_bytearray(self):
538        data2 = bytearray(b'data2')
539        transport = self.datagram_transport()
540        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
541        transport._write_fut = object()
542        transport.sendto(data2, ('0.0.0.0', 12345))
543        self.assertFalse(self.proactor.sendto.called)
544        self.assertEqual(
545            [(b'data1', ('0.0.0.0', 12345)),
546             (b'data2', ('0.0.0.0', 12345))],
547            list(transport._buffer))
548        self.assertIsInstance(transport._buffer[1][0], bytes)
549
550    def test_sendto_buffer_memoryview(self):
551        data2 = memoryview(b'data2')
552        transport = self.datagram_transport()
553        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
554        transport._write_fut = object()
555        transport.sendto(data2, ('0.0.0.0', 12345))
556        self.assertFalse(self.proactor.sendto.called)
557        self.assertEqual(
558            [(b'data1', ('0.0.0.0', 12345)),
559             (b'data2', ('0.0.0.0', 12345))],
560            list(transport._buffer))
561        self.assertIsInstance(transport._buffer[1][0], bytes)
562
563    @mock.patch('asyncio.proactor_events.logger')
564    def test_sendto_exception(self, m_log):
565        data = b'data'
566        err = self.proactor.sendto.side_effect = RuntimeError()
567
568        transport = self.datagram_transport()
569        transport._fatal_error = mock.Mock()
570        transport.sendto(data, ())
571
572        self.assertTrue(transport._fatal_error.called)
573        transport._fatal_error.assert_called_with(
574                                   err,
575                                   'Fatal write error on datagram transport')
576        transport._conn_lost = 1
577
578        transport._address = ('123',)
579        transport.sendto(data)
580        transport.sendto(data)
581        transport.sendto(data)
582        transport.sendto(data)
583        transport.sendto(data)
584        m_log.warning.assert_called_with('socket.sendto() raised exception.')
585
586    def test_sendto_error_received(self):
587        data = b'data'
588
589        self.sock.sendto.side_effect = ConnectionRefusedError
590
591        transport = self.datagram_transport()
592        transport._fatal_error = mock.Mock()
593        transport.sendto(data, ())
594
595        self.assertEqual(transport._conn_lost, 0)
596        self.assertFalse(transport._fatal_error.called)
597
598    def test_sendto_error_received_connected(self):
599        data = b'data'
600
601        self.proactor.send.side_effect = ConnectionRefusedError
602
603        transport = self.datagram_transport(address=('0.0.0.0', 1))
604        transport._fatal_error = mock.Mock()
605        transport.sendto(data)
606
607        self.assertFalse(transport._fatal_error.called)
608        self.assertTrue(self.protocol.error_received.called)
609
610    def test_sendto_str(self):
611        transport = self.datagram_transport()
612        self.assertRaises(TypeError, transport.sendto, 'str', ())
613
614    def test_sendto_connected_addr(self):
615        transport = self.datagram_transport(address=('0.0.0.0', 1))
616        self.assertRaises(
617            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))
618
619    def test_sendto_closing(self):
620        transport = self.datagram_transport(address=(1,))
621        transport.close()
622        self.assertEqual(transport._conn_lost, 1)
623        transport.sendto(b'data', (1,))
624        self.assertEqual(transport._conn_lost, 2)
625
626    def test__loop_writing_closing(self):
627        transport = self.datagram_transport()
628        transport._closing = True
629        transport._loop_writing()
630        self.assertIsNone(transport._write_fut)
631        test_utils.run_briefly(self.loop)
632        self.sock.close.assert_called_with()
633        self.protocol.connection_lost.assert_called_with(None)
634
635    def test__loop_writing_exception(self):
636        err = self.proactor.sendto.side_effect = RuntimeError()
637
638        transport = self.datagram_transport()
639        transport._fatal_error = mock.Mock()
640        transport._buffer.append((b'data', ()))
641        transport._loop_writing()
642
643        transport._fatal_error.assert_called_with(
644                                   err,
645                                   'Fatal write error on datagram transport')
646
647    def test__loop_writing_error_received(self):
648        self.proactor.sendto.side_effect = ConnectionRefusedError
649
650        transport = self.datagram_transport()
651        transport._fatal_error = mock.Mock()
652        transport._buffer.append((b'data', ()))
653        transport._loop_writing()
654
655        self.assertFalse(transport._fatal_error.called)
656
657    def test__loop_writing_error_received_connection(self):
658        self.proactor.send.side_effect = ConnectionRefusedError
659
660        transport = self.datagram_transport(address=('0.0.0.0', 1))
661        transport._fatal_error = mock.Mock()
662        transport._buffer.append((b'data', ()))
663        transport._loop_writing()
664
665        self.assertFalse(transport._fatal_error.called)
666        self.assertTrue(self.protocol.error_received.called)
667
668    @mock.patch('asyncio.base_events.logger.error')
669    def test_fatal_error_connected(self, m_exc):
670        transport = self.datagram_transport(address=('0.0.0.0', 1))
671        err = ConnectionRefusedError()
672        transport._fatal_error(err)
673        self.assertFalse(self.protocol.error_received.called)
674        m_exc.assert_not_called()
675
676
677class BaseProactorEventLoopTests(test_utils.TestCase):
678
679    def setUp(self):
680        super().setUp()
681
682        self.sock = test_utils.mock_nonblocking_socket()
683        self.proactor = mock.Mock()
684
685        self.ssock, self.csock = mock.Mock(), mock.Mock()
686
687        with mock.patch('asyncio.proactor_events.socket.socketpair',
688                        return_value=(self.ssock, self.csock)):
689            with mock.patch('signal.set_wakeup_fd'):
690                self.loop = BaseProactorEventLoop(self.proactor)
691        self.set_event_loop(self.loop)
692
693    @mock.patch('asyncio.proactor_events.socket.socketpair')
694    def test_ctor(self, socketpair):
695        ssock, csock = socketpair.return_value = (
696            mock.Mock(), mock.Mock())
697        with mock.patch('signal.set_wakeup_fd'):
698            loop = BaseProactorEventLoop(self.proactor)
699        self.assertIs(loop._ssock, ssock)
700        self.assertIs(loop._csock, csock)
701        self.assertEqual(loop._internal_fds, 1)
702        loop.close()
703
704    def test_close_self_pipe(self):
705        self.loop._close_self_pipe()
706        self.assertEqual(self.loop._internal_fds, 0)
707        self.assertTrue(self.ssock.close.called)
708        self.assertTrue(self.csock.close.called)
709        self.assertIsNone(self.loop._ssock)
710        self.assertIsNone(self.loop._csock)
711
712        # Don't call close(): _close_self_pipe() cannot be called twice
713        self.loop._closed = True
714
715    def test_close(self):
716        self.loop._close_self_pipe = mock.Mock()
717        self.loop.close()
718        self.assertTrue(self.loop._close_self_pipe.called)
719        self.assertTrue(self.proactor.close.called)
720        self.assertIsNone(self.loop._proactor)
721
722        self.loop._close_self_pipe.reset_mock()
723        self.loop.close()
724        self.assertFalse(self.loop._close_self_pipe.called)
725
726    def test_make_socket_transport(self):
727        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
728        self.assertIsInstance(tr, _ProactorSocketTransport)
729        close_transport(tr)
730
731    def test_loop_self_reading(self):
732        self.loop._loop_self_reading()
733        self.proactor.recv.assert_called_with(self.ssock, 4096)
734        self.proactor.recv.return_value.add_done_callback.assert_called_with(
735            self.loop._loop_self_reading)
736
737    def test_loop_self_reading_fut(self):
738        fut = mock.Mock()
739        self.loop._loop_self_reading(fut)
740        self.assertTrue(fut.result.called)
741        self.proactor.recv.assert_called_with(self.ssock, 4096)
742        self.proactor.recv.return_value.add_done_callback.assert_called_with(
743            self.loop._loop_self_reading)
744
745    def test_loop_self_reading_exception(self):
746        self.loop.call_exception_handler = mock.Mock()
747        self.proactor.recv.side_effect = OSError()
748        self.loop._loop_self_reading()
749        self.assertTrue(self.loop.call_exception_handler.called)
750
751    def test_write_to_self(self):
752        self.loop._write_to_self()
753        self.csock.send.assert_called_with(b'\0')
754
755    def test_process_events(self):
756        self.loop._process_events([])
757
758    @mock.patch('asyncio.base_events.logger')
759    def test_create_server(self, m_log):
760        pf = mock.Mock()
761        call_soon = self.loop.call_soon = mock.Mock()
762
763        self.loop._start_serving(pf, self.sock)
764        self.assertTrue(call_soon.called)
765
766        # callback
767        loop = call_soon.call_args[0][0]
768        loop()
769        self.proactor.accept.assert_called_with(self.sock)
770
771        # conn
772        fut = mock.Mock()
773        fut.result.return_value = (mock.Mock(), mock.Mock())
774
775        make_tr = self.loop._make_socket_transport = mock.Mock()
776        loop(fut)
777        self.assertTrue(fut.result.called)
778        self.assertTrue(make_tr.called)
779
780        # exception
781        fut.result.side_effect = OSError()
782        loop(fut)
783        self.assertTrue(self.sock.close.called)
784        self.assertTrue(m_log.error.called)
785
786    def test_create_server_cancel(self):
787        pf = mock.Mock()
788        call_soon = self.loop.call_soon = mock.Mock()
789
790        self.loop._start_serving(pf, self.sock)
791        loop = call_soon.call_args[0][0]
792
793        # cancelled
794        fut = self.loop.create_future()
795        fut.cancel()
796        loop(fut)
797        self.assertTrue(self.sock.close.called)
798
799    def test_stop_serving(self):
800        sock1 = mock.Mock()
801        future1 = mock.Mock()
802        sock2 = mock.Mock()
803        future2 = mock.Mock()
804        self.loop._accept_futures = {
805            sock1.fileno(): future1,
806            sock2.fileno(): future2
807        }
808
809        self.loop._stop_serving(sock1)
810        self.assertTrue(sock1.close.called)
811        self.assertTrue(future1.cancel.called)
812        self.proactor._stop_serving.assert_called_with(sock1)
813        self.assertFalse(sock2.close.called)
814        self.assertFalse(future2.cancel.called)
815
816    def datagram_transport(self):
817        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
818        return self.loop._make_datagram_transport(self.sock, self.protocol)
819
820    def test_make_datagram_transport(self):
821        tr = self.datagram_transport()
822        self.assertIsInstance(tr, _ProactorDatagramTransport)
823        close_transport(tr)
824
825    def test_datagram_loop_writing(self):
826        tr = self.datagram_transport()
827        tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
828        tr._loop_writing()
829        self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068))
830        self.loop._proactor.sendto.return_value.add_done_callback.\
831            assert_called_with(tr._loop_writing)
832
833        close_transport(tr)
834
835    def test_datagram_loop_reading(self):
836        tr = self.datagram_transport()
837        tr._loop_reading()
838        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
839        self.assertFalse(self.protocol.datagram_received.called)
840        self.assertFalse(self.protocol.error_received.called)
841        close_transport(tr)
842
843    def test_datagram_loop_reading_data(self):
844        res = self.loop.create_future()
845        res.set_result((b'data', ('127.0.0.1', 12068)))
846
847        tr = self.datagram_transport()
848        tr._read_fut = res
849        tr._loop_reading(res)
850        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
851        self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068))
852        close_transport(tr)
853
854    def test_datagram_loop_reading_no_data(self):
855        res = self.loop.create_future()
856        res.set_result((b'', ('127.0.0.1', 12068)))
857
858        tr = self.datagram_transport()
859        self.assertRaises(AssertionError, tr._loop_reading, res)
860
861        tr.close = mock.Mock()
862        tr._read_fut = res
863        tr._loop_reading(res)
864        self.assertTrue(self.loop._proactor.recvfrom.called)
865        self.assertFalse(self.protocol.error_received.called)
866        self.assertFalse(tr.close.called)
867        close_transport(tr)
868
869    def test_datagram_loop_reading_aborted(self):
870        err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError()
871
872        tr = self.datagram_transport()
873        tr._fatal_error = mock.Mock()
874        tr._protocol.error_received = mock.Mock()
875        tr._loop_reading()
876        tr._protocol.error_received.assert_called_with(err)
877        close_transport(tr)
878
879    def test_datagram_loop_writing_aborted(self):
880        err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError()
881
882        tr = self.datagram_transport()
883        tr._fatal_error = mock.Mock()
884        tr._protocol.error_received = mock.Mock()
885        tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
886        tr._loop_writing()
887        tr._protocol.error_received.assert_called_with(err)
888        close_transport(tr)
889
890
891@unittest.skipIf(sys.platform != 'win32',
892                 'Proactor is supported on Windows only')
893class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
894    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
895
896    class MyProto(asyncio.Protocol):
897
898        def __init__(self, loop):
899            self.started = False
900            self.closed = False
901            self.data = bytearray()
902            self.fut = loop.create_future()
903            self.transport = None
904
905        def connection_made(self, transport):
906            self.started = True
907            self.transport = transport
908
909        def data_received(self, data):
910            self.data.extend(data)
911
912        def connection_lost(self, exc):
913            self.closed = True
914            self.fut.set_result(None)
915
916        async def wait_closed(self):
917            await self.fut
918
919    @classmethod
920    def setUpClass(cls):
921        with open(support.TESTFN, 'wb') as fp:
922            fp.write(cls.DATA)
923        super().setUpClass()
924
925    @classmethod
926    def tearDownClass(cls):
927        support.unlink(support.TESTFN)
928        super().tearDownClass()
929
930    def setUp(self):
931        self.loop = asyncio.ProactorEventLoop()
932        self.set_event_loop(self.loop)
933        self.addCleanup(self.loop.close)
934        self.file = open(support.TESTFN, 'rb')
935        self.addCleanup(self.file.close)
936        super().setUp()
937
938    def make_socket(self, cleanup=True):
939        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
940        sock.setblocking(False)
941        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
942        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
943        if cleanup:
944            self.addCleanup(sock.close)
945        return sock
946
947    def run_loop(self, coro):
948        return self.loop.run_until_complete(coro)
949
950    def prepare(self):
951        sock = self.make_socket()
952        proto = self.MyProto(self.loop)
953        port = support.find_unused_port()
954        srv_sock = self.make_socket(cleanup=False)
955        srv_sock.bind(('127.0.0.1', port))
956        server = self.run_loop(self.loop.create_server(
957            lambda: proto, sock=srv_sock))
958        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))
959
960        def cleanup():
961            if proto.transport is not None:
962                # can be None if the task was cancelled before
963                # connection_made callback
964                proto.transport.close()
965                self.run_loop(proto.wait_closed())
966
967            server.close()
968            self.run_loop(server.wait_closed())
969
970        self.addCleanup(cleanup)
971
972        return sock, proto
973
974    def test_sock_sendfile_not_a_file(self):
975        sock, proto = self.prepare()
976        f = object()
977        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
978                                    "not a regular file"):
979            self.run_loop(self.loop._sock_sendfile_native(sock, f,
980                                                          0, None))
981        self.assertEqual(self.file.tell(), 0)
982
983    def test_sock_sendfile_iobuffer(self):
984        sock, proto = self.prepare()
985        f = io.BytesIO()
986        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
987                                    "not a regular file"):
988            self.run_loop(self.loop._sock_sendfile_native(sock, f,
989                                                          0, None))
990        self.assertEqual(self.file.tell(), 0)
991
992    def test_sock_sendfile_not_regular_file(self):
993        sock, proto = self.prepare()
994        f = mock.Mock()
995        f.fileno.return_value = -1
996        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
997                                    "not a regular file"):
998            self.run_loop(self.loop._sock_sendfile_native(sock, f,
999                                                          0, None))
1000        self.assertEqual(self.file.tell(), 0)
1001
1002
1003if __name__ == '__main__':
1004    unittest.main()
1005