1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28 #include <cinttypes>
29
30 using namespace ::grpc;
31 using PluginContextPtr = std::shared_ptr<PluginContext>;
32
// Validates the three pointers handed to a unary RPC handler. Each argument is passed to
// CHECK_POINTER_NOTNULL, so the enclosing function returns an INTERNAL status at the first
// null pointer found (checked in the order context, request, response).
#define CHECK_REQUEST_RESPONSE(context, request, response)            \
    do {                                                              \
        CHECK_POINTER_NOTNULL(context, "context ptr invalid!");       \
        CHECK_POINTER_NOTNULL(request, "request ptr invalid!");       \
        CHECK_POINTER_NOTNULL(response, "response ptr invalid!");     \
    } while (false)
39
// If `ptr` compares equal to nullptr, logs the calling function's name together with the
// spelled-out `ptr` expression and makes the enclosing function return
// {StatusCode::INTERNAL, errorMessage}. Otherwise it does nothing.
#define CHECK_POINTER_NOTNULL(ptr, errorMessage)                                        \
    do {                                                                                \
        if ((ptr) == nullptr) {                                                         \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr);    \
            return {StatusCode::INTERNAL, (errorMessage)};                              \
        }                                                                               \
    } while (false)
47
// If `expr` evaluates to false, logs the calling function's name with `errorMessage` and makes
// the enclosing function return {StatusCode::INTERNAL, errorMessage}. Otherwise it does nothing.
#define CHECK_EXPRESSION_TRUE(expr, errorMessage)                                       \
    do {                                                                                \
        if (!(expr)) {                                                                  \
            PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, (errorMessage));   \
            return {StatusCode::INTERNAL, (errorMessage)};                              \
        }                                                                               \
    } while (false)
55
namespace {
// Accepted range for a session keep-alive timeout. Going by the _MS suffix the unit is
// milliseconds, i.e. one second up to one hour.
constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
constexpr int MAX_SESSION_TIMEOUT_MS = 3600 * 1000;
// Added to sample_duration to derive a keep-alive time when the client did not set one.
constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
#ifdef PERFORMANCE_DEBUG
// Nanoseconds per second; only needed by the StopSession timing log.
constexpr uint64_t S_TO_NS = 1000ULL * 1000 * 1000;
#endif
} // namespace
64
ProfilerService(const PluginServicePtr & pluginService)65 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
66 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
67 {
68 pluginService_->SetPluginSessionManager(pluginSessionManager_);
69 }
70
~ProfilerService()71 ProfilerService::~ProfilerService() {}
72
~SessionContext()73 ProfilerService::SessionContext::~SessionContext()
74 {
75 PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
76 if (offlineScheduleTaskFd != -1) {
77 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
78 }
79 StopSessionExpireTask(service->removeTask_);
80 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
81 }
82
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)83 Status ProfilerService::GetCapabilities(ServerContext* context,
84 const ::GetCapabilitiesRequest* request,
85 ::GetCapabilitiesResponse* response)
86 {
87 CHECK_REQUEST_RESPONSE(context, request, response);
88 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
89
90 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
91 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
92
93 response->set_status(StatusCode::OK);
94 for (size_t i = 0; i < capabilities.size(); i++) {
95 *response->add_capabilities() = capabilities[i];
96 }
97 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
98 return Status::OK;
99 }
100
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)101 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
102 {
103 std::unordered_map<std::string, size_t> nameIndex;
104 for (size_t i = 0; i < pluginConfigs.size(); i++) {
105 nameIndex[pluginConfigs[i].name()] = i;
106 }
107
108 size_t updateCount = 0;
109 for (auto& cfg : newPluginConfigs) {
110 auto it = nameIndex.find(cfg.name());
111 if (it != nameIndex.end()) {
112 int index = it->second;
113 pluginConfigs[index] = cfg;
114 updateCount++;
115 }
116 }
117 return updateCount;
118 }
119
CreatePluginSessions()120 bool ProfilerService::SessionContext::CreatePluginSessions()
121 {
122 if (bufferConfigs.size() > 0) { // with buffer configs
123 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
124 false, "create plugin sessions with buffer configs failed!");
125 } else { // without buffer configs
126 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
127 "create plugin sessions without buffer configs failed!");
128 }
129 return true;
130 }
131
StartPluginSessions()132 bool ProfilerService::SessionContext::StartPluginSessions()
133 {
134 std::unique_lock<std::mutex> lock(sessionMutex);
135
136 // if dataRepeater exists, reset it to usable state.
137 if (dataRepeater) {
138 dataRepeater->Reset();
139 }
140
141 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
142 uint32_t sampleDuration = sessionConfig.sample_duration();
143 if (sampleDuration > 0) {
144 traceFileWriter->SetTimeSource();
145 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
146 // start offline trace timeout task
147 offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
148 [weakCtx]() {
149 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
150 ctx->StopPluginSessions();
151 }
152 },
153 sampleDuration,
154 true);
155 // keep_alive_time not set by client, but the sample_duration setted
156 if (sessionConfig.keep_alive_time() == 0) {
157 // use sample_duration add a little time to set keep_alive_time
158 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
159 StartSessionExpireTask(service->removeTask_);
160 }
161 }
162 }
163
164 // start each plugin sessions
165 service->pluginSessionManager_->StartPluginSessions(pluginNames);
166 return true;
167 }
168
StopPluginSessions()169 bool ProfilerService::SessionContext::StopPluginSessions()
170 {
171 std::unique_lock<std::mutex> lock(sessionMutex);
172 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
173 if (offlineScheduleTaskFd != -1) {
174 stopExpireTask.UnscheduleTaskLockless(offlineScheduleTaskFd);
175 offlineScheduleTaskFd = -1;
176 } else {
177 return true;
178 }
179 traceFileWriter->SetDurationTime();
180 }
181
182 // stop each plugin sessions
183 service->pluginSessionManager_->StopPluginSessions(pluginNames);
184 // stop epoll thread receiving shared memory messages
185 service->pluginService_->StopEpollThread();
186
187 // Read the remaining data of shared memory of all plugins.
188 for (auto& name : pluginNames) {
189 service->pluginService_->FlushAllData(name);
190 }
191 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
192 // update file header.
193 traceFileWriter->Finish();
194 }
195
196 // make sure FetchData thread exit
197 if (dataRepeater) {
198 dataRepeater->Close();
199 }
200 return true;
201 }
202
203 namespace {
IsValidKeepAliveTime(uint32_t timeout)204 bool IsValidKeepAliveTime(uint32_t timeout)
205 {
206 if (timeout < MIN_SESSION_TIMEOUT_MS) {
207 return false;
208 }
209 if (timeout > MAX_SESSION_TIMEOUT_MS) {
210 return false;
211 }
212 return true;
213 }
214 } // namespace
215
SetKeepAliveTime(uint32_t timeout)216 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
217 {
218 if (timeout > 0) {
219 sessionConfig.set_keep_alive_time(timeout);
220 }
221 }
222
StartSessionExpireTask(ScheduleTaskManager & task)223 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
224 {
225 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
226 timeoutScheduleTaskFd = task.ScheduleTask(
227 std::bind(&ProfilerService::RemoveSessionContext, service, id),
228 sessionConfig.keep_alive_time(), true);
229 }
230 }
231
StopSessionExpireTask(ScheduleTaskManager & task)232 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
233 {
234 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
235 task.UnscheduleTaskLockless(timeoutScheduleTaskFd);
236 timeoutScheduleTaskFd = -1;
237 }
238 }
239
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)240 Status ProfilerService::CreateSession(ServerContext* context,
241 const ::CreateSessionRequest* request,
242 ::CreateSessionResponse* response)
243 {
244 CHECK_REQUEST_RESPONSE(context, request, response);
245 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
246 // check plugin configs
247 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
248 const int nConfigs = request->plugin_configs_size();
249 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
250
251 // check buffer configs
252 std::shared_ptr<ProfilerSessionConfig> sessionConfig =
253 std::make_shared<ProfilerSessionConfig>(request->session_config());
254 const int nBuffers = sessionConfig->buffers_size();
255 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
256 // copy plugin configs from request
257 std::vector<ProfilerPluginConfig> pluginConfigs;
258 pluginConfigs.reserve(nConfigs);
259
260 for (int i = 0; i < nConfigs; i++) {
261 pluginConfigs.push_back(request->plugin_configs(i));
262 }
263
264 if (pluginConfigs.empty()) {
265 PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
266 return Status(StatusCode::PERMISSION_DENIED, "");
267 }
268 // copy buffer configs
269 std::vector<BufferConfig> bufferConfigs;
270 if (nBuffers == 1) {
271 // if only one buffer config provided, all plugin use the same buffer config
272 bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
273 } else if (nBuffers > 0) {
274 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
275 bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
276 }
277 PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
278 std::vector<std::string> pluginNames;
279 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
280 [](ProfilerPluginConfig& config) { return config.name(); });
281 std::sort(pluginNames.begin(), pluginNames.end());
282 //set session configs
283 pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
284
285 // create TraceFileWriter for offline mode
286 TraceFileWriterPtr traceWriter;
287 std::shared_ptr<ProfilerDataRepeater> dataRepeater = nullptr;
288 if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
289 auto resultFile = sessionConfig->result_file();
290 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
291 traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
292 sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
293 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
294 pluginService_->SetTraceWriter(traceWriter);
295 for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
296 ProfilerPluginData pluginData;
297 pluginData.set_name(pluginConfigs[i].name() + "_config");
298 pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
299 pluginData.set_data(pluginConfigs[i].config_data());
300 std::vector<char> msgData(pluginData.ByteSizeLong());
301 if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
302 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
303 }
304 traceWriter->SetPluginConfig(msgData.data(), msgData.size());
305 }
306 traceWriter->Flush();
307 } else {
308 dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
309 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
310 }
311
312 // create session context
313 auto ctx = std::make_shared<SessionContext>();
314 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
315
316 // fill fields of SessionContext
317 ctx->service = this;
318 if (dataRepeater != nullptr) {
319 ctx->dataRepeater = dataRepeater;
320 }
321 if (traceWriter != nullptr) {
322 ctx->traceFileWriter = traceWriter;
323 }
324 ctx->sessionConfig = *sessionConfig;
325 ctx->pluginNames = std::move(pluginNames);
326 ctx->pluginConfigs = std::move(pluginConfigs);
327 ctx->bufferConfigs = std::move(bufferConfigs);
328
329 // create plugin sessions
330 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
331 // alloc new session id
332 uint32_t sessionId = ++sessionIdCounter_;
333 ctx->id = sessionId;
334 ctx->name = "session-" + std::to_string(sessionId);
335
336 // add {sessionId, ctx} to map
337 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
338
339 // create session expire schedule task
340 auto keepAliveTime = sessionConfig->keep_alive_time();
341 if (keepAliveTime) {
342 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
343 // create schedule task for session timeout feature
344 ctx->SetKeepAliveTime(keepAliveTime);
345 ctx->StartSessionExpireTask(removeTask_);
346 }
347
348 // prepare response data fields
349 response->set_status(0);
350 response->set_session_id(sessionId);
351
352 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
353 return Status::OK;
354 }
355
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)356 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
357 {
358 std::unique_lock<std::mutex> lock(sessionContextMutex_);
359 CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
360 sessionContext_[sessionId] = sessionCtx;
361 return true;
362 }
363
GetSessionContext(uint32_t sessionId) const364 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
365 {
366 std::unique_lock<std::mutex> lock(sessionContextMutex_);
367 auto it = sessionContext_.find(sessionId);
368 if (it != sessionContext_.end()) {
369 auto ptr = it->second;
370 return ptr;
371 }
372 return nullptr;
373 }
374
RemoveSessionContext(uint32_t sessionId)375 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
376 {
377 std::unique_lock<std::mutex> lock(sessionContextMutex_);
378 auto it = sessionContext_.find(sessionId);
379 if (it != sessionContext_.end()) {
380 auto ptr = it->second;
381 PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
382 sessionContext_.erase(it);
383 return true;
384 }
385 return false;
386 }
387
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)388 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
389 const std::string& outputFile, const std::string& pluginVersion)
390 {
391 if (pluginName.empty() || outputFile.empty()) {
392 PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
393 pluginName.c_str(), outputFile.c_str());
394 return;
395 }
396
397 std::ifstream fsFile {}; // read from output file
398 fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
399 if (!fsFile.good()) {
400 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
401 return;
402 }
403
404 std::ofstream fsTarget {}; // write to profiler ouput file
405 fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
406 if (!fsTarget.good()) {
407 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
408 return;
409 }
410 fsTarget.seekp(0, std::ios_base::end);
411 int posFile = fsTarget.tellp(); // for update sha256
412
413 TraceFileHeader header {};
414 if (pluginName == "hiperf-plugin") {
415 header.data_.dataType = DataType::HIPERF_DATA;
416 } else {
417 header.data_.dataType = DataType::STANDALONE_DATA;
418 }
419 fsFile.seekg(0, std::ios_base::end);
420 uint64_t fileSize = (uint64_t)(fsFile.tellg());
421 header.data_.length += fileSize;
422 size_t pluginSize = sizeof(header.data_.standalonePluginName);
423 int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
424 if (ret != EOK) {
425 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
426 return;
427 }
428 pluginSize = sizeof(header.data_.pluginVersion);
429 ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
430 if (ret != EOK) {
431 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
432 return;
433 }
434 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
435 if (!fsTarget.good()) {
436 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
437 return;
438 }
439
440 SHA256_CTX sha256Ctx;
441 SHA256_Init(&sha256Ctx);
442 constexpr uint64_t bufSize = 4 * 1024 * 1024;
443 std::vector<char> buf(bufSize);
444 uint64_t readSize = 0;
445 fsFile.seekg(0);
446 while ((readSize = std::min(bufSize, fileSize)) > 0) {
447 fsFile.read(buf.data(), readSize);
448 fsTarget.write(buf.data(), readSize);
449 if (!fsTarget.good()) {
450 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
451 return;
452 }
453 fileSize -= readSize;
454
455 SHA256_Update(&sha256Ctx, buf.data(), readSize);
456 }
457 SHA256_Final(header.data_.sha256, &sha256Ctx);
458 fsTarget.seekp(posFile, std::ios_base::beg);
459 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
460
461 fsFile.close();
462 fsTarget.close();
463
464 PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
465 }
466
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)467 Status ProfilerService::StartSession(ServerContext* context,
468 const ::StartSessionRequest* request,
469 ::StartSessionResponse* response)
470 {
471 CHECK_REQUEST_RESPONSE(context, request, response);
472
473 uint32_t sessionId = request->session_id();
474 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
475
476 // copy plugin configs from request
477 std::vector<ProfilerPluginConfig> newPluginConfigs;
478 newPluginConfigs.reserve(request->update_configs_size());
479 for (int i = 0; i < request->update_configs_size(); i++) {
480 PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
481 newPluginConfigs.push_back(request->update_configs(i));
482 }
483
484 // get plugin names in request
485 std::vector<std::string> requestNames;
486 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
487 [](auto& config) { return config.name(); });
488 std::sort(requestNames.begin(), requestNames.end());
489
490 auto ctx = GetSessionContext(sessionId);
491 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
492
493 // start epoll thread to receive shared memory messages.
494 CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
495
496 // get intersection set of requestNames and pluginNames
497 std::vector<std::string> updateNames;
498 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
499 std::back_inserter(updateNames));
500
501 if (updateNames.size() > 0) {
502 // remove old plugin sessions
503 pluginSessionManager_->RemovePluginSessions(updateNames);
504
505 // update plugin configs
506 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
507
508 // re-create plugin sessions
509 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
510 PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
511 }
512
513 // start plugin sessions with configs
514 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
515 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
516 return Status::OK;
517 }
518
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)519 Status ProfilerService::FetchData(ServerContext* context,
520 const ::FetchDataRequest* request,
521 ServerWriter<::FetchDataResponse>* writer)
522 {
523 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
524 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
525 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
526
527 CHECK_POINTER_NOTNULL(request, "request invalid!");
528 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
529
530 uint32_t sessionId = request->session_id();
531 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
532
533 auto ctx = GetSessionContext(sessionId);
534 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
535
536 // check each plugin session states
537 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
538 "session status invalid!");
539
540 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
541 auto dataRepeater = ctx->dataRepeater;
542 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
543
544 while (1) {
545 ctx = GetSessionContext(sessionId);
546 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
547
548 FetchDataResponse response;
549 response.set_status(StatusCode::OK);
550 response.set_response_id(++responseIdCounter_);
551
552 std::vector<ProfilerPluginDataPtr> pluginDataVec;
553 int count = dataRepeater->TakePluginData(pluginDataVec);
554 if (count > 0) {
555 response.set_has_more(true);
556 for (int i = 0; i < count; i++) {
557 auto data = response.add_plugin_data();
558 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
559 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
560 *data = *pluginDataVec[i];
561 }
562 } else {
563 response.set_has_more(false);
564 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
565 }
566
567 bool sendSuccess = writer->Write(response);
568 if (count <= 0 || !sendSuccess) {
569 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
570 break;
571 }
572 }
573 }
574
575 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
576 return Status::OK;
577 }
578
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)579 Status ProfilerService::StopSession(ServerContext* context,
580 const ::StopSessionRequest* request,
581 ::StopSessionResponse* response)
582 {
583 #ifdef PERFORMANCE_DEBUG
584 struct timespec start = {};
585 clock_gettime(CLOCK_REALTIME, &start);
586 #endif
587 CHECK_REQUEST_RESPONSE(context, request, response);
588 uint32_t sessionId = request->session_id();
589 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
590
591 auto ctx = GetSessionContext(sessionId);
592 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
593 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
594 CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
595 ctx->traceFileWriter.get()->SetStopSplitFile(true);
596 }
597 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
598 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
599 #ifdef PERFORMANCE_DEBUG
600 struct timespec end = {};
601 clock_gettime(CLOCK_REALTIME, &end);
602 uint64_t costTime = (end.tv_sec - start.tv_sec) * S_TO_NS + (end.tv_nsec - start.tv_nsec);
603 PROFILER_LOG_INFO(LOG_CORE, "StopSession cost time %" PRIu64 " ns", costTime);
604 #endif
605 return Status::OK;
606 }
607
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)608 Status ProfilerService::DestroySession(ServerContext* context,
609 const ::DestroySessionRequest* request,
610 ::DestroySessionResponse* response)
611 {
612 CHECK_REQUEST_RESPONSE(context, request, response);
613
614 uint32_t sessionId = request->session_id();
615 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
616
617 auto ctx = GetSessionContext(sessionId);
618 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
619
620 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
621 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
622 "remove plugin session FAILED!");
623 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
624
625 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
626 uint32_t pluginId = 0;
627 PluginContextPtr pluginCtx = nullptr;
628 for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
629 auto pluginName = ctx->pluginNames[i];
630 std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
631 if (pluginCtx->isStandaloneFileData == true) {
632 if (!ctx->sessionConfig.split_file()) {
633 MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
634 pluginCtx->pluginVersion);
635 }
636 }
637 }
638 }
639
640 return Status::OK;
641 }
642
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)643 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
644 const ::KeepSessionRequest* request,
645 ::KeepSessionResponse* response)
646 {
647 CHECK_REQUEST_RESPONSE(context, request, response);
648 uint32_t sessionId = request->session_id();
649 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
650
651 auto ctx = GetSessionContext(sessionId);
652 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
653
654 // update keep alive time if keep_alive_time parameter provided
655 auto keepAliveTime = request->keep_alive_time();
656 if (keepAliveTime) {
657 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
658 ctx->SetKeepAliveTime(keepAliveTime);
659 }
660
661 // reschedule session timeout task
662 if (ctx->sessionConfig.keep_alive_time() > 0) {
663 ctx->StopSessionExpireTask(removeTask_);
664 ctx->StartSessionExpireTask(removeTask_);
665 }
666 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
667 return Status::OK;
668 }
669
670 struct LoggingInterceptor : public grpc::experimental::Interceptor {
671 public:
LoggingInterceptorLoggingInterceptor672 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
673
InterceptLoggingInterceptor674 void Intercept(experimental::InterceptorBatchMethods* methods) override
675 {
676 const char* method = info_->method();
677 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
678 PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
679 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
680 PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
681 }
682 methods->Proceed();
683 }
684
685 private:
686 grpc::experimental::ServerRpcInfo* info_ = nullptr;
687 };
688
689 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
690 protected:
CreateServerInterceptorInterceptorFactory691 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
692 {
693 return new LoggingInterceptor(info);
694 }
695 };
696
StartService(const std::string & listenUri)697 bool ProfilerService::StartService(const std::string& listenUri)
698 {
699 CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
700
701 ServerBuilder builder;
702 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
703 builder.RegisterService(this);
704
705 auto server = builder.BuildAndStart();
706 CHECK_NOTNULL(server, false, "start service failed!");
707 PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
708 server_ = std::move(server);
709 return true;
710 }
711
WaitServiceDone()712 void ProfilerService::WaitServiceDone()
713 {
714 if (server_) {
715 PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
716 server_->Wait();
717 PROFILER_LOG_INFO(LOG_CORE, "Server done!");
718 }
719 }
720
StopService()721 void ProfilerService::StopService()
722 {
723 if (server_) {
724 server_->Shutdown();
725 PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
726 }
727 }