• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "schedule_task_manager.h"
28 #include "trace_file_writer.h"
29 
30 using namespace ::grpc;
31 using PluginContextPtr = std::shared_ptr<PluginContext>;
32 
33 #define CHECK_REQUEST_RESPONSE(context, request, response)         \
34     do {                                                          \
35         CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
36         CHECK_POINTER_NOTNULL(request, "request ptr invalid!");    \
37         CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
38     } while (0)
39 
40 #define CHECK_POINTER_NOTNULL(ptr, errorMessage)                              \
41     do {                                                                      \
42         if ((ptr) == nullptr) {                                                 \
43             HILOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
44             return {StatusCode::INTERNAL, errorMessage};                      \
45         }                                                                     \
46     } while (0)
47 
48 #define CHECK_EXPRESSION_TRUE(expr, errorMessage)                            \
49     do {                                                                     \
50         if (!(expr)) {                                                       \
51             HILOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
52             return {StatusCode::INTERNAL, (errorMessage)};                   \
53         }                                                                    \
54     } while (0)
55 
56 namespace {
57 constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
58 constexpr int MAX_SESSION_TIMEOUT_MS = 1000 * 3600;
59 constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
60 } // namespace
61 
ProfilerService(const PluginServicePtr & pluginService)62 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
63     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
64 {
65     pluginService_->SetPluginSessionManager(pluginSessionManager_);
66 }
67 
~ProfilerService()68 ProfilerService::~ProfilerService() {}
69 
~SessionContext()70 ProfilerService::SessionContext::~SessionContext()
71 {
72     HILOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
73     if (offlineTask.size() > 0) {
74         ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
75     }
76     StopSessionExpireTask();
77     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
78 }
79 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)80 Status ProfilerService::GetCapabilities(ServerContext* context,
81                                         const ::GetCapabilitiesRequest* request,
82                                         ::GetCapabilitiesResponse* response)
83 {
84     CHECK_REQUEST_RESPONSE(context, request, response);
85     HILOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
86 
87     HILOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
88     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
89 
90     response->set_status(StatusCode::OK);
91     for (size_t i = 0; i < capabilities.size(); i++) {
92         *response->add_capabilities() = capabilities[i];
93     }
94     HILOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
95     return Status::OK;
96 }
97 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)98 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
99 {
100     std::unordered_map<std::string, size_t> nameIndex;
101     for (size_t i = 0; i < pluginConfigs.size(); i++) {
102         nameIndex[pluginConfigs[i].name()] = i;
103     }
104 
105     size_t updateCount = 0;
106     for (auto& cfg : newPluginConfigs) {
107         auto it = nameIndex.find(cfg.name());
108         if (it != nameIndex.end()) {
109             int index = it->second;
110             pluginConfigs[index] = cfg;
111             updateCount++;
112         }
113     }
114     return updateCount;
115 }
116 
CreatePluginSessions()117 bool ProfilerService::SessionContext::CreatePluginSessions()
118 {
119     if (bufferConfigs.size() > 0) { // with buffer configs
120         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
121                    false, "create plugin sessions with buffer configs failed!");
122     } else { // without buffer configs
123         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
124                    "create plugin sessions without buffer configs failed!");
125     }
126     return true;
127 }
128 
StartPluginSessions()129 bool ProfilerService::SessionContext::StartPluginSessions()
130 {
131     std::unique_lock<std::mutex> lock(sessionMutex);
132 
133     // if dataRepeater exists, reset it to usable state.
134     if (dataRepeater) {
135         dataRepeater->Reset();
136     }
137 
138     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
139         uint32_t sampleDuration = sessionConfig.sample_duration();
140         if (sampleDuration > 0) {
141             offlineTask = "stop-session-" + std::to_string(id);
142             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
143             // start offline trace timeout task
144             ScheduleTaskManager::GetInstance().ScheduleTask(
145                 offlineTask,
146                 [weakCtx]() {
147                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
148                         ctx->StopPluginSessions();
149                     }
150                 },
151                 std::chrono::milliseconds(0), // do not repeat
152                 std::chrono::milliseconds(sampleDuration));
153 
154             // keep_alive_time not set by client, but the sample_duration setted
155             if (sessionConfig.keep_alive_time() == 0) {
156                 // use sample_duration add a little time to set keep_alive_time
157                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
158                 StartSessionExpireTask();
159             }
160         }
161     }
162 
163     // start each plugin sessions
164     service->pluginSessionManager_->StartPluginSessions(pluginNames);
165     return true;
166 }
167 
StopPluginSessions()168 bool ProfilerService::SessionContext::StopPluginSessions()
169 {
170     std::unique_lock<std::mutex> lock(sessionMutex);
171     // stop each plugin sessions
172     service->pluginSessionManager_->StopPluginSessions(pluginNames);
173 
174     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
175         if (offlineTask.size() > 0) {
176             ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
177         }
178         traceFileWriter->Finish();
179     }
180 
181     // make sure FetchData thread exit
182     if (dataRepeater) {
183         dataRepeater->Close();
184     }
185     return true;
186 }
187 
188 namespace {
IsValidKeepAliveTime(uint32_t timeout)189 bool IsValidKeepAliveTime(uint32_t timeout)
190 {
191     if (timeout < MIN_SESSION_TIMEOUT_MS) {
192         return false;
193     }
194     if (timeout > MAX_SESSION_TIMEOUT_MS) {
195         return false;
196     }
197     return true;
198 }
199 }  // namespace
200 
SetKeepAliveTime(uint32_t timeout)201 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
202 {
203     if (timeout > 0) {
204         timeoutTask = "timeout-session-" + std::to_string(id);
205         sessionConfig.set_keep_alive_time(timeout);
206     }
207 }
208 
StartSessionExpireTask()209 void ProfilerService::SessionContext::StartSessionExpireTask()
210 {
211     if (timeoutTask.size() > 0) {
212         ScheduleTaskManager::GetInstance().ScheduleTask(timeoutTask,
213                                                         std::bind(&ProfilerService::RemoveSessionContext, service, id),
214                                                         std::chrono::milliseconds(0), // do not repeat
215                                                         std::chrono::milliseconds(sessionConfig.keep_alive_time()));
216     }
217 }
218 
StopSessionExpireTask()219 void ProfilerService::SessionContext::StopSessionExpireTask()
220 {
221     if (timeoutTask.size() > 0) {
222         ScheduleTaskManager::GetInstance().UnscheduleTask(timeoutTask);
223     }
224 }
225 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)226 Status ProfilerService::CreateSession(ServerContext* context,
227                                       const ::CreateSessionRequest* request,
228                                       ::CreateSessionResponse* response)
229 {
230     CHECK_REQUEST_RESPONSE(context, request, response);
231     HILOG_INFO(LOG_CORE, "CreateSession from '%s'", context->peer().c_str());
232     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
233 
234     // check plugin configs
235     HILOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
236     const int nConfigs = request->plugin_configs_size();
237     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
238 
239     // check buffer configs
240     ProfilerSessionConfig sessionConfig = request->session_config();
241     pluginService_->SetProfilerSessionConfig(sessionConfig);
242     const int nBuffers = sessionConfig.buffers_size();
243     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
244     // copy plugin configs from request
245     std::vector<ProfilerPluginConfig> pluginConfigs;
246     pluginConfigs.reserve(nConfigs);
247 
248     for (int i = 0; i < nConfigs; i++) {
249         if (request->plugin_configs(i).name() == "nativehook" && getuid() != 0) {
250             NativeHookConfig hookConfig;
251             std::string cfgData = request->plugin_configs(i).config_data();
252             if (hookConfig.ParseFromArray(reinterpret_cast<const uint8_t*>(cfgData.c_str()), cfgData.size()) <= 0) {
253                 HILOG_ERROR(LOG_CORE, "%s: ParseFromArray failed", __func__);
254                 continue;
255             }
256             if (!COMMON::CheckApplicationPermission(hookConfig.pid(), hookConfig.process_name())) {
257                 HILOG_ERROR(LOG_CORE, "Application debug permisson denied!");
258                 continue;
259             }
260         }
261         pluginConfigs.push_back(request->plugin_configs(i));
262     }
263 
264     if (pluginConfigs.empty()) {
265         HILOG_ERROR(LOG_CORE, "No plugins are loaded!");
266         return Status(StatusCode::PERMISSION_DENIED, "");
267     }
268     // copy buffer configs
269     std::vector<BufferConfig> bufferConfigs;
270     if (nBuffers == 1) {
271         // if only one buffer config provided, all plugin use the same buffer config
272         bufferConfigs.resize(pluginConfigs.size(), sessionConfig.buffers(0));
273     } else if (nBuffers > 0) {
274         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
275         bufferConfigs.assign(sessionConfig.buffers().begin(), sessionConfig.buffers().end());
276     }
277     HILOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
278     std::vector<std::string> pluginNames;
279     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
280                    [](ProfilerPluginConfig& config) { return config.name(); });
281     std::sort(pluginNames.begin(), pluginNames.end());
282 
283     // create TraceFileWriter for offline mode
284     TraceFileWriterPtr traceWriter;
285     std::shared_ptr<ProfilerDataRepeater> dataRepeater = nullptr;
286     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
287         auto resultFile = sessionConfig.result_file();
288         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
289         traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig.split_file(),
290             sessionConfig.split_file_max_size_mb(), sessionConfig.split_file_max_num());
291         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
292         pluginService_->SetTraceWriter(traceWriter);
293         for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
294             ProfilerPluginData pluginData;
295             pluginData.set_name(pluginConfigs[i].name() + "_config");
296             pluginData.set_data(pluginConfigs[i].config_data());
297             std::vector<char> msgData(pluginData.ByteSizeLong());
298             if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
299                 HILOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
300             }
301             traceWriter->SetPluginConfig(msgData.data(), msgData.size());
302         }
303         traceWriter->Flush();
304     } else {
305         dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
306         CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
307     }
308 
309     // create session context
310     auto ctx = std::make_shared<SessionContext>();
311     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
312 
313     // fill fields of SessionContext
314     ctx->service = this;
315     if (dataRepeater != nullptr) {
316         ctx->dataRepeater = dataRepeater;
317     }
318     if (traceWriter != nullptr) {
319         ctx->traceFileWriter = traceWriter;
320     }
321     ctx->sessionConfig = sessionConfig;
322     ctx->pluginNames = std::move(pluginNames);
323     ctx->pluginConfigs = std::move(pluginConfigs);
324     ctx->bufferConfigs = std::move(bufferConfigs);
325 
326     // create plugin sessions
327     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
328     // alloc new session id
329     uint32_t sessionId = ++sessionIdCounter_;
330     ctx->id = sessionId;
331     ctx->name = "session-" + std::to_string(sessionId);
332 
333     // add {sessionId, ctx} to map
334     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
335 
336     // create session expire schedule task
337     auto keepAliveTime = sessionConfig.keep_alive_time();
338     if (keepAliveTime) {
339         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
340         // create schedule task for session timeout feature
341         ctx->SetKeepAliveTime(keepAliveTime);
342         ctx->StartSessionExpireTask();
343     }
344 
345     // prepare response data fields
346     response->set_status(0);
347     response->set_session_id(sessionId);
348 
349     HILOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
350     return Status::OK;
351 }
352 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)353 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
354 {
355     std::unique_lock<std::mutex> lock(sessionContextMutex_);
356     CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
357     sessionContext_[sessionId] = sessionCtx;
358     return true;
359 }
360 
GetSessionContext(uint32_t sessionId) const361 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
362 {
363     std::unique_lock<std::mutex> lock(sessionContextMutex_);
364     auto it = sessionContext_.find(sessionId);
365     if (it != sessionContext_.end()) {
366         auto ptr = it->second;
367         return ptr;
368     }
369     return nullptr;
370 }
371 
RemoveSessionContext(uint32_t sessionId)372 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
373 {
374     std::unique_lock<std::mutex> lock(sessionContextMutex_);
375     auto it = sessionContext_.find(sessionId);
376     if (it != sessionContext_.end()) {
377         auto ptr = it->second;
378         HILOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
379         sessionContext_.erase(it);
380         return true;
381     }
382     return false;
383 }
384 
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)385 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
386     const std::string& outputFile, const std::string& pluginVersion)
387 {
388     if (pluginName.empty() || outputFile.empty()) {
389         HILOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)", pluginName.c_str(), outputFile.c_str());
390         return;
391     }
392 
393     std::ifstream fsFile {}; // read from output file
394     fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
395     if (!fsFile.good()) {
396         HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
397         return;
398     }
399 
400     std::ofstream fsTarget {}; // write to profiler ouput file
401     fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
402     if (!fsTarget.good()) {
403         HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
404         return;
405     }
406     fsTarget.seekp(0, std::ios_base::end);
407     int posFile = fsTarget.tellp(); // for update sha256
408 
409     TraceFileHeader header {};
410     if (pluginName == "hiperf-plugin") {
411         header.data_.dataType = DataType::HIPERF_DATA;
412     } else {
413         header.data_.dataType = DataType::STANDALONE_DATA;
414     }
415     fsFile.seekg(0, std::ios_base::end);
416     uint64_t fileSize = (uint64_t)(fsFile.tellg());
417     header.data_.length += fileSize;
418     size_t pluginSize = sizeof(header.data_.standalonePluginName);
419     int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
420     if (ret != EOK) {
421         HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
422         return;
423     }
424     pluginSize = sizeof(header.data_.pluginVersion);
425     ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
426     if (ret != EOK) {
427         HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
428         return;
429     }
430     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
431     if (!fsTarget.good()) {
432         HILOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
433         return;
434     }
435 
436     SHA256_CTX sha256Ctx;
437     SHA256_Init(&sha256Ctx);
438     constexpr uint64_t bufSize = 4 * 1024 * 1024;
439     std::vector<char> buf(bufSize);
440     uint64_t readSize = 0;
441     fsFile.seekg(0);
442     while ((readSize = std::min(bufSize, fileSize)) > 0) {
443         fsFile.read(buf.data(), readSize);
444         fsTarget.write(buf.data(), readSize);
445         if (!fsTarget.good()) {
446             HILOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
447             return;
448         }
449         fileSize -= readSize;
450 
451         SHA256_Update(&sha256Ctx, buf.data(), readSize);
452     }
453     SHA256_Final(header.data_.sha256, &sha256Ctx);
454     fsTarget.seekp(posFile, std::ios_base::beg);
455     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
456 
457     fsFile.close();
458     fsTarget.close();
459 
460     HILOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
461 }
462 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)463 Status ProfilerService::StartSession(ServerContext* context,
464                                      const ::StartSessionRequest* request,
465                                      ::StartSessionResponse* response)
466 {
467     CHECK_REQUEST_RESPONSE(context, request, response);
468     HILOG_INFO(LOG_CORE, "StartSession from '%s'", context->peer().c_str());
469 
470     uint32_t sessionId = request->session_id();
471     HILOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
472 
473     // copy plugin configs from request
474     std::vector<ProfilerPluginConfig> newPluginConfigs;
475     newPluginConfigs.reserve(request->update_configs_size());
476     for (int i = 0; i < request->update_configs_size(); i++) {
477         HILOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
478         newPluginConfigs.push_back(request->update_configs(i));
479     }
480 
481     // get plugin names in request
482     std::vector<std::string> requestNames;
483     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
484                    [](auto& config) { return config.name(); });
485     std::sort(requestNames.begin(), requestNames.end());
486 
487     auto ctx = GetSessionContext(sessionId);
488     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
489 
490     // get intersection set of requestNames and pluginNames
491     std::vector<std::string> updateNames;
492     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
493                           std::back_inserter(updateNames));
494 
495     if (updateNames.size() > 0) {
496         // remove old plugin sessions
497         pluginSessionManager_->RemovePluginSessions(updateNames);
498 
499         // update plugin configs
500         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
501 
502         // re-create plugin sessions
503         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
504         HILOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
505     }
506 
507     // start plugin sessions with configs
508     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
509     HILOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
510     return Status::OK;
511 }
512 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)513 Status ProfilerService::FetchData(ServerContext* context,
514                                   const ::FetchDataRequest* request,
515                                   ServerWriter<::FetchDataResponse>* writer)
516 {
517     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
518     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
519     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
520 
521     HILOG_INFO(LOG_CORE, "FetchData from '%s'", context->peer().c_str());
522     CHECK_POINTER_NOTNULL(request, "request invalid!");
523     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
524 
525     uint32_t sessionId = request->session_id();
526     HILOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
527 
528     auto ctx = GetSessionContext(sessionId);
529     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
530 
531     // check each plugin session states
532     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
533                           "session status invalid!");
534 
535     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
536         auto dataRepeater = ctx->dataRepeater;
537         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
538 
539         while (1) {
540             ctx = GetSessionContext(sessionId);
541             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
542 
543             FetchDataResponse response;
544             response.set_status(StatusCode::OK);
545             response.set_response_id(++responseIdCounter_);
546 
547             std::vector<ProfilerPluginDataPtr> pluginDataVec;
548             int count = dataRepeater->TakePluginData(pluginDataVec);
549             if (count > 0) {
550                 response.set_has_more(true);
551                 for (int i = 0; i < count; i++) {
552                     auto data = response.add_plugin_data();
553                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
554                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
555                     *data = *pluginDataVec[i];
556                 }
557             } else {
558                 response.set_has_more(false);
559                 HILOG_INFO(LOG_CORE, "no more data need to fill to response!");
560             }
561 
562             bool sendSuccess = writer->Write(response);
563             if (count <= 0 || !sendSuccess) {
564                 HILOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
565                 break;
566             }
567         }
568     }
569 
570     HILOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
571     return Status::OK;
572 }
573 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)574 Status ProfilerService::StopSession(ServerContext* context,
575                                     const ::StopSessionRequest* request,
576                                     ::StopSessionResponse* response)
577 {
578     CHECK_REQUEST_RESPONSE(context, request, response);
579     HILOG_INFO(LOG_CORE, "StopSession from '%s'", context->peer().c_str());
580 
581     uint32_t sessionId = request->session_id();
582     HILOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
583 
584     auto ctx = GetSessionContext(sessionId);
585     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
586     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
587     HILOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
588     return Status::OK;
589 }
590 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)591 Status ProfilerService::DestroySession(ServerContext* context,
592                                        const ::DestroySessionRequest* request,
593                                        ::DestroySessionResponse* response)
594 {
595     CHECK_REQUEST_RESPONSE(context, request, response);
596     HILOG_INFO(LOG_CORE, "DestroySession from '%s'", context->peer().c_str());
597 
598     uint32_t sessionId = request->session_id();
599     HILOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
600 
601     auto ctx = GetSessionContext(sessionId);
602     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
603 
604     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
605     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
606                           "remove plugin session FAILED!");
607     HILOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
608 
609     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
610         uint32_t pluginId = 0;
611         PluginContextPtr pluginCtx = nullptr;
612         for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
613             auto pluginName = ctx->pluginNames[i];
614             std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
615             if (pluginCtx->isStandaloneFileData == true) {
616                 std::string file = ctx->sessionConfig.result_file();
617                 if (ctx->sessionConfig.split_file()) {
618                     file = ctx->traceFileWriter.get()->Path();
619                 }
620                 MergeStandaloneFile(file, pluginName, pluginCtx->outFileName, pluginCtx->pluginVersion);
621             }
622         }
623     }
624 
625     return Status::OK;
626 }
627 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)628 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
629                                             const ::KeepSessionRequest* request,
630                                             ::KeepSessionResponse* response)
631 {
632     CHECK_REQUEST_RESPONSE(context, request, response);
633     HILOG_INFO(LOG_CORE, "KeepSession from '%s'", context->peer().c_str());
634 
635     uint32_t sessionId = request->session_id();
636     HILOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
637 
638     auto ctx = GetSessionContext(sessionId);
639     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
640 
641     // update keep alive time if keep_alive_time parameter provided
642     auto keepAliveTime = request->keep_alive_time();
643     if (keepAliveTime) {
644         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
645         ctx->SetKeepAliveTime(keepAliveTime);
646     }
647 
648     // reschedule session timeout task
649     if (ctx->timeoutTask.size() > 0) {
650         ctx->StopSessionExpireTask();
651         ctx->StartSessionExpireTask();
652     }
653     HILOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
654     return Status::OK;
655 }
656 
657 struct LoggingInterceptor : public grpc::experimental::Interceptor {
658 public:
LoggingInterceptorLoggingInterceptor659     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
660 
InterceptLoggingInterceptor661     void Intercept(experimental::InterceptorBatchMethods* methods) override
662     {
663         const char* method = info_->method();
664         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
665             HILOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
666         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
667             HILOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
668         }
669         methods->Proceed();
670     }
671 
672 private:
673     grpc::experimental::ServerRpcInfo* info_ = nullptr;
674 };
675 
676 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
677 protected:
CreateServerInterceptorInterceptorFactory678     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
679     {
680         return new LoggingInterceptor(info);
681     }
682 };
683 
StartService(const std::string & listenUri)684 bool ProfilerService::StartService(const std::string& listenUri)
685 {
686     CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
687 
688     ServerBuilder builder;
689     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
690     builder.RegisterService(this);
691 
692     auto server = builder.BuildAndStart();
693     CHECK_NOTNULL(server, false, "start service on %s failed!", listenUri.c_str());
694     HILOG_INFO(LOG_CORE, "Server listening on %s", listenUri.c_str());
695 
696     server_ = std::move(server);
697     return true;
698 }
699 
WaitServiceDone()700 void ProfilerService::WaitServiceDone()
701 {
702     if (server_) {
703         HILOG_INFO(LOG_CORE, "waiting Server...");
704         server_->Wait();
705         HILOG_INFO(LOG_CORE, "Server done!");
706     }
707 }
708 
StopService()709 void ProfilerService::StopService()
710 {
711     if (server_) {
712         server_->Shutdown();
713         HILOG_INFO(LOG_CORE, "Server stop done!");
714     }
715 }
716