1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ProfilerService"
16 #include "profiler_service.h"
17 #include <algorithm>
18 #include <unistd.h>
19 #include "common.h"
20 #include "logging.h"
21 #include "native_hook_config.pb.h"
22 #include "plugin_service.h"
23 #include "plugin_session.h"
24 #include "plugin_session_manager.h"
25 #include "profiler_capability_manager.h"
26 #include "profiler_data_repeater.h"
27 #include "trace_file_writer.h"
28
29 using namespace ::grpc;
30 using PluginContextPtr = std::shared_ptr<PluginContext>;
31
32 #define CHECK_REQUEST_RESPONSE(context, request, response) \
33 do { \
34 CHECK_POINTER_NOTNULL(context, "context ptr invalid!"); \
35 CHECK_POINTER_NOTNULL(request, "request ptr invalid!"); \
36 CHECK_POINTER_NOTNULL(response, "response ptr invalid!"); \
37 } while (0)
38
39 #define CHECK_POINTER_NOTNULL(ptr, errorMessage) \
40 do { \
41 if ((ptr) == nullptr) { \
42 PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
43 return {StatusCode::INTERNAL, errorMessage}; \
44 } \
45 } while (0)
46
47 #define CHECK_EXPRESSION_TRUE(expr, errorMessage) \
48 do { \
49 if (!(expr)) { \
50 PROFILER_LOG_ERROR(LOG_CORE, "%s: FAILED, %s", __func__, errorMessage); \
51 return {StatusCode::INTERNAL, (errorMessage)}; \
52 } \
53 } while (0)
54
55 namespace {
56 constexpr int MIN_SESSION_TIMEOUT_MS = 1000;
57 constexpr int MAX_SESSION_TIMEOUT_MS = 1000 * 3600;
58 constexpr int DEFAULT_KEEP_ALIVE_DELTA = 3000;
59 } // namespace
60
ProfilerService(const PluginServicePtr & pluginService)61 ProfilerService::ProfilerService(const PluginServicePtr& pluginService)
62 : pluginService_(pluginService), pluginSessionManager_(std::make_shared<PluginSessionManager>(pluginService))
63 {
64 pluginService_->SetPluginSessionManager(pluginSessionManager_);
65 }
66
~ProfilerService()67 ProfilerService::~ProfilerService() {}
68
~SessionContext()69 ProfilerService::SessionContext::~SessionContext()
70 {
71 PROFILER_LOG_INFO(LOG_CORE, "~SessionContext id = %d", id);
72 if (offlineScheduleTaskFd != -1) {
73 stopExpireTask.UnscheduleTask(offlineScheduleTaskFd);
74 }
75 StopSessionExpireTask(service->removeTask_);
76 service->pluginSessionManager_->RemovePluginSessions(pluginNames);
77 }
78
GetCapabilities(ServerContext * context,const::GetCapabilitiesRequest * request,::GetCapabilitiesResponse * response)79 Status ProfilerService::GetCapabilities(ServerContext* context,
80 const ::GetCapabilitiesRequest* request,
81 ::GetCapabilitiesResponse* response)
82 {
83 CHECK_REQUEST_RESPONSE(context, request, response);
84 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities from '%s'", context->peer().c_str());
85
86 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d start", request->request_id());
87 std::vector<ProfilerPluginCapability> capabilities = ProfilerCapabilityManager::GetInstance().GetCapabilities();
88
89 response->set_status(StatusCode::OK);
90 for (size_t i = 0; i < capabilities.size(); i++) {
91 *response->add_capabilities() = capabilities[i];
92 }
93 PROFILER_LOG_INFO(LOG_CORE, "GetCapabilities %d done!", request->request_id());
94 return Status::OK;
95 }
96
UpdatePluginConfigs(const std::vector<ProfilerPluginConfig> & newPluginConfigs)97 size_t ProfilerService::SessionContext::UpdatePluginConfigs(const std::vector<ProfilerPluginConfig>& newPluginConfigs)
98 {
99 std::unordered_map<std::string, size_t> nameIndex;
100 for (size_t i = 0; i < pluginConfigs.size(); i++) {
101 nameIndex[pluginConfigs[i].name()] = i;
102 }
103
104 size_t updateCount = 0;
105 for (auto& cfg : newPluginConfigs) {
106 auto it = nameIndex.find(cfg.name());
107 if (it != nameIndex.end()) {
108 int index = it->second;
109 pluginConfigs[index] = cfg;
110 updateCount++;
111 }
112 }
113 return updateCount;
114 }
115
CreatePluginSessions()116 bool ProfilerService::SessionContext::CreatePluginSessions()
117 {
118 if (bufferConfigs.size() > 0) { // with buffer configs
119 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, bufferConfigs, dataRepeater),
120 false, "create plugin sessions with buffer configs failed!");
121 } else { // without buffer configs
122 CHECK_TRUE(service->pluginSessionManager_->CreatePluginSessions(pluginConfigs, dataRepeater), false,
123 "create plugin sessions without buffer configs failed!");
124 }
125 return true;
126 }
127
StartPluginSessions()128 bool ProfilerService::SessionContext::StartPluginSessions()
129 {
130 std::unique_lock<std::mutex> lock(sessionMutex);
131
132 // if dataRepeater exists, reset it to usable state.
133 if (dataRepeater) {
134 dataRepeater->Reset();
135 }
136
137 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
138 uint32_t sampleDuration = sessionConfig.sample_duration();
139 if (sampleDuration > 0) {
140 traceFileWriter->SetTimeSource();
141 std::weak_ptr<SessionContext> weakCtx(shared_from_this());
142 // start offline trace timeout task
143 offlineScheduleTaskFd = stopExpireTask.ScheduleTask(
144 [weakCtx]() {
145 if (auto ctx = weakCtx.lock(); ctx != nullptr) {
146 ctx->StopPluginSessions();
147 }
148 },
149 sampleDuration,
150 true);
151 // keep_alive_time not set by client, but the sample_duration setted
152 if (sessionConfig.keep_alive_time() == 0) {
153 // use sample_duration add a little time to set keep_alive_time
154 SetKeepAliveTime(sampleDuration + DEFAULT_KEEP_ALIVE_DELTA);
155 StartSessionExpireTask(service->removeTask_);
156 }
157 }
158 }
159
160 // start each plugin sessions
161 service->pluginSessionManager_->StartPluginSessions(pluginNames);
162 return true;
163 }
164
StopPluginSessions()165 bool ProfilerService::SessionContext::StopPluginSessions()
166 {
167 std::unique_lock<std::mutex> lock(sessionMutex);
168 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
169 if (offlineScheduleTaskFd != -1) {
170 stopExpireTask.UnscheduleTaskLockless(offlineScheduleTaskFd);
171 offlineScheduleTaskFd = -1;
172 } else {
173 return true;
174 }
175 traceFileWriter->SetDurationTime();
176 }
177
178 // stop each plugin sessions
179 service->pluginSessionManager_->StopPluginSessions(pluginNames);
180 // stop epoll thread receiving shared memory messages
181 service->pluginService_->StopEpollThread();
182
183 // Read the remaining data of shared memory of all plugins.
184 for (auto& name : pluginNames) {
185 service->pluginService_->FlushAllData(name);
186 }
187 if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
188 // update file header.
189 traceFileWriter->Finish();
190 }
191
192 // make sure FetchData thread exit
193 if (dataRepeater) {
194 dataRepeater->Close();
195 }
196 return true;
197 }
198
199 namespace {
IsValidKeepAliveTime(uint32_t timeout)200 bool IsValidKeepAliveTime(uint32_t timeout)
201 {
202 if (timeout < MIN_SESSION_TIMEOUT_MS) {
203 return false;
204 }
205 if (timeout > MAX_SESSION_TIMEOUT_MS) {
206 return false;
207 }
208 return true;
209 }
210 } // namespace
211
SetKeepAliveTime(uint32_t timeout)212 void ProfilerService::SessionContext::SetKeepAliveTime(uint32_t timeout)
213 {
214 if (timeout > 0) {
215 sessionConfig.set_keep_alive_time(timeout);
216 }
217 }
218
StartSessionExpireTask(ScheduleTaskManager & task)219 void ProfilerService::SessionContext::StartSessionExpireTask(ScheduleTaskManager& task)
220 {
221 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd == -1) {
222 timeoutScheduleTaskFd = task.ScheduleTask(
223 std::bind(&ProfilerService::RemoveSessionContext, service, id),
224 sessionConfig.keep_alive_time(), true);
225 }
226 }
227
StopSessionExpireTask(ScheduleTaskManager & task)228 void ProfilerService::SessionContext::StopSessionExpireTask(ScheduleTaskManager& task)
229 {
230 if (sessionConfig.keep_alive_time() > 0 && timeoutScheduleTaskFd != -1) {
231 task.UnscheduleTaskLockless(timeoutScheduleTaskFd);
232 timeoutScheduleTaskFd = -1;
233 }
234 }
235
CreateSession(ServerContext * context,const::CreateSessionRequest * request,::CreateSessionResponse * response)236 Status ProfilerService::CreateSession(ServerContext* context,
237 const ::CreateSessionRequest* request,
238 ::CreateSessionResponse* response)
239 {
240 CHECK_REQUEST_RESPONSE(context, request, response);
241 CHECK_POINTER_NOTNULL(pluginService_, "plugin service not ready!");
242 // check plugin configs
243 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d start", request->request_id());
244 const int nConfigs = request->plugin_configs_size();
245 CHECK_EXPRESSION_TRUE(nConfigs > 0, "no plugin configs!");
246
247 // check buffer configs
248 std::shared_ptr<ProfilerSessionConfig> sessionConfig =
249 std::make_shared<ProfilerSessionConfig>(request->session_config());
250 const int nBuffers = sessionConfig->buffers_size();
251 CHECK_EXPRESSION_TRUE(nBuffers == 0 || nBuffers == 1 || nBuffers == nConfigs, "buffers config invalid!");
252 // copy plugin configs from request
253 std::vector<ProfilerPluginConfig> pluginConfigs;
254 pluginConfigs.reserve(nConfigs);
255
256 for (int i = 0; i < nConfigs; i++) {
257 pluginConfigs.push_back(request->plugin_configs(i));
258 }
259
260 if (pluginConfigs.empty()) {
261 PROFILER_LOG_ERROR(LOG_CORE, "No plugins are loaded!");
262 return Status(StatusCode::PERMISSION_DENIED, "");
263 }
264 // copy buffer configs
265 std::vector<BufferConfig> bufferConfigs;
266 if (nBuffers == 1) {
267 // if only one buffer config provided, all plugin use the same buffer config
268 bufferConfigs.resize(pluginConfigs.size(), sessionConfig->buffers(0));
269 } else if (nBuffers > 0) {
270 // if more than one buffer config provided, the number of buffer configs must equals number of plugin configs
271 bufferConfigs.assign(sessionConfig->buffers().begin(), sessionConfig->buffers().end());
272 }
273 PROFILER_LOG_INFO(LOG_CORE, "bufferConfigs: %zu", bufferConfigs.size());
274 std::vector<std::string> pluginNames;
275 std::transform(pluginConfigs.begin(), pluginConfigs.end(), std::back_inserter(pluginNames),
276 [](ProfilerPluginConfig& config) { return config.name(); });
277 std::sort(pluginNames.begin(), pluginNames.end());
278 //set session configs
279 pluginService_->SetProfilerSessionConfig(sessionConfig, pluginNames);
280
281 // create TraceFileWriter for offline mode
282 TraceFileWriterPtr traceWriter;
283 std::shared_ptr<ProfilerDataRepeater> dataRepeater = nullptr;
284 if (sessionConfig->session_mode() == ProfilerSessionConfig::OFFLINE) {
285 auto resultFile = sessionConfig->result_file();
286 CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
287 traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig->split_file(),
288 sessionConfig->split_file_max_size_mb(), sessionConfig->split_file_max_num());
289 CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
290 pluginService_->SetTraceWriter(traceWriter);
291 for (std::vector<ProfilerPluginConfig>::size_type i = 0; i < pluginConfigs.size(); i++) {
292 ProfilerPluginData pluginData;
293 pluginData.set_name(pluginConfigs[i].name() + "_config");
294 pluginData.set_sample_interval(request->plugin_configs(i).sample_interval());
295 pluginData.set_data(pluginConfigs[i].config_data());
296 std::vector<char> msgData(pluginData.ByteSizeLong());
297 if (pluginData.SerializeToArray(msgData.data(), msgData.size()) <= 0) {
298 PROFILER_LOG_WARN(LOG_CORE, "PluginConfig SerializeToArray failed!");
299 }
300 traceWriter->SetPluginConfig(msgData.data(), msgData.size());
301 }
302 traceWriter->Flush();
303 } else {
304 dataRepeater = std::make_shared<ProfilerDataRepeater>(DEFAULT_REPEATER_BUFFER_SIZE);
305 CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
306 }
307
308 // create session context
309 auto ctx = std::make_shared<SessionContext>();
310 CHECK_POINTER_NOTNULL(ctx, "alloc SessionContext failed!");
311
312 // fill fields of SessionContext
313 ctx->service = this;
314 if (dataRepeater != nullptr) {
315 ctx->dataRepeater = dataRepeater;
316 }
317 if (traceWriter != nullptr) {
318 ctx->traceFileWriter = traceWriter;
319 }
320 ctx->sessionConfig = *sessionConfig;
321 ctx->pluginNames = std::move(pluginNames);
322 ctx->pluginConfigs = std::move(pluginConfigs);
323 ctx->bufferConfigs = std::move(bufferConfigs);
324
325 // create plugin sessions
326 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "create plugin sessions failed!");
327 // alloc new session id
328 uint32_t sessionId = ++sessionIdCounter_;
329 ctx->id = sessionId;
330 ctx->name = "session-" + std::to_string(sessionId);
331
332 // add {sessionId, ctx} to map
333 CHECK_EXPRESSION_TRUE(AddSessionContext(sessionId, ctx), "sessionId conflict!");
334
335 // create session expire schedule task
336 auto keepAliveTime = sessionConfig->keep_alive_time();
337 if (keepAliveTime) {
338 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
339 // create schedule task for session timeout feature
340 ctx->SetKeepAliveTime(keepAliveTime);
341 ctx->StartSessionExpireTask(removeTask_);
342 }
343
344 // prepare response data fields
345 response->set_status(0);
346 response->set_session_id(sessionId);
347
348 PROFILER_LOG_INFO(LOG_CORE, "CreateSession %d %u done!", request->request_id(), sessionId);
349 return Status::OK;
350 }
351
AddSessionContext(uint32_t sessionId,const SessionContextPtr & sessionCtx)352 bool ProfilerService::AddSessionContext(uint32_t sessionId, const SessionContextPtr& sessionCtx)
353 {
354 std::unique_lock<std::mutex> lock(sessionContextMutex_);
355 CHECK_TRUE(sessionContext_.count(sessionId) == 0, false, "sessionId already exists!");
356 sessionContext_[sessionId] = sessionCtx;
357 return true;
358 }
359
GetSessionContext(uint32_t sessionId) const360 ProfilerService::SessionContextPtr ProfilerService::GetSessionContext(uint32_t sessionId) const
361 {
362 std::unique_lock<std::mutex> lock(sessionContextMutex_);
363 auto it = sessionContext_.find(sessionId);
364 if (it != sessionContext_.end()) {
365 auto ptr = it->second;
366 return ptr;
367 }
368 return nullptr;
369 }
370
RemoveSessionContext(uint32_t sessionId)371 bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
372 {
373 std::unique_lock<std::mutex> lock(sessionContextMutex_);
374 auto it = sessionContext_.find(sessionId);
375 if (it != sessionContext_.end()) {
376 auto ptr = it->second;
377 PROFILER_LOG_INFO(LOG_CORE, "DelCtx use_count: %ld", ptr.use_count());
378 sessionContext_.erase(it);
379 return true;
380 }
381 return false;
382 }
383
MergeStandaloneFile(const std::string & resultFile,const std::string & pluginName,const std::string & outputFile,const std::string & pluginVersion)384 void ProfilerService::MergeStandaloneFile(const std::string& resultFile, const std::string& pluginName,
385 const std::string& outputFile, const std::string& pluginVersion)
386 {
387 if (pluginName.empty() || outputFile.empty()) {
388 PROFILER_LOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)",
389 pluginName.c_str(), outputFile.c_str());
390 return;
391 }
392
393 std::ifstream fsFile {}; // read from output file
394 fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
395 if (!fsFile.good()) {
396 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
397 return;
398 }
399
400 std::ofstream fsTarget {}; // write to profiler ouput file
401 fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
402 if (!fsTarget.good()) {
403 PROFILER_LOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
404 return;
405 }
406 fsTarget.seekp(0, std::ios_base::end);
407 int posFile = fsTarget.tellp(); // for update sha256
408
409 TraceFileHeader header {};
410 if (pluginName == "hiperf-plugin") {
411 header.data_.dataType = DataType::HIPERF_DATA;
412 } else {
413 header.data_.dataType = DataType::STANDALONE_DATA;
414 }
415 fsFile.seekg(0, std::ios_base::end);
416 uint64_t fileSize = (uint64_t)(fsFile.tellg());
417 header.data_.length += fileSize;
418 size_t pluginSize = sizeof(header.data_.standalonePluginName);
419 int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
420 if (ret != EOK) {
421 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
422 return;
423 }
424 pluginSize = sizeof(header.data_.pluginVersion);
425 ret = strncpy_s(header.data_.pluginVersion, pluginSize, pluginVersion.c_str(), pluginSize - 1);
426 if (ret != EOK) {
427 PROFILER_LOG_ERROR(LOG_CORE, "strncpy_s error! pluginVersion is %s", pluginVersion.c_str());
428 return;
429 }
430 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
431 if (!fsTarget.good()) {
432 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
433 return;
434 }
435
436 SHA256_CTX sha256Ctx;
437 SHA256_Init(&sha256Ctx);
438 constexpr uint64_t bufSize = 4 * 1024 * 1024;
439 std::vector<char> buf(bufSize);
440 uint64_t readSize = 0;
441 fsFile.seekg(0);
442 while ((readSize = std::min(bufSize, fileSize)) > 0) {
443 fsFile.read(buf.data(), readSize);
444 fsTarget.write(buf.data(), readSize);
445 if (!fsTarget.good()) {
446 PROFILER_LOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
447 return;
448 }
449 fileSize -= readSize;
450
451 SHA256_Update(&sha256Ctx, buf.data(), readSize);
452 }
453 SHA256_Final(header.data_.sha256, &sha256Ctx);
454 fsTarget.seekp(posFile, std::ios_base::beg);
455 fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
456
457 fsFile.close();
458 fsTarget.close();
459
460 PROFILER_LOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
461 }
462
StartSession(ServerContext * context,const::StartSessionRequest * request,::StartSessionResponse * response)463 Status ProfilerService::StartSession(ServerContext* context,
464 const ::StartSessionRequest* request,
465 ::StartSessionResponse* response)
466 {
467 CHECK_REQUEST_RESPONSE(context, request, response);
468
469 uint32_t sessionId = request->session_id();
470 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u start", request->request_id(), sessionId);
471
472 // copy plugin configs from request
473 std::vector<ProfilerPluginConfig> newPluginConfigs;
474 newPluginConfigs.reserve(request->update_configs_size());
475 for (int i = 0; i < request->update_configs_size(); i++) {
476 PROFILER_LOG_INFO(LOG_CORE, "update_configs %d, name = %s", i, request->update_configs(i).name().c_str());
477 newPluginConfigs.push_back(request->update_configs(i));
478 }
479
480 // get plugin names in request
481 std::vector<std::string> requestNames;
482 std::transform(newPluginConfigs.begin(), newPluginConfigs.end(), std::back_inserter(requestNames),
483 [](auto& config) { return config.name(); });
484 std::sort(requestNames.begin(), requestNames.end());
485
486 auto ctx = GetSessionContext(sessionId);
487 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
488
489 // start epoll thread to receive shared memory messages.
490 CHECK_EXPRESSION_TRUE(pluginService_->StartEpollThread(), "start epoll thread failed!");
491
492 // get intersection set of requestNames and pluginNames
493 std::vector<std::string> updateNames;
494 std::set_intersection(requestNames.begin(), requestNames.end(), ctx->pluginNames.begin(), ctx->pluginNames.end(),
495 std::back_inserter(updateNames));
496
497 if (updateNames.size() > 0) {
498 // remove old plugin sessions
499 pluginSessionManager_->RemovePluginSessions(updateNames);
500
501 // update plugin configs
502 size_t updates = ctx->UpdatePluginConfigs(newPluginConfigs);
503
504 // re-create plugin sessions
505 CHECK_EXPRESSION_TRUE(ctx->CreatePluginSessions(), "refresh sessions failed!");
506 PROFILER_LOG_INFO(LOG_CORE, "StartSession %zu plugin config updated!", updates);
507 }
508
509 // start plugin sessions with configs
510 CHECK_EXPRESSION_TRUE(ctx->StartPluginSessions(), "start plugin sessions failed!");
511 PROFILER_LOG_INFO(LOG_CORE, "StartSession %d %u done!", request->request_id(), sessionId);
512 return Status::OK;
513 }
514
FetchData(ServerContext * context,const::FetchDataRequest * request,ServerWriter<::FetchDataResponse> * writer)515 Status ProfilerService::FetchData(ServerContext* context,
516 const ::FetchDataRequest* request,
517 ServerWriter<::FetchDataResponse>* writer)
518 {
519 CHECK_POINTER_NOTNULL(context, "context ptr invalid!");
520 CHECK_POINTER_NOTNULL(request, "request ptr invalid!");
521 CHECK_POINTER_NOTNULL(writer, "writer ptr invalid!");
522
523 CHECK_POINTER_NOTNULL(request, "request invalid!");
524 CHECK_POINTER_NOTNULL(writer, "writer invalid!");
525
526 uint32_t sessionId = request->session_id();
527 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u start", request->request_id(), sessionId);
528
529 auto ctx = GetSessionContext(sessionId);
530 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
531
532 // check each plugin session states
533 CHECK_EXPRESSION_TRUE(pluginSessionManager_->CheckStatus(ctx->pluginNames, PluginSession::STARTED),
534 "session status invalid!");
535
536 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::ONLINE) {
537 auto dataRepeater = ctx->dataRepeater;
538 CHECK_POINTER_NOTNULL(dataRepeater, "repeater invalid!");
539
540 while (1) {
541 ctx = GetSessionContext(sessionId);
542 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
543
544 FetchDataResponse response;
545 response.set_status(StatusCode::OK);
546 response.set_response_id(++responseIdCounter_);
547
548 std::vector<ProfilerPluginDataPtr> pluginDataVec;
549 int count = dataRepeater->TakePluginData(pluginDataVec);
550 if (count > 0) {
551 response.set_has_more(true);
552 for (int i = 0; i < count; i++) {
553 auto data = response.add_plugin_data();
554 CHECK_POINTER_NOTNULL(data, "new plugin data invalid");
555 CHECK_POINTER_NOTNULL(pluginDataVec[i], "plugin data invalid");
556 *data = *pluginDataVec[i];
557 }
558 } else {
559 response.set_has_more(false);
560 PROFILER_LOG_INFO(LOG_CORE, "no more data need to fill to response!");
561 }
562
563 bool sendSuccess = writer->Write(response);
564 if (count <= 0 || !sendSuccess) {
565 PROFILER_LOG_INFO(LOG_CORE, "count = %d, sendSuccess = %d", count, sendSuccess);
566 break;
567 }
568 }
569 }
570
571 PROFILER_LOG_INFO(LOG_CORE, "FetchData %d %u done!", request->request_id(), sessionId);
572 return Status::OK;
573 }
574
StopSession(ServerContext * context,const::StopSessionRequest * request,::StopSessionResponse * response)575 Status ProfilerService::StopSession(ServerContext* context,
576 const ::StopSessionRequest* request,
577 ::StopSessionResponse* response)
578 {
579 CHECK_REQUEST_RESPONSE(context, request, response);
580 uint32_t sessionId = request->session_id();
581 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u start", request->request_id(), sessionId);
582
583 auto ctx = GetSessionContext(sessionId);
584 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
585 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
586 CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
587 ctx->traceFileWriter.get()->SetStopSplitFile(true);
588 }
589 CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
590 PROFILER_LOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
591 return Status::OK;
592 }
593
DestroySession(ServerContext * context,const::DestroySessionRequest * request,::DestroySessionResponse * response)594 Status ProfilerService::DestroySession(ServerContext* context,
595 const ::DestroySessionRequest* request,
596 ::DestroySessionResponse* response)
597 {
598 CHECK_REQUEST_RESPONSE(context, request, response);
599
600 uint32_t sessionId = request->session_id();
601 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u start", request->request_id(), sessionId);
602
603 auto ctx = GetSessionContext(sessionId);
604 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
605
606 CHECK_EXPRESSION_TRUE(RemoveSessionContext(sessionId), "remove session FAILED!");
607 CHECK_EXPRESSION_TRUE(pluginSessionManager_->RemovePluginSessions(ctx->pluginNames),
608 "remove plugin session FAILED!");
609 PROFILER_LOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
610
611 if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
612 uint32_t pluginId = 0;
613 PluginContextPtr pluginCtx = nullptr;
614 for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
615 auto pluginName = ctx->pluginNames[i];
616 std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
617 if (pluginCtx->isStandaloneFileData == true) {
618 if (!ctx->sessionConfig.split_file()) {
619 MergeStandaloneFile(ctx->sessionConfig.result_file(), pluginName, pluginCtx->outFileName,
620 pluginCtx->pluginVersion);
621 }
622 }
623 }
624 }
625
626 return Status::OK;
627 }
628
KeepSession(::grpc::ServerContext * context,const::KeepSessionRequest * request,::KeepSessionResponse * response)629 ::grpc::Status ProfilerService::KeepSession(::grpc::ServerContext* context,
630 const ::KeepSessionRequest* request,
631 ::KeepSessionResponse* response)
632 {
633 CHECK_REQUEST_RESPONSE(context, request, response);
634 uint32_t sessionId = request->session_id();
635 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u start", request->request_id(), sessionId);
636
637 auto ctx = GetSessionContext(sessionId);
638 CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
639
640 // update keep alive time if keep_alive_time parameter provided
641 auto keepAliveTime = request->keep_alive_time();
642 if (keepAliveTime) {
643 CHECK_EXPRESSION_TRUE(IsValidKeepAliveTime(keepAliveTime), "keep_alive_time invalid!");
644 ctx->SetKeepAliveTime(keepAliveTime);
645 }
646
647 // reschedule session timeout task
648 if (ctx->sessionConfig.keep_alive_time() > 0) {
649 ctx->StopSessionExpireTask(removeTask_);
650 ctx->StartSessionExpireTask(removeTask_);
651 }
652 PROFILER_LOG_INFO(LOG_CORE, "KeepSession %d %u done!", request->request_id(), sessionId);
653 return Status::OK;
654 }
655
656 struct LoggingInterceptor : public grpc::experimental::Interceptor {
657 public:
LoggingInterceptorLoggingInterceptor658 explicit LoggingInterceptor(grpc::experimental::ServerRpcInfo* info) : info_(info) {}
659
InterceptLoggingInterceptor660 void Intercept(experimental::InterceptorBatchMethods* methods) override
661 {
662 const char* method = info_->method();
663 if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_SEND_MESSAGE)) {
664 PROFILER_LOG_DEBUG(LOG_CORE, "POST_SEND_MESSAGE method: %s", method);
665 } else if (methods->QueryInterceptionHookPoint(experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
666 PROFILER_LOG_DEBUG(LOG_CORE, "POST_RECV_MESSAGE method: %s", method);
667 }
668 methods->Proceed();
669 }
670
671 private:
672 grpc::experimental::ServerRpcInfo* info_ = nullptr;
673 };
674
675 struct InterceptorFactory : public grpc::experimental::ServerInterceptorFactoryInterface {
676 protected:
CreateServerInterceptorInterceptorFactory677 grpc::experimental::Interceptor* CreateServerInterceptor(grpc::experimental::ServerRpcInfo* info) override
678 {
679 return new LoggingInterceptor(info);
680 }
681 };
682
StartService(const std::string & listenUri)683 bool ProfilerService::StartService(const std::string& listenUri)
684 {
685 CHECK_TRUE(!listenUri.empty(), false, "listenUri empty!");
686
687 ServerBuilder builder;
688 builder.AddListeningPort(listenUri, grpc::InsecureServerCredentials());
689 builder.RegisterService(this);
690
691 auto server = builder.BuildAndStart();
692 CHECK_NOTNULL(server, false, "start service failed!");
693 PROFILER_LOG_INFO(LOG_CORE, "Service started successfully.");
694 server_ = std::move(server);
695 return true;
696 }
697
WaitServiceDone()698 void ProfilerService::WaitServiceDone()
699 {
700 if (server_) {
701 PROFILER_LOG_INFO(LOG_CORE, "waiting Server...");
702 server_->Wait();
703 PROFILER_LOG_INFO(LOG_CORE, "Server done!");
704 }
705 }
706
StopService()707 void ProfilerService::StopService()
708 {
709 if (server_) {
710 server_->Shutdown();
711 PROFILER_LOG_INFO(LOG_CORE, "Server stop done!");
712 }
713 }