# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Python client library to write logs to Clearcut.

This class is intended to be general-purpose, usable for any Clearcut LogSource.

    Typical usage example:

    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
    client.log(my_event)
    client.flush_events()
"""

import logging
import threading
import time

try:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.request import HTTPError
    from urllib.request import URLError
except ImportError:
    # for compatibility of asuite_metrics_lib_tests and asuite_cc_lib_tests.
    from urllib2 import Request
    from urllib2 import urlopen
    from urllib2 import HTTPError
    from urllib2 import URLError

from proto import clientanalytics_pb2

_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'
_DEFAULT_BUFFER_SIZE = 100  # Maximum number of events to be buffered.
_DEFAULT_FLUSH_INTERVAL_SEC = 60  # 1 Minute.
_BUFFER_FLUSH_RATIO = 0.5  # Flush buffer when we exceed this ratio.
_CLIENT_TYPE = 6


class Clearcut(object):
    """Handles logging to Clearcut.

    Events are buffered in memory and sent by a threading.Timer that runs
    _flush, either immediately or after the flush interval.
    """

    def __init__(self, log_source, url=None, buffer_size=None,
                 flush_interval_sec=None):
        """Initializes a Clearcut client.

        Args:
            log_source: The log source.
            url: The Clearcut url to connect to. Falls back to
                _CLEARCUT_PROD_URL when falsy.
            buffer_size: The size of the client buffer in number of events.
                Falls back to _DEFAULT_BUFFER_SIZE when falsy.
            flush_interval_sec: The flush interval in seconds. Falls back to
                _DEFAULT_FLUSH_INTERVAL_SEC when falsy.
        """
        self._clearcut_url = url if url else _CLEARCUT_PROD_URL
        self._log_source = log_source
        self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
        self._pending_events = []
        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
        else:
            self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
        self._pending_events_lock = threading.Lock()
        self._scheduled_flush_thread = None
        # float('inf') means "no flush is currently scheduled".
        self._scheduled_flush_time = float('inf')
        # Earliest wall-clock time (seconds) at which the next request may be
        # sent; updated from the server's next_request_wait_millis.
        self._min_next_request_time = 0

    def log(self, event):
        """Logs events to Clearcut.

        Logging an event can potentially trigger a flush of queued events.
        An immediate flush is scheduled once the number of buffered events
        reaches _BUFFER_FLUSH_RATIO of the buffer size; otherwise a flush is
        scheduled to run after the flush interval.

        Args:
            event: A LogEvent to send to Clearcut.
        """
        self._append_events_to_buffer([event])

    def flush_events(self):
        """Cancels whatever is scheduled and schedules an immediate flush.

        Also resets the minimum next-request time, so the flush is not
        delayed by a previously received throttling hint.
        """
        if self._scheduled_flush_thread:
            self._scheduled_flush_thread.cancel()
        self._min_next_request_time = 0
        self._schedule_flush_thread(0)

    def _serialize_events_to_proto(self, events):
        """Builds a LogRequest proto carrying the given events.

        Args:
            events: An iterable of LogEvents to put in the request.

        Returns:
            A clientanalytics_pb2.LogRequest stamped with the current time in
            milliseconds, the client type and this client's log source.
        """
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        # pylint: disable=no-member
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = self._log_source
        log_request.log_event.extend(events)
        return log_request

    def _append_events_to_buffer(self, events, retry=False):
        """Appends events to the buffer and schedules a flush if needed.

        When the buffer grows beyond the configured size, the oldest events
        are dropped so that only the newest buffer_size events remain.

        Args:
            events: A list of LogEvents to buffer.
            retry: True when the events are being put back after a flush that
                could not be sent yet; suppresses the immediate flush.
        """
        with self._pending_events_lock:
            self._pending_events.extend(events)
            if len(self._pending_events) > self._buffer_size:
                index = len(self._pending_events) - self._buffer_size
                del self._pending_events[:index]
            self._schedule_flush(retry)

    def _schedule_flush(self, retry):
        """Decides whether to flush immediately or later.

        Called by _append_events_to_buffer while the pending-events lock is
        held.

        Args:
            retry: When True, never schedule an immediate flush.
        """
        if (not retry
                and len(self._pending_events) >= int(self._buffer_size *
                                                     _BUFFER_FLUSH_RATIO)
                and self._scheduled_flush_time > time.time()):
            # Cancel whatever is scheduled and schedule an immediate flush.
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._schedule_flush_thread(0)
        elif self._pending_events and not self._scheduled_flush_thread:
            # Schedule a flush to run later.
            self._schedule_flush_thread(self._flush_interval_sec)

    def _schedule_flush_thread(self, time_from_now):
        """Starts a timer that runs _flush after the given delay.

        The delay is extended when needed so the flush does not run before
        the minimum next-request time.

        Args:
            time_from_now: Requested delay in seconds.
        """
        min_wait_sec = self._min_next_request_time - time.time()
        if min_wait_sec > time_from_now:
            time_from_now = min_wait_sec
        logging.debug('Scheduling thread to run in %f seconds', time_from_now)
        self._scheduled_flush_thread = threading.Timer(
            time_from_now, self._flush)
        self._scheduled_flush_time = time.time() + time_from_now
        self._scheduled_flush_thread.start()

    def _flush(self):
        """Flush buffered events to Clearcut.

        If the client is still throttled (the minimum next-request time has
        not been reached), the events are appended back to the buffer and
        rescheduled for the next flush. Events handed to _send_to_clearcut
        are not re-buffered if that request fails.
        """
        with self._pending_events_lock:
            self._scheduled_flush_time = float('inf')
            self._scheduled_flush_thread = None
            events = self._pending_events
            self._pending_events = []
        # The lock is released above: _append_events_to_buffer acquires the
        # same non-reentrant lock.
        if not events:
            # Nothing buffered; do not send an empty LogRequest.
            return
        if self._min_next_request_time > time.time():
            self._append_events_to_buffer(events, retry=True)
            return
        log_request = self._serialize_events_to_proto(events)
        self._send_to_clearcut(log_request.SerializeToString())

    #pylint: disable=broad-except
    def _send_to_clearcut(self, data):
        """Sends a POST request with data as the body.

        On success the LogResponse is parsed and its next_request_wait_millis
        is used to throttle later requests. All failures are logged at debug
        level and swallowed.

        Args:
            data: The serialized proto to send to Clearcut.
        """
        request = Request(self._clearcut_url, data=data)
        try:
            response = urlopen(request)
            try:
                msg = response.read()
            finally:
                # Always release the connection; the urllib2 response object
                # is not a context manager, so close it explicitly.
                response.close()
            logging.debug('LogRequest successfully sent to Clearcut.')
            log_response = clientanalytics_pb2.LogResponse()
            log_response.ParseFromString(msg)
            # pylint: disable=no-member
            # Throttle based on next_request_wait_millis value. Divide by a
            # float so sub-second waits are not truncated on Python 2.
            self._min_next_request_time = (log_response.next_request_wait_millis
                                           / 1000.0 + time.time())
            logging.debug('LogResponse: %s', log_response)
        except HTTPError as e:
            logging.debug('Failed to push events to Clearcut. Error code: %d',
                          e.code)
        except URLError:
            logging.debug('Failed to push events to Clearcut.')
        except Exception as e:
            logging.debug(e)