diff --git a/Dockerfile b/Dockerfile index b40bfa9..cc8b6c8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,22 +19,22 @@ EXPOSE 42424 WORKDIR /opt/accumulo-proxy -ARG HADOOP_VERSION=3.3.4 -ARG ZOOKEEPER_VERSION=3.8.0 -ARG ACCUMULO_VERSION=2.1.1 +ARG HADOOP_VERSION=3.3.6 +ARG ZOOKEEPER_VERSION=3.9.2 +ARG ACCUMULO_VERSION=2.1.3 ARG ACCUMULO_PROXY_VERSION=2.0.0-SNAPSHOT -ARG HADOOP_HASH=ca5e12625679ca95b8fd7bb7babc2a8dcb2605979b901df9ad137178718821097b67555115fafc6dbf6bb32b61864ccb6786dbc555e589694a22bf69147780b4 -ARG ZOOKEEPER_HASH=d66e3a40451f840406901b2cd940992b001f92049a372ae48d8b420891605871cd1ae5f6cceb3b10665491e7abef36a4078dace158bd1e0938fcd3567b5234ca -ARG ACCUMULO_HASH=adb23e56362c2e3e813d07791389b8ca2d5976df8b00a29b607e6ae05ea465eff80ada6d1ec9a9c596df8b4066c51078cd5a4006dc78568ac38f638a1d3895be +ARG HADOOP_HASH=de3eaca2e0517e4b569a88b63c89fae19cb8ac6c01ff990f1ff8f0cc0f3128c8e8a23db01577ca562a0e0bb1b4a3889f8c74384e609cd55e537aada3dcaa9f8a +ARG ZOOKEEPER_HASH=2b5ae02d618a27ca8cd54924855d5344263b7d9dee760181f9d66bafa9230324d2ad31786895f0654c969dc38d4a3d0077f74cc376b58b5fa2fb94beb1ab445f +ARG ACCUMULO_HASH=1a27a144dc31f55ccc8e081b6c1bc6cc0362a8391838c53c166cb45291ff8f35867fd8e4729aa7b2c540f8b721f8c6953281bf589fc7fe320e4dc4d20b87abc4 # Download from Apache mirrors instead of archive #9 -ENV APACHE_DIST_URLS \ +ENV APACHE_DIST_URLS="\ https://www.apache.org/dyn/closer.cgi?action=download&filename= \ # if the version is outdated (or we're grabbing the .asc file), we might have to pull from the dist/archive :/ https://www-us.apache.org/dist/ \ https://www.apache.org/dist/ \ - https://archive.apache.org/dist/ + https://archive.apache.org/dist/" RUN set -eux; \ download_bin() { \ @@ -42,7 +42,7 @@ RUN set -eux; \ local hash="$1"; shift; \ local distFile="$1"; shift; \ local success=; \ - local distUrl=; \ + local distUrl=; \ for distUrl in ${APACHE_DIST_URLS}; do \ if wget -nv -O "/tmp/${f}" "${distUrl}${distFile}"; then \ success=1; \ @@ -63,16 +63,16 @@ RUN tar xzf /tmp/hadoop.tar.gz -C /opt/ && ln -s /opt/hadoop-${HADOOP_VERSION} / RUN tar xzf /tmp/apache-zookeeper.tar.gz -C /opt/ && ln -s /opt/apache-zookeeper-${ZOOKEEPER_VERSION}-bin /opt/apache-zookeeper RUN tar xzf /tmp/accumulo.tar.gz -C /opt/ && ln -s /opt/accumulo-${ACCUMULO_VERSION} /opt/accumulo && sed -i 's/\${ZOOKEEPER_HOME}\/\*/\${ZOOKEEPER_HOME}\/\*\:\${ZOOKEEPER_HOME}\/lib\/\*/g' /opt/accumulo/conf/accumulo-env.sh -ENV HADOOP_HOME /opt/hadoop -ENV ZOOKEEPER_HOME /opt/apache-zookeeper -ENV ACCUMULO_HOME /opt/accumulo +ENV HADOOP_HOME=/opt/hadoop +ENV ZOOKEEPER_HOME=/opt/apache-zookeeper +ENV ACCUMULO_HOME=/opt/accumulo # Add the proxy binary COPY target/accumulo-proxy-${ACCUMULO_PROXY_VERSION}-bin.tar.gz /tmp/ RUN tar xzf /tmp/accumulo-proxy-${ACCUMULO_PROXY_VERSION}-bin.tar.gz -C /opt/accumulo-proxy --strip 1 -ENV ACCUMULO_PROXY_HOME /opt/accumulo-proxy +ENV ACCUMULO_PROXY_HOME=/opt/accumulo-proxy # Ensure Accumulo is on the path. -ENV PATH "${PATH}:${ACCUMULO_HOME}/bin" +ENV PATH="${PATH}:${ACCUMULO_HOME}/bin" CMD ["/opt/accumulo-proxy/bin/accumulo-proxy", "-p", "/opt/accumulo-proxy/conf/proxy.properties"] diff --git a/pom.xml b/pom.xml index 9f70da8..ab74d73 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ org.apache apache - 29 + 31 org.apache.accumulo accumulo-proxy @@ -85,27 +85,33 @@ https://github.com/apache/accumulo-proxy/actions - 2.1.1 + 2.1.3 contrib/Eclipse-Accumulo-Codestyle.xml - 3.3.4 + 3.3.6 false 11 11 11 - 2.0.6 + 2.0.12 source-release-tar src/main/spotbugs/exclude-filter.xml - 3.0.0-M8 0.17.0 + + io.micrometer + micrometer-bom + 1.12.2 + pom + import + org.apache.logging.log4j log4j-bom - 2.19.0 + 2.23.1 pom import @@ -121,13 +127,13 @@ com.github.spotbugs spotbugs-annotations - 4.7.3 + 4.8.3 true com.google.auto.service auto-service - 1.0.1 + 1.1.1 true @@ -140,6 +146,10 @@ commons-lang 2.6 + + io.micrometer + micrometer-core + org.apache.accumulo accumulo-core @@ -214,7 +224,7 @@ org.junit.jupiter junit-jupiter-api - 5.9.2 + 5.10.1 test @@ -225,12 +235,12 @@ org.codehaus.mojo versions-maven-plugin - 2.14.2 + 2.16.2 com.mycila license-maven-plugin - 4.1 + 4.3
${session.executionRootDirectory}/contrib/license-header.txt
@@ -255,7 +265,7 @@ org.gaul modernizer-maven-plugin - 2.5.0 + 2.7.0 ${maven.compiler.target} @@ -263,7 +273,7 @@ com.github.spotbugs spotbugs-maven-plugin - 4.7.3.0 + 4.7.3.6 true Max @@ -313,7 +323,7 @@ com.github.koraktor mavanagaiata - 1.0.0 + 1.0.1 true @@ -337,7 +347,6 @@ org.apache.maven.plugins maven-compiler-plugin - true true true @@ -410,22 +419,22 @@ org.codehaus.mojo build-helper-maven-plugin - 3.3.0 + 3.5.0 org.codehaus.mojo exec-maven-plugin - 3.0.0 + 3.1.1 net.revelc.code apilyzer-maven-plugin - 1.2.0 + 1.3.0 net.revelc.code.formatter formatter-maven-plugin - 2.21.0 + 2.23.0 ${eclipseFormatterStyle} ${maven.compiler.source} @@ -435,7 +444,6 @@ **/thrift-gen-java/**/*.java LF - true true true true @@ -645,7 +653,7 @@ com.puppycrawl.tools checkstyle - 10.6.0 + 10.12.6 diff --git a/src/main/java/org/apache/accumulo/proxy/Proxy.java b/src/main/java/org/apache/accumulo/proxy/Proxy.java index 017b298..b4020d4 100644 --- a/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -22,6 +22,8 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.file.Files; +import java.util.Collection; +import java.util.List; import java.util.Properties; import org.apache.accumulo.core.cli.Help; @@ -30,6 +32,8 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.HostAndPort; @@ -55,6 +59,8 @@ import com.google.auto.service.AutoService; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; @AutoService(KeywordExecutable.class) public class Proxy implements KeywordExecutable { @@ -250,7 +256,7 @@ public static ServerAddress createProxyServer(HostAndPort address, break; } - TimedProcessor timedProcessor = new TimedProcessor(processor); + TimedProcessor timedProcessor = new TimedProcessor(processor, new ProxyMetricsInfo()); // Create the thrift server with our processor and properties return TServerUtils.startTServer(serverType, timedProcessor, protocolFactory, serverName, @@ -258,4 +264,55 @@ public static ServerAddress createProxyServer(HostAndPort address, ClientConfConverter.toAccumuloConf(props), 1000L, maxFrameSize, sslParams, saslParams, serverSocketTimeout, address); } + + // TODO: This MetricsInfo is a stub Metrics Info to allow the timed processor to build. If Metrics + // are wanted or needed in a later version of the proxy, this can be updated. + static private class ProxyMetricsInfo implements MetricsInfo { + + @Override + public boolean isMetricsEnabled() { + return false; + } + + @Override + public void addServiceTags(String applicationName, HostAndPort hostAndPort) { + throw new UnsupportedOperationException("Unimplemented method 'addServiceTags'"); + } + + @Override + public void addCommonTags(List updates) { + throw new UnsupportedOperationException("Unimplemented method 'addCommonTags'"); + } + + @Override + public Collection getCommonTags() { + throw new UnsupportedOperationException("Unimplemented method 'getCommonTags'"); + } + + @Override + public void addRegistry(MeterRegistry registry) { + throw new UnsupportedOperationException("Unimplemented method 'addRegistry'"); + } + + @Override + public void addMetricsProducers(MetricsProducer... producer) { + return; + } + + @Override + public void init() { + throw new UnsupportedOperationException("Unimplemented method 'init'"); + } + + @Override + public MeterRegistry getRegistry() { + throw new UnsupportedOperationException("Unimplemented method 'getRegistry'"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("Unimplemented method 'close'"); + } + + } } diff --git a/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/src/main/java/org/apache/accumulo/proxy/ProxyServer.java index 54ad3e4..49c7809 100644 --- a/src/main/java/org/apache/accumulo/proxy/ProxyServer.java +++ b/src/main/java/org/apache/accumulo/proxy/ProxyServer.java @@ -222,6 +222,7 @@ public ProxyServer(Properties props) { } protected AccumuloClient getClient(String sharedSecret) throws Exception { + if (sharedSecret.equals(this.sharedSecret)) { return client; } else { @@ -2118,7 +2119,9 @@ public String createConditionalWriter(String sharedSecret, String tableName, try { ConditionalWriterConfig cwc = new ConditionalWriterConfig(); if (options.getMaxMemory() != 0) { - // TODO + // TODO - This was left blank, I'm not sure why it's here. For now, it will log the max + // memory + logger.info("Max memory: " + options.getMaxMemory()); } if (options.isSetThreads() && options.getThreads() != 0) { cwc.setMaxWriteThreads(options.getThreads()); diff --git a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java index c378f59..6c4af19 100644 --- a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java +++ b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -187,8 +188,10 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { - final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool( - executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true); + final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools() + .getPoolBuilder(serverName + "-ClientPool").numCoreThreads(executorThreads) + .numMaxThreads(executorThreads).withTimeOut(threadTimeOut, TimeUnit.MILLISECONDS) + .enableThreadPoolMetrics(true).withQueue(new LinkedBlockingQueue<>()).build(); // periodically adjust the number of threads we need by checking how busy our threads are ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> { // there is a minor race condition between sampling the current state of the thread pool @@ -276,7 +279,7 @@ private static TServerSocket getSslServerSocket(int port, int timeout, InetAddre TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address); } else { tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, - params.getTTransportParams()); + params.getTSSLTransportParameters()); } final ServerSocket serverSock = tServerSock.getServerSocket(); diff --git a/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java b/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java index 0d71d4e..5435135 100644 --- a/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java +++ b/src/test/java/org/apache/accumulo/proxy/its/SimpleProxyBase.java @@ -241,7 +241,8 @@ public void setup(TestInfo info) throws Exception { proxyClient = new TestProxyClient(hostname, proxyPort, factory); client = proxyClient.proxy(); - testName = info.getTestMethod().get().getName(); + testName = info.getTestMethod() + .orElseThrow(() -> new IllegalArgumentException("Test method is missing")).getName(); // Create some unique names for tables, namespaces, etc. String[] uniqueNames = getUniqueNameArray(2); @@ -1194,7 +1195,8 @@ public void attachIteratorsWithScans() throws Exception { assertTrue(scanFromClient.isPresent(), "Could not find any scan matching the client principal"); - ActiveScan scan = scanFromClient.get(); + ActiveScan scan = + scanFromClient.orElseThrow(() -> new IllegalArgumentException("ActiveScan is missing")); assertTrue( ScanState.RUNNING.equals(scan.getState()) || ScanState.QUEUED.equals(scan.getState())); @@ -2146,17 +2148,17 @@ public void testConditionalWriter() throws Exception { } private void assertNumericValueConstraintIsPresent() throws Exception { - assertTrue( - Wait.waitFor(() -> client.listConstraints(sharedSecret, tableName) - .containsKey(NumericValueConstraint.class.getName()), 30_000L, 2_000L), - "Expected to find NumericValueConstraint in constraints."); + Wait.waitFor( + () -> client.listConstraints(sharedSecret, tableName) + .containsKey(NumericValueConstraint.class.getName()), + 30_000L, 2_000L, "Expected to find NumericValueConstraint in constraints."); } private void assertNumericValueConstraintIsAbsent() throws Exception { - assertTrue( - Wait.waitFor(() -> !client.listConstraints(sharedSecret, tableName) - .containsKey(NumericValueConstraint.class.getName()), 30_000L, 2_000L), - "Found NumericValueConstraint in constraints, expected it to be absent."); + Wait.waitFor( + () -> !client.listConstraints(sharedSecret, tableName) + .containsKey(NumericValueConstraint.class.getName()), + 30_000L, 2_000L, "Found NumericValueConstraint in constraints, expected it to be absent."); } private void checkKey(String row, String cf, String cq, String val, KeyValue keyValue) {