1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28 #include <cinttypes>
29
30 using namespace ::grpc;
31 using PluginContextPtr = std::shared_ptr<PluginContext>;
32
33 #define CHECK_REQUEST_RESPONSE(context, request, response) \
34 do { \
35 CHECK_POINTER_NOTNULL(context, "context ptr invalid!"); \
36 CHECK_POINTER_NOTNULL(request, "request ptr invalid!"); \
37 CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
38 } while (0)
39
40 #define CHECK_POINTER_NOTNULL(ptr, errorMessage) \
41 do { \
42 if ((ptr) == nullptr) { \
43 PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
44 return {StatusCode::INTERNAL, errorMessage}; \
45 } \
46 } while (0)
47
48 #define CHECK_EXPRESSION_TRUE(expr, errorMessage) \
49 do { \
50 if (!(expr)) { \
51 PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
52 return {StatusCode::INTERNAL, (errorMessage)}; \
53 } \
54 } while (0)
55
56 namespace {
57 constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
58 constexpr int MAX_SESSION_TIMEOUT_MS = 1000 * 3600;
59 constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
60 #ifdef PERFORMANCE_DEBUG
61 constexpr uint64_t S_TO_NS = 1000 * 1000 * 1000;
62 #endif
63 } // namespace
64
ProfilerService(const PluginServicePtr & pluginService)65 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
66 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
67 {
68 pluginService_->SetPluginSessionManager(pluginSessionManager_);
69 }
70
~ProfilerService()71 ProfilerService::~ProfilerService() {}
72
~SessionContext()73 ProfilerService::SessionContext::~SessionContext()
74 {
75 PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
76 if (offlineScheduleTaskFd != -1) {
77 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
78 }
79 StopSessionExpireTask(service->removeTask_);
80 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
81 }
82
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)83 Status ProfilerService::GetCapabilities(ServerContext* context,
84 const ::GetCapabilitiesRequest* request,
85 ::GetCapabilitiesResponse* response)
86 {
87 CHECK_REQUEST_RESPONSE(context, request, response);
88 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
89
90 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
91 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
92
93 response->set_status(StatusCode::OK);
94 for (size_t i = 0; i < capabilities.size(); i++) {
95 *response->add_capabilities() = capabilities[i];
96 }
97 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
98 return Status::OK;
99 }
100
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)101 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
102 {
103 std::unordered_map<std::string, size_t> nameIndex;
104 for (size_t i = 0; i < pluginConfigs.size(); i++) {
105 nameIndex[pluginConfigs[i].name()] = i;
106 }
107
108 size_t updateCount = 0;
109 for (auto& cfg : newPluginConfigs) {
110 auto it = nameIndex.find(cfg.name());
111 if (it != nameIndex.end()) {
112 int index = it->second;
113 pluginConfigs[index] = cfg;
114 updateCount++;
115 }
116 }
117 return updateCount;
118 }
119
CreatePluginSessions()120 bool ProfilerService::SessionContext::CreatePluginSessions()
121 {
122 if (bufferConfigs.size() > 0) { // with buffer configs
123 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
124 false, "create plugin sessions with buffer configs failed!");
125 } else { // without buffer configs
126 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
127 "create plugin sessions without buffer configs failed!");
128 }
129 return true;
130 }
131
StartPluginSessions()132 bool ProfilerService::SessionContext::StartPluginSessions()
133 {
134 std::unique_lock<std::mutex> lock(sessionMutex);
135
136 // if dataRepeater exists, reset it to usable state.
137 if (dataRepeater) {
138 dataRepeater->Reset();
139 }
140
141 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
142 uint32_t sampleDuration = sessionConfig.sample_duration();
143 if (sampleDuration > 0) {
144 traceFileWriter->SetTimeSource();
145 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
146 // start offline trace timeout task
147 offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
148 [weakCtx]() {
149 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
150 ctx->StopPluginSessions();
151 }
152 },
153 sampleDuration,
154 true);
155 // keep_alive_time not set by client, but the sample_duration setted
156 if (sessionConfig.keep_alive_time() == 0) {
157 // use sample_duration add a little time to set keep_alive_time
158 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
159 StartSessionExpireTask(service->removeTask_);
160 }
161 }
162 }
163
164 // start each plugin sessions
165 service->pluginSessionManager_->StartPluginSessions(pluginNames);
166 return true;
167 }
168
StopPluginSessions()169 bool ProfilerService::SessionContext::StopPluginSessions()
170 {
171 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
172 if (offlineScheduleTaskFd != -1) {
173 stopExpireTask.UnscheduleTaskLockless(offlineScheduleTaskFd);
174 offlineScheduleTaskFd = -1;
175 } else {
176 return true;
177 }
178 traceFileWriter->SetDurationTime();
179 }
180
181 // stop each plugin sessions
182 service->pluginSessionManager_->StopPluginSessions(pluginNames);
183 // stop epoll thread receiving shared memory messages
184 service->pluginService_->StopEpollThread();
185
186 // Read the remaining data of shared memory of all plugins.
187 for (auto& name : pluginNames) {
188 service->pluginService_->FlushAllData(name);
189 }
190 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
191 // update file header.
192 traceFileWriter->Finish();
193 }
194
195 // make sure FetchData thread exit
196 if (dataRepeater) {
197 dataRepeater->Close();
198 }
199 return true;
200 }
201
202 namespace {
IsValidKeepAliveTime(uint32_t timeout)203 bool IsValidKeepAliveTime(uint32_t timeout)
204 {
205 if (timeout < MIN_SESSION_TIMEOUT_MS) {
206 return false;
207 }
208 if (timeout > MAX_SESSION_TIMEOUT_MS) {
209 return false;
210 }
211 return true;
212 }
213 } // namespace
214
SetKeepAliveTime(uint32_t timeout)215 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
216 {
217 if (timeout > 0) {
218 sessionConfig.set_keep_alive_time(timeout);
219 }
220 }
221
StartSessionExpireTask(ScheduleTaskManager & task)222 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
223 {
224 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
225 timeoutScheduleTaskFd = task.ScheduleTask(
226 std::bind(&ProfilerService::RemoveSessionContext, service, id),
227 sessionConfig.keep_alive_time(), true);
228 }
229 }
230
StopSessionExpireTask(ScheduleTaskManager & task)231 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
232 {
233 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
234 task.UnscheduleTaskLockless(timeoutScheduleTaskFd);
235 timeoutScheduleTaskFd = -1;
236 }
237 }
238
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)239 Status ProfilerService::CreateSession(ServerContext* context,
240 const ::CreateSessionRequest* request,
241 ::CreateSessionResponse* response)
242 {
243 CHECK_REQUEST_RESPONSE(context, request, response);
244 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
245 // check plugin configs
246 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
247 const int nConfigs = request->plugin_configs_size();
248 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
249
250 // check buffer configs
251 std::shared_ptr<ProfilerSessionConfig> sessionConfig =
252 std::make_shared<ProfilerSessionConfig>(request->session_config());
253 const int nBuffers = sessionConfig->buffers_size();
254 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
255 // copy plugin configs from request
256 std::vector<ProfilerPluginConfig> pluginConfigs;
257 pluginConfigs.reserve(nConfigs);
258
259 for (int i = 0; i < nConfigs; i++) {
260 pluginConfigs.push_back(request->plugin_configs(i));
261 }
262
263 if (pluginConfigs.empty()) {
264 PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
265 return Status(StatusCode::PERMISSION_DENIED, "");
266 }
267 // copy buffer configs
268 std::vector<BufferConfig> bufferConfigs;
269 if (nBuffers == 1) {
270 // if only one buffer config provided, all plugin use the same buffer config
271 bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
272 } else if (nBuffers > 0) {
273 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
274 bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
275 }
276 PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
277 std::vector<std::string> pluginNames;
278 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
279 [](ProfilerPluginConfig& config) { return config.name(); });
280 std::sort(pluginNames.begin(), pluginNames.end());
281 //set session configs
282 pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
283
284 // create TraceFileWriter for offline mode
285 TraceFileWriterPtr traceWriter;
286 std::shared_ptr<ProfilerDataRepeater> dataRepeater = nullptr;
287 if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
288 auto resultFile = sessionConfig->result_file();
289 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
290 traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
291 sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
292 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
293 pluginService_->SetTraceWriter(traceWriter);
294 for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
295 ProfilerPluginData pluginData;
296 pluginData.set_name(pluginConfigs[i].name() + "_config");
297 pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
298 pluginData.set_data(pluginConfigs[i].config_data());
299 std::vector<char> msgData(pluginData.ByteSizeLong());
300 if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
301 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
302 }
303 traceWriter->SetPluginConfig(msgData.data(), msgData.size());
304 }
305 traceWriter->Flush();
306 } else {
307 dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
308 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
309 }
310
311 // create session context
312 auto ctx = std::make_shared<SessionContext>();
313 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
314
315 // fill fields of SessionContext
316 ctx->service = this;
317 if (dataRepeater != nullptr) {
318 ctx->dataRepeater = dataRepeater;
319 }
320 if (traceWriter != nullptr) {
321 ctx->traceFileWriter = traceWriter;
322 }
323 ctx->sessionConfig = *sessionConfig;
324 ctx->pluginNames = std::move(pluginNames);
325 ctx->pluginConfigs = std::move(pluginConfigs);
326 ctx->bufferConfigs = std::move(bufferConfigs);
327
328 // create plugin sessions
329 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
330 // alloc new session id
331 uint32_t sessionId = ++sessionIdCounter_;
332 ctx->id = sessionId;
333 ctx->name = "session-" + std::to_string(sessionId);
334
335 // add {sessionId, ctx} to map
336 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
337
338 // create session expire schedule task
339 auto keepAliveTime = sessionConfig->keep_alive_time();
340 if (keepAliveTime) {
341 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
342 // create schedule task for session timeout feature
343 ctx->SetKeepAliveTime(keepAliveTime);
344 ctx->StartSessionExpireTask(removeTask_);
345 }
346
347 // prepare response data fields
348 response->set_status(0);
349 response->set_session_id(sessionId);
350
351 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
352 return Status::OK;
353 }
354
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)355 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
356 {
357 std::unique_lock<std::mutex> lock(sessionContextMutex_);
358 CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
359 sessionContext_[sessionId] = sessionCtx;
360 return true;
361 }
362
GetSessionContext(uint32_t sessionId) const363 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
364 {
365 std::unique_lock<std::mutex> lock(sessionContextMutex_);
366 auto it = sessionContext_.find(sessionId);
367 if (it != sessionContext_.end()) {
368 auto ptr = it->second;
369 return ptr;
370 }
371 return nullptr;
372 }
373
RemoveSessionContext(uint32_t sessionId)374 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
375 {
376 std::unique_lock<std::mutex> lock(sessionContextMutex_);
377 auto it = sessionContext_.find(sessionId);
378 if (it != sessionContext_.end()) {
379 auto ptr = it->second;
380 PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
381 sessionContext_.erase(it);
382 return true;
383 }
384 return false;
385 }
386
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)387 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
388 const std::string& outputFile, const std::string& pluginVersion)
389 {
390 if (pluginName.empty() || outputFile.empty()) {
391 PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
392 pluginName.c_str(), outputFile.c_str());
393 return;
394 }
395
396 std::ifstream fsFile {}; // read from output file
397 fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
398 if (!fsFile.good()) {
399 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
400 return;
401 }
402
403 std::ofstream fsTarget {}; // write to profiler ouput file
404 fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
405 if (!fsTarget.good()) {
406 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
407 return;
408 }
409 fsTarget.seekp(0, std::ios_base::end);
410 int posFile = fsTarget.tellp(); // for update sha256
411
412 TraceFileHeader header {};
413 if (pluginName == "hiperf-plugin") {
414 header.data_.dataType = DataType::HIPERF_DATA;
415 } else {
416 header.data_.dataType = DataType::STANDALONE_DATA;
417 }
418 fsFile.seekg(0, std::ios_base::end);
419 uint64_t fileSize = (uint64_t)(fsFile.tellg());
420 header.data_.length += fileSize;
421 size_t pluginSize = sizeof(header.data_.standalonePluginName);
422 int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
423 if (ret != EOK) {
424 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
425 return;
426 }
427 pluginSize = sizeof(header.data_.pluginVersion);
428 ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
429 if (ret != EOK) {
430 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
431 return;
432 }
433 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
434 if (!fsTarget.good()) {
435 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
436 return;
437 }
438
439 SHA256_CTX sha256Ctx;
440 SHA256_Init(&sha256Ctx);
441 constexpr uint64_t bufSize = 4 * 1024 * 1024;
442 std::vector<char> buf(bufSize);
443 uint64_t readSize = 0;
444 fsFile.seekg(0);
445 while ((readSize = std::min(bufSize, fileSize)) > 0) {
446 fsFile.read(buf.data(), readSize);
447 fsTarget.write(buf.data(), readSize);
448 if (!fsTarget.good()) {
449 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
450 return;
451 }
452 fileSize -= readSize;
453
454 SHA256_Update(&sha256Ctx, buf.data(), readSize);
455 }
456 SHA256_Final(header.data_.sha256, &sha256Ctx);
457 fsTarget.seekp(posFile, std::ios_base::beg);
458 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
459
460 fsFile.close();
461 fsTarget.close();
462
463 PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
464 }
465
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)466 Status ProfilerService::StartSession(ServerContext* context,
467 const ::StartSessionRequest* request,
468 ::StartSessionResponse* response)
469 {
470 CHECK_REQUEST_RESPONSE(context, request, response);
471
472 uint32_t sessionId = request->session_id();
473 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
474
475 // copy plugin configs from request
476 std::vector<ProfilerPluginConfig> newPluginConfigs;
477 newPluginConfigs.reserve(request->update_configs_size());
478 for (int i = 0; i < request->update_configs_size(); i++) {
479 PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
480 newPluginConfigs.push_back(request->update_configs(i));
481 }
482
483 // get plugin names in request
484 std::vector<std::string> requestNames;
485 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
486 [](auto& config) { return config.name(); });
487 std::sort(requestNames.begin(), requestNames.end());
488
489 auto ctx = GetSessionContext(sessionId);
490 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
491
492 // start epoll thread to receive shared memory messages.
493 CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
494
495 // get intersection set of requestNames and pluginNames
496 std::vector<std::string> updateNames;
497 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
498 std::back_inserter(updateNames));
499
500 if (updateNames.size() > 0) {
501 // remove old plugin sessions
502 pluginSessionManager_->RemovePluginSessions(updateNames);
503
504 // update plugin configs
505 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
506
507 // re-create plugin sessions
508 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
509 PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
510 }
511
512 // start plugin sessions with configs
513 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
514 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
515 return Status::OK;
516 }
517
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)518 Status ProfilerService::FetchData(ServerContext* context,
519 const ::FetchDataRequest* request,
520 ServerWriter<::FetchDataResponse>* writer)
521 {
522 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
523 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
524 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
525
526 CHECK_POINTER_NOTNULL(request, "request invalid!");
527 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
528
529 uint32_t sessionId = request->session_id();
530 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
531
532 auto ctx = GetSessionContext(sessionId);
533 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
534
535 // check each plugin session states
536 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
537 "session status invalid!");
538
539 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
540 auto dataRepeater = ctx->dataRepeater;
541 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
542
543 while (1) {
544 ctx = GetSessionContext(sessionId);
545 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
546
547 FetchDataResponse response;
548 response.set_status(StatusCode::OK);
549 response.set_response_id(++responseIdCounter_);
550
551 std::vector<ProfilerPluginDataPtr> pluginDataVec;
552 int count = dataRepeater->TakePluginData(pluginDataVec);
553 if (count > 0) {
554 response.set_has_more(true);
555 for (int i = 0; i < count; i++) {
556 auto data = response.add_plugin_data();
557 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
558 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
559 *data = *pluginDataVec[i];
560 }
561 } else {
562 response.set_has_more(false);
563 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
564 }
565
566 bool sendSuccess = writer->Write(response);
567 if (count <= 0 || !sendSuccess) {
568 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
569 break;
570 }
571 }
572 }
573
574 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
575 return Status::OK;
576 }
577
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)578 Status ProfilerService::StopSession(ServerContext* context,
579 const ::StopSessionRequest* request,
580 ::StopSessionResponse* response)
581 {
582 #ifdef PERFORMANCE_DEBUG
583 struct timespec start = {};
584 clock_gettime(CLOCK_REALTIME, &start);
585 #endif
586 CHECK_REQUEST_RESPONSE(context, request, response);
587 uint32_t sessionId = request->session_id();
588 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
589
590 auto ctx = GetSessionContext(sessionId);
591 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
592 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
593 CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
594 ctx->traceFileWriter.get()->SetStopSplitFile(true);
595 }
596 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
597 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
598 #ifdef PERFORMANCE_DEBUG
599 struct timespec end = {};
600 clock_gettime(CLOCK_REALTIME, &end);
601 uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
602 PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time %" PRIu64 " ns", costTime);
603 #endif
604 return Status::OK;
605 }
606
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)607 Status ProfilerService::DestroySession(ServerContext* context,
608 const ::DestroySessionRequest* request,
609 ::DestroySessionResponse* response)
610 {
611 CHECK_REQUEST_RESPONSE(context, request, response);
612
613 uint32_t sessionId = request->session_id();
614 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
615
616 auto ctx = GetSessionContext(sessionId);
617 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
618
619 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
620 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
621 "remove plugin session FAILED!");
622 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
623
624 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
625 uint32_t pluginId = 0;
626 PluginContextPtr pluginCtx = nullptr;
627 for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
628 auto pluginName = ctx->pluginNames[i];
629 std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
630 if (pluginCtx->isStandaloneFileData == true) {
631 if (!ctx->sessionConfig.split_file()) {
632 MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
633 pluginCtx->pluginVersion);
634 }
635 }
636 }
637 }
638
639 return Status::OK;
640 }
641
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)642 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
643 const ::KeepSessionRequest* request,
644 ::KeepSessionResponse* response)
645 {
646 CHECK_REQUEST_RESPONSE(context, request, response);
647 uint32_t sessionId = request->session_id();
648 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
649
650 auto ctx = GetSessionContext(sessionId);
651 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
652
653 // update keep alive time if keep_alive_time parameter provided
654 auto keepAliveTime = request->keep_alive_time();
655 if (keepAliveTime) {
656 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
657 ctx->SetKeepAliveTime(keepAliveTime);
658 }
659
660 // reschedule session timeout task
661 if (ctx->sessionConfig.keep_alive_time() > 0) {
662 ctx->StopSessionExpireTask(removeTask_);
663 ctx->StartSessionExpireTask(removeTask_);
664 }
665 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
666 return Status::OK;
667 }
668
669 struct LoggingInterceptor : public grpc::experimental::Interceptor {
670 public:
LoggingInterceptorLoggingInterceptor671 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
672
InterceptLoggingInterceptor673 void Intercept(experimental::InterceptorBatchMethods* methods) override
674 {
675 const char* method = info_->method();
676 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
677 PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
678 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
679 PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
680 }
681 methods->Proceed();
682 }
683
684 private:
685 grpc::experimental::ServerRpcInfo* info_ = nullptr;
686 };
687
688 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
689 protected:
CreateServerInterceptorInterceptorFactory690 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
691 {
692 return new LoggingInterceptor(info);
693 }
694 };
695
StartService(const std::string & listenUri)696 bool ProfilerService::StartService(const std::string& listenUri)
697 {
698 CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
699
700 ServerBuilder builder;
701 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
702 builder.RegisterService(this);
703
704 auto server = builder.BuildAndStart();
705 CHECK_NOTNULL(server, false, "start service failed!");
706 PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
707 server_ = std::move(server);
708 return true;
709 }
710
WaitServiceDone()711 void ProfilerService::WaitServiceDone()
712 {
713 if (server_) {
714 PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
715 server_->Wait();
716 PROFILER_LOG_INFO(LOG_CORE, "Server done!");
717 }
718 }
719
StopService()720 void ProfilerService::StopService()
721 {
722 if (server_) {
723 server_->Shutdown();
724 PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
725 }
726 }