• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5# This file defines helper functions for putting entries into elasticsearch.
6
7"""Utils for sending metadata to elasticsearch
8
9Elasticsearch is a key-value store NOSQL database.
10Source is here: https://github.com/elasticsearch/elasticsearch
11We will be using es to store our metadata.
12
13For example, if we wanted to store the following metadata:
14
15metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
20}
21
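The following call sends metadata to the default es server:
    autotest_es.post(index=index, metadata=metadata)
Pass arguments by keyword. post() takes the connection settings (use_http,
host, port, timeout, index, udp_port) as its leading parameters, so positional
arguments would bind to those. The same keywords can be used to choose which
host and port to send to.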
25
For testing: when tests write entries into a single index, it is often
useful to clear that index of all entries before the tests run. Use the
clear_index function for this (see es_utils_functionaltest.py for an
example).
30
31This file also contains methods for sending queries to es. Currently,
32the query (json dict) we send to es is quite complicated (but flexible).
33
34For example, the below query returns job_id, host_id, and job_start
35for all job_ids in [0, 99999] and host_id matching 10.
36
37range_eq_query = {
38    'fields': ['job_id', 'host_id', 'job_start'],
39    'query': {
40        'filtered': {
41            'query': {
42                'match': {
43                    'host_id': 10,
44                }
            },
46            'filter': {
47                'range': {
48                    'job_id': {
49                        'gte': 0,
50                        'lte': 99999,
51                    }
52                }
53            }
54        }
55    }
56}
57
58To send a query once it is created, call execute_query() to send it to the
59intended elasticsearch server. The query() function can be used to construct a
60query with certain parameters and execute it all in one call.
61
62"""
63
64import es_utils
65
66import common
67from autotest_lib.client.common_lib import global_config
68
69
# Fixed (non-configurable) settings.
# Seconds to wait before a connection to the Elasticsearch server times out.
DEFAULT_TIMEOUT = 3
# Default number of attempts bulk_post() makes before reporting failure.
DEFAULT_BULK_POST_RETRIES = 5

# Elasticsearch endpoint for metadata (metadata use only). Each value is read
# from the global config, with the literal default used when it is unset.
METADATA_ES_SERVER = global_config.global_config.get_config_value(
        'CROS', 'ES_HOST', default='localhost')
ES_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_PORT', type=int, default=9200)
ES_UDP_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_UDP_PORT', type=int, default=9700)

# Transport selection. UDP adds very little overhead (around 3 ms), whereas
# HTTP (TCP) takes ~500 ms for the first connection and 50-100 ms for each
# subsequent one, so HTTP is off unless CROS/ES_USE_HTTP enables it.
ES_USE_HTTP = global_config.global_config.get_config_value(
        'CROS', 'ES_USE_HTTP', type=bool, default=False)

# Index that metadata is written to: CLIENT/metadata_index when it is set to a
# non-empty value, otherwise the autotest instance name (SERVER/hostname).
INDEX_METADATA = (
        global_config.global_config.get_config_value(
                'CLIENT', 'metadata_index', type=str, default=None)
        or global_config.global_config.get_config_value(
                'SERVER', 'hostname', type=str, default='localhost'))
95
def post(use_http=ES_USE_HTTP, host=METADATA_ES_SERVER, port=ES_PORT,
         timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA, udp_port=ES_UDP_PORT,
         *args, **kwargs):
    """Send metadata to Elasticsearch through an es_utils.ESMetadata client.

    The named parameters configure the es_utils.ESMetadata constructor.
    Every remaining argument is forwarded unchanged to ESMetadata.post().
    See es_utils for the meaning of each argument.

    Because the connection parameters come first in the signature, pass
    arguments by keyword. Positional arguments fill use_http, host, port,
    timeout, index and udp_port before anything reaches *args.

    @return: Whatever ESMetadata.post() returns.
    """
    client_settings = {
        'use_http': use_http,
        'host': host,
        'port': port,
        'timeout': timeout,
        'index': index,
        'udp_port': udp_port,
    }
    metadata_client = es_utils.ESMetadata(**client_settings)
    return metadata_client.post(*args, **kwargs)
107
108
def bulk_post(data_list, host=METADATA_ES_SERVER, port=ES_PORT,
              timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA,
              retries=DEFAULT_BULK_POST_RETRIES, *args, **kwargs):
    """Upload a list of metadata entries using the Elasticsearch bulk API.

    host, port, timeout and index are passed to the es_utils.ESMetadata
    constructor, which is always created with use_http=True. data_list and
    any extra arguments are passed to ESMetadata.bulk_post(). Uploading in
    bulk can greatly enhance the performance of uploading data over HTTP.
    For an explanation of each argument, see the functions in es_utils.

    @param data_list: Metadata entries to upload, passed as-is to
                      ESMetadata.bulk_post().
    @param retries: Maximum number of upload attempts. Values below 1 are
                    treated as 1, so at least one attempt is always made.
    @return: True if any attempt succeeded, False if every attempt failed.
    """
    esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
                               timeout=timeout, index=index,
                               udp_port=ES_UDP_PORT)
    # A bulk post may fail because of the amount of data, so retry several
    # times. Always try at least once: with retries <= 0 the loop would never
    # run and the data would be silently dropped.
    for _ in range(max(retries, 1)):
        if esmd.bulk_post(data_list, *args, **kwargs):
            return True
    return False
126
127
def execute_query(host=METADATA_ES_SERVER, port=ES_PORT,
                  timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA,
                  *args, **kwargs):
    """Run a prebuilt query through an es_utils.ESMetadata client.

    host, port, timeout and index configure an ESMetadata client, which is
    created with use_http=True and udp_port=0. All other arguments go
    straight to ESMetadata.execute_query(). See es_utils for details of
    each argument.

    @return: Whatever ESMetadata.execute_query() returns.
    """
    return es_utils.ESMetadata(
            use_http=True, host=host, port=port, timeout=timeout,
            index=index, udp_port=0).execute_query(*args, **kwargs)
139
140
def query(host=METADATA_ES_SERVER, port=ES_PORT, timeout=DEFAULT_TIMEOUT,
          index=INDEX_METADATA, *args, **kwargs):
    """Build and run a query in one call via es_utils.ESMetadata.query().

    The connection parameters (host, port, timeout, index) are used to
    create an ESMetadata client with use_http=True and udp_port=0. Every
    other argument is forwarded to ESMetadata.query(). See es_utils for the
    meaning of each argument.

    @return: Whatever ESMetadata.query() returns.
    """
    searcher = es_utils.ESMetadata(use_http=True,
                                   host=host,
                                   port=port,
                                   timeout=timeout,
                                   index=index,
                                   udp_port=0)
    result = searcher.query(*args, **kwargs)
    return result
151