1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.services.eventbridge; 17 18 import static org.assertj.core.api.Assertions.assertThat; 19 import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; 20 21 import java.io.UncheckedIOException; 22 import java.net.InetAddress; 23 import java.net.URI; 24 import java.time.Duration; 25 import org.junit.jupiter.api.BeforeEach; 26 import org.junit.jupiter.api.Test; 27 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; 28 import software.amazon.awssdk.core.interceptor.Context; 29 import software.amazon.awssdk.core.interceptor.ExecutionAttributes; 30 import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; 31 import software.amazon.awssdk.http.SdkHttpRequest; 32 import software.amazon.awssdk.regions.Region; 33 import software.amazon.awssdk.services.eventbridge.model.DescribeEndpointResponse; 34 import software.amazon.awssdk.services.eventbridge.model.EndpointState; 35 import software.amazon.awssdk.services.eventbridge.model.PutEventsResponse; 36 import software.amazon.awssdk.services.eventbridge.model.ReplicationState; 37 import software.amazon.awssdk.services.eventbridge.model.ResourceAlreadyExistsException; 38 import software.amazon.awssdk.services.route53.Route53Client; 39 import software.amazon.awssdk.services.route53.model.CreateHealthCheckResponse; 40 import software.amazon.awssdk.services.route53.model.HealthCheckType; 41 import software.amazon.awssdk.testutils.Waiter; 42 import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase; 43 import software.amazon.awssdk.utils.Logger; 44 45 class EventBridgeMultiRegionEndpointIntegrationTest extends AwsIntegrationTestBase { 46 private static final Logger log = Logger.loggerFor(EventBridgeMultiRegionEndpointIntegrationTest.class); 47 private static final String RESOURCE_PREFIX = "java-sdk-integ-test-eb-mrep-"; 48 private static final Region PRIMARY_REGION = Region.US_EAST_1; 49 private static final Region FAILOVER_REGION = Region.US_WEST_2; 50 51 private CapturingExecutionInterceptor interceptor; 52 private Route53Client route53; 53 private EventBridgeClient primaryEventBridge; 54 private EventBridgeClient failoverEventBridge; 55 56 @BeforeEach setup()57 public void setup() { 58 interceptor = new CapturingExecutionInterceptor(); 59 route53 = Route53Client.builder() 60 .credentialsProvider(getCredentialsProvider()) 61 .region(Region.AWS_GLOBAL) 62 .build(); 63 primaryEventBridge = createEventBridgeClient(PRIMARY_REGION); 64 failoverEventBridge = createEventBridgeClient(FAILOVER_REGION); 65 } 66 createEventBridgeClient(Region region)67 private EventBridgeClient createEventBridgeClient(Region region) { 68 return EventBridgeClient.builder() 69 .region(region) 70 .credentialsProvider(getCredentialsProvider()) 71 .overrideConfiguration(ClientOverrideConfiguration 72 .builder() 73 .addExecutionInterceptor(interceptor) 74 .build()) 75 .build(); 76 } 77 78 @Test testPutEventsToMultiRegionEndpoint()79 void testPutEventsToMultiRegionEndpoint() { 80 String endpointId = prepareEndpoint(); 81 PutEventsResponse putEventsResponse = putEvents(endpointId); 82 // Assert the endpointId parameter was used to infer the request endpoint 83 assertThat(lastRequest().host()).isEqualTo(endpointId + ".endpoint.events.amazonaws.com"); 84 // Assert the request was signed with SigV4a 85 assertThat(lastRequest().firstMatchingHeader("Authorization").get()).contains("AWS4-ECDSA-P256-SHA256"); 86 assertThat(lastRequest().firstMatchingHeader("X-Amz-Region-Set")).hasValue("*"); 87 // Assert the request succeeded normally 88 if (putEventsResponse.failedEntryCount() != 0) { 89 throw new AssertionError("Failed to put events: " + putEventsResponse); 90 } 91 } 92 93 /** 94 * Execute all the steps needed to prepare a global endpoint, creating the relevant resources if needed. 95 */ prepareEndpoint()96 private String prepareEndpoint() { 97 String healthCheckId = getOrCreateHealthCheck(); 98 log.info(() -> "healthCheckId: " + healthCheckId); 99 100 String primaryEventBusArn = getOrCreateEventBus(primaryEventBridge); 101 log.info(() -> "primaryEventBusArn: " + primaryEventBusArn); 102 103 String failoverEventBusArn = getOrCreateEventBus(failoverEventBridge); 104 log.info(() -> "failoverEventBusArn: " + failoverEventBusArn); 105 106 String endpointName = getOrCreateMultiRegionEndpoint(healthCheckId, primaryEventBusArn, failoverEventBusArn); 107 log.info(() -> "endpointName: " + endpointName); 108 109 String endpointId = getOrAwaitEndpointId(endpointName); 110 log.info(() -> "endpointId: " + endpointId); 111 assertThat(endpointId).isNotBlank(); 112 return endpointId; 113 } 114 115 /** 116 * Returns the Route 53 healthcheck ID used for testing, creating a healthcheck if needed. 117 * <p> 118 * The healthcheck is created as {@code DISABLED}, meaning it always reports as healthy. 119 * <p> 120 * (A healthcheck is required to create an EventBridge multi-region endpoint.) 121 */ getOrCreateHealthCheck()122 private String getOrCreateHealthCheck() { 123 URI primaryEndpoint = EventBridgeClient.serviceMetadata().endpointFor(PRIMARY_REGION); 124 CreateHealthCheckResponse createHealthCheckResponse = route53.createHealthCheck(r -> r 125 .callerReference(resourceName("monitor")) 126 .healthCheckConfig(hcc -> hcc 127 .type(HealthCheckType.TCP) 128 .port(443) 129 .fullyQualifiedDomainName(primaryEndpoint.toString()) 130 .disabled(true))); 131 return createHealthCheckResponse.healthCheck().id(); 132 } 133 134 /** 135 * Returns the event bus ARN used for testing, creating an event bus if needed. 136 * <p> 137 * (An event bus is required to create an EventBridge multi-region endpoint.) 138 */ getOrCreateEventBus(EventBridgeClient eventBridge)139 private String getOrCreateEventBus(EventBridgeClient eventBridge) { 140 String eventBusName = resourceName("eventBus"); 141 try { 142 return eventBridge.createEventBus(r -> r.name(eventBusName)) 143 .eventBusArn(); 144 } catch (ResourceAlreadyExistsException ignored) { 145 log.debug(() -> "Event bus " + eventBusName + " already exists"); 146 return eventBridge.describeEventBus(r -> r.name(eventBusName)) 147 .arn(); 148 } 149 } 150 151 /** 152 * Returns the name of the multi-region endpoint used for testing, creating an endpoint if needed. 153 */ getOrCreateMultiRegionEndpoint(String healthCheckId, String primaryEventBusArn, String failoverEventBusArn)154 private String getOrCreateMultiRegionEndpoint(String healthCheckId, String primaryEventBusArn, String failoverEventBusArn) { 155 String endpointName = resourceName("endpoint"); 156 try { 157 primaryEventBridge.createEndpoint(r -> r 158 .name(endpointName) 159 .description("Used for SDK Acceptance Testing") 160 .eventBuses(eb -> eb.eventBusArn(primaryEventBusArn), 161 eb -> eb.eventBusArn(failoverEventBusArn)) 162 .routingConfig(rc -> rc 163 .failoverConfig(fc -> fc 164 .primary(p -> p.healthCheck("arn:aws:route53:::healthcheck/" + healthCheckId)) 165 .secondary(s -> s.route(FAILOVER_REGION.id())))) 166 .replicationConfig(rc -> rc.state(ReplicationState.DISABLED))); 167 } catch (ResourceAlreadyExistsException ignored) { 168 log.debug(() -> "Endpoint " + endpointName + " already exists"); 169 } 170 return endpointName; 171 } 172 173 /** 174 * Returns the endpoint ID associated with the given endpoint name, waiting for the endpoint to finish creating if needed. 175 */ getOrAwaitEndpointId(String endpointName)176 private String getOrAwaitEndpointId(String endpointName) { 177 DescribeEndpointResponse response = 178 Waiter.run(() -> primaryEventBridge.describeEndpoint(r -> r.name(endpointName))) 179 .until(ep -> ep.state() != EndpointState.CREATING) 180 .orFailAfter(Duration.ofMinutes(2)); 181 assertThat(response.state()).isEqualTo(EndpointState.ACTIVE); 182 183 log.info(() -> "Endpoint ID is active. Waiting until we can resolve the endpoint. This will take a some time if the " 184 + "endpoint was just created."); 185 URI endpointUri = URI.create(response.endpointUrl()); 186 Waiter.run(() -> invokeSafely(() -> InetAddress.getByName(endpointUri.getHost()))) 187 .ignoringException(UncheckedIOException.class) 188 .orFailAfter(Duration.ofMinutes(10)); 189 190 return response.endpointId(); 191 } 192 193 /** 194 * Put test events to the given endpoint ID, which is expected to override the request's endpoint and to sign the request with 195 * SigV4a. 196 */ putEvents(String endpointId)197 private PutEventsResponse putEvents(String endpointId) { 198 return primaryEventBridge.putEvents(r -> r 199 .endpointId(endpointId) 200 .entries(e -> e 201 .eventBusName(resourceName("eventBus")) 202 .resources("resource1", "resource2") 203 .source("com.mycompany.myapp") 204 .detailType("myDetailType") 205 .detail("{ \"key1\": \"value1\", \"key2\": \"value2\" }"))); 206 } 207 208 /** 209 * Return a test-friendly name for a given named resource. 210 */ resourceName(String suffix)211 private String resourceName(String suffix) { 212 return RESOURCE_PREFIX + suffix; 213 } 214 215 /** 216 * Get the last request that was sent with the {@link EventBridgeClient}. 217 */ lastRequest()218 private SdkHttpRequest lastRequest() { 219 return interceptor.beforeTransmission; 220 } 221 222 /** 223 * Captures {@link SdkHttpRequest}s and saves them to then assert against. 224 */ 225 public static class CapturingExecutionInterceptor implements ExecutionInterceptor { 226 private SdkHttpRequest beforeTransmission; 227 228 @Override beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes)229 public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) { 230 this.beforeTransmission = context.httpRequest(); 231 } 232 } 233 } 234