# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Python client library to write logs to Clearcut.

This class is intended to be general-purpose, usable for any Clearcut LogSource.

    Typical usage example:

    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
    client.log(my_event)
    client.flush_events()
"""

import logging
import ssl
import threading
import time

try:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.request import HTTPError
    from urllib.request import URLError
except ImportError:
    # for compatibility of asuite_metrics_lib_tests and asuite_cc_lib_tests.
    from urllib2 import Request
    from urllib2 import urlopen
    from urllib2 import HTTPError
    from urllib2 import URLError

from atest.proto import clientanalytics_pb2

_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'
_DEFAULT_BUFFER_SIZE = 100  # Maximum number of events to be buffered.
_DEFAULT_FLUSH_INTERVAL_SEC = 60  # 1 Minute.
_BUFFER_FLUSH_RATIO = 0.5  # Flush buffer when we exceed this ratio.
_CLIENT_TYPE = 6


class Clearcut:
    """Handles logging to Clearcut."""

    def __init__(self, log_source, url=None, buffer_size=None,
                 flush_interval_sec=None):
        """Initializes a Clearcut client.

        Args:
            log_source: The log source.
            url: The Clearcut url to connect to.
            buffer_size: The size of the client buffer in number of events.
            flush_interval_sec: The flush interval in seconds.
        """
        self._clearcut_url = url if url else _CLEARCUT_PROD_URL
        self._log_source = log_source
        self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
        self._pending_events = []
        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
        else:
            self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
        # Guards the pending-event buffer and the scheduled-flush bookkeeping.
        self._pending_events_lock = threading.Lock()
        self._scheduled_flush_thread = None
        # float('inf') means "no flush is currently scheduled".
        self._scheduled_flush_time = float('inf')
        # Earliest wall-clock time (seconds) at which the next request may be
        # sent; updated from the server's throttling hint.
        self._min_next_request_time = 0

    def log(self, event):
        """Logs events to Clearcut.

        Logging an event can potentially trigger a flush of queued events.
        Flushing is triggered when the buffer is at least half full or
        after the flush interval has passed.

        Args:
            event: A LogEvent to send to Clearcut.
        """
        self._append_events_to_buffer([event])

    def flush_events(self):
        """Cancel whatever is scheduled and schedule an immediate flush.

        Any throttling delay requested by a previous response is reset so
        the flush is scheduled to run right away.
        """
        # Hold the lock so the timer thread running _flush cannot reset
        # _scheduled_flush_thread to None between the check and the cancel().
        with self._pending_events_lock:
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._min_next_request_time = 0
            self._schedule_flush_thread(0)

    def _serialize_events_to_proto(self, events):
        """Wraps events in a LogRequest stamped with the current time.

        Args:
            events: The LogEvents to include in the request.

        Returns:
            A clientanalytics_pb2.LogRequest carrying the events, this
            client's log source and the client type.
        """
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        # pylint: disable=no-member
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = self._log_source
        log_request.log_event.extend(events)
        return log_request

    def _append_events_to_buffer(self, events, retry=False):
        """Adds events to the buffer and schedules a flush if needed.

        When the buffer exceeds its capacity, the entries at the front of
        the buffer are dropped so that only buffer_size events remain.

        Args:
            events: A list of LogEvents to buffer.
            retry: True when re-queueing events from a throttled flush; this
                suppresses the immediate flush.
        """
        with self._pending_events_lock:
            self._pending_events.extend(events)
            if len(self._pending_events) > self._buffer_size:
                index = len(self._pending_events) - self._buffer_size
                del self._pending_events[:index]
            self._schedule_flush(retry)

    def _schedule_flush(self, retry):
        """Decides whether to flush immediately or after the flush interval.

        Called by _append_events_to_buffer while it holds the events lock.

        Args:
            retry: True when invoked for re-queued events; never triggers an
                immediate flush in that case.
        """
        if (not retry
                and len(self._pending_events) >= int(self._buffer_size *
                                                     _BUFFER_FLUSH_RATIO)
                and self._scheduled_flush_time > time.time()):
            # Cancel whatever is scheduled and schedule an immediate flush.
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._schedule_flush_thread(0)
        elif self._pending_events and not self._scheduled_flush_thread:
            # Schedule a flush to run later.
            self._schedule_flush_thread(self._flush_interval_sec)

    def _schedule_flush_thread(self, time_from_now):
        """Starts a timer that runs _flush after the given delay.

        Args:
            time_from_now: Requested delay in seconds. It is raised to the
                remaining throttling time if that is longer.
        """
        min_wait_sec = self._min_next_request_time - time.time()
        if min_wait_sec > time_from_now:
            time_from_now = min_wait_sec
        logging.debug('Scheduling thread to run in %f seconds', time_from_now)
        self._scheduled_flush_thread = threading.Timer(
            time_from_now, self._flush)
        self._scheduled_flush_time = time.time() + time_from_now
        self._scheduled_flush_thread.start()

    def _flush(self):
        """Flush buffered events to Clearcut.

        If the client is still being throttled (the minimum next request
        time has not been reached), the events are put back in the buffer
        and a later flush is scheduled. Otherwise the events are serialized
        and sent; a failed send is only logged by _send_to_clearcut and the
        events are not re-queued.
        """
        with self._pending_events_lock:
            self._scheduled_flush_time = float('inf')
            self._scheduled_flush_thread = None
            events = self._pending_events
            self._pending_events = []
        # The lock is released before re-queueing: _append_events_to_buffer
        # acquires the same non-reentrant lock.
        if self._min_next_request_time > time.time():
            self._append_events_to_buffer(events, retry=True)
            return
        log_request = self._serialize_events_to_proto(events)
        self._send_to_clearcut(log_request.SerializeToString())

    #pylint: disable=broad-except
    #pylint: disable=protected-access
    def _send_to_clearcut(self, data):
        """Sends a POST request with data as the body.

        Failures are logged as warnings and never raised. On success the
        throttling deadline is updated from the response.

        Args:
            data: The serialized proto to send to Clearcut.
        """
        request = Request(self._clearcut_url, data=data)
        try:
            # NOTE(review): certificate verification is still disabled for
            # this request, as before, but the unverified context is now
            # scoped to this call instead of being installed as the
            # process-wide default for every HTTPS connection.
            response = urlopen(
                request, context=ssl._create_unverified_context())
            try:
                msg = response.read()
            finally:
                # Always release the connection, even if read() fails.
                response.close()
            logging.debug('LogRequest successfully sent to Clearcut.')
            log_response = clientanalytics_pb2.LogResponse()
            log_response.ParseFromString(msg)
            # pylint: disable=no-member
            # Throttle based on next_request_wait_millis value.
            self._min_next_request_time = (log_response.next_request_wait_millis
                                           / 1000 + time.time())
            logging.debug('LogResponse: %s', log_response)
        except HTTPError as e:
            # HTTPError must be handled before URLError, its base class.
            logging.warning('Failed to push events to Clearcut. Error code: %d',
                            e.code)
        except URLError as e:
            logging.warning('Failed to push events to Clearcut. Reason: %s', e)
        except Exception as e:
            logging.warning(e)