1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 package ohos.devtools.datasources.utils.datahandler.datapoller; 17 18 import io.grpc.StatusRuntimeException; 19 import io.grpc.stub.StreamObserver; 20 import ohos.devtools.datasources.transport.grpc.HiProfilerClient; 21 import ohos.devtools.datasources.transport.grpc.ProfilerServiceHelper; 22 import ohos.devtools.datasources.transport.grpc.service.CommonTypes; 23 import ohos.devtools.datasources.transport.grpc.service.ProfilerServiceTypes; 24 import ohos.devtools.datasources.utils.common.util.CommonUtil; 25 import ohos.devtools.datasources.utils.common.util.DateTimeUtil; 26 import ohos.devtools.datasources.utils.device.entity.DeviceIPPortInfo; 27 import ohos.devtools.datasources.utils.plugin.entity.PluginConf; 28 import ohos.devtools.datasources.utils.plugin.service.PlugManager; 29 import ohos.devtools.datasources.utils.profilerlog.ProfilerLogManager; 30 import ohos.devtools.datasources.utils.session.service.SessionManager; 31 import org.apache.logging.log4j.LogManager; 32 import org.apache.logging.log4j.Logger; 33 34 import java.lang.reflect.InvocationTargetException; 35 import java.util.ArrayList; 36 import java.util.HashMap; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.Objects; 40 import java.util.Queue; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.LinkedBlockingQueue; 43 import java.util.concurrent.SynchronousQueue; 44 import java.util.concurrent.ThreadPoolExecutor; 45 import java.util.concurrent.TimeUnit; 46 47 import static ohos.devtools.datasources.utils.common.Constant.MEMORY_PLUG; 48 49 /** 50 * DataPoller utilities class 51 * 52 * @since 2021/5/19 16:39 53 */ 54 public class DataPoller extends Thread { 55 private static final Logger LOGGER = LogManager.getLogger(DataPoller.class); 56 private long localSessionId; 57 private int sessionId; 58 private DeviceIPPortInfo deviceIPPortInfo; 59 private boolean stopFlag = false; 60 private boolean startRefresh = false; 61 private ExecutorService executorService = 62 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 63 private Map<String, Queue> queueMap = new HashMap<>(); 64 private List<AbsDataConsumer> consumers = new ArrayList<>(); 65 private Integer count = 0; 66 67 /** 68 * Data Poller 69 * 70 * @param localSessionId local SessionId 71 * @param sessionId session Id 72 * @param device device 73 */ DataPoller(Long localSessionId, int sessionId, DeviceIPPortInfo device)74 public DataPoller(Long localSessionId, int sessionId, DeviceIPPortInfo device) { 75 if (ProfilerLogManager.isInfoEnabled()) { 76 LOGGER.info("DataPoller"); 77 } 78 super.setName("DataPoller"); 79 this.localSessionId = localSessionId; 80 this.sessionId = sessionId; 81 this.deviceIPPortInfo = device; 82 init(); 83 } 84 init()85 private void init() { 86 List<PluginConf> items = PlugManager.getInstance().getProfilerPlugConfig(localSessionId); 87 for (PluginConf conf : items) { 88 Class<? extends AbsDataConsumer> consumerClass = conf.getConsumerClass(); 89 if (Objects.isNull(consumerClass)) { 90 continue; 91 } 92 AbsDataConsumer absDataConsumer = null; 93 try { 94 absDataConsumer = consumerClass.getConstructor().newInstance(); 95 LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(); 96 queueMap.put(conf.getPluginDataName(), linkedBlockingQueue); 97 absDataConsumer.init(linkedBlockingQueue, sessionId, localSessionId); 98 executorService.execute(absDataConsumer); 99 consumers.add(absDataConsumer); 100 if (sessionId == Integer.MAX_VALUE) { 101 if (!startRefresh) { 102 long timeStamp = DateTimeUtil.getNowTimeLong(); 103 SessionManager.getInstance().stopLoadingView(localSessionId, timeStamp); 104 startRefresh = true; 105 } 106 } 107 } catch (InstantiationException | IllegalAccessException | InvocationTargetException 108 | NoSuchMethodException exception) { 109 LOGGER.error("start Poll init has Exception {}", exception.getMessage()); 110 } 111 } 112 } 113 startPoll()114 private void startPoll() { 115 if (ProfilerLogManager.isInfoEnabled()) { 116 LOGGER.info("startPoll"); 117 } 118 ProfilerServiceTypes.FetchDataRequest request = 119 ProfilerServiceHelper.fetchDataRequest(CommonUtil.getRequestId(), sessionId, null); 120 if (ProfilerLogManager.isInfoEnabled()) { 121 LOGGER.info("start Poller fetchData01, {}", DateTimeUtil.getNowTimeLong()); 122 } 123 HiProfilerClient.getInstance() 124 .getProfilerClient(deviceIPPortInfo.getIp(), deviceIPPortInfo.getForwardPort()).getProfilerServiceStub() 125 .withMaxInboundMessageSize(Integer.MAX_VALUE).withMaxOutboundMessageSize(Integer.MAX_VALUE) 126 .fetchData(request, new StreamObserver<ProfilerServiceTypes.FetchDataResponse>() { 127 @Override 128 public void onNext(ProfilerServiceTypes.FetchDataResponse fetchDataResponse) { 129 count = 0; 130 List<CommonTypes.ProfilerPluginData> lists = fetchDataResponse.getPluginDataList(); 131 buildPluginData(lists); 132 } 133 134 @Override 135 public void onError(Throwable throwable) { 136 setErrorPoll(throwable); 137 } 138 139 @Override 140 public void onCompleted() { 141 shutDown(); 142 } 143 }); 144 } 145 setErrorPoll(Throwable throwable)146 private void setErrorPoll(Throwable throwable) { 147 LOGGER.error("start Poll has Exception", throwable); 148 if (throwable.getMessage().contains("session_id invalid!")) { 149 SessionManager.getInstance().deleteLocalSession(localSessionId); 150 } else { 151 LOGGER.info("restart poller session id {}", sessionId); 152 try { 153 TimeUnit.MILLISECONDS.sleep(500); 154 } catch (InterruptedException interruptedException) { 155 LOGGER.error("restart poller InterruptedException session id {}", sessionId); 156 } 157 count++; 158 if (count < 3) { 159 HiProfilerClient.getInstance() 160 .destroyProfiler(deviceIPPortInfo.getIp(), deviceIPPortInfo.getForwardPort()); 161 startPoll(); 162 } 163 } 164 } 165 buildPluginData(List<CommonTypes.ProfilerPluginData> lists)166 private void buildPluginData(List<CommonTypes.ProfilerPluginData> lists) { 167 lists.parallelStream().forEach(pluginData -> { 168 if (ProfilerLogManager.isInfoEnabled()) { 169 LOGGER.info("handleData"); 170 } 171 if (pluginData.getStatus() != 0) { 172 return; 173 } 174 String name = pluginData.getName(); 175 if (name.equals(MEMORY_PLUG)) { 176 if (ProfilerLogManager.isInfoEnabled()) { 177 LOGGER.info("get Memory Date, time is {}", DateTimeUtil.getNowTimeLong()); 178 } 179 } 180 Queue queue = queueMap.get(name); 181 if (Objects.nonNull(queue)) { 182 queue.offer(pluginData); 183 } 184 if (!startRefresh) { 185 long timeStamp = (pluginData.getTvSec() * 1000000000L + pluginData.getTvNsec()) / 1000000; 186 SessionManager.getInstance().stopLoadingView(localSessionId, timeStamp); 187 startRefresh = true; 188 } 189 }); 190 } 191 192 /** 193 * shutDown 194 */ shutDown()195 public void shutDown() { 196 if (ProfilerLogManager.isInfoEnabled()) { 197 LOGGER.info("shutDown"); 198 } 199 consumers.forEach(absDataConsumer -> absDataConsumer.shutDown()); 200 executorService.shutdown(); 201 stopFlag = true; 202 } 203 204 /** 205 * run 206 */ 207 @Override run()208 public void run() { 209 try { 210 startPoll(); 211 } catch (StatusRuntimeException exception) { 212 if (ProfilerLogManager.isErrorEnabled()) { 213 LOGGER.error("exception error{}", exception.getMessage()); 214 } 215 } 216 } 217 } 218