• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for proactor_events.py"""
2
3import io
4import socket
5import unittest
6import sys
7from unittest import mock
8
9import asyncio
10from asyncio.proactor_events import BaseProactorEventLoop
11from asyncio.proactor_events import _ProactorSocketTransport
12from asyncio.proactor_events import _ProactorWritePipeTransport
13from asyncio.proactor_events import _ProactorDuplexPipeTransport
14from asyncio.proactor_events import _ProactorDatagramTransport
15from test.support import os_helper
16from test.support import socket_helper
17from test.test_asyncio import utils as test_utils
18
19
def tearDownModule():
    """Reset the global asyncio event loop policy once this module's tests finish."""
    # Passing None clears whatever policy the tests may have installed.
    policy = None
    asyncio.set_event_loop_policy(policy)
22
23
def close_transport(transport):
    """Close the socket held by *transport* and detach it from the transport.

    Does nothing when the transport no longer holds a socket.
    """
    # Don't call transport.close() because the event loop and the IOCP proactor
    # are mocked
    sock = transport._sock
    if sock is not None:
        sock.close()
        transport._sock = None
31
32
class ProactorSocketTransportTests(test_utils.TestCase):
    """Tests for the proactor socket and pipe transports against mocked I/O.

    setUp() installs a ``mock.Mock`` as the loop's ``_proactor`` and uses a
    mocked socket plus a test protocol, so the transports under test never
    touch a real socket.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        # Length of the bytearray the tests expect to be passed to recv_into().
        self.buffer_size = 65536

    def socket_transport(self, waiter=None):
        # Build a transport on the mocked loop/socket and register cleanup so
        # its socket is released at the end of the test.
        transport = _ProactorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        fut = self.loop.create_future()
        tr = self.socket_transport(waiter=fut)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(fut.result())
        # Assert on the mock; merely calling it would verify nothing.
        self.protocol.connection_made.assert_called_with(tr)
        self.proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))

    def test_loop_reading(self):
        # Starting the read loop only schedules a recv_into(); no protocol
        # callback is invoked yet.
        tr = self.socket_transport()
        tr._loop_reading()
        self.loop._proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))
        self.assertFalse(self.protocol.data_received.called)
        self.assertFalse(self.protocol.eof_received.called)

    def test_loop_reading_data(self):
        # A completed read future carrying a byte count delivers that many
        # bytes of the transport buffer to the protocol.
        buf = b'data'
        res = self.loop.create_future()
        res.set_result(len(buf))

        tr = self.socket_transport()
        tr._read_fut = res
        tr._data[:len(buf)] = buf
        tr._loop_reading(res)
        called_buf = bytearray(self.buffer_size)
        called_buf[:len(buf)] = buf
        self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
        self.protocol.data_received.assert_called_with(bytearray(buf))

    def test_loop_reading_no_data(self):
        # A read result of 0 is reported as EOF and closes the transport.
        res = self.loop.create_future()
        res.set_result(0)

        tr = self.socket_transport()
        # The future passed in must be the transport's current read future.
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertFalse(self.loop._proactor.recv_into.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr.close.called)

    def test_loop_reading_aborted(self):
        err = self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
                            err,
                            'Fatal read error on pipe transport')

    def test_loop_reading_aborted_closing(self):
        # An aborted read while the transport is already closing is not fatal.
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._closing = True
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)

    def test_loop_reading_aborted_is_fatal(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_conn_reset_lost(self):
        # A connection reset force-closes the transport instead of going
        # through _fatal_error().
        err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()

        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._force_close = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)
        tr._force_close.assert_called_with(err)

    def test_loop_reading_exception(self):
        err = self.loop._proactor.recv_into.side_effect = (OSError())

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
                            err,
                            'Fatal read error on pipe transport')

    def test_write(self):
        # With no write in flight, the data goes straight to _loop_writing()
        # and nothing is buffered.
        tr = self.socket_transport()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertIsNone(tr._buffer)
        tr._loop_writing.assert_called_with(data=b'data')

    def test_write_no_data(self):
        tr = self.socket_transport()
        tr.write(b'')
        self.assertFalse(tr._buffer)

    def test_write_more(self):
        # With a write already in flight, the data is buffered instead.
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertEqual(tr._buffer, b'data')
        self.assertFalse(tr._loop_writing.called)

    def test_loop_writing(self):
        tr = self.socket_transport()
        tr._buffer = bytearray(b'data')
        tr._loop_writing()
        send = self.loop._proactor.send
        send.assert_called_with(self.sock, b'data')
        send.return_value.add_done_callback.assert_called_with(
            tr._loop_writing)

    @mock.patch('asyncio.proactor_events.logger')
    def test_loop_writing_err(self, m_log):
        err = self.loop._proactor.send.side_effect = OSError()
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._buffer = [b'da', b'ta']
        tr._loop_writing()
        tr._fatal_error.assert_called_with(
                            err,
                            'Fatal write error on pipe transport')
        tr._conn_lost = 1

        # Repeated writes on a lost connection are dropped and end up
        # producing a warning.
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        self.assertIsNone(tr._buffer)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_loop_writing_stop(self):
        fut = self.loop.create_future()
        fut.set_result(b'data')

        tr = self.socket_transport()
        tr._write_fut = fut
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)

    def test_loop_writing_closing(self):
        # Once the pending write completes on a closing transport, the
        # protocol is told the connection is lost.
        fut = self.loop.create_future()
        fut.set_result(1)

        tr = self.socket_transport()
        tr._write_fut = fut
        tr.close()
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_abort(self):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        tr = self.socket_transport()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._conn_lost, 1)

        # A second close() must not notify the protocol again.
        self.protocol.connection_lost.reset_mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_write_fut(self):
        # connection_lost() is deferred while a write is still in flight.
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_buffer(self):
        # connection_lost() is deferred while data is still buffered.
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_invalid_sockobj(self):
        # A socket whose fileno() is -1 must not be shut down on close.
        tr = self.socket_transport()
        self.sock.fileno.return_value = -1
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertFalse(self.sock.shutdown.called)

    @mock.patch('asyncio.base_events.logger')
    def test_fatal_error(self, m_logging):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(None)
        self.assertTrue(tr._force_close.called)
        self.assertTrue(m_logging.error.called)

    def test_force_close(self):
        # Force-closing cancels both pending futures, drops the buffer and
        # reports the lost connection.
        tr = self.socket_transport()
        tr._buffer = [b'data']
        read_fut = tr._read_fut = mock.Mock()
        write_fut = tr._write_fut = mock.Mock()
        tr._force_close(None)

        read_fut.cancel.assert_called_with()
        write_fut.cancel.assert_called_with()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertIsNone(tr._buffer)
        self.assertEqual(tr._conn_lost, 1)

    def test_loop_writing_force_close(self):
        # Force-closing with a completed write future pending must not reach
        # the loop's exception handler.
        exc_handler = mock.Mock()
        self.loop.set_exception_handler(exc_handler)
        fut = self.loop.create_future()
        fut.set_result(1)
        self.proactor.send.return_value = fut

        tr = self.socket_transport()
        tr.write(b'data')
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        exc_handler.assert_not_called()

    def test_force_close_idempotent(self):
        tr = self.socket_transport()
        tr._closing = True
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_fatal_error_2(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr._force_close(None)

        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertIsNone(tr._buffer)

    def test_call_connection_lost(self):
        tr = self.socket_transport()
        tr._call_connection_lost(None)
        self.assertTrue(self.protocol.connection_lost.called)
        self.assertTrue(self.sock.close.called)

    def test_write_eof(self):
        tr = self.socket_transport()
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        # A second write_eof() must not shut the socket down again.
        tr.write_eof()
        self.assertEqual(self.sock.shutdown.call_count, 1)
        tr.close()

    def test_write_eof_buffer(self):
        # With a write in flight, the shutdown is postponed until the write
        # future completes.
        tr = self.socket_transport()
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr._eof_written)
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.close()

    def test_write_eof_write_pipe(self):
        # On a write pipe transport, write_eof() closes the transport.
        tr = _ProactorWritePipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_buffer_write_pipe(self):
        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_duplex_pipe(self):
        tr = _ProactorDuplexPipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertFalse(tr.can_write_eof())
        with self.assertRaises(NotImplementedError):
            tr.write_eof()
        close_transport(tr)

    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
        reversed_msgs = list(reversed(msgs))

        def recv_into(sock, data):
            # Hand out the messages in order, one per call.
            f = self.loop.create_future()
            msg = reversed_msgs.pop()

            result = f.result

            def monkey():
                # Fill the caller's buffer only when the result is fetched.
                data[:len(msg)] = msg
                return result()
            f.result = monkey

            f.set_result(len(msg))
            return f

        self.loop._proactor.recv_into.side_effect = recv_into
        self.loop._run_once()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[:2]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        # Pausing twice must behave like pausing once.
        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        for _ in range(10):
            self.loop._run_once()
        # Nothing new is delivered while paused.
        self.protocol.data_received.assert_called_with(bytearray(msgs[1]))

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[2:4]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        # A pause immediately followed by a resume must not raise through the
        # loop's exception handler.
        tr.pause_reading()
        tr.resume_reading()
        self.loop.call_exception_handler = mock.Mock()
        self.loop._run_once()
        self.loop.call_exception_handler.assert_not_called()
        self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
        tr.close()

        self.assertFalse(tr.is_reading())

    def pause_writing_transport(self, high):
        # Helper: a transport with the given high-water mark, checked to
        # start out empty and unpaused.
        tr = self.socket_transport()
        tr.set_write_buffer_limits(high=high)

        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
        self.assertFalse(self.protocol.resume_writing.called)
        return tr

    def test_pause_resume_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk, must pause writing
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'large data')
        self.loop._run_once()
        self.assertTrue(self.protocol.pause_writing.called)

        # flush the buffer
        fut.set_result(None)
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertTrue(self.protocol.resume_writing.called)

    def test_pause_writing_2write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (3 <= 4)
        fut1 = self.loop.create_future()
        self.loop._proactor.send.return_value = fut1
        tr.write(b'123')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_pause_writing_3write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (1 <= 4)
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'1')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 1)
        self.assertFalse(self.protocol.pause_writing.called)

        # second short write, the buffer is not full (3 <= 4)
        tr.write(b'23')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_dont_pause_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk which completes immediately,
        # it should not pause writing
        fut = self.loop.create_future()
        fut.set_result(None)
        self.loop._proactor.send.return_value = fut
        tr.write(b'very large data')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
498
499
class ProactorDatagramTransportTests(test_utils.TestCase):
    """Tests for _ProactorDatagramTransport against a mocked proactor.

    The transport is built on a test loop whose ``_proactor`` is a
    ``mock.Mock`` and on a socket mock restricted to the ``socket.socket``
    API, so sends are observed on ``self.proactor`` rather than performed.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        self.sock = mock.Mock(spec_set=socket.socket)
        self.sock.fileno.return_value = 7

    def datagram_transport(self, address=None):
        # getpeername() succeeds only when a remote address is supplied.
        self.sock.getpeername.side_effect = None if address else OSError
        transport = _ProactorDatagramTransport(self.loop, self.sock,
                                               self.protocol,
                                               address=address)
        self.addCleanup(close_transport, transport)
        return transport

    def test_sendto(self):
        data = b'data'
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, data, addr=('0.0.0.0', 1234))

    def test_sendto_bytearray(self):
        data = bytearray(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_memoryview(self):
        data = memoryview(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_no_data(self):
        # Empty data neither triggers a send nor disturbs what is buffered.
        transport = self.datagram_transport()
        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
        transport.sendto(b'', ())
        # Sends go through the proactor, so that is the mock to inspect.
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    def test_sendto_buffer(self):
        # With a write in flight, the new datagram is queued behind the
        # existing one.
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(b'data2', ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))

    def test_sendto_buffer_bytearray(self):
        data2 = bytearray(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        # The buffered copy must be bytes, not the caller's bytearray.
        self.assertIsInstance(transport._buffer[1][0], bytes)

    def test_sendto_buffer_memoryview(self):
        data2 = memoryview(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        # The buffered copy must be bytes, not the caller's memoryview.
        self.assertIsInstance(transport._buffer[1][0], bytes)

    @mock.patch('asyncio.proactor_events.logger')
    def test_sendto_exception(self, m_log):
        data = b'data'
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertTrue(transport._fatal_error.called)
        transport._fatal_error.assert_called_with(
                                   err,
                                   'Fatal write error on datagram transport')
        transport._conn_lost = 1

        # Repeated sends on a lost, connected transport end up producing a
        # warning.
        transport._address = ('123',)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        m_log.warning.assert_called_with('socket.sendto() raised exception.')

    def test_sendto_error_received(self):
        data = b'data'

        # The failure has to be injected on the proactor, which is what the
        # unconnected transport sends through; a side effect on the socket
        # mock would never be triggered.
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertEqual(transport._conn_lost, 0)
        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_error_received_connected(self):
        data = b'data'

        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport.sendto(data)

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_str(self):
        transport = self.datagram_transport()
        self.assertRaises(TypeError, transport.sendto, 'str', ())

    def test_sendto_connected_addr(self):
        # A connected transport rejects a destination other than its own.
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        self.assertRaises(
            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))

    def test_sendto_closing(self):
        transport = self.datagram_transport(address=(1,))
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        transport.sendto(b'data', (1,))
        self.assertEqual(transport._conn_lost, 2)

    def test__loop_writing_closing(self):
        transport = self.datagram_transport()
        transport._closing = True
        transport._loop_writing()
        self.assertIsNone(transport._write_fut)
        test_utils.run_briefly(self.loop)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    def test__loop_writing_exception(self):
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        transport._fatal_error.assert_called_with(
                                   err,
                                   'Fatal write error on datagram transport')

    def test__loop_writing_error_received(self):
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)

    def test__loop_writing_error_received_connection(self):
        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected(self, m_exc):
        # A refused connection passed to _fatal_error() is neither logged as
        # an error nor forwarded to error_received().
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = ConnectionRefusedError()
        transport._fatal_error(err)
        self.assertFalse(self.protocol.error_received.called)
        m_exc.assert_not_called()
700
701
702class BaseProactorEventLoopTests(test_utils.TestCase):
703
704    def setUp(self):
705        super().setUp()
706
707        self.sock = test_utils.mock_nonblocking_socket()
708        self.proactor = mock.Mock()
709
710        self.ssock, self.csock = mock.Mock(), mock.Mock()
711
712        with mock.patch('asyncio.proactor_events.socket.socketpair',
713                        return_value=(self.ssock, self.csock)):
714            with mock.patch('signal.set_wakeup_fd'):
715                self.loop = BaseProactorEventLoop(self.proactor)
716        self.set_event_loop(self.loop)
717
718    @mock.patch('asyncio.proactor_events.socket.socketpair')
719    def test_ctor(self, socketpair):
720        ssock, csock = socketpair.return_value = (
721            mock.Mock(), mock.Mock())
722        with mock.patch('signal.set_wakeup_fd'):
723            loop = BaseProactorEventLoop(self.proactor)
724        self.assertIs(loop._ssock, ssock)
725        self.assertIs(loop._csock, csock)
726        self.assertEqual(loop._internal_fds, 1)
727        loop.close()
728
729    def test_close_self_pipe(self):
730        self.loop._close_self_pipe()
731        self.assertEqual(self.loop._internal_fds, 0)
732        self.assertTrue(self.ssock.close.called)
733        self.assertTrue(self.csock.close.called)
734        self.assertIsNone(self.loop._ssock)
735        self.assertIsNone(self.loop._csock)
736
737        # Don't call close(): _close_self_pipe() cannot be called twice
738        self.loop._closed = True
739
740    def test_close(self):
741        self.loop._close_self_pipe = mock.Mock()
742        self.loop.close()
743        self.assertTrue(self.loop._close_self_pipe.called)
744        self.assertTrue(self.proactor.close.called)
745        self.assertIsNone(self.loop._proactor)
746
747        self.loop._close_self_pipe.reset_mock()
748        self.loop.close()
749        self.assertFalse(self.loop._close_self_pipe.called)
750
751    def test_make_socket_transport(self):
752        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
753        self.assertIsInstance(tr, _ProactorSocketTransport)
754        close_transport(tr)
755
756    def test_loop_self_reading(self):
757        self.loop._loop_self_reading()
758        self.proactor.recv.assert_called_with(self.ssock, 4096)
759        self.proactor.recv.return_value.add_done_callback.assert_called_with(
760            self.loop._loop_self_reading)
761
762    def test_loop_self_reading_fut(self):
763        fut = mock.Mock()
764        self.loop._self_reading_future = fut
765        self.loop._loop_self_reading(fut)
766        self.assertTrue(fut.result.called)
767        self.proactor.recv.assert_called_with(self.ssock, 4096)
768        self.proactor.recv.return_value.add_done_callback.assert_called_with(
769            self.loop._loop_self_reading)
770
771    def test_loop_self_reading_exception(self):
772        self.loop.call_exception_handler = mock.Mock()
773        self.proactor.recv.side_effect = OSError()
774        self.loop._loop_self_reading()
775        self.assertTrue(self.loop.call_exception_handler.called)
776
777    def test_write_to_self(self):
778        self.loop._write_to_self()
779        self.csock.send.assert_called_with(b'\0')
780
781    def test_process_events(self):
782        self.loop._process_events([])
783
784    @mock.patch('asyncio.base_events.logger')
785    def test_create_server(self, m_log):
786        pf = mock.Mock()
787        call_soon = self.loop.call_soon = mock.Mock()
788
789        self.loop._start_serving(pf, self.sock)
790        self.assertTrue(call_soon.called)
791
792        # callback
793        loop = call_soon.call_args[0][0]
794        loop()
795        self.proactor.accept.assert_called_with(self.sock)
796
797        # conn
798        fut = mock.Mock()
799        fut.result.return_value = (mock.Mock(), mock.Mock())
800
801        make_tr = self.loop._make_socket_transport = mock.Mock()
802        loop(fut)
803        self.assertTrue(fut.result.called)
804        self.assertTrue(make_tr.called)
805
806        # exception
807        fut.result.side_effect = OSError()
808        loop(fut)
809        self.assertTrue(self.sock.close.called)
810        self.assertTrue(m_log.error.called)
811
812    def test_create_server_cancel(self):
813        pf = mock.Mock()
814        call_soon = self.loop.call_soon = mock.Mock()
815
816        self.loop._start_serving(pf, self.sock)
817        loop = call_soon.call_args[0][0]
818
819        # cancelled
820        fut = self.loop.create_future()
821        fut.cancel()
822        loop(fut)
823        self.assertTrue(self.sock.close.called)
824
825    def test_stop_serving(self):
826        sock1 = mock.Mock()
827        future1 = mock.Mock()
828        sock2 = mock.Mock()
829        future2 = mock.Mock()
830        self.loop._accept_futures = {
831            sock1.fileno(): future1,
832            sock2.fileno(): future2
833        }
834
835        self.loop._stop_serving(sock1)
836        self.assertTrue(sock1.close.called)
837        self.assertTrue(future1.cancel.called)
838        self.proactor._stop_serving.assert_called_with(sock1)
839        self.assertFalse(sock2.close.called)
840        self.assertFalse(future2.cancel.called)
841
842    def datagram_transport(self):
843        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
844        return self.loop._make_datagram_transport(self.sock, self.protocol)
845
846    def test_make_datagram_transport(self):
847        tr = self.datagram_transport()
848        self.assertIsInstance(tr, _ProactorDatagramTransport)
849        self.assertIsInstance(tr, asyncio.DatagramTransport)
850        close_transport(tr)
851
852    def test_datagram_loop_writing(self):
853        tr = self.datagram_transport()
854        tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
855        tr._loop_writing()
856        self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068))
857        self.loop._proactor.sendto.return_value.add_done_callback.\
858            assert_called_with(tr._loop_writing)
859
860        close_transport(tr)
861
862    def test_datagram_loop_reading(self):
863        tr = self.datagram_transport()
864        tr._loop_reading()
865        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
866        self.assertFalse(self.protocol.datagram_received.called)
867        self.assertFalse(self.protocol.error_received.called)
868        close_transport(tr)
869
870    def test_datagram_loop_reading_data(self):
871        res = self.loop.create_future()
872        res.set_result((b'data', ('127.0.0.1', 12068)))
873
874        tr = self.datagram_transport()
875        tr._read_fut = res
876        tr._loop_reading(res)
877        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
878        self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068))
879        close_transport(tr)
880
881    def test_datagram_loop_reading_no_data(self):
882        res = self.loop.create_future()
883        res.set_result((b'', ('127.0.0.1', 12068)))
884
885        tr = self.datagram_transport()
886        self.assertRaises(AssertionError, tr._loop_reading, res)
887
888        tr.close = mock.Mock()
889        tr._read_fut = res
890        tr._loop_reading(res)
891        self.assertTrue(self.loop._proactor.recvfrom.called)
892        self.assertFalse(self.protocol.error_received.called)
893        self.assertFalse(tr.close.called)
894        close_transport(tr)
895
896    def test_datagram_loop_reading_aborted(self):
897        err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError()
898
899        tr = self.datagram_transport()
900        tr._fatal_error = mock.Mock()
901        tr._protocol.error_received = mock.Mock()
902        tr._loop_reading()
903        tr._protocol.error_received.assert_called_with(err)
904        close_transport(tr)
905
906    def test_datagram_loop_writing_aborted(self):
907        err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError()
908
909        tr = self.datagram_transport()
910        tr._fatal_error = mock.Mock()
911        tr._protocol.error_received = mock.Mock()
912        tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
913        tr._loop_writing()
914        tr._protocol.error_received.assert_called_with(err)
915        close_transport(tr)
916
917
@unittest.skipIf(sys.platform != 'win32',
                 'Proactor is supported on Windows only')
class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    """Checks that the native sendfile path rejects unsuitable file objects.

    Each test connects a real client socket to a real server on a
    ProactorEventLoop and calls _sock_sendfile_native() with something
    that is not a regular file, expecting SendfileNotAvailableError.

    NOTE: despite "Unix" in the class name, the skip condition above
    restricts these tests to win32.
    """

    # Payload written to the fixture file once per class.
    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Server-side protocol that records its lifecycle and received data."""

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Resolved from connection_lost(); awaited by wait_closed().
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            """Wait until connection_lost() has been called."""
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Create the fixture file shared by every test in the class.
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)
        self.addCleanup(self.loop.close)
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1 KiB send/receive buffers.

        When *cleanup* is true the socket is closed automatically at the
        end of the test.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
        if cleanup:
            self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a local server, connect a client socket to it.

        Returns a ``(client_socket, server_protocol)`` pair.  Teardown of
        the server and of the accepted transport is registered as a test
        cleanup.
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        port = socket_helper.find_unused_port()
        # Not registered for cleanup here: make_socket() is told to skip
        # it, and teardown goes through server.close() in cleanup() below.
        srv_sock = self.make_socket(cleanup=False)
        srv_sock.bind(('127.0.0.1', port))
        server = self.run_loop(self.loop.create_server(
            lambda: proto, sock=srv_sock))

        def cleanup():
            if proto.transport is not None:
                # can be None if the task was cancelled before
                # connection_made callback
                proto.transport.close()
                self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        # Register the teardown *before* connecting: if sock_connect()
        # raises, the already running server must still be shut down.
        self.addCleanup(cleanup)

        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))

        return sock, proto

    def _check_native_sendfile_rejected(self, fileobj):
        # Shared body of the tests below: the native sendfile call must
        # refuse *fileobj* and must not have moved the fixture file offset.
        sock, proto = self.prepare()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, fileobj,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_a_file(self):
        # A plain object() is passed in place of a file.
        self._check_native_sendfile_rejected(object())

    def test_sock_sendfile_iobuffer(self):
        # An empty in-memory BytesIO is passed in place of a file.
        self._check_native_sendfile_rejected(io.BytesIO())

    def test_sock_sendfile_not_regular_file(self):
        # fileno() exists but reports an invalid descriptor.
        f = mock.Mock()
        f.fileno.return_value = -1
        self._check_native_sendfile_rejected(f)
1029
# Run the tests with unittest's runner when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
1032