• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 #include "common.h"
21 #include "logging.h"
22 #include "native_hook_config.pb.h"
23 #include "plugin_service.h"
24 #include "plugin_session.h"
25 #include "plugin_session_manager.h"
26 #include "profiler_capability_manager.h"
27 #include "profiler_data_repeater.h"
28 #include "trace_file_writer.h"
29 #include <cinttypes>
30 
31 using namespace ::grpc;
32 using PluginContextPtr = std::shared_ptr<PluginContext>;
33 
// Makes the enclosing function return {StatusCode::INTERNAL, errorMessage} when `ptr` is null.
// The enclosing function's name and the text of the pointer expression are logged first.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                                     \
    do {                                                                             \
        if ((ptr) == nullptr) {                                                      \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, errorMessage};                             \
        }                                                                            \
    } while (0)

// Makes the enclosing function return {StatusCode::INTERNAL, errorMessage} when `expr` is false.
// The enclosing function's name and the message are logged first.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                                   \
    do {                                                                            \
        if (!(expr)) {                                                              \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
            return {StatusCode::INTERNAL, (errorMessage)};                          \
        }                                                                           \
    } while (0)

// Null-checks the three pointers every unary RPC handler receives, in the order
// context, request, response; the first null one ends the handler.
#define CHECK_REQUEST_RESPONSE(context, request, response)        \
    do {                                                          \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");   \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (0)
56 
namespace {
// Bounds checked by IsValidKeepAliveTime() for a client supplied keep_alive_time.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000;
// Added to sample_duration when an offline session did not set keep_alive_time itself.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
// Period handed to the schedule task manager for the plugin-process liveness check
// (presumably milliseconds -- confirm against ScheduleTaskManager).
constexpr uint64_t CHECK_HEARTBEAT_INTERVAL = 500;
constexpr long BYTE_PER_KB = 1024;
// Name of the process whose existence CheckClientStatus() polls.
const std::string HIPROFILER_PLUGINS_NAME = "hiprofiler_plugins";
#ifdef  PERFORMANCE_DEBUG
// Nanoseconds per second, used only for the StopSession timing log.
constexpr uint64_t S_TO_NS = 1000000000;
#endif
} // namespace
68 
ProfilerService(const PluginServicePtr & pluginService)69 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
70     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
71 {
72     pluginService_->SetPluginSessionManager(pluginSessionManager_);
73 }
74 
~ProfilerService()75 ProfilerService::~ProfilerService() {}
76 
~SessionContext()77 ProfilerService::SessionContext::~SessionContext()
78 {
79     PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
80     if (offlineScheduleTaskFd != -1) {
81         stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
82     }
83     StopSessionExpireTask(service->removeTask_);
84     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
85 }
86 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)87 Status ProfilerService::GetCapabilities(ServerContext* context,
88                                         const ::GetCapabilitiesRequest* request,
89                                         ::GetCapabilitiesResponse* response)
90 {
91     CHECK_REQUEST_RESPONSE(context, request, response);
92     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
93 
94     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
95     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
96 
97     response->set_status(StatusCode::OK);
98     for (size_t i = 0; i < capabilities.size(); i++) {
99         *response->add_capabilities() = capabilities[i];
100     }
101     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
102     return Status::OK;
103 }
104 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)105 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
106 {
107     std::unordered_map<std::string, size_t> nameIndex;
108     for (size_t i = 0; i < pluginConfigs.size(); i++) {
109         nameIndex[pluginConfigs[i].name()] = i;
110     }
111 
112     size_t updateCount = 0;
113     for (auto& cfg : newPluginConfigs) {
114         auto it = nameIndex.find(cfg.name());
115         if (it != nameIndex.end()) {
116             size_t index = it->second;
117             pluginConfigs[index] = cfg;
118             updateCount++;
119         }
120     }
121     return updateCount;
122 }
123 
CreatePluginSessions()124 bool ProfilerService::SessionContext::CreatePluginSessions()
125 {
126     if (bufferConfigs.size() > 0) { // with buffer configs
127         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater,
128             stateRepeater), false, "create plugin sessions with buffer configs failed!");
129     } else { // without buffer configs
130         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater, stateRepeater),
131                    false, "create plugin sessions without buffer configs failed!");
132     }
133     return true;
134 }
135 
StartPluginSessions()136 bool ProfilerService::SessionContext::StartPluginSessions()
137 {
138     std::unique_lock<std::mutex> lock(sessionMutex);
139 
140     // if dataRepeater exists, reset it to usable state.
141     if (dataRepeater) {
142         dataRepeater->Reset();
143     }
144 
145     if (stateRepeater) {
146         stateRepeater->Reset();
147     }
148 
149     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
150         uint32_t sampleDuration = sessionConfig.sample_duration();
151         if (sampleDuration > 0) {
152             traceFileWriter->SetTimeSource();
153             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
154             // start offline trace timeout task
155             offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
156                 [weakCtx]() {
157                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
158                         ctx->StopPluginSessions();
159                     }
160                 },
161                 sampleDuration,
162                 true);
163             // keep_alive_time not set by client, but the sample_duration setted
164             if (sessionConfig.keep_alive_time() == 0) {
165                 // use sample_duration add a little time to set keep_alive_time
166                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
167                 StartSessionExpireTask(service->removeTask_);
168             }
169         }
170     }
171 
172     // start each plugin sessions
173     service->pluginSessionManager_->StartPluginSessions(pluginNames);
174     return true;
175 }
176 
StopPluginSessions()177 bool ProfilerService::SessionContext::StopPluginSessions()
178 {
179     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
180         if (offlineScheduleTaskFd != -1) {
181             stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
182             offlineScheduleTaskFd = -1;
183         } else {
184             return true;
185         }
186         traceFileWriter->SetDurationTime();
187     }
188 
189     // stop each plugin sessions
190     service->pluginSessionManager_->StopPluginSessions(pluginNames);
191     // stop epoll thread receiving shared memory messages
192     service->pluginService_->StopEpollThread();
193 
194     // Read the remaining data of shared memory of all plugins.
195     for (auto& name : pluginNames) {
196         service->pluginService_->FlushAllData(name);
197     }
198     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
199         // update file header.
200         traceFileWriter->Finish();
201     }
202 
203     // make sure FetchData thread exit
204     if (dataRepeater) {
205         dataRepeater->Close();
206     }
207 
208     if (stateRepeater) {
209         stateRepeater->Close();
210     }
211     return true;
212 }
213 
214 namespace {
IsValidKeepAliveTime(uint32_t timeout)215 bool IsValidKeepAliveTime(uint32_t timeout)
216 {
217     if (timeout < MIN_SESSION_TIMEOUT_MS) {
218         return false;
219     }
220     if (timeout > MAX_SESSION_TIMEOUT_MS) {
221         return false;
222     }
223     return true;
224 }
225 }  // namespace
226 
SetKeepAliveTime(uint32_t timeout)227 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
228 {
229     if (timeout > 0) {
230         sessionConfig.set_keep_alive_time(timeout);
231     }
232 }
233 
StartSessionExpireTask(ScheduleTaskManager & task)234 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
235 {
236     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
237         timeoutScheduleTaskFd = task.ScheduleTask(
238             std::bind(&ProfilerService::RemoveSessionContext, service, id),
239             sessionConfig.keep_alive_time(), true);
240     }
241 }
242 
StopSessionExpireTask(ScheduleTaskManager & task)243 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
244 {
245     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
246         task.UnscheduleTask(timeoutScheduleTaskFd);
247         timeoutScheduleTaskFd = -1;
248     }
249 }
250 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)251 Status ProfilerService::CreateSession(ServerContext* context,
252                                       const ::CreateSessionRequest* request,
253                                       ::CreateSessionResponse* response)
254 {
255     CHECK_REQUEST_RESPONSE(context, request, response);
256     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
257     // check plugin configs
258     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
259     const int nConfigs = request->plugin_configs_size();
260     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
261 
262     // check buffer configs
263     std::shared_ptr<ProfilerSessionConfig> sessionConfig =
264     std::make_shared<ProfilerSessionConfig>(request->session_config());
265     const int nBuffers = sessionConfig->buffers_size();
266     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
267     // copy plugin configs from request
268     std::vector<ProfilerPluginConfig> pluginConfigs;
269     pluginConfigs.reserve(nConfigs);
270 
271     for (int i = 0; i < nConfigs; i++) {
272         pluginConfigs.push_back(request->plugin_configs(i));
273     }
274 
275     if (pluginConfigs.empty()) {
276         PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
277         return Status(StatusCode::PERMISSION_DENIED, "");
278     }
279     // copy buffer configs
280     std::vector<BufferConfig> bufferConfigs;
281     if (nBuffers == 1) {
282         // if only one buffer config provided, all plugin use the same buffer config
283         bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
284     } else if (nBuffers > 0) {
285         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
286         bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
287     }
288     PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
289     std::vector<std::string> pluginNames;
290     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
291                    [](ProfilerPluginConfig& config) { return config.name(); });
292     std::sort(pluginNames.begin(), pluginNames.end());
293     //set session configs
294     pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
295 
296     // create TraceFileWriter for offline mode
297     TraceFileWriterPtr traceWriter;
298     std::shared_ptr<ProfilerDataRepeater<ProfilerPluginData>> dataRepeater = nullptr;
299     if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
300         auto resultFile = sessionConfig->result_file();
301         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
302         traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
303             sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
304         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
305         pluginService_->SetTraceWriter(traceWriter);
306         for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
307             ProfilerPluginData pluginData;
308             pluginData.set_name(pluginConfigs[i].name() + "_config");
309             pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
310             pluginData.set_data(pluginConfigs[i].config_data());
311             std::vector<char> msgData(pluginData.ByteSizeLong());
312             if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
313                 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
314             }
315             traceWriter->SetPluginConfig(msgData.data(), msgData.size());
316         }
317         traceWriter->Flush();
318     } else {
319         dataRepeater = std::make_shared<ProfilerDataRepeater<ProfilerPluginData>>(DEFAULT_REPEATER_BUFFER_SIZE);
320         CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
321     }
322     ProfilerPluginState* state = response->add_plugin_status();
323     state->set_version(COMMON::STATE_VERSION);
324     // create session context
325     auto ctx = std::make_shared<SessionContext>();
326     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
327 
328     // fill fields of SessionContext
329     ctx->service = this;
330     if (dataRepeater != nullptr) {
331         ctx->dataRepeater = dataRepeater;
332     }
333     if (stateRepeater_ != nullptr) {
334         ctx->stateRepeater = stateRepeater_;
335     }
336     if (traceWriter != nullptr) {
337         ctx->traceFileWriter = traceWriter;
338     }
339     ctx->sessionConfig = *sessionConfig;
340     ctx->pluginNames = std::move(pluginNames);
341     ctx->pluginConfigs = std::move(pluginConfigs);
342     ctx->bufferConfigs = std::move(bufferConfigs);
343 
344     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
345         if (stateRepeater_ == nullptr) {
346             stateRepeater_ = std::make_shared<ProfilerDataRepeater<ProfilerPluginState>>(DEFAULT_REPEATER_BUFFER_SIZE);
347         }
348         if (stateRepeater_ != nullptr) {
349             ctx->stateRepeater = stateRepeater_;
350         }
351     }
352     // create plugin sessions
353     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
354     // alloc new session id
355     uint32_t sessionId = ++sessionIdCounter_;
356     ctx->id = sessionId;
357     ctx->name = "session-" + std::to_string(sessionId);
358 
359     // add {sessionId, ctx} to map
360     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
361 
362     // create session expire schedule task
363     auto keepAliveTime = sessionConfig->keep_alive_time();
364     if (keepAliveTime) {
365         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
366         // create schedule task for session timeout feature
367         ctx->SetKeepAliveTime(keepAliveTime);
368         ctx->StartSessionExpireTask(removeTask_);
369     }
370 
371     // prepare response data fields
372     response->set_status(0);
373     response->set_session_id(sessionId);
374 
375     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
376     return Status::OK;
377 }
378 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)379 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
380 {
381     std::unique_lock<std::mutex> lock(sessionContextMutex_);
382     CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
383     sessionContext_[sessionId] = sessionCtx;
384     return true;
385 }
386 
CheckClientStatus()387 void ProfilerService::CheckClientStatus()
388 {
389     int pidVal = 0;
390     if (!COMMON::IsProcessExist(HIPROFILER_PLUGINS_NAME, pidVal)) {
391         ProfilerPluginState pluginState;
392         pluginState.set_version(COMMON::STATE_VERSION);
393         pluginState.set_event_detail("hiprofiler_plugins process is dead!");
394         pluginState.set_event(ProfilerPluginState::RUNNING_ERR);
395         CHECK_NOTNULL(stateRepeater_, NO_RETVAL, "cannot push event, stateRepeater is null!");
396         stateRepeater_->PutPluginData(std::make_shared<ProfilerPluginState>(pluginState));
397         if (heartbeatFd_ != -1) {
398             checkStatusManager_.UnscheduleTask(heartbeatFd_);
399         }
400     }
401 }
402 
GetSessionContext(uint32_t sessionId) const403 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
404 {
405     std::unique_lock<std::mutex> lock(sessionContextMutex_);
406     auto it = sessionContext_.find(sessionId);
407     if (it != sessionContext_.end()) {
408         auto ptr = it->second;
409         return ptr;
410     }
411     return nullptr;
412 }
413 
RemoveSessionContext(uint32_t sessionId)414 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
415 {
416     std::unique_lock<std::mutex> lock(sessionContextMutex_);
417     auto it = sessionContext_.find(sessionId);
418     if (it != sessionContext_.end()) {
419         auto ptr = it->second;
420         PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
421         sessionContext_.erase(it);
422         return true;
423     }
424     return false;
425 }
426 
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)427 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
428     const std::string& outputFile, const std::string& pluginVersion)
429 {
430     if (pluginName.empty() || outputFile.empty()) {
431         PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
432                            pluginName.c_str(), outputFile.c_str());
433         return;
434     }
435     auto retFile = COMMON::CheckNotExistsFilePath(outputFile);
436     if (!retFile.first) {
437         PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, outputFile.c_str());
438         return;
439     }
440     std::ifstream fsFile {}; // read from output file
441     fsFile.open(retFile.second, std::ios_base::in | std::ios_base::binary);
442     if (!fsFile.good()) {
443         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
444         return;
445     }
446     auto targetFile = COMMON::CheckNotExistsFilePath(resultFile);
447     if (!targetFile.first) {
448         PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, resultFile.c_str());
449         return;
450     }
451     std::ofstream fsTarget {}; // write to profiler ouput file
452     fsTarget.open(targetFile.second, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
453     if (!fsTarget.good()) {
454         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
455         return;
456     }
457     fsTarget.seekp(0, std::ios_base::end);
458     int posFile = fsTarget.tellp(); // for update sha256
459 
460     TraceFileHeader header {};
461     if (pluginName == "hiperf-plugin") {
462         header.data_.dataType = DataType::HIPERF_DATA;
463     } else {
464         header.data_.dataType = DataType::STANDALONE_DATA;
465     }
466     fsFile.seekg(0, std::ios_base::end);
467     uint64_t fileSize = (uint64_t)(fsFile.tellg());
468     header.data_.length += fileSize;
469     size_t pluginSize = sizeof(header.data_.standalonePluginName);
470     int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
471     if (ret != EOK) {
472         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
473         return;
474     }
475     pluginSize = sizeof(header.data_.pluginVersion);
476     ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
477     if (ret != EOK) {
478         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
479         return;
480     }
481     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
482     if (!fsTarget.good()) {
483         PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
484         return;
485     }
486 
487     SHA256_CTX sha256Ctx;
488     SHA256_Init(&sha256Ctx);
489     constexpr uint64_t bufSize = 4 * 1024 * 1024;
490     std::vector<char> buf(bufSize);
491     uint64_t readSize = 0;
492     fsFile.seekg(0);
493     while ((readSize = std::min(bufSize, fileSize)) > 0) {
494         fsFile.read(buf.data(), readSize);
495         fsTarget.write(buf.data(), readSize);
496         if (!fsTarget.good()) {
497             PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
498             return;
499         }
500         fileSize -= readSize;
501 
502         SHA256_Update(&sha256Ctx, buf.data(), readSize);
503     }
504     SHA256_Final(header.data_.sha256, &sha256Ctx);
505     fsTarget.seekp(posFile, std::ios_base::beg);
506     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
507 
508     fsFile.close();
509     fsTarget.close();
510 
511     PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
512 }
513 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)514 Status ProfilerService::StartSession(ServerContext* context,
515                                      const ::StartSessionRequest* request,
516                                      ::StartSessionResponse* response)
517 {
518     CHECK_REQUEST_RESPONSE(context, request, response);
519 
520     uint32_t sessionId = request->session_id();
521     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
522 
523     // copy plugin configs from request
524     std::vector<ProfilerPluginConfig> newPluginConfigs;
525     newPluginConfigs.reserve(request->update_configs_size());
526     for (int i = 0; i < request->update_configs_size(); i++) {
527         PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
528         newPluginConfigs.push_back(request->update_configs(i));
529     }
530 
531     // get plugin names in request
532     std::vector<std::string> requestNames;
533     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
534                    [](auto& config) { return config.name(); });
535     std::sort(requestNames.begin(), requestNames.end());
536 
537     auto ctx = GetSessionContext(sessionId);
538     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
539 
540     // start epoll thread to receive shared memory messages.
541     CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
542 
543     // get intersection set of requestNames and pluginNames
544     std::vector<std::string> updateNames;
545     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
546                           std::back_inserter(updateNames));
547 
548     if (updateNames.size() > 0) {
549         // remove old plugin sessions
550         pluginSessionManager_->RemovePluginSessions(updateNames);
551 
552         // update plugin configs
553         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
554 
555         // re-create plugin sessions
556         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
557         PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
558     }
559 
560     // start plugin sessions with configs
561     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
562     ProfilerPluginState* state = response->add_plugin_status();
563     state->set_version(COMMON::STATE_VERSION);
564     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
565     if (heartbeatFd_ == -1) {
566         auto callback = [this] { this->CheckClientStatus(); };
567         heartbeatFd_ = checkStatusManager_.ScheduleTask(callback, CHECK_HEARTBEAT_INTERVAL);
568         if (heartbeatFd_ == -1) {
569             PROFILER_LOG_INFO(LOG_CORE, "schedule task failed for heartbeat check");
570         }
571     }
572     return Status::OK;
573 }
574 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)575 Status ProfilerService::FetchData(ServerContext* context,
576                                   const ::FetchDataRequest* request,
577                                   ServerWriter<::FetchDataResponse>* writer)
578 {
579     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
580     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
581     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
582 
583     CHECK_POINTER_NOTNULL(request, "request invalid!");
584     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
585 
586     uint32_t sessionId = request->session_id();
587     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
588 
589     auto ctx = GetSessionContext(sessionId);
590     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
591 
592     // check each plugin session states
593     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
594                           "session status invalid!");
595 
596     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
597         auto dataRepeater = ctx->dataRepeater;
598         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
599 
600         while (1) {
601             ctx = GetSessionContext(sessionId);
602             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
603 
604             FetchDataResponse response;
605             response.set_status(StatusCode::OK);
606             response.set_response_id(++responseIdCounter_);
607 
608             std::vector<ProfilerPluginDataPtr> pluginDataVec;
609             int count = dataRepeater->TakePluginData(pluginDataVec);
610             if (count > 0) {
611                 response.set_has_more(true);
612                 for (int i = 0; i < count; i++) {
613                     auto data = response.add_plugin_data();
614                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
615                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
616                     *data = *pluginDataVec[i];
617                 }
618             } else {
619                 response.set_has_more(false);
620                 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
621             }
622 
623             bool sendSuccess = writer->Write(response);
624             if (count <= 0 || !sendSuccess) {
625                 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
626                 break;
627             }
628         }
629     }
630 
631     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
632     return Status::OK;
633 }
634 
SubscribeProfilerEvt(ServerContext * context,const::SubscribeProfilerEvtRequest * request,ServerWriter<::SubscribeProfilerEvtResponse> * writer)635 Status ProfilerService::SubscribeProfilerEvt(ServerContext* context,
636                                              const ::SubscribeProfilerEvtRequest* request,
637                                              ServerWriter<::SubscribeProfilerEvtResponse>* writer)
638 {
639     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
640     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
641     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
642 
643     CHECK_POINTER_NOTNULL(request, "request invalid!");
644     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
645 
646     uint32_t sessionId = request->session_id();
647     PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u start", request->request_id(), sessionId);
648 
649     auto ctx = GetSessionContext(sessionId);
650     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
651 
652     // check each plugin session states
653     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
654                           "session status invalid!");
655 
656     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
657         auto stateRepeater = ctx->stateRepeater;
658         CHECK_POINTER_NOTNULL(stateRepeater, "state repeater invalid!");
659 
660         while (1) {
661             ctx = GetSessionContext(sessionId);
662             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
663 
664             SubscribeProfilerEvtResponse response;
665             response.set_status(StatusCode::OK);
666             response.set_response_id(++responseIdCounter_);
667 
668             std::vector<ProfilerPluginStatePtr> pluginStateVec;
669             int count = stateRepeater->TakePluginData(pluginStateVec);
670             if (count > 0) {
671                 response.set_has_more(true);
672                 for (int i = 0; i < count; i++) {
673                     auto data = response.add_plugin_state();
674                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
675                     CHECK_POINTER_NOTNULL(pluginStateVec[i], "plugin state invalid");
676                     *data = *pluginStateVec[i];
677                 }
678             } else {
679                 response.set_has_more(false);
680                 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
681             }
682 
683             bool sendSuccess = writer->Write(response);
684             if (count <= 0 || !sendSuccess) {
685                 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
686                 break;
687             }
688         }
689     }
690 
691     PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u done!", request->request_id(), sessionId);
692     return Status::OK;
693 }
694 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)695 Status ProfilerService::StopSession(ServerContext* context,
696                                     const ::StopSessionRequest* request,
697                                     ::StopSessionResponse* response)
698 {
699 #ifdef PERFORMANCE_DEBUG
700     struct timespec start = {};
701     clock_gettime(CLOCK_REALTIME, &start);
702 #endif
703     CHECK_REQUEST_RESPONSE(context, request, response);
704     uint32_t sessionId = request->session_id();
705     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
706 
707     auto ctx = GetSessionContext(sessionId);
708     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
709     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
710         CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
711         ctx->traceFileWriter.get()->SetStopSplitFile(true);
712     }
713     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
714     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
715 #ifdef PERFORMANCE_DEBUG
716     struct timespec end = {};
717     clock_gettime(CLOCK_REALTIME, &end);
718     uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
719     PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time  %" PRIu64 " ns", costTime);
720 #endif
721     return Status::OK;
722 }
723 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)724 Status ProfilerService::DestroySession(ServerContext* context,
725                                        const ::DestroySessionRequest* request,
726                                        ::DestroySessionResponse* response)
727 {
728     CHECK_REQUEST_RESPONSE(context, request, response);
729 
730     uint32_t sessionId = request->session_id();
731     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
732 
733     auto ctx = GetSessionContext(sessionId);
734     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
735 
736     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
737     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
738                           "remove plugin session FAILED!");
739     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
740 
741     struct stat fileInfo;
742     if (stat(ctx->sessionConfig.result_file().c_str(), &fileInfo) == 0) {
743         long result = static_cast<long>(fileInfo.st_size / BYTE_PER_KB);
744         PROFILER_LOG_INFO(LOG_CORE, "result file %s size is %ld KB", ctx->sessionConfig.result_file().c_str(), result);
745     }
746 
747     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
748         uint32_t pluginId = 0;
749         PluginContextPtr pluginCtx = nullptr;
750         for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
751             auto pluginName = ctx->pluginNames[i];
752             std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
753             if (pluginCtx->isStandaloneFileData == true) {
754                 if (!ctx->sessionConfig.split_file()) {
755                     MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
756                                         pluginCtx->pluginVersion);
757                 }
758             }
759         }
760     }
761 
762     return Status::OK;
763 }
764 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)765 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
766                                             const ::KeepSessionRequest* request,
767                                             ::KeepSessionResponse* response)
768 {
769     CHECK_REQUEST_RESPONSE(context, request, response);
770     uint32_t sessionId = request->session_id();
771     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
772 
773     auto ctx = GetSessionContext(sessionId);
774     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
775 
776     // update keep alive time if keep_alive_time parameter provided
777     auto keepAliveTime = request->keep_alive_time();
778     if (keepAliveTime) {
779         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
780         ctx->SetKeepAliveTime(keepAliveTime);
781     }
782 
783     // reschedule session timeout task
784     if (ctx->sessionConfig.keep_alive_time() > 0) {
785         ctx->StopSessionExpireTask(removeTask_);
786         ctx->StartSessionExpireTask(removeTask_);
787     }
788     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
789     return Status::OK;
790 }
791 
792 struct LoggingInterceptor : public grpc::experimental::Interceptor {
793 public:
LoggingInterceptorLoggingInterceptor794     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
795 
InterceptLoggingInterceptor796     void Intercept(experimental::InterceptorBatchMethods* methods) override
797     {
798         const char* method = info_->method();
799         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
800             PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
801         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
802             PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
803         }
804         methods->Proceed();
805     }
806 
807 private:
808     grpc::experimental::ServerRpcInfo* info_ = nullptr;
809 };
810 
811 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
812 protected:
CreateServerInterceptorInterceptorFactory813     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
814     {
815         return new LoggingInterceptor(info);
816     }
817 };
818 
StartService(const std::string & listenUri)819 bool ProfilerService::StartService(const std::string& listenUri)
820 {
821     CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
822 
823     ServerBuilder builder;
824     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
825     builder.RegisterService(this);
826 
827     auto server = builder.BuildAndStart();
828     CHECK_NOTNULL(server, false, "start service failed!");
829     PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
830     server_ = std::move(server);
831     return true;
832 }
833 
WaitServiceDone()834 void ProfilerService::WaitServiceDone()
835 {
836     if (server_) {
837         PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
838         server_->Wait();
839         PROFILER_LOG_INFO(LOG_CORE, "Server done!");
840     }
841 }
842 
StopService()843 void ProfilerService::StopService()
844 {
845     if (server_) {
846         server_->Shutdown();
847         PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
848     }
849 }