Made segment upload to remote async #18333

Open · wants to merge 1 commit into base: main
155 changes: 143 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
@@ -2158,6 +2158,79 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() {
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

/**
* Waits for the local commit to be uploaded to the remote store within the specified timeout.
* This method captures the local commit generation once at the start and then periodically checks whether
* the latest remote metadata has reached that generation. Capturing the generation up front avoids chasing
* a moving target when additional local commits happen during the wait.
*
* @param timeout The maximum time to wait for the commit to be uploaded
* @param pollInterval The interval between checks
* @return true if the commit was uploaded within the timeout period, false otherwise
* @throws AlreadyClosedException if the shard is closed during the wait
*/
public boolean waitForLocalCommitToBeUploadedToRemote(TimeValue timeout, TimeValue pollInterval) throws IOException {
assert indexSettings.isAssignedOnRemoteNode();

if (timeout == null || pollInterval == null) {
throw new IllegalArgumentException("Timeout and poll interval must not be null");
}

long startTimeNanos = System.nanoTime();
long timeoutNanos = timeout.nanos();
long pollIntervalMillis = pollInterval.millis();

// Capture the current local commit generation once at the start
long localCommitGeneration;
try (GatedCloseable<IndexCommit> localCommit = acquireLastIndexCommit(false)) {
localCommitGeneration = localCommit.get().getGeneration();
logger.debug(
"Waiting for local commit generation [{}] to be uploaded to remote (timeout: {}, poll interval: {})",
localCommitGeneration,
timeout,
pollInterval
);
} catch (Exception e) {
logger.error("Failed to get local commit generation", e);
return false;
}

while (System.nanoTime() - startTimeNanos < timeoutNanos) {
try {
// Check if the remote has a commit with generation >= our captured local commit generation
RemoteSegmentMetadata remoteMetadata = null;
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
remoteMetadata = directory.readLatestMetadataFile();
} catch (Exception e) {
logger.debug("Error reading remote metadata file", e);
}

if (remoteMetadata != null && remoteMetadata.getGeneration() >= localCommitGeneration) {
logger.debug(
"Local commit generation [{}] has been successfully uploaded to remote within {} ms",
localCommitGeneration,
TimeValue.nsecToMSec(System.nanoTime() - startTimeNanos)
);
return true;
}

try {
Thread.sleep(pollIntervalMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for local commit to be uploaded to remote", e);
}
} catch (AlreadyClosedException e) {
logger.trace("Shard closed while waiting for local commit to be uploaded to remote");
throw e;
}
}

logger.warn("Timed out waiting for local commit generation [{}] to be uploaded to remote after {}", localCommitGeneration, timeout);
return false;
}
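/*
 * Illustrative usage only (hypothetical caller; timeout and poll values are example choices,
 * not values used by this change):
 *
 *   boolean uploaded = indexShard.waitForLocalCommitToBeUploadedToRemote(
 *       TimeValue.timeValueSeconds(30),   // overall timeout
 *       TimeValue.timeValueMillis(500)    // poll interval
 *   );
 *   if (uploaded == false) {
 *       // caller decides whether to fail or proceed; the snapshot path in this change proceeds regardless
 *   }
 */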

/**
* Returns true iff it is able to verify that remote segment store
* is in sync with local
@@ -2203,40 +2276,98 @@ public void waitForRemoteStoreSync() throws IOException {
Throws IOException if the remote store is not synced within the timeout
*/
public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
waitForRemoteStoreSyncWithConfig(getRecoverySettings().internalRemoteUploadTimeout(), TimeValue.timeValueSeconds(30), onProgress);
}

/**
* Waits for remote store to sync with configurable timeout and poll interval.
* The method will periodically check if remote segments store is in sync with local store.
*
* @param timeout The maximum time to wait for the sync to complete
* @param pollInterval The time to wait between checks
* @param onProgress A runnable that will be executed when progress is detected
* @throws IOException if the sync doesn't complete within the timeout period
*/
private void waitForRemoteStoreSyncWithConfig(TimeValue timeout, TimeValue pollInterval, Runnable onProgress) throws IOException {
assert indexSettings.isAssignedOnRemoteNode();

if (timeout == null || pollInterval == null) {
throw new IllegalArgumentException("Timeout and poll interval must not be null");
}

RemoteSegmentStoreDirectory directory = getRemoteDirectory();
int segmentUploadeCount = 0;
int segmentUploadCount = 0;

if (shardRouting.primary() == false) {
return;
}

long startNanos = System.nanoTime();

while (System.nanoTime() - startNanos < getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
while (System.nanoTime() - startNanos < timeout.nanos()) {
try {
if (isRemoteSegmentStoreInSync()) {
return;
} else {
if (directory.getSegmentsUploadedToRemoteStore().size() > segmentUploadeCount) {
if (directory.getSegmentsUploadedToRemoteStore().size() > segmentUploadCount) {
onProgress.run();
logger.debug("Uploaded segment count {}", directory.getSegmentsUploadedToRemoteStore().size());
segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
segmentUploadCount = directory.getSegmentsUploadedToRemoteStore().size();
}
try {
Thread.sleep(TimeValue.timeValueSeconds(30).millis());
Thread.sleep(pollInterval.millis());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
throw new OpenSearchException("Interrupted while waiting for remote store sync", ie);
}
}
} catch (AlreadyClosedException e) {
// There is no point in waiting as shard is now closed .
// There is no point in waiting as shard is now closed
return;
}
}
throw new IOException(
"Failed to upload to remote segment store within remote upload timeout of "
+ getRecoverySettings().internalRemoteUploadTimeout().getMinutes()
+ " minutes"
);
throw new IOException("Failed to upload to remote segment store within timeout of " + timeout);
}

/**
* Waits for remote store to sync with configurable timeout and poll interval.
* The method will periodically check if remote segments store is in sync with local store.
*
* @param timeout The maximum time to wait for the sync to complete
* @param pollInterval The time to wait between checks
* @throws IOException if the sync doesn't complete within the timeout period
*/
public void awaitRemoteStoreSync(TimeValue timeout, TimeValue pollInterval) throws IOException {
waitForRemoteStoreSyncWithConfig(timeout, pollInterval, () -> {});
}

/**
* Blocks until remote store is synced with local shard or timeout occurs.
* This method should only be used in tests.
*
* @throws IOException if there is some failure while checking remote store sync status
*/
public void awaitRemoteStoreSync() throws IOException {
if (indexSettings.isAssignedOnRemoteNode() == false) {
return;
}
awaitRemoteStoreSync(TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(20));
}
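/*
 * Illustrative test usage only (hypothetical test helpers, names assumed):
 *
 *   indexDoc(indexShard, "_doc", "1");
 *   flushShard(indexShard);
 *   // Block until the remote segment store catches up, or throw IOException after the default 10s timeout.
 *   indexShard.awaitRemoteStoreSync();
 */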

/**
* Gets the local segment info files.
*
* @return Collection of local segment info files
* @throws IOException if there's an error reading segment info
*/
private Collection<String> getLocalSegmentInfosFiles() throws IOException {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
return segmentInfosGatedCloseable.get().files(true);
} catch (AlreadyClosedException e) {
throw e;
} catch (Throwable e) {
logger.error("Exception while reading latest metadata", e);
throw new IOException("Failed to read segment info files", e);
}
}
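/*
 * Simplified sketch of how this helper could back an "in sync" check, assuming
 * getSegmentsUploadedToRemoteStore() exposes the uploaded file names keyed by local name
 * (illustrative only; the production check may apply additional filtering):
 *
 *   Collection<String> localFiles = getLocalSegmentInfosFiles();
 *   Set<String> remoteFiles = getRemoteDirectory().getSegmentsUploadedToRemoteStore().keySet();
 *   boolean inSync = remoteFiles.containsAll(localFiles);
 */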

public void preRecovery() {
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ public ReleasableRetryableRefreshListener(ThreadPool threadPool) {
}

@Override
public final void afterRefresh(boolean didRefresh) throws IOException {
public void afterRefresh(boolean didRefresh) throws IOException {
if (closed.get()) {
return;
}
@@ -157,7 +157,7 @@ protected boolean isRetryEnabled() {
* The synchronised block ensures that if there is a retry or afterRefresh waiting, then it waits until the previous
* execution finishes.
*/
private synchronized void runAfterRefreshWithPermit(boolean didRefresh, Runnable runFinally) {
protected synchronized void runAfterRefreshWithPermit(boolean didRefresh, Runnable runFinally) {
if (closed.get()) {
return;
}
Original file line number Diff line number Diff line change
@@ -145,6 +145,20 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
}
}

@Override
public final void afterRefresh(boolean didRefresh) throws IOException {
if (isClosed()) {
return;
}
runAfterRefreshExactlyOnce(didRefresh);
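// Hand the segment upload off to the dedicated remote refresh sync pool (zero delay) so the refresh thread is not blocked by the upload.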
this.indexShard.getThreadPool()
.schedule(
() -> runAfterRefreshWithPermit(didRefresh, () -> {}),
new TimeValue(0, TimeUnit.MILLISECONDS),
getRemoteRefreshThreadPoolName()
);
}

/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
@@ -380,6 +394,10 @@ protected String getRetryThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
}

private String getRemoteRefreshThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_SEGMENT_SYNC;
}

private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
@@ -416,6 +417,11 @@ private void snapshot(
commitGeneration = lastRemoteUploadedIndexCommit.getGeneration();
} else {
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
// TODO: Validate that this wait is sufficient for the snapshot flow.
indexShard.waitForLocalCommitToBeUploadedToRemote(
TimeValue.timeValueSeconds(2),
TimeValue.timeValueMillis(100)
);
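// The returned boolean is not checked here; the snapshot proceeds even if the wait times out.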
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
}
Original file line number Diff line number Diff line change
@@ -124,6 +124,7 @@ public static class Names {
public static final String REMOTE_STATE_READ = "remote_state_read";
public static final String INDEX_SEARCHER = "index_searcher";
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
public static final String REMOTE_REFRESH_SEGMENT_SYNC = "remote_refresh_segment_sync";
}

static Set<String> scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core"));
@@ -201,6 +202,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED);
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
map.put(Names.REMOTE_REFRESH_SEGMENT_SYNC, ThreadPoolType.SCALING);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}

@@ -325,6 +327,10 @@ public ThreadPool(
Names.REMOTE_STATE_CHECKSUM,
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000)
);
builders.put(
Names.REMOTE_REFRESH_SEGMENT_SYNC,
new ScalingExecutorBuilder(Names.REMOTE_REFRESH_SEGMENT_SYNC, 1, halfProc, TimeValue.timeValueMinutes(5))
);
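// Scaling pool (1 to halfProc threads, 5 minute keep-alive) that runs the segment-upload work scheduled from RemoteStoreRefreshListener#afterRefresh.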

for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) {