| 
 | 1 | +/*  | 
 | 2 | + * Licensed to the Apache Software Foundation (ASF) under one or more  | 
 | 3 | + * contributor license agreements.  See the NOTICE file distributed with  | 
 | 4 | + * this work for additional information regarding copyright ownership.  | 
 | 5 | + * The ASF licenses this file to You under the Apache License, Version 2.0  | 
 | 6 | + * (the "License"); you may not use this file except in compliance with  | 
 | 7 | + * the License.  You may obtain a copy of the License at  | 
 | 8 | + *  | 
 | 9 | + *    http://www.apache.org/licenses/LICENSE-2.0  | 
 | 10 | + *  | 
 | 11 | + * Unless required by applicable law or agreed to in writing, software  | 
 | 12 | + * distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 14 | + * See the License for the specific language governing permissions and  | 
 | 15 | + * limitations under the License.  | 
 | 16 | + */  | 
 | 17 | + | 
 | 18 | +package org.apache.flink.connector.cloudwatch.sink.test;  | 
 | 19 | + | 
 | 20 | +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;  | 
 | 21 | +import org.apache.flink.connector.aws.testutils.LocalstackContainer;  | 
 | 22 | +import org.apache.flink.connector.aws.util.AWSGeneralUtil;  | 
 | 23 | +import org.apache.flink.connector.cloudwatch.sink.CloudWatchSink;  | 
 | 24 | +import org.apache.flink.connector.cloudwatch.sink.MetricWriteRequest;  | 
 | 25 | +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  | 
 | 26 | +import org.apache.flink.test.junit5.MiniClusterExtension;  | 
 | 27 | + | 
 | 28 | +import org.junit.jupiter.api.AfterEach;  | 
 | 29 | +import org.junit.jupiter.api.BeforeEach;  | 
 | 30 | +import org.junit.jupiter.api.Test;  | 
 | 31 | +import org.junit.jupiter.api.extension.ExtendWith;  | 
 | 32 | +import org.slf4j.Logger;  | 
 | 33 | +import org.slf4j.LoggerFactory;  | 
 | 34 | +import org.testcontainers.containers.Network;  | 
 | 35 | +import org.testcontainers.junit.jupiter.Container;  | 
 | 36 | +import org.testcontainers.junit.jupiter.Testcontainers;  | 
 | 37 | +import org.testcontainers.utility.DockerImageName;  | 
 | 38 | +import software.amazon.awssdk.core.SdkSystemSetting;  | 
 | 39 | +import software.amazon.awssdk.http.SdkHttpClient;  | 
 | 40 | +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;  | 
 | 41 | +import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataRequest;  | 
 | 42 | +import software.amazon.awssdk.services.cloudwatch.model.GetMetricDataResponse;  | 
 | 43 | +import software.amazon.awssdk.services.cloudwatch.model.Metric;  | 
 | 44 | +import software.amazon.awssdk.services.cloudwatch.model.MetricDataQuery;  | 
 | 45 | +import software.amazon.awssdk.services.cloudwatch.model.MetricStat;  | 
 | 46 | + | 
 | 47 | +import java.time.Instant;  | 
 | 48 | +import java.util.UUID;  | 
 | 49 | + | 
 | 50 | +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;  | 
 | 51 | +import static org.assertj.core.api.Assertions.assertThat;  | 
 | 52 | + | 
 | 53 | +/** Integration test for {@link CloudWatchSink}. */  | 
 | 54 | +@Testcontainers  | 
 | 55 | +@ExtendWith(MiniClusterExtension.class)  | 
 | 56 | +public class CloudWatchSinkITCase {  | 
 | 57 | +    private static final Logger LOG = LoggerFactory.getLogger(CloudWatchSinkITCase.class);  | 
 | 58 | + | 
 | 59 | +    private static String testMetricName;  | 
 | 60 | +    private static final int NUMBER_OF_ELEMENTS = 50;  | 
 | 61 | + | 
 | 62 | +    private static StreamExecutionEnvironment env;  | 
 | 63 | + | 
 | 64 | +    private CloudWatchClient cloudWatchClient;  | 
 | 65 | +    private SdkHttpClient httpClient;  | 
 | 66 | +    private static final Network network = Network.newNetwork();  | 
 | 67 | +    private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";  | 
 | 68 | +    private static final String TEST_NAMESPACE = "test_namespace";  | 
 | 69 | + | 
 | 70 | +    @Container  | 
 | 71 | +    private static final LocalstackContainer MOCK_CLOUDWATCH_CONTAINER =  | 
 | 72 | +            new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))  | 
 | 73 | +                    .withNetwork(network)  | 
 | 74 | +                    .withNetworkAliases("localstack");  | 
 | 75 | + | 
 | 76 | +    @BeforeEach  | 
 | 77 | +    public void setup() {  | 
 | 78 | +        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");  | 
 | 79 | + | 
 | 80 | +        testMetricName = UUID.randomUUID().toString();  | 
 | 81 | + | 
 | 82 | +        env = StreamExecutionEnvironment.getExecutionEnvironment();  | 
 | 83 | +        env.setParallelism(1);  | 
 | 84 | + | 
 | 85 | +        httpClient = AWSServicesTestUtils.createHttpClient();  | 
 | 86 | + | 
 | 87 | +        cloudWatchClient =  | 
 | 88 | +                AWSServicesTestUtils.createAwsSyncClient(  | 
 | 89 | +                        MOCK_CLOUDWATCH_CONTAINER.getEndpoint(),  | 
 | 90 | +                        httpClient,  | 
 | 91 | +                        CloudWatchClient.builder());  | 
 | 92 | + | 
 | 93 | +        LOG.info("Done setting up the localstack.");  | 
 | 94 | +    }  | 
 | 95 | + | 
 | 96 | +    @AfterEach  | 
 | 97 | +    public void teardown() {  | 
 | 98 | +        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());  | 
 | 99 | +        AWSGeneralUtil.closeResources(httpClient, cloudWatchClient);  | 
 | 100 | +    }  | 
 | 101 | + | 
 | 102 | +    @Test  | 
 | 103 | +    public void testRandomDataSuccessfullyWritten() throws Exception {  | 
 | 104 | +        CloudWatchSink<MetricWriteRequest> cloudWatchSink =  | 
 | 105 | +                CloudWatchSink.<MetricWriteRequest>builder()  | 
 | 106 | +                        .setNamespace(TEST_NAMESPACE)  | 
 | 107 | +                        .setCloudWatchClientProperties(  | 
 | 108 | +                                createConfig(MOCK_CLOUDWATCH_CONTAINER.getEndpoint()))  | 
 | 109 | +                        .build();  | 
 | 110 | + | 
 | 111 | +        Instant testTimestamp = Instant.now();  | 
 | 112 | + | 
 | 113 | +        env.fromSequence(1, NUMBER_OF_ELEMENTS)  | 
 | 114 | +                .map(  | 
 | 115 | +                        data ->  | 
 | 116 | +                                MetricWriteRequest.builder()  | 
 | 117 | +                                        .withMetricName(testMetricName)  | 
 | 118 | +                                        .addValue(1.0d)  | 
 | 119 | +                                        .withTimestamp(testTimestamp)  | 
 | 120 | +                                        .build())  | 
 | 121 | +                .sinkTo(cloudWatchSink);  | 
 | 122 | + | 
 | 123 | +        env.execute("Integration Test");  | 
 | 124 | + | 
 | 125 | +        GetMetricDataResponse response =  | 
 | 126 | +                cloudWatchClient.getMetricData(  | 
 | 127 | +                        GetMetricDataRequest.builder()  | 
 | 128 | +                                .metricDataQueries(  | 
 | 129 | +                                        MetricDataQuery.builder()  | 
 | 130 | +                                                .metricStat(getMetricStat("Sum"))  | 
 | 131 | +                                                .build(),  | 
 | 132 | +                                        MetricDataQuery.builder()  | 
 | 133 | +                                                .metricStat(getMetricStat("SampleCount"))  | 
 | 134 | +                                                .build())  | 
 | 135 | +                                .startTime(testTimestamp.minusSeconds(300))  | 
 | 136 | +                                .endTime(testTimestamp.plusSeconds(300))  | 
 | 137 | +                                .build());  | 
 | 138 | + | 
 | 139 | +        response.metricDataResults()  | 
 | 140 | +                .forEach(  | 
 | 141 | +                        result ->  | 
 | 142 | +                                assertThat(result.values())  | 
 | 143 | +                                        .containsExactly(Double.valueOf(NUMBER_OF_ELEMENTS)));  | 
 | 144 | +    }  | 
 | 145 | + | 
 | 146 | +    private static MetricStat getMetricStat(String stat) {  | 
 | 147 | +        return MetricStat.builder()  | 
 | 148 | +                .metric(  | 
 | 149 | +                        Metric.builder()  | 
 | 150 | +                                .namespace(TEST_NAMESPACE)  | 
 | 151 | +                                .metricName(testMetricName)  | 
 | 152 | +                                .build())  | 
 | 153 | +                .stat(stat)  | 
 | 154 | +                .period(300)  | 
 | 155 | +                .build();  | 
 | 156 | +    }  | 
 | 157 | +}  | 
0 commit comments