• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package ohos.devtools.datasources.utils.datahandler.datapoller;
17 
18 import io.grpc.StatusRuntimeException;
19 import io.grpc.stub.StreamObserver;
20 import ohos.devtools.datasources.transport.grpc.HiProfilerClient;
21 import ohos.devtools.datasources.transport.grpc.ProfilerServiceHelper;
22 import ohos.devtools.datasources.transport.grpc.service.CommonTypes;
23 import ohos.devtools.datasources.transport.grpc.service.ProfilerServiceTypes;
24 import ohos.devtools.datasources.utils.common.util.CommonUtil;
25 import ohos.devtools.datasources.utils.common.util.DateTimeUtil;
26 import ohos.devtools.datasources.utils.device.entity.DeviceIPPortInfo;
27 import ohos.devtools.datasources.utils.plugin.entity.PluginConf;
28 import ohos.devtools.datasources.utils.plugin.service.PlugManager;
29 import ohos.devtools.datasources.utils.profilerlog.ProfilerLogManager;
30 import ohos.devtools.datasources.utils.session.service.SessionManager;
31 import org.apache.logging.log4j.LogManager;
32 import org.apache.logging.log4j.Logger;
33 
34 import java.lang.reflect.InvocationTargetException;
35 import java.util.ArrayList;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Objects;
40 import java.util.Queue;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.SynchronousQueue;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 
47 import static ohos.devtools.datasources.utils.common.Constant.MEMORY_PLUG;
48 
49 /**
50  * DataPoller utilities class
51  *
52  * @since 2021/5/19 16:39
53  */
54 public class DataPoller extends Thread {
55     private static final Logger LOGGER = LogManager.getLogger(DataPoller.class);
56     private long localSessionId;
57     private int sessionId;
58     private DeviceIPPortInfo deviceIPPortInfo;
59     private boolean stopFlag = false;
60     private boolean startRefresh = false;
61     private ExecutorService executorService =
62         new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
63     private Map<String, Queue> queueMap = new HashMap<>();
64     private List<AbsDataConsumer> consumers = new ArrayList<>();
65     private Integer count = 0;
66 
67     /**
68      * Data Poller
69      *
70      * @param localSessionId local SessionId
71      * @param sessionId session Id
72      * @param device device
73      */
DataPoller(Long localSessionId, int sessionId, DeviceIPPortInfo device)74     public DataPoller(Long localSessionId, int sessionId, DeviceIPPortInfo device) {
75         if (ProfilerLogManager.isInfoEnabled()) {
76             LOGGER.info("DataPoller");
77         }
78         super.setName("DataPoller");
79         this.localSessionId = localSessionId;
80         this.sessionId = sessionId;
81         this.deviceIPPortInfo = device;
82         init();
83     }
84 
init()85     private void init() {
86         List<PluginConf> items = PlugManager.getInstance().getProfilerPlugConfig(localSessionId);
87         for (PluginConf conf : items) {
88             Class<? extends AbsDataConsumer> consumerClass = conf.getConsumerClass();
89             if (Objects.isNull(consumerClass)) {
90                 continue;
91             }
92             AbsDataConsumer absDataConsumer = null;
93             try {
94                 absDataConsumer = consumerClass.getConstructor().newInstance();
95                 LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
96                 queueMap.put(conf.getPluginDataName(), linkedBlockingQueue);
97                 absDataConsumer.init(linkedBlockingQueue, sessionId, localSessionId);
98                 executorService.execute(absDataConsumer);
99                 consumers.add(absDataConsumer);
100                 if (sessionId == Integer.MAX_VALUE) {
101                     if (!startRefresh) {
102                         long timeStamp = DateTimeUtil.getNowTimeLong();
103                         SessionManager.getInstance().stopLoadingView(localSessionId, timeStamp);
104                         startRefresh = true;
105                     }
106                 }
107             } catch (InstantiationException | IllegalAccessException | InvocationTargetException
108                 | NoSuchMethodException exception) {
109                 LOGGER.error("start Poll init has Exception {}", exception.getMessage());
110             }
111         }
112     }
113 
startPoll()114     private void startPoll() {
115         if (ProfilerLogManager.isInfoEnabled()) {
116             LOGGER.info("startPoll");
117         }
118         ProfilerServiceTypes.FetchDataRequest request =
119             ProfilerServiceHelper.fetchDataRequest(CommonUtil.getRequestId(), sessionId, null);
120         if (ProfilerLogManager.isInfoEnabled()) {
121             LOGGER.info("start Poller fetchData01, {}", DateTimeUtil.getNowTimeLong());
122         }
123         HiProfilerClient.getInstance()
124             .getProfilerClient(deviceIPPortInfo.getIp(), deviceIPPortInfo.getForwardPort()).getProfilerServiceStub()
125             .withMaxInboundMessageSize(Integer.MAX_VALUE).withMaxOutboundMessageSize(Integer.MAX_VALUE)
126             .fetchData(request, new StreamObserver<ProfilerServiceTypes.FetchDataResponse>() {
127                 @Override
128                 public void onNext(ProfilerServiceTypes.FetchDataResponse fetchDataResponse) {
129                     count = 0;
130                     List<CommonTypes.ProfilerPluginData> lists = fetchDataResponse.getPluginDataList();
131                     buildPluginData(lists);
132                 }
133 
134                 @Override
135                 public void onError(Throwable throwable) {
136                     setErrorPoll(throwable);
137                 }
138 
139                 @Override
140                 public void onCompleted() {
141                     shutDown();
142                 }
143             });
144     }
145 
setErrorPoll(Throwable throwable)146     private void setErrorPoll(Throwable throwable) {
147         LOGGER.error("start Poll has Exception", throwable);
148         if (throwable.getMessage().contains("session_id invalid!")) {
149             SessionManager.getInstance().deleteLocalSession(localSessionId);
150         } else {
151             LOGGER.info("restart poller session id {}", sessionId);
152             try {
153                 TimeUnit.MILLISECONDS.sleep(500);
154             } catch (InterruptedException interruptedException) {
155                 LOGGER.error("restart poller InterruptedException session id {}", sessionId);
156             }
157             count++;
158             if (count < 3) {
159                 HiProfilerClient.getInstance()
160                     .destroyProfiler(deviceIPPortInfo.getIp(), deviceIPPortInfo.getForwardPort());
161                 startPoll();
162             }
163         }
164     }
165 
buildPluginData(List<CommonTypes.ProfilerPluginData> lists)166     private void buildPluginData(List<CommonTypes.ProfilerPluginData> lists) {
167         lists.parallelStream().forEach(pluginData -> {
168             if (ProfilerLogManager.isInfoEnabled()) {
169                 LOGGER.info("handleData");
170             }
171             if (pluginData.getStatus() != 0) {
172                 return;
173             }
174             String name = pluginData.getName();
175             if (name.equals(MEMORY_PLUG)) {
176                 if (ProfilerLogManager.isInfoEnabled()) {
177                     LOGGER.info("get Memory Date, time is {}", DateTimeUtil.getNowTimeLong());
178                 }
179             }
180             Queue queue = queueMap.get(name);
181             if (Objects.nonNull(queue)) {
182                 queue.offer(pluginData);
183             }
184             if (!startRefresh) {
185                 long timeStamp = (pluginData.getTvSec() * 1000000000L + pluginData.getTvNsec()) / 1000000;
186                 SessionManager.getInstance().stopLoadingView(localSessionId, timeStamp);
187                 startRefresh = true;
188             }
189         });
190     }
191 
192     /**
193      * shutDown
194      */
shutDown()195     public void shutDown() {
196         if (ProfilerLogManager.isInfoEnabled()) {
197             LOGGER.info("shutDown");
198         }
199         consumers.forEach(absDataConsumer -> absDataConsumer.shutDown());
200         executorService.shutdown();
201         stopFlag = true;
202     }
203 
204     /**
205      * run
206      */
207     @Override
run()208     public void run() {
209         try {
210             startPoll();
211         } catch (StatusRuntimeException exception) {
212             if (ProfilerLogManager.isErrorEnabled()) {
213                 LOGGER.error("exception error{}", exception.getMessage());
214             }
215         }
216     }
217 }
218