• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include "logging.h"
19 #include "plugin_service.h"
20 #include "plugin_session.h"
21 #include "plugin_session_manager.h"
22 #include "profiler_capability_manager.h"
23 #include "profiler_data_repeater.h"
24 #include "result_demuxer.h"
25 #include "schedule_task_manager.h"
26 #include "trace_file_writer.h"
27 
28 using namespace ::grpc;
29 
// Null-checks the three pointers a unary RPC handler receives. Each check is delegated to
// CHECK_POINTER_NOTNULL, so the enclosing function returns early on the first null pointer.
#define CHECK_REQUEST_RESPONSE(context, request, response) \
    do { \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!"); \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!"); \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (0)
36 
// Logs an error and makes the enclosing function return {StatusCode::INTERNAL, errorMessage}
// when `ptr` compares equal to nullptr. The enclosing function's return type must therefore be
// constructible from that brace list.
// `ptr` is parenthesised in the comparison so that compound arguments (for example
// `cond ? a : b`) are compared as a whole instead of being re-associated by operator precedence.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                              \
    do {                                                                      \
        if ((ptr) == nullptr) {                                               \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, (errorMessage)};                    \
        }                                                                     \
    } while (0)
44 
// Evaluates `expr` once; when it is false, logs `errorMessage` together with the calling
// function's name and makes the enclosing function return {StatusCode::INTERNAL, errorMessage}.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage) \
    do { \
        if (!(expr)) { \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
            return {StatusCode::INTERNAL, (errorMessage)}; \
        } \
    } while (0)
52 
namespace {
// Bounds accepted by IsValidKeepAliveTime() for a session keep-alive value.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000;
// Extra time added on top of sample_duration when the client did not set keep_alive_time.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
} // namespace
58 
ProfilerService(const PluginServicePtr & pluginService)59 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
60     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
61 {
62     pluginService_->SetPluginSessionManager(pluginSessionManager_);
63 }
64 
~ProfilerService()65 ProfilerService::~ProfilerService() {}
66 
~SessionContext()67 ProfilerService::SessionContext::~SessionContext()
68 {
69     HILOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
70     if (offlineTask.size() > 0) {
71         ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
72     }
73     StopSessionExpireTask();
74     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
75 }
76 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)77 Status ProfilerService::GetCapabilities(ServerContext* context,
78                                         const ::GetCapabilitiesRequest* request,
79                                         ::GetCapabilitiesResponse* response)
80 {
81     CHECK_REQUEST_RESPONSE(context, request, response);
82     HILOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
83 
84     HILOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
85     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
86 
87     response->set_status(StatusCode::OK);
88     for (size_t i = 0; i < capabilities.size(); i++) {
89         *response->add_capabilities() = capabilities[i];
90     }
91     HILOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
92     return Status::OK;
93 }
94 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)95 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
96 {
97     std::unordered_map<std::string, size_t> nameIndex;
98     for (size_t i = 0; i < pluginConfigs.size(); i++) {
99         nameIndex[pluginConfigs[i].name()] = i;
100     }
101 
102     size_t updateCount = 0;
103     for (auto& cfg : newPluginConfigs) {
104         auto it = nameIndex.find(cfg.name());
105         if (it != nameIndex.end()) {
106             int index = it->second;
107             pluginConfigs[index] = cfg;
108             updateCount++;
109         }
110     }
111     return updateCount;
112 }
113 
CreatePluginSessions()114 bool ProfilerService::SessionContext::CreatePluginSessions()
115 {
116     if (bufferConfigs.size() > 0) { // with buffer configs
117         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
118                    false, "create plugin sessions with buffer configs failed!");
119     } else { // without buffer configs
120         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
121                    "create plugin sessions without buffer configs failed!");
122     }
123     return true;
124 }
125 
StartPluginSessions()126 bool ProfilerService::SessionContext::StartPluginSessions()
127 {
128     std::unique_lock<std::mutex> lock(sessionMutex);
129 
130     // if dataRepeater exists, reset it to usable state.
131     if (dataRepeater) {
132         dataRepeater->Reset();
133     }
134 
135     // start demuxer take result thread
136     if (resultDemuxer != nullptr && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
137         resultDemuxer->StartTakeResults(); // start write file thread
138         uint32_t sampleDuration = sessionConfig.sample_duration();
139         if (sampleDuration > 0) {
140             offlineTask = "stop-session-" + std::to_string(id);
141             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
142             // start offline trace timeout task
143             ScheduleTaskManager::GetInstance().ScheduleTask(
144                 offlineTask,
145                 [weakCtx]() {
146                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
147                         ctx->StopPluginSessions();
148                     }
149                 },
150                 std::chrono::milliseconds(0), // do not repeat
151                 std::chrono::milliseconds(sampleDuration));
152 
153             // keep_alive_time not set by client, but the sample_duration setted
154             if (sessionConfig.keep_alive_time() == 0) {
155                 // use sample_duration add a little time to set keep_alive_time
156                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
157                 StartSessionExpireTask();
158             }
159         }
160     }
161 
162     // start each plugin sessions
163     service->pluginSessionManager_->StartPluginSessions(pluginNames);
164     return true;
165 }
166 
StopPluginSessions()167 bool ProfilerService::SessionContext::StopPluginSessions()
168 {
169     std::unique_lock<std::mutex> lock(sessionMutex);
170     // stop each plugin sessions
171     service->pluginSessionManager_->StopPluginSessions(pluginNames);
172 
173     // stop demuxer take result thread
174     if (resultDemuxer && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
175         if (offlineTask.size() > 0) {
176             ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
177         }
178         resultDemuxer->StopTakeResults(); // stop write file thread
179     }
180 
181     // make sure FetchData thread exit
182     if (dataRepeater) {
183         dataRepeater->Close();
184     }
185     return true;
186 }
187 
188 namespace {
IsValidKeepAliveTime(uint32_t timeout)189 bool IsValidKeepAliveTime(uint32_t timeout)
190 {
191     if (timeout < MIN_SESSION_TIMEOUT_MS) {
192         return false;
193     }
194     if (timeout > MAX_SESSION_TIMEOUT_MS) {
195         return false;
196     }
197     return true;
198 }
199 }  // namespace
200 
SetKeepAliveTime(uint32_t timeout)201 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
202 {
203     if (timeout > 0) {
204         timeoutTask = "timeout-session-" + std::to_string(id);
205         sessionConfig.set_keep_alive_time(timeout);
206     }
207 }
208 
StartSessionExpireTask()209 void ProfilerService::SessionContext::StartSessionExpireTask()
210 {
211     if (timeoutTask.size() > 0) {
212         ScheduleTaskManager::GetInstance().ScheduleTask(timeoutTask,
213                                                         std::bind(&ProfilerService::RemoveSessionContext, service, id),
214                                                         std::chrono::milliseconds(0), // do not repeat
215                                                         std::chrono::milliseconds(sessionConfig.keep_alive_time()));
216     }
217 }
218 
StopSessionExpireTask()219 void ProfilerService::SessionContext::StopSessionExpireTask()
220 {
221     if (timeoutTask.size() > 0) {
222         ScheduleTaskManager::GetInstance().UnscheduleTask(timeoutTask);
223     }
224 }
225 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)226 Status ProfilerService::CreateSession(ServerContext* context,
227                                       const ::CreateSessionRequest* request,
228                                       ::CreateSessionResponse* response)
229 {
230     CHECK_REQUEST_RESPONSE(context, request, response);
231     HILOG_INFO(LOG_CORE, "CreateSession from '%s'", context->peer().c_str());
232     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
233 
234     // check plugin configs
235     HILOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
236     const int nConfigs = request->plugin_configs_size();
237     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
238 
239     // check buffer configs
240     ProfilerSessionConfig sessionConfig = request->session_config();
241     const int nBuffers = sessionConfig.buffers_size();
242     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
243 
244     // copy buffer configs
245     std::vector<BufferConfig> bufferConfigs;
246     if (nBuffers == 1) {
247         // if only one buffer config provided, all plugin use the same buffer config
248         bufferConfigs.resize(nConfigs, sessionConfig.buffers(0));
249     } else if (nBuffers > 0) {
250         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
251         bufferConfigs.assign(sessionConfig.buffers().begin(), sessionConfig.buffers().end());
252     }
253     HILOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
254 
255     // copy plugin configs from request
256     std::vector<ProfilerPluginConfig> pluginConfigs;
257     pluginConfigs.reserve(nConfigs);
258     for (int i = 0; i < nConfigs; i++) {
259         pluginConfigs.push_back(request->plugin_configs(i));
260     }
261 
262     std::vector<std::string> pluginNames;
263     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
264                    [](ProfilerPluginConfig& config) { return config.name(); });
265     std::sort(pluginNames.begin(), pluginNames.end());
266 
267     // create ProfilerDataRepeater
268     auto dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
269     CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
270 
271     // create ResultDemuxer
272     auto resultDemuxer = std::make_shared<ResultDemuxer>(dataRepeater);
273     CHECK_POINTER_NOTNULL(resultDemuxer, "alloc ResultDemuxer failed!");
274 
275     // create TraceFileWriter for offline mode
276     TraceFileWriterPtr traceWriter;
277     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
278         auto resultFile = sessionConfig.result_file();
279         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
280         traceWriter = std::make_shared<TraceFileWriter>(resultFile);
281         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
282         resultDemuxer->SetTraceWriter(traceWriter);
283     }
284 
285     // create session context
286     auto ctx = std::make_shared<SessionContext>();
287     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
288 
289     // fill fields of SessionContext
290     ctx->service = this;
291     ctx->dataRepeater = dataRepeater;
292     ctx->resultDemuxer = resultDemuxer;
293     ctx->traceFileWriter = traceWriter;
294     ctx->sessionConfig = sessionConfig;
295     ctx->pluginNames = std::move(pluginNames);
296     ctx->pluginConfigs = std::move(pluginConfigs);
297     ctx->bufferConfigs = std::move(bufferConfigs);
298 
299     // create plugin sessions
300     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
301     // alloc new session id
302     uint32_t sessionId = ++sessionIdCounter_;
303     ctx->id = sessionId;
304     ctx->name = "session-" + std::to_string(sessionId);
305 
306     // add {sessionId, ctx} to map
307     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
308 
309     // create session expire schedule task
310     auto keepAliveTime = sessionConfig.keep_alive_time();
311     if (keepAliveTime) {
312         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
313         // create schedule task for session timeout feature
314         ctx->SetKeepAliveTime(keepAliveTime);
315         ctx->StartSessionExpireTask();
316     }
317 
318     // prepare response data fields
319     response->set_status(0);
320     response->set_session_id(sessionId);
321 
322     HILOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
323     return Status::OK;
324 }
325 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)326 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
327 {
328     std::unique_lock<std::mutex> lock(sessionContextMutex_);
329     if (sessionContext_.count(sessionId) > 0) {
330         HILOG_WARN(LOG_CORE, "sessionId already exists!");
331         return false;
332     }
333     sessionContext_[sessionId] = sessionCtx;
334     return true;
335 }
336 
GetSessionContext(uint32_t sessionId) const337 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
338 {
339     std::unique_lock<std::mutex> lock(sessionContextMutex_);
340     auto it = sessionContext_.find(sessionId);
341     if (it != sessionContext_.end()) {
342         auto ptr = it->second;
343         HILOG_INFO(LOG_CORE, "GetCtx %p use_count: %ld", ptr.get(), ptr.use_count());
344         return ptr;
345     }
346     return nullptr;
347 }
348 
RemoveSessionContext(uint32_t sessionId)349 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
350 {
351     std::unique_lock<std::mutex> lock(sessionContextMutex_);
352     auto it = sessionContext_.find(sessionId);
353     if (it != sessionContext_.end()) {
354         auto ptr = it->second;
355         HILOG_INFO(LOG_CORE, "DelCtx %p use_count: %ld", ptr.get(), ptr.use_count());
356         sessionContext_.erase(it);
357         return true;
358     }
359     return false;
360 }
361 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)362 Status ProfilerService::StartSession(ServerContext* context,
363                                      const ::StartSessionRequest* request,
364                                      ::StartSessionResponse* response)
365 {
366     CHECK_REQUEST_RESPONSE(context, request, response);
367     HILOG_INFO(LOG_CORE, "StartSession from '%s'", context->peer().c_str());
368 
369     uint32_t sessionId = request->session_id();
370     HILOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
371 
372     // copy plugin configs from request
373     std::vector<ProfilerPluginConfig> newPluginConfigs;
374     newPluginConfigs.reserve(request->update_configs_size());
375     for (int i = 0; i < request->update_configs_size(); i++) {
376         HILOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
377         newPluginConfigs.push_back(request->update_configs(i));
378     }
379 
380     // get plugin names in request
381     std::vector<std::string> requestNames;
382     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
383                    [](auto& config) { return config.name(); });
384     std::sort(requestNames.begin(), requestNames.end());
385 
386     auto ctx = GetSessionContext(sessionId);
387     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
388 
389     // get intersection set of requestNames and pluginNames
390     std::vector<std::string> updateNames;
391     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
392                           std::back_inserter(updateNames));
393 
394     if (updateNames.size() > 0) {
395         // remove old plugin sessions
396         pluginSessionManager_->RemovePluginSessions(updateNames);
397 
398         // update plugin configs
399         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
400 
401         // re-create plugin sessions
402         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
403         HILOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
404     }
405 
406     // start plugin sessions with configs
407     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
408     HILOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
409     return Status::OK;
410 }
411 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)412 Status ProfilerService::FetchData(ServerContext* context,
413                                   const ::FetchDataRequest* request,
414                                   ServerWriter<::FetchDataResponse>* writer)
415 {
416     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
417     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
418     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
419 
420     HILOG_INFO(LOG_CORE, "FetchData from '%s'", context->peer().c_str());
421     CHECK_POINTER_NOTNULL(request, "request invalid!");
422     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
423 
424     uint32_t sessionId = request->session_id();
425     HILOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
426 
427     auto ctx = GetSessionContext(sessionId);
428     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
429 
430     // check each plugin session states
431     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
432                           "session status invalid!");
433 
434     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
435         auto dataRepeater = ctx->dataRepeater;
436         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
437 
438         while (1) {
439             ctx = GetSessionContext(sessionId);
440             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
441 
442             FetchDataResponse response;
443             response.set_status(StatusCode::OK);
444             response.set_response_id(++responseIdCounter_);
445 
446             std::vector<ProfilerPluginDataPtr> pluginDataVec;
447             int count = dataRepeater->TakePluginData(pluginDataVec);
448             if (count > 0) {
449                 response.set_has_more(true);
450                 for (int i = 0; i < count; i++) {
451                     auto data = response.add_plugin_data();
452                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
453                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
454                     *data = *pluginDataVec[i];
455                 }
456             } else {
457                 response.set_has_more(false);
458                 HILOG_INFO(LOG_CORE, "no more data need to fill to response!");
459             }
460 
461             bool sendSuccess = writer->Write(response);
462             if (count <= 0 || !sendSuccess) {
463                 HILOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
464                 break;
465             }
466         }
467     }
468 
469     HILOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
470     return Status::OK;
471 }
472 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)473 Status ProfilerService::StopSession(ServerContext* context,
474                                     const ::StopSessionRequest* request,
475                                     ::StopSessionResponse* response)
476 {
477     CHECK_REQUEST_RESPONSE(context, request, response);
478     HILOG_INFO(LOG_CORE, "StopSession from '%s'", context->peer().c_str());
479 
480     uint32_t sessionId = request->session_id();
481     HILOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
482 
483     auto ctx = GetSessionContext(sessionId);
484     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
485 
486     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
487     HILOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
488     return Status::OK;
489 }
490 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)491 Status ProfilerService::DestroySession(ServerContext* context,
492                                        const ::DestroySessionRequest* request,
493                                        ::DestroySessionResponse* response)
494 {
495     CHECK_REQUEST_RESPONSE(context, request, response);
496     HILOG_INFO(LOG_CORE, "DestroySession from '%s'", context->peer().c_str());
497 
498     uint32_t sessionId = request->session_id();
499     HILOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
500 
501     auto ctx = GetSessionContext(sessionId);
502     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
503 
504     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
505     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
506                           "remove plugin session FAILED!");
507     HILOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
508     return Status::OK;
509 }
510 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)511 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
512                                             const ::KeepSessionRequest* request,
513                                             ::KeepSessionResponse* response)
514 {
515     CHECK_REQUEST_RESPONSE(context, request, response);
516     HILOG_INFO(LOG_CORE, "KeepSession from '%s'", context->peer().c_str());
517 
518     uint32_t sessionId = request->session_id();
519     HILOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
520 
521     auto ctx = GetSessionContext(sessionId);
522     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
523 
524     // update keep alive time if keep_alive_time parameter provided
525     auto keepAliveTime = request->keep_alive_time();
526     if (keepAliveTime) {
527         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
528         ctx->SetKeepAliveTime(keepAliveTime);
529     }
530 
531     // reschedule session timeout task
532     if (ctx->timeoutTask.size() > 0) {
533         ctx->StopSessionExpireTask();
534         ctx->StartSessionExpireTask();
535     }
536     HILOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
537     return Status::OK;
538 }
539 
540 struct LoggingInterceptor : public grpc::experimental::Interceptor {
541 public:
LoggingInterceptorLoggingInterceptor542     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
543 
InterceptLoggingInterceptor544     void Intercept(experimental::InterceptorBatchMethods* methods) override
545     {
546         const char* method = info_->method();
547         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
548             HILOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
549         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
550             HILOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
551         }
552         methods->Proceed();
553     }
554 
555 private:
556     grpc::experimental::ServerRpcInfo* info_ = nullptr;
557 };
558 
559 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
560 protected:
CreateServerInterceptorInterceptorFactory561     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
562     {
563         return new LoggingInterceptor(info);
564     }
565 };
566 
StartService(const std::string & listenUri)567 bool ProfilerService::StartService(const std::string& listenUri)
568 {
569     if (listenUri == "") {
570         HILOG_WARN(LOG_CORE, "listenUri empty!");
571         return false;
572     }
573 
574     ServerBuilder builder;
575     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
576     builder.RegisterService(this);
577 
578     auto server = builder.BuildAndStart();
579     CHECK_NOTNULL(server, false, "start service on %s failed!", listenUri.c_str());
580     HILOG_INFO(LOG_CORE, "Server listening on %s", listenUri.c_str());
581 
582     server_ = std::move(server);
583     return true;
584 }
585 
WaitServiceDone()586 void ProfilerService::WaitServiceDone()
587 {
588     if (server_) {
589         HILOG_INFO(LOG_CORE, "waiting Server...");
590         server_->Wait();
591         HILOG_INFO(LOG_CORE, "Server done!");
592     }
593 }
594 
StopService()595 void ProfilerService::StopService()
596 {
597     if (server_) {
598         server_->Shutdown();
599         HILOG_INFO(LOG_CORE, "Server stop done!");
600     }
601 }
602