Software Development

Request Ready Record


The cluster node maintains a ready checklist which maps a key and
a callback operate. The secret’s chosen relying on the precise
standards to invoke the callback.
For instance if it must be invoked every time
a message from different cluster node is obtained, it may be the
Correlation Identifer of the message.
Within the case of Replicated Log it’s the
Excessive-Water Mark. The callback handles
the response and decides if the consumer request may be fulfilled.

Think about the instance of a key-value retailer the place, knowledge is replicated
on a number of servers. Right here, Quorum can be utilized
to determine when a replication may be thought-about profitable
to provoke a response to the consumer.
The cluster node then tracks the requests despatched to different cluster nodes,
and a callback is registered with every request.
Every request is marked with a Correlation Identifer,
which is used to map response to the request.
The ready checklist is then notified to invoke the callback
when the response from different cluster nodes are obtained.

For the sake of this instance, let’s name our three cluster nodes
athens, byzantium and cyrene.
The consumer connects with athens to retailer “title” as “Microservices”.
Athens replicates it on byzantium and cyrene; so it sends
a request to itself to retailer the key-value and sends
requests to each byzantium and cyrene concurrently.
To trace responses, athens creates a WriteQuorumResponseCallback
and provides it to the ready checklist for every of the requests despatched.

For each response obtained, the WriteQuorumResponseCallback is
invoked to deal with the response. It checks whether or not the required quantity
of responses have been obtained.
As soon as the response is obtained from byzantium, the quorum is reached
and the pending consumer request is accomplished.
Cyrene can reply later, however the response may be despatched to the consumer
with out ready for it.

The code appears to be like just like the pattern under:
Observe that each cluster node maintains its personal occasion of a ready checklist.
The ready checklist tracks the important thing and related callback and
shops the timestamp at which the callback was registered.
The timestamp is used to verify whether or not the callbacks must be expired
if responses have not been obtained throughout the anticipated time.

public class RequestWaitingList<Key, Response> {
    non-public Map<Key, CallbackDetails> pendingRequests = new ConcurrentHashMap<>();
    public void add(Key key, RequestCallback<Response> callback) 
        pendingRequests.put(key, new CallbackDetails(callback, clock.nanoTime()));
class CallbackDetails 
    RequestCallback requestCallback;
    lengthy createTime;

    public CallbackDetails(RequestCallback requestCallback, lengthy createTime) 
        this.requestCallback = requestCallback;
        this.createTime = createTime;

    public RequestCallback getRequestCallback() 
        return requestCallback;

    public lengthy elapsedTime(lengthy now) 
        return now - createTime;

public interface RequestCallback<T> 
    void onResponse(T r);
    void onError(Throwable e);

It’s requested to deal with the response or error
as soon as the response has been obtained from the opposite cluster node.

class RequestWaitingList…

  public void handleResponse(Key key, Response response) 
      if (!pendingRequests.containsKey(key)) 
      CallbackDetails callbackDetails = pendingRequests.take away(key);


class RequestWaitingList…

  public void handleError(int requestId, Throwable e) 
      CallbackDetails callbackDetails = pendingRequests.take away(requestId);

The ready checklist can then be used to deal with quorum responses
with the implementation wanting one thing like this:

static class WriteQuorumCallback implements RequestCallback<RequestOrResponse> 
    non-public last int quorum;
    non-public risky int expectedNumberOfResponses;
    non-public risky int receivedResponses;
    non-public risky int receivedErrors;
    non-public risky boolean finished;

    non-public last RequestOrResponse request;
    non-public last ClientConnection clientConnection;

    public WriteQuorumCallback(int totalExpectedResponses, RequestOrResponse clientRequest, ClientConnection clientConnection) 
        this.expectedNumberOfResponses = totalExpectedResponses;
        this.quorum = expectedNumberOfResponses / 2 + 1;
        this.request = clientRequest;
        this.clientConnection = clientConnection;

    public void onResponse(RequestOrResponse response) 
        if (receivedResponses == quorum && !finished) 
            finished = true;

    public void onError(Throwable t) 
        if (receivedErrors == quorum && !finished) 
            finished = true;

    non-public void respondToClient(String response) 
        clientConnection.write(new RequestOrResponse(RequestId.SetValueResponse.getId(), response.getBytes(), request.getCorrelationId()));

Every time a cluster node sends requests to different nodes,
it provides a callback to the ready checklist mapping with the Correlation Identifer
of the request despatched.

class ClusterNode…

  non-public void handleSetValueClientRequestRequiringQuorum(Record<InetAddressAndPort> replicas, RequestOrResponse request, ClientConnection clientConnection) 
      int totalExpectedResponses = replicas.dimension();
      RequestCallback requestCallback = new WriteQuorumCallback(totalExpectedResponses, request, clientConnection);
      for (InetAddressAndPort duplicate : replicas) 
          int correlationId = nextRequestId();
          requestWaitingList.add(correlationId, requestCallback);
              SocketClient consumer = new SocketClient(duplicate);
              consumer.sendOneway(new RequestOrResponse(RequestId.SetValueRequest.getId(), request.getMessageBodyJson(), correlationId, listenAddress));
           catch (IOException e) 
              requestWaitingList.handleError(correlationId, e);

As soon as the response is obtained, the ready checklist is requested to deal with it:

class ClusterNode…

  non-public void handleSetValueResponse(RequestOrResponse response) 
      requestWaitingList.handleResponse(response.getCorrelationId(), response);

The ready checklist will then invoke the related WriteQuorumCallback.
The WriteQuorumCallback occasion verifies if
the quorum responses have been obtained and invokes the callback
to answer the consumer.

Expiring Lengthy Pending Requests

Typically, responses from the opposite cluster nodes are
delayed. In these situations the ready checklist typically has
a mechanism to run out requests after a timeout:

class RequestWaitingList…

  non-public SystemClock clock;
  non-public ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  non-public lengthy expirationIntervalMillis = 2000;
  public RequestWaitingList(SystemClock clock) 
      this.clock = clock;
      executor.scheduleWithFixedDelay(this::expire, expirationIntervalMillis, expirationIntervalMillis, MILLISECONDS);

  non-public void expire() 
      lengthy now = clock.nanoTime();
      Record<Key> expiredRequestKeys = getExpiredRequestKeys(now); -> 
          CallbackDetails request = pendingRequests.take away(expiredRequestKey);
          request.requestCallback.onError(new TimeoutException("Request expired"));

  non-public Record<Key> getExpiredRequestKeys(lengthy now) 
      return pendingRequests.entrySet().stream().filter(entry -> entry.getValue().elapsedTime(now) > expirationIntervalMillis).map(e -> e.getKey()).gather(Collectors.toList());

What's your reaction?

Leave A Reply

Your email address will not be published. Required fields are marked *