Skip to content

Commit

Permalink
[router] Reduced allocations in router read path
Browse files Browse the repository at this point in the history
A lot of multi-key specific state was moved out of VenicePath and into
its subclasses, which makes single get queries require less heap space.
This includes streaming (chunkedResponse), HAR (helixGroupId and
requestId), longTailRetryThresholdMs and responseHeaders.

In addition, all kinds of VenicePaths are also getting their heap size
reduced by leveraging shared instances where possible. For example:

- The resourceName, storeName and versionNumber properties are replaced
  by a single reference to a shared instance of StoreVersionName, which
  is a new class obtained from a NameRepository.

- The smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs and
  longTailRetryThresholdMs properties are replaced by a single pointer
  to a RouterRetryConfig object, which is a simple facade wrapping the
  VeniceRouterConfig.

- Removed the time property from VenicePath since the only time it is
  used by tests to inject a MockTime, it is done from a subclass, and
  therefore can be achieved via extension, rather than composition.

- The responseDecompressor is now coming from a map of shared instances
  in the VenicePathParser.

New config:

- name.repository.max.entry.count : controls the maximum number of
  entries (per type) to be cached in the NameRepository class. For now
  this config is used only in the router, but it would likely become
  used in the server and controller as well, later on.

Miscellaneous:

- Various refactorings enable the VenicePath constructors to have fewer
  params and to make more of the properties final.

- Made use of StoreName rather than String in the VenicePathParser's
  RetryManager maps.

- Cleaned up generics in the VenicePathParser and ScatterGatherHelper
  builder.

- Deleted VeniceMetricsProvider which can be trivially inlined.
  • Loading branch information
FelixGV committed Jan 8, 2025
1 parent 5ded7f5 commit d1bd342
Show file tree
Hide file tree
Showing 34 changed files with 797 additions and 681 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ScatterGatherHelper<H, P extends ResourcePath<K>, K, R, BASIC_HTTP_
private final @Nullable ResponseAggregatorFactory<BASIC_HTTP_REQUEST, HTTP_RESPONSE> _responseAggregatorFactory;
private final @Nonnull Function<Headers, Long> _requestTimeout;
private final @Nonnull LongTailRetrySupplier<P, K> _longTailRetrySupplier;
private final @Nonnull Function<BasicRequest, Metrics> _metricsProvider;
private final @Nonnull Function<BASIC_HTTP_REQUEST, Metrics> _metricsProvider;
private final @Nonnull BiFunction<Headers, Metrics, Headers> _metricsDecorator;
private final @Nonnull Function<Headers, Metrics> _responseMetrics;
private final @Nonnull Function<P, ScatterGatherStats> _scatterGatherStatsProvider;
Expand Down Expand Up @@ -73,7 +73,7 @@ protected ScatterGatherHelper(
@Nonnull Optional<ResponseAggregatorFactory<BASIC_HTTP_REQUEST, HTTP_RESPONSE>> responseAggregatorFactory,
@Nonnull Function<Headers, Long> requestTimeout,
@Nonnull LongTailRetrySupplier<P, K> longTailRetrySupplier,
@Nonnull Function<BasicRequest, Metrics> metricsProvider,
@Nonnull Function<BASIC_HTTP_REQUEST, Metrics> metricsProvider,
@Nonnull BiFunction<Headers, Metrics, Headers> metricsDecorator,
@Nonnull Function<Headers, Metrics> responseMetrics,
@Nonnull Function<P, ScatterGatherStats> scatterGatherStatsProvider,
Expand Down Expand Up @@ -306,7 +306,7 @@ public void decorateResponse(@Nonnull Headers responseHeaders, @Nonnull Headers
}
}

public Metrics initializeMetrics(@Nonnull BasicRequest request) {
public Metrics initializeMetrics(@Nonnull BASIC_HTTP_REQUEST request) {
return _metricsProvider.apply(request);
}

Expand All @@ -318,7 +318,7 @@ public Metrics responseMetrics(@Nonnull Headers headers) {
return _scatterGatherStatsProvider.apply(path);
}

public static Builder<?, ?, ?, ?, ?, ?, ?> builder() {
public static <H, P extends ResourcePath<K>, K, R, HTTP_REQUEST extends BasicRequest, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> Builder<H, P, K, R, HTTP_REQUEST, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> builder() {
return new Builder<>();
}

Expand Down Expand Up @@ -354,7 +354,7 @@ public static class Builder<H, P extends ResourcePath<K>, K, R, HTTP_REQUEST ext
Optional.empty();
private Function<Headers, Long> _requestTimeout = headers -> null;
private LongTailRetrySupplier<P, K> _longTailRetrySupplier = (resourceName, methodName) -> AsyncFuture.cancelled();
private Function<BasicRequest, Metrics> _metricsProvider = http -> null;
private Function<HTTP_REQUEST, Metrics> _metricsProvider = http -> null;
private Function<Headers, Metrics> _responseMetrics = headers -> null;
private BiFunction<Headers, Metrics, Headers> _metricsDecorator = (headers, metrics) -> Headers.EMPTY_HEADERS;
private Function<P, ScatterGatherStats> _scatterGatherStatsProvider = path -> new ScatterGatherStats();
Expand Down Expand Up @@ -486,7 +486,7 @@ public Builder<H, P, K, R, HTTP_REQUEST, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> lo
}

public Builder<H, P, K, R, HTTP_REQUEST, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> metricsProvider(
@Nonnull Function<BasicRequest, Metrics> metricsProvider) {
@Nonnull Function<HTTP_REQUEST, Metrics> metricsProvider) {
_metricsProvider = Objects.requireNonNull(metricsProvider, "metricsProvider");
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class HttpConstants {
public static final String VENICE_ALLOW_REDIRECT = "X-VENICE-ALLOW-REDIRECT";

public static final String VENICE_CLIENT_COMPUTE = "X-VENICE-CLIENT-COMPUTE";
public static final String VENICE_CLIENT_COMPUTE_TRUE = "1";
public static final String VENICE_CLIENT_COMPUTE_FALSE = "0";

public static final int SC_MISDIRECTED_REQUEST = 421;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2351,4 +2351,12 @@ private ConfigKeys() {

public static final String SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP =
"server.delete.unassigned.partitions.on.startup";

/**
* The maximum number of entries (per type) to be cached in the {@link com.linkedin.venice.meta.NameRepository}.
*
* Default: {@value com.linkedin.venice.meta.NameRepository#DEFAULT_MAXIMUM_ENTRY_COUNT}
*/
public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count";

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* A repository of {@link StoreName} and {@link StoreVersionName}, which are intended to be used as shared instances.
*/
public class NameRepository {
private static final int DEFAULT_MAXIMUM_ENTRY_COUNT = 2000;
public static final int DEFAULT_MAXIMUM_ENTRY_COUNT = 2000;
private final LoadingCache<String, StoreName> storeNameCache;
private final LoadingCache<String, StoreVersionName> storeVersionNameCache;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.linkedin.venice.meta;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.StoreVersionNotFoundException;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -243,6 +246,16 @@ static boolean isSystemStore(String storeName) {

List<Version> getVersions();

@JsonIgnore
default IntSet getVersionNumbers() {
List<Version> versions = getVersions();
IntSet versionNumbers = new IntOpenHashSet(versions.size());
for (Version version: versions) {
versionNumbers.add(version.getNumber());
}
return versionNumbers;
}

void setVersions(List<Version> versions);

void addVersion(Version version);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.*;

import com.linkedin.venice.exceptions.StoreDisabledException;
import com.linkedin.venice.exceptions.StoreVersionNotFoundException;
Expand Down Expand Up @@ -313,44 +310,50 @@ public void testUseTheDeletedVersionNumber() {
public void testAddVersion() {
String storeName = Utils.getUniqueString("store");
Store store = TestUtils.createTestStore(storeName, "owner", System.currentTimeMillis());
assertNull(store.getVersion(2));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(2));
assertNull(store.getVersion(5));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(5));
assertNull(store.getVersion(6));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(6));
assertEquals(store.getVersions().size(), 0);
assertMissingVersion(store, 2, 5, 6);
assertVersionCount(store, 0);

store.addVersion(new VersionImpl(storeName, 5));
assertNull(store.getVersion(2));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(2));
assertNotNull(store.getVersion(5));
assertEquals(store.getVersionOrThrow(5).getNumber(), 5);
assertNull(store.getVersion(6));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(6));
assertEquals(store.getVersions().size(), 1);
assertMissingVersion(store, 2, 6);
assertPresentVersion(store, 5);
assertVersionCount(store, 1);
// largest used version is 5
assertEquals(store.peekNextVersion().getNumber(), 6);

store.addVersion(new VersionImpl(storeName, 2));
assertNotNull(store.getVersion(2));
assertEquals(store.getVersionOrThrow(2).getNumber(), 2);
assertNotNull(store.getVersion(5));
assertEquals(store.getVersionOrThrow(5).getNumber(), 5);
assertNull(store.getVersion(6));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(6));
assertEquals(store.getVersions().size(), 2);
assertMissingVersion(store, 6);
assertPresentVersion(store, 2, 5);
assertVersionCount(store, 2);
// largest used version is still 5
Assert.assertEquals(store.peekNextVersion().getNumber(), 6);

Version version = new VersionImpl(store.getName(), store.getLargestUsedVersionNumber() + 1, "pushJobId");
Assert.assertEquals(version.getNumber(), 6);
store.addVersion(version);
Assert.assertEquals(store.peekNextVersion().getNumber(), 7);
assertNotNull(store.getVersion(2));
assertEquals(store.getVersionOrThrow(2).getNumber(), 2);
assertNotNull(store.getVersion(5));
assertEquals(store.getVersionOrThrow(5).getNumber(), 5);
assertNotNull(store.getVersion(6));
assertEquals(store.getVersionOrThrow(6).getNumber(), 6);
assertEquals(store.getVersions().size(), 3);
assertPresentVersion(store, 2, 5, 6);
assertVersionCount(store, 3);
}

private void assertMissingVersion(Store store, int... versionNumbers) {
for (int versionNumber: versionNumbers) {
assertNull(store.getVersion(versionNumber));
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(versionNumber));
assertFalse(store.getVersionNumbers().contains(versionNumber));
}
}

private void assertPresentVersion(Store store, int... versionNumbers) {
for (int versionNumber: versionNumbers) {
assertNotNull(store.getVersion(versionNumber));
assertEquals(store.getVersionOrThrow(versionNumber).getNumber(), versionNumber);
assertTrue(store.getVersionNumbers().contains(versionNumber));
}
}

private void assertVersionCount(Store store, int versionCount) {
assertEquals(store.getVersions().size(), versionCount);
assertEquals(store.getVersionNumbers().size(), versionCount);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.venice.HttpConstants.VENICE_COMPRESSION_STRATEGY;
import static com.linkedin.venice.HttpConstants.VENICE_REQUEST_RCU;
import static com.linkedin.venice.HttpConstants.VENICE_SUPPORTED_COMPRESSION_STRATEGY;
import static com.linkedin.venice.utils.TestUtils.getVenicePathParser;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -26,6 +27,7 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.LiveInstanceMonitor;
import com.linkedin.venice.meta.NameRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.VeniceRouterConfig;
Expand Down Expand Up @@ -63,6 +65,8 @@

//TODO: refactor Dispatcher to take a HttpClient Factory, so we don't need to spin up an HTTP server for these tests
public class TestVeniceDispatcher {
private final NameRepository nameRepository = new NameRepository();

@Test
public void testErrorRetry() {
VeniceDispatcher dispatcher = getMockDispatcher(false, false);
Expand Down Expand Up @@ -420,9 +424,11 @@ private void triggerResponse(
return modifyingCompressor;
})).when(compressorFactory).getCompressor(any());

doReturn(new VeniceResponseDecompressor(true, routerStats, mockRequest, "test_store", 1, compressorFactory))
.when(mockPath)
.getResponseDecompressor();
VenicePathParser pathParser = getVenicePathParser(compressorFactory, true);

VeniceResponseDecompressor decompressor =
pathParser.getDecompressor(this.nameRepository.getStoreVersionName("test_store", 1), mockRequest);
doReturn(decompressor).when(mockPath).getResponseDecompressor();

AsyncPromise mockHostSelected = mock(AsyncPromise.class);
AsyncPromise mockTimeoutFuture = mock(AsyncPromise.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import static com.linkedin.venice.ConfigKeys.SERVER_FORKED_PROCESS_JVM_ARGUMENT_LIST;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE;
import static com.linkedin.venice.utils.Utils.getUniqueString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand All @@ -26,6 +28,7 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.GzipCompressor;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
Expand Down Expand Up @@ -53,6 +56,7 @@
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.NameRepository;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.PartitionerConfigImpl;
Expand All @@ -76,6 +80,12 @@
import com.linkedin.venice.pubsub.api.PubSubTopicType;
import com.linkedin.venice.pubsub.manager.TopicManagerRepository;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.router.VeniceRouterConfig;
import com.linkedin.venice.router.api.VenicePartitionFinder;
import com.linkedin.venice.router.api.VenicePathParser;
import com.linkedin.venice.router.api.VeniceVersionFinder;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
Expand All @@ -84,6 +94,7 @@
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
Expand All @@ -106,6 +117,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -987,4 +999,27 @@ public static String loadFileAsString(String fileName) {
return null;
}
}

public static VenicePathParser getVenicePathParser(CompressorFactory compressorFactory, boolean decompressOnClient) {
RouterStats stats = mock(RouterStats.class);
when(stats.getStatsByType(any())).thenReturn(mock(AggRouterHttpRequestStats.class));
ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class);
Store store = mock(Store.class);
when(store.getClientDecompressionEnabled()).thenReturn(decompressOnClient);
when(readOnlyStoreRepository.getStoreOrThrow(anyString())).thenReturn(store);

VeniceRouterConfig routerConfig = mock(VeniceRouterConfig.class);
when(routerConfig.isDecompressOnClient()).thenReturn(decompressOnClient);

return new VenicePathParser(
mock(VeniceVersionFinder.class),
mock(VenicePartitionFinder.class),
stats,
readOnlyStoreRepository,
routerConfig,
compressorFactory,
mock(MetricsRepository.class),
mock(ScheduledExecutorService.class),
new NameRepository());
}
}
2 changes: 2 additions & 0 deletions services/venice-router/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ dependencies {
exclude group: 'org.mockito', module: 'mockito-all' // this will introduce another different mockito-all version
}

implementation libraries.fastUtil

implementation('org.apache.helix:helix-core:1.4.1:jdk8') {
exclude group: 'org.apache.helix'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.linkedin.venice.router;

import java.util.TreeMap;


/**
* A facade for the {@link VeniceRouterConfig}, so that retry-related configs can be passed around without giving access
* to the rest of the configs.
*/
public interface RouterRetryConfig {
TreeMap<Integer, Integer> getLongTailRetryForBatchGetThresholdMs();

int getLongTailRetryForSingleGetThresholdMs();

int getLongTailRetryMaxRouteForMultiKeyReq();

int getSmartLongTailRetryAbortThresholdMs();

boolean isSmartLongTailRetryEnabled();
}
Loading

0 comments on commit d1bd342

Please sign in to comment.