• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28 
29 using namespace ::grpc;
30 using PluginContextPtr = std::shared_ptr<PluginContext>;
31 
32 #define CHECK_REQUEST_RESPONSE(context, request, response)         \
33     do {                                                          \
34         CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
35         CHECK_POINTER_NOTNULL(request, "request ptr invalid!");    \
36         CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
37     } while (0)
38 
39 #define CHECK_POINTER_NOTNULL(ptr, errorMessage)                              \
40     do {                                                                      \
41         if ((ptr) == nullptr) {                                                 \
42             PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
43             return {StatusCode::INTERNAL, errorMessage};                      \
44         }                                                                     \
45     } while (0)
46 
47 #define CHECK_EXPRESSION_TRUE(expr, errorMessage)                            \
48     do {                                                                     \
49         if (!(expr)) {                                                       \
50             PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
51             return {StatusCode::INTERNAL, (errorMessage)};                   \
52         }                                                                    \
53     } while (0)
54 
55 namespace {
56 constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
57 constexpr int MAX_SESSION_TIMEOUT_MS = 1000 * 3600;
58 constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
59 } // namespace
60 
ProfilerService(const PluginServicePtr & pluginService)61 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
62     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
63 {
64     pluginService_->SetPluginSessionManager(pluginSessionManager_);
65 }
66 
~ProfilerService()67 ProfilerService::~ProfilerService() {}
68 
~SessionContext()69 ProfilerService::SessionContext::~SessionContext()
70 {
71     PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
72     if (offlineScheduleTaskFd != -1) {
73         stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
74     }
75     StopSessionExpireTask(service->removeTask_);
76     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
77 }
78 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)79 Status ProfilerService::GetCapabilities(ServerContext* context,
80                                         const ::GetCapabilitiesRequest* request,
81                                         ::GetCapabilitiesResponse* response)
82 {
83     CHECK_REQUEST_RESPONSE(context, request, response);
84     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
85 
86     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
87     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
88 
89     response->set_status(StatusCode::OK);
90     for (size_t i = 0; i < capabilities.size(); i++) {
91         *response->add_capabilities() = capabilities[i];
92     }
93     PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
94     return Status::OK;
95 }
96 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)97 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
98 {
99     std::unordered_map<std::string, size_t> nameIndex;
100     for (size_t i = 0; i < pluginConfigs.size(); i++) {
101         nameIndex[pluginConfigs[i].name()] = i;
102     }
103 
104     size_t updateCount = 0;
105     for (auto& cfg : newPluginConfigs) {
106         auto it = nameIndex.find(cfg.name());
107         if (it != nameIndex.end()) {
108             int index = it->second;
109             pluginConfigs[index] = cfg;
110             updateCount++;
111         }
112     }
113     return updateCount;
114 }
115 
CreatePluginSessions()116 bool ProfilerService::SessionContext::CreatePluginSessions()
117 {
118     if (bufferConfigs.size() > 0) { // with buffer configs
119         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
120                    false, "create plugin sessions with buffer configs failed!");
121     } else { // without buffer configs
122         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
123                    "create plugin sessions without buffer configs failed!");
124     }
125     return true;
126 }
127 
StartPluginSessions()128 bool ProfilerService::SessionContext::StartPluginSessions()
129 {
130     std::unique_lock<std::mutex> lock(sessionMutex);
131 
132     // if dataRepeater exists, reset it to usable state.
133     if (dataRepeater) {
134         dataRepeater->Reset();
135     }
136 
137     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
138         uint32_t sampleDuration = sessionConfig.sample_duration();
139         if (sampleDuration > 0) {
140             traceFileWriter->SetTimeSource();
141             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
142             // start offline trace timeout task
143             offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
144                 [weakCtx]() {
145                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
146                         ctx->StopPluginSessions();
147                     }
148                 },
149                 sampleDuration,
150                 true);
151             // keep_alive_time not set by client, but the sample_duration setted
152             if (sessionConfig.keep_alive_time() == 0) {
153                 // use sample_duration add a little time to set keep_alive_time
154                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
155                 StartSessionExpireTask(service->removeTask_);
156             }
157         }
158     }
159 
160     // start each plugin sessions
161     service->pluginSessionManager_->StartPluginSessions(pluginNames);
162     return true;
163 }
164 
StopPluginSessions()165 bool ProfilerService::SessionContext::StopPluginSessions()
166 {
167     std::unique_lock<std::mutex> lock(sessionMutex);
168     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
169         if (offlineScheduleTaskFd != -1) {
170             stopExpireTask.UnscheduleTaskLockless(offlineScheduleTaskFd);
171             offlineScheduleTaskFd = -1;
172         } else {
173             return true;
174         }
175         traceFileWriter->SetDurationTime();
176     }
177 
178     // stop each plugin sessions
179     service->pluginSessionManager_->StopPluginSessions(pluginNames);
180     // stop epoll thread receiving shared memory messages
181     service->pluginService_->StopEpollThread();
182 
183     // Read the remaining data of shared memory of all plugins.
184     for (auto& name : pluginNames) {
185         service->pluginService_->FlushAllData(name);
186     }
187     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
188         // update file header.
189         traceFileWriter->Finish();
190     }
191 
192     // make sure FetchData thread exit
193     if (dataRepeater) {
194         dataRepeater->Close();
195     }
196     return true;
197 }
198 
199 namespace {
IsValidKeepAliveTime(uint32_t timeout)200 bool IsValidKeepAliveTime(uint32_t timeout)
201 {
202     if (timeout < MIN_SESSION_TIMEOUT_MS) {
203         return false;
204     }
205     if (timeout > MAX_SESSION_TIMEOUT_MS) {
206         return false;
207     }
208     return true;
209 }
210 }  // namespace
211 
SetKeepAliveTime(uint32_t timeout)212 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
213 {
214     if (timeout > 0) {
215         sessionConfig.set_keep_alive_time(timeout);
216     }
217 }
218 
StartSessionExpireTask(ScheduleTaskManager & task)219 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
220 {
221     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
222         timeoutScheduleTaskFd = task.ScheduleTask(
223             std::bind(&ProfilerService::RemoveSessionContext, service, id),
224             sessionConfig.keep_alive_time(), true);
225     }
226 }
227 
StopSessionExpireTask(ScheduleTaskManager & task)228 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
229 {
230     if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
231         task.UnscheduleTaskLockless(timeoutScheduleTaskFd);
232         timeoutScheduleTaskFd = -1;
233     }
234 }
235 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)236 Status ProfilerService::CreateSession(ServerContext* context,
237                                       const ::CreateSessionRequest* request,
238                                       ::CreateSessionResponse* response)
239 {
240     CHECK_REQUEST_RESPONSE(context, request, response);
241     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
242     // check plugin configs
243     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
244     const int nConfigs = request->plugin_configs_size();
245     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
246 
247     // check buffer configs
248     std::shared_ptr<ProfilerSessionConfig> sessionConfig =
249     std::make_shared<ProfilerSessionConfig>(request->session_config());
250     const int nBuffers = sessionConfig->buffers_size();
251     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
252     // copy plugin configs from request
253     std::vector<ProfilerPluginConfig> pluginConfigs;
254     pluginConfigs.reserve(nConfigs);
255 
256     for (int i = 0; i < nConfigs; i++) {
257         pluginConfigs.push_back(request->plugin_configs(i));
258     }
259 
260     if (pluginConfigs.empty()) {
261         PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
262         return Status(StatusCode::PERMISSION_DENIED, "");
263     }
264     // copy buffer configs
265     std::vector<BufferConfig> bufferConfigs;
266     if (nBuffers == 1) {
267         // if only one buffer config provided, all plugin use the same buffer config
268         bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
269     } else if (nBuffers > 0) {
270         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
271         bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
272     }
273     PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
274     std::vector<std::string> pluginNames;
275     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
276                    [](ProfilerPluginConfig& config) { return config.name(); });
277     std::sort(pluginNames.begin(), pluginNames.end());
278     //set session configs
279     pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
280 
281     // create TraceFileWriter for offline mode
282     TraceFileWriterPtr traceWriter;
283     std::shared_ptr<ProfilerDataRepeater> dataRepeater = nullptr;
284     if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
285         auto resultFile = sessionConfig->result_file();
286         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
287         traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
288             sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
289         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
290         pluginService_->SetTraceWriter(traceWriter);
291         for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
292             ProfilerPluginData pluginData;
293             pluginData.set_name(pluginConfigs[i].name() + "_config");
294             pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
295             pluginData.set_data(pluginConfigs[i].config_data());
296             std::vector<char> msgData(pluginData.ByteSizeLong());
297             if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
298                 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
299             }
300             traceWriter->SetPluginConfig(msgData.data(), msgData.size());
301         }
302         traceWriter->Flush();
303     } else {
304         dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
305         CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
306     }
307 
308     // create session context
309     auto ctx = std::make_shared<SessionContext>();
310     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
311 
312     // fill fields of SessionContext
313     ctx->service = this;
314     if (dataRepeater != nullptr) {
315         ctx->dataRepeater = dataRepeater;
316     }
317     if (traceWriter != nullptr) {
318         ctx->traceFileWriter = traceWriter;
319     }
320     ctx->sessionConfig = *sessionConfig;
321     ctx->pluginNames = std::move(pluginNames);
322     ctx->pluginConfigs = std::move(pluginConfigs);
323     ctx->bufferConfigs = std::move(bufferConfigs);
324 
325     // create plugin sessions
326     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
327     // alloc new session id
328     uint32_t sessionId = ++sessionIdCounter_;
329     ctx->id = sessionId;
330     ctx->name = "session-" + std::to_string(sessionId);
331 
332     // add {sessionId, ctx} to map
333     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
334 
335     // create session expire schedule task
336     auto keepAliveTime = sessionConfig->keep_alive_time();
337     if (keepAliveTime) {
338         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
339         // create schedule task for session timeout feature
340         ctx->SetKeepAliveTime(keepAliveTime);
341         ctx->StartSessionExpireTask(removeTask_);
342     }
343 
344     // prepare response data fields
345     response->set_status(0);
346     response->set_session_id(sessionId);
347 
348     PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
349     return Status::OK;
350 }
351 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)352 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
353 {
354     std::unique_lock<std::mutex> lock(sessionContextMutex_);
355     CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
356     sessionContext_[sessionId] = sessionCtx;
357     return true;
358 }
359 
GetSessionContext(uint32_t sessionId) const360 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
361 {
362     std::unique_lock<std::mutex> lock(sessionContextMutex_);
363     auto it = sessionContext_.find(sessionId);
364     if (it != sessionContext_.end()) {
365         auto ptr = it->second;
366         return ptr;
367     }
368     return nullptr;
369 }
370 
RemoveSessionContext(uint32_t sessionId)371 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
372 {
373     std::unique_lock<std::mutex> lock(sessionContextMutex_);
374     auto it = sessionContext_.find(sessionId);
375     if (it != sessionContext_.end()) {
376         auto ptr = it->second;
377         PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
378         sessionContext_.erase(it);
379         return true;
380     }
381     return false;
382 }
383 
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)384 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
385     const std::string& outputFile, const std::string& pluginVersion)
386 {
387     if (pluginName.empty() || outputFile.empty()) {
388         PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
389                            pluginName.c_str(), outputFile.c_str());
390         return;
391     }
392 
393     std::ifstream fsFile {}; // read from output file
394     fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
395     if (!fsFile.good()) {
396         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
397         return;
398     }
399 
400     std::ofstream fsTarget {}; // write to profiler ouput file
401     fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
402     if (!fsTarget.good()) {
403         PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
404         return;
405     }
406     fsTarget.seekp(0, std::ios_base::end);
407     int posFile = fsTarget.tellp(); // for update sha256
408 
409     TraceFileHeader header {};
410     if (pluginName == "hiperf-plugin") {
411         header.data_.dataType = DataType::HIPERF_DATA;
412     } else {
413         header.data_.dataType = DataType::STANDALONE_DATA;
414     }
415     fsFile.seekg(0, std::ios_base::end);
416     uint64_t fileSize = (uint64_t)(fsFile.tellg());
417     header.data_.length += fileSize;
418     size_t pluginSize = sizeof(header.data_.standalonePluginName);
419     int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
420     if (ret != EOK) {
421         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
422         return;
423     }
424     pluginSize = sizeof(header.data_.pluginVersion);
425     ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
426     if (ret != EOK) {
427         PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
428         return;
429     }
430     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
431     if (!fsTarget.good()) {
432         PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
433         return;
434     }
435 
436     SHA256_CTX sha256Ctx;
437     SHA256_Init(&sha256Ctx);
438     constexpr uint64_t bufSize = 4 * 1024 * 1024;
439     std::vector<char> buf(bufSize);
440     uint64_t readSize = 0;
441     fsFile.seekg(0);
442     while ((readSize = std::min(bufSize, fileSize)) > 0) {
443         fsFile.read(buf.data(), readSize);
444         fsTarget.write(buf.data(), readSize);
445         if (!fsTarget.good()) {
446             PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
447             return;
448         }
449         fileSize -= readSize;
450 
451         SHA256_Update(&sha256Ctx, buf.data(), readSize);
452     }
453     SHA256_Final(header.data_.sha256, &sha256Ctx);
454     fsTarget.seekp(posFile, std::ios_base::beg);
455     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
456 
457     fsFile.close();
458     fsTarget.close();
459 
460     PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
461 }
462 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)463 Status ProfilerService::StartSession(ServerContext* context,
464                                      const ::StartSessionRequest* request,
465                                      ::StartSessionResponse* response)
466 {
467     CHECK_REQUEST_RESPONSE(context, request, response);
468 
469     uint32_t sessionId = request->session_id();
470     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
471 
472     // copy plugin configs from request
473     std::vector<ProfilerPluginConfig> newPluginConfigs;
474     newPluginConfigs.reserve(request->update_configs_size());
475     for (int i = 0; i < request->update_configs_size(); i++) {
476         PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
477         newPluginConfigs.push_back(request->update_configs(i));
478     }
479 
480     // get plugin names in request
481     std::vector<std::string> requestNames;
482     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
483                    [](auto& config) { return config.name(); });
484     std::sort(requestNames.begin(), requestNames.end());
485 
486     auto ctx = GetSessionContext(sessionId);
487     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
488 
489     // start epoll thread to receive shared memory messages.
490     CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
491 
492     // get intersection set of requestNames and pluginNames
493     std::vector<std::string> updateNames;
494     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
495                           std::back_inserter(updateNames));
496 
497     if (updateNames.size() > 0) {
498         // remove old plugin sessions
499         pluginSessionManager_->RemovePluginSessions(updateNames);
500 
501         // update plugin configs
502         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
503 
504         // re-create plugin sessions
505         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
506         PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
507     }
508 
509     // start plugin sessions with configs
510     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
511     PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
512     return Status::OK;
513 }
514 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)515 Status ProfilerService::FetchData(ServerContext* context,
516                                   const ::FetchDataRequest* request,
517                                   ServerWriter<::FetchDataResponse>* writer)
518 {
519     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
520     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
521     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
522 
523     CHECK_POINTER_NOTNULL(request, "request invalid!");
524     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
525 
526     uint32_t sessionId = request->session_id();
527     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
528 
529     auto ctx = GetSessionContext(sessionId);
530     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
531 
532     // check each plugin session states
533     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
534                           "session status invalid!");
535 
536     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
537         auto dataRepeater = ctx->dataRepeater;
538         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
539 
540         while (1) {
541             ctx = GetSessionContext(sessionId);
542             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
543 
544             FetchDataResponse response;
545             response.set_status(StatusCode::OK);
546             response.set_response_id(++responseIdCounter_);
547 
548             std::vector<ProfilerPluginDataPtr> pluginDataVec;
549             int count = dataRepeater->TakePluginData(pluginDataVec);
550             if (count > 0) {
551                 response.set_has_more(true);
552                 for (int i = 0; i < count; i++) {
553                     auto data = response.add_plugin_data();
554                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
555                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
556                     *data = *pluginDataVec[i];
557                 }
558             } else {
559                 response.set_has_more(false);
560                 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
561             }
562 
563             bool sendSuccess = writer->Write(response);
564             if (count <= 0 || !sendSuccess) {
565                 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
566                 break;
567             }
568         }
569     }
570 
571     PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
572     return Status::OK;
573 }
574 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)575 Status ProfilerService::StopSession(ServerContext* context,
576                                     const ::StopSessionRequest* request,
577                                     ::StopSessionResponse* response)
578 {
579     CHECK_REQUEST_RESPONSE(context, request, response);
580     uint32_t sessionId = request->session_id();
581     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
582 
583     auto ctx = GetSessionContext(sessionId);
584     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
585     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
586         CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
587         ctx->traceFileWriter.get()->SetStopSplitFile(true);
588     }
589     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
590     PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
591     return Status::OK;
592 }
593 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)594 Status ProfilerService::DestroySession(ServerContext* context,
595                                        const ::DestroySessionRequest* request,
596                                        ::DestroySessionResponse* response)
597 {
598     CHECK_REQUEST_RESPONSE(context, request, response);
599 
600     uint32_t sessionId = request->session_id();
601     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
602 
603     auto ctx = GetSessionContext(sessionId);
604     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
605 
606     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
607     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
608                           "remove plugin session FAILED!");
609     PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
610 
611     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
612         uint32_t pluginId = 0;
613         PluginContextPtr pluginCtx = nullptr;
614         for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
615             auto pluginName = ctx->pluginNames[i];
616             std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
617             if (pluginCtx->isStandaloneFileData == true) {
618                 if (!ctx->sessionConfig.split_file()) {
619                     MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
620                                         pluginCtx->pluginVersion);
621                 }
622             }
623         }
624     }
625 
626     return Status::OK;
627 }
628 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)629 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
630                                             const ::KeepSessionRequest* request,
631                                             ::KeepSessionResponse* response)
632 {
633     CHECK_REQUEST_RESPONSE(context, request, response);
634     uint32_t sessionId = request->session_id();
635     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
636 
637     auto ctx = GetSessionContext(sessionId);
638     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
639 
640     // update keep alive time if keep_alive_time parameter provided
641     auto keepAliveTime = request->keep_alive_time();
642     if (keepAliveTime) {
643         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
644         ctx->SetKeepAliveTime(keepAliveTime);
645     }
646 
647     // reschedule session timeout task
648     if (ctx->sessionConfig.keep_alive_time() > 0) {
649         ctx->StopSessionExpireTask(removeTask_);
650         ctx->StartSessionExpireTask(removeTask_);
651     }
652     PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
653     return Status::OK;
654 }
655 
656 struct LoggingInterceptor : public grpc::experimental::Interceptor {
657 public:
LoggingInterceptorLoggingInterceptor658     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
659 
InterceptLoggingInterceptor660     void Intercept(experimental::InterceptorBatchMethods* methods) override
661     {
662         const char* method = info_->method();
663         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
664             PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
665         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
666             PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
667         }
668         methods->Proceed();
669     }
670 
671 private:
672     grpc::experimental::ServerRpcInfo* info_ = nullptr;
673 };
674 
675 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
676 protected:
CreateServerInterceptorInterceptorFactory677     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
678     {
679         return new LoggingInterceptor(info);
680     }
681 };
682 
StartService(const std::string & listenUri)683 bool ProfilerService::StartService(const std::string& listenUri)
684 {
685     CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
686 
687     ServerBuilder builder;
688     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
689     builder.RegisterService(this);
690 
691     auto server = builder.BuildAndStart();
692     CHECK_NOTNULL(server, false, "start service failed!");
693     PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
694     server_ = std::move(server);
695     return true;
696 }
697 
WaitServiceDone()698 void ProfilerService::WaitServiceDone()
699 {
700     if (server_) {
701         PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
702         server_->Wait();
703         PROFILER_LOG_INFO(LOG_CORE, "Server done!");
704     }
705 }
706 
StopService()707 void ProfilerService::StopService()
708 {
709     if (server_) {
710         server_->Shutdown();
711         PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
712     }
713 }