diff --git a/Dockerfile b/Dockerfile
index b40bfa9..cc8b6c8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,22 +19,22 @@ EXPOSE 42424
WORKDIR /opt/accumulo-proxy
-ARG HADOOP_VERSION=3.3.4
-ARG ZOOKEEPER_VERSION=3.8.0
-ARG ACCUMULO_VERSION=2.1.1
+ARG HADOOP_VERSION=3.3.6
+ARG ZOOKEEPER_VERSION=3.9.2
+ARG ACCUMULO_VERSION=2.1.3
ARG ACCUMULO_PROXY_VERSION=2.0.0-SNAPSHOT
-ARG HADOOP_HASH=ca5e12625679ca95b8fd7bb7babc2a8dcb2605979b901df9ad137178718821097b67555115fafc6dbf6bb32b61864ccb6786dbc555e589694a22bf69147780b4
-ARG ZOOKEEPER_HASH=d66e3a40451f840406901b2cd940992b001f92049a372ae48d8b420891605871cd1ae5f6cceb3b10665491e7abef36a4078dace158bd1e0938fcd3567b5234ca
-ARG ACCUMULO_HASH=adb23e56362c2e3e813d07791389b8ca2d5976df8b00a29b607e6ae05ea465eff80ada6d1ec9a9c596df8b4066c51078cd5a4006dc78568ac38f638a1d3895be
+ARG HADOOP_HASH=de3eaca2e0517e4b569a88b63c89fae19cb8ac6c01ff990f1ff8f0cc0f3128c8e8a23db01577ca562a0e0bb1b4a3889f8c74384e609cd55e537aada3dcaa9f8a
+ARG ZOOKEEPER_HASH=2b5ae02d618a27ca8cd54924855d5344263b7d9dee760181f9d66bafa9230324d2ad31786895f0654c969dc38d4a3d0077f74cc376b58b5fa2fb94beb1ab445f
+ARG ACCUMULO_HASH=1a27a144dc31f55ccc8e081b6c1bc6cc0362a8391838c53c166cb45291ff8f35867fd8e4729aa7b2c540f8b721f8c6953281bf589fc7fe320e4dc4d20b87abc4
# Download from Apache mirrors instead of archive #9
-ENV APACHE_DIST_URLS \
+ENV APACHE_DIST_URLS="\
https://www.apache.org/dyn/closer.cgi?action=download&filename= \
# if the version is outdated (or we're grabbing the .asc file), we might have to pull from the dist/archive :/
https://www-us.apache.org/dist/ \
https://www.apache.org/dist/ \
- https://archive.apache.org/dist/
+ https://archive.apache.org/dist/"
RUN set -eux; \
download_bin() { \
@@ -42,7 +42,7 @@ RUN set -eux; \
local hash="$1"; shift; \
local distFile="$1"; shift; \
local success=; \
- local distUrl=; \
+ local distUrl=; \
for distUrl in ${APACHE_DIST_URLS}; do \
if wget -nv -O "/tmp/${f}" "${distUrl}${distFile}"; then \
success=1; \
@@ -63,16 +63,16 @@ RUN tar xzf /tmp/hadoop.tar.gz -C /opt/ && ln -s /opt/hadoop-${HADOOP_VERSION} /
RUN tar xzf /tmp/apache-zookeeper.tar.gz -C /opt/ && ln -s /opt/apache-zookeeper-${ZOOKEEPER_VERSION}-bin /opt/apache-zookeeper
RUN tar xzf /tmp/accumulo.tar.gz -C /opt/ && ln -s /opt/accumulo-${ACCUMULO_VERSION} /opt/accumulo && sed -i 's/\${ZOOKEEPER_HOME}\/\*/\${ZOOKEEPER_HOME}\/\*\:\${ZOOKEEPER_HOME}\/lib\/\*/g' /opt/accumulo/conf/accumulo-env.sh
-ENV HADOOP_HOME /opt/hadoop
-ENV ZOOKEEPER_HOME /opt/apache-zookeeper
-ENV ACCUMULO_HOME /opt/accumulo
+ENV HADOOP_HOME=/opt/hadoop
+ENV ZOOKEEPER_HOME=/opt/apache-zookeeper
+ENV ACCUMULO_HOME=/opt/accumulo
# Add the proxy binary
COPY target/accumulo-proxy-${ACCUMULO_PROXY_VERSION}-bin.tar.gz /tmp/
RUN tar xzf /tmp/accumulo-proxy-${ACCUMULO_PROXY_VERSION}-bin.tar.gz -C /opt/accumulo-proxy --strip 1
-ENV ACCUMULO_PROXY_HOME /opt/accumulo-proxy
+ENV ACCUMULO_PROXY_HOME=/opt/accumulo-proxy
# Ensure Accumulo is on the path.
-ENV PATH "${PATH}:${ACCUMULO_HOME}/bin"
+ENV PATH="${PATH}:${ACCUMULO_HOME}/bin"
CMD ["/opt/accumulo-proxy/bin/accumulo-proxy", "-p", "/opt/accumulo-proxy/conf/proxy.properties"]
diff --git a/pom.xml b/pom.xml
index 9f70da8..ab74d73 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
org.apache
apache
- 29
+ 31
org.apache.accumulo
accumulo-proxy
@@ -85,27 +85,33 @@
https://github.com/apache/accumulo-proxy/actions
- 2.1.1
+ 2.1.3
contrib/Eclipse-Accumulo-Codestyle.xml
- 3.3.4
+ 3.3.6
false
11
11
11
- 2.0.6
+ 2.0.12
source-release-tar
src/main/spotbugs/exclude-filter.xml
- 3.0.0-M8
0.17.0
+
+ io.micrometer
+ micrometer-bom
+ 1.12.2
+ pom
+ import
+
org.apache.logging.log4j
log4j-bom
- 2.19.0
+ 2.23.1
pom
import
@@ -121,13 +127,13 @@
com.github.spotbugs
spotbugs-annotations
- 4.7.3
+ 4.8.3
true
com.google.auto.service
auto-service
- 1.0.1
+ 1.1.1
true
@@ -140,6 +146,10 @@
commons-lang
2.6
+
+ io.micrometer
+ micrometer-core
+
org.apache.accumulo
accumulo-core
@@ -214,7 +224,7 @@
org.junit.jupiter
junit-jupiter-api
- 5.9.2
+ 5.10.1
test
@@ -225,12 +235,12 @@
org.codehaus.mojo
versions-maven-plugin
- 2.14.2
+ 2.16.2
com.mycila
license-maven-plugin
- 4.1
+ 4.3
${session.executionRootDirectory}/contrib/license-header.txt
@@ -255,7 +265,7 @@
org.gaul
modernizer-maven-plugin
- 2.5.0
+ 2.7.0
${maven.compiler.target}
@@ -263,7 +273,7 @@
com.github.spotbugs
spotbugs-maven-plugin
- 4.7.3.0
+ 4.7.3.6
true
Max
@@ -313,7 +323,7 @@
com.github.koraktor
mavanagaiata
- 1.0.0
+ 1.0.1
true
@@ -337,7 +347,6 @@
org.apache.maven.plugins
maven-compiler-plugin
- true
true
true
@@ -410,22 +419,22 @@
org.codehaus.mojo
build-helper-maven-plugin
- 3.3.0
+ 3.5.0
org.codehaus.mojo
exec-maven-plugin
- 3.0.0
+ 3.1.1
net.revelc.code
apilyzer-maven-plugin
- 1.2.0
+ 1.3.0
net.revelc.code.formatter
formatter-maven-plugin
- 2.21.0
+ 2.23.0
${eclipseFormatterStyle}
${maven.compiler.source}
@@ -435,7 +444,6 @@
**/thrift-gen-java/**/*.java
LF
- true
true
true
true
@@ -645,7 +653,7 @@
com.puppycrawl.tools
checkstyle
- 10.6.0
+ 10.12.6
diff --git a/src/main/java/org/apache/accumulo/proxy/Proxy.java b/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 017b298..b4020d4 100644
--- a/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -22,6 +22,8 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
+import java.util.Collection;
+import java.util.List;
import java.util.Properties;
import org.apache.accumulo.core.cli.Help;
@@ -30,6 +32,8 @@
import org.apache.accumulo.core.clientImpl.ClientConfConverter;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
@@ -55,6 +59,8 @@
import com.google.auto.service.AutoService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
@AutoService(KeywordExecutable.class)
public class Proxy implements KeywordExecutable {
@@ -250,7 +256,7 @@ public static ServerAddress createProxyServer(HostAndPort address,
break;
}
- TimedProcessor timedProcessor = new TimedProcessor(processor);
+ TimedProcessor timedProcessor = new TimedProcessor(processor, new ProxyMetricsInfo());
// Create the thrift server with our processor and properties
return TServerUtils.startTServer(serverType, timedProcessor, protocolFactory, serverName,
@@ -258,4 +264,55 @@ public static ServerAddress createProxyServer(HostAndPort address,
ClientConfConverter.toAccumuloConf(props), 1000L, maxFrameSize, sslParams, saslParams,
serverSocketTimeout, address);
}
+
+ // TODO: This MetricsInfo is a stub Metrics Info to allow the timed processor to build. If Metrics
+ // are wanted or needed in a later version of the proxy, this can be updated.
+ static private class ProxyMetricsInfo implements MetricsInfo {
+
+ @Override
+ public boolean isMetricsEnabled() {
+ return false;
+ }
+
+ @Override
+ public void addServiceTags(String applicationName, HostAndPort hostAndPort) {
+ throw new UnsupportedOperationException("Unimplemented method 'addServiceTags'");
+ }
+
+ @Override
+ public void addCommonTags(List updates) {
+ throw new UnsupportedOperationException("Unimplemented method 'addCommonTags'");
+ }
+
+ @Override
+ public Collection getCommonTags() {
+ throw new UnsupportedOperationException("Unimplemented method 'getCommonTags'");
+ }
+
+ @Override
+ public void addRegistry(MeterRegistry registry) {
+ throw new UnsupportedOperationException("Unimplemented method 'addRegistry'");
+ }
+
+ @Override
+ public void addMetricsProducers(MetricsProducer... producer) {
+ return;
+ }
+
+ @Override
+ public void init() {
+ throw new UnsupportedOperationException("Unimplemented method 'init'");
+ }
+
+ @Override
+ public MeterRegistry getRegistry() {
+ throw new UnsupportedOperationException("Unimplemented method 'getRegistry'");
+ }
+
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException("Unimplemented method 'close'");
+ }
+
+ }
}
diff --git a/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index 54ad3e4..49c7809 100644
--- a/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -222,6 +222,7 @@ public ProxyServer(Properties props) {
}
protected AccumuloClient getClient(String sharedSecret) throws Exception {
+
if (sharedSecret.equals(this.sharedSecret)) {
return client;
} else {
@@ -2118,7 +2119,9 @@ public String createConditionalWriter(String sharedSecret, String tableName,
try {
ConditionalWriterConfig cwc = new ConditionalWriterConfig();
if (options.getMaxMemory() != 0) {
- // TODO
+ // TODO - This was left blank, I'm not sure why it's here. For now, it will log the max
+ // memory
+ logger.info("Max memory: " + options.getMaxMemory());
}
if (options.isSetThreads() && options.getThreads() != 0) {
cwc.setMaxWriteThreads(options.getThreads());
diff --git a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
index c378f59..6c4af19 100644
--- a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
+++ b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -187,8 +188,10 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce
private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName,
final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf,
long timeBetweenThreadChecks) {
- final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool(
- executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true);
+ final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools()
+ .getPoolBuilder(serverName + "-ClientPool").numCoreThreads(executorThreads)
+ .numMaxThreads(executorThreads).withTimeOut(threadTimeOut, TimeUnit.MILLISECONDS)
+ .enableThreadPoolMetrics(true).withQueue(new LinkedBlockingQueue<>()).build();
// periodically adjust the number of threads we need by checking how busy our threads are
ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> {
// there is a minor race condition between sampling the current state of the thread pool
@@ -276,7 +279,7 @@ private static TServerSocket getSslServerSocket(int port, int timeout, InetAddre
TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
} else {
tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address,
- params.getTTransportParams());
+ params.getTSSLTransportParameters());
}
final ServerSocket serverSock = tServerSock.getServerSocket();
diff --git a/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java b/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java
index 0d71d4e..5435135 100644
--- a/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java
+++ b/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java
@@ -241,7 +241,8 @@ public void setup(TestInfo info) throws Exception {
proxyClient = new TestProxyClient(hostname, proxyPort, factory);
client = proxyClient.proxy();
- testName = info.getTestMethod().get().getName();
+ testName = info.getTestMethod()
+ .orElseThrow(() -> new IllegalArgumentException("Test method is missing")).getName();
// Create some unique names for tables, namespaces, etc.
String[] uniqueNames = getUniqueNameArray(2);
@@ -1194,7 +1195,8 @@ public void attachIteratorsWithScans() throws Exception {
assertTrue(scanFromClient.isPresent(), "Could not find any scan matching the client principal");
- ActiveScan scan = scanFromClient.get();
+ ActiveScan scan =
+ scanFromClient.orElseThrow(() -> new IllegalArgumentException("ActiveScan is missing"));
assertTrue(
ScanState.RUNNING.equals(scan.getState()) || ScanState.QUEUED.equals(scan.getState()));
@@ -2146,17 +2148,17 @@ public void testConditionalWriter() throws Exception {
}
private void assertNumericValueConstraintIsPresent() throws Exception {
- assertTrue(
- Wait.waitFor(() -> client.listConstraints(sharedSecret, tableName)
- .containsKey(NumericValueConstraint.class.getName()), 30_000L, 2_000L),
- "Expected to find NumericValueConstraint in constraints.");
+ Wait.waitFor(
+ () -> client.listConstraints(sharedSecret, tableName)
+ .containsKey(NumericValueConstraint.class.getName()),
+ 30_000L, 2_000L, "Expected to find NumericValueConstraint in constraints.");
}
private void assertNumericValueConstraintIsAbsent() throws Exception {
- assertTrue(
- Wait.waitFor(() -> !client.listConstraints(sharedSecret, tableName)
- .containsKey(NumericValueConstraint.class.getName()), 30_000L, 2_000L),
- "Found NumericValueConstraint in constraints, expected it to be absent.");
+ Wait.waitFor(
+ () -> !client.listConstraints(sharedSecret, tableName)
+ .containsKey(NumericValueConstraint.class.getName()),
+ 30_000L, 2_000L, "Found NumericValueConstraint in constraints, expected it to be absent.");
}
private void checkKey(String row, String cf, String cq, String val, KeyValue keyValue) {