Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.spotify.confidence;

import com.spotify.confidence.flags.resolver.v1.ResolveWithStickyRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsResponse;
import com.spotify.futures.CompletableFutures;
import java.util.ArrayList;
Expand All @@ -17,16 +16,16 @@
* Pre-initialized resolver instances mapped by thread ID to CPU core count. This eliminates both
* lock contention and lazy initialization overhead.
*/
class ThreadLocalSwapWasmResolverApi implements ResolverApi {
private static final Logger logger =
LoggerFactory.getLogger(ThreadLocalSwapWasmResolverApi.class);
class ConcurrentRotatingResolverApi implements RotatingWasmResolverApi {
private static final Logger logger = LoggerFactory.getLogger(ConcurrentRotatingResolverApi.class);
private final WasmFlagLogger flagLogger;
private final StickyResolveStrategy stickyResolveStrategy;
private volatile byte[] currentState;
private volatile String currentAccountId;

// Pre-initialized resolver instances mapped by core index
private final Map<Integer, SwapWasmResolverApi> resolverInstances = new ConcurrentHashMap<>();
private final Map<Integer, SingleRotatingWasmResolverApi> resolverInstances =
new ConcurrentHashMap<>();
private final int numInstances;
private final AtomicInteger nextInstanceIndex = new AtomicInteger(0);
private final ThreadLocal<Integer> threadInstanceIndex =
Expand All @@ -37,7 +36,7 @@ protected Integer initialValue() {
}
};

public ThreadLocalSwapWasmResolverApi(
public ConcurrentRotatingResolverApi(
WasmFlagLogger flagLogger,
byte[] initialState,
String accountId,
Expand All @@ -60,7 +59,7 @@ public ThreadLocalSwapWasmResolverApi(
CompletableFuture.runAsync(
() -> {
final var instance =
new SwapWasmResolverApi(
new SingleRotatingWasmResolverApi(
this.flagLogger,
this.currentState,
this.currentAccountId,
Expand All @@ -75,13 +74,13 @@ public ThreadLocalSwapWasmResolverApi(
* typically called by a scheduled task to refresh the resolver state.
*/
@Override
public void updateStateAndFlushLogs(byte[] state, String accountId) {
public void rotate(byte[] state, String accountId) {
this.currentState = state;
this.currentAccountId = accountId;

final var futures =
resolverInstances.values().stream()
.map(v -> CompletableFuture.runAsync(() -> v.updateStateAndFlushLogs(state, accountId)))
.map(v -> CompletableFuture.runAsync(() -> v.rotate(state, accountId)))
.toList();
CompletableFutures.allAsList(futures).join();
}
Expand All @@ -91,36 +90,21 @@ public void updateStateAndFlushLogs(byte[] state, String accountId) {
* assigned to an instance index when first accessed, ensuring even distribution across available
* instances.
*/
private SwapWasmResolverApi getResolverForCurrentThread() {
private SingleRotatingWasmResolverApi getResolverForCurrentThread() {
final int instanceIndex = threadInstanceIndex.get();
return resolverInstances.get(instanceIndex);
}

/** Delegates resolveWithSticky to the assigned SwapWasmResolverApi instance. */
@Override
public CompletableFuture<ResolveFlagsResponse> resolveWithSticky(
ResolveWithStickyRequest request) {
return getResolverForCurrentThread().resolveWithSticky(request);
}

/** Delegates resolve to the assigned SwapWasmResolverApi instance. */
@Override
public ResolveFlagsResponse resolve(ResolveFlagsRequest request) {
public CompletableFuture<ResolveFlagsResponse> resolve(ResolveWithStickyRequest request) {
return getResolverForCurrentThread().resolve(request);
}

/**
* Returns the number of pre-initialized resolver instances. This is primarily for debugging and
* monitoring purposes.
*/
public int getInstanceCount() {
return resolverInstances.size();
}

/** Closes all pre-initialized resolver instances and clears the map. */
@Override
public void close() {
resolverInstances.values().forEach(SwapWasmResolverApi::close);
resolverInstances.values().forEach(SingleRotatingWasmResolverApi::close);
resolverInstances.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LocalResolverServiceFactory implements ResolverServiceFactory {
private final AtomicReference<ResolverState> resolverStateHolder;
private final ResolveTokenConverter resolveTokenConverter;

private final ResolverApi wasmResolveApi;
private final RotatingWasmResolverApi wasmResolveApi;
private final Supplier<Instant> timeSupplier;
private final Supplier<String> resolveIdSupplier;
private final FlagLogger flagLogger;
Expand Down Expand Up @@ -107,8 +107,8 @@ private static FlagResolverService createFlagResolverService(

final var wasmFlagLogger = new GrpcWasmFlagLogger(apiSecret);
if (isWasm) {
final ResolverApi wasmResolverApi =
new ThreadLocalSwapWasmResolverApi(
final RotatingWasmResolverApi wasmResolverApi =
new ConcurrentRotatingResolverApi(
wasmFlagLogger,
sidecarFlagsAdminFetcher.rawStateHolder().get().toByteArray(),
sidecarFlagsAdminFetcher.accountId,
Expand All @@ -121,7 +121,7 @@ private static FlagResolverService createFlagResolverService(

logPollExecutor.scheduleAtFixedRate(
() -> {
wasmResolverApi.updateStateAndFlushLogs(
wasmResolverApi.rotate(
sidecarFlagsAdminFetcher.rawStateHolder().get().toByteArray(),
sidecarFlagsAdminFetcher.accountId);
},
Expand Down Expand Up @@ -180,16 +180,16 @@ private static FlagResolverService createFlagResolverService(
final AtomicReference<byte[]> resolverStateProtobuf =
new AtomicReference<>(accountStateProvider.provide());
final WasmFlagLogger flagLogger = request -> WriteFlagLogsResponse.getDefaultInstance();
final ResolverApi wasmResolverApi =
new ThreadLocalSwapWasmResolverApi(
final RotatingWasmResolverApi wasmResolverApi =
new ConcurrentRotatingResolverApi(
flagLogger, resolverStateProtobuf.get(), accountId, stickyResolveStrategy);
flagsFetcherExecutor.scheduleAtFixedRate(
() -> resolverStateProtobuf.set(accountStateProvider.provide()),
pollIntervalSeconds,
pollIntervalSeconds,
TimeUnit.SECONDS);
logPollExecutor.scheduleAtFixedRate(
() -> wasmResolverApi.updateStateAndFlushLogs(resolverStateProtobuf.get(), accountId),
() -> wasmResolverApi.rotate(resolverStateProtobuf.get(), accountId),
POLL_LOG_INTERVAL.getSeconds(),
POLL_LOG_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
Expand All @@ -212,7 +212,7 @@ private static FlagResolverService createFlagResolverService(
}

LocalResolverServiceFactory(
ResolverApi wasmResolveApi,
RotatingWasmResolverApi wasmResolveApi,
AtomicReference<ResolverState> resolverStateHolder,
ResolveTokenConverter resolveTokenConverter,
FlagLogger flagLogger,
Expand All @@ -228,7 +228,7 @@ private static FlagResolverService createFlagResolverService(
}

LocalResolverServiceFactory(
ResolverApi wasmResolveApi,
RotatingWasmResolverApi wasmResolveApi,
AtomicReference<ResolverState> resolverStateHolder,
ResolveTokenConverter resolveTokenConverter,
Supplier<Instant> timeSupplier,
Expand All @@ -247,7 +247,7 @@ private static FlagResolverService createFlagResolverService(
@VisibleForTesting
public void setState(byte[] state, String accountId) {
if (this.wasmResolveApi != null) {
wasmResolveApi.updateStateAndFlushLogs(state, accountId);
wasmResolveApi.rotate(state, accountId);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.spotify.confidence;

import com.spotify.confidence.flags.resolver.v1.ResolveWithStickyRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class IsClosedException extends Exception {}

class LockedWasmResolverApi implements ResolverApi {
private final ResolverApi resolverApi;
private boolean isConsumed = false;
private final ReadWriteLock wasmLock = new ReentrantReadWriteLock();

public LockedWasmResolverApi(ResolverApi resolverApi) {
this.resolverApi = resolverApi;
}

@Override
public CompletableFuture<ResolveFlagsResponse> resolve(ResolveWithStickyRequest request)
throws IsClosedException {
if (!wasmLock.writeLock().tryLock() || isConsumed) {
throw new IsClosedException();
}
try {
return resolverApi.resolve(request);
} finally {
wasmLock.writeLock().unlock();
}
}

@Override
public void flush() {
wasmLock.readLock().lock();
try {
resolverApi.flush();
isConsumed = true;
} finally {
wasmLock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,12 @@
package com.spotify.confidence;

import com.spotify.confidence.flags.resolver.v1.ResolveWithStickyRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsResponse;
import java.util.concurrent.CompletableFuture;

/** Common interface for WASM-based flag resolver implementations. */
interface ResolverApi {
CompletableFuture<ResolveFlagsResponse> resolve(ResolveWithStickyRequest request)
throws IsClosedException;

/**
* Resolves flags with sticky assignment support.
*
* @param request The resolve request with sticky context
* @return A future containing the resolve response
*/
CompletableFuture<ResolveFlagsResponse> resolveWithSticky(ResolveWithStickyRequest request);

/**
* Resolves flags without sticky assignment support.
*
* @param request The resolve request
* @return The resolve response
*/
ResolveFlagsResponse resolve(ResolveFlagsRequest request);

/**
* Updates the resolver state and flushes any pending logs.
*
* @param state The new resolver state
* @param accountId The account ID
*/
void updateStateAndFlushLogs(byte[] state, String accountId);

/** Closes the resolver and releases any resources. */
void close();
void flush();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.spotify.confidence;

import com.spotify.confidence.flags.resolver.v1.ResolveWithStickyRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsResponse;
import java.util.concurrent.CompletableFuture;

/** Common interface for WASM-based flag resolver implementations. */
interface RotatingWasmResolverApi {
/**
* Resolves flags with sticky assignment support.
*
* @param request The resolve request
* @return The resolve response
*/
CompletableFuture<ResolveFlagsResponse> resolve(ResolveWithStickyRequest request);

/**
* Updates the resolver state and flushes any pending logs.
*
* @param state The new resolver state
* @param accountId The account ID
*/
void rotate(byte[] state, String accountId);

/** Closes the resolver and releases any resources. */
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.spotify.confidence;

import com.spotify.confidence.flags.resolver.v1.ResolveWithStickyRequest;
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class SingleRotatingWasmResolverApi implements RotatingWasmResolverApi {
private final AtomicReference<LockedWasmResolverApi> lockedResolverApi = new AtomicReference<>();
private final StickyResolveStrategy stickyResolveStrategy;
private final WasmFlagLogger flagLogger;

public SingleRotatingWasmResolverApi(
WasmFlagLogger flagLogger,
byte[] initialState,
String accountId,
StickyResolveStrategy stickyResolveStrategy) {
this.stickyResolveStrategy = stickyResolveStrategy;
this.flagLogger = flagLogger;

// Create initial instance
final WasmResolveApi initialInstance = new WasmResolveApi(flagLogger);
initialInstance.setResolverState(initialState, accountId);
final var stickyResolverApi = new StickyResolverApi(initialInstance, stickyResolveStrategy);
this.lockedResolverApi.set(new LockedWasmResolverApi(stickyResolverApi));
}

@Override
public void rotate(byte[] state, String accountId) {
// Create new instance with updated state
final WasmResolveApi newInstance = new WasmResolveApi(flagLogger);
newInstance.setResolverState(state, accountId);

final var stickyResolverApi = new StickyResolverApi(newInstance, stickyResolveStrategy);
final LockedWasmResolverApi oldInstance =
lockedResolverApi.getAndSet(new LockedWasmResolverApi(stickyResolverApi));
if (oldInstance != null) {
oldInstance.flush();
}
}

@Override
public void close() {}

@Override
public CompletableFuture<ResolveFlagsResponse> resolve(ResolveWithStickyRequest request) {
try {
return lockedResolverApi.get().resolve(request);
} catch (IsClosedException e) {
return resolve(request);
}
}
}
Loading