• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for proactor_events.py"""
2
3import io
4import socket
5import unittest
6import sys
7from unittest import mock
8
9import asyncio
10from asyncio.proactor_events import BaseProactorEventLoop
11from asyncio.proactor_events import _ProactorSocketTransport
12from asyncio.proactor_events import _ProactorWritePipeTransport
13from asyncio.proactor_events import _ProactorDuplexPipeTransport
14from asyncio.proactor_events import _ProactorDatagramTransport
15from test.support import os_helper
16from test.support import socket_helper
17from test.test_asyncio import utils as test_utils
18
19
20def tearDownModule():
21    asyncio.set_event_loop_policy(None)
22
23
24def close_transport(transport):
25    # Don't call transport.close() because the event loop and the IOCP proactor
26    # are mocked
27    if transport._sock is None:
28        return
29    transport._sock.close()
30    transport._sock = None
31
32
33class ProactorSocketTransportTests(test_utils.TestCase):
34
35    def setUp(self):
36        super().setUp()
37        self.loop = self.new_test_loop()
38        self.addCleanup(self.loop.close)
39        self.proactor = mock.Mock()
40        self.loop._proactor = self.proactor
41        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
42        self.sock = mock.Mock(socket.socket)
43        self.buffer_size = 65536
44
45    def socket_transport(self, waiter=None):
46        transport = _ProactorSocketTransport(self.loop, self.sock,
47                                             self.protocol, waiter=waiter)
48        self.addCleanup(close_transport, transport)
49        return transport
50
51    def test_ctor(self):
52        fut = self.loop.create_future()
53        tr = self.socket_transport(waiter=fut)
54        test_utils.run_briefly(self.loop)
55        self.assertIsNone(fut.result())
56        self.protocol.connection_made(tr)
57        self.proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
58
59    def test_loop_reading(self):
60        tr = self.socket_transport()
61        tr._loop_reading()
62        self.loop._proactor.recv_into.assert_called_with(self.sock, bytearray(self.buffer_size))
63        self.assertFalse(self.protocol.data_received.called)
64        self.assertFalse(self.protocol.eof_received.called)
65
66    def test_loop_reading_data(self):
67        buf = b'data'
68        res = self.loop.create_future()
69        res.set_result(len(buf))
70
71        tr = self.socket_transport()
72        tr._read_fut = res
73        tr._data[:len(buf)] = buf
74        tr._loop_reading(res)
75        called_buf = bytearray(self.buffer_size)
76        called_buf[:len(buf)] = buf
77        self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
78        self.protocol.data_received.assert_called_with(bytearray(buf))
79
80    def test_loop_reading_no_data(self):
81        res = self.loop.create_future()
82        res.set_result(0)
83
84        tr = self.socket_transport()
85        self.assertRaises(AssertionError, tr._loop_reading, res)
86
87        tr.close = mock.Mock()
88        tr._read_fut = res
89        tr._loop_reading(res)
90        self.assertFalse(self.loop._proactor.recv_into.called)
91        self.assertTrue(self.protocol.eof_received.called)
92        self.assertTrue(tr.close.called)
93
94    def test_loop_reading_aborted(self):
95        err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
96
97        tr = self.socket_transport()
98        tr._fatal_error = mock.Mock()
99        tr._loop_reading()
100        tr._fatal_error.assert_called_with(
101                            err,
102                            'Fatal read error on pipe transport')
103
104    def test_loop_reading_aborted_closing(self):
105        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
106
107        tr = self.socket_transport()
108        tr._closing = True
109        tr._fatal_error = mock.Mock()
110        tr._loop_reading()
111        self.assertFalse(tr._fatal_error.called)
112
113    def test_loop_reading_aborted_is_fatal(self):
114        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
115        tr = self.socket_transport()
116        tr._closing = False
117        tr._fatal_error = mock.Mock()
118        tr._loop_reading()
119        self.assertTrue(tr._fatal_error.called)
120
121    def test_loop_reading_conn_reset_lost(self):
122        err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
123
124        tr = self.socket_transport()
125        tr._closing = False
126        tr._fatal_error = mock.Mock()
127        tr._force_close = mock.Mock()
128        tr._loop_reading()
129        self.assertFalse(tr._fatal_error.called)
130        tr._force_close.assert_called_with(err)
131
132    def test_loop_reading_exception(self):
133        err = self.loop._proactor.recv_into.side_effect = (OSError())
134
135        tr = self.socket_transport()
136        tr._fatal_error = mock.Mock()
137        tr._loop_reading()
138        tr._fatal_error.assert_called_with(
139                            err,
140                            'Fatal read error on pipe transport')
141
142    def test_write(self):
143        tr = self.socket_transport()
144        tr._loop_writing = mock.Mock()
145        tr.write(b'data')
146        self.assertEqual(tr._buffer, None)
147        tr._loop_writing.assert_called_with(data=b'data')
148
149    def test_write_no_data(self):
150        tr = self.socket_transport()
151        tr.write(b'')
152        self.assertFalse(tr._buffer)
153
154    def test_write_more(self):
155        tr = self.socket_transport()
156        tr._write_fut = mock.Mock()
157        tr._loop_writing = mock.Mock()
158        tr.write(b'data')
159        self.assertEqual(tr._buffer, b'data')
160        self.assertFalse(tr._loop_writing.called)
161
162    def test_loop_writing(self):
163        tr = self.socket_transport()
164        tr._buffer = bytearray(b'data')
165        tr._loop_writing()
166        self.loop._proactor.send.assert_called_with(self.sock, b'data')
167        self.loop._proactor.send.return_value.add_done_callback.\
168            assert_called_with(tr._loop_writing)
169
170    @mock.patch('asyncio.proactor_events.logger')
171    def test_loop_writing_err(self, m_log):
172        err = self.loop._proactor.send.side_effect = OSError()
173        tr = self.socket_transport()
174        tr._fatal_error = mock.Mock()
175        tr._buffer = [b'da', b'ta']
176        tr._loop_writing()
177        tr._fatal_error.assert_called_with(
178                            err,
179                            'Fatal write error on pipe transport')
180        tr._conn_lost = 1
181
182        tr.write(b'data')
183        tr.write(b'data')
184        tr.write(b'data')
185        tr.write(b'data')
186        tr.write(b'data')
187        self.assertEqual(tr._buffer, None)
188        m_log.warning.assert_called_with('socket.send() raised exception.')
189
190    def test_loop_writing_stop(self):
191        fut = self.loop.create_future()
192        fut.set_result(b'data')
193
194        tr = self.socket_transport()
195        tr._write_fut = fut
196        tr._loop_writing(fut)
197        self.assertIsNone(tr._write_fut)
198
199    def test_loop_writing_closing(self):
200        fut = self.loop.create_future()
201        fut.set_result(1)
202
203        tr = self.socket_transport()
204        tr._write_fut = fut
205        tr.close()
206        tr._loop_writing(fut)
207        self.assertIsNone(tr._write_fut)
208        test_utils.run_briefly(self.loop)
209        self.protocol.connection_lost.assert_called_with(None)
210
211    def test_abort(self):
212        tr = self.socket_transport()
213        tr._force_close = mock.Mock()
214        tr.abort()
215        tr._force_close.assert_called_with(None)
216
217    def test_close(self):
218        tr = self.socket_transport()
219        tr.close()
220        test_utils.run_briefly(self.loop)
221        self.protocol.connection_lost.assert_called_with(None)
222        self.assertTrue(tr.is_closing())
223        self.assertEqual(tr._conn_lost, 1)
224
225        self.protocol.connection_lost.reset_mock()
226        tr.close()
227        test_utils.run_briefly(self.loop)
228        self.assertFalse(self.protocol.connection_lost.called)
229
230    def test_close_write_fut(self):
231        tr = self.socket_transport()
232        tr._write_fut = mock.Mock()
233        tr.close()
234        test_utils.run_briefly(self.loop)
235        self.assertFalse(self.protocol.connection_lost.called)
236
237    def test_close_buffer(self):
238        tr = self.socket_transport()
239        tr._buffer = [b'data']
240        tr.close()
241        test_utils.run_briefly(self.loop)
242        self.assertFalse(self.protocol.connection_lost.called)
243
244    @mock.patch('asyncio.base_events.logger')
245    def test_fatal_error(self, m_logging):
246        tr = self.socket_transport()
247        tr._force_close = mock.Mock()
248        tr._fatal_error(None)
249        self.assertTrue(tr._force_close.called)
250        self.assertTrue(m_logging.error.called)
251
252    def test_force_close(self):
253        tr = self.socket_transport()
254        tr._buffer = [b'data']
255        read_fut = tr._read_fut = mock.Mock()
256        write_fut = tr._write_fut = mock.Mock()
257        tr._force_close(None)
258
259        read_fut.cancel.assert_called_with()
260        write_fut.cancel.assert_called_with()
261        test_utils.run_briefly(self.loop)
262        self.protocol.connection_lost.assert_called_with(None)
263        self.assertEqual(None, tr._buffer)
264        self.assertEqual(tr._conn_lost, 1)
265
266    def test_loop_writing_force_close(self):
267        exc_handler = mock.Mock()
268        self.loop.set_exception_handler(exc_handler)
269        fut = self.loop.create_future()
270        fut.set_result(1)
271        self.proactor.send.return_value = fut
272
273        tr = self.socket_transport()
274        tr.write(b'data')
275        tr._force_close(None)
276        test_utils.run_briefly(self.loop)
277        exc_handler.assert_not_called()
278
279    def test_force_close_idempotent(self):
280        tr = self.socket_transport()
281        tr._closing = True
282        tr._force_close(None)
283        test_utils.run_briefly(self.loop)
284        self.assertFalse(self.protocol.connection_lost.called)
285
286    def test_fatal_error_2(self):
287        tr = self.socket_transport()
288        tr._buffer = [b'data']
289        tr._force_close(None)
290
291        test_utils.run_briefly(self.loop)
292        self.protocol.connection_lost.assert_called_with(None)
293        self.assertEqual(None, tr._buffer)
294
295    def test_call_connection_lost(self):
296        tr = self.socket_transport()
297        tr._call_connection_lost(None)
298        self.assertTrue(self.protocol.connection_lost.called)
299        self.assertTrue(self.sock.close.called)
300
301    def test_write_eof(self):
302        tr = self.socket_transport()
303        self.assertTrue(tr.can_write_eof())
304        tr.write_eof()
305        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
306        tr.write_eof()
307        self.assertEqual(self.sock.shutdown.call_count, 1)
308        tr.close()
309
310    def test_write_eof_buffer(self):
311        tr = self.socket_transport()
312        f = self.loop.create_future()
313        tr._loop._proactor.send.return_value = f
314        tr.write(b'data')
315        tr.write_eof()
316        self.assertTrue(tr._eof_written)
317        self.assertFalse(self.sock.shutdown.called)
318        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
319        f.set_result(4)
320        self.loop._run_once()
321        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
322        tr.close()
323
324    def test_write_eof_write_pipe(self):
325        tr = _ProactorWritePipeTransport(
326            self.loop, self.sock, self.protocol)
327        self.assertTrue(tr.can_write_eof())
328        tr.write_eof()
329        self.assertTrue(tr.is_closing())
330        self.loop._run_once()
331        self.assertTrue(self.sock.close.called)
332        tr.close()
333
334    def test_write_eof_buffer_write_pipe(self):
335        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
336        f = self.loop.create_future()
337        tr._loop._proactor.send.return_value = f
338        tr.write(b'data')
339        tr.write_eof()
340        self.assertTrue(tr.is_closing())
341        self.assertFalse(self.sock.shutdown.called)
342        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
343        f.set_result(4)
344        self.loop._run_once()
345        self.loop._run_once()
346        self.assertTrue(self.sock.close.called)
347        tr.close()
348
349    def test_write_eof_duplex_pipe(self):
350        tr = _ProactorDuplexPipeTransport(
351            self.loop, self.sock, self.protocol)
352        self.assertFalse(tr.can_write_eof())
353        with self.assertRaises(NotImplementedError):
354            tr.write_eof()
355        close_transport(tr)
356
357    def test_pause_resume_reading(self):
358        tr = self.socket_transport()
359        index = 0
360        msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
361        reversed_msgs = list(reversed(msgs))
362
363        def recv_into(sock, data):
364            f = self.loop.create_future()
365            msg = reversed_msgs.pop()
366
367            result = f.result
368            def monkey():
369                data[:len(msg)] = msg
370                return result()
371            f.result = monkey
372
373            f.set_result(len(msg))
374            return f
375
376        self.loop._proactor.recv_into.side_effect = recv_into
377        self.loop._run_once()
378        self.assertFalse(tr._paused)
379        self.assertTrue(tr.is_reading())
380
381        for msg in msgs[:2]:
382            self.loop._run_once()
383            self.protocol.data_received.assert_called_with(bytearray(msg))
384
385        tr.pause_reading()
386        tr.pause_reading()
387        self.assertTrue(tr._paused)
388        self.assertFalse(tr.is_reading())
389        for i in range(10):
390            self.loop._run_once()
391        self.protocol.data_received.assert_called_with(bytearray(msgs[1]))
392
393        tr.resume_reading()
394        tr.resume_reading()
395        self.assertFalse(tr._paused)
396        self.assertTrue(tr.is_reading())
397
398        for msg in msgs[2:4]:
399            self.loop._run_once()
400            self.protocol.data_received.assert_called_with(bytearray(msg))
401
402        tr.pause_reading()
403        tr.resume_reading()
404        self.loop.call_exception_handler = mock.Mock()
405        self.loop._run_once()
406        self.loop.call_exception_handler.assert_not_called()
407        self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
408        tr.close()
409
410        self.assertFalse(tr.is_reading())
411
412
413    def pause_writing_transport(self, high):
414        tr = self.socket_transport()
415        tr.set_write_buffer_limits(high=high)
416
417        self.assertEqual(tr.get_write_buffer_size(), 0)
418        self.assertFalse(self.protocol.pause_writing.called)
419        self.assertFalse(self.protocol.resume_writing.called)
420        return tr
421
422    def test_pause_resume_writing(self):
423        tr = self.pause_writing_transport(high=4)
424
425        # write a large chunk, must pause writing
426        fut = self.loop.create_future()
427        self.loop._proactor.send.return_value = fut
428        tr.write(b'large data')
429        self.loop._run_once()
430        self.assertTrue(self.protocol.pause_writing.called)
431
432        # flush the buffer
433        fut.set_result(None)
434        self.loop._run_once()
435        self.assertEqual(tr.get_write_buffer_size(), 0)
436        self.assertTrue(self.protocol.resume_writing.called)
437
438    def test_pause_writing_2write(self):
439        tr = self.pause_writing_transport(high=4)
440
441        # first short write, the buffer is not full (3 <= 4)
442        fut1 = self.loop.create_future()
443        self.loop._proactor.send.return_value = fut1
444        tr.write(b'123')
445        self.loop._run_once()
446        self.assertEqual(tr.get_write_buffer_size(), 3)
447        self.assertFalse(self.protocol.pause_writing.called)
448
449        # fill the buffer, must pause writing (6 > 4)
450        tr.write(b'abc')
451        self.loop._run_once()
452        self.assertEqual(tr.get_write_buffer_size(), 6)
453        self.assertTrue(self.protocol.pause_writing.called)
454
455    def test_pause_writing_3write(self):
456        tr = self.pause_writing_transport(high=4)
457
458        # first short write, the buffer is not full (1 <= 4)
459        fut = self.loop.create_future()
460        self.loop._proactor.send.return_value = fut
461        tr.write(b'1')
462        self.loop._run_once()
463        self.assertEqual(tr.get_write_buffer_size(), 1)
464        self.assertFalse(self.protocol.pause_writing.called)
465
466        # second short write, the buffer is not full (3 <= 4)
467        tr.write(b'23')
468        self.loop._run_once()
469        self.assertEqual(tr.get_write_buffer_size(), 3)
470        self.assertFalse(self.protocol.pause_writing.called)
471
472        # fill the buffer, must pause writing (6 > 4)
473        tr.write(b'abc')
474        self.loop._run_once()
475        self.assertEqual(tr.get_write_buffer_size(), 6)
476        self.assertTrue(self.protocol.pause_writing.called)
477
478    def test_dont_pause_writing(self):
479        tr = self.pause_writing_transport(high=4)
480
481        # write a large chunk which completes immediately,
482        # it should not pause writing
483        fut = self.loop.create_future()
484        fut.set_result(None)
485        self.loop._proactor.send.return_value = fut
486        tr.write(b'very large data')
487        self.loop._run_once()
488        self.assertEqual(tr.get_write_buffer_size(), 0)
489        self.assertFalse(self.protocol.pause_writing.called)
490
491
492class ProactorDatagramTransportTests(test_utils.TestCase):
493
494    def setUp(self):
495        super().setUp()
496        self.loop = self.new_test_loop()
497        self.proactor = mock.Mock()
498        self.loop._proactor = self.proactor
499        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
500        self.sock = mock.Mock(spec_set=socket.socket)
501        self.sock.fileno.return_value = 7
502
503    def datagram_transport(self, address=None):
504        self.sock.getpeername.side_effect = None if address else OSError
505        transport = _ProactorDatagramTransport(self.loop, self.sock,
506                                               self.protocol,
507                                               address=address)
508        self.addCleanup(close_transport, transport)
509        return transport
510
511    def test_sendto(self):
512        data = b'data'
513        transport = self.datagram_transport()
514        transport.sendto(data, ('0.0.0.0', 1234))
515        self.assertTrue(self.proactor.sendto.called)
516        self.proactor.sendto.assert_called_with(
517            self.sock, data, addr=('0.0.0.0', 1234))
518
519    def test_sendto_bytearray(self):
520        data = bytearray(b'data')
521        transport = self.datagram_transport()
522        transport.sendto(data, ('0.0.0.0', 1234))
523        self.assertTrue(self.proactor.sendto.called)
524        self.proactor.sendto.assert_called_with(
525            self.sock, b'data', addr=('0.0.0.0', 1234))
526
527    def test_sendto_memoryview(self):
528        data = memoryview(b'data')
529        transport = self.datagram_transport()
530        transport.sendto(data, ('0.0.0.0', 1234))
531        self.assertTrue(self.proactor.sendto.called)
532        self.proactor.sendto.assert_called_with(
533            self.sock, b'data', addr=('0.0.0.0', 1234))
534
535    def test_sendto_no_data(self):
536        transport = self.datagram_transport()
537        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
538        transport.sendto(b'', ())
539        self.assertFalse(self.sock.sendto.called)
540        self.assertEqual(
541            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
542
543    def test_sendto_buffer(self):
544        transport = self.datagram_transport()
545        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
546        transport._write_fut = object()
547        transport.sendto(b'data2', ('0.0.0.0', 12345))
548        self.assertFalse(self.proactor.sendto.called)
549        self.assertEqual(
550            [(b'data1', ('0.0.0.0', 12345)),
551             (b'data2', ('0.0.0.0', 12345))],
552            list(transport._buffer))
553
554    def test_sendto_buffer_bytearray(self):
555        data2 = bytearray(b'data2')
556        transport = self.datagram_transport()
557        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
558        transport._write_fut = object()
559        transport.sendto(data2, ('0.0.0.0', 12345))
560        self.assertFalse(self.proactor.sendto.called)
561        self.assertEqual(
562            [(b'data1', ('0.0.0.0', 12345)),
563             (b'data2', ('0.0.0.0', 12345))],
564            list(transport._buffer))
565        self.assertIsInstance(transport._buffer[1][0], bytes)
566
567    def test_sendto_buffer_memoryview(self):
568        data2 = memoryview(b'data2')
569        transport = self.datagram_transport()
570        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
571        transport._write_fut = object()
572        transport.sendto(data2, ('0.0.0.0', 12345))
573        self.assertFalse(self.proactor.sendto.called)
574        self.assertEqual(
575            [(b'data1', ('0.0.0.0', 12345)),
576             (b'data2', ('0.0.0.0', 12345))],
577            list(transport._buffer))
578        self.assertIsInstance(transport._buffer[1][0], bytes)
579
580    @mock.patch('asyncio.proactor_events.logger')
581    def test_sendto_exception(self, m_log):
582        data = b'data'
583        err = self.proactor.sendto.side_effect = RuntimeError()
584
585        transport = self.datagram_transport()
586        transport._fatal_error = mock.Mock()
587        transport.sendto(data, ())
588
589        self.assertTrue(transport._fatal_error.called)
590        transport._fatal_error.assert_called_with(
591                                   err,
592                                   'Fatal write error on datagram transport')
593        transport._conn_lost = 1
594
595        transport._address = ('123',)
596        transport.sendto(data)
597        transport.sendto(data)
598        transport.sendto(data)
599        transport.sendto(data)
600        transport.sendto(data)
601        m_log.warning.assert_called_with('socket.sendto() raised exception.')
602
603    def test_sendto_error_received(self):
604        data = b'data'
605
606        self.sock.sendto.side_effect = ConnectionRefusedError
607
608        transport = self.datagram_transport()
609        transport._fatal_error = mock.Mock()
610        transport.sendto(data, ())
611
612        self.assertEqual(transport._conn_lost, 0)
613        self.assertFalse(transport._fatal_error.called)
614
615    def test_sendto_error_received_connected(self):
616        data = b'data'
617
618        self.proactor.send.side_effect = ConnectionRefusedError
619
620        transport = self.datagram_transport(address=('0.0.0.0', 1))
621        transport._fatal_error = mock.Mock()
622        transport.sendto(data)
623
624        self.assertFalse(transport._fatal_error.called)
625        self.assertTrue(self.protocol.error_received.called)
626
627    def test_sendto_str(self):
628        transport = self.datagram_transport()
629        self.assertRaises(TypeError, transport.sendto, 'str', ())
630
631    def test_sendto_connected_addr(self):
632        transport = self.datagram_transport(address=('0.0.0.0', 1))
633        self.assertRaises(
634            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))
635
636    def test_sendto_closing(self):
637        transport = self.datagram_transport(address=(1,))
638        transport.close()
639        self.assertEqual(transport._conn_lost, 1)
640        transport.sendto(b'data', (1,))
641        self.assertEqual(transport._conn_lost, 2)
642
643    def test__loop_writing_closing(self):
644        transport = self.datagram_transport()
645        transport._closing = True
646        transport._loop_writing()
647        self.assertIsNone(transport._write_fut)
648        test_utils.run_briefly(self.loop)
649        self.sock.close.assert_called_with()
650        self.protocol.connection_lost.assert_called_with(None)
651
652    def test__loop_writing_exception(self):
653        err = self.proactor.sendto.side_effect = RuntimeError()
654
655        transport = self.datagram_transport()
656        transport._fatal_error = mock.Mock()
657        transport._buffer.append((b'data', ()))
658        transport._loop_writing()
659
660        transport._fatal_error.assert_called_with(
661                                   err,
662                                   'Fatal write error on datagram transport')
663
664    def test__loop_writing_error_received(self):
665        self.proactor.sendto.side_effect = ConnectionRefusedError
666
667        transport = self.datagram_transport()
668        transport._fatal_error = mock.Mock()
669        transport._buffer.append((b'data', ()))
670        transport._loop_writing()
671
672        self.assertFalse(transport._fatal_error.called)
673
674    def test__loop_writing_error_received_connection(self):
675        self.proactor.send.side_effect = ConnectionRefusedError
676
677        transport = self.datagram_transport(address=('0.0.0.0', 1))
678        transport._fatal_error = mock.Mock()
679        transport._buffer.append((b'data', ()))
680        transport._loop_writing()
681
682        self.assertFalse(transport._fatal_error.called)
683        self.assertTrue(self.protocol.error_received.called)
684
685    @mock.patch('asyncio.base_events.logger.error')
686    def test_fatal_error_connected(self, m_exc):
687        transport = self.datagram_transport(address=('0.0.0.0', 1))
688        err = ConnectionRefusedError()
689        transport._fatal_error(err)
690        self.assertFalse(self.protocol.error_received.called)
691        m_exc.assert_not_called()
692
693
694class BaseProactorEventLoopTests(test_utils.TestCase):
695
696    def setUp(self):
697        super().setUp()
698
699        self.sock = test_utils.mock_nonblocking_socket()
700        self.proactor = mock.Mock()
701
702        self.ssock, self.csock = mock.Mock(), mock.Mock()
703
704        with mock.patch('asyncio.proactor_events.socket.socketpair',
705                        return_value=(self.ssock, self.csock)):
706            with mock.patch('signal.set_wakeup_fd'):
707                self.loop = BaseProactorEventLoop(self.proactor)
708        self.set_event_loop(self.loop)
709
710    @mock.patch('asyncio.proactor_events.socket.socketpair')
711    def test_ctor(self, socketpair):
712        ssock, csock = socketpair.return_value = (
713            mock.Mock(), mock.Mock())
714        with mock.patch('signal.set_wakeup_fd'):
715            loop = BaseProactorEventLoop(self.proactor)
716        self.assertIs(loop._ssock, ssock)
717        self.assertIs(loop._csock, csock)
718        self.assertEqual(loop._internal_fds, 1)
719        loop.close()
720
721    def test_close_self_pipe(self):
722        self.loop._close_self_pipe()
723        self.assertEqual(self.loop._internal_fds, 0)
724        self.assertTrue(self.ssock.close.called)
725        self.assertTrue(self.csock.close.called)
726        self.assertIsNone(self.loop._ssock)
727        self.assertIsNone(self.loop._csock)
728
729        # Don't call close(): _close_self_pipe() cannot be called twice
730        self.loop._closed = True
731
732    def test_close(self):
733        self.loop._close_self_pipe = mock.Mock()
734        self.loop.close()
735        self.assertTrue(self.loop._close_self_pipe.called)
736        self.assertTrue(self.proactor.close.called)
737        self.assertIsNone(self.loop._proactor)
738
739        self.loop._close_self_pipe.reset_mock()
740        self.loop.close()
741        self.assertFalse(self.loop._close_self_pipe.called)
742
743    def test_make_socket_transport(self):
744        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
745        self.assertIsInstance(tr, _ProactorSocketTransport)
746        close_transport(tr)
747
748    def test_loop_self_reading(self):
749        self.loop._loop_self_reading()
750        self.proactor.recv.assert_called_with(self.ssock, 4096)
751        self.proactor.recv.return_value.add_done_callback.assert_called_with(
752            self.loop._loop_self_reading)
753
754    def test_loop_self_reading_fut(self):
755        fut = mock.Mock()
756        self.loop._self_reading_future = fut
757        self.loop._loop_self_reading(fut)
758        self.assertTrue(fut.result.called)
759        self.proactor.recv.assert_called_with(self.ssock, 4096)
760        self.proactor.recv.return_value.add_done_callback.assert_called_with(
761            self.loop._loop_self_reading)
762
763    def test_loop_self_reading_exception(self):
764        self.loop.call_exception_handler = mock.Mock()
765        self.proactor.recv.side_effect = OSError()
766        self.loop._loop_self_reading()
767        self.assertTrue(self.loop.call_exception_handler.called)
768
769    def test_write_to_self(self):
770        self.loop._write_to_self()
771        self.csock.send.assert_called_with(b'\0')
772
773    def test_process_events(self):
774        self.loop._process_events([])
775
776    @mock.patch('asyncio.base_events.logger')
777    def test_create_server(self, m_log):
778        pf = mock.Mock()
779        call_soon = self.loop.call_soon = mock.Mock()
780
781        self.loop._start_serving(pf, self.sock)
782        self.assertTrue(call_soon.called)
783
784        # callback
785        loop = call_soon.call_args[0][0]
786        loop()
787        self.proactor.accept.assert_called_with(self.sock)
788
789        # conn
790        fut = mock.Mock()
791        fut.result.return_value = (mock.Mock(), mock.Mock())
792
793        make_tr = self.loop._make_socket_transport = mock.Mock()
794        loop(fut)
795        self.assertTrue(fut.result.called)
796        self.assertTrue(make_tr.called)
797
798        # exception
799        fut.result.side_effect = OSError()
800        loop(fut)
801        self.assertTrue(self.sock.close.called)
802        self.assertTrue(m_log.error.called)
803
804    def test_create_server_cancel(self):
805        pf = mock.Mock()
806        call_soon = self.loop.call_soon = mock.Mock()
807
808        self.loop._start_serving(pf, self.sock)
809        loop = call_soon.call_args[0][0]
810
811        # cancelled
812        fut = self.loop.create_future()
813        fut.cancel()
814        loop(fut)
815        self.assertTrue(self.sock.close.called)
816
817    def test_stop_serving(self):
818        sock1 = mock.Mock()
819        future1 = mock.Mock()
820        sock2 = mock.Mock()
821        future2 = mock.Mock()
822        self.loop._accept_futures = {
823            sock1.fileno(): future1,
824            sock2.fileno(): future2
825        }
826
827        self.loop._stop_serving(sock1)
828        self.assertTrue(sock1.close.called)
829        self.assertTrue(future1.cancel.called)
830        self.proactor._stop_serving.assert_called_with(sock1)
831        self.assertFalse(sock2.close.called)
832        self.assertFalse(future2.cancel.called)
833
834    def datagram_transport(self):
835        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
836        return self.loop._make_datagram_transport(self.sock, self.protocol)
837
838    def test_make_datagram_transport(self):
839        tr = self.datagram_transport()
840        self.assertIsInstance(tr, _ProactorDatagramTransport)
841        close_transport(tr)
842
843    def test_datagram_loop_writing(self):
844        tr = self.datagram_transport()
845        tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
846        tr._loop_writing()
847        self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068))
848        self.loop._proactor.sendto.return_value.add_done_callback.\
849            assert_called_with(tr._loop_writing)
850
851        close_transport(tr)
852
853    def test_datagram_loop_reading(self):
854        tr = self.datagram_transport()
855        tr._loop_reading()
856        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
857        self.assertFalse(self.protocol.datagram_received.called)
858        self.assertFalse(self.protocol.error_received.called)
859        close_transport(tr)
860
861    def test_datagram_loop_reading_data(self):
862        res = self.loop.create_future()
863        res.set_result((b'data', ('127.0.0.1', 12068)))
864
865        tr = self.datagram_transport()
866        tr._read_fut = res
867        tr._loop_reading(res)
868        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
869        self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068))
870        close_transport(tr)
871
872    def test_datagram_loop_reading_no_data(self):
873        res = self.loop.create_future()
874        res.set_result((b'', ('127.0.0.1', 12068)))
875
876        tr = self.datagram_transport()
877        self.assertRaises(AssertionError, tr._loop_reading, res)
878
879        tr.close = mock.Mock()
880        tr._read_fut = res
881        tr._loop_reading(res)
882        self.assertTrue(self.loop._proactor.recvfrom.called)
883        self.assertFalse(self.protocol.error_received.called)
884        self.assertFalse(tr.close.called)
885        close_transport(tr)
886
887    def test_datagram_loop_reading_aborted(self):
888        err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError()
889
890        tr = self.datagram_transport()
891        tr._fatal_error = mock.Mock()
892        tr._protocol.error_received = mock.Mock()
893        tr._loop_reading()
894        tr._protocol.error_received.assert_called_with(err)
895        close_transport(tr)
896
897    def test_datagram_loop_writing_aborted(self):
898        err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError()
899
900        tr = self.datagram_transport()
901        tr._fatal_error = mock.Mock()
902        tr._protocol.error_received = mock.Mock()
903        tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
904        tr._loop_writing()
905        tr._protocol.error_received.assert_called_with(err)
906        close_transport(tr)
907
908
909@unittest.skipIf(sys.platform != 'win32',
910                 'Proactor is supported on Windows only')
911class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
912    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
913
914    class MyProto(asyncio.Protocol):
915
916        def __init__(self, loop):
917            self.started = False
918            self.closed = False
919            self.data = bytearray()
920            self.fut = loop.create_future()
921            self.transport = None
922
923        def connection_made(self, transport):
924            self.started = True
925            self.transport = transport
926
927        def data_received(self, data):
928            self.data.extend(data)
929
930        def connection_lost(self, exc):
931            self.closed = True
932            self.fut.set_result(None)
933
934        async def wait_closed(self):
935            await self.fut
936
937    @classmethod
938    def setUpClass(cls):
939        with open(os_helper.TESTFN, 'wb') as fp:
940            fp.write(cls.DATA)
941        super().setUpClass()
942
943    @classmethod
944    def tearDownClass(cls):
945        os_helper.unlink(os_helper.TESTFN)
946        super().tearDownClass()
947
948    def setUp(self):
949        self.loop = asyncio.ProactorEventLoop()
950        self.set_event_loop(self.loop)
951        self.addCleanup(self.loop.close)
952        self.file = open(os_helper.TESTFN, 'rb')
953        self.addCleanup(self.file.close)
954        super().setUp()
955
956    def make_socket(self, cleanup=True):
957        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
958        sock.setblocking(False)
959        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
960        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
961        if cleanup:
962            self.addCleanup(sock.close)
963        return sock
964
965    def run_loop(self, coro):
966        return self.loop.run_until_complete(coro)
967
968    def prepare(self):
969        sock = self.make_socket()
970        proto = self.MyProto(self.loop)
971        port = socket_helper.find_unused_port()
972        srv_sock = self.make_socket(cleanup=False)
973        srv_sock.bind(('127.0.0.1', port))
974        server = self.run_loop(self.loop.create_server(
975            lambda: proto, sock=srv_sock))
976        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))
977
978        def cleanup():
979            if proto.transport is not None:
980                # can be None if the task was cancelled before
981                # connection_made callback
982                proto.transport.close()
983                self.run_loop(proto.wait_closed())
984
985            server.close()
986            self.run_loop(server.wait_closed())
987
988        self.addCleanup(cleanup)
989
990        return sock, proto
991
992    def test_sock_sendfile_not_a_file(self):
993        sock, proto = self.prepare()
994        f = object()
995        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
996                                    "not a regular file"):
997            self.run_loop(self.loop._sock_sendfile_native(sock, f,
998                                                          0, None))
999        self.assertEqual(self.file.tell(), 0)
1000
1001    def test_sock_sendfile_iobuffer(self):
1002        sock, proto = self.prepare()
1003        f = io.BytesIO()
1004        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
1005                                    "not a regular file"):
1006            self.run_loop(self.loop._sock_sendfile_native(sock, f,
1007                                                          0, None))
1008        self.assertEqual(self.file.tell(), 0)
1009
1010    def test_sock_sendfile_not_regular_file(self):
1011        sock, proto = self.prepare()
1012        f = mock.Mock()
1013        f.fileno.return_value = -1
1014        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
1015                                    "not a regular file"):
1016            self.run_loop(self.loop._sock_sendfile_native(sock, f,
1017                                                          0, None))
1018        self.assertEqual(self.file.tell(), 0)
1019
1020
1021if __name__ == '__main__':
1022    unittest.main()
1023