• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28 #include <cinttypes>
29 
30 using namespace ::grpc;
31 using PluginContextPtr = std::shared_ptr<PluginContext>;
32 
// Leaves the enclosing function with an INTERNAL status carrying `errorMessage` when `ptr` is null.
// The stringified pointer expression is written to the error log first.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                                     \
    do {                                                                             \
        if ((ptr) == nullptr) {                                                      \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, errorMessage};                             \
        }                                                                            \
    } while (0)

// Leaves the enclosing function with an INTERNAL status carrying `errorMessage` when `expr` is false.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                                   \
    do {                                                                            \
        if (!(expr)) {                                                              \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
            return {StatusCode::INTERNAL, (errorMessage)};                          \
        }                                                                           \
    } while (0)

// Null-checks the three pointers every unary RPC handler receives, in the order context, request, response.
#define CHECK_REQUEST_RESPONSE(context, request, response)        \
    do {                                                          \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");   \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (0)
55 
namespace {
// Inclusive bounds that IsValidKeepAliveTime accepts for a client supplied keep_alive_time.
constexpr int MIN_SESSION_TIMEOUT_MS{1000};
constexpr int MAX_SESSION_TIMEOUT_MS{1000 * 3600};
// Added to sample_duration to derive keep_alive_time when the client did not set one.
constexpr int DEFAULT_KEEP_ALIVE_DELTA{3000};
// Period handed to the heartbeat schedule task (presumably milliseconds - confirm against ScheduleTaskManager).
constexpr uint64_t CHECK_HEARTBEAT_INTERVAL{500};
// Name of the plugin host process whose liveness the heartbeat checks.
const std::string HIPROFILER_PLUGINS_NAME{"hiprofiler_plugins"};
#ifdef  PERFORMANCE_DEBUG
// Nanoseconds per second, used by the StopSession timing log.
constexpr uint64_t S_TO_NS{1000 * 1000 * 1000};
#endif
} // namespace
66 
ProfilerService(const PluginServicePtr & pluginService)67 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
68     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
69 {
70     pluginService_->SetPluginSessionManager(pluginSessionManager_);
71 }
72 
~ProfilerService()73 ProfilerService::~ProfilerService() {}
74 
~SessionContext()75 ProfilerService::SessionContext::~SessionContext()
76 {
77     PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
78     if (offlineScheduleTaskFd != -1) {
79         stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
80     }
81     StopSessionExpireTask(service->removeTask_);
82     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
83 }
84 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)85 Status ProfilerService::GetCapabilities(ServerContext* context,
86                                         const ::GetCapabilitiesRequest* request,
87                                         ::GetCapabilitiesResponse* response)
88 {
89     CHECK_REQUEST_RESPONSE(context, request, response);
90     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
91 
92     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
93     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
94 
95     response->set_status(StatusCode::OK);
96     for (size_t i = 0; i < capabilities.size(); i++) {
97         *response->add_capabilities() = capabilities[i];
98     }
99     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
100     return Status::OK;
101 }
102 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)103 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
104 {
105     std::unordered_map<std::string, size_t> nameIndex;
106     for (size_t i = 0; i < pluginConfigs.size(); i++) {
107         nameIndex[pluginConfigs[i].name()] = i;
108     }
109 
110     size_t updateCount = 0;
111     for (auto& cfg : newPluginConfigs) {
112         auto it = nameIndex.find(cfg.name());
113         if (it != nameIndex.end()) {
114             size_t index = it->second;
115             pluginConfigs[index] = cfg;
116             updateCount++;
117         }
118     }
119     return updateCount;
120 }
121 
CreatePluginSessions()122 bool ProfilerService::SessionContext::CreatePluginSessions()
123 {
124     if (bufferConfigs.size() > 0) { // with buffer configs
125         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater,
126             stateRepeater), false, "create plugin sessions with buffer configs failed!");
127     } else { // without buffer configs
128         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater, stateRepeater),
129                    false, "create plugin sessions without buffer configs failed!");
130     }
131     return true;
132 }
133 
StartPluginSessions()134 bool ProfilerService::SessionContext::StartPluginSessions()
135 {
136     std::unique_lock<std::mutex> lock(sessionMutex);
137 
138     // if dataRepeater exists, reset it to usable state.
139     if (dataRepeater) {
140         dataRepeater->Reset();
141     }
142 
143     if (stateRepeater) {
144         stateRepeater->Reset();
145     }
146 
147     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
148         uint32_t sampleDuration = sessionConfig.sample_duration();
149         if (sampleDuration > 0) {
150             traceFileWriter->SetTimeSource();
151             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
152             // start offline trace timeout task
153             offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
154                 [weakCtx]() {
155                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
156                         ctx->StopPluginSessions();
157                     }
158                 },
159                 sampleDuration,
160                 true);
161             // keep_alive_time not set by client, but the sample_duration setted
162             if (sessionConfig.keep_alive_time() == 0) {
163                 // use sample_duration add a little time to set keep_alive_time
164                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
165                 StartSessionExpireTask(service->removeTask_);
166             }
167         }
168     }
169 
170     // start each plugin sessions
171     service->pluginSessionManager_->StartPluginSessions(pluginNames);
172     return true;
173 }
174 
StopPluginSessions()175 bool ProfilerService::SessionContext::StopPluginSessions()
176 {
177     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
178         if (offlineScheduleTaskFd != -1) {
179             stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
180             offlineScheduleTaskFd = -1;
181         } else {
182             return true;
183         }
184         traceFileWriter->SetDurationTime();
185     }
186 
187     // stop each plugin sessions
188     service->pluginSessionManager_->StopPluginSessions(pluginNames);
189     // stop epoll thread receiving shared memory messages
190     service->pluginService_->StopEpollThread();
191 
192     // Read the remaining data of shared memory of all plugins.
193     for (auto& name : pluginNames) {
194         service->pluginService_->FlushAllData(name);
195     }
196     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
197         // update file header.
198         traceFileWriter->Finish();
199     }
200 
201     // make sure FetchData thread exit
202     if (dataRepeater) {
203         dataRepeater->Close();
204     }
205 
206     if (stateRepeater) {
207         stateRepeater->Close();
208     }
209     return true;
210 }
211 
212 namespace {
IsValidKeepAliveTime(uint32_t timeout)213 bool IsValidKeepAliveTime(uint32_t timeout)
214 {
215     if (timeout < MIN_SESSION_TIMEOUT_MS) {
216         return false;
217     }
218     if (timeout > MAX_SESSION_TIMEOUT_MS) {
219         return false;
220     }
221     return true;
222 }
223 }  // namespace
224 
SetKeepAliveTime(uint32_t timeout)225 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
226 {
227     if (timeout > 0) {
228         sessionConfig.set_keep_alive_time(timeout);
229     }
230 }
231 
StartSessionExpireTask(ScheduleTaskManager & task)232 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
233 {
234     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
235         timeoutScheduleTaskFd = task.ScheduleTask(
236             std::bind(&ProfilerService::RemoveSessionContext, service, id),
237             sessionConfig.keep_alive_time(), true);
238     }
239 }
240 
StopSessionExpireTask(ScheduleTaskManager & task)241 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
242 {
243     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
244         task.UnscheduleTask(timeoutScheduleTaskFd);
245         timeoutScheduleTaskFd = -1;
246     }
247 }
248 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)249 Status ProfilerService::CreateSession(ServerContext* context,
250                                       const ::CreateSessionRequest* request,
251                                       ::CreateSessionResponse* response)
252 {
253     CHECK_REQUEST_RESPONSE(context, request, response);
254     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
255     // check plugin configs
256     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
257     const int nConfigs = request->plugin_configs_size();
258     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
259 
260     // check buffer configs
261     std::shared_ptr<ProfilerSessionConfig> sessionConfig =
262     std::make_shared<ProfilerSessionConfig>(request->session_config());
263     const int nBuffers = sessionConfig->buffers_size();
264     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
265     // copy plugin configs from request
266     std::vector<ProfilerPluginConfig> pluginConfigs;
267     pluginConfigs.reserve(nConfigs);
268 
269     for (int i = 0; i < nConfigs; i++) {
270         pluginConfigs.push_back(request->plugin_configs(i));
271     }
272 
273     if (pluginConfigs.empty()) {
274         PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
275         return Status(StatusCode::PERMISSION_DENIED, "");
276     }
277     // copy buffer configs
278     std::vector<BufferConfig> bufferConfigs;
279     if (nBuffers == 1) {
280         // if only one buffer config provided, all plugin use the same buffer config
281         bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
282     } else if (nBuffers > 0) {
283         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
284         bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
285     }
286     PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
287     std::vector<std::string> pluginNames;
288     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
289                    [](ProfilerPluginConfig& config) { return config.name(); });
290     std::sort(pluginNames.begin(), pluginNames.end());
291     //set session configs
292     pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
293 
294     // create TraceFileWriter for offline mode
295     TraceFileWriterPtr traceWriter;
296     std::shared_ptr<ProfilerDataRepeater<ProfilerPluginData>> dataRepeater = nullptr;
297     if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
298         auto resultFile = sessionConfig->result_file();
299         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
300         traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
301             sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
302         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
303         pluginService_->SetTraceWriter(traceWriter);
304         for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
305             ProfilerPluginData pluginData;
306             pluginData.set_name(pluginConfigs[i].name() + "_config");
307             pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
308             pluginData.set_data(pluginConfigs[i].config_data());
309             std::vector<char> msgData(pluginData.ByteSizeLong());
310             if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
311                 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
312             }
313             traceWriter->SetPluginConfig(msgData.data(), msgData.size());
314         }
315         traceWriter->Flush();
316     } else {
317         dataRepeater = std::make_shared<ProfilerDataRepeater<ProfilerPluginData>>(DEFAULT_REPEATER_BUFFER_SIZE);
318         CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
319     }
320     ProfilerPluginState* state = response->add_plugin_status();
321     state->set_version(COMMON::STATE_VERSION);
322     // create session context
323     auto ctx = std::make_shared<SessionContext>();
324     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
325 
326     // fill fields of SessionContext
327     ctx->service = this;
328     if (dataRepeater != nullptr) {
329         ctx->dataRepeater = dataRepeater;
330     }
331     if (stateRepeater_ != nullptr) {
332         ctx->stateRepeater = stateRepeater_;
333     }
334     if (traceWriter != nullptr) {
335         ctx->traceFileWriter = traceWriter;
336     }
337     ctx->sessionConfig = *sessionConfig;
338     ctx->pluginNames = std::move(pluginNames);
339     ctx->pluginConfigs = std::move(pluginConfigs);
340     ctx->bufferConfigs = std::move(bufferConfigs);
341 
342     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
343         if (stateRepeater_ == nullptr) {
344             stateRepeater_ = std::make_shared<ProfilerDataRepeater<ProfilerPluginState>>(DEFAULT_REPEATER_BUFFER_SIZE);
345         }
346         if (stateRepeater_ != nullptr) {
347             ctx->stateRepeater = stateRepeater_;
348         }
349     }
350     // create plugin sessions
351     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
352     // alloc new session id
353     uint32_t sessionId = ++sessionIdCounter_;
354     ctx->id = sessionId;
355     ctx->name = "session-" + std::to_string(sessionId);
356 
357     // add {sessionId, ctx} to map
358     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
359 
360     // create session expire schedule task
361     auto keepAliveTime = sessionConfig->keep_alive_time();
362     if (keepAliveTime) {
363         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
364         // create schedule task for session timeout feature
365         ctx->SetKeepAliveTime(keepAliveTime);
366         ctx->StartSessionExpireTask(removeTask_);
367     }
368 
369     // prepare response data fields
370     response->set_status(0);
371     response->set_session_id(sessionId);
372 
373     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
374     return Status::OK;
375 }
376 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)377 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
378 {
379     std::unique_lock<std::mutex> lock(sessionContextMutex_);
380     CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
381     sessionContext_[sessionId] = sessionCtx;
382     return true;
383 }
384 
CheckClientStatus()385 void ProfilerService::CheckClientStatus()
386 {
387     int pidVal = 0;
388     if (!COMMON::IsProcessExist(HIPROFILER_PLUGINS_NAME, pidVal)) {
389         ProfilerPluginState pluginState;
390         pluginState.set_version(COMMON::STATE_VERSION);
391         pluginState.set_event_detail("hiprofiler_plugins process is dead!");
392         pluginState.set_event(ProfilerPluginState::RUNNING_ERR);
393         CHECK_NOTNULL(stateRepeater_, NO_RETVAL, "cannot push event, stateRepeater is null!");
394         stateRepeater_->PutPluginData(std::make_shared<ProfilerPluginState>(pluginState));
395         if (heartbeatFd_ != -1) {
396             checkStatusManager_.UnscheduleTask(heartbeatFd_);
397         }
398     }
399 }
400 
GetSessionContext(uint32_t sessionId) const401 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
402 {
403     std::unique_lock<std::mutex> lock(sessionContextMutex_);
404     auto it = sessionContext_.find(sessionId);
405     if (it != sessionContext_.end()) {
406         auto ptr = it->second;
407         return ptr;
408     }
409     return nullptr;
410 }
411 
RemoveSessionContext(uint32_t sessionId)412 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
413 {
414     std::unique_lock<std::mutex> lock(sessionContextMutex_);
415     auto it = sessionContext_.find(sessionId);
416     if (it != sessionContext_.end()) {
417         auto ptr = it->second;
418         PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
419         sessionContext_.erase(it);
420         return true;
421     }
422     return false;
423 }
424 
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)425 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
426     const std::string& outputFile, const std::string& pluginVersion)
427 {
428     if (pluginName.empty() || outputFile.empty()) {
429         PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
430                            pluginName.c_str(), outputFile.c_str());
431         return;
432     }
433     auto retFile = COMMON::CheckNotExistsFilePath(outputFile);
434     if (!retFile.first) {
435         PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, outputFile.c_str());
436         return;
437     }
438     std::ifstream fsFile {}; // read from output file
439     fsFile.open(retFile.second, std::ios_base::in | std::ios_base::binary);
440     if (!fsFile.good()) {
441         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
442         return;
443     }
444     auto targetFile = COMMON::CheckNotExistsFilePath(resultFile);
445     if (!targetFile.first) {
446         PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, resultFile.c_str());
447         return;
448     }
449     std::ofstream fsTarget {}; // write to profiler ouput file
450     fsTarget.open(targetFile.second, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
451     if (!fsTarget.good()) {
452         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
453         return;
454     }
455     fsTarget.seekp(0, std::ios_base::end);
456     int posFile = fsTarget.tellp(); // for update sha256
457 
458     TraceFileHeader header {};
459     if (pluginName == "hiperf-plugin") {
460         header.data_.dataType = DataType::HIPERF_DATA;
461     } else {
462         header.data_.dataType = DataType::STANDALONE_DATA;
463     }
464     fsFile.seekg(0, std::ios_base::end);
465     uint64_t fileSize = (uint64_t)(fsFile.tellg());
466     header.data_.length += fileSize;
467     size_t pluginSize = sizeof(header.data_.standalonePluginName);
468     int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
469     if (ret != EOK) {
470         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
471         return;
472     }
473     pluginSize = sizeof(header.data_.pluginVersion);
474     ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
475     if (ret != EOK) {
476         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
477         return;
478     }
479     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
480     if (!fsTarget.good()) {
481         PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
482         return;
483     }
484 
485     SHA256_CTX sha256Ctx;
486     SHA256_Init(&sha256Ctx);
487     constexpr uint64_t bufSize = 4 * 1024 * 1024;
488     std::vector<char> buf(bufSize);
489     uint64_t readSize = 0;
490     fsFile.seekg(0);
491     while ((readSize = std::min(bufSize, fileSize)) > 0) {
492         fsFile.read(buf.data(), readSize);
493         fsTarget.write(buf.data(), readSize);
494         if (!fsTarget.good()) {
495             PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
496             return;
497         }
498         fileSize -= readSize;
499 
500         SHA256_Update(&sha256Ctx, buf.data(), readSize);
501     }
502     SHA256_Final(header.data_.sha256, &sha256Ctx);
503     fsTarget.seekp(posFile, std::ios_base::beg);
504     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
505 
506     fsFile.close();
507     fsTarget.close();
508 
509     PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
510 }
511 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)512 Status ProfilerService::StartSession(ServerContext* context,
513                                      const ::StartSessionRequest* request,
514                                      ::StartSessionResponse* response)
515 {
516     CHECK_REQUEST_RESPONSE(context, request, response);
517 
518     uint32_t sessionId = request->session_id();
519     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
520 
521     // copy plugin configs from request
522     std::vector<ProfilerPluginConfig> newPluginConfigs;
523     newPluginConfigs.reserve(request->update_configs_size());
524     for (int i = 0; i < request->update_configs_size(); i++) {
525         PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
526         newPluginConfigs.push_back(request->update_configs(i));
527     }
528 
529     // get plugin names in request
530     std::vector<std::string> requestNames;
531     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
532                    [](auto& config) { return config.name(); });
533     std::sort(requestNames.begin(), requestNames.end());
534 
535     auto ctx = GetSessionContext(sessionId);
536     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
537 
538     // start epoll thread to receive shared memory messages.
539     CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
540 
541     // get intersection set of requestNames and pluginNames
542     std::vector<std::string> updateNames;
543     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
544                           std::back_inserter(updateNames));
545 
546     if (updateNames.size() > 0) {
547         // remove old plugin sessions
548         pluginSessionManager_->RemovePluginSessions(updateNames);
549 
550         // update plugin configs
551         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
552 
553         // re-create plugin sessions
554         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
555         PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
556     }
557 
558     // start plugin sessions with configs
559     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
560     ProfilerPluginState* state = response->add_plugin_status();
561     state->set_version(COMMON::STATE_VERSION);
562     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
563     if (heartbeatFd_ == -1) {
564         auto callback = [this] { this->CheckClientStatus(); };
565         heartbeatFd_ = checkStatusManager_.ScheduleTask(callback, CHECK_HEARTBEAT_INTERVAL);
566         if (heartbeatFd_ == -1) {
567             PROFILER_LOG_INFO(LOG_CORE, "schedule task failed for heartbeat check");
568         }
569     }
570     return Status::OK;
571 }
572 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)573 Status ProfilerService::FetchData(ServerContext* context,
574                                   const ::FetchDataRequest* request,
575                                   ServerWriter<::FetchDataResponse>* writer)
576 {
577     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
578     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
579     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
580 
581     CHECK_POINTER_NOTNULL(request, "request invalid!");
582     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
583 
584     uint32_t sessionId = request->session_id();
585     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
586 
587     auto ctx = GetSessionContext(sessionId);
588     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
589 
590     // check each plugin session states
591     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
592                           "session status invalid!");
593 
594     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
595         auto dataRepeater = ctx->dataRepeater;
596         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
597 
598         while (1) {
599             ctx = GetSessionContext(sessionId);
600             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
601 
602             FetchDataResponse response;
603             response.set_status(StatusCode::OK);
604             response.set_response_id(++responseIdCounter_);
605 
606             std::vector<ProfilerPluginDataPtr> pluginDataVec;
607             int count = dataRepeater->TakePluginData(pluginDataVec);
608             if (count > 0) {
609                 response.set_has_more(true);
610                 for (int i = 0; i < count; i++) {
611                     auto data = response.add_plugin_data();
612                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
613                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
614                     *data = *pluginDataVec[i];
615                 }
616             } else {
617                 response.set_has_more(false);
618                 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
619             }
620 
621             bool sendSuccess = writer->Write(response);
622             if (count <= 0 || !sendSuccess) {
623                 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
624                 break;
625             }
626         }
627     }
628 
629     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
630     return Status::OK;
631 }
632 
SubscribeProfilerEvt(ServerContext * context,const::SubscribeProfilerEvtRequest * request,ServerWriter<::SubscribeProfilerEvtResponse> * writer)633 Status ProfilerService::SubscribeProfilerEvt(ServerContext* context,
634                                              const ::SubscribeProfilerEvtRequest* request,
635                                              ServerWriter<::SubscribeProfilerEvtResponse>* writer)
636 {
637     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
638     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
639     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
640 
641     CHECK_POINTER_NOTNULL(request, "request invalid!");
642     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
643 
644     uint32_t sessionId = request->session_id();
645     PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u start", request->request_id(), sessionId);
646 
647     auto ctx = GetSessionContext(sessionId);
648     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
649 
650     // check each plugin session states
651     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
652                           "session status invalid!");
653 
654     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
655         auto stateRepeater = ctx->stateRepeater;
656         CHECK_POINTER_NOTNULL(stateRepeater, "state repeater invalid!");
657 
658         while (1) {
659             ctx = GetSessionContext(sessionId);
660             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
661 
662             SubscribeProfilerEvtResponse response;
663             response.set_status(StatusCode::OK);
664             response.set_response_id(++responseIdCounter_);
665 
666             std::vector<ProfilerPluginStatePtr> pluginStateVec;
667             int count = stateRepeater->TakePluginData(pluginStateVec);
668             if (count > 0) {
669                 response.set_has_more(true);
670                 for (int i = 0; i < count; i++) {
671                     auto data = response.add_plugin_state();
672                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
673                     CHECK_POINTER_NOTNULL(pluginStateVec[i], "plugin state invalid");
674                     *data = *pluginStateVec[i];
675                 }
676             } else {
677                 response.set_has_more(false);
678                 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
679             }
680 
681             bool sendSuccess = writer->Write(response);
682             if (count <= 0 || !sendSuccess) {
683                 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
684                 break;
685             }
686         }
687     }
688 
689     PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u done!", request->request_id(), sessionId);
690     return Status::OK;
691 }
692 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)693 Status ProfilerService::StopSession(ServerContext* context,
694                                     const ::StopSessionRequest* request,
695                                     ::StopSessionResponse* response)
696 {
697 #ifdef PERFORMANCE_DEBUG
698     struct timespec start = {};
699     clock_gettime(CLOCK_REALTIME, &start);
700 #endif
701     CHECK_REQUEST_RESPONSE(context, request, response);
702     uint32_t sessionId = request->session_id();
703     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
704 
705     auto ctx = GetSessionContext(sessionId);
706     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
707     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
708         CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
709         ctx->traceFileWriter.get()->SetStopSplitFile(true);
710     }
711     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
712     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
713 #ifdef PERFORMANCE_DEBUG
714     struct timespec end = {};
715     clock_gettime(CLOCK_REALTIME, &end);
716     uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
717     PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time  %" PRIu64 " ns", costTime);
718 #endif
719     return Status::OK;
720 }
721 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)722 Status ProfilerService::DestroySession(ServerContext* context,
723                                        const ::DestroySessionRequest* request,
724                                        ::DestroySessionResponse* response)
725 {
726     CHECK_REQUEST_RESPONSE(context, request, response);
727 
728     uint32_t sessionId = request->session_id();
729     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
730 
731     auto ctx = GetSessionContext(sessionId);
732     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
733 
734     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
735     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
736                           "remove plugin session FAILED!");
737     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
738 
739     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
740         uint32_t pluginId = 0;
741         PluginContextPtr pluginCtx = nullptr;
742         for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
743             auto pluginName = ctx->pluginNames[i];
744             std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
745             if (pluginCtx->isStandaloneFileData == true) {
746                 if (!ctx->sessionConfig.split_file()) {
747                     MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
748                                         pluginCtx->pluginVersion);
749                 }
750             }
751         }
752     }
753 
754     return Status::OK;
755 }
756 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)757 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
758                                             const ::KeepSessionRequest* request,
759                                             ::KeepSessionResponse* response)
760 {
761     CHECK_REQUEST_RESPONSE(context, request, response);
762     uint32_t sessionId = request->session_id();
763     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
764 
765     auto ctx = GetSessionContext(sessionId);
766     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
767 
768     // update keep alive time if keep_alive_time parameter provided
769     auto keepAliveTime = request->keep_alive_time();
770     if (keepAliveTime) {
771         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
772         ctx->SetKeepAliveTime(keepAliveTime);
773     }
774 
775     // reschedule session timeout task
776     if (ctx->sessionConfig.keep_alive_time() > 0) {
777         ctx->StopSessionExpireTask(removeTask_);
778         ctx->StartSessionExpireTask(removeTask_);
779     }
780     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
781     return Status::OK;
782 }
783 
784 struct LoggingInterceptor : public grpc::experimental::Interceptor {
785 public:
LoggingInterceptorLoggingInterceptor786     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
787 
InterceptLoggingInterceptor788     void Intercept(experimental::InterceptorBatchMethods* methods) override
789     {
790         const char* method = info_->method();
791         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
792             PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
793         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
794             PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
795         }
796         methods->Proceed();
797     }
798 
799 private:
800     grpc::experimental::ServerRpcInfo* info_ = nullptr;
801 };
802 
803 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
804 protected:
CreateServerInterceptorInterceptorFactory805     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
806     {
807         return new LoggingInterceptor(info);
808     }
809 };
810 
StartService(const std::string & listenUri)811 bool ProfilerService::StartService(const std::string& listenUri)
812 {
813     CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
814 
815     ServerBuilder builder;
816     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
817     builder.RegisterService(this);
818 
819     auto server = builder.BuildAndStart();
820     CHECK_NOTNULL(server, false, "start service failed!");
821     PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
822     server_ = std::move(server);
823     return true;
824 }
825 
WaitServiceDone()826 void ProfilerService::WaitServiceDone()
827 {
828     if (server_) {
829         PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
830         server_->Wait();
831         PROFILER_LOG_INFO(LOG_CORE, "Server done!");
832     }
833 }
834 
StopService()835 void ProfilerService::StopService()
836 {
837     if (server_) {
838         server_->Shutdown();
839         PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
840     }
841 }