1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28 #include <cinttypes>
29
30 using namespace ::grpc;
31 using PluginContextPtr = std::shared_ptr<PluginContext>;
32
// Leaves the enclosing function with a {StatusCode::INTERNAL, errorMessage} status when `ptr`
// compares equal to nullptr. The argument text of `ptr` is stringified into the error log, so the
// spelling of the expression passed in is part of the log output.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                                     \
    do {                                                                             \
        if ((ptr) == nullptr) {                                                      \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return {StatusCode::INTERNAL, errorMessage};                             \
        }                                                                            \
    } while (false)

// Applies CHECK_POINTER_NOTNULL to the three pointers of a unary RPC handler, in the order
// context, request, response; the first null pointer decides the returned message.
#define CHECK_REQUEST_RESPONSE(context, request, response)        \
    do {                                                          \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");   \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");   \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
    } while (false)

// Leaves the enclosing function with a {StatusCode::INTERNAL, errorMessage} status when `expr`
// evaluates to false; the message is also written to the error log.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                                   \
    do {                                                                            \
        if (!(expr)) {                                                              \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
            return {StatusCode::INTERNAL, (errorMessage)};                          \
        }                                                                           \
    } while (false)
55
namespace {
// Inclusive bounds checked by IsValidKeepAliveTime() for a session keep_alive_time.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000;
// Added to sample_duration to derive keep_alive_time when the client did not provide one.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
// Interval handed to the schedule task that runs ProfilerService::CheckClientStatus().
constexpr uint64_t CHECK_HEARTBEAT_INTERVAL = 500;
// Process name looked up by CheckClientStatus().
const std::string HIPROFILER_PLUGINS_NAME = "hiprofiler_plugins";
#ifdef PERFORMANCE_DEBUG
// Nanoseconds per second, used for the StopSession cost measurement.
constexpr uint64_t S_TO_NS = 1000 * 1000 * 1000;
#endif
} // namespace
66
ProfilerService(const PluginServicePtr & pluginService)67 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
68 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
69 {
70 pluginService_->SetPluginSessionManager(pluginSessionManager_);
71 }
72
~ProfilerService()73 ProfilerService::~ProfilerService() {}
74
~SessionContext()75 ProfilerService::SessionContext::~SessionContext()
76 {
77 PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
78 if (offlineScheduleTaskFd != -1) {
79 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
80 }
81 StopSessionExpireTask(service->removeTask_);
82 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
83 }
84
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)85 Status ProfilerService::GetCapabilities(ServerContext* context,
86 const ::GetCapabilitiesRequest* request,
87 ::GetCapabilitiesResponse* response)
88 {
89 CHECK_REQUEST_RESPONSE(context, request, response);
90 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
91
92 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
93 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
94
95 response->set_status(StatusCode::OK);
96 for (size_t i = 0; i < capabilities.size(); i++) {
97 *response->add_capabilities() = capabilities[i];
98 }
99 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
100 return Status::OK;
101 }
102
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)103 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
104 {
105 std::unordered_map<std::string, size_t> nameIndex;
106 for (size_t i = 0; i < pluginConfigs.size(); i++) {
107 nameIndex[pluginConfigs[i].name()] = i;
108 }
109
110 size_t updateCount = 0;
111 for (auto& cfg : newPluginConfigs) {
112 auto it = nameIndex.find(cfg.name());
113 if (it != nameIndex.end()) {
114 size_t index = it->second;
115 pluginConfigs[index] = cfg;
116 updateCount++;
117 }
118 }
119 return updateCount;
120 }
121
CreatePluginSessions()122 bool ProfilerService::SessionContext::CreatePluginSessions()
123 {
124 if (bufferConfigs.size() > 0) { // with buffer configs
125 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater,
126 stateRepeater), false, "create plugin sessions with buffer configs failed!");
127 } else { // without buffer configs
128 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater, stateRepeater),
129 false, "create plugin sessions without buffer configs failed!");
130 }
131 return true;
132 }
133
StartPluginSessions()134 bool ProfilerService::SessionContext::StartPluginSessions()
135 {
136 std::unique_lock<std::mutex> lock(sessionMutex);
137
138 // if dataRepeater exists, reset it to usable state.
139 if (dataRepeater) {
140 dataRepeater->Reset();
141 }
142
143 if (stateRepeater) {
144 stateRepeater->Reset();
145 }
146
147 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
148 uint32_t sampleDuration = sessionConfig.sample_duration();
149 if (sampleDuration > 0) {
150 traceFileWriter->SetTimeSource();
151 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
152 // start offline trace timeout task
153 offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
154 [weakCtx]() {
155 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
156 ctx->StopPluginSessions();
157 }
158 },
159 sampleDuration,
160 true);
161 // keep_alive_time not set by client, but the sample_duration setted
162 if (sessionConfig.keep_alive_time() == 0) {
163 // use sample_duration add a little time to set keep_alive_time
164 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
165 StartSessionExpireTask(service->removeTask_);
166 }
167 }
168 }
169
170 // start each plugin sessions
171 service->pluginSessionManager_->StartPluginSessions(pluginNames);
172 return true;
173 }
174
StopPluginSessions()175 bool ProfilerService::SessionContext::StopPluginSessions()
176 {
177 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
178 if (offlineScheduleTaskFd != -1) {
179 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
180 offlineScheduleTaskFd = -1;
181 } else {
182 return true;
183 }
184 traceFileWriter->SetDurationTime();
185 }
186
187 // stop each plugin sessions
188 service->pluginSessionManager_->StopPluginSessions(pluginNames);
189 // stop epoll thread receiving shared memory messages
190 service->pluginService_->StopEpollThread();
191
192 // Read the remaining data of shared memory of all plugins.
193 for (auto& name : pluginNames) {
194 service->pluginService_->FlushAllData(name);
195 }
196 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
197 // update file header.
198 traceFileWriter->Finish();
199 }
200
201 // make sure FetchData thread exit
202 if (dataRepeater) {
203 dataRepeater->Close();
204 }
205
206 if (stateRepeater) {
207 stateRepeater->Close();
208 }
209 return true;
210 }
211
212 namespace {
IsValidKeepAliveTime(uint32_t timeout)213 bool IsValidKeepAliveTime(uint32_t timeout)
214 {
215 if (timeout < MIN_SESSION_TIMEOUT_MS) {
216 return false;
217 }
218 if (timeout > MAX_SESSION_TIMEOUT_MS) {
219 return false;
220 }
221 return true;
222 }
223 } // namespace
224
SetKeepAliveTime(uint32_t timeout)225 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
226 {
227 if (timeout > 0) {
228 sessionConfig.set_keep_alive_time(timeout);
229 }
230 }
231
StartSessionExpireTask(ScheduleTaskManager & task)232 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
233 {
234 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
235 timeoutScheduleTaskFd = task.ScheduleTask(
236 std::bind(&ProfilerService::RemoveSessionContext, service, id),
237 sessionConfig.keep_alive_time(), true);
238 }
239 }
240
StopSessionExpireTask(ScheduleTaskManager & task)241 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
242 {
243 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
244 task.UnscheduleTask(timeoutScheduleTaskFd);
245 timeoutScheduleTaskFd = -1;
246 }
247 }
248
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)249 Status ProfilerService::CreateSession(ServerContext* context,
250 const ::CreateSessionRequest* request,
251 ::CreateSessionResponse* response)
252 {
253 CHECK_REQUEST_RESPONSE(context, request, response);
254 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
255 // check plugin configs
256 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
257 const int nConfigs = request->plugin_configs_size();
258 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
259
260 // check buffer configs
261 std::shared_ptr<ProfilerSessionConfig> sessionConfig =
262 std::make_shared<ProfilerSessionConfig>(request->session_config());
263 const int nBuffers = sessionConfig->buffers_size();
264 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
265 // copy plugin configs from request
266 std::vector<ProfilerPluginConfig> pluginConfigs;
267 pluginConfigs.reserve(nConfigs);
268
269 for (int i = 0; i < nConfigs; i++) {
270 pluginConfigs.push_back(request->plugin_configs(i));
271 }
272
273 if (pluginConfigs.empty()) {
274 PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
275 return Status(StatusCode::PERMISSION_DENIED, "");
276 }
277 // copy buffer configs
278 std::vector<BufferConfig> bufferConfigs;
279 if (nBuffers == 1) {
280 // if only one buffer config provided, all plugin use the same buffer config
281 bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
282 } else if (nBuffers > 0) {
283 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
284 bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
285 }
286 PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
287 std::vector<std::string> pluginNames;
288 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
289 [](ProfilerPluginConfig& config) { return config.name(); });
290 std::sort(pluginNames.begin(), pluginNames.end());
291 //set session configs
292 pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
293
294 // create TraceFileWriter for offline mode
295 TraceFileWriterPtr traceWriter;
296 std::shared_ptr<ProfilerDataRepeater<ProfilerPluginData>> dataRepeater = nullptr;
297 if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
298 auto resultFile = sessionConfig->result_file();
299 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
300 traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
301 sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
302 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
303 pluginService_->SetTraceWriter(traceWriter);
304 for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
305 ProfilerPluginData pluginData;
306 pluginData.set_name(pluginConfigs[i].name() + "_config");
307 pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
308 pluginData.set_data(pluginConfigs[i].config_data());
309 std::vector<char> msgData(pluginData.ByteSizeLong());
310 if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
311 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
312 }
313 traceWriter->SetPluginConfig(msgData.data(), msgData.size());
314 }
315 traceWriter->Flush();
316 } else {
317 dataRepeater = std::make_shared<ProfilerDataRepeater<ProfilerPluginData>>(DEFAULT_REPEATER_BUFFER_SIZE);
318 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
319 }
320 ProfilerPluginState* state = response->add_plugin_status();
321 state->set_version(COMMON::STATE_VERSION);
322 // create session context
323 auto ctx = std::make_shared<SessionContext>();
324 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
325
326 // fill fields of SessionContext
327 ctx->service = this;
328 if (dataRepeater != nullptr) {
329 ctx->dataRepeater = dataRepeater;
330 }
331 if (stateRepeater_ != nullptr) {
332 ctx->stateRepeater = stateRepeater_;
333 }
334 if (traceWriter != nullptr) {
335 ctx->traceFileWriter = traceWriter;
336 }
337 ctx->sessionConfig = *sessionConfig;
338 ctx->pluginNames = std::move(pluginNames);
339 ctx->pluginConfigs = std::move(pluginConfigs);
340 ctx->bufferConfigs = std::move(bufferConfigs);
341
342 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
343 if (stateRepeater_ == nullptr) {
344 stateRepeater_ = std::make_shared<ProfilerDataRepeater<ProfilerPluginState>>(DEFAULT_REPEATER_BUFFER_SIZE);
345 }
346 if (stateRepeater_ != nullptr) {
347 ctx->stateRepeater = stateRepeater_;
348 }
349 }
350 // create plugin sessions
351 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
352 // alloc new session id
353 uint32_t sessionId = ++sessionIdCounter_;
354 ctx->id = sessionId;
355 ctx->name = "session-" + std::to_string(sessionId);
356
357 // add {sessionId, ctx} to map
358 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
359
360 // create session expire schedule task
361 auto keepAliveTime = sessionConfig->keep_alive_time();
362 if (keepAliveTime) {
363 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
364 // create schedule task for session timeout feature
365 ctx->SetKeepAliveTime(keepAliveTime);
366 ctx->StartSessionExpireTask(removeTask_);
367 }
368
369 // prepare response data fields
370 response->set_status(0);
371 response->set_session_id(sessionId);
372
373 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
374 return Status::OK;
375 }
376
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)377 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
378 {
379 std::unique_lock<std::mutex> lock(sessionContextMutex_);
380 CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
381 sessionContext_[sessionId] = sessionCtx;
382 return true;
383 }
384
CheckClientStatus()385 void ProfilerService::CheckClientStatus()
386 {
387 int pidVal = 0;
388 if (!COMMON::IsProcessExist(HIPROFILER_PLUGINS_NAME, pidVal)) {
389 ProfilerPluginState pluginState;
390 pluginState.set_version(COMMON::STATE_VERSION);
391 pluginState.set_event_detail("hiprofiler_plugins process is dead!");
392 pluginState.set_event(ProfilerPluginState::RUNNING_ERR);
393 CHECK_NOTNULL(stateRepeater_, NO_RETVAL, "cannot push event, stateRepeater is null!");
394 stateRepeater_->PutPluginData(std::make_shared<ProfilerPluginState>(pluginState));
395 if (heartbeatFd_ != -1) {
396 checkStatusManager_.UnscheduleTask(heartbeatFd_);
397 }
398 }
399 }
400
GetSessionContext(uint32_t sessionId) const401 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
402 {
403 std::unique_lock<std::mutex> lock(sessionContextMutex_);
404 auto it = sessionContext_.find(sessionId);
405 if (it != sessionContext_.end()) {
406 auto ptr = it->second;
407 return ptr;
408 }
409 return nullptr;
410 }
411
RemoveSessionContext(uint32_t sessionId)412 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
413 {
414 std::unique_lock<std::mutex> lock(sessionContextMutex_);
415 auto it = sessionContext_.find(sessionId);
416 if (it != sessionContext_.end()) {
417 auto ptr = it->second;
418 PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
419 sessionContext_.erase(it);
420 return true;
421 }
422 return false;
423 }
424
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)425 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
426 const std::string& outputFile, const std::string& pluginVersion)
427 {
428 if (pluginName.empty() || outputFile.empty()) {
429 PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
430 pluginName.c_str(), outputFile.c_str());
431 return;
432 }
433 auto retFile = COMMON::CheckNotExistsFilePath(outputFile);
434 if (!retFile.first) {
435 PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, outputFile.c_str());
436 return;
437 }
438 std::ifstream fsFile {}; // read from output file
439 fsFile.open(retFile.second, std::ios_base::in | std::ios_base::binary);
440 if (!fsFile.good()) {
441 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
442 return;
443 }
444 auto targetFile = COMMON::CheckNotExistsFilePath(resultFile);
445 if (!targetFile.first) {
446 PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, resultFile.c_str());
447 return;
448 }
449 std::ofstream fsTarget {}; // write to profiler ouput file
450 fsTarget.open(targetFile.second, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
451 if (!fsTarget.good()) {
452 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
453 return;
454 }
455 fsTarget.seekp(0, std::ios_base::end);
456 int posFile = fsTarget.tellp(); // for update sha256
457
458 TraceFileHeader header {};
459 if (pluginName == "hiperf-plugin") {
460 header.data_.dataType = DataType::HIPERF_DATA;
461 } else {
462 header.data_.dataType = DataType::STANDALONE_DATA;
463 }
464 fsFile.seekg(0, std::ios_base::end);
465 uint64_t fileSize = (uint64_t)(fsFile.tellg());
466 header.data_.length += fileSize;
467 size_t pluginSize = sizeof(header.data_.standalonePluginName);
468 int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
469 if (ret != EOK) {
470 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
471 return;
472 }
473 pluginSize = sizeof(header.data_.pluginVersion);
474 ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
475 if (ret != EOK) {
476 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
477 return;
478 }
479 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
480 if (!fsTarget.good()) {
481 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
482 return;
483 }
484
485 SHA256_CTX sha256Ctx;
486 SHA256_Init(&sha256Ctx);
487 constexpr uint64_t bufSize = 4 * 1024 * 1024;
488 std::vector<char> buf(bufSize);
489 uint64_t readSize = 0;
490 fsFile.seekg(0);
491 while ((readSize = std::min(bufSize, fileSize)) > 0) {
492 fsFile.read(buf.data(), readSize);
493 fsTarget.write(buf.data(), readSize);
494 if (!fsTarget.good()) {
495 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
496 return;
497 }
498 fileSize -= readSize;
499
500 SHA256_Update(&sha256Ctx, buf.data(), readSize);
501 }
502 SHA256_Final(header.data_.sha256, &sha256Ctx);
503 fsTarget.seekp(posFile, std::ios_base::beg);
504 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
505
506 fsFile.close();
507 fsTarget.close();
508
509 PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
510 }
511
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)512 Status ProfilerService::StartSession(ServerContext* context,
513 const ::StartSessionRequest* request,
514 ::StartSessionResponse* response)
515 {
516 CHECK_REQUEST_RESPONSE(context, request, response);
517
518 uint32_t sessionId = request->session_id();
519 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
520
521 // copy plugin configs from request
522 std::vector<ProfilerPluginConfig> newPluginConfigs;
523 newPluginConfigs.reserve(request->update_configs_size());
524 for (int i = 0; i < request->update_configs_size(); i++) {
525 PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
526 newPluginConfigs.push_back(request->update_configs(i));
527 }
528
529 // get plugin names in request
530 std::vector<std::string> requestNames;
531 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
532 [](auto& config) { return config.name(); });
533 std::sort(requestNames.begin(), requestNames.end());
534
535 auto ctx = GetSessionContext(sessionId);
536 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
537
538 // start epoll thread to receive shared memory messages.
539 CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
540
541 // get intersection set of requestNames and pluginNames
542 std::vector<std::string> updateNames;
543 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
544 std::back_inserter(updateNames));
545
546 if (updateNames.size() > 0) {
547 // remove old plugin sessions
548 pluginSessionManager_->RemovePluginSessions(updateNames);
549
550 // update plugin configs
551 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
552
553 // re-create plugin sessions
554 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
555 PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
556 }
557
558 // start plugin sessions with configs
559 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
560 ProfilerPluginState* state = response->add_plugin_status();
561 state->set_version(COMMON::STATE_VERSION);
562 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
563 if (heartbeatFd_ == -1) {
564 auto callback = [this] { this->CheckClientStatus(); };
565 heartbeatFd_ = checkStatusManager_.ScheduleTask(callback, CHECK_HEARTBEAT_INTERVAL);
566 if (heartbeatFd_ == -1) {
567 PROFILER_LOG_INFO(LOG_CORE, "schedule task failed for heartbeat check");
568 }
569 }
570 return Status::OK;
571 }
572
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)573 Status ProfilerService::FetchData(ServerContext* context,
574 const ::FetchDataRequest* request,
575 ServerWriter<::FetchDataResponse>* writer)
576 {
577 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
578 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
579 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
580
581 CHECK_POINTER_NOTNULL(request, "request invalid!");
582 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
583
584 uint32_t sessionId = request->session_id();
585 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
586
587 auto ctx = GetSessionContext(sessionId);
588 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
589
590 // check each plugin session states
591 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
592 "session status invalid!");
593
594 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
595 auto dataRepeater = ctx->dataRepeater;
596 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
597
598 while (1) {
599 ctx = GetSessionContext(sessionId);
600 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
601
602 FetchDataResponse response;
603 response.set_status(StatusCode::OK);
604 response.set_response_id(++responseIdCounter_);
605
606 std::vector<ProfilerPluginDataPtr> pluginDataVec;
607 int count = dataRepeater->TakePluginData(pluginDataVec);
608 if (count > 0) {
609 response.set_has_more(true);
610 for (int i = 0; i < count; i++) {
611 auto data = response.add_plugin_data();
612 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
613 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
614 *data = *pluginDataVec[i];
615 }
616 } else {
617 response.set_has_more(false);
618 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
619 }
620
621 bool sendSuccess = writer->Write(response);
622 if (count <= 0 || !sendSuccess) {
623 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
624 break;
625 }
626 }
627 }
628
629 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
630 return Status::OK;
631 }
632
SubscribeProfilerEvt(ServerContext * context,const::SubscribeProfilerEvtRequest * request,ServerWriter<::SubscribeProfilerEvtResponse> * writer)633 Status ProfilerService::SubscribeProfilerEvt(ServerContext* context,
634 const ::SubscribeProfilerEvtRequest* request,
635 ServerWriter<::SubscribeProfilerEvtResponse>* writer)
636 {
637 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
638 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
639 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
640
641 CHECK_POINTER_NOTNULL(request, "request invalid!");
642 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
643
644 uint32_t sessionId = request->session_id();
645 PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u start", request->request_id(), sessionId);
646
647 auto ctx = GetSessionContext(sessionId);
648 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
649
650 // check each plugin session states
651 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
652 "session status invalid!");
653
654 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
655 auto stateRepeater = ctx->stateRepeater;
656 CHECK_POINTER_NOTNULL(stateRepeater, "state repeater invalid!");
657
658 while (1) {
659 ctx = GetSessionContext(sessionId);
660 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
661
662 SubscribeProfilerEvtResponse response;
663 response.set_status(StatusCode::OK);
664 response.set_response_id(++responseIdCounter_);
665
666 std::vector<ProfilerPluginStatePtr> pluginStateVec;
667 int count = stateRepeater->TakePluginData(pluginStateVec);
668 if (count > 0) {
669 response.set_has_more(true);
670 for (int i = 0; i < count; i++) {
671 auto data = response.add_plugin_state();
672 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
673 CHECK_POINTER_NOTNULL(pluginStateVec[i], "plugin state invalid");
674 *data = *pluginStateVec[i];
675 }
676 } else {
677 response.set_has_more(false);
678 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
679 }
680
681 bool sendSuccess = writer->Write(response);
682 if (count <= 0 || !sendSuccess) {
683 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
684 break;
685 }
686 }
687 }
688
689 PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u done!", request->request_id(), sessionId);
690 return Status::OK;
691 }
692
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)693 Status ProfilerService::StopSession(ServerContext* context,
694 const ::StopSessionRequest* request,
695 ::StopSessionResponse* response)
696 {
697 #ifdef PERFORMANCE_DEBUG
698 struct timespec start = {};
699 clock_gettime(CLOCK_REALTIME, &start);
700 #endif
701 CHECK_REQUEST_RESPONSE(context, request, response);
702 uint32_t sessionId = request->session_id();
703 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
704
705 auto ctx = GetSessionContext(sessionId);
706 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
707 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
708 CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
709 ctx->traceFileWriter.get()->SetStopSplitFile(true);
710 }
711 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
712 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
713 #ifdef PERFORMANCE_DEBUG
714 struct timespec end = {};
715 clock_gettime(CLOCK_REALTIME, &end);
716 uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
717 PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time %" PRIu64 " ns", costTime);
718 #endif
719 return Status::OK;
720 }
721
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)722 Status ProfilerService::DestroySession(ServerContext* context,
723 const ::DestroySessionRequest* request,
724 ::DestroySessionResponse* response)
725 {
726 CHECK_REQUEST_RESPONSE(context, request, response);
727
728 uint32_t sessionId = request->session_id();
729 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
730
731 auto ctx = GetSessionContext(sessionId);
732 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
733
734 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
735 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
736 "remove plugin session FAILED!");
737 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
738
739 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
740 uint32_t pluginId = 0;
741 PluginContextPtr pluginCtx = nullptr;
742 for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
743 auto pluginName = ctx->pluginNames[i];
744 std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
745 if (pluginCtx->isStandaloneFileData == true) {
746 if (!ctx->sessionConfig.split_file()) {
747 MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
748 pluginCtx->pluginVersion);
749 }
750 }
751 }
752 }
753
754 return Status::OK;
755 }
756
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)757 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
758 const ::KeepSessionRequest* request,
759 ::KeepSessionResponse* response)
760 {
761 CHECK_REQUEST_RESPONSE(context, request, response);
762 uint32_t sessionId = request->session_id();
763 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
764
765 auto ctx = GetSessionContext(sessionId);
766 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
767
768 // update keep alive time if keep_alive_time parameter provided
769 auto keepAliveTime = request->keep_alive_time();
770 if (keepAliveTime) {
771 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
772 ctx->SetKeepAliveTime(keepAliveTime);
773 }
774
775 // reschedule session timeout task
776 if (ctx->sessionConfig.keep_alive_time() > 0) {
777 ctx->StopSessionExpireTask(removeTask_);
778 ctx->StartSessionExpireTask(removeTask_);
779 }
780 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
781 return Status::OK;
782 }
783
784 struct LoggingInterceptor : public grpc::experimental::Interceptor {
785 public:
LoggingInterceptorLoggingInterceptor786 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
787
InterceptLoggingInterceptor788 void Intercept(experimental::InterceptorBatchMethods* methods) override
789 {
790 const char* method = info_->method();
791 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
792 PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
793 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
794 PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
795 }
796 methods->Proceed();
797 }
798
799 private:
800 grpc::experimental::ServerRpcInfo* info_ = nullptr;
801 };
802
803 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
804 protected:
CreateServerInterceptorInterceptorFactory805 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
806 {
807 return new LoggingInterceptor(info);
808 }
809 };
810
StartService(const std::string & listenUri)811 bool ProfilerService::StartService(const std::string& listenUri)
812 {
813 CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
814
815 ServerBuilder builder;
816 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
817 builder.RegisterService(this);
818
819 auto server = builder.BuildAndStart();
820 CHECK_NOTNULL(server, false, "start service failed!");
821 PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
822 server_ = std::move(server);
823 return true;
824 }
825
WaitServiceDone()826 void ProfilerService::WaitServiceDone()
827 {
828 if (server_) {
829 PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
830 server_->Wait();
831 PROFILER_LOG_INFO(LOG_CORE, "Server done!");
832 }
833 }
834
StopService()835 void ProfilerService::StopService()
836 {
837 if (server_) {
838 server_->Shutdown();
839 PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
840 }
841 }