1 /*
2 * A type which wraps a socket
3 *
4 * socket_connection.c
5 *
6 * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7 */
8
9 #include "multiprocessing.h"
10
11 #if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
12 # include "poll.h"
13 #endif
14
15 #ifdef MS_WINDOWS
16 # define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0)
17 # define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0)
18 # define CLOSE(h) closesocket((SOCKET)h)
19 #else
20 # define WRITE(h, buffer, length) write(h, buffer, length)
21 # define READ(h, buffer, length) read(h, buffer, length)
22 # define CLOSE(h) close(h)
23 #endif
24
25 /*
26 * Wrapper for PyErr_CheckSignals() which can be called without the GIL
27 */
28
29 static int
check_signals(void)30 check_signals(void)
31 {
32 PyGILState_STATE state;
33 int res;
34 state = PyGILState_Ensure();
35 res = PyErr_CheckSignals();
36 PyGILState_Release(state);
37 return res;
38 }
39
40 /*
41 * Send string to file descriptor
42 */
43
44 static Py_ssize_t
_conn_sendall(HANDLE h,char * string,size_t length)45 _conn_sendall(HANDLE h, char *string, size_t length)
46 {
47 char *p = string;
48 Py_ssize_t res;
49
50 while (length > 0) {
51 res = WRITE(h, p, length);
52 if (res < 0) {
53 if (errno == EINTR) {
54 if (check_signals() < 0)
55 return MP_EXCEPTION_HAS_BEEN_SET;
56 continue;
57 }
58 return MP_SOCKET_ERROR;
59 }
60 length -= res;
61 p += res;
62 }
63
64 return MP_SUCCESS;
65 }
66
67 /*
68 * Receive string of exact length from file descriptor
69 */
70
71 static Py_ssize_t
_conn_recvall(HANDLE h,char * buffer,size_t length)72 _conn_recvall(HANDLE h, char *buffer, size_t length)
73 {
74 size_t remaining = length;
75 Py_ssize_t temp;
76 char *p = buffer;
77
78 while (remaining > 0) {
79 temp = READ(h, p, remaining);
80 if (temp < 0) {
81 if (errno == EINTR) {
82 if (check_signals() < 0)
83 return MP_EXCEPTION_HAS_BEEN_SET;
84 continue;
85 }
86 return temp;
87 }
88 else if (temp == 0) {
89 return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
90 }
91 remaining -= temp;
92 p += temp;
93 }
94
95 return MP_SUCCESS;
96 }
97
98 /*
99 * Send a string prepended by the string length in network byte order
100 */
101
102 static Py_ssize_t
conn_send_string(ConnectionObject * conn,char * string,size_t length)103 conn_send_string(ConnectionObject *conn, char *string, size_t length)
104 {
105 Py_ssize_t res;
106 /* The "header" of the message is a 32 bit unsigned number (in
107 network order) which specifies the length of the "body". If
108 the message is shorter than about 16kb then it is quicker to
109 combine the "header" and the "body" of the message and send
110 them at once. */
111 if (length < (16*1024)) {
112 char *message;
113
114 message = PyMem_Malloc(length+4);
115 if (message == NULL)
116 return MP_MEMORY_ERROR;
117
118 *(UINT32*)message = htonl((UINT32)length);
119 memcpy(message+4, string, length);
120 Py_BEGIN_ALLOW_THREADS
121 res = _conn_sendall(conn->handle, message, length+4);
122 Py_END_ALLOW_THREADS
123 PyMem_Free(message);
124 } else {
125 UINT32 lenbuff;
126
127 if (length > MAX_MESSAGE_LENGTH)
128 return MP_BAD_MESSAGE_LENGTH;
129
130 lenbuff = htonl((UINT32)length);
131 Py_BEGIN_ALLOW_THREADS
132 res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) ||
133 _conn_sendall(conn->handle, string, length);
134 Py_END_ALLOW_THREADS
135 }
136 return res;
137 }
138
139 /*
140 * Attempts to read into buffer, or failing that into *newbuffer
141 *
142 * Returns number of bytes read.
143 */
144
145 static Py_ssize_t
conn_recv_string(ConnectionObject * conn,char * buffer,size_t buflength,char ** newbuffer,size_t maxlength)146 conn_recv_string(ConnectionObject *conn, char *buffer,
147 size_t buflength, char **newbuffer, size_t maxlength)
148 {
149 Py_ssize_t res;
150 UINT32 ulength;
151
152 *newbuffer = NULL;
153
154 Py_BEGIN_ALLOW_THREADS
155 res = _conn_recvall(conn->handle, (char*)&ulength, 4);
156 Py_END_ALLOW_THREADS
157 if (res < 0)
158 return res;
159
160 ulength = ntohl(ulength);
161 if (ulength > maxlength)
162 return MP_BAD_MESSAGE_LENGTH;
163
164 if (ulength > buflength) {
165 *newbuffer = buffer = PyMem_Malloc((size_t)ulength);
166 if (buffer == NULL)
167 return MP_MEMORY_ERROR;
168 }
169
170 Py_BEGIN_ALLOW_THREADS
171 res = _conn_recvall(conn->handle, buffer, (size_t)ulength);
172 Py_END_ALLOW_THREADS
173
174 if (res >= 0) {
175 res = (Py_ssize_t)ulength;
176 } else if (*newbuffer != NULL) {
177 PyMem_Free(*newbuffer);
178 *newbuffer = NULL;
179 }
180 return res;
181 }
182
183 /*
184 * Check whether any data is available for reading -- neg timeout blocks
185 */
186
187 static int
conn_poll(ConnectionObject * conn,double timeout,PyThreadState * _save)188 conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
189 {
190 #if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
191 int res;
192 struct pollfd p;
193
194 p.fd = (int)conn->handle;
195 p.events = POLLIN | POLLPRI;
196 p.revents = 0;
197
198 if (timeout < 0) {
199 do {
200 res = poll(&p, 1, -1);
201 } while (res < 0 && errno == EINTR);
202 } else {
203 res = poll(&p, 1, (int)(timeout * 1000 + 0.5));
204 if (res < 0 && errno == EINTR) {
205 /* We were interrupted by a signal. Just indicate a
206 timeout even though we are early. */
207 return FALSE;
208 }
209 }
210
211 if (res < 0) {
212 return MP_SOCKET_ERROR;
213 } else if (p.revents & (POLLNVAL|POLLERR)) {
214 Py_BLOCK_THREADS
215 PyErr_SetString(PyExc_IOError, "poll() gave POLLNVAL or POLLERR");
216 Py_UNBLOCK_THREADS
217 return MP_EXCEPTION_HAS_BEEN_SET;
218 } else if (p.revents != 0) {
219 return TRUE;
220 } else {
221 assert(res == 0);
222 return FALSE;
223 }
224 #else
225 int res;
226 fd_set rfds;
227
228 /*
229 * Verify the handle, issue 3321. Not required for windows.
230 */
231 #ifndef MS_WINDOWS
232 if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) {
233 Py_BLOCK_THREADS
234 PyErr_SetString(PyExc_IOError, "handle out of range in select()");
235 Py_UNBLOCK_THREADS
236 return MP_EXCEPTION_HAS_BEEN_SET;
237 }
238 #endif
239
240 FD_ZERO(&rfds);
241 FD_SET((SOCKET)conn->handle, &rfds);
242
243 if (timeout < 0.0) {
244 do {
245 res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
246 } while (res < 0 && errno == EINTR);
247 } else {
248 struct timeval tv;
249 tv.tv_sec = (long)timeout;
250 tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
251 res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
252 if (res < 0 && errno == EINTR) {
253 /* We were interrupted by a signal. Just indicate a
254 timeout even though we are early. */
255 return FALSE;
256 }
257 }
258
259 if (res < 0) {
260 return MP_SOCKET_ERROR;
261 } else if (FD_ISSET(conn->handle, &rfds)) {
262 return TRUE;
263 } else {
264 assert(res == 0);
265 return FALSE;
266 }
267 #endif
268 }
269
270 /*
271 * "connection.h" defines the Connection type using defs above
272 */
273
274 #define CONNECTION_NAME "Connection"
275 #define CONNECTION_TYPE ConnectionType
276
277 #include "connection.h"
278