• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "result_demuxer.h"
28 #include "schedule_task_manager.h"
29 #include "trace_file_writer.h"
30 
31 using namespace ::grpc;
32 using PluginContextPtr = std::shared_ptr<PluginContext>;
33 
// Validates the three pointers handed to a unary RPC handler. Each check is
// delegated to CHECK_POINTER_NOTNULL, so the enclosing function returns as soon
// as one of the pointers turns out to be null.
#define CHECK_REQUEST_RESPONSE(context, request, response)        \
    do {                                                          \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");   \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (0)
40 
// Logs the stringified pointer expression and makes the enclosing function
// return {StatusCode::INTERNAL, errorMessage} when `ptr` compares equal to nullptr.
// Only usable inside functions whose return type can be built from that pair.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                              \
    do {                                                                      \
        if ((ptr) == nullptr) {                                               \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, (errorMessage)};                    \
        }                                                                     \
    } while (0)
48 
// Logs errorMessage and makes the enclosing function return
// {StatusCode::INTERNAL, errorMessage} when `expr` evaluates to false.
// `expr` is evaluated exactly once.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                              \
    do {                                                                       \
        if (!(expr)) {                                                         \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, (errorMessage)); \
            return {StatusCode::INTERNAL, (errorMessage)};                     \
        }                                                                      \
    } while (0)
56 
namespace {
// Bounds accepted by IsValidKeepAliveTime(), in milliseconds.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;        // one second
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000; // one hour
// Extra milliseconds added to sample_duration when the client did not set keep_alive_time.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
} // namespace
62 
ProfilerService(const PluginServicePtr & pluginService)63 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
64     : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
65 {
66     pluginService_->SetPluginSessionManager(pluginSessionManager_);
67 }
68 
~ProfilerService()69 ProfilerService::~ProfilerService() {}
70 
~SessionContext()71 ProfilerService::SessionContext::~SessionContext()
72 {
73     HILOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
74     if (offlineTask.size() > 0) {
75         ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
76     }
77     StopSessionExpireTask();
78     service->pluginSessionManager_->RemovePluginSessions(pluginNames);
79 }
80 
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)81 Status ProfilerService::GetCapabilities(ServerContext* context,
82                                         const ::GetCapabilitiesRequest* request,
83                                         ::GetCapabilitiesResponse* response)
84 {
85     CHECK_REQUEST_RESPONSE(context, request, response);
86     HILOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
87 
88     HILOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
89     std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
90 
91     response->set_status(StatusCode::OK);
92     for (size_t i = 0; i < capabilities.size(); i++) {
93         *response->add_capabilities() = capabilities[i];
94     }
95     HILOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
96     return Status::OK;
97 }
98 
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)99 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
100 {
101     std::unordered_map<std::string, size_t> nameIndex;
102     for (size_t i = 0; i < pluginConfigs.size(); i++) {
103         nameIndex[pluginConfigs[i].name()] = i;
104     }
105 
106     size_t updateCount = 0;
107     for (auto& cfg : newPluginConfigs) {
108         auto it = nameIndex.find(cfg.name());
109         if (it != nameIndex.end()) {
110             int index = it->second;
111             pluginConfigs[index] = cfg;
112             updateCount++;
113         }
114     }
115     return updateCount;
116 }
117 
CreatePluginSessions()118 bool ProfilerService::SessionContext::CreatePluginSessions()
119 {
120     if (bufferConfigs.size() > 0) { // with buffer configs
121         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
122                    false, "create plugin sessions with buffer configs failed!");
123     } else { // without buffer configs
124         CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
125                    "create plugin sessions without buffer configs failed!");
126     }
127     return true;
128 }
129 
StartPluginSessions()130 bool ProfilerService::SessionContext::StartPluginSessions()
131 {
132     std::unique_lock<std::mutex> lock(sessionMutex);
133 
134     // if dataRepeater exists, reset it to usable state.
135     if (dataRepeater) {
136         dataRepeater->Reset();
137     }
138 
139     // start demuxer take result thread
140     if (resultDemuxer != nullptr && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
141         resultDemuxer->StartTakeResults(); // start write file thread
142         uint32_t sampleDuration = sessionConfig.sample_duration();
143         if (sampleDuration > 0) {
144             offlineTask = "stop-session-" + std::to_string(id);
145             std::weak_ptr<SessionContext> weakCtx(shared_from_this());
146             // start offline trace timeout task
147             ScheduleTaskManager::GetInstance().ScheduleTask(
148                 offlineTask,
149                 [weakCtx]() {
150                     if (auto ctx = weakCtx.lock(); ctx != nullptr) {
151                         ctx->StopPluginSessions();
152                     }
153                 },
154                 std::chrono::milliseconds(0), // do not repeat
155                 std::chrono::milliseconds(sampleDuration));
156 
157             // keep_alive_time not set by client, but the sample_duration setted
158             if (sessionConfig.keep_alive_time() == 0) {
159                 // use sample_duration add a little time to set keep_alive_time
160                 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
161                 StartSessionExpireTask();
162             }
163         }
164     }
165 
166     // start each plugin sessions
167     service->pluginSessionManager_->StartPluginSessions(pluginNames);
168     return true;
169 }
170 
StopPluginSessions()171 bool ProfilerService::SessionContext::StopPluginSessions()
172 {
173     std::unique_lock<std::mutex> lock(sessionMutex);
174     // stop each plugin sessions
175     service->pluginSessionManager_->StopPluginSessions(pluginNames);
176 
177     // stop demuxer take result thread
178     if (resultDemuxer && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
179         if (offlineTask.size() > 0) {
180             ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
181         }
182         resultDemuxer->StopTakeResults(); // stop write file thread
183     }
184 
185     // make sure FetchData thread exit
186     if (dataRepeater) {
187         dataRepeater->Close();
188     }
189     return true;
190 }
191 
192 namespace {
IsValidKeepAliveTime(uint32_t timeout)193 bool IsValidKeepAliveTime(uint32_t timeout)
194 {
195     if (timeout < MIN_SESSION_TIMEOUT_MS) {
196         return false;
197     }
198     if (timeout > MAX_SESSION_TIMEOUT_MS) {
199         return false;
200     }
201     return true;
202 }
203 }  // namespace
204 
SetKeepAliveTime(uint32_t timeout)205 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
206 {
207     if (timeout > 0) {
208         timeoutTask = "timeout-session-" + std::to_string(id);
209         sessionConfig.set_keep_alive_time(timeout);
210     }
211 }
212 
StartSessionExpireTask()213 void ProfilerService::SessionContext::StartSessionExpireTask()
214 {
215     if (timeoutTask.size() > 0) {
216         ScheduleTaskManager::GetInstance().ScheduleTask(timeoutTask,
217                                                         std::bind(&ProfilerService::RemoveSessionContext, service, id),
218                                                         std::chrono::milliseconds(0), // do not repeat
219                                                         std::chrono::milliseconds(sessionConfig.keep_alive_time()));
220     }
221 }
222 
StopSessionExpireTask()223 void ProfilerService::SessionContext::StopSessionExpireTask()
224 {
225     if (timeoutTask.size() > 0) {
226         ScheduleTaskManager::GetInstance().UnscheduleTask(timeoutTask);
227     }
228 }
229 
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)230 Status ProfilerService::CreateSession(ServerContext* context,
231                                       const ::CreateSessionRequest* request,
232                                       ::CreateSessionResponse* response)
233 {
234     CHECK_REQUEST_RESPONSE(context, request, response);
235     HILOG_INFO(LOG_CORE, "CreateSession from '%s'", context->peer().c_str());
236     CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
237 
238     // check plugin configs
239     HILOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
240     const int nConfigs = request->plugin_configs_size();
241     CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
242 
243     // check buffer configs
244     ProfilerSessionConfig sessionConfig = request->session_config();
245     const int nBuffers = sessionConfig.buffers_size();
246     CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
247     // copy plugin configs from request
248     std::vector<ProfilerPluginConfig> pluginConfigs;
249     pluginConfigs.reserve(nConfigs);
250 
251     for (int i = 0; i < nConfigs; i++) {
252         if (request->plugin_configs(i).name() == "nativehook" && getuid() != 0) {
253             NativeHookConfig hookConfig;
254             std::string cfgData = request->plugin_configs(i).config_data();
255             if (hookConfig.ParseFromArray(reinterpret_cast<const uint8_t*>(cfgData.c_str()), cfgData.size()) <= 0) {
256                 HILOG_ERROR(LOG_CORE, "%s: ParseFromArray failed", __func__);
257                 continue;
258             }
259             if (!COMMON::CheckApplicationPermission(hookConfig.pid(), hookConfig.process_name())) {
260                 HILOG_ERROR(LOG_CORE, "Application debug permisson denied!");
261                 continue;
262             }
263         }
264         pluginConfigs.push_back(request->plugin_configs(i));
265     }
266 
267     if (pluginConfigs.empty()) {
268         HILOG_ERROR(LOG_CORE, "No plugins are loaded!");
269         return Status(StatusCode::PERMISSION_DENIED, "");
270     }
271     // copy buffer configs
272     std::vector<BufferConfig> bufferConfigs;
273     if (nBuffers == 1) {
274         // if only one buffer config provided, all plugin use the same buffer config
275         bufferConfigs.resize(pluginConfigs.size(), sessionConfig.buffers(0));
276     } else if (nBuffers > 0) {
277         // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
278         bufferConfigs.assign(sessionConfig.buffers().begin(), sessionConfig.buffers().end());
279     }
280     HILOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
281     std::vector<std::string> pluginNames;
282     std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
283                    [](ProfilerPluginConfig& config) { return config.name(); });
284     std::sort(pluginNames.begin(), pluginNames.end());
285 
286     // create ProfilerDataRepeater
287     auto dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
288     CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
289 
290     // create ResultDemuxer
291     auto resultDemuxer = std::make_shared<ResultDemuxer>(dataRepeater, pluginSessionManager_);
292     CHECK_POINTER_NOTNULL(resultDemuxer, "alloc ResultDemuxer failed!");
293 
294     // create TraceFileWriter for offline mode
295     TraceFileWriterPtr traceWriter;
296     if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
297         auto resultFile = sessionConfig.result_file();
298         CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
299         traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig.split_file(),
300             sessionConfig.single_file_max_size_mb());
301         CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
302         resultDemuxer->SetTraceWriter(traceWriter);
303         for (int i = 0; i < pluginConfigs.size(); i++) {
304             ProfilerPluginData pluginData;
305             pluginData.set_name(pluginConfigs[i].name() + "_config");
306             pluginData.set_data(pluginConfigs[i].config_data());
307             std::vector<char> msgData(pluginData.ByteSizeLong());
308             if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
309                 HILOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
310             }
311             traceWriter->SetPluginConfig(msgData.data(), msgData.size());
312         }
313         traceWriter->Flush();
314     }
315 
316     // create session context
317     auto ctx = std::make_shared<SessionContext>();
318     CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
319 
320     // fill fields of SessionContext
321     ctx->service = this;
322     ctx->dataRepeater = dataRepeater;
323     ctx->resultDemuxer = resultDemuxer;
324     ctx->traceFileWriter = traceWriter;
325     ctx->sessionConfig = sessionConfig;
326     ctx->pluginNames = std::move(pluginNames);
327     ctx->pluginConfigs = std::move(pluginConfigs);
328     ctx->bufferConfigs = std::move(bufferConfigs);
329 
330     // create plugin sessions
331     CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
332     // alloc new session id
333     uint32_t sessionId = ++sessionIdCounter_;
334     ctx->id = sessionId;
335     ctx->name = "session-" + std::to_string(sessionId);
336 
337     // add {sessionId, ctx} to map
338     CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
339 
340     // create session expire schedule task
341     auto keepAliveTime = sessionConfig.keep_alive_time();
342     if (keepAliveTime) {
343         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
344         // create schedule task for session timeout feature
345         ctx->SetKeepAliveTime(keepAliveTime);
346         ctx->StartSessionExpireTask();
347     }
348 
349     // prepare response data fields
350     response->set_status(0);
351     response->set_session_id(sessionId);
352 
353     pluginService_->SetProfilerSessionConfig(sessionConfig);
354     HILOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
355     return Status::OK;
356 }
357 
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)358 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
359 {
360     std::unique_lock<std::mutex> lock(sessionContextMutex_);
361     if (sessionContext_.count(sessionId) > 0) {
362         HILOG_WARN(LOG_CORE, "sessionId already exists!");
363         return false;
364     }
365     sessionContext_[sessionId] = sessionCtx;
366     return true;
367 }
368 
GetSessionContext(uint32_t sessionId) const369 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
370 {
371     std::unique_lock<std::mutex> lock(sessionContextMutex_);
372     auto it = sessionContext_.find(sessionId);
373     if (it != sessionContext_.end()) {
374         auto ptr = it->second;
375         return ptr;
376     }
377     return nullptr;
378 }
379 
RemoveSessionContext(uint32_t sessionId)380 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
381 {
382     std::unique_lock<std::mutex> lock(sessionContextMutex_);
383     auto it = sessionContext_.find(sessionId);
384     if (it != sessionContext_.end()) {
385         auto ptr = it->second;
386         HILOG_INFO(LOG_CORE, "DelCtx %p use_count: %ld", ptr.get(), ptr.use_count());
387         sessionContext_.erase(it);
388         return true;
389     }
390     return false;
391 }
392 
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)393 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
394     const std::string& outputFile, const std::string& pluginVersion)
395 {
396     if (pluginName.empty() || outputFile.empty()) {
397         HILOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)", pluginName.c_str(), outputFile.c_str());
398         return;
399     }
400 
401     std::ifstream fsFile {}; // read from output file
402     fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
403     if (!fsFile.good()) {
404         HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
405         return;
406     }
407 
408     std::ofstream fsTarget {}; // write to profiler ouput file
409     fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
410     if (!fsTarget.good()) {
411         HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
412         return;
413     }
414     fsTarget.seekp(0, std::ios_base::end);
415     int posFile = fsTarget.tellp(); // for update sha256
416 
417     TraceFileHeader header {};
418     if (pluginName == "hiperf-plugin") {
419         header.data_.dataType = DataType::HIPERF_DATA;
420     } else {
421         header.data_.dataType = DataType::STANDALONE_DATA;
422     }
423     fsFile.seekg(0, std::ios_base::end);
424     uint64_t fileSize = (uint64_t)(fsFile.tellg());
425     header.data_.length += fileSize;
426     size_t pluginSize = sizeof(header.data_.standalonePluginName);
427     int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
428     if (ret != EOK) {
429         HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
430         return;
431     }
432     pluginSize = sizeof(header.data_.pluginVersion);
433     ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
434     if (ret != EOK) {
435         HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
436         return;
437     }
438     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
439     if (!fsTarget.good()) {
440         HILOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
441         return;
442     }
443 
444     SHA256_CTX sha256Ctx;
445     SHA256_Init(&sha256Ctx);
446     constexpr uint64_t bufSize = 4 * 1024 * 1024;
447     std::vector<char> buf(bufSize);
448     uint64_t readSize = 0;
449     fsFile.seekg(0);
450     while ((readSize = std::min(bufSize, fileSize)) > 0) {
451         fsFile.read(buf.data(), readSize);
452         fsTarget.write(buf.data(), readSize);
453         if (!fsTarget.good()) {
454             HILOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
455             return;
456         }
457         fileSize -= readSize;
458 
459         SHA256_Update(&sha256Ctx, buf.data(), readSize);
460     }
461     SHA256_Final(header.data_.sha256, &sha256Ctx);
462     fsTarget.seekp(posFile, std::ios_base::beg);
463     fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
464 
465     fsFile.close();
466     fsTarget.close();
467 
468     HILOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
469 }
470 
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)471 Status ProfilerService::StartSession(ServerContext* context,
472                                      const ::StartSessionRequest* request,
473                                      ::StartSessionResponse* response)
474 {
475     CHECK_REQUEST_RESPONSE(context, request, response);
476     HILOG_INFO(LOG_CORE, "StartSession from '%s'", context->peer().c_str());
477 
478     uint32_t sessionId = request->session_id();
479     HILOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
480 
481     // copy plugin configs from request
482     std::vector<ProfilerPluginConfig> newPluginConfigs;
483     newPluginConfigs.reserve(request->update_configs_size());
484     for (int i = 0; i < request->update_configs_size(); i++) {
485         HILOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
486         newPluginConfigs.push_back(request->update_configs(i));
487     }
488 
489     // get plugin names in request
490     std::vector<std::string> requestNames;
491     std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
492                    [](auto& config) { return config.name(); });
493     std::sort(requestNames.begin(), requestNames.end());
494 
495     auto ctx = GetSessionContext(sessionId);
496     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
497 
498     // get intersection set of requestNames and pluginNames
499     std::vector<std::string> updateNames;
500     std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
501                           std::back_inserter(updateNames));
502 
503     if (updateNames.size() > 0) {
504         // remove old plugin sessions
505         pluginSessionManager_->RemovePluginSessions(updateNames);
506 
507         // update plugin configs
508         size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
509 
510         // re-create plugin sessions
511         CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
512         HILOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
513     }
514 
515     // start plugin sessions with configs
516     CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
517     HILOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
518     return Status::OK;
519 }
520 
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)521 Status ProfilerService::FetchData(ServerContext* context,
522                                   const ::FetchDataRequest* request,
523                                   ServerWriter<::FetchDataResponse>* writer)
524 {
525     CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
526     CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
527     CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
528 
529     HILOG_INFO(LOG_CORE, "FetchData from '%s'", context->peer().c_str());
530     CHECK_POINTER_NOTNULL(request, "request invalid!");
531     CHECK_POINTER_NOTNULL(writer, "writer invalid!");
532 
533     uint32_t sessionId = request->session_id();
534     HILOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
535 
536     auto ctx = GetSessionContext(sessionId);
537     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
538 
539     // check each plugin session states
540     CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
541                           "session status invalid!");
542 
543     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
544         auto dataRepeater = ctx->dataRepeater;
545         CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
546 
547         while (1) {
548             ctx = GetSessionContext(sessionId);
549             CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
550 
551             FetchDataResponse response;
552             response.set_status(StatusCode::OK);
553             response.set_response_id(++responseIdCounter_);
554 
555             std::vector<ProfilerPluginDataPtr> pluginDataVec;
556             int count = dataRepeater->TakePluginData(pluginDataVec);
557             if (count > 0) {
558                 response.set_has_more(true);
559                 for (int i = 0; i < count; i++) {
560                     auto data = response.add_plugin_data();
561                     CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
562                     CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
563                     *data = *pluginDataVec[i];
564                 }
565             } else {
566                 response.set_has_more(false);
567                 HILOG_INFO(LOG_CORE, "no more data need to fill to response!");
568             }
569 
570             bool sendSuccess = writer->Write(response);
571             if (count <= 0 || !sendSuccess) {
572                 HILOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
573                 break;
574             }
575         }
576     }
577 
578     HILOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
579     return Status::OK;
580 }
581 
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)582 Status ProfilerService::StopSession(ServerContext* context,
583                                     const ::StopSessionRequest* request,
584                                     ::StopSessionResponse* response)
585 {
586     CHECK_REQUEST_RESPONSE(context, request, response);
587     HILOG_INFO(LOG_CORE, "StopSession from '%s'", context->peer().c_str());
588 
589     uint32_t sessionId = request->session_id();
590     HILOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
591 
592     auto ctx = GetSessionContext(sessionId);
593     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
594     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
595         CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
596         ctx->traceFileWriter.get()->SetStopSplitFile(true);
597     }
598     CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
599     HILOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
600     return Status::OK;
601 }
602 
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)603 Status ProfilerService::DestroySession(ServerContext* context,
604                                        const ::DestroySessionRequest* request,
605                                        ::DestroySessionResponse* response)
606 {
607     CHECK_REQUEST_RESPONSE(context, request, response);
608     HILOG_INFO(LOG_CORE, "DestroySession from '%s'", context->peer().c_str());
609 
610     uint32_t sessionId = request->session_id();
611     HILOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
612 
613     auto ctx = GetSessionContext(sessionId);
614     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
615 
616     CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
617     CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
618                           "remove plugin session FAILED!");
619     HILOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
620 
621     if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
622         uint32_t pluginId = 0;
623         PluginContextPtr pluginCtx = nullptr;
624         for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
625             auto pluginName = ctx->pluginNames[i];
626             std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
627             if (pluginCtx->isStandaloneFileData == true) {
628                 std::string file = ctx->sessionConfig.result_file();
629                 if (ctx->sessionConfig.split_file()) {
630                     file = ctx->traceFileWriter.get()->Path();
631                 }
632                 MergeStandaloneFile(file, pluginName, pluginCtx->outFileName, pluginCtx->pluginVersion);
633             }
634         }
635     }
636 
637     return Status::OK;
638 }
639 
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)640 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
641                                             const ::KeepSessionRequest* request,
642                                             ::KeepSessionResponse* response)
643 {
644     CHECK_REQUEST_RESPONSE(context, request, response);
645     HILOG_INFO(LOG_CORE, "KeepSession from '%s'", context->peer().c_str());
646 
647     uint32_t sessionId = request->session_id();
648     HILOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
649 
650     auto ctx = GetSessionContext(sessionId);
651     CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
652 
653     // update keep alive time if keep_alive_time parameter provided
654     auto keepAliveTime = request->keep_alive_time();
655     if (keepAliveTime) {
656         CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
657         ctx->SetKeepAliveTime(keepAliveTime);
658     }
659 
660     // reschedule session timeout task
661     if (ctx->timeoutTask.size() > 0) {
662         ctx->StopSessionExpireTask();
663         ctx->StartSessionExpireTask();
664     }
665     HILOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
666     return Status::OK;
667 }
668 
669 struct LoggingInterceptor : public grpc::experimental::Interceptor {
670 public:
LoggingInterceptorLoggingInterceptor671     explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
672 
InterceptLoggingInterceptor673     void Intercept(experimental::InterceptorBatchMethods* methods) override
674     {
675         const char* method = info_->method();
676         if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
677             HILOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
678         } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
679             HILOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
680         }
681         methods->Proceed();
682     }
683 
684 private:
685     grpc::experimental::ServerRpcInfo* info_ = nullptr;
686 };
687 
688 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
689 protected:
CreateServerInterceptorInterceptorFactory690     grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
691     {
692         return new LoggingInterceptor(info);
693     }
694 };
695 
StartService(const std::string & listenUri)696 bool ProfilerService::StartService(const std::string& listenUri)
697 {
698     if (listenUri == "") {
699         HILOG_WARN(LOG_CORE, "listenUri empty!");
700         return false;
701     }
702 
703     ServerBuilder builder;
704     builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
705     builder.RegisterService(this);
706 
707     auto server = builder.BuildAndStart();
708     CHECK_NOTNULL(server, false, "start service on %s failed!", listenUri.c_str());
709     HILOG_INFO(LOG_CORE, "Server listening on %s", listenUri.c_str());
710 
711     server_ = std::move(server);
712     return true;
713 }
714 
WaitServiceDone()715 void ProfilerService::WaitServiceDone()
716 {
717     if (server_) {
718         HILOG_INFO(LOG_CORE, "waiting Server...");
719         server_->Wait();
720         HILOG_INFO(LOG_CORE, "Server done!");
721     }
722 }
723 
StopService()724 void ProfilerService::StopService()
725 {
726     if (server_) {
727         server_->Shutdown();
728         HILOG_INFO(LOG_CORE, "Server stop done!");
729     }
730 }
731