• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.services.eventbridge;
17 
import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.URI;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.eventbridge.model.DescribeEndpointResponse;
import software.amazon.awssdk.services.eventbridge.model.EndpointState;
import software.amazon.awssdk.services.eventbridge.model.PutEventsResponse;
import software.amazon.awssdk.services.eventbridge.model.ReplicationState;
import software.amazon.awssdk.services.eventbridge.model.ResourceAlreadyExistsException;
import software.amazon.awssdk.services.route53.Route53Client;
import software.amazon.awssdk.services.route53.model.CreateHealthCheckResponse;
import software.amazon.awssdk.services.route53.model.HealthCheckType;
import software.amazon.awssdk.testutils.Waiter;
import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;
import software.amazon.awssdk.utils.Logger;
44 
45 class EventBridgeMultiRegionEndpointIntegrationTest extends AwsIntegrationTestBase {
46     private static final Logger log = Logger.loggerFor(EventBridgeMultiRegionEndpointIntegrationTest.class);
47     private static final String RESOURCE_PREFIX = "java-sdk-integ-test-eb-mrep-";
48     private static final Region PRIMARY_REGION = Region.US_EAST_1;
49     private static final Region FAILOVER_REGION = Region.US_WEST_2;
50 
51     private CapturingExecutionInterceptor interceptor;
52     private Route53Client route53;
53     private EventBridgeClient primaryEventBridge;
54     private EventBridgeClient failoverEventBridge;
55 
56     @BeforeEach
setup()57     public void setup() {
58         interceptor = new CapturingExecutionInterceptor();
59         route53 = Route53Client.builder()
60                                .credentialsProvider(getCredentialsProvider())
61                                .region(Region.AWS_GLOBAL)
62                                .build();
63         primaryEventBridge = createEventBridgeClient(PRIMARY_REGION);
64         failoverEventBridge = createEventBridgeClient(FAILOVER_REGION);
65     }
66 
createEventBridgeClient(Region region)67     private EventBridgeClient createEventBridgeClient(Region region) {
68         return EventBridgeClient.builder()
69                                 .region(region)
70                                 .credentialsProvider(getCredentialsProvider())
71                                 .overrideConfiguration(ClientOverrideConfiguration
72                                                            .builder()
73                                                            .addExecutionInterceptor(interceptor)
74                                                            .build())
75                                 .build();
76     }
77 
78     @Test
testPutEventsToMultiRegionEndpoint()79     void testPutEventsToMultiRegionEndpoint() {
80         String endpointId = prepareEndpoint();
81         PutEventsResponse putEventsResponse = putEvents(endpointId);
82         // Assert the endpointId parameter was used to infer the request endpoint
83         assertThat(lastRequest().host()).isEqualTo(endpointId + ".endpoint.events.amazonaws.com");
84         // Assert the request was signed with SigV4a
85         assertThat(lastRequest().firstMatchingHeader("Authorization").get()).contains("AWS4-ECDSA-P256-SHA256");
86         assertThat(lastRequest().firstMatchingHeader("X-Amz-Region-Set")).hasValue("*");
87         // Assert the request succeeded normally
88         if (putEventsResponse.failedEntryCount() != 0) {
89             throw new AssertionError("Failed to put events: " + putEventsResponse);
90         }
91     }
92 
93     /**
94      * Execute all the steps needed to prepare a global endpoint, creating the relevant resources if needed.
95      */
prepareEndpoint()96     private String prepareEndpoint() {
97         String healthCheckId = getOrCreateHealthCheck();
98         log.info(() -> "healthCheckId: " + healthCheckId);
99 
100         String primaryEventBusArn = getOrCreateEventBus(primaryEventBridge);
101         log.info(() -> "primaryEventBusArn: " + primaryEventBusArn);
102 
103         String failoverEventBusArn = getOrCreateEventBus(failoverEventBridge);
104         log.info(() -> "failoverEventBusArn: " + failoverEventBusArn);
105 
106         String endpointName = getOrCreateMultiRegionEndpoint(healthCheckId, primaryEventBusArn, failoverEventBusArn);
107         log.info(() -> "endpointName: " + endpointName);
108 
109         String endpointId = getOrAwaitEndpointId(endpointName);
110         log.info(() -> "endpointId: " + endpointId);
111         assertThat(endpointId).isNotBlank();
112         return endpointId;
113     }
114 
115     /**
116      * Returns the Route 53 healthcheck ID used for testing, creating a healthcheck if needed.
117      * <p>
118      * The healthcheck is created as {@code DISABLED}, meaning it always reports as healthy.
119      * <p>
120      * (A healthcheck is required to create an EventBridge multi-region endpoint.)
121      */
getOrCreateHealthCheck()122     private String getOrCreateHealthCheck() {
123         URI primaryEndpoint = EventBridgeClient.serviceMetadata().endpointFor(PRIMARY_REGION);
124         CreateHealthCheckResponse createHealthCheckResponse = route53.createHealthCheck(r -> r
125             .callerReference(resourceName("monitor"))
126             .healthCheckConfig(hcc -> hcc
127                 .type(HealthCheckType.TCP)
128                 .port(443)
129                 .fullyQualifiedDomainName(primaryEndpoint.toString())
130                 .disabled(true)));
131         return createHealthCheckResponse.healthCheck().id();
132     }
133 
134     /**
135      * Returns the event bus ARN used for testing, creating an event bus if needed.
136      * <p>
137      * (An event bus is required to create an EventBridge multi-region endpoint.)
138      */
getOrCreateEventBus(EventBridgeClient eventBridge)139     private String getOrCreateEventBus(EventBridgeClient eventBridge) {
140         String eventBusName = resourceName("eventBus");
141         try {
142             return eventBridge.createEventBus(r -> r.name(eventBusName))
143                               .eventBusArn();
144         } catch (ResourceAlreadyExistsException ignored) {
145             log.debug(() -> "Event bus " + eventBusName + " already exists");
146             return eventBridge.describeEventBus(r -> r.name(eventBusName))
147                               .arn();
148         }
149     }
150 
151     /**
152      * Returns the name of the multi-region endpoint used for testing, creating an endpoint if needed.
153      */
getOrCreateMultiRegionEndpoint(String healthCheckId, String primaryEventBusArn, String failoverEventBusArn)154     private String getOrCreateMultiRegionEndpoint(String healthCheckId, String primaryEventBusArn, String failoverEventBusArn) {
155         String endpointName = resourceName("endpoint");
156         try {
157             primaryEventBridge.createEndpoint(r -> r
158                 .name(endpointName)
159                 .description("Used for SDK Acceptance Testing")
160                 .eventBuses(eb -> eb.eventBusArn(primaryEventBusArn),
161                             eb -> eb.eventBusArn(failoverEventBusArn))
162                 .routingConfig(rc -> rc
163                     .failoverConfig(fc -> fc
164                         .primary(p -> p.healthCheck("arn:aws:route53:::healthcheck/" + healthCheckId))
165                         .secondary(s -> s.route(FAILOVER_REGION.id()))))
166                 .replicationConfig(rc -> rc.state(ReplicationState.DISABLED)));
167         } catch (ResourceAlreadyExistsException ignored) {
168             log.debug(() -> "Endpoint " + endpointName + " already exists");
169         }
170         return endpointName;
171     }
172 
173     /**
174      * Returns the endpoint ID associated with the given endpoint name, waiting for the endpoint to finish creating if needed.
175      */
getOrAwaitEndpointId(String endpointName)176     private String getOrAwaitEndpointId(String endpointName) {
177         DescribeEndpointResponse response =
178             Waiter.run(() -> primaryEventBridge.describeEndpoint(r -> r.name(endpointName)))
179                   .until(ep -> ep.state() != EndpointState.CREATING)
180                   .orFailAfter(Duration.ofMinutes(2));
181         assertThat(response.state()).isEqualTo(EndpointState.ACTIVE);
182 
183         log.info(() -> "Endpoint ID is active. Waiting until we can resolve the endpoint. This will take a some time if the "
184                        + "endpoint was just created.");
185         URI endpointUri = URI.create(response.endpointUrl());
186         Waiter.run(() -> invokeSafely(() -> InetAddress.getByName(endpointUri.getHost())))
187               .ignoringException(UncheckedIOException.class)
188               .orFailAfter(Duration.ofMinutes(10));
189 
190         return response.endpointId();
191     }
192 
193     /**
194      * Put test events to the given endpoint ID, which is expected to override the request's endpoint and to sign the request with
195      * SigV4a.
196      */
putEvents(String endpointId)197     private PutEventsResponse putEvents(String endpointId) {
198         return primaryEventBridge.putEvents(r -> r
199             .endpointId(endpointId)
200             .entries(e -> e
201                 .eventBusName(resourceName("eventBus"))
202                 .resources("resource1", "resource2")
203                 .source("com.mycompany.myapp")
204                 .detailType("myDetailType")
205                 .detail("{ \"key1\": \"value1\", \"key2\": \"value2\" }")));
206     }
207 
208     /**
209      * Return a test-friendly name for a given named resource.
210      */
resourceName(String suffix)211     private String resourceName(String suffix) {
212         return RESOURCE_PREFIX + suffix;
213     }
214 
215     /**
216      * Get the last request that was sent with the {@link EventBridgeClient}.
217      */
lastRequest()218     private SdkHttpRequest lastRequest() {
219         return interceptor.beforeTransmission;
220     }
221 
222     /**
223      * Captures {@link SdkHttpRequest}s and saves them to then assert against.
224      */
225     public static class CapturingExecutionInterceptor implements ExecutionInterceptor {
226         private SdkHttpRequest beforeTransmission;
227 
228         @Override
beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes)229         public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
230             this.beforeTransmission = context.httpRequest();
231         }
232     }
233 }
234