1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "result_demuxer.h"
28 #include "schedule_task_manager.h"
29 #include "trace_file_writer.h"
30
31 using namespace ::grpc;
32 using PluginContextPtr = std::shared_ptr<PluginContext>;
33
// Validates the three pointers handed to a unary RPC handler. Each check is
// delegated to CHECK_POINTER_NOTNULL, so the enclosing function returns early
// as soon as one of them is null.
#define CHECK_REQUEST_RESPONSE(context, request, response)        \
    do {                                                          \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");   \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (false)
40
// When `ptr` compares equal to nullptr: logs the stringified pointer expression
// and makes the enclosing function return {StatusCode::INTERNAL, errorMessage}.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                              \
    do {                                                                      \
        if (nullptr == (ptr)) {                                               \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, errorMessage};                      \
        }                                                                     \
    } while (false)
48
// When `expr` evaluates to false: logs `errorMessage` and makes the enclosing
// function return {StatusCode::INTERNAL, errorMessage}.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                            \
    do {                                                                     \
        if (!(expr)) {                                                       \
            HILOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
            return {StatusCode::INTERNAL, (errorMessage)};                   \
        }                                                                    \
    } while (false)
56
namespace {
// Inclusive bounds, in milliseconds, accepted for a session keep_alive_time.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;        // one second
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000; // one hour
// Margin added to sample_duration when a keep-alive time is derived from it.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
} // namespace
62
ProfilerService(const PluginServicePtr & pluginService)63 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
64 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
65 {
66 pluginService_->SetPluginSessionManager(pluginSessionManager_);
67 }
68
~ProfilerService()69 ProfilerService::~ProfilerService() {}
70
~SessionContext()71 ProfilerService::SessionContext::~SessionContext()
72 {
73 HILOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
74 if (offlineTask.size() > 0) {
75 ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
76 }
77 StopSessionExpireTask();
78 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
79 }
80
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)81 Status ProfilerService::GetCapabilities(ServerContext* context,
82 const ::GetCapabilitiesRequest* request,
83 ::GetCapabilitiesResponse* response)
84 {
85 CHECK_REQUEST_RESPONSE(context, request, response);
86 HILOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
87
88 HILOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
89 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
90
91 response->set_status(StatusCode::OK);
92 for (size_t i = 0; i < capabilities.size(); i++) {
93 *response->add_capabilities() = capabilities[i];
94 }
95 HILOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
96 return Status::OK;
97 }
98
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)99 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
100 {
101 std::unordered_map<std::string, size_t> nameIndex;
102 for (size_t i = 0; i < pluginConfigs.size(); i++) {
103 nameIndex[pluginConfigs[i].name()] = i;
104 }
105
106 size_t updateCount = 0;
107 for (auto& cfg : newPluginConfigs) {
108 auto it = nameIndex.find(cfg.name());
109 if (it != nameIndex.end()) {
110 int index = it->second;
111 pluginConfigs[index] = cfg;
112 updateCount++;
113 }
114 }
115 return updateCount;
116 }
117
CreatePluginSessions()118 bool ProfilerService::SessionContext::CreatePluginSessions()
119 {
120 if (bufferConfigs.size() > 0) { // with buffer configs
121 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
122 false, "create plugin sessions with buffer configs failed!");
123 } else { // without buffer configs
124 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
125 "create plugin sessions without buffer configs failed!");
126 }
127 return true;
128 }
129
StartPluginSessions()130 bool ProfilerService::SessionContext::StartPluginSessions()
131 {
132 std::unique_lock<std::mutex> lock(sessionMutex);
133
134 // if dataRepeater exists, reset it to usable state.
135 if (dataRepeater) {
136 dataRepeater->Reset();
137 }
138
139 // start demuxer take result thread
140 if (resultDemuxer != nullptr && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
141 resultDemuxer->StartTakeResults(); // start write file thread
142 uint32_t sampleDuration = sessionConfig.sample_duration();
143 if (sampleDuration > 0) {
144 offlineTask = "stop-session-" + std::to_string(id);
145 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
146 // start offline trace timeout task
147 ScheduleTaskManager::GetInstance().ScheduleTask(
148 offlineTask,
149 [weakCtx]() {
150 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
151 ctx->StopPluginSessions();
152 }
153 },
154 std::chrono::milliseconds(0), // do not repeat
155 std::chrono::milliseconds(sampleDuration));
156
157 // keep_alive_time not set by client, but the sample_duration setted
158 if (sessionConfig.keep_alive_time() == 0) {
159 // use sample_duration add a little time to set keep_alive_time
160 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
161 StartSessionExpireTask();
162 }
163 }
164 }
165
166 // start each plugin sessions
167 service->pluginSessionManager_->StartPluginSessions(pluginNames);
168 return true;
169 }
170
StopPluginSessions()171 bool ProfilerService::SessionContext::StopPluginSessions()
172 {
173 std::unique_lock<std::mutex> lock(sessionMutex);
174 // stop each plugin sessions
175 service->pluginSessionManager_->StopPluginSessions(pluginNames);
176
177 // stop demuxer take result thread
178 if (resultDemuxer && sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
179 if (offlineTask.size() > 0) {
180 ScheduleTaskManager::GetInstance().UnscheduleTask(offlineTask);
181 }
182 resultDemuxer->StopTakeResults(); // stop write file thread
183 }
184
185 // make sure FetchData thread exit
186 if (dataRepeater) {
187 dataRepeater->Close();
188 }
189 return true;
190 }
191
192 namespace {
IsValidKeepAliveTime(uint32_t timeout)193 bool IsValidKeepAliveTime(uint32_t timeout)
194 {
195 if (timeout < MIN_SESSION_TIMEOUT_MS) {
196 return false;
197 }
198 if (timeout > MAX_SESSION_TIMEOUT_MS) {
199 return false;
200 }
201 return true;
202 }
203 } // namespace
204
SetKeepAliveTime(uint32_t timeout)205 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
206 {
207 if (timeout > 0) {
208 timeoutTask = "timeout-session-" + std::to_string(id);
209 sessionConfig.set_keep_alive_time(timeout);
210 }
211 }
212
StartSessionExpireTask()213 void ProfilerService::SessionContext::StartSessionExpireTask()
214 {
215 if (timeoutTask.size() > 0) {
216 ScheduleTaskManager::GetInstance().ScheduleTask(timeoutTask,
217 std::bind(&ProfilerService::RemoveSessionContext, service, id),
218 std::chrono::milliseconds(0), // do not repeat
219 std::chrono::milliseconds(sessionConfig.keep_alive_time()));
220 }
221 }
222
StopSessionExpireTask()223 void ProfilerService::SessionContext::StopSessionExpireTask()
224 {
225 if (timeoutTask.size() > 0) {
226 ScheduleTaskManager::GetInstance().UnscheduleTask(timeoutTask);
227 }
228 }
229
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)230 Status ProfilerService::CreateSession(ServerContext* context,
231 const ::CreateSessionRequest* request,
232 ::CreateSessionResponse* response)
233 {
234 CHECK_REQUEST_RESPONSE(context, request, response);
235 HILOG_INFO(LOG_CORE, "CreateSession from '%s'", context->peer().c_str());
236 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
237
238 // check plugin configs
239 HILOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
240 const int nConfigs = request->plugin_configs_size();
241 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
242
243 // check buffer configs
244 ProfilerSessionConfig sessionConfig = request->session_config();
245 const int nBuffers = sessionConfig.buffers_size();
246 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
247 // copy plugin configs from request
248 std::vector<ProfilerPluginConfig> pluginConfigs;
249 pluginConfigs.reserve(nConfigs);
250
251 for (int i = 0; i < nConfigs; i++) {
252 if (request->plugin_configs(i).name() == "nativehook" && getuid() != 0) {
253 NativeHookConfig hookConfig;
254 std::string cfgData = request->plugin_configs(i).config_data();
255 if (hookConfig.ParseFromArray(reinterpret_cast<const uint8_t*>(cfgData.c_str()), cfgData.size()) <= 0) {
256 HILOG_ERROR(LOG_CORE, "%s: ParseFromArray failed", __func__);
257 continue;
258 }
259 if (!COMMON::CheckApplicationPermission(hookConfig.pid(), hookConfig.process_name())) {
260 HILOG_ERROR(LOG_CORE, "Application debug permisson denied!");
261 continue;
262 }
263 }
264 pluginConfigs.push_back(request->plugin_configs(i));
265 }
266
267 if (pluginConfigs.empty()) {
268 HILOG_ERROR(LOG_CORE, "No plugins are loaded!");
269 return Status(StatusCode::PERMISSION_DENIED, "");
270 }
271 // copy buffer configs
272 std::vector<BufferConfig> bufferConfigs;
273 if (nBuffers == 1) {
274 // if only one buffer config provided, all plugin use the same buffer config
275 bufferConfigs.resize(pluginConfigs.size(), sessionConfig.buffers(0));
276 } else if (nBuffers > 0) {
277 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
278 bufferConfigs.assign(sessionConfig.buffers().begin(), sessionConfig.buffers().end());
279 }
280 HILOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
281 std::vector<std::string> pluginNames;
282 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
283 [](ProfilerPluginConfig& config) { return config.name(); });
284 std::sort(pluginNames.begin(), pluginNames.end());
285
286 // create ProfilerDataRepeater
287 auto dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
288 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
289
290 // create ResultDemuxer
291 auto resultDemuxer = std::make_shared<ResultDemuxer>(dataRepeater, pluginSessionManager_);
292 CHECK_POINTER_NOTNULL(resultDemuxer, "alloc ResultDemuxer failed!");
293
294 // create TraceFileWriter for offline mode
295 TraceFileWriterPtr traceWriter;
296 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
297 auto resultFile = sessionConfig.result_file();
298 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
299 traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig.split_file(),
300 sessionConfig.single_file_max_size_mb());
301 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
302 resultDemuxer->SetTraceWriter(traceWriter);
303 for (int i = 0; i < pluginConfigs.size(); i++) {
304 ProfilerPluginData pluginData;
305 pluginData.set_name(pluginConfigs[i].name() + "_config");
306 pluginData.set_data(pluginConfigs[i].config_data());
307 std::vector<char> msgData(pluginData.ByteSizeLong());
308 if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
309 HILOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
310 }
311 traceWriter->SetPluginConfig(msgData.data(), msgData.size());
312 }
313 traceWriter->Flush();
314 }
315
316 // create session context
317 auto ctx = std::make_shared<SessionContext>();
318 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
319
320 // fill fields of SessionContext
321 ctx->service = this;
322 ctx->dataRepeater = dataRepeater;
323 ctx->resultDemuxer = resultDemuxer;
324 ctx->traceFileWriter = traceWriter;
325 ctx->sessionConfig = sessionConfig;
326 ctx->pluginNames = std::move(pluginNames);
327 ctx->pluginConfigs = std::move(pluginConfigs);
328 ctx->bufferConfigs = std::move(bufferConfigs);
329
330 // create plugin sessions
331 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
332 // alloc new session id
333 uint32_t sessionId = ++sessionIdCounter_;
334 ctx->id = sessionId;
335 ctx->name = "session-" + std::to_string(sessionId);
336
337 // add {sessionId, ctx} to map
338 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
339
340 // create session expire schedule task
341 auto keepAliveTime = sessionConfig.keep_alive_time();
342 if (keepAliveTime) {
343 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
344 // create schedule task for session timeout feature
345 ctx->SetKeepAliveTime(keepAliveTime);
346 ctx->StartSessionExpireTask();
347 }
348
349 // prepare response data fields
350 response->set_status(0);
351 response->set_session_id(sessionId);
352
353 pluginService_->SetProfilerSessionConfig(sessionConfig);
354 HILOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
355 return Status::OK;
356 }
357
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)358 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
359 {
360 std::unique_lock<std::mutex> lock(sessionContextMutex_);
361 if (sessionContext_.count(sessionId) > 0) {
362 HILOG_WARN(LOG_CORE, "sessionId already exists!");
363 return false;
364 }
365 sessionContext_[sessionId] = sessionCtx;
366 return true;
367 }
368
GetSessionContext(uint32_t sessionId) const369 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
370 {
371 std::unique_lock<std::mutex> lock(sessionContextMutex_);
372 auto it = sessionContext_.find(sessionId);
373 if (it != sessionContext_.end()) {
374 auto ptr = it->second;
375 return ptr;
376 }
377 return nullptr;
378 }
379
RemoveSessionContext(uint32_t sessionId)380 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
381 {
382 std::unique_lock<std::mutex> lock(sessionContextMutex_);
383 auto it = sessionContext_.find(sessionId);
384 if (it != sessionContext_.end()) {
385 auto ptr = it->second;
386 HILOG_INFO(LOG_CORE, "DelCtx %p use_count: %ld", ptr.get(), ptr.use_count());
387 sessionContext_.erase(it);
388 return true;
389 }
390 return false;
391 }
392
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)393 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
394 const std::string& outputFile, const std::string& pluginVersion)
395 {
396 if (pluginName.empty() || outputFile.empty()) {
397 HILOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)", pluginName.c_str(), outputFile.c_str());
398 return;
399 }
400
401 std::ifstream fsFile {}; // read from output file
402 fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
403 if (!fsFile.good()) {
404 HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
405 return;
406 }
407
408 std::ofstream fsTarget {}; // write to profiler ouput file
409 fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
410 if (!fsTarget.good()) {
411 HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
412 return;
413 }
414 fsTarget.seekp(0, std::ios_base::end);
415 int posFile = fsTarget.tellp(); // for update sha256
416
417 TraceFileHeader header {};
418 if (pluginName == "hiperf-plugin") {
419 header.data_.dataType = DataType::HIPERF_DATA;
420 } else {
421 header.data_.dataType = DataType::STANDALONE_DATA;
422 }
423 fsFile.seekg(0, std::ios_base::end);
424 uint64_t fileSize = (uint64_t)(fsFile.tellg());
425 header.data_.length += fileSize;
426 size_t pluginSize = sizeof(header.data_.standalonePluginName);
427 int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
428 if (ret != EOK) {
429 HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
430 return;
431 }
432 pluginSize = sizeof(header.data_.pluginVersion);
433 ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
434 if (ret != EOK) {
435 HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
436 return;
437 }
438 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
439 if (!fsTarget.good()) {
440 HILOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
441 return;
442 }
443
444 SHA256_CTX sha256Ctx;
445 SHA256_Init(&sha256Ctx);
446 constexpr uint64_t bufSize = 4 * 1024 * 1024;
447 std::vector<char> buf(bufSize);
448 uint64_t readSize = 0;
449 fsFile.seekg(0);
450 while ((readSize = std::min(bufSize, fileSize)) > 0) {
451 fsFile.read(buf.data(), readSize);
452 fsTarget.write(buf.data(), readSize);
453 if (!fsTarget.good()) {
454 HILOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
455 return;
456 }
457 fileSize -= readSize;
458
459 SHA256_Update(&sha256Ctx, buf.data(), readSize);
460 }
461 SHA256_Final(header.data_.sha256, &sha256Ctx);
462 fsTarget.seekp(posFile, std::ios_base::beg);
463 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
464
465 fsFile.close();
466 fsTarget.close();
467
468 HILOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
469 }
470
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)471 Status ProfilerService::StartSession(ServerContext* context,
472 const ::StartSessionRequest* request,
473 ::StartSessionResponse* response)
474 {
475 CHECK_REQUEST_RESPONSE(context, request, response);
476 HILOG_INFO(LOG_CORE, "StartSession from '%s'", context->peer().c_str());
477
478 uint32_t sessionId = request->session_id();
479 HILOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
480
481 // copy plugin configs from request
482 std::vector<ProfilerPluginConfig> newPluginConfigs;
483 newPluginConfigs.reserve(request->update_configs_size());
484 for (int i = 0; i < request->update_configs_size(); i++) {
485 HILOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
486 newPluginConfigs.push_back(request->update_configs(i));
487 }
488
489 // get plugin names in request
490 std::vector<std::string> requestNames;
491 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
492 [](auto& config) { return config.name(); });
493 std::sort(requestNames.begin(), requestNames.end());
494
495 auto ctx = GetSessionContext(sessionId);
496 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
497
498 // get intersection set of requestNames and pluginNames
499 std::vector<std::string> updateNames;
500 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
501 std::back_inserter(updateNames));
502
503 if (updateNames.size() > 0) {
504 // remove old plugin sessions
505 pluginSessionManager_->RemovePluginSessions(updateNames);
506
507 // update plugin configs
508 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
509
510 // re-create plugin sessions
511 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
512 HILOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
513 }
514
515 // start plugin sessions with configs
516 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
517 HILOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
518 return Status::OK;
519 }
520
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)521 Status ProfilerService::FetchData(ServerContext* context,
522 const ::FetchDataRequest* request,
523 ServerWriter<::FetchDataResponse>* writer)
524 {
525 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
526 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
527 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
528
529 HILOG_INFO(LOG_CORE, "FetchData from '%s'", context->peer().c_str());
530 CHECK_POINTER_NOTNULL(request, "request invalid!");
531 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
532
533 uint32_t sessionId = request->session_id();
534 HILOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
535
536 auto ctx = GetSessionContext(sessionId);
537 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
538
539 // check each plugin session states
540 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
541 "session status invalid!");
542
543 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
544 auto dataRepeater = ctx->dataRepeater;
545 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
546
547 while (1) {
548 ctx = GetSessionContext(sessionId);
549 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
550
551 FetchDataResponse response;
552 response.set_status(StatusCode::OK);
553 response.set_response_id(++responseIdCounter_);
554
555 std::vector<ProfilerPluginDataPtr> pluginDataVec;
556 int count = dataRepeater->TakePluginData(pluginDataVec);
557 if (count > 0) {
558 response.set_has_more(true);
559 for (int i = 0; i < count; i++) {
560 auto data = response.add_plugin_data();
561 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
562 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
563 *data = *pluginDataVec[i];
564 }
565 } else {
566 response.set_has_more(false);
567 HILOG_INFO(LOG_CORE, "no more data need to fill to response!");
568 }
569
570 bool sendSuccess = writer->Write(response);
571 if (count <= 0 || !sendSuccess) {
572 HILOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
573 break;
574 }
575 }
576 }
577
578 HILOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
579 return Status::OK;
580 }
581
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)582 Status ProfilerService::StopSession(ServerContext* context,
583 const ::StopSessionRequest* request,
584 ::StopSessionResponse* response)
585 {
586 CHECK_REQUEST_RESPONSE(context, request, response);
587 HILOG_INFO(LOG_CORE, "StopSession from '%s'", context->peer().c_str());
588
589 uint32_t sessionId = request->session_id();
590 HILOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
591
592 auto ctx = GetSessionContext(sessionId);
593 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
594 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
595 CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
596 ctx->traceFileWriter.get()->SetStopSplitFile(true);
597 }
598 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
599 HILOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
600 return Status::OK;
601 }
602
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)603 Status ProfilerService::DestroySession(ServerContext* context,
604 const ::DestroySessionRequest* request,
605 ::DestroySessionResponse* response)
606 {
607 CHECK_REQUEST_RESPONSE(context, request, response);
608 HILOG_INFO(LOG_CORE, "DestroySession from '%s'", context->peer().c_str());
609
610 uint32_t sessionId = request->session_id();
611 HILOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
612
613 auto ctx = GetSessionContext(sessionId);
614 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
615
616 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
617 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
618 "remove plugin session FAILED!");
619 HILOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
620
621 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
622 uint32_t pluginId = 0;
623 PluginContextPtr pluginCtx = nullptr;
624 for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
625 auto pluginName = ctx->pluginNames[i];
626 std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
627 if (pluginCtx->isStandaloneFileData == true) {
628 std::string file = ctx->sessionConfig.result_file();
629 if (ctx->sessionConfig.split_file()) {
630 file = ctx->traceFileWriter.get()->Path();
631 }
632 MergeStandaloneFile(file, pluginName, pluginCtx->outFileName, pluginCtx->pluginVersion);
633 }
634 }
635 }
636
637 return Status::OK;
638 }
639
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)640 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
641 const ::KeepSessionRequest* request,
642 ::KeepSessionResponse* response)
643 {
644 CHECK_REQUEST_RESPONSE(context, request, response);
645 HILOG_INFO(LOG_CORE, "KeepSession from '%s'", context->peer().c_str());
646
647 uint32_t sessionId = request->session_id();
648 HILOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
649
650 auto ctx = GetSessionContext(sessionId);
651 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
652
653 // update keep alive time if keep_alive_time parameter provided
654 auto keepAliveTime = request->keep_alive_time();
655 if (keepAliveTime) {
656 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
657 ctx->SetKeepAliveTime(keepAliveTime);
658 }
659
660 // reschedule session timeout task
661 if (ctx->timeoutTask.size() > 0) {
662 ctx->StopSessionExpireTask();
663 ctx->StartSessionExpireTask();
664 }
665 HILOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
666 return Status::OK;
667 }
668
669 struct LoggingInterceptor : public grpc::experimental::Interceptor {
670 public:
LoggingInterceptorLoggingInterceptor671 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
672
InterceptLoggingInterceptor673 void Intercept(experimental::InterceptorBatchMethods* methods) override
674 {
675 const char* method = info_->method();
676 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
677 HILOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
678 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
679 HILOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
680 }
681 methods->Proceed();
682 }
683
684 private:
685 grpc::experimental::ServerRpcInfo* info_ = nullptr;
686 };
687
688 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
689 protected:
CreateServerInterceptorInterceptorFactory690 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
691 {
692 return new LoggingInterceptor(info);
693 }
694 };
695
StartService(const std::string & listenUri)696 bool ProfilerService::StartService(const std::string& listenUri)
697 {
698 if (listenUri == "") {
699 HILOG_WARN(LOG_CORE, "listenUri empty!");
700 return false;
701 }
702
703 ServerBuilder builder;
704 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
705 builder.RegisterService(this);
706
707 auto server = builder.BuildAndStart();
708 CHECK_NOTNULL(server, false, "start service on %s failed!", listenUri.c_str());
709 HILOG_INFO(LOG_CORE, "Server listening on %s", listenUri.c_str());
710
711 server_ = std::move(server);
712 return true;
713 }
714
WaitServiceDone()715 void ProfilerService::WaitServiceDone()
716 {
717 if (server_) {
718 HILOG_INFO(LOG_CORE, "waiting Server...");
719 server_->Wait();
720 HILOG_INFO(LOG_CORE, "Server done!");
721 }
722 }
723
StopService()724 void ProfilerService::StopService()
725 {
726 if (server_) {
727 server_->Shutdown();
728 HILOG_INFO(LOG_CORE, "Server stop done!");
729 }
730 }
731