• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/server/load_reporter/load_data_store.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/port_platform.h>
23 #include <gtest/gtest.h>
24 
25 #include <set>
26 #include <vector>
27 
28 #include "src/cpp/server/load_reporter/constants.h"
29 #include "test/core/test_util/port.h"
30 #include "test/core/test_util/test_config.h"
31 
32 namespace grpc {
33 namespace testing {
34 namespace {
35 
36 using ::grpc::load_reporter::CallMetricValue;
37 using ::grpc::load_reporter::kInvalidLbId;
38 using ::grpc::load_reporter::LoadDataStore;
39 using ::grpc::load_reporter::LoadRecordKey;
40 using ::grpc::load_reporter::LoadRecordValue;
41 using ::grpc::load_reporter::PerBalancerStore;
42 
43 class LoadDataStoreTest : public ::testing::Test {
44  public:
LoadDataStoreTest()45   LoadDataStoreTest()
46       : kKey1(kLbId1, kLbTag1, kUser1, kClientIp1),
47         kKey2(kLbId2, kLbTag2, kUser2, kClientIp2) {}
48 
49   // Check whether per_balancer_stores contains a store which was originally
50   // created for <hostname, lb_id, and load_key>.
PerBalancerStoresContains(const LoadDataStore & load_data_store,const std::set<PerBalancerStore * > * per_balancer_stores,const std::string & hostname,const std::string & lb_id,const std::string & load_key)51   bool PerBalancerStoresContains(
52       const LoadDataStore& load_data_store,
53       const std::set<PerBalancerStore*>* per_balancer_stores,
54       const std::string& hostname, const std::string& lb_id,
55       const std::string& load_key) {
56     auto original_per_balancer_store =
57         load_data_store.FindPerBalancerStore(hostname, lb_id);
58     EXPECT_NE(original_per_balancer_store, nullptr);
59     EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id);
60     EXPECT_EQ(original_per_balancer_store->load_key(), load_key);
61     for (auto per_balancer_store : *per_balancer_stores) {
62       if (per_balancer_store == original_per_balancer_store) {
63         return true;
64       }
65     }
66     return false;
67   }
68 
FormatLbId(size_t index)69   std::string FormatLbId(size_t index) {
70     return "kLbId" + std::to_string(index);
71   }
72 
73   const std::string kHostname1 = "kHostname1";
74   const std::string kHostname2 = "kHostname2";
75   const std::string kLbId1 = "kLbId1";
76   const std::string kLbId2 = "kLbId2";
77   const std::string kLbId3 = "kLbId3";
78   const std::string kLbId4 = "kLbId4";
79   const std::string kLoadKey1 = "kLoadKey1";
80   const std::string kLoadKey2 = "kLoadKey2";
81   const std::string kLbTag1 = "kLbTag1";
82   const std::string kLbTag2 = "kLbTag2";
83   const std::string kUser1 = "kUser1";
84   const std::string kUser2 = "kUser2";
85   const std::string kClientIp1 = "00";
86   const std::string kClientIp2 = "02";
87   const std::string kMetric1 = "kMetric1";
88   const std::string kMetric2 = "kMetric2";
89   const LoadRecordKey kKey1;
90   const LoadRecordKey kKey2;
91 };
92 
93 using PerBalancerStoreTest = LoadDataStoreTest;
94 
TEST_F(LoadDataStoreTest,AssignToSelf)95 TEST_F(LoadDataStoreTest, AssignToSelf) {
96   LoadDataStore load_data_store;
97   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
98   auto assigned_stores = load_data_store.GetAssignedStores(kHostname1, kLbId1);
99   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_stores,
100                                         kHostname1, kLbId1, kLoadKey1));
101 }
102 
TEST_F(LoadDataStoreTest,ReassignOrphanStores)103 TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
104   LoadDataStore load_data_store;
105   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
106   load_data_store.ReportStreamCreated(kHostname1, kLbId2, kLoadKey1);
107   load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
108   load_data_store.ReportStreamCreated(kHostname2, kLbId4, kLoadKey1);
109   // 1. Close the second stream.
110   load_data_store.ReportStreamClosed(kHostname1, kLbId2);
111   auto assigned_to_lb_id_1 =
112       load_data_store.GetAssignedStores(kHostname1, kLbId1);
113   // The orphaned store is re-assigned to kLbId1 with the same load key.
114   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
115                                         kHostname1, kLbId1, kLoadKey1));
116   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
117                                         kHostname1, kLbId2, kLoadKey1));
118   // 2. Close the first stream.
119   load_data_store.ReportStreamClosed(kHostname1, kLbId1);
120   auto assigned_to_lb_id_3 =
121       load_data_store.GetAssignedStores(kHostname1, kLbId3);
122   // The orphaned stores are re-assigned to kLbId3 with the same host,
123   // because there isn't any LB with the same load key.
124   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
125                                         kHostname1, kLbId1, kLoadKey1));
126   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
127                                         kHostname1, kLbId2, kLoadKey1));
128   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
129                                         kHostname1, kLbId3, kLoadKey2));
130   // 3. Close the third stream.
131   load_data_store.ReportStreamClosed(kHostname1, kLbId3);
132   auto assigned_to_lb_id_4 =
133       load_data_store.GetAssignedStores(kHostname2, kLbId4);
134   // There is no active LB for the first host now. kLbId4 is active but
135   // it's for the second host, so it will NOT adopt the orphaned stores.
136   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
137                                          kHostname1, kLbId1, kLoadKey1));
138   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
139                                          kHostname1, kLbId2, kLoadKey1));
140   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
141                                          kHostname1, kLbId3, kLoadKey2));
142   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
143                                         kHostname2, kLbId4, kLoadKey1));
144 }
145 
TEST_F(LoadDataStoreTest,OrphanAssignmentIsSticky)146 TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
147   LoadDataStore load_data_store;
148   std::set<std::string> active_lb_ids;
149   size_t num_lb_ids = 1000;
150   for (size_t i = 0; i < num_lb_ids; ++i) {
151     load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
152     active_lb_ids.insert(FormatLbId(i));
153   }
154   std::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
155   load_data_store.ReportStreamClosed(kHostname1, orphaned_lb_id);
156   active_lb_ids.erase(orphaned_lb_id);
157   // Find which LB is assigned the orphaned store.
158   std::string assigned_lb_id;
159   for (const auto& lb_id : active_lb_ids) {
160     if (PerBalancerStoresContains(
161             load_data_store,
162             load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
163             orphaned_lb_id, kLoadKey1)) {
164       assigned_lb_id = lb_id;
165       break;
166     }
167   }
168   EXPECT_STRNE(assigned_lb_id.c_str(), "");
169   // Close 10 more stream, skipping the assigned_lb_id. The assignment of
170   // orphaned_lb_id shouldn't change.
171   for (size_t _ = 0; _ < 10; ++_) {
172     std::string lb_id_to_close;
173     for (const auto& lb_id : active_lb_ids) {
174       if (lb_id != assigned_lb_id) {
175         lb_id_to_close = lb_id;
176         break;
177       }
178     }
179     EXPECT_STRNE(lb_id_to_close.c_str(), "");
180     load_data_store.ReportStreamClosed(kHostname1, lb_id_to_close);
181     active_lb_ids.erase(lb_id_to_close);
182     EXPECT_TRUE(PerBalancerStoresContains(
183         load_data_store,
184         load_data_store.GetAssignedStores(kHostname1, assigned_lb_id),
185         kHostname1, orphaned_lb_id, kLoadKey1));
186   }
187   // Close the assigned_lb_id, orphaned_lb_id will be re-assigned again.
188   load_data_store.ReportStreamClosed(kHostname1, assigned_lb_id);
189   active_lb_ids.erase(assigned_lb_id);
190   size_t orphaned_lb_id_occurrences = 0;
191   for (const auto& lb_id : active_lb_ids) {
192     if (PerBalancerStoresContains(
193             load_data_store,
194             load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
195             orphaned_lb_id, kLoadKey1)) {
196       orphaned_lb_id_occurrences++;
197     }
198   }
199   EXPECT_EQ(orphaned_lb_id_occurrences, 1U);
200 }
201 
TEST_F(LoadDataStoreTest,HostTemporarilyLoseAllStreams)202 TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
203   LoadDataStore load_data_store;
204   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
205   load_data_store.ReportStreamCreated(kHostname2, kLbId2, kLoadKey1);
206   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
207   auto store_invalid_lb_id_1 =
208       load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
209   EXPECT_FALSE(store_lb_id_1->IsSuspended());
210   EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
211   // Disconnect all the streams of the first host.
212   load_data_store.ReportStreamClosed(kHostname1, kLbId1);
213   // All the streams of that host are suspended.
214   EXPECT_TRUE(store_lb_id_1->IsSuspended());
215   EXPECT_TRUE(store_invalid_lb_id_1->IsSuspended());
216   // Detailed load data won't be kept when the PerBalancerStore is suspended.
217   store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
218   store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
219   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
220   EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
221   // The stores for different hosts won't mix, even if the load key is the same.
222   auto assigned_to_lb_id_2 =
223       load_data_store.GetAssignedStores(kHostname2, kLbId2);
224   EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
225   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
226                                         kHostname2, kLbId2, kLoadKey1));
227   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
228                                         kHostname2, kInvalidLbId, ""));
229   // A new stream is created for the first host.
230   load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
231   // The stores for the first host are resumed.
232   EXPECT_FALSE(store_lb_id_1->IsSuspended());
233   EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
234   store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
235   store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
236   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
237   EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
238   // The resumed stores are assigned to the new LB.
239   auto assigned_to_lb_id_3 =
240       load_data_store.GetAssignedStores(kHostname1, kLbId3);
241   EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
242   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
243                                         kHostname1, kLbId1, kLoadKey1));
244   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
245                                         kHostname1, kInvalidLbId, ""));
246   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
247                                         kHostname1, kLbId3, kLoadKey2));
248 }
249 
TEST_F(LoadDataStoreTest,OneStorePerLbId)250 TEST_F(LoadDataStoreTest, OneStorePerLbId) {
251   LoadDataStore load_data_store;
252   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId1), nullptr);
253   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId),
254             nullptr);
255   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
256   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
257   // Create The first stream.
258   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
259   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
260   auto store_invalid_lb_id_1 =
261       load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
262   // Two stores will be created: one is for the stream; the other one is for
263   // kInvalidLbId.
264   EXPECT_NE(store_lb_id_1, nullptr);
265   EXPECT_NE(store_invalid_lb_id_1, nullptr);
266   EXPECT_NE(store_lb_id_1, store_invalid_lb_id_1);
267   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
268   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
269   // Create the second stream.
270   load_data_store.ReportStreamCreated(kHostname2, kLbId3, kLoadKey1);
271   auto store_lb_id_3 = load_data_store.FindPerBalancerStore(kHostname2, kLbId3);
272   auto store_invalid_lb_id_2 =
273       load_data_store.FindPerBalancerStore(kHostname2, kInvalidLbId);
274   EXPECT_NE(store_lb_id_3, nullptr);
275   EXPECT_NE(store_invalid_lb_id_2, nullptr);
276   EXPECT_NE(store_lb_id_3, store_invalid_lb_id_2);
277   // The PerBalancerStores created for different hosts are independent.
278   EXPECT_NE(store_lb_id_3, store_invalid_lb_id_1);
279   EXPECT_NE(store_invalid_lb_id_2, store_invalid_lb_id_1);
280   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
281 }
282 
TEST_F(LoadDataStoreTest,ExactlyOnceAssignment)283 TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
284   LoadDataStore load_data_store;
285   size_t num_create = 100;
286   size_t num_close = 50;
287   for (size_t i = 0; i < num_create; ++i) {
288     load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
289   }
290   for (size_t i = 0; i < num_close; ++i) {
291     load_data_store.ReportStreamClosed(kHostname1, FormatLbId(i));
292   }
293   std::set<std::string> reported_lb_ids;
294   for (size_t i = num_close; i < num_create; ++i) {
295     for (auto assigned_store :
296          *load_data_store.GetAssignedStores(kHostname1, FormatLbId(i))) {
297       EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
298     }
299   }
300   // Add one for kInvalidLbId.
301   EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
302   EXPECT_NE(reported_lb_ids.find(kInvalidLbId), reported_lb_ids.end());
303 }
304 
TEST_F(LoadDataStoreTest,UnknownBalancerIdTracking)305 TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
306   LoadDataStore load_data_store;
307   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
308   // Merge data for a known LB ID.
309   LoadRecordValue v1(192);
310   load_data_store.MergeRow(kHostname1, kKey1, v1);
311   // Merge data for unknown LB ID.
312   LoadRecordValue v2(23);
313   EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
314   load_data_store.MergeRow(
315       kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v2);
316   EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
317   LoadRecordValue v3(952);
318   load_data_store.MergeRow(
319       kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v3);
320   EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
321   // The data kept for a known LB ID is correct.
322   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
323   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
324   EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.start_count(),
325             v1.start_count());
326   EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
327   // No PerBalancerStore created for Unknown LB ID.
328   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId2), nullptr);
329   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
330   // End all the started RPCs for kLbId1.
331   LoadRecordValue v4(0, v1.start_count());
332   load_data_store.MergeRow(kHostname1, kKey1, v4);
333   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
334   EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.start_count(),
335             v1.start_count());
336   EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.ok_count(),
337             v4.ok_count());
338   EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
339   EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId1));
340   // End all the started RPCs for kLbId2.
341   LoadRecordValue v5(0, v2.start_count());
342   load_data_store.MergeRow(
343       kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v5);
344   EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
345   // End some of the started RPCs for kLbId3.
346   LoadRecordValue v6(0, v3.start_count() / 2);
347   load_data_store.MergeRow(
348       kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v6);
349   EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
350 }
351 
TEST_F(PerBalancerStoreTest,Suspend)352 TEST_F(PerBalancerStoreTest, Suspend) {
353   PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
354   EXPECT_FALSE(per_balancer_store.IsSuspended());
355   // Suspend the store.
356   per_balancer_store.Suspend();
357   EXPECT_TRUE(per_balancer_store.IsSuspended());
358   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
359   // Data merged when the store is suspended won't be kept.
360   LoadRecordValue v1(139, 19);
361   per_balancer_store.MergeRow(kKey1, v1);
362   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
363   // Resume the store.
364   per_balancer_store.Resume();
365   EXPECT_FALSE(per_balancer_store.IsSuspended());
366   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
367   // Data merged after the store is resumed will be kept.
368   LoadRecordValue v2(23, 0, 51);
369   per_balancer_store.MergeRow(kKey1, v2);
370   EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
371   // Suspend the store.
372   per_balancer_store.Suspend();
373   EXPECT_TRUE(per_balancer_store.IsSuspended());
374   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
375   // Data merged when the store is suspended won't be kept.
376   LoadRecordValue v3(62, 11);
377   per_balancer_store.MergeRow(kKey1, v3);
378   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
379   // Resume the store.
380   per_balancer_store.Resume();
381   EXPECT_FALSE(per_balancer_store.IsSuspended());
382   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
383   // Data merged after the store is resumed will be kept.
384   LoadRecordValue v4(225, 98);
385   per_balancer_store.MergeRow(kKey1, v4);
386   EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
387   // In-progress count is always kept.
388   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
389             v1.start_count() - v1.ok_count() + v2.start_count() -
390                 v2.error_count() + v3.start_count() - v3.ok_count() +
391                 v4.start_count() - v4.ok_count());
392 }
393 
TEST_F(PerBalancerStoreTest,DataAggregation)394 TEST_F(PerBalancerStoreTest, DataAggregation) {
395   PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
396   // Construct some Values.
397   LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
398   v1.InsertCallMetric(kMetric1, CallMetricValue(3, 2773.2));
399   LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
400   v2.InsertCallMetric(kMetric1, CallMetricValue(7, 25.234));
401   v2.InsertCallMetric(kMetric2, CallMetricValue(2, 387.08));
402   // v3 doesn't change the number of in-progress RPCs.
403   LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
404   v3.InsertCallMetric(kMetric1, CallMetricValue(61, 3465.0));
405   v3.InsertCallMetric(kMetric2, CallMetricValue(13, 672.0));
406   // The initial state of the store.
407   uint64_t num_calls_in_progress = 0;
408   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
409   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
410             num_calls_in_progress);
411   // Merge v1 and get report of the number of in-progress calls.
412   per_balancer_store.MergeRow(kKey1, v1);
413   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
414   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
415             num_calls_in_progress +=
416             (v1.start_count() - v1.ok_count() - v1.error_count()));
417   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
418   // Merge v2 and get report of the number of in-progress calls.
419   per_balancer_store.MergeRow(kKey2, v2);
420   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
421   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
422             num_calls_in_progress +=
423             (v2.start_count() - v2.ok_count() - v2.error_count()));
424   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
425   // Merge v3 and get report of the number of in-progress calls.
426   per_balancer_store.MergeRow(kKey1, v3);
427   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
428   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
429             num_calls_in_progress);
430   // LoadRecordValue for kKey1 is aggregated correctly.
431   LoadRecordValue value_for_key1 =
432       per_balancer_store.load_record_map().find(kKey1)->second;
433   EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
434   EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
435   EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
436   EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
437   EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
438   EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
439   EXPECT_EQ(value_for_key1.call_metrics().size(), 2U);
440   EXPECT_EQ(value_for_key1.call_metrics().find(kMetric1)->second.num_calls(),
441             v1.call_metrics().find(kMetric1)->second.num_calls() +
442                 v3.call_metrics().find(kMetric1)->second.num_calls());
443   EXPECT_EQ(
444       value_for_key1.call_metrics().find(kMetric1)->second.total_metric_value(),
445       v1.call_metrics().find(kMetric1)->second.total_metric_value() +
446           v3.call_metrics().find(kMetric1)->second.total_metric_value());
447   EXPECT_EQ(value_for_key1.call_metrics().find(kMetric2)->second.num_calls(),
448             v3.call_metrics().find(kMetric2)->second.num_calls());
449   EXPECT_EQ(
450       value_for_key1.call_metrics().find(kMetric2)->second.total_metric_value(),
451       v3.call_metrics().find(kMetric2)->second.total_metric_value());
452   // LoadRecordValue for kKey2 is aggregated (trivially) correctly.
453   LoadRecordValue value_for_key2 =
454       per_balancer_store.load_record_map().find(kKey2)->second;
455   EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
456   EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
457   EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
458   EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
459   EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
460   EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
461   EXPECT_EQ(value_for_key2.call_metrics().size(), 2U);
462   EXPECT_EQ(value_for_key2.call_metrics().find(kMetric1)->second.num_calls(),
463             v2.call_metrics().find(kMetric1)->second.num_calls());
464   EXPECT_EQ(
465       value_for_key2.call_metrics().find(kMetric1)->second.total_metric_value(),
466       v2.call_metrics().find(kMetric1)->second.total_metric_value());
467   EXPECT_EQ(value_for_key2.call_metrics().find(kMetric2)->second.num_calls(),
468             v2.call_metrics().find(kMetric2)->second.num_calls());
469   EXPECT_EQ(
470       value_for_key2.call_metrics().find(kMetric2)->second.total_metric_value(),
471       v2.call_metrics().find(kMetric2)->second.total_metric_value());
472 }
473 
474 }  // namespace
475 }  // namespace testing
476 }  // namespace grpc
477 
main(int argc,char ** argv)478 int main(int argc, char** argv) {
479   grpc::testing::TestEnvironment env(&argc, argv);
480   ::testing::InitGoogleTest(&argc, argv);
481   return RUN_ALL_TESTS();
482 }
483