/*
 * A type which wraps a pipe handle in message-oriented mode
 *
 * pipe_connection.c
 *
 * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
 */
8
9 #include "multiprocessing.h"
10
/*
 * CLOSE(h) closes the handle `h` with the Win32 CloseHandle() call.
 * NOTE(review): presumably consumed by "connection.h", which is included
 * at the end of this file -- confirm against that header.
 */
#define CLOSE(h) CloseHandle(h)
12
13 /*
14 * Send string to the pipe; assumes in message oriented mode
15 */
16
17 static Py_ssize_t
conn_send_string(ConnectionObject * conn,char * string,size_t length)18 conn_send_string(ConnectionObject *conn, char *string, size_t length)
19 {
20 DWORD amount_written;
21 BOOL ret;
22
23 Py_BEGIN_ALLOW_THREADS
24 ret = WriteFile(conn->handle, string, length, &amount_written, NULL);
25 Py_END_ALLOW_THREADS
26
27 if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) {
28 PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
29 return MP_STANDARD_ERROR;
30 }
31
32 return ret ? MP_SUCCESS : MP_STANDARD_ERROR;
33 }
34
35 /*
36 * Attempts to read into buffer, or if buffer too small into *newbuffer.
37 *
38 * Returns number of bytes read. Assumes in message oriented mode.
39 */
40
41 static Py_ssize_t
conn_recv_string(ConnectionObject * conn,char * buffer,size_t buflength,char ** newbuffer,size_t maxlength)42 conn_recv_string(ConnectionObject *conn, char *buffer,
43 size_t buflength, char **newbuffer, size_t maxlength)
44 {
45 DWORD left, length, full_length, err;
46 BOOL ret;
47 *newbuffer = NULL;
48
49 Py_BEGIN_ALLOW_THREADS
50 ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
51 &length, NULL);
52 Py_END_ALLOW_THREADS
53 if (ret)
54 return length;
55
56 err = GetLastError();
57 if (err != ERROR_MORE_DATA) {
58 if (err == ERROR_BROKEN_PIPE)
59 return MP_END_OF_FILE;
60 return MP_STANDARD_ERROR;
61 }
62
63 if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
64 return MP_STANDARD_ERROR;
65
66 full_length = length + left;
67 if (full_length > maxlength)
68 return MP_BAD_MESSAGE_LENGTH;
69
70 *newbuffer = PyMem_Malloc(full_length);
71 if (*newbuffer == NULL)
72 return MP_MEMORY_ERROR;
73
74 memcpy(*newbuffer, buffer, length);
75
76 Py_BEGIN_ALLOW_THREADS
77 ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL);
78 Py_END_ALLOW_THREADS
79 if (ret) {
80 assert(length == left);
81 return full_length;
82 } else {
83 PyMem_Free(*newbuffer);
84 return MP_STANDARD_ERROR;
85 }
86 }
87
88 /*
89 * Check whether any data is available for reading
90 */
91
92 static int
conn_poll(ConnectionObject * conn,double timeout,PyThreadState * _save)93 conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
94 {
95 DWORD bytes, deadline, delay;
96 int difference, res;
97 BOOL block = FALSE;
98
99 if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
100 return MP_STANDARD_ERROR;
101
102 if (timeout == 0.0)
103 return bytes > 0;
104
105 if (timeout < 0.0)
106 block = TRUE;
107 else
108 /* XXX does not check for overflow */
109 deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
110
111 Sleep(0);
112
113 for (delay = 1 ; ; delay += 1) {
114 if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
115 return MP_STANDARD_ERROR;
116 else if (bytes > 0)
117 return TRUE;
118
119 if (!block) {
120 difference = deadline - GetTickCount();
121 if (difference < 0)
122 return FALSE;
123 if ((int)delay > difference)
124 delay = difference;
125 }
126
127 if (delay > 20)
128 delay = 20;
129
130 Sleep(delay);
131
132 /* check for signals */
133 Py_BLOCK_THREADS
134 res = PyErr_CheckSignals();
135 Py_UNBLOCK_THREADS
136
137 if (res)
138 return MP_EXCEPTION_HAS_BEEN_SET;
139 }
140 }
141
/*
 * "connection.h" defines the PipeConnection type using the definitions above
 *
 * CONNECTION_NAME and CONNECTION_TYPE are defined here, ahead of the
 * #include of "connection.h" that follows.
 * NOTE(review): presumably the header reads them as the type's name string
 * and type-object identifier -- confirm against connection.h.
 */

#define CONNECTION_NAME "PipeConnection"
#define CONNECTION_TYPE PipeConnectionType
148
149 #include "connection.h"
150