From 8a125effb6fd1f4530dc43fe0edf06623b01e5f0 Mon Sep 17 00:00:00 2001 From: Norwin Schnyder Date: Wed, 19 May 2021 18:02:44 +0200 Subject: [PATCH 1/2] Added parameter to set await termination timeout --- .../logging/HttpEventCollectorLog4jAppender.java | 3 ++- .../logging/HttpEventCollectorLogbackAppender.java | 8 ++++++++ .../logging/HttpEventCollectorLoggingHandler.java | 4 +++- .../splunk/logging/HttpEventCollectorSender.java | 13 ++++++++++++- src/test/resources/log4j2.xml | 1 + src/test/resources/logback.xml | 1 + 6 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java b/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java index dca76120..5726e863 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java @@ -157,6 +157,7 @@ public static HttpEventCollectorLog4jAppender createAppender( @PluginAttribute(value = "call_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT) final long callTimeout, @PluginAttribute(value = "read_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT) final long readTimeout, @PluginAttribute(value = "write_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT) final long writeTimeout, + @PluginAttribute(value = "await_termination_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_AWAIT_TERMINATION_TIMEOUT) final long awaitTerminationTimeout, @PluginElement("Layout") Layout layout, @PluginElement("Filter") final Filter filter ) @@ -219,7 +220,7 @@ public static HttpEventCollectorLog4jAppender createAppender( disableCertificateValidation, eventBodySerializer, eventHeaderSerializer, - new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout) + new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout, awaitTerminationTimeout) ); } diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java b/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java index fe786299..df39b27c 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java @@ -383,6 +383,14 @@ public long getWriteTimeout(long milliseconds) { return this.timeoutSettings.writeTimeout = milliseconds; } + public void setAwaitTerminationTimeout(long milliseconds) { + this.timeoutSettings.awaitTerminationTimeout = milliseconds; + } + + public long getAwaitTerminationTimeout(long milliseconds) { + return this.timeoutSettings.awaitTerminationTimeout = milliseconds; + } + private static long parseLong(String string, int defaultValue) { try { return Long.parseLong(string); diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java b/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java index 34826ded..68858ec8 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java @@ -114,6 +114,7 @@ public final class HttpEventCollectorLoggingHandler extends Handler { private final String CallTimeoutConfTag = "call_timeout"; private final String ReadTimeoutConfTag = "read_timeout"; private final String WriteTimeoutConfTag = "write_timeout"; + private final String AwaitTerminationTimeoutConfTag = "await_termination_timeout"; /** HttpEventCollectorLoggingHandler c-or */ public HttpEventCollectorLoggingHandler() { @@ -165,7 +166,8 @@ public HttpEventCollectorLoggingHandler() { getConfigurationNumericProperty(ConnectTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_CONNECT_TIMEOUT), getConfigurationNumericProperty(CallTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT), getConfigurationNumericProperty(ReadTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT), - getConfigurationNumericProperty(WriteTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT) + getConfigurationNumericProperty(WriteTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT), + getConfigurationNumericProperty(AwaitTerminationTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_AWAIT_TERMINATION_TIMEOUT) ); if ("raw".equalsIgnoreCase(type)) { diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorSender.java b/src/main/java/com/splunk/logging/HttpEventCollectorSender.java index 36fbcbd9..e0c77372 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorSender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorSender.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.security.cert.CertificateException; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -263,6 +264,13 @@ public static void putIfPresent(JsonObject collection, String tag, Object value) private void stopHttpClient() { if (httpClient != null) { httpClient.dispatcher().executorService().shutdown(); + + if (timeoutSettings.awaitTerminationTimeout > 0) { + try { + httpClient.dispatcher().executorService().awaitTermination(timeoutSettings.awaitTerminationTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { /* nop */ } + } + httpClient = null; } } @@ -392,19 +400,22 @@ public static class TimeoutSettings { public static final long DEFAULT_WRITE_TIMEOUT = 0; // 0 means no timeout public static final long DEFAULT_CALL_TIMEOUT = 0; public static final long DEFAULT_READ_TIMEOUT = 0; + public static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT = 0; public long connectTimeout = DEFAULT_CONNECT_TIMEOUT; public long callTimeout = DEFAULT_CALL_TIMEOUT; public long readTimeout = DEFAULT_READ_TIMEOUT; public long writeTimeout = DEFAULT_WRITE_TIMEOUT; + public long awaitTerminationTimeout = DEFAULT_AWAIT_TERMINATION_TIMEOUT; public TimeoutSettings() {} - public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout) { + public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout, long awaitTerminationTimeout) { this.connectTimeout = connectTimeout; this.callTimeout = callTimeout; this.readTimeout = readTimeout; this.writeTimeout = writeTimeout; + this.awaitTerminationTimeout = awaitTerminationTimeout; } } } diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 18fb5b87..147884d2 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -45,6 +45,7 @@ under the License. batch_size_count="0" batch_interval="0" connect_timeout="5000" + await_termination_timeout="1000" disableCertificateValidation="true"> diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 960372a5..f3f9c20b 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -60,6 +60,7 @@ under the License. text HttpEventCollectorUnitTestMiddleware 5000 + 2000 %msg From 427bca6419a66c484694ebfe3b16714dd29b0817 Mon Sep 17 00:00:00 2001 From: Norwin Schnyder Date: Mon, 24 May 2021 10:14:26 +0200 Subject: [PATCH 2/2] Add timeout to wait until queued messages are processed --- .../HttpEventCollectorLog4jAppender.java | 4 +- .../HttpEventCollectorLogbackAppender.java | 8 ++-- .../HttpEventCollectorLoggingHandler.java | 4 +- .../logging/HttpEventCollectorSender.java | 43 ++++++++++++++----- src/test/resources/log4j2.xml | 2 +- src/test/resources/logback.xml | 2 +- 6 files changed, 42 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java b/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java index 5726e863..3e5ca93d 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java @@ -157,7 +157,7 @@ public static HttpEventCollectorLog4jAppender createAppender( @PluginAttribute(value = "call_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT) final long callTimeout, @PluginAttribute(value = "read_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT) final long readTimeout, @PluginAttribute(value = "write_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT) final long writeTimeout, - @PluginAttribute(value = "await_termination_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_AWAIT_TERMINATION_TIMEOUT) final long awaitTerminationTimeout, + @PluginAttribute(value = "termination_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_TERMINATION_TIMEOUT) final long terminationTimeout, @PluginElement("Layout") Layout layout, @PluginElement("Filter") final Filter filter ) @@ -220,7 +220,7 @@ public static HttpEventCollectorLog4jAppender createAppender( disableCertificateValidation, eventBodySerializer, eventHeaderSerializer, - new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout, awaitTerminationTimeout) + new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout, terminationTimeout) ); } diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java b/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java index df39b27c..45d62f5b 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java @@ -383,12 +383,12 @@ public long getWriteTimeout(long milliseconds) { return this.timeoutSettings.writeTimeout = milliseconds; } - public void setAwaitTerminationTimeout(long milliseconds) { - this.timeoutSettings.awaitTerminationTimeout = milliseconds; + public void setTerminationTimeout(long milliseconds) { + this.timeoutSettings.terminationTimeout = milliseconds; } - public long getAwaitTerminationTimeout(long milliseconds) { - return this.timeoutSettings.awaitTerminationTimeout = milliseconds; + public long getTerminationTimeout(long milliseconds) { + return this.timeoutSettings.terminationTimeout = milliseconds; } private static long parseLong(String string, int defaultValue) { diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java b/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java index 68858ec8..88821771 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java @@ -114,7 +114,7 @@ public final class HttpEventCollectorLoggingHandler extends Handler { private final String CallTimeoutConfTag = "call_timeout"; private final String ReadTimeoutConfTag = "read_timeout"; private final String WriteTimeoutConfTag = "write_timeout"; - private final String AwaitTerminationTimeoutConfTag = "await_termination_timeout"; + private final String TerminationTimeoutConfTag = "termination_timeout"; /** HttpEventCollectorLoggingHandler c-or */ public HttpEventCollectorLoggingHandler() { @@ -167,7 +167,7 @@ public HttpEventCollectorLoggingHandler() { getConfigurationNumericProperty(CallTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT), getConfigurationNumericProperty(ReadTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT), getConfigurationNumericProperty(WriteTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT), - getConfigurationNumericProperty(AwaitTerminationTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_AWAIT_TERMINATION_TIMEOUT) + getConfigurationNumericProperty(TerminationTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_TERMINATION_TIMEOUT) ); if ("raw".equalsIgnoreCase(type)) { diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorSender.java b/src/main/java/com/splunk/logging/HttpEventCollectorSender.java index e0c77372..e65202d1 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorSender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorSender.java @@ -263,15 +263,36 @@ public static void putIfPresent(JsonObject collection, String tag, Object value) private void stopHttpClient() { if (httpClient != null) { - httpClient.dispatcher().executorService().shutdown(); + Dispatcher dispatcher = httpClient.dispatcher(); + httpClient = null; - if (timeoutSettings.awaitTerminationTimeout > 0) { - try { - httpClient.dispatcher().executorService().awaitTermination(timeoutSettings.awaitTerminationTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { /* nop */ } - } + if (timeoutSettings.terminationTimeout > 0) { + // wait for queued messages in the dispatcher to be promoted to the executor service + long start = System.currentTimeMillis(); + while (dispatcher.queuedCallsCount() > 0 && start + timeoutSettings.terminationTimeout > System.currentTimeMillis()) { + try { + TimeUnit.MILLISECONDS.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } - httpClient = null; + // initialize the shutdown of the executor service + dispatcher.executorService().shutdown(); + + // wait for the messages in the dispatcher's executor service to be sent out + long awaitTerminationTimeout = timeoutSettings.terminationTimeout - (System.currentTimeMillis() - start); + if (awaitTerminationTimeout > 0) { + try { + dispatcher.executorService().awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } else { + dispatcher.executorService().shutdown(); + } } } @@ -400,22 +421,22 @@ public static class TimeoutSettings { public static final long DEFAULT_WRITE_TIMEOUT = 0; // 0 means no timeout public static final long DEFAULT_CALL_TIMEOUT = 0; public static final long DEFAULT_READ_TIMEOUT = 0; - public static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT = 0; + public static final long DEFAULT_TERMINATION_TIMEOUT = 0; public long connectTimeout = DEFAULT_CONNECT_TIMEOUT; public long callTimeout = DEFAULT_CALL_TIMEOUT; public long readTimeout = DEFAULT_READ_TIMEOUT; public long writeTimeout = DEFAULT_WRITE_TIMEOUT; - public long awaitTerminationTimeout = DEFAULT_AWAIT_TERMINATION_TIMEOUT; + public long terminationTimeout = DEFAULT_TERMINATION_TIMEOUT; public TimeoutSettings() {} - public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout, long awaitTerminationTimeout) { + public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout, long terminationTimeout) { this.connectTimeout = connectTimeout; this.callTimeout = callTimeout; this.readTimeout = readTimeout; this.writeTimeout = writeTimeout; - this.awaitTerminationTimeout = awaitTerminationTimeout; + this.terminationTimeout = terminationTimeout; } } } diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 147884d2..6ca9f362 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -45,7 +45,7 @@ under the License. batch_size_count="0" batch_interval="0" connect_timeout="5000" - await_termination_timeout="1000" + termination_timeout="1000" disableCertificateValidation="true"> diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index f3f9c20b..00637f13 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -60,7 +60,7 @@ under the License. text HttpEventCollectorUnitTestMiddleware 5000 - 2000 + 2000 %msg