1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include "logging.h"
19 #include "plugin_service.h"
20 #include "plugin_session.h"
21 #include "plugin_session_manager.h"
22 #include "profiler_capability_manager.h"
23 #include "profiler_data_repeater.h"
24 #include "result_demuxer.h"
25 #include "schedule_task_manager.h"
26 #include "trace_file_writer.h"
27
28 using namespace ::grpc;
29
// Validates the three pointers handed to a unary RPC handler. Each check is delegated to
// CHECK_POINTER_NOTNULL, so the enclosing function returns early on the first null pointer.
#define CHECK_REQUEST_RESPONSE(context, request, response)            \
    do {                                                              \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");       \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");       \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!");     \
    } while (0)
36
// Returns {StatusCode::INTERNAL, errorMessage} from the enclosing function when `ptr` compares
// equal to nullptr, after logging the stringified pointer expression.
// `ptr` is parenthesized so that expression arguments (e.g. `cond ? a : b`) are compared as a whole.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                                  \
    do {                                                                          \
        if ((ptr) == nullptr) {                                                   \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr);     \
            return {StatusCode::INTERNAL, errorMessage};                          \
        }                                                                         \
    } while (0)
44
// Returns {StatusCode::INTERNAL, errorMessage} from the enclosing function when `expr`
// evaluates to false; the failure is logged together with the calling function's name.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                                    \
    do {                                                                             \
        if (!(expr)) {                                                               \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, (errorMessage));       \
            return {StatusCode::INTERNAL, (errorMessage)};                           \
        }                                                                            \
    } while (0)
52
namespace {
// Inclusive bounds, in milliseconds, accepted for a session's keep_alive_time.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000;
// Extra time added on top of sample_duration when an offline session has no keep_alive_time set.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
} // namespace
58
ProfilerService(const PluginServicePtr & pluginService)59 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
60 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
61 {
62 pluginService_->SetPluginSessionManager(pluginSessionManager_);
63 }
64
~ProfilerService()65 ProfilerService::~ProfilerService() {}
66
~SessionContext()67 ProfilerService::SessionContext::~SessionContext()
68 {
69 HILOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
70 if (offlineTask.size() > 0) {
71 ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
72 }
73 StopSessionExpireTask();
74 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
75 }
76
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)77 Status ProfilerService::GetCapabilities(ServerContext* context,
78 const ::GetCapabilitiesRequest* request,
79 ::GetCapabilitiesResponse* response)
80 {
81 CHECK_REQUEST_RESPONSE(context, request, response);
82 HILOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
83
84 HILOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
85 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
86
87 response->set_status(StatusCode::OK);
88 for (size_t i = 0; i < capabilities.size(); i++) {
89 *response->add_capabilities() = capabilities[i];
90 }
91 HILOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
92 return Status::OK;
93 }
94
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)95 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
96 {
97 std::unordered_map<std::string, size_t> nameIndex;
98 for (size_t i = 0; i < pluginConfigs.size(); i++) {
99 nameIndex[pluginConfigs[i].name()] = i;
100 }
101
102 size_t updateCount = 0;
103 for (auto& cfg : newPluginConfigs) {
104 auto it = nameIndex.find(cfg.name());
105 if (it != nameIndex.end()) {
106 int index = it->second;
107 pluginConfigs[index] = cfg;
108 updateCount++;
109 }
110 }
111 return updateCount;
112 }
113
CreatePluginSessions()114 bool ProfilerService::SessionContext::CreatePluginSessions()
115 {
116 if (bufferConfigs.size() > 0) { // with buffer configs
117 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
118 false, "create plugin sessions with buffer configs failed!");
119 } else { // without buffer configs
120 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
121 "create plugin sessions without buffer configs failed!");
122 }
123 return true;
124 }
125
StartPluginSessions()126 bool ProfilerService::SessionContext::StartPluginSessions()
127 {
128 std::unique_lock<std::mutex> lock(sessionMutex);
129
130 // if dataRepeater exists, reset it to usable state.
131 if (dataRepeater) {
132 dataRepeater->Reset();
133 }
134
135 // start demuxer take result thread
136 if (resultDemuxer != nullptr && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
137 resultDemuxer->StartTakeResults(); // start write file thread
138 uint32_t sampleDuration = sessionConfig.sample_duration();
139 if (sampleDuration > 0) {
140 offlineTask = "stop-session-" + std::to_string(id);
141 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
142 // start offline trace timeout task
143 ScheduleTaskManager::GetInstance().ScheduleTask(
144 offlineTask,
145 [weakCtx]() {
146 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
147 ctx->StopPluginSessions();
148 }
149 },
150 std::chrono::milliseconds(0), // do not repeat
151 std::chrono::milliseconds(sampleDuration));
152
153 // keep_alive_time not set by client, but the sample_duration setted
154 if (sessionConfig.keep_alive_time() == 0) {
155 // use sample_duration add a little time to set keep_alive_time
156 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
157 StartSessionExpireTask();
158 }
159 }
160 }
161
162 // start each plugin sessions
163 service->pluginSessionManager_->StartPluginSessions(pluginNames);
164 return true;
165 }
166
StopPluginSessions()167 bool ProfilerService::SessionContext::StopPluginSessions()
168 {
169 std::unique_lock<std::mutex> lock(sessionMutex);
170 // stop each plugin sessions
171 service->pluginSessionManager_->StopPluginSessions(pluginNames);
172
173 // stop demuxer take result thread
174 if (resultDemuxer && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
175 if (offlineTask.size() > 0) {
176 ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
177 }
178 resultDemuxer->StopTakeResults(); // stop write file thread
179 }
180
181 // make sure FetchData thread exit
182 if (dataRepeater) {
183 dataRepeater->Close();
184 }
185 return true;
186 }
187
188 namespace {
IsValidKeepAliveTime(uint32_t timeout)189 bool IsValidKeepAliveTime(uint32_t timeout)
190 {
191 if (timeout < MIN_SESSION_TIMEOUT_MS) {
192 return false;
193 }
194 if (timeout > MAX_SESSION_TIMEOUT_MS) {
195 return false;
196 }
197 return true;
198 }
199 } // namespace
200
SetKeepAliveTime(uint32_t timeout)201 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
202 {
203 if (timeout > 0) {
204 timeoutTask = "timeout-session-" + std::to_string(id);
205 sessionConfig.set_keep_alive_time(timeout);
206 }
207 }
208
StartSessionExpireTask()209 void ProfilerService::SessionContext::StartSessionExpireTask()
210 {
211 if (timeoutTask.size() > 0) {
212 ScheduleTaskManager::GetInstance().ScheduleTask(timeoutTask,
213 std::bind(&ProfilerService::RemoveSessionContext, service, id),
214 std::chrono::milliseconds(0), // do not repeat
215 std::chrono::milliseconds(sessionConfig.keep_alive_time()));
216 }
217 }
218
StopSessionExpireTask()219 void ProfilerService::SessionContext::StopSessionExpireTask()
220 {
221 if (timeoutTask.size() > 0) {
222 ScheduleTaskManager::GetInstance().UnscheduleTask(timeoutTask);
223 }
224 }
225
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)226 Status ProfilerService::CreateSession(ServerContext* context,
227 const ::CreateSessionRequest* request,
228 ::CreateSessionResponse* response)
229 {
230 CHECK_REQUEST_RESPONSE(context, request, response);
231 HILOG_INFO(LOG_CORE, "CreateSession from '%s'", context->peer().c_str());
232 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
233
234 // check plugin configs
235 HILOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
236 const int nConfigs = request->plugin_configs_size();
237 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
238
239 // check buffer configs
240 ProfilerSessionConfig sessionConfig = request->session_config();
241 const int nBuffers = sessionConfig.buffers_size();
242 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
243
244 // copy buffer configs
245 std::vector<BufferConfig> bufferConfigs;
246 if (nBuffers == 1) {
247 // if only one buffer config provided, all plugin use the same buffer config
248 bufferConfigs.resize(nConfigs, sessionConfig.buffers(0));
249 } else if (nBuffers > 0) {
250 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
251 bufferConfigs.assign(sessionConfig.buffers().begin(), sessionConfig.buffers().end());
252 }
253 HILOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
254
255 // copy plugin configs from request
256 std::vector<ProfilerPluginConfig> pluginConfigs;
257 pluginConfigs.reserve(nConfigs);
258 for (int i = 0; i < nConfigs; i++) {
259 pluginConfigs.push_back(request->plugin_configs(i));
260 }
261
262 std::vector<std::string> pluginNames;
263 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
264 [](ProfilerPluginConfig& config) { return config.name(); });
265 std::sort(pluginNames.begin(), pluginNames.end());
266
267 // create ProfilerDataRepeater
268 auto dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
269 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
270
271 // create ResultDemuxer
272 auto resultDemuxer = std::make_shared<ResultDemuxer>(dataRepeater);
273 CHECK_POINTER_NOTNULL(resultDemuxer, "alloc ResultDemuxer failed!");
274
275 // create TraceFileWriter for offline mode
276 TraceFileWriterPtr traceWriter;
277 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
278 auto resultFile = sessionConfig.result_file();
279 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
280 traceWriter = std::make_shared<TraceFileWriter>(resultFile);
281 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
282 resultDemuxer->SetTraceWriter(traceWriter);
283 }
284
285 // create session context
286 auto ctx = std::make_shared<SessionContext>();
287 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
288
289 // fill fields of SessionContext
290 ctx->service = this;
291 ctx->dataRepeater = dataRepeater;
292 ctx->resultDemuxer = resultDemuxer;
293 ctx->traceFileWriter = traceWriter;
294 ctx->sessionConfig = sessionConfig;
295 ctx->pluginNames = std::move(pluginNames);
296 ctx->pluginConfigs = std::move(pluginConfigs);
297 ctx->bufferConfigs = std::move(bufferConfigs);
298
299 // create plugin sessions
300 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
301 // alloc new session id
302 uint32_t sessionId = ++sessionIdCounter_;
303 ctx->id = sessionId;
304 ctx->name = "session-" + std::to_string(sessionId);
305
306 // add {sessionId, ctx} to map
307 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
308
309 // create session expire schedule task
310 auto keepAliveTime = sessionConfig.keep_alive_time();
311 if (keepAliveTime) {
312 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
313 // create schedule task for session timeout feature
314 ctx->SetKeepAliveTime(keepAliveTime);
315 ctx->StartSessionExpireTask();
316 }
317
318 // prepare response data fields
319 response->set_status(0);
320 response->set_session_id(sessionId);
321
322 HILOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
323 return Status::OK;
324 }
325
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)326 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
327 {
328 std::unique_lock<std::mutex> lock(sessionContextMutex_);
329 if (sessionContext_.count(sessionId) > 0) {
330 HILOG_WARN(LOG_CORE, "sessionId already exists!");
331 return false;
332 }
333 sessionContext_[sessionId] = sessionCtx;
334 return true;
335 }
336
GetSessionContext(uint32_t sessionId) const337 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
338 {
339 std::unique_lock<std::mutex> lock(sessionContextMutex_);
340 auto it = sessionContext_.find(sessionId);
341 if (it != sessionContext_.end()) {
342 auto ptr = it->second;
343 HILOG_INFO(LOG_CORE, "GetCtx %p use_count: %ld", ptr.get(), ptr.use_count());
344 return ptr;
345 }
346 return nullptr;
347 }
348
RemoveSessionContext(uint32_t sessionId)349 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
350 {
351 std::unique_lock<std::mutex> lock(sessionContextMutex_);
352 auto it = sessionContext_.find(sessionId);
353 if (it != sessionContext_.end()) {
354 auto ptr = it->second;
355 HILOG_INFO(LOG_CORE, "DelCtx %p use_count: %ld", ptr.get(), ptr.use_count());
356 sessionContext_.erase(it);
357 return true;
358 }
359 return false;
360 }
361
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)362 Status ProfilerService::StartSession(ServerContext* context,
363 const ::StartSessionRequest* request,
364 ::StartSessionResponse* response)
365 {
366 CHECK_REQUEST_RESPONSE(context, request, response);
367 HILOG_INFO(LOG_CORE, "StartSession from '%s'", context->peer().c_str());
368
369 uint32_t sessionId = request->session_id();
370 HILOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
371
372 // copy plugin configs from request
373 std::vector<ProfilerPluginConfig> newPluginConfigs;
374 newPluginConfigs.reserve(request->update_configs_size());
375 for (int i = 0; i < request->update_configs_size(); i++) {
376 HILOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
377 newPluginConfigs.push_back(request->update_configs(i));
378 }
379
380 // get plugin names in request
381 std::vector<std::string> requestNames;
382 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
383 [](auto& config) { return config.name(); });
384 std::sort(requestNames.begin(), requestNames.end());
385
386 auto ctx = GetSessionContext(sessionId);
387 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
388
389 // get intersection set of requestNames and pluginNames
390 std::vector<std::string> updateNames;
391 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
392 std::back_inserter(updateNames));
393
394 if (updateNames.size() > 0) {
395 // remove old plugin sessions
396 pluginSessionManager_->RemovePluginSessions(updateNames);
397
398 // update plugin configs
399 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
400
401 // re-create plugin sessions
402 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
403 HILOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
404 }
405
406 // start plugin sessions with configs
407 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
408 HILOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
409 return Status::OK;
410 }
411
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)412 Status ProfilerService::FetchData(ServerContext* context,
413 const ::FetchDataRequest* request,
414 ServerWriter<::FetchDataResponse>* writer)
415 {
416 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
417 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
418 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
419
420 HILOG_INFO(LOG_CORE, "FetchData from '%s'", context->peer().c_str());
421 CHECK_POINTER_NOTNULL(request, "request invalid!");
422 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
423
424 uint32_t sessionId = request->session_id();
425 HILOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
426
427 auto ctx = GetSessionContext(sessionId);
428 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
429
430 // check each plugin session states
431 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
432 "session status invalid!");
433
434 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
435 auto dataRepeater = ctx->dataRepeater;
436 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
437
438 while (1) {
439 ctx = GetSessionContext(sessionId);
440 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
441
442 FetchDataResponse response;
443 response.set_status(StatusCode::OK);
444 response.set_response_id(++responseIdCounter_);
445
446 std::vector<ProfilerPluginDataPtr> pluginDataVec;
447 int count = dataRepeater->TakePluginData(pluginDataVec);
448 if (count > 0) {
449 response.set_has_more(true);
450 for (int i = 0; i < count; i++) {
451 auto data = response.add_plugin_data();
452 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
453 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
454 *data = *pluginDataVec[i];
455 }
456 } else {
457 response.set_has_more(false);
458 HILOG_INFO(LOG_CORE, "no more data need to fill to response!");
459 }
460
461 bool sendSuccess = writer->Write(response);
462 if (count <= 0 || !sendSuccess) {
463 HILOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
464 break;
465 }
466 }
467 }
468
469 HILOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
470 return Status::OK;
471 }
472
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)473 Status ProfilerService::StopSession(ServerContext* context,
474 const ::StopSessionRequest* request,
475 ::StopSessionResponse* response)
476 {
477 CHECK_REQUEST_RESPONSE(context, request, response);
478 HILOG_INFO(LOG_CORE, "StopSession from '%s'", context->peer().c_str());
479
480 uint32_t sessionId = request->session_id();
481 HILOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
482
483 auto ctx = GetSessionContext(sessionId);
484 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
485
486 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
487 HILOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
488 return Status::OK;
489 }
490
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)491 Status ProfilerService::DestroySession(ServerContext* context,
492 const ::DestroySessionRequest* request,
493 ::DestroySessionResponse* response)
494 {
495 CHECK_REQUEST_RESPONSE(context, request, response);
496 HILOG_INFO(LOG_CORE, "DestroySession from '%s'", context->peer().c_str());
497
498 uint32_t sessionId = request->session_id();
499 HILOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
500
501 auto ctx = GetSessionContext(sessionId);
502 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
503
504 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
505 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
506 "remove plugin session FAILED!");
507 HILOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
508 return Status::OK;
509 }
510
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)511 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
512 const ::KeepSessionRequest* request,
513 ::KeepSessionResponse* response)
514 {
515 CHECK_REQUEST_RESPONSE(context, request, response);
516 HILOG_INFO(LOG_CORE, "KeepSession from '%s'", context->peer().c_str());
517
518 uint32_t sessionId = request->session_id();
519 HILOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
520
521 auto ctx = GetSessionContext(sessionId);
522 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
523
524 // update keep alive time if keep_alive_time parameter provided
525 auto keepAliveTime = request->keep_alive_time();
526 if (keepAliveTime) {
527 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
528 ctx->SetKeepAliveTime(keepAliveTime);
529 }
530
531 // reschedule session timeout task
532 if (ctx->timeoutTask.size() > 0) {
533 ctx->StopSessionExpireTask();
534 ctx->StartSessionExpireTask();
535 }
536 HILOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
537 return Status::OK;
538 }
539
540 struct LoggingInterceptor : public grpc::experimental::Interceptor {
541 public:
LoggingInterceptorLoggingInterceptor542 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
543
InterceptLoggingInterceptor544 void Intercept(experimental::InterceptorBatchMethods* methods) override
545 {
546 const char* method = info_->method();
547 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
548 HILOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
549 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
550 HILOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
551 }
552 methods->Proceed();
553 }
554
555 private:
556 grpc::experimental::ServerRpcInfo* info_ = nullptr;
557 };
558
559 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
560 protected:
CreateServerInterceptorInterceptorFactory561 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
562 {
563 return new LoggingInterceptor(info);
564 }
565 };
566
StartService(const std::string & listenUri)567 bool ProfilerService::StartService(const std::string& listenUri)
568 {
569 if (listenUri == "") {
570 HILOG_WARN(LOG_CORE, "listenUri empty!");
571 return false;
572 }
573
574 ServerBuilder builder;
575 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
576 builder.RegisterService(this);
577
578 auto server = builder.BuildAndStart();
579 CHECK_NOTNULL(server, false, "start service on %s failed!", listenUri.c_str());
580 HILOG_INFO(LOG_CORE, "Server listening on %s", listenUri.c_str());
581
582 server_ = std::move(server);
583 return true;
584 }
585
WaitServiceDone()586 void ProfilerService::WaitServiceDone()
587 {
588 if (server_) {
589 HILOG_INFO(LOG_CORE, "waiting Server...");
590 server_->Wait();
591 HILOG_INFO(LOG_CORE, "Server done!");
592 }
593 }
594
StopService()595 void ProfilerService::StopService()
596 {
597 if (server_) {
598 server_->Shutdown();
599 HILOG_INFO(LOG_CORE, "Server stop done!");
600 }
601 }
602