• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Python client library to write logs to Clearcut.
16
17This class is intended to be general-purpose, usable for any Clearcut LogSource.
18
19    Typical usage example:
20
21    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
22    client.log(my_event)
23    client.flush_events()
24"""
25
26import logging
27import ssl
28import threading
29import time
30
31try:
32    from urllib.request import urlopen
33    from urllib.request import Request
34    from urllib.request import HTTPError
35    from urllib.request import URLError
36except ImportError:
37    # for compatibility of asuite_metrics_lib_tests and asuite_cc_lib_tests.
38    from urllib2 import Request
39    from urllib2 import urlopen
40    from urllib2 import HTTPError
41    from urllib2 import URLError
42
43from atest.proto import clientanalytics_pb2
44
# Clearcut endpoint used when no url is passed to Clearcut().
_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'
_DEFAULT_BUFFER_SIZE = 100  # Maximum number of events to be buffered.
_DEFAULT_FLUSH_INTERVAL_SEC = 60  # 1 Minute.
# An immediate flush is scheduled once the number of buffered events reaches
# int(buffer_size * _BUFFER_FLUSH_RATIO).
_BUFFER_FLUSH_RATIO = 0.5
# Value written to LogRequest.client_info.client_type.
# NOTE(review): the meaning of 6 is not visible in this file; confirm against
# the clientanalytics proto definition.
_CLIENT_TYPE = 6
50
class Clearcut:
    """Buffers LogEvents and uploads them to Clearcut in batches.

    Events passed to log() are kept in an in-memory buffer holding at most
    buffer_size events; on overflow the events at the front of the buffer are
    dropped. A threading.Timer flushes the buffer either immediately, once it
    holds at least int(buffer_size * _BUFFER_FLUSH_RATIO) events, or after
    flush_interval_sec seconds. The next_request_wait_millis value returned by
    the server is honored as a minimum delay before the next upload.
    """

    def __init__(self, log_source, url=None, buffer_size=None,
                 flush_interval_sec=None):
        """Initializes a Clearcut client.

        Any falsy url, buffer_size or flush_interval_sec (None, 0, '') falls
        back to the corresponding module-level default.

        Args:
            log_source: The log source.
            url: The Clearcut url to connect to.
            buffer_size: The size of the client buffer in number of events.
            flush_interval_sec: The flush interval in seconds.
        """
        self._clearcut_url = url if url else _CLEARCUT_PROD_URL
        self._log_source = log_source
        self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
        self._pending_events = []
        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
        else:
            self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
        # Guards _pending_events and the scheduled-flush bookkeeping below.
        self._pending_events_lock = threading.Lock()
        self._scheduled_flush_thread = None
        # float('inf') means "no flush is currently scheduled".
        self._scheduled_flush_time = float('inf')
        # Earliest time (seconds since the epoch) the next request may be sent.
        self._min_next_request_time = 0

    def log(self, event):
        """Logs events to Clearcut.

        Logging an event can potentially trigger a flush of queued events.
        Flushing is triggered when the buffer is at least half full or
        after the flush interval has passed.

        Args:
          event: A LogEvent to send to Clearcut.
        """
        self._append_events_to_buffer([event])

    def flush_events(self):
        """Cancels whatever is scheduled and schedules an immediate flush.

        The throttling deadline is reset so that the flush is not delayed.
        """
        # Hold the lock like every other scheduling path does, so this cannot
        # interleave with _flush() or _schedule_flush() on another thread.
        with self._pending_events_lock:
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._min_next_request_time = 0
            self._schedule_flush_thread(0)

    def _serialize_events_to_proto(self, events):
        """Builds a LogRequest proto carrying the given events.

        Args:
            events: An iterable of LogEvents to add to the request.

        Returns:
            A clientanalytics_pb2.LogRequest stamped with the current time in
            milliseconds, the client type and this client's log source.
        """
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        # pylint: disable=no-member
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = self._log_source
        log_request.log_event.extend(events)
        return log_request

    def _append_events_to_buffer(self, events, retry=False):
        """Adds events to the buffer and makes sure a flush is scheduled.

        Args:
            events: A list of events to append to the buffer.
            retry: True when the events are being put back after a flush that
                was skipped; this suppresses the immediate-flush path.
        """
        with self._pending_events_lock:
            self._pending_events.extend(events)
            if len(self._pending_events) > self._buffer_size:
                # Keep only the last _buffer_size events.
                index = len(self._pending_events) - self._buffer_size
                del self._pending_events[:index]
            self._schedule_flush(retry)

    def _schedule_flush(self, retry):
        """Decides whether to flush now, later, or leave the schedule alone.

        Called from _append_events_to_buffer with _pending_events_lock held.

        Args:
            retry: When True, never schedule an immediate flush.
        """
        if (not retry
                and len(self._pending_events) >= int(self._buffer_size *
                                                     _BUFFER_FLUSH_RATIO)
                and self._scheduled_flush_time > time.time()):
            # Cancel whatever is scheduled and schedule an immediate flush.
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._schedule_flush_thread(0)
        elif self._pending_events and not self._scheduled_flush_thread:
            # Schedule a flush to run later.
            self._schedule_flush_thread(self._flush_interval_sec)

    def _schedule_flush_thread(self, time_from_now):
        """Starts a timer that calls _flush.

        Args:
            time_from_now: Requested delay in seconds. It is raised to the
                time remaining until _min_next_request_time if that is longer.
        """
        min_wait_sec = self._min_next_request_time - time.time()
        if min_wait_sec > time_from_now:
            time_from_now = min_wait_sec
        logging.debug('Scheduling thread to run in %f seconds', time_from_now)
        self._scheduled_flush_thread = threading.Timer(
            time_from_now, self._flush)
        self._scheduled_flush_time = time.time() + time_from_now
        self._scheduled_flush_thread.start()

    def _flush(self):
        """Flush buffered events to Clearcut.

        If the client is still throttled (the minimum next request time has
        not been reached), the events are put back in the buffer and a later
        flush is scheduled. Upload errors are logged by _send_to_clearcut; the
        events of a failed upload are not put back in the buffer.
        """
        with self._pending_events_lock:
            self._scheduled_flush_time = float('inf')
            self._scheduled_flush_thread = None
            events = self._pending_events
            self._pending_events = []
        if not events:
            # Nothing buffered (e.g. flush_events() with no events, or a
            # cancelled timer that had already fired): skip the empty request.
            return
        if self._min_next_request_time > time.time():
            self._append_events_to_buffer(events, retry=True)
            return
        log_request = self._serialize_events_to_proto(events)
        self._send_to_clearcut(log_request.SerializeToString())

    #pylint: disable=broad-except
    #pylint: disable=protected-access
    def _send_to_clearcut(self, data):
        """Sends a POST request with data as the body.

        Errors are logged as warnings and never raised. On success the
        next_request_wait_millis value of the response updates the earliest
        time the next request may be sent.

        Args:
            data: The serialized proto to send to Clearcut.
        """
        request = Request(self._clearcut_url, data=data)
        try:
            # NOTE(security): certificate verification is disabled for this
            # request. The unverified context is passed explicitly rather
            # than assigned to ssl._create_default_https_context, which would
            # turn verification off for every HTTPS request in the process.
            # TODO: confirm whether verification can be enabled here.
            response = urlopen(
                request, context=ssl._create_unverified_context())
            try:
                msg = response.read()
            finally:
                # Always release the connection, even if read() fails.
                response.close()
            logging.debug('LogRequest successfully sent to Clearcut.')
            log_response = clientanalytics_pb2.LogResponse()
            log_response.ParseFromString(msg)
            # pylint: disable=no-member
            # Throttle based on next_request_wait_millis value. Divide by a
            # float so sub-second waits survive under Python 2 as well.
            self._min_next_request_time = (
                log_response.next_request_wait_millis / 1000.0 + time.time())
            logging.debug('LogResponse: %s', log_response)
        except HTTPError as e:
            logging.warning('Failed to push events to Clearcut. Error code: %d',
                            e.code)
        except URLError as e:
            logging.warning('Failed to push events to Clearcut. Reason: %s', e)
        except Exception as e:
            logging.warning(e)
181