• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28 #include <cinttypes>
29 
30 using namespace ::grpc;
31 using PluginContextPtr = std::shared_ptr<PluginContext>;
32 
// Logs the enclosing function name plus the stringified pointer expression and
// returns a gRPC INTERNAL status carrying errorMessage when ptr is null.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                                     \
    do {                                                                             \
        if ((ptr) == nullptr) {                                                      \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, errorMessage};                             \
        }                                                                            \
    } while (0)

// Logs errorMessage and returns a gRPC INTERNAL status when expr evaluates to false.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                                   \
    do {                                                                            \
        if (!(expr)) {                                                              \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
            return {StatusCode::INTERNAL, (errorMessage)};                          \
        }                                                                           \
    } while (0)

// Applies CHECK_POINTER_NOTNULL to the three pointers every unary RPC handler receives.
#define CHECK_REQUEST_RESPONSE(context, request, response)        \
    do {                                                          \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");   \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (0)
55 
namespace {
// Inclusive lower/upper bounds accepted by IsValidKeepAliveTime().
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000;
// Added to sample_duration when a keep-alive time has to be derived from it.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
#ifdef PERFORMANCE_DEBUG
// Nanoseconds per second, used when timing StopSession.
constexpr uint64_t S_TO_NS = 1000 * 1000 * 1000;
#endif
} // namespace
64 
ProfilerService(const PluginServicePtr & pluginService)65 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
66     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
67 {
68     pluginService_->SetPluginSessionManager(pluginSessionManager_);
69 }
70 
~ProfilerService()71 ProfilerService::~ProfilerService() {}
72 
~SessionContext()73 ProfilerService::SessionContext::~SessionContext()
74 {
75     PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
76     if (offlineScheduleTaskFd != -1) {
77         stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
78     }
79     StopSessionExpireTask(service->removeTask_);
80     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
81 }
82 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)83 Status ProfilerService::GetCapabilities(ServerContext* context,
84                                         const ::GetCapabilitiesRequest* request,
85                                         ::GetCapabilitiesResponse* response)
86 {
87     CHECK_REQUEST_RESPONSE(context, request, response);
88     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
89 
90     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
91     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
92 
93     response->set_status(StatusCode::OK);
94     for (size_t i = 0; i < capabilities.size(); i++) {
95         *response->add_capabilities() = capabilities[i];
96     }
97     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
98     return Status::OK;
99 }
100 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)101 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
102 {
103     std::unordered_map<std::string, size_t> nameIndex;
104     for (size_t i = 0; i < pluginConfigs.size(); i++) {
105         nameIndex[pluginConfigs[i].name()] = i;
106     }
107 
108     size_t updateCount = 0;
109     for (auto& cfg : newPluginConfigs) {
110         auto it = nameIndex.find(cfg.name());
111         if (it != nameIndex.end()) {
112             int index = it->second;
113             pluginConfigs[index] = cfg;
114             updateCount++;
115         }
116     }
117     return updateCount;
118 }
119 
CreatePluginSessions()120 bool ProfilerService::SessionContext::CreatePluginSessions()
121 {
122     if (bufferConfigs.size() > 0) { // with buffer configs
123         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
124                    false, "create plugin sessions with buffer configs failed!");
125     } else { // without buffer configs
126         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
127                    "create plugin sessions without buffer configs failed!");
128     }
129     return true;
130 }
131 
StartPluginSessions()132 bool ProfilerService::SessionContext::StartPluginSessions()
133 {
134     std::unique_lock<std::mutex> lock(sessionMutex);
135 
136     // if dataRepeater exists, reset it to usable state.
137     if (dataRepeater) {
138         dataRepeater->Reset();
139     }
140 
141     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
142         uint32_t sampleDuration = sessionConfig.sample_duration();
143         if (sampleDuration > 0) {
144             traceFileWriter->SetTimeSource();
145             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
146             // start offline trace timeout task
147             offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
148                 [weakCtx]() {
149                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
150                         ctx->StopPluginSessions();
151                     }
152                 },
153                 sampleDuration,
154                 true);
155             // keep_alive_time not set by client, but the sample_duration setted
156             if (sessionConfig.keep_alive_time() == 0) {
157                 // use sample_duration add a little time to set keep_alive_time
158                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
159                 StartSessionExpireTask(service->removeTask_);
160             }
161         }
162     }
163 
164     // start each plugin sessions
165     service->pluginSessionManager_->StartPluginSessions(pluginNames);
166     return true;
167 }
168 
StopPluginSessions()169 bool ProfilerService::SessionContext::StopPluginSessions()
170 {
171     std::unique_lock<std::mutex> lock(sessionMutex);
172     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
173         if (offlineScheduleTaskFd != -1) {
174             stopExpireTask.UnscheduleTaskLockless(offlineScheduleTaskFd);
175             offlineScheduleTaskFd = -1;
176         } else {
177             return true;
178         }
179         traceFileWriter->SetDurationTime();
180     }
181 
182     // stop each plugin sessions
183     service->pluginSessionManager_->StopPluginSessions(pluginNames);
184     // stop epoll thread receiving shared memory messages
185     service->pluginService_->StopEpollThread();
186 
187     // Read the remaining data of shared memory of all plugins.
188     for (auto& name : pluginNames) {
189         service->pluginService_->FlushAllData(name);
190     }
191     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
192         // update file header.
193         traceFileWriter->Finish();
194     }
195 
196     // make sure FetchData thread exit
197     if (dataRepeater) {
198         dataRepeater->Close();
199     }
200     return true;
201 }
202 
203 namespace {
IsValidKeepAliveTime(uint32_t timeout)204 bool IsValidKeepAliveTime(uint32_t timeout)
205 {
206     if (timeout < MIN_SESSION_TIMEOUT_MS) {
207         return false;
208     }
209     if (timeout > MAX_SESSION_TIMEOUT_MS) {
210         return false;
211     }
212     return true;
213 }
214 }  // namespace
215 
SetKeepAliveTime(uint32_t timeout)216 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
217 {
218     if (timeout > 0) {
219         sessionConfig.set_keep_alive_time(timeout);
220     }
221 }
222 
StartSessionExpireTask(ScheduleTaskManager & task)223 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
224 {
225     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
226         timeoutScheduleTaskFd = task.ScheduleTask(
227             std::bind(&ProfilerService::RemoveSessionContext, service, id),
228             sessionConfig.keep_alive_time(), true);
229     }
230 }
231 
StopSessionExpireTask(ScheduleTaskManager & task)232 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
233 {
234     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
235         task.UnscheduleTaskLockless(timeoutScheduleTaskFd);
236         timeoutScheduleTaskFd = -1;
237     }
238 }
239 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)240 Status ProfilerService::CreateSession(ServerContext* context,
241                                       const ::CreateSessionRequest* request,
242                                       ::CreateSessionResponse* response)
243 {
244     CHECK_REQUEST_RESPONSE(context, request, response);
245     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
246     // check plugin configs
247     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
248     const int nConfigs = request->plugin_configs_size();
249     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
250 
251     // check buffer configs
252     std::shared_ptr<ProfilerSessionConfig> sessionConfig =
253     std::make_shared<ProfilerSessionConfig>(request->session_config());
254     const int nBuffers = sessionConfig->buffers_size();
255     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
256     // copy plugin configs from request
257     std::vector<ProfilerPluginConfig> pluginConfigs;
258     pluginConfigs.reserve(nConfigs);
259 
260     for (int i = 0; i < nConfigs; i++) {
261         pluginConfigs.push_back(request->plugin_configs(i));
262     }
263 
264     if (pluginConfigs.empty()) {
265         PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
266         return Status(StatusCode::PERMISSION_DENIED, "");
267     }
268     // copy buffer configs
269     std::vector<BufferConfig> bufferConfigs;
270     if (nBuffers == 1) {
271         // if only one buffer config provided, all plugin use the same buffer config
272         bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
273     } else if (nBuffers > 0) {
274         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
275         bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
276     }
277     PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
278     std::vector<std::string> pluginNames;
279     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
280                    [](ProfilerPluginConfig& config) { return config.name(); });
281     std::sort(pluginNames.begin(), pluginNames.end());
282     //set session configs
283     pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
284 
285     // create TraceFileWriter for offline mode
286     TraceFileWriterPtr traceWriter;
287     std::shared_ptr<ProfilerDataRepeater> dataRepeater = nullptr;
288     if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
289         auto resultFile = sessionConfig->result_file();
290         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
291         traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
292             sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
293         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
294         pluginService_->SetTraceWriter(traceWriter);
295         for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
296             ProfilerPluginData pluginData;
297             pluginData.set_name(pluginConfigs[i].name() + "_config");
298             pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
299             pluginData.set_data(pluginConfigs[i].config_data());
300             std::vector<char> msgData(pluginData.ByteSizeLong());
301             if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
302                 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
303             }
304             traceWriter->SetPluginConfig(msgData.data(), msgData.size());
305         }
306         traceWriter->Flush();
307     } else {
308         dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
309         CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
310     }
311 
312     // create session context
313     auto ctx = std::make_shared<SessionContext>();
314     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
315 
316     // fill fields of SessionContext
317     ctx->service = this;
318     if (dataRepeater != nullptr) {
319         ctx->dataRepeater = dataRepeater;
320     }
321     if (traceWriter != nullptr) {
322         ctx->traceFileWriter = traceWriter;
323     }
324     ctx->sessionConfig = *sessionConfig;
325     ctx->pluginNames = std::move(pluginNames);
326     ctx->pluginConfigs = std::move(pluginConfigs);
327     ctx->bufferConfigs = std::move(bufferConfigs);
328 
329     // create plugin sessions
330     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
331     // alloc new session id
332     uint32_t sessionId = ++sessionIdCounter_;
333     ctx->id = sessionId;
334     ctx->name = "session-" + std::to_string(sessionId);
335 
336     // add {sessionId, ctx} to map
337     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
338 
339     // create session expire schedule task
340     auto keepAliveTime = sessionConfig->keep_alive_time();
341     if (keepAliveTime) {
342         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
343         // create schedule task for session timeout feature
344         ctx->SetKeepAliveTime(keepAliveTime);
345         ctx->StartSessionExpireTask(removeTask_);
346     }
347 
348     // prepare response data fields
349     response->set_status(0);
350     response->set_session_id(sessionId);
351 
352     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
353     return Status::OK;
354 }
355 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)356 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
357 {
358     std::unique_lock<std::mutex> lock(sessionContextMutex_);
359     CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
360     sessionContext_[sessionId] = sessionCtx;
361     return true;
362 }
363 
GetSessionContext(uint32_t sessionId) const364 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
365 {
366     std::unique_lock<std::mutex> lock(sessionContextMutex_);
367     auto it = sessionContext_.find(sessionId);
368     if (it != sessionContext_.end()) {
369         auto ptr = it->second;
370         return ptr;
371     }
372     return nullptr;
373 }
374 
RemoveSessionContext(uint32_t sessionId)375 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
376 {
377     std::unique_lock<std::mutex> lock(sessionContextMutex_);
378     auto it = sessionContext_.find(sessionId);
379     if (it != sessionContext_.end()) {
380         auto ptr = it->second;
381         PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
382         sessionContext_.erase(it);
383         return true;
384     }
385     return false;
386 }
387 
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)388 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
389     const std::string& outputFile, const std::string& pluginVersion)
390 {
391     if (pluginName.empty() || outputFile.empty()) {
392         PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
393                            pluginName.c_str(), outputFile.c_str());
394         return;
395     }
396 
397     std::ifstream fsFile {}; // read from output file
398     fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
399     if (!fsFile.good()) {
400         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
401         return;
402     }
403 
404     std::ofstream fsTarget {}; // write to profiler ouput file
405     fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
406     if (!fsTarget.good()) {
407         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
408         return;
409     }
410     fsTarget.seekp(0, std::ios_base::end);
411     int posFile = fsTarget.tellp(); // for update sha256
412 
413     TraceFileHeader header {};
414     if (pluginName == "hiperf-plugin") {
415         header.data_.dataType = DataType::HIPERF_DATA;
416     } else {
417         header.data_.dataType = DataType::STANDALONE_DATA;
418     }
419     fsFile.seekg(0, std::ios_base::end);
420     uint64_t fileSize = (uint64_t)(fsFile.tellg());
421     header.data_.length += fileSize;
422     size_t pluginSize = sizeof(header.data_.standalonePluginName);
423     int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
424     if (ret != EOK) {
425         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
426         return;
427     }
428     pluginSize = sizeof(header.data_.pluginVersion);
429     ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
430     if (ret != EOK) {
431         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
432         return;
433     }
434     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
435     if (!fsTarget.good()) {
436         PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
437         return;
438     }
439 
440     SHA256_CTX sha256Ctx;
441     SHA256_Init(&sha256Ctx);
442     constexpr uint64_t bufSize = 4 * 1024 * 1024;
443     std::vector<char> buf(bufSize);
444     uint64_t readSize = 0;
445     fsFile.seekg(0);
446     while ((readSize = std::min(bufSize, fileSize)) > 0) {
447         fsFile.read(buf.data(), readSize);
448         fsTarget.write(buf.data(), readSize);
449         if (!fsTarget.good()) {
450             PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
451             return;
452         }
453         fileSize -= readSize;
454 
455         SHA256_Update(&sha256Ctx, buf.data(), readSize);
456     }
457     SHA256_Final(header.data_.sha256, &sha256Ctx);
458     fsTarget.seekp(posFile, std::ios_base::beg);
459     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
460 
461     fsFile.close();
462     fsTarget.close();
463 
464     PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
465 }
466 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)467 Status ProfilerService::StartSession(ServerContext* context,
468                                      const ::StartSessionRequest* request,
469                                      ::StartSessionResponse* response)
470 {
471     CHECK_REQUEST_RESPONSE(context, request, response);
472 
473     uint32_t sessionId = request->session_id();
474     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
475 
476     // copy plugin configs from request
477     std::vector<ProfilerPluginConfig> newPluginConfigs;
478     newPluginConfigs.reserve(request->update_configs_size());
479     for (int i = 0; i < request->update_configs_size(); i++) {
480         PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
481         newPluginConfigs.push_back(request->update_configs(i));
482     }
483 
484     // get plugin names in request
485     std::vector<std::string> requestNames;
486     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
487                    [](auto& config) { return config.name(); });
488     std::sort(requestNames.begin(), requestNames.end());
489 
490     auto ctx = GetSessionContext(sessionId);
491     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
492 
493     // start epoll thread to receive shared memory messages.
494     CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
495 
496     // get intersection set of requestNames and pluginNames
497     std::vector<std::string> updateNames;
498     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
499                           std::back_inserter(updateNames));
500 
501     if (updateNames.size() > 0) {
502         // remove old plugin sessions
503         pluginSessionManager_->RemovePluginSessions(updateNames);
504 
505         // update plugin configs
506         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
507 
508         // re-create plugin sessions
509         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
510         PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
511     }
512 
513     // start plugin sessions with configs
514     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
515     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
516     return Status::OK;
517 }
518 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)519 Status ProfilerService::FetchData(ServerContext* context,
520                                   const ::FetchDataRequest* request,
521                                   ServerWriter<::FetchDataResponse>* writer)
522 {
523     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
524     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
525     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
526 
527     CHECK_POINTER_NOTNULL(request, "request invalid!");
528     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
529 
530     uint32_t sessionId = request->session_id();
531     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
532 
533     auto ctx = GetSessionContext(sessionId);
534     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
535 
536     // check each plugin session states
537     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
538                           "session status invalid!");
539 
540     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
541         auto dataRepeater = ctx->dataRepeater;
542         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
543 
544         while (1) {
545             ctx = GetSessionContext(sessionId);
546             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
547 
548             FetchDataResponse response;
549             response.set_status(StatusCode::OK);
550             response.set_response_id(++responseIdCounter_);
551 
552             std::vector<ProfilerPluginDataPtr> pluginDataVec;
553             int count = dataRepeater->TakePluginData(pluginDataVec);
554             if (count > 0) {
555                 response.set_has_more(true);
556                 for (int i = 0; i < count; i++) {
557                     auto data = response.add_plugin_data();
558                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
559                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
560                     *data = *pluginDataVec[i];
561                 }
562             } else {
563                 response.set_has_more(false);
564                 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
565             }
566 
567             bool sendSuccess = writer->Write(response);
568             if (count <= 0 || !sendSuccess) {
569                 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
570                 break;
571             }
572         }
573     }
574 
575     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
576     return Status::OK;
577 }
578 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)579 Status ProfilerService::StopSession(ServerContext* context,
580                                     const ::StopSessionRequest* request,
581                                     ::StopSessionResponse* response)
582 {
583 #ifdef PERFORMANCE_DEBUG
584     struct timespec start = {};
585     clock_gettime(CLOCK_REALTIME, &start);
586 #endif
587     CHECK_REQUEST_RESPONSE(context, request, response);
588     uint32_t sessionId = request->session_id();
589     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
590 
591     auto ctx = GetSessionContext(sessionId);
592     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
593     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
594         CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
595         ctx->traceFileWriter.get()->SetStopSplitFile(true);
596     }
597     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
598     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
599 #ifdef PERFORMANCE_DEBUG
600     struct timespec end = {};
601     clock_gettime(CLOCK_REALTIME, &end);
602     uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
603     PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time  %" PRIu64 " ns", costTime);
604 #endif
605     return Status::OK;
606 }
607 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)608 Status ProfilerService::DestroySession(ServerContext* context,
609                                        const ::DestroySessionRequest* request,
610                                        ::DestroySessionResponse* response)
611 {
612     CHECK_REQUEST_RESPONSE(context, request, response);
613 
614     uint32_t sessionId = request->session_id();
615     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
616 
617     auto ctx = GetSessionContext(sessionId);
618     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
619 
620     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
621     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
622                           "remove plugin session FAILED!");
623     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
624 
625     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
626         uint32_t pluginId = 0;
627         PluginContextPtr pluginCtx = nullptr;
628         for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
629             auto pluginName = ctx->pluginNames[i];
630             std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
631             if (pluginCtx->isStandaloneFileData == true) {
632                 if (!ctx->sessionConfig.split_file()) {
633                     MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
634                                         pluginCtx->pluginVersion);
635                 }
636             }
637         }
638     }
639 
640     return Status::OK;
641 }
642 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)643 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
644                                             const ::KeepSessionRequest* request,
645                                             ::KeepSessionResponse* response)
646 {
647     CHECK_REQUEST_RESPONSE(context, request, response);
648     uint32_t sessionId = request->session_id();
649     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
650 
651     auto ctx = GetSessionContext(sessionId);
652     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
653 
654     // update keep alive time if keep_alive_time parameter provided
655     auto keepAliveTime = request->keep_alive_time();
656     if (keepAliveTime) {
657         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
658         ctx->SetKeepAliveTime(keepAliveTime);
659     }
660 
661     // reschedule session timeout task
662     if (ctx->sessionConfig.keep_alive_time() > 0) {
663         ctx->StopSessionExpireTask(removeTask_);
664         ctx->StartSessionExpireTask(removeTask_);
665     }
666     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
667     return Status::OK;
668 }
669 
670 struct LoggingInterceptor : public grpc::experimental::Interceptor {
671 public:
LoggingInterceptorLoggingInterceptor672     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
673 
InterceptLoggingInterceptor674     void Intercept(experimental::InterceptorBatchMethods* methods) override
675     {
676         const char* method = info_->method();
677         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
678             PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
679         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
680             PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
681         }
682         methods->Proceed();
683     }
684 
685 private:
686     grpc::experimental::ServerRpcInfo* info_ = nullptr;
687 };
688 
689 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
690 protected:
CreateServerInterceptorInterceptorFactory691     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
692     {
693         return new LoggingInterceptor(info);
694     }
695 };
696 
StartService(const std::string & listenUri)697 bool ProfilerService::StartService(const std::string& listenUri)
698 {
699     CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
700 
701     ServerBuilder builder;
702     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
703     builder.RegisterService(this);
704 
705     auto server = builder.BuildAndStart();
706     CHECK_NOTNULL(server, false, "start service failed!");
707     PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
708     server_ = std::move(server);
709     return true;
710 }
711 
WaitServiceDone()712 void ProfilerService::WaitServiceDone()
713 {
714     if (server_) {
715         PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
716         server_->Wait();
717         PROFILER_LOG_INFO(LOG_CORE, "Server done!");
718     }
719 }
720 
StopService()721 void ProfilerService::StopService()
722 {
723     if (server_) {
724         server_->Shutdown();
725         PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
726     }
727 }