/*
 * Copyright 2022 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.testing.integration;

import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY;
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY;

import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.services.MetricReport;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.orca.OrcaOobUtil;
import io.grpc.xds.orca.OrcaPerRequestUtil;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Implements a test LB policy that receives ORCA load reports.
 *
 * <p>The policy is registered under {@link #TEST_ORCA_LB_POLICY_NAME}. Load balancers it creates
 * forward to the {@code "pick_first"} policy and additionally capture out-of-band and per-request
 * load reports, publishing them through the {@code AtomicReference} holders that a call may carry
 * in its call options under {@code ORCA_OOB_REPORT_KEY} and {@code ORCA_RPC_REPORT_KEY}.
 */
final class CustomBackendMetricsLoadBalancerProvider extends LoadBalancerProvider {

  static final String TEST_ORCA_LB_POLICY_NAME = "test_backend_metrics_load_balancer";

  /**
   * Most recent out-of-band report converted by a subchannel listener; {@code null} until the
   * first report is delivered. Written by the OOB listener and read by the picker.
   */
  private volatile TestOrcaReport latestOobReport;

  /** Creates a new {@link CustomBackendMetricsLoadBalancer} bound to {@code helper}. */
  @Override
  public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
    return new CustomBackendMetricsLoadBalancer(helper);
  }

  /** Always reports this provider as available. */
  @Override
  public boolean isAvailable() {
    return true;
  }

  /** Returns the fixed priority {@code 0}. */
  @Override
  public int getPriority() {
    return 0;
  }

  /** Returns {@link #TEST_ORCA_LB_POLICY_NAME}. */
  @Override
  public String getPolicyName() {
    return TEST_ORCA_LB_POLICY_NAME;
  }

  /**
   * Load balancer that forwards every call to a {@code "pick_first"} balancer which was built
   * with a {@link CustomBackendMetricsLoadBalancerHelper} in place of the original helper.
   */
  private final class CustomBackendMetricsLoadBalancer extends ForwardingLoadBalancer {
    private final LoadBalancer delegate;

    /**
     * Looks up the {@code "pick_first"} provider in the default registry and asks it for a
     * balancer that talks to the wrapped {@code helper}.
     */
    public CustomBackendMetricsLoadBalancer(Helper helper) {
      LoadBalancerProvider pickFirstProvider =
          LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first");
      Helper reportingHelper = new CustomBackendMetricsLoadBalancerHelper(helper);
      this.delegate = pickFirstProvider.newLoadBalancer(reportingHelper);
    }

    @Override
    public LoadBalancer delegate() {
      return delegate;
    }

    /**
     * Helper that routes calls through an ORCA reporting helper, attaches an out-of-band report
     * listener to each subchannel it creates, and wraps every picker in a
     * {@link MayReportLoadPicker}.
     */
    private final class CustomBackendMetricsLoadBalancerHelper
        extends ForwardingLoadBalancerHelper {
      private final Helper orcaHelper;

      /** Wraps {@code helper} with {@link OrcaOobUtil#newOrcaReportingHelper}. */
      public CustomBackendMetricsLoadBalancerHelper(Helper helper) {
        this.orcaHelper = OrcaOobUtil.newOrcaReportingHelper(helper);
      }

      /**
       * Creates the subchannel through the forwarding superclass, then registers a listener on it
       * that stores each converted out-of-band report in {@code latestOobReport}. The listener is
       * registered with a report interval of one second.
       */
      @Override
      public Subchannel createSubchannel(CreateSubchannelArgs args) {
        Subchannel created = super.createSubchannel(args);
        OrcaOobUtil.OrcaOobReportListener oobListener =
            new OrcaOobUtil.OrcaOobReportListener() {
              @Override
              public void onLoadReport(MetricReport orcaLoadReport) {
                latestOobReport = fromCallMetricReport(orcaLoadReport);
              }
            };
        OrcaOobUtil.OrcaReportingConfig reportingConfig =
            OrcaOobUtil.OrcaReportingConfig.newBuilder()
                .setReportInterval(1, TimeUnit.SECONDS)
                .build();
        OrcaOobUtil.setListener(created, oobListener, reportingConfig);
        return created;
      }

      /** Forwards the state update, substituting a load-reporting wrapper for the picker. */
      @Override
      public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
        SubchannelPicker reportingPicker = new MayReportLoadPicker(newPicker);
        delegate().updateBalancingState(newState, reportingPicker);
      }

      @Override
      public Helper delegate() {
        return orcaHelper;
      }
    }

    /**
     * Picker decorator: picks that carry a subchannel are re-issued with a per-request ORCA
     * stream tracer factory, and the latest out-of-band report is handed to the call if it asked
     * for one. Picks without a subchannel are returned unchanged.
     */
    private final class MayReportLoadPicker extends SubchannelPicker {
      private final SubchannelPicker delegate;

      public MayReportLoadPicker(SubchannelPicker delegate) {
        this.delegate = delegate;
      }

      @Override
      public PickResult pickSubchannel(PickSubchannelArgs args) {
        PickResult picked = delegate.pickSubchannel(args);
        Subchannel target = picked.getSubchannel();
        if (target == null) {
          // Nothing to decorate when the delegate did not select a subchannel.
          return picked;
        }

        // Publish the most recent out-of-band report to the call, if it supplied a holder.
        AtomicReference<TestOrcaReport> oobSink =
            args.getCallOptions().getOption(ORCA_OOB_REPORT_KEY);
        if (oobSink != null) {
          oobSink.set(latestOobReport);
        }

        // The per-request holder is looked up each time a report arrives, as in the pick itself.
        OrcaPerRequestUtil.OrcaPerRequestReportListener perCallListener =
            new OrcaPerRequestUtil.OrcaPerRequestReportListener() {
              @Override
              public void onLoadReport(MetricReport callMetricReport) {
                AtomicReference<TestOrcaReport> rpcSink =
                    args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY);
                if (rpcSink == null) {
                  return;
                }
                rpcSink.set(fromCallMetricReport(callMetricReport));
              }
            };

        return PickResult.withSubchannel(
            target,
            OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(perCallListener));
      }
    }
  }

  /**
   * Copies the CPU utilization, memory utilization, request-cost metrics and utilization metrics
   * of {@code callMetricReport} into a new {@link TestOrcaReport}.
   */
  private static TestOrcaReport fromCallMetricReport(MetricReport callMetricReport) {
    TestOrcaReport.Builder report = TestOrcaReport.newBuilder();
    report.setCpuUtilization(callMetricReport.getCpuUtilization());
    report.setMemoryUtilization(callMetricReport.getMemoryUtilization());
    report.putAllRequestCost(callMetricReport.getRequestCostMetrics());
    report.putAllUtilization(callMetricReport.getUtilizationMetrics());
    return report.build();
  }
}