1 /*
2 * Copyright (C) 2018, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "statsd_writer.h"
17
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <inttypes.h>
21 #include <poll.h>
22 #include <private/android_logger.h>
23 #include <stdarg.h>
24 #include <stdatomic.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #include <sys/uio.h>
31 #include <sys/un.h>
32 #include <time.h>
33 #include <unistd.h>
34
35
/*
 * Compatibility shims for host builds against glibc-2.17 in the Android tree.
 * Nothing in this section is compiled when building against bionic.
 */
#ifndef __BIONIC__

/* glibc-2.17's unistd.h has no declaration for gettid(). */
extern pid_t gettid();

/*
 * glibc-2.17's unistd.h has no TEMP_FAILURE_RETRY either.
 *
 * Evaluates `exp` repeatedly for as long as it yields -1 with errno set to
 * EINTR, and produces the value of the final evaluation. `exp` may therefore
 * be evaluated more than once.
 */
#ifndef TEMP_FAILURE_RETRY
#define TEMP_FAILURE_RETRY(exp)                        \
    ({                                                 \
        __typeof__(exp) _tfr_result;                   \
        do {                                           \
            _tfr_result = (exp);                       \
        } while (_tfr_result == -1 && errno == EINTR); \
        _tfr_result;                                   \
    })
#endif /* TEMP_FAILURE_RETRY */

#endif /* __BIONIC__ */
53
/* Mutex behind statsd_writer_init_lock(), statd_writer_trylock() and statsd_writer_init_unlock(). */
static pthread_mutex_t log_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Count of noted drops that statsdWrite() has not yet managed to report. */
static atomic_int dropped = 0;

/* Error value passed to the most recent statsdNoteDrop() call. */
static atomic_int log_error = 0;

/* Tag value passed to the most recent statsdNoteDrop() call. */
static atomic_int atom_tag = 0;
58
statsd_writer_init_lock()59 void statsd_writer_init_lock() {
60 /*
61 * If we trigger a signal handler in the middle of locked activity and the
62 * signal handler logs a message, we could get into a deadlock state.
63 */
64 pthread_mutex_lock(&log_init_lock);
65 }
66
statd_writer_trylock()67 int statd_writer_trylock() {
68 return pthread_mutex_trylock(&log_init_lock);
69 }
70
statsd_writer_init_unlock()71 void statsd_writer_init_unlock() {
72 pthread_mutex_unlock(&log_init_lock);
73 }
74
/*
 * Forward declarations of the transport callbacks defined later in this file.
 *
 * These are full prototypes rather than empty-parenthesis declarators so the
 * compiler checks argument counts and types, and so that the declaration of
 * statsdNoteDrop() agrees with its two-argument definition under C23, where
 * "()" means "(void)".
 */
static int statsdAvailable(void);
static int statsdOpen(void);
static void statsdClose(void);
static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr);
static void statsdNoteDrop(int error, int tag);
static int statsdIsClosed(void);
81
82 struct android_log_transport_write statsdLoggerWrite = {
83 .name = "statsd",
84 .sock = -EBADF,
85 .available = statsdAvailable,
86 .open = statsdOpen,
87 .close = statsdClose,
88 .write = statsdWrite,
89 .noteDrop = statsdNoteDrop,
90 .isClosed = statsdIsClosed,
91 };
92
93 /* log_init_lock assumed */
statsdOpen()94 static int statsdOpen() {
95 int i, ret = 0;
96
97 i = atomic_load(&statsdLoggerWrite.sock);
98 if (i < 0) {
99 int flags = SOCK_DGRAM;
100 #ifdef SOCK_CLOEXEC
101 flags |= SOCK_CLOEXEC;
102 #endif
103 #ifdef SOCK_NONBLOCK
104 flags |= SOCK_NONBLOCK;
105 #endif
106 int sock = TEMP_FAILURE_RETRY(socket(PF_UNIX, flags, 0));
107 if (sock < 0) {
108 ret = -errno;
109 } else {
110 int sndbuf = 1 * 1024 * 1024; // set max send buffer size 1MB
111 socklen_t bufLen = sizeof(sndbuf);
112 // SO_RCVBUF does not have an effect on unix domain socket, but SO_SNDBUF does.
113 // Proceed to connect even setsockopt fails.
114 setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, bufLen);
115 struct sockaddr_un un;
116 memset(&un, 0, sizeof(struct sockaddr_un));
117 un.sun_family = AF_UNIX;
118 strcpy(un.sun_path, "/dev/socket/statsdw");
119
120 if (TEMP_FAILURE_RETRY(
121 connect(sock, (struct sockaddr*)&un, sizeof(struct sockaddr_un))) < 0) {
122 ret = -errno;
123 switch (ret) {
124 case -ENOTCONN:
125 case -ECONNREFUSED:
126 case -ENOENT:
127 i = atomic_exchange(&statsdLoggerWrite.sock, ret);
128 /* FALLTHRU */
129 default:
130 break;
131 }
132 close(sock);
133 } else {
134 ret = atomic_exchange(&statsdLoggerWrite.sock, sock);
135 if ((ret >= 0) && (ret != sock)) {
136 close(ret);
137 }
138 ret = 0;
139 }
140 }
141 }
142
143 return ret;
144 }
145
__statsdClose(int negative_errno)146 static void __statsdClose(int negative_errno) {
147 int sock = atomic_exchange(&statsdLoggerWrite.sock, negative_errno);
148 if (sock >= 0) {
149 close(sock);
150 }
151 }
152
/* Closes the transport socket, leaving -EBADF in statsdLoggerWrite.sock. */
static void statsdClose() {
    const int closedMarker = -EBADF;
    __statsdClose(closedMarker);
}
156
statsdAvailable()157 static int statsdAvailable() {
158 if (atomic_load(&statsdLoggerWrite.sock) < 0) {
159 if (access("/dev/socket/statsdw", W_OK) == 0) {
160 return 0;
161 }
162 return -EBADF;
163 }
164 return 1;
165 }
166
statsdNoteDrop(int error,int tag)167 static void statsdNoteDrop(int error, int tag) {
168 atomic_fetch_add_explicit(&dropped, 1, memory_order_relaxed);
169 atomic_exchange_explicit(&log_error, error, memory_order_relaxed);
170 atomic_exchange_explicit(&atom_tag, tag, memory_order_relaxed);
171 }
172
statsdIsClosed()173 static int statsdIsClosed() {
174 if (atomic_load(&statsdLoggerWrite.sock) < 0) {
175 return 1;
176 }
177 return 0;
178 }
179
statsdWrite(struct timespec * ts,struct iovec * vec,size_t nr)180 static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr) {
181 ssize_t ret;
182 int sock;
183 static const unsigned headerLength = 1;
184 struct iovec newVec[nr + headerLength];
185 android_log_header_t header;
186 size_t i, payloadSize;
187
188 sock = atomic_load(&statsdLoggerWrite.sock);
189 if (sock < 0) switch (sock) {
190 case -ENOTCONN:
191 case -ECONNREFUSED:
192 case -ENOENT:
193 break;
194 default:
195 return -EBADF;
196 }
197 /*
198 * struct {
199 * // what we provide to socket
200 * android_log_header_t header;
201 * // caller provides
202 * union {
203 * struct {
204 * char prio;
205 * char payload[];
206 * } string;
207 * struct {
208 * uint32_t tag
209 * char payload[];
210 * } binary;
211 * };
212 * };
213 */
214
215 header.tid = gettid();
216 header.realtime.tv_sec = ts->tv_sec;
217 header.realtime.tv_nsec = ts->tv_nsec;
218
219 newVec[0].iov_base = (unsigned char*)&header;
220 newVec[0].iov_len = sizeof(header);
221
222 // If we dropped events before, try to tell statsd.
223 if (sock >= 0) {
224 int32_t snapshot = atomic_exchange_explicit(&dropped, 0, memory_order_relaxed);
225 if (snapshot) {
226 android_log_event_long_t buffer;
227 header.id = LOG_ID_STATS;
228 // store the last log error in the tag field. This tag field is not used by statsd.
229 buffer.header.tag = atomic_load(&log_error);
230 buffer.payload.type = EVENT_TYPE_LONG;
231 // format:
232 // |atom_tag|dropped_count|
233 int64_t composed_long = atomic_load(&atom_tag);
234 // Send 2 int32's via an int64.
235 composed_long = ((composed_long << 32) | ((int64_t)snapshot));
236 buffer.payload.data = composed_long;
237
238 newVec[headerLength].iov_base = &buffer;
239 newVec[headerLength].iov_len = sizeof(buffer);
240
241 ret = TEMP_FAILURE_RETRY(writev(sock, newVec, 2));
242 if (ret != (ssize_t)(sizeof(header) + sizeof(buffer))) {
243 atomic_fetch_add_explicit(&dropped, snapshot, memory_order_relaxed);
244 }
245 }
246 }
247
248 header.id = LOG_ID_STATS;
249
250 for (payloadSize = 0, i = headerLength; i < nr + headerLength; i++) {
251 newVec[i].iov_base = vec[i - headerLength].iov_base;
252 payloadSize += newVec[i].iov_len = vec[i - headerLength].iov_len;
253
254 if (payloadSize > LOGGER_ENTRY_MAX_PAYLOAD) {
255 newVec[i].iov_len -= payloadSize - LOGGER_ENTRY_MAX_PAYLOAD;
256 if (newVec[i].iov_len) {
257 ++i;
258 }
259 break;
260 }
261 }
262
263 /*
264 * The write below could be lost, but will never block.
265 *
266 * ENOTCONN occurs if statsd has died.
267 * ENOENT occurs if statsd is not running and socket is missing.
268 * ECONNREFUSED occurs if we can not reconnect to statsd.
269 * EAGAIN occurs if statsd is overloaded.
270 */
271 if (sock < 0) {
272 ret = sock;
273 } else {
274 ret = TEMP_FAILURE_RETRY(writev(sock, newVec, i));
275 if (ret < 0) {
276 ret = -errno;
277 }
278 }
279 switch (ret) {
280 case -ENOTCONN:
281 case -ECONNREFUSED:
282 case -ENOENT:
283 if (statd_writer_trylock()) {
284 return ret; /* in a signal handler? try again when less stressed
285 */
286 }
287 __statsdClose(ret);
288 ret = statsdOpen();
289 statsd_writer_init_unlock();
290
291 if (ret < 0) {
292 return ret;
293 }
294
295 ret = TEMP_FAILURE_RETRY(writev(atomic_load(&statsdLoggerWrite.sock), newVec, i));
296 if (ret < 0) {
297 ret = -errno;
298 }
299 /* FALLTHRU */
300 default:
301 break;
302 }
303
304 if (ret > (ssize_t)sizeof(header)) {
305 ret -= sizeof(header);
306 }
307
308 return ret;
309 }
310