1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 #include "common.h"
21 #include "logging.h"
22 #include "native_hook_config.pb.h"
23 #include "plugin_service.h"
24 #include "plugin_session.h"
25 #include "plugin_session_manager.h"
26 #include "profiler_capability_manager.h"
27 #include "profiler_data_repeater.h"
28 #include "trace_file_writer.h"
29 #include <cinttypes>
30
31 using namespace ::grpc;
32 using PluginContextPtr = std::shared_ptr<PluginContext>;
33
34 #define CHECK_REQUEST_RESPONSE(context, request, response) \
35 do { \
36 CHECK_POINTER_NOTNULL(context, "context ptr invalid!"); \
37 CHECK_POINTER_NOTNULL(request, "request ptr invalid!"); \
38 CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
39 } while (0)
40
41 #define CHECK_POINTER_NOTNULL(ptr, errorMessage) \
42 do { \
43 if ((ptr) == nullptr) { \
44 PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
45 return {StatusCode::INTERNAL, errorMessage}; \
46 } \
47 } while (0)
48
49 #define CHECK_EXPRESSION_TRUE(expr, errorMessage) \
50 do { \
51 if (!(expr)) { \
52 PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
53 return {StatusCode::INTERNAL, (errorMessage)}; \
54 } \
55 } while (0)
56
57 namespace {
58 constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
59 constexpr int MAX_SESSION_TIMEOUT_MS = 1000 * 3600;
60 constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
61 constexpr uint64_t CHECK_HEARTBEAT_INTERVAL = 500;
62 constexpr long BYTE_PER_KB = 1024;
63 const std::string HIPROFILER_PLUGINS_NAME("hiprofiler_plugins");
64 #ifdef PERFORMANCE_DEBUG
65 constexpr uint64_t S_TO_NS = 1000 * 1000 * 1000;
66 #endif
67 } // namespace
68
ProfilerService(const PluginServicePtr & pluginService)69 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
70 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
71 {
72 pluginService_->SetPluginSessionManager(pluginSessionManager_);
73 }
74
~ProfilerService()75 ProfilerService::~ProfilerService() {}
76
~SessionContext()77 ProfilerService::SessionContext::~SessionContext()
78 {
79 PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
80 if (offlineScheduleTaskFd != -1) {
81 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
82 }
83 StopSessionExpireTask(service->removeTask_);
84 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
85 }
86
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)87 Status ProfilerService::GetCapabilities(ServerContext* context,
88 const ::GetCapabilitiesRequest* request,
89 ::GetCapabilitiesResponse* response)
90 {
91 CHECK_REQUEST_RESPONSE(context, request, response);
92 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
93
94 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
95 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
96
97 response->set_status(StatusCode::OK);
98 for (size_t i = 0; i < capabilities.size(); i++) {
99 *response->add_capabilities() = capabilities[i];
100 }
101 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
102 return Status::OK;
103 }
104
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)105 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
106 {
107 std::unordered_map<std::string, size_t> nameIndex;
108 for (size_t i = 0; i < pluginConfigs.size(); i++) {
109 nameIndex[pluginConfigs[i].name()] = i;
110 }
111
112 size_t updateCount = 0;
113 for (auto& cfg : newPluginConfigs) {
114 auto it = nameIndex.find(cfg.name());
115 if (it != nameIndex.end()) {
116 size_t index = it->second;
117 pluginConfigs[index] = cfg;
118 updateCount++;
119 }
120 }
121 return updateCount;
122 }
123
CreatePluginSessions()124 bool ProfilerService::SessionContext::CreatePluginSessions()
125 {
126 if (bufferConfigs.size() > 0) { // with buffer configs
127 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater,
128 stateRepeater), false, "create plugin sessions with buffer configs failed!");
129 } else { // without buffer configs
130 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater, stateRepeater),
131 false, "create plugin sessions without buffer configs failed!");
132 }
133 return true;
134 }
135
StartPluginSessions()136 bool ProfilerService::SessionContext::StartPluginSessions()
137 {
138 std::unique_lock<std::mutex> lock(sessionMutex);
139
140 // if dataRepeater exists, reset it to usable state.
141 if (dataRepeater) {
142 dataRepeater->Reset();
143 }
144
145 if (stateRepeater) {
146 stateRepeater->Reset();
147 }
148
149 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
150 uint32_t sampleDuration = sessionConfig.sample_duration();
151 if (sampleDuration > 0) {
152 traceFileWriter->SetTimeSource();
153 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
154 // start offline trace timeout task
155 offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
156 [weakCtx]() {
157 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
158 ctx->StopPluginSessions();
159 }
160 },
161 sampleDuration,
162 true);
163 // keep_alive_time not set by client, but the sample_duration setted
164 if (sessionConfig.keep_alive_time() == 0) {
165 // use sample_duration add a little time to set keep_alive_time
166 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
167 StartSessionExpireTask(service->removeTask_);
168 }
169 }
170 }
171
172 // start each plugin sessions
173 service->pluginSessionManager_->StartPluginSessions(pluginNames);
174 return true;
175 }
176
StopPluginSessions()177 bool ProfilerService::SessionContext::StopPluginSessions()
178 {
179 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
180 if (offlineScheduleTaskFd != -1) {
181 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
182 offlineScheduleTaskFd = -1;
183 } else {
184 return true;
185 }
186 traceFileWriter->SetDurationTime();
187 }
188
189 // stop each plugin sessions
190 service->pluginSessionManager_->StopPluginSessions(pluginNames);
191 // stop epoll thread receiving shared memory messages
192 service->pluginService_->StopEpollThread();
193
194 // Read the remaining data of shared memory of all plugins.
195 for (auto& name : pluginNames) {
196 service->pluginService_->FlushAllData(name);
197 }
198 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
199 // update file header.
200 traceFileWriter->Finish();
201 }
202
203 // make sure FetchData thread exit
204 if (dataRepeater) {
205 dataRepeater->Close();
206 }
207
208 if (stateRepeater) {
209 stateRepeater->Close();
210 }
211 return true;
212 }
213
214 namespace {
IsValidKeepAliveTime(uint32_t timeout)215 bool IsValidKeepAliveTime(uint32_t timeout)
216 {
217 if (timeout < MIN_SESSION_TIMEOUT_MS) {
218 return false;
219 }
220 if (timeout > MAX_SESSION_TIMEOUT_MS) {
221 return false;
222 }
223 return true;
224 }
225 } // namespace
226
SetKeepAliveTime(uint32_t timeout)227 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
228 {
229 if (timeout > 0) {
230 sessionConfig.set_keep_alive_time(timeout);
231 }
232 }
233
StartSessionExpireTask(ScheduleTaskManager & task)234 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
235 {
236 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
237 timeoutScheduleTaskFd = task.ScheduleTask(
238 std::bind(&ProfilerService::RemoveSessionContext, service, id),
239 sessionConfig.keep_alive_time(), true);
240 }
241 }
242
StopSessionExpireTask(ScheduleTaskManager & task)243 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
244 {
245 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
246 task.UnscheduleTask(timeoutScheduleTaskFd);
247 timeoutScheduleTaskFd = -1;
248 }
249 }
250
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)251 Status ProfilerService::CreateSession(ServerContext* context,
252 const ::CreateSessionRequest* request,
253 ::CreateSessionResponse* response)
254 {
255 CHECK_REQUEST_RESPONSE(context, request, response);
256 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
257 // check plugin configs
258 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
259 const int nConfigs = request->plugin_configs_size();
260 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
261
262 // check buffer configs
263 std::shared_ptr<ProfilerSessionConfig> sessionConfig =
264 std::make_shared<ProfilerSessionConfig>(request->session_config());
265 const int nBuffers = sessionConfig->buffers_size();
266 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
267 // copy plugin configs from request
268 std::vector<ProfilerPluginConfig> pluginConfigs;
269 pluginConfigs.reserve(nConfigs);
270
271 for (int i = 0; i < nConfigs; i++) {
272 pluginConfigs.push_back(request->plugin_configs(i));
273 }
274
275 if (pluginConfigs.empty()) {
276 PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
277 return Status(StatusCode::PERMISSION_DENIED, "");
278 }
279 // copy buffer configs
280 std::vector<BufferConfig> bufferConfigs;
281 if (nBuffers == 1) {
282 // if only one buffer config provided, all plugin use the same buffer config
283 bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
284 } else if (nBuffers > 0) {
285 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
286 bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
287 }
288 PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
289 std::vector<std::string> pluginNames;
290 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
291 [](ProfilerPluginConfig& config) { return config.name(); });
292 std::sort(pluginNames.begin(), pluginNames.end());
293 //set session configs
294 pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
295
296 // create TraceFileWriter for offline mode
297 TraceFileWriterPtr traceWriter;
298 std::shared_ptr<ProfilerDataRepeater<ProfilerPluginData>> dataRepeater = nullptr;
299 if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
300 auto resultFile = sessionConfig->result_file();
301 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
302 traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
303 sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
304 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
305 pluginService_->SetTraceWriter(traceWriter);
306 for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
307 ProfilerPluginData pluginData;
308 pluginData.set_name(pluginConfigs[i].name() + "_config");
309 pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
310 pluginData.set_data(pluginConfigs[i].config_data());
311 std::vector<char> msgData(pluginData.ByteSizeLong());
312 if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
313 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
314 }
315 traceWriter->SetPluginConfig(msgData.data(), msgData.size());
316 }
317 traceWriter->Flush();
318 } else {
319 dataRepeater = std::make_shared<ProfilerDataRepeater<ProfilerPluginData>>(DEFAULT_REPEATER_BUFFER_SIZE);
320 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
321 }
322 ProfilerPluginState* state = response->add_plugin_status();
323 state->set_version(COMMON::STATE_VERSION);
324 // create session context
325 auto ctx = std::make_shared<SessionContext>();
326 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
327
328 // fill fields of SessionContext
329 ctx->service = this;
330 if (dataRepeater != nullptr) {
331 ctx->dataRepeater = dataRepeater;
332 }
333 if (stateRepeater_ != nullptr) {
334 ctx->stateRepeater = stateRepeater_;
335 }
336 if (traceWriter != nullptr) {
337 ctx->traceFileWriter = traceWriter;
338 }
339 ctx->sessionConfig = *sessionConfig;
340 ctx->pluginNames = std::move(pluginNames);
341 ctx->pluginConfigs = std::move(pluginConfigs);
342 ctx->bufferConfigs = std::move(bufferConfigs);
343
344 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
345 if (stateRepeater_ == nullptr) {
346 stateRepeater_ = std::make_shared<ProfilerDataRepeater<ProfilerPluginState>>(DEFAULT_REPEATER_BUFFER_SIZE);
347 }
348 if (stateRepeater_ != nullptr) {
349 ctx->stateRepeater = stateRepeater_;
350 }
351 }
352 // create plugin sessions
353 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
354 // alloc new session id
355 uint32_t sessionId = ++sessionIdCounter_;
356 ctx->id = sessionId;
357 ctx->name = "session-" + std::to_string(sessionId);
358
359 // add {sessionId, ctx} to map
360 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
361
362 // create session expire schedule task
363 auto keepAliveTime = sessionConfig->keep_alive_time();
364 if (keepAliveTime) {
365 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
366 // create schedule task for session timeout feature
367 ctx->SetKeepAliveTime(keepAliveTime);
368 ctx->StartSessionExpireTask(removeTask_);
369 }
370
371 // prepare response data fields
372 response->set_status(0);
373 response->set_session_id(sessionId);
374
375 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
376 return Status::OK;
377 }
378
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)379 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
380 {
381 std::unique_lock<std::mutex> lock(sessionContextMutex_);
382 CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
383 sessionContext_[sessionId] = sessionCtx;
384 return true;
385 }
386
CheckClientStatus()387 void ProfilerService::CheckClientStatus()
388 {
389 int pidVal = 0;
390 if (!COMMON::IsProcessExist(HIPROFILER_PLUGINS_NAME, pidVal)) {
391 ProfilerPluginState pluginState;
392 pluginState.set_version(COMMON::STATE_VERSION);
393 pluginState.set_event_detail("hiprofiler_plugins process is dead!");
394 pluginState.set_event(ProfilerPluginState::RUNNING_ERR);
395 CHECK_NOTNULL(stateRepeater_, NO_RETVAL, "cannot push event, stateRepeater is null!");
396 stateRepeater_->PutPluginData(std::make_shared<ProfilerPluginState>(pluginState));
397 if (heartbeatFd_ != -1) {
398 checkStatusManager_.UnscheduleTask(heartbeatFd_);
399 }
400 }
401 }
402
GetSessionContext(uint32_t sessionId) const403 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
404 {
405 std::unique_lock<std::mutex> lock(sessionContextMutex_);
406 auto it = sessionContext_.find(sessionId);
407 if (it != sessionContext_.end()) {
408 auto ptr = it->second;
409 return ptr;
410 }
411 return nullptr;
412 }
413
RemoveSessionContext(uint32_t sessionId)414 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
415 {
416 std::unique_lock<std::mutex> lock(sessionContextMutex_);
417 auto it = sessionContext_.find(sessionId);
418 if (it != sessionContext_.end()) {
419 auto ptr = it->second;
420 PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
421 sessionContext_.erase(it);
422 return true;
423 }
424 return false;
425 }
426
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)427 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
428 const std::string& outputFile, const std::string& pluginVersion)
429 {
430 if (pluginName.empty() || outputFile.empty()) {
431 PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
432 pluginName.c_str(), outputFile.c_str());
433 return;
434 }
435 auto retFile = COMMON::CheckNotExistsFilePath(outputFile);
436 if (!retFile.first) {
437 PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, outputFile.c_str());
438 return;
439 }
440 std::ifstream fsFile {}; // read from output file
441 fsFile.open(retFile.second, std::ios_base::in | std::ios_base::binary);
442 if (!fsFile.good()) {
443 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
444 return;
445 }
446 auto targetFile = COMMON::CheckNotExistsFilePath(resultFile);
447 if (!targetFile.first) {
448 PROFILER_LOG_INFO(LOG_CORE, "%s:check file path %s fail", __func__, resultFile.c_str());
449 return;
450 }
451 std::ofstream fsTarget {}; // write to profiler ouput file
452 fsTarget.open(targetFile.second, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
453 if (!fsTarget.good()) {
454 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
455 return;
456 }
457 fsTarget.seekp(0, std::ios_base::end);
458 int posFile = fsTarget.tellp(); // for update sha256
459
460 TraceFileHeader header {};
461 if (pluginName == "hiperf-plugin") {
462 header.data_.dataType = DataType::HIPERF_DATA;
463 } else {
464 header.data_.dataType = DataType::STANDALONE_DATA;
465 }
466 fsFile.seekg(0, std::ios_base::end);
467 uint64_t fileSize = (uint64_t)(fsFile.tellg());
468 header.data_.length += fileSize;
469 size_t pluginSize = sizeof(header.data_.standalonePluginName);
470 int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
471 if (ret != EOK) {
472 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
473 return;
474 }
475 pluginSize = sizeof(header.data_.pluginVersion);
476 ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
477 if (ret != EOK) {
478 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
479 return;
480 }
481 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
482 if (!fsTarget.good()) {
483 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
484 return;
485 }
486
487 SHA256_CTX sha256Ctx;
488 SHA256_Init(&sha256Ctx);
489 constexpr uint64_t bufSize = 4 * 1024 * 1024;
490 std::vector<char> buf(bufSize);
491 uint64_t readSize = 0;
492 fsFile.seekg(0);
493 while ((readSize = std::min(bufSize, fileSize)) > 0) {
494 fsFile.read(buf.data(), readSize);
495 fsTarget.write(buf.data(), readSize);
496 if (!fsTarget.good()) {
497 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
498 return;
499 }
500 fileSize -= readSize;
501
502 SHA256_Update(&sha256Ctx, buf.data(), readSize);
503 }
504 SHA256_Final(header.data_.sha256, &sha256Ctx);
505 fsTarget.seekp(posFile, std::ios_base::beg);
506 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
507
508 fsFile.close();
509 fsTarget.close();
510
511 PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
512 }
513
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)514 Status ProfilerService::StartSession(ServerContext* context,
515 const ::StartSessionRequest* request,
516 ::StartSessionResponse* response)
517 {
518 CHECK_REQUEST_RESPONSE(context, request, response);
519
520 uint32_t sessionId = request->session_id();
521 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
522
523 // copy plugin configs from request
524 std::vector<ProfilerPluginConfig> newPluginConfigs;
525 newPluginConfigs.reserve(request->update_configs_size());
526 for (int i = 0; i < request->update_configs_size(); i++) {
527 PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
528 newPluginConfigs.push_back(request->update_configs(i));
529 }
530
531 // get plugin names in request
532 std::vector<std::string> requestNames;
533 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
534 [](auto& config) { return config.name(); });
535 std::sort(requestNames.begin(), requestNames.end());
536
537 auto ctx = GetSessionContext(sessionId);
538 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
539
540 // start epoll thread to receive shared memory messages.
541 CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
542
543 // get intersection set of requestNames and pluginNames
544 std::vector<std::string> updateNames;
545 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
546 std::back_inserter(updateNames));
547
548 if (updateNames.size() > 0) {
549 // remove old plugin sessions
550 pluginSessionManager_->RemovePluginSessions(updateNames);
551
552 // update plugin configs
553 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
554
555 // re-create plugin sessions
556 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
557 PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
558 }
559
560 // start plugin sessions with configs
561 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
562 ProfilerPluginState* state = response->add_plugin_status();
563 state->set_version(COMMON::STATE_VERSION);
564 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
565 if (heartbeatFd_ == -1) {
566 auto callback = [this] { this->CheckClientStatus(); };
567 heartbeatFd_ = checkStatusManager_.ScheduleTask(callback, CHECK_HEARTBEAT_INTERVAL);
568 if (heartbeatFd_ == -1) {
569 PROFILER_LOG_INFO(LOG_CORE, "schedule task failed for heartbeat check");
570 }
571 }
572 return Status::OK;
573 }
574
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)575 Status ProfilerService::FetchData(ServerContext* context,
576 const ::FetchDataRequest* request,
577 ServerWriter<::FetchDataResponse>* writer)
578 {
579 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
580 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
581 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
582
583 CHECK_POINTER_NOTNULL(request, "request invalid!");
584 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
585
586 uint32_t sessionId = request->session_id();
587 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
588
589 auto ctx = GetSessionContext(sessionId);
590 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
591
592 // check each plugin session states
593 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
594 "session status invalid!");
595
596 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
597 auto dataRepeater = ctx->dataRepeater;
598 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
599
600 while (1) {
601 ctx = GetSessionContext(sessionId);
602 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
603
604 FetchDataResponse response;
605 response.set_status(StatusCode::OK);
606 response.set_response_id(++responseIdCounter_);
607
608 std::vector<ProfilerPluginDataPtr> pluginDataVec;
609 int count = dataRepeater->TakePluginData(pluginDataVec);
610 if (count > 0) {
611 response.set_has_more(true);
612 for (int i = 0; i < count; i++) {
613 auto data = response.add_plugin_data();
614 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
615 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
616 *data = *pluginDataVec[i];
617 }
618 } else {
619 response.set_has_more(false);
620 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
621 }
622
623 bool sendSuccess = writer->Write(response);
624 if (count <= 0 || !sendSuccess) {
625 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
626 break;
627 }
628 }
629 }
630
631 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
632 return Status::OK;
633 }
634
SubscribeProfilerEvt(ServerContext * context,const::SubscribeProfilerEvtRequest * request,ServerWriter<::SubscribeProfilerEvtResponse> * writer)635 Status ProfilerService::SubscribeProfilerEvt(ServerContext* context,
636 const ::SubscribeProfilerEvtRequest* request,
637 ServerWriter<::SubscribeProfilerEvtResponse>* writer)
638 {
639 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
640 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
641 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
642
643 CHECK_POINTER_NOTNULL(request, "request invalid!");
644 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
645
646 uint32_t sessionId = request->session_id();
647 PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u start", request->request_id(), sessionId);
648
649 auto ctx = GetSessionContext(sessionId);
650 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
651
652 // check each plugin session states
653 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
654 "session status invalid!");
655
656 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
657 auto stateRepeater = ctx->stateRepeater;
658 CHECK_POINTER_NOTNULL(stateRepeater, "state repeater invalid!");
659
660 while (1) {
661 ctx = GetSessionContext(sessionId);
662 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
663
664 SubscribeProfilerEvtResponse response;
665 response.set_status(StatusCode::OK);
666 response.set_response_id(++responseIdCounter_);
667
668 std::vector<ProfilerPluginStatePtr> pluginStateVec;
669 int count = stateRepeater->TakePluginData(pluginStateVec);
670 if (count > 0) {
671 response.set_has_more(true);
672 for (int i = 0; i < count; i++) {
673 auto data = response.add_plugin_state();
674 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
675 CHECK_POINTER_NOTNULL(pluginStateVec[i], "plugin state invalid");
676 *data = *pluginStateVec[i];
677 }
678 } else {
679 response.set_has_more(false);
680 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
681 }
682
683 bool sendSuccess = writer->Write(response);
684 if (count <= 0 || !sendSuccess) {
685 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
686 break;
687 }
688 }
689 }
690
691 PROFILER_LOG_INFO(LOG_CORE, "SubscribeProfilerEvt %d %u done!", request->request_id(), sessionId);
692 return Status::OK;
693 }
694
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)695 Status ProfilerService::StopSession(ServerContext* context,
696 const ::StopSessionRequest* request,
697 ::StopSessionResponse* response)
698 {
699 #ifdef PERFORMANCE_DEBUG
700 struct timespec start = {};
701 clock_gettime(CLOCK_REALTIME, &start);
702 #endif
703 CHECK_REQUEST_RESPONSE(context, request, response);
704 uint32_t sessionId = request->session_id();
705 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
706
707 auto ctx = GetSessionContext(sessionId);
708 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
709 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
710 CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
711 ctx->traceFileWriter.get()->SetStopSplitFile(true);
712 }
713 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
714 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
715 #ifdef PERFORMANCE_DEBUG
716 struct timespec end = {};
717 clock_gettime(CLOCK_REALTIME, &end);
718 uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
719 PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time %" PRIu64 " ns", costTime);
720 #endif
721 return Status::OK;
722 }
723
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)724 Status ProfilerService::DestroySession(ServerContext* context,
725 const ::DestroySessionRequest* request,
726 ::DestroySessionResponse* response)
727 {
728 CHECK_REQUEST_RESPONSE(context, request, response);
729
730 uint32_t sessionId = request->session_id();
731 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
732
733 auto ctx = GetSessionContext(sessionId);
734 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
735
736 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
737 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
738 "remove plugin session FAILED!");
739 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
740
741 struct stat fileInfo;
742 if (stat(ctx->sessionConfig.result_file().c_str(), &fileInfo) == 0) {
743 long result = static_cast<long>(fileInfo.st_size / BYTE_PER_KB);
744 PROFILER_LOG_INFO(LOG_CORE, "result file %s size is %ld KB", ctx->sessionConfig.result_file().c_str(), result);
745 }
746
747 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
748 uint32_t pluginId = 0;
749 PluginContextPtr pluginCtx = nullptr;
750 for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
751 auto pluginName = ctx->pluginNames[i];
752 std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
753 if (pluginCtx->isStandaloneFileData == true) {
754 if (!ctx->sessionConfig.split_file()) {
755 MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
756 pluginCtx->pluginVersion);
757 }
758 }
759 }
760 }
761
762 return Status::OK;
763 }
764
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)765 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
766 const ::KeepSessionRequest* request,
767 ::KeepSessionResponse* response)
768 {
769 CHECK_REQUEST_RESPONSE(context, request, response);
770 uint32_t sessionId = request->session_id();
771 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
772
773 auto ctx = GetSessionContext(sessionId);
774 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
775
776 // update keep alive time if keep_alive_time parameter provided
777 auto keepAliveTime = request->keep_alive_time();
778 if (keepAliveTime) {
779 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
780 ctx->SetKeepAliveTime(keepAliveTime);
781 }
782
783 // reschedule session timeout task
784 if (ctx->sessionConfig.keep_alive_time() > 0) {
785 ctx->StopSessionExpireTask(removeTask_);
786 ctx->StartSessionExpireTask(removeTask_);
787 }
788 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
789 return Status::OK;
790 }
791
792 struct LoggingInterceptor : public grpc::experimental::Interceptor {
793 public:
LoggingInterceptorLoggingInterceptor794 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
795
InterceptLoggingInterceptor796 void Intercept(experimental::InterceptorBatchMethods* methods) override
797 {
798 const char* method = info_->method();
799 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
800 PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
801 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
802 PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
803 }
804 methods->Proceed();
805 }
806
807 private:
808 grpc::experimental::ServerRpcInfo* info_ = nullptr;
809 };
810
811 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
812 protected:
CreateServerInterceptorInterceptorFactory813 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
814 {
815 return new LoggingInterceptor(info);
816 }
817 };
818
StartService(const std::string & listenUri)819 bool ProfilerService::StartService(const std::string& listenUri)
820 {
821 CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
822
823 ServerBuilder builder;
824 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
825 builder.RegisterService(this);
826
827 auto server = builder.BuildAndStart();
828 CHECK_NOTNULL(server, false, "start service failed!");
829 PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
830 server_ = std::move(server);
831 return true;
832 }
833
WaitServiceDone()834 void ProfilerService::WaitServiceDone()
835 {
836 if (server_) {
837 PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
838 server_->Wait();
839 PROFILER_LOG_INFO(LOG_CORE, "Server done!");
840 }
841 }
842
StopService()843 void ProfilerService::StopService()
844 {
845 if (server_) {
846 server_->Shutdown();
847 PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
848 }
849 }