@ivandika3 ivandika3 commented Jan 17, 2026

What changes were proposed in this pull request?

This task provides a basic implementation of a follower-read client proxy, as a baseline before further performance improvements. The idea is simply to pick an OM node at random (either the leader or a follower) and use it to submit read requests. Read requests keep going to that OM node unless the OM is down, which triggers failover. Write requests are sent directly to the OM leader.

Further improvements, such as follower affinity or picking an OM based on latency, applied index, etc., will be implemented in follow-up tasks. The main focus of this patch is to ensure that long-lived clients (e.g. S3G Ozone clients) stick to an OM follower once they pick it as the current proxy. In the previous leader proxy provider implementation, the client read from a follower only until a write request triggered OMNotLeaderException; the resulting failover then left the proxy pointing at the leader from then on.

The implementation introduces HadoopRpcOMFollowerReadProxyProvider, which wraps HadoopRpcOMFailoverProxyProvider. FollowerReadProxyProvider tracks its own currentOmNodeId, separate from the one in HadoopRpcOMFailoverProxyProvider. FollowerReadInvocationHandler checks whether the request is a read request (using OmUtils#isReadOnly) and, if so, forwards it to its current proxy. If it is a write request, the request is forwarded to HadoopRpcOMFailoverProxyProvider to be sent to the leader.

So the proxy hierarchy (each with its own InvocationHandler) is

  • TracingUtil's proxy (InvocationHandler: TraceAllMethod)
    • RetryProxy (InvocationHandler: RetryInvocationHandler)
      • HadoopRpcOMFollowerReadProxyProvider (InvocationHandler: FollowerReadInvocationHandler)
        • ProtocolProxy (which is created in OMFailoverProxyProviderBase#createOMProxy): ProtobufRpcEngine.Invoker
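
The routing idea described above can be sketched with a plain JDK dynamic proxy. This is only a toy model under stated assumptions: `ToyProtocol`, `ToyNode`, `RoutingHandler` and the `"read"` prefix check are hypothetical stand-ins (the prefix plays the role of OmUtils#isReadOnly), and failover is left out entirely. It is not the code in this patch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Toy model of the routing idea only; none of these types are Ozone classes. */
final class FollowerReadRoutingSketch {

  /** Hypothetical stand-in for the OM protocol: one call that carries a request. */
  interface ToyProtocol {
    String submit(String request);
  }

  /** Hypothetical node that reports which node served the request. */
  static final class ToyNode implements ToyProtocol {
    private final String id;

    ToyNode(String id) {
      this.id = id;
    }

    @Override
    public String submit(String request) {
      return id + ":" + request;
    }
  }

  /** Sends reads to one sticky, randomly chosen node and everything else to the leader. */
  static final class RoutingHandler implements InvocationHandler {
    private final ToyProtocol leader;
    private final ToyProtocol readTarget;

    RoutingHandler(List<ToyNode> nodes, int leaderIndex, Random random) {
      this.leader = nodes.get(leaderIndex);
      // Picked once and then reused, so a long-lived client sticks to the same node.
      this.readTarget = nodes.get(random.nextInt(nodes.size()));
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      if (method.getDeclaringClass() == Object.class) {
        // toString/hashCode/equals are answered by the handler itself.
        return method.invoke(this, args);
      }
      String request = (String) args[0];
      // Assumption: a "read" prefix plays the role of OmUtils#isReadOnly.
      ToyProtocol target = request.startsWith("read") ? readTarget : leader;
      return method.invoke(target, args);
    }
  }

  static ToyProtocol newClient(List<ToyNode> nodes, int leaderIndex, Random random) {
    return (ToyProtocol) Proxy.newProxyInstance(
        ToyProtocol.class.getClassLoader(),
        new Class<?>[] {ToyProtocol.class},
        new RoutingHandler(nodes, leaderIndex, random));
  }

  public static void main(String[] args) {
    List<ToyNode> nodes = new ArrayList<>();
    nodes.add(new ToyNode("om1"));
    nodes.add(new ToyNode("om2"));
    nodes.add(new ToyNode("om3"));
    ToyProtocol client = newClient(nodes, 0, new Random());
    System.out.println(client.submit("write key1")); // served by om1, the leader
    System.out.println(client.submit("read key1"));  // served by the sticky read target
    System.out.println(client.submit("read key2"));  // same node as the previous read
  }
}
```

The read target is chosen once in the constructor, which is what makes a long-lived client keep talking to the same node; a real provider would also need to replace it on failover.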

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14379

How was this patch tested?

UT and IT.

Clean CI: https://github.com/ivandika3/ozone/actions/runs/21102966224

@ivandika3 ivandika3 marked this pull request as ready for review January 18, 2026 05:09
@ivandika3
Contributor Author

cc: @greenwich

@ivandika3 ivandika3 self-assigned this Jan 18, 2026
@szetszwo
Contributor

@ivandika3, thanks for working on this! I am reviewing it. The change is quite big, so I need some time.

@chungen0126 chungen0126 self-requested a review January 20, 2026 05:40
@szetszwo szetszwo left a comment

@ivandika3, thanks a lot for working on this and adding a lot of tests! Please see the inline comments.

BTW, filed HDDS-14455 and HDDS-14470 for improving the current code.

Comment on lines +126 to 127
protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
if (omProxyInfo.proxy == null) {
Contributor

The synchronized is for omProxyInfo.proxy. We probably should do it in OMProxyInfo. Let me fix it in HDDS-14470.

Contributor Author

Ok will wait until HDDS-14470 is merged.

Contributor

Let's merge this first if it is ready. I can update the change for HDDS-14470.

new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
}

@SuppressWarnings("unchecked")
Contributor

Move it to wrappedProxy. Then it would only suppress the warning there but not the entire method.

+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -103,7 +103,6 @@ public HadoopRpcOMFollowerReadFailoverProxyProvider(
         new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
   }

-  @SuppressWarnings("unchecked")
   public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
       HadoopRpcOMFailoverProxyProvider<T> failoverProxy) throws IOException {
     this.protocolClass = protocol;
@@ -119,6 +118,7 @@ public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T>
       combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
     }
     combinedInfo.append(']');
+    @SuppressWarnings("unchecked")
     T wrappedProxy = (T) Proxy.newProxyInstance(
         FollowerReadInvocationHandler.class.getClassLoader(),
         new Class<?>[] {protocol}, new FollowerReadInvocationHandler());

Contributor Author

Thanks, updated.

Comment on lines 114 to 121
StringBuilder combinedInfo = new StringBuilder("[");
for (int i = 0; i < failoverProxy.getOMProxies().size(); i++) {
if (i > 0) {
combinedInfo.append(',');
}
combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
}
combinedInfo.append(']');
Contributor

We may use "map/reduce".

    final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
        .map(a -> a.proxyInfo)
        .reduce((a, b) -> a + ", " + b).orElse("") + "]";
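
For comparison, here is a small self-contained sketch of the same bracketed, comma-separated output built two ways. `Collectors.joining` is a JDK alternative and not what was proposed in this thread, and the `om1`/`om2`/`om3` strings are made-up placeholders for the real proxyInfo values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CombinedInfoSketch {

  /** Same shape as the suggestion above: reduce with a separator, then add brackets. */
  static String withReduce(List<String> infos) {
    return "[" + infos.stream().reduce((a, b) -> a + ", " + b).orElse("") + "]";
  }

  /** JDK alternative: joining takes the delimiter, prefix and suffix directly. */
  static String withJoining(List<String> infos) {
    return infos.stream().collect(Collectors.joining(", ", "[", "]"));
  }

  public static void main(String[] args) {
    // Hypothetical proxyInfo strings; the real values come from the OM proxies.
    List<String> infos = Arrays.asList("om1", "om2", "om3");
    System.out.println(withReduce(infos));
    System.out.println(withJoining(infos));
  }
}
```

Both methods produce the same string, including `[]` for an empty list.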

Contributor Author

Thanks, updated.

}

@SuppressWarnings("unchecked")
public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
Contributor

For simplicity, let's support only OzoneManagerProtocolPB for now? Then we can remove followerReadEnabled and simplify the code.

Contributor Author

Thanks for the review. Could you clarify this?

Currently there are two guards to ensure that follower read is only used with OzoneManagerProtocolPB. The first is at construction, which disables useFollowerRead if the protocol is not OzoneManagerProtocolPB. The second is in FollowerReadInvocationHandler.

Do you mean changing HadoopRpcOMFollowerReadFailoverProxyProvider to HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> implements FailoverProxy<OzoneManagerProtocolPB>? I believe this would mean replacing the generic type parameter T with OzoneManagerProtocolPB. That adds verbosity and loses the benefit of generics, so I am not sure it is a good tradeoff for removing useFollowerRead.

Please let me know what you think.

Contributor

Is it the case that it uses follower read only if protocol == OzoneManagerProtocolPB.class? If yes, we don't need to pass the protocol parameter.

Comment on lines 168 to 179
private static OMRequest parseOMRequest(Object[] args) throws Throwable {
if (args == null || args.length < 2 || !(args[1] instanceof Message)) {
LOG.error("Request failed since OM request is null and cannot be parsed");
// Throws a non-retriable exception to prevent retry and failover
// See the HddsUtils#shouldNotFailoverOnRpcException used in
// OMFailoverProxyProviderBase#shouldFailover
throw wrapInServiceException(
new RpcNoSuchProtocolException("OM request is null and cannot be parsed"));
}
final Message theRequest = (Message) args[1];
return (OMRequest) theRequest;
}
Contributor

Let's provide a more specific error message:

  private static OMRequest parseOMRequest(Object[] args) throws ServiceException {
    final String error = args == null ? "args == null"
        : args.length < 2 ? "args.length == " + args.length + " < 2"
        : !(args[1] instanceof OMRequest) ? "Non-OMRequest: " + args[1].getClass()
        : null;
    if (error != null) {
      // Throws a non-retriable exception to prevent retry and failover
      // See the HddsUtils#shouldNotFailoverOnRpcException used in
      // OMFailoverProxyProviderBase#shouldFailover
      throw wrapInServiceException(new RpcNoSuchProtocolException("Failed to parseOMRequest: " + error));
    }
    return (OMRequest) args[1];
  }

Contributor Author

Thanks, updated.

Contributor Author

I updated this to handle null args[1]. I also changed this to use normal if else blocks since I personally find chained ternary operators hard to read.
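
A rough sketch of what the if/else form with an explicit null check might look like is shown here. `ToyRequest` and `ToyServiceException` are hypothetical stand-ins for OMRequest and ServiceException, so this only illustrates the shape of the checks and is not the code that was merged.

```java
final class ParseRequestSketch {

  /** Hypothetical stand-in for the protobuf OMRequest type. */
  static final class ToyRequest {
    final String cmd;

    ToyRequest(String cmd) {
      this.cmd = cmd;
    }
  }

  /** Hypothetical stand-in for the non-retriable ServiceException. */
  static final class ToyServiceException extends Exception {
    ToyServiceException(String message) {
      super(message);
    }
  }

  /** Returns null when args holds a request at index 1, otherwise the reason it does not. */
  static String describeProblem(Object[] args) {
    if (args == null) {
      return "args == null";
    }
    if (args.length < 2) {
      return "args.length == " + args.length + " < 2";
    }
    if (args[1] == null) {
      // Checked before getClass() so a null element cannot cause a NullPointerException.
      return "args[1] == null";
    }
    if (!(args[1] instanceof ToyRequest)) {
      return "Non-OMRequest: " + args[1].getClass();
    }
    return null;
  }

  static ToyRequest parse(Object[] args) throws ToyServiceException {
    String error = describeProblem(args);
    if (error != null) {
      throw new ToyServiceException("Failed to parseOMRequest: " + error);
    }
    return (ToyRequest) args[1];
  }

  public static void main(String[] args) throws Exception {
    ToyRequest request = new ToyRequest("ListKeys");
    System.out.println(parse(new Object[] {null, request}).cmd);
    System.out.println(describeProblem(new Object[] {null, null}));
  }
}
```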

public Object invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
lastProxy = null;
if (method.getDeclaringClass() == Object.class) {
Contributor

Check (method.getDeclaringClass() != OzoneManagerProtocolPB.class) instead?

Contributor Author

Yes, better. Updated, thanks.

Contributor Author

I reverted this due to failing test.

The actual Method#getDeclaringClass is org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$BlockingInterface so all submitRequest will go to this block.

Contributor Author

@ivandika3 ivandika3 Jan 22, 2026

Additionally, I tried changing the handler to pass the call through if the declaring class is not OzoneManagerProtocolPB.class.

From

return method.invoke(this, args);

to

return method.invoke(proxy, args);

However, it seems that because org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$BlockingInterface is not a concrete class, this throws an exception when using Object#toString.

Caused by: java.lang.NoSuchMethodException: org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB.toString()

Not sure if there is a way to handle all cases.
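
The declaring-class behaviour behind the failing test can be reproduced with plain JDK proxies. In this sketch `BaseService` and `ProtocolPB` are hypothetical stand-ins for the generated BlockingInterface and for OzoneManagerProtocolPB; it only shows what `Method#getDeclaringClass` reports inside an InvocationHandler and does not reproduce the NoSuchMethodException above.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class DeclaringClassSketch {

  /** Plays the role of the generated interface that actually declares submitRequest. */
  interface BaseService {
    String submitRequest(String request);
  }

  /** Plays the role of the protocol interface, which only extends the base interface. */
  interface ProtocolPB extends BaseService {
  }

  /** Records the declaring class of each method the handler is asked to invoke. */
  static List<Class<?>> observeDeclaringClasses() {
    List<Class<?>> seen = new ArrayList<>();
    InvocationHandler handler = (proxy, method, args) -> {
      seen.add(method.getDeclaringClass());
      // Both methods called below return String, so a fixed answer is enough here.
      return method.getReturnType() == String.class ? "ok" : null;
    };
    ProtocolPB p = (ProtocolPB) Proxy.newProxyInstance(
        ProtocolPB.class.getClassLoader(), new Class<?>[] {ProtocolPB.class}, handler);
    p.submitRequest("r"); // reported as declared in BaseService, not ProtocolPB
    p.toString();         // reported as declared in java.lang.Object
    return seen;
  }

  public static void main(String[] args) {
    for (Class<?> c : observeDeclaringClasses()) {
      System.out.println(c.getName());
    }
  }
}
```

Because the inherited method is reported with the super-interface as its declaring class, a check of the form `getDeclaringClass() != ProtocolPB.class` would be true for every protocol call, which matches the observation above.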

Contributor

Thanks for trying it! Let's keep using Object.class.

Comment on lines 407 to 412
private static Throwable wrapInServiceException(Throwable e) {
if (e instanceof ServiceException) {
return e;
}
return new ServiceException(e);
}
Contributor

throws ServiceException.

  private static void throwServiceException(Throwable e) throws ServiceException {
    throw e instanceof ServiceException ? (ServiceException) e : new ServiceException(e);
  }

Contributor Author

Thanks, updated. However, retVal needs to be initialized to null, since the compiler cannot detect that throwServiceException always throws an exception (it assumes the method may simply return).

Contributor

Then we may return the ServiceException instead. Either way, both are about the same.

  private static ServiceException getServiceException(Throwable e) {
    return e instanceof ServiceException ? (ServiceException) e : new ServiceException(e);
  }
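
The difference between the two helper shapes comes down to Java's definite-assignment analysis. The sketch below uses a hypothetical `ToyServiceException` in place of ServiceException and made-up caller methods, so it illustrates the compiler behaviour rather than the actual invoke method.

```java
final class ThrowHelperSketch {

  /** Hypothetical stand-in for ServiceException. */
  static final class ToyServiceException extends Exception {
    ToyServiceException(Throwable cause) {
      super(cause);
    }
  }

  /** Always throws, but the compiler only sees a void method that might return. */
  static void throwServiceException(Throwable e) throws ToyServiceException {
    throw e instanceof ToyServiceException ? (ToyServiceException) e : new ToyServiceException(e);
  }

  /** Returns the exception so that the caller writes the throw statement itself. */
  static ToyServiceException getServiceException(Throwable e) {
    return e instanceof ToyServiceException ? (ToyServiceException) e : new ToyServiceException(e);
  }

  static String viaThrowHelper(boolean fail) throws ToyServiceException {
    String retVal = null; // needed: javac cannot tell that the helper never returns
    try {
      if (fail) {
        throw new IllegalStateException("boom");
      }
      retVal = "ok";
    } catch (IllegalStateException e) {
      throwServiceException(e);
    }
    return retVal;
  }

  static String viaReturnedException(boolean fail) throws ToyServiceException {
    String retVal; // no dummy initial value needed
    try {
      if (fail) {
        throw new IllegalStateException("boom");
      }
      retVal = "ok";
    } catch (IllegalStateException e) {
      // An explicit throw ends the catch block, so retVal counts as assigned below.
      throw getServiceException(e);
    }
    return retVal;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(viaThrowHelper(false));
    System.out.println(viaReturnedException(false));
  }
}
```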

*
* @return parsed OM request.
*/
private static OMRequest parseOMRequest(Object[] args) throws Throwable {
Contributor

throws ServiceException instead of Throwable.

Contributor Author

Thanks, updated.

* Whether reading from follower is enabled. If this is false, all read
* requests will still go to OM leader.
*/
private volatile boolean followerReadEnabled;
Contributor

Let's rename it to useFollowerRead to avoid confusion with the conf.

Contributor Author

Thanks, updated.

OMRequest omRequest = parseOMRequest(args);
if (followerReadEnabled && OmUtils.shouldSendToFollower(omRequest)) {
int failedCount = 0;
for (int i = 0; i < failoverProxy.getOmNodesInOrder().size(); i++) {
Contributor

Check followerReadEnabled: followerReadEnabled && i < failoverProxy.getOmNodesInOrder().size()

Contributor Author

Thanks, updated.

@ivandika3 ivandika3 left a comment

@szetszwo Thanks for the review.

# Conflicts:
#	hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
#	hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
#	hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
#	hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@szetszwo szetszwo left a comment

+1 the change looks good.

@szetszwo szetszwo merged commit a698f48 into apache:master Jan 22, 2026
44 checks passed