• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Python client library to write logs to Clearcut.

The Clearcut client class is intended to be general-purpose, usable for any
Clearcut LogSource.

Typical usage example:

    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
    client.log(my_event)
    client.flush_events()
"""

26import logging
27import threading
28import time
29
30try:
31    from urllib.request import urlopen
32    from urllib.request import Request
33    from urllib.request import HTTPError
34    from urllib.request import URLError
35except ImportError:
36    # for compatibility of asuite_metrics_lib_tests and asuite_cc_lib_tests.
37    from urllib2 import Request
38    from urllib2 import urlopen
39    from urllib2 import HTTPError
40    from urllib2 import URLError
41
42from proto import clientanalytics_pb2
43
_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'
# Maximum number of events kept in the client buffer; once exceeded, the
# oldest buffered events are dropped.
_DEFAULT_BUFFER_SIZE = 100
# Delay before a scheduled (non-immediate) flush runs: 1 minute.
_DEFAULT_FLUSH_INTERVAL_SEC = 60
# An immediate flush is scheduled once the number of buffered events reaches
# (is at least) this fraction of the buffer size.
_BUFFER_FLUSH_RATIO = 0.5
# Written to LogRequest.client_info.client_type. NOTE(review): presumably an
# enum value defined in clientanalytics.proto -- confirm its meaning there.
_CLIENT_TYPE = 6

class Clearcut:
    """Handles logging to Clearcut.

    Events are buffered in memory and sent in batches from threading.Timer
    threads. The pending-event buffer and the flush-scheduling state
    (_scheduled_flush_thread / _scheduled_flush_time) are guarded by
    _pending_events_lock.
    """

    def __init__(self, log_source, url=None, buffer_size=None,
                 flush_interval_sec=None):
        """Initializes a Clearcut client.

        Args:
            log_source: The log source stamped on every LogRequest.
            url: The Clearcut url to connect to. Falls back to the
                production endpoint when falsy.
            buffer_size: The maximum number of buffered events. Falls back
                to _DEFAULT_BUFFER_SIZE when falsy.
            flush_interval_sec: The delay, in seconds, before a scheduled
                (non-immediate) flush runs. Falls back to
                _DEFAULT_FLUSH_INTERVAL_SEC when falsy.
        """
        self._clearcut_url = url if url else _CLEARCUT_PROD_URL
        self._log_source = log_source
        self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
        self._pending_events = []
        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
        else:
            self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
        # Shared between callers of log()/flush_events() and the Timer
        # threads running _flush.
        self._pending_events_lock = threading.Lock()
        self._scheduled_flush_thread = None
        # float('inf') means "no flush currently scheduled".
        self._scheduled_flush_time = float('inf')
        # Earliest time.time() at which the server allows the next request;
        # updated from LogResponse.next_request_wait_millis.
        self._min_next_request_time = 0

    def log(self, event):
        """Logs an event to Clearcut.

        The event is buffered, and logging it can trigger a flush of queued
        events. An immediate flush is scheduled once the buffer is at least
        half full; otherwise a flush runs after the flush interval.

        Args:
          event: A LogEvent to send to Clearcut.
        """
        self._append_events_to_buffer([event])

    def flush_events(self):
        """Cancels whatever flush is scheduled and schedules an immediate one.

        Also clears the server-requested throttle so the flush is not delayed.
        The send itself happens asynchronously on a Timer thread.
        """
        # Take the lock like every other writer of the scheduling state so
        # this cannot interleave with _schedule_flush() or _flush().
        with self._pending_events_lock:
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._min_next_request_time = 0
            self._schedule_flush_thread(0)

    def _serialize_events_to_proto(self, events):
        """Wraps events in a LogRequest stamped with time, client and source.

        Args:
            events: The LogEvents to include in the request.

        Returns:
            A clientanalytics_pb2.LogRequest.
        """
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        # pylint: disable=no-member
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = self._log_source
        log_request.log_event.extend(events)
        return log_request

    def _append_events_to_buffer(self, events, retry=False):
        """Adds events to the buffer and (re)schedules a flush.

        Args:
            events: The LogEvents to buffer.
            retry: True when re-queueing events that _flush took out of the
                buffer. Those are older than anything logged since, so they
                go to the front: the buffer stays in chronological order and
                the overflow truncation below keeps dropping the oldest.
        """
        with self._pending_events_lock:
            if retry:
                self._pending_events[:0] = events
            else:
                self._pending_events.extend(events)
            overflow = len(self._pending_events) - self._buffer_size
            if overflow > 0:
                # Over capacity: drop the oldest events from the front.
                del self._pending_events[:overflow]
            self._schedule_flush(retry)

    def _schedule_flush(self, retry):
        """Schedules a flush according to the current buffer state.

        Must be called with _pending_events_lock held.

        For newly logged events, an immediate flush replaces the scheduled
        one when the buffer is at least _BUFFER_FLUSH_RATIO full, unless the
        scheduled flush is already due. Otherwise, if events are pending and
        no flush is scheduled, one is scheduled after the flush interval.

        Args:
            retry: True for events re-queued by _flush; these never trigger
                an immediate flush.
        """
        if (not retry
                and len(self._pending_events) >= int(self._buffer_size *
                                                     _BUFFER_FLUSH_RATIO)
                and self._scheduled_flush_time > time.time()):
            # Cancel whatever is scheduled and schedule an immediate flush.
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._schedule_flush_thread(0)
        elif self._pending_events and not self._scheduled_flush_thread:
            # Schedule a flush to run later.
            self._schedule_flush_thread(self._flush_interval_sec)

    def _schedule_flush_thread(self, time_from_now):
        """Starts a Timer that runs _flush after time_from_now seconds.

        The delay is extended to respect the server-requested throttle
        (_min_next_request_time) when that is later. Callers hold
        _pending_events_lock.

        Args:
            time_from_now: The requested delay in seconds.
        """
        min_wait_sec = self._min_next_request_time - time.time()
        if min_wait_sec > time_from_now:
            time_from_now = min_wait_sec
        logging.debug('Scheduling thread to run in %f seconds', time_from_now)
        self._scheduled_flush_thread = threading.Timer(
            time_from_now, self._flush)
        self._scheduled_flush_time = time.time() + time_from_now
        self._scheduled_flush_thread.start()

    def _flush(self):
        """Flushes buffered events to Clearcut; runs on a Timer thread.

        If the server-requested throttle window has not elapsed yet, the
        events are put back into the buffer and another flush is scheduled.
        Otherwise they are sent once. If that send fails, _send_to_clearcut
        only logs the failure and the events are dropped, not re-queued.
        """
        with self._pending_events_lock:
            self._scheduled_flush_time = float('inf')
            self._scheduled_flush_thread = None
            events = self._pending_events
            self._pending_events = []
        if not events:
            # Nothing to send, e.g. flush_events() on an empty buffer or a
            # stale timer that lost the race to a newer one. Skip the
            # pointless network round trip.
            return
        if self._min_next_request_time > time.time():
            self._append_events_to_buffer(events, retry=True)
            return
        log_request = self._serialize_events_to_proto(events)
        self._send_to_clearcut(log_request.SerializeToString())

    #pylint: disable=broad-except
    def _send_to_clearcut(self, data):
        """Sends a POST request with data as the body.

        Best effort: every error is logged at debug level and swallowed.

        Args:
            data: The serialized proto to send to Clearcut.
        """
        request = Request(self._clearcut_url, data=data)
        try:
            response = urlopen(request)
            try:
                msg = response.read()
            finally:
                # Release the connection even if reading the body fails.
                response.close()
            logging.debug('LogRequest successfully sent to Clearcut.')
            log_response = clientanalytics_pb2.LogResponse()
            log_response.ParseFromString(msg)
            # pylint: disable=no-member
            # Throttle based on next_request_wait_millis value. Divide by a
            # float so Python 2 (urllib2 fallback) keeps sub-second precision.
            self._min_next_request_time = (log_response.next_request_wait_millis
                                           / 1000.0 + time.time())
            logging.debug('LogResponse: %s', log_response)
        except HTTPError as e:
            logging.debug('Failed to push events to Clearcut. Error code: %d',
                          e.code)
        except URLError:
            logging.debug('Failed to push events to Clearcut.')
        except Exception as e:
            logging.debug(e)