Enhancing Streaming and Request Handling Efficiency with gRPC and Coroutines: Part 3

Overview

In previous articles, we focused primarily on the gateway side, where we handle requests from OQS and fetch the necessary data from workers. The gateway operates as both a client and a server. To manage the heavy workload of serving 2,160 workers with only 64 threads, we use coroutines so the gateway doesn't slow down when juggling many requests at once. On the worker side, each worker handles one request at a time, so coroutines aren't needed: everything runs smoothly on a single thread. A worker has dedicated threads, one for handling user queries and another for analytics (there are more, such as one for schemaless queries, but I will skip those here).

In this article, I’ll demonstrate an alternative approach for writing an efficient server-side solution for instances that handle fewer requests. I’ll also use std::shared_ptr to automate memory management and avoid manual allocation. Additionally, we will focus on implementing a mechanism for interrupting queries, enabling the cancellation of long-running requests.

Protobuf structure

As usual, we begin by defining how the Request will be structured in the protobuf file.

message Request { 
    string message = 1; 
} 

message Response { 
    string message = 1; 
} 

service Worker { 
    rpc Handle (Request) returns (Response) {} 
} 

Listing 1. Example protobuf structure for streaming a Handle request.

Based on the method specified in the JSON payload of the Request message, the worker determines which thread will handle it. The response is also JSON, which is why both messages can be defined as plain strings: the server and the client parse the string into a JSON object to interpret the request and the response.
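
To make the dispatching concrete, here is a minimal routing sketch. The "method" key, the queue names, and the use of nlohmann::json are illustrative assumptions, not details of the actual Terrarium protocol.

// Illustrative routing sketch; assumes a JSON library such as
// nlohmann::json, which is not necessarily the parser Terrarium uses.
enum class Queue { UserQuery, Analytics };

Queue pickQueue(const std::string& rawRequest) {
    // Request.message carries a JSON document; a "method" key
    // decides which thread will process the request.
    const auto request = nlohmann::json::parse(rawRequest);
    const std::string method = request.value("method", "");

    return method == "analytics" ? Queue::Analytics : Queue::UserQuery;
}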

Request Context class

Since everything will be handled asynchronously, we need to allocate memory and store all the necessary data, such as the server context and request messages, in a single location. After completing a request, we need to release that memory to make room for new requests. Previously, we handled this process manually, which is risky and can lead to memory leaks. A more reliable solution is a reference-counting mechanism: each time an object is copied, its reference count increases, and when the last reference is destroyed, the memory is automatically freed. In C++, this is managed with std::shared_ptr.

As mentioned earlier, a request can be in three states: Pending, Finished, or Interrupted (asynchronously completed). We will create a class to represent each state and implement the corresponding logic for each one. For example, the Pending state will initialize a new request, while the Finished state will finalize the request and send a status code. Each state will be represented by a tag which holds a std::shared_ptr to the context, ensuring that the memory is only freed when all tags have been processed. This approach gives us automatic memory management and immediate cleanup once the request is finished.
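
As a tiny illustration of that lifetime model (plain standard-library behaviour, with a simplified Context standing in for RequestContext):

#include <iostream>
#include <memory>

struct Context {
    ~Context() { std::cout << "context freed\n"; }
};

int main() {
    auto context = std::make_shared<Context>();

    // Two "tags" sharing ownership of the same context.
    auto pendingTag = context;
    auto finishTag = context;

    context.reset();     // the original handle goes away...
    pendingTag.reset();  // ...the pending tag is processed...
    finishTag.reset();   // ...and only now is "context freed" printed
    return 0;
}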

To cater to the varied logic of each request, we can employ the Template Method design pattern. This pattern allows subclasses to define their own behavior for certain functions, like init or createNewHandler, while adhering to a common structure provided by the base class. Currently, we only have the Handle method, but in the future, we can easily add as many methods as needed to extend the functionality.

class RequestContext : 
    public std::enable_shared_from_this<RequestContext> { 
public: 

    using Callback = 
        std::function<void(std::shared_ptr<RequestContext>)>; 

    RequestContext(grpc::ServerCompletionQueue* cq, 
                   Callback callback, 
                   Worker::AsyncService* service); 

    // Virtual destructor and the rest of the rule of five 

    void tryCancel(bool ok); 

    const std::atomic<bool>& cancelled() const&;

    virtual void run(); 

    virtual void initRequest(const std::string& requestName); 

    virtual void finish(const QByteArray& result) = 0; 

protected: 
    virtual void init(HandlerTag& pendingTag) = 0; 

    virtual void createNewHandler() = 0; 

private: 
    HandlerTag _pendingTag; 
    HandlerTag _finishTag; 
    HandlerTag _doneTag; 
    // and other members 
}; 

Listing 2. RequestContext class

class HandlerTag { 
public: 
    using TagFunction = 
        std::function<void(RequestContext&, bool ok)>; 

    HandlerTag(std::shared_ptr<RequestContext> context, 
               TagFunction function); 

    void run(bool ok); 

    RequestContext* release(); 

private: 
    std::shared_ptr<RequestContext> _context; 
    TagFunction _function; 
    bool _processed{false}; 
}; 

Listing 3. HandlerTag class

Initialize server

We’re all set to initialize the server and start processing messages. The next step is to create a completion queue and enter a loop that processes the tags. Since each tag is an object, we can simply cast it and run the appropriate logic. We can leave the memory management headaches behind, as it is all released automatically. The main things we need to focus on are error handling, deadlines, and managing request interruptions.

Img1. Overview of request handling by the worker
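
Before going through error handling, deadlines, and interruptions, here is a minimal sketch of how such a server might be bootstrapped. The start function and the handlerCq and server members are assumptions made for illustration; initHandleRequest and handle appear later in Listings 4 and 7.

// Minimal bootstrap sketch, not the actual Terrarium implementation.
// Assumes Handler derives from Worker::AsyncService (Listing 4 passes
// "this" as the service pointer) and owns handlerCq and server members.
void Handler::start(const std::string& address) {
    grpc::ServerBuilder builder;
    builder.AddListeningPort(address, grpc::InsecureServerCredentials());
    builder.RegisterService(this);

    // One completion queue is enough for a worker that processes
    // requests sequentially.
    handlerCq = builder.AddCompletionQueue();
    server = builder.BuildAndStart();

    initHandleRequest();   // enqueue the first pending tag
    handle(*handlerCq);    // enter the tag-processing loop
}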

Error handling can be managed easily with a try-catch block. If an error occurs while preparing a response or during function execution, we catch it and send the error message as the response. Dealing with lost client connections is a more complex challenge. When the server is unable to send a response because the connection was lost, it won't initiate an automatic retry. Instead, it simply waits for the client to send the same request again, which gives it another chance to deliver the response. If that behaviour is not acceptable, we can implement a retry mechanism that reattempts to send the message. This is useful especially when collecting the data takes a significant amount of time: rather than re-collecting it, we can try several times to send the response, improving the system's resilience against intermittent communication failures.
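
A sketch of that try-catch wrapper around request processing might look as follows; processRequest is a hypothetical helper standing in for the actual business logic.

// Hypothetical business-logic entry point, declared only for the sketch.
QByteArray processRequest(const std::string& rawRequest);

// Illustrative only: convert any failure into an error payload
// instead of letting it crash the worker.
void handleRequestSafely(std::shared_ptr<RequestContext> context,
                         const std::string& rawRequest) {
    try {
        context->finish(processRequest(rawRequest));
    } catch (const std::exception& e) {
        // Send the error message back as the response body.
        context->finish(QByteArray{e.what()});
    }
}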

Navigating deadlines introduces yet another layer of complexity. When do we kick off the countdown for these timelines? Is it right after the request is sent? Since the worker queues requests and handles them one by one, a request might spend up to 2 minutes waiting in the queue. How should we handle a situation when a request takes 10ms to process, but spends 2 minutes in the queue? Should the deadline be considered reached in this case?

In Terrarium, we decided to start measuring the deadline from the moment we send a request from OQS. For example, if a request has a 5-minute deadline, we set the deadline to "now + 5 minutes." So, if a request spends 1 minute in the gateway queue and another 2 minutes in the worker queue, the worker will have 2 minutes to finish processing the request. If the worker doesn't complete the request in that time, it will be interrupted. This solution helps protect the system from overly heavy requests that could block the entire process and make the gateway unresponsive. For example, if a worker spends 15 minutes processing an analytics request, other requests may be left waiting indefinitely without a response. By enforcing deadlines in this way, we ensure that long-running requests don't block the system and maintain the gateway's responsiveness.
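
Below is a minimal sketch of how the sender might attach such a deadline and how the worker might check it before doing expensive work; the function names are illustrative, while set_deadline, deadline, and IsCancelled are the gRPC APIs involved.

// Client side (gateway): anchor the deadline to the moment the request
// leaves OQS, e.g. "now + 5 minutes".
void setRequestDeadline(grpc::ClientContext& clientContext) {
    clientContext.set_deadline(std::chrono::system_clock::now() +
                               std::chrono::minutes(5));
}

// Worker side: before starting expensive work, check whether the
// remaining budget has already been spent in the queues.
bool deadlineExpired(const grpc::ServerContext& serverContext) {
    return serverContext.IsCancelled() ||
           serverContext.deadline() <= std::chrono::system_clock::now();
}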

The last piece of the puzzle revolves around handling client-initiated request interruptions. For example, we may initiate a task that takes several minutes to complete, such as fetching heavy data. However, after just 10 seconds, we realize the query was wrong and want to stop it. Without a proper interruption mechanism, we'd have to wait for the entire task to finish (e.g., 4 minutes) only to get an unwanted result, then submit a new query.

While gRPC supports cancellation, implementing this in the asynchronous version can be a bit tricky. Our strategy involves using two tags: Finish and Interrupt, and each request needs to process both tags—even if the request isn't interrupted. These tags can arrive in any order, so determining when to stop a task isn't straightforward. For example, an interrupt tag could arrive at the end of a request, after it's already being finalized.

Managing interrupt tags presents another hurdle: once we receive a tag, how do we actually interrupt a running function? Our best course of action involves meticulously designing interruptible segments within the function. This strategic planning should be an integral part of the initial design phase when crafting the data collection algorithm. For instance, if we're reading data from a file inside a loop, we can check for a cancellation request after each iteration. Upon receiving an interrupt tag, we can trigger a callback or set a flag. The function that fetches the data will periodically check this flag and stop if necessary, or the callback function will prevent further calculations. This way, the request can be interrupted smoothly without waiting for the task to finish.
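
A minimal sketch of such an interruptible segment is shown below, assuming QByteArray (already used in Listing 2) and the cancelled() flag from RequestContext; the chunked input is a stand-in for whatever the real data source looks like.

// Illustrative sketch: check the cancellation flag after every chunk so
// a long-running fetch can stop early instead of running to completion.
QByteArray collectData(const RequestContext& context,
                       const std::vector<QByteArray>& chunks) {
    QByteArray result;

    for (const auto& chunk : chunks) {
        if (context.cancelled()) {
            // The Interrupt tag was processed: stop and return what we
            // have; finish() will send it with grpc::Status::CANCELLED.
            break;
        }
        result.append(chunk);
    }
    return result;
}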

Img2. Sequence diagram of request handling

Since we use std::shared_ptr, we don't need to worry about manually calling cancel, even after the query is finished. This is because all the necessary data is stored in the RequestContext object, which will only be deleted once all tags have been processed. So, even if we receive the Finish tag first and the Interrupt tag later, we can still set a flag or invoke a callback. The object will not be deallocated until both tags are processed, ensuring that no memory is prematurely freed. If the interrupt is received after the finish, it will essentially do nothing, but it won't cause a crash. When the last tag is processed, the memory will be automatically released.
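
For completeness, here is one possible shape of the tryCancel member declared in Listing 2, assuming the request context stores the grpc::ServerContext (_serverContext) and an optional cancellation callback (_onCancel); both member names are assumptions.

// Illustrative sketch: called when the interrupt/done tag is processed.
void RequestContext::tryCancel(bool ok) {
    // The done tag fires for every request; only mark the request as
    // cancelled if the client actually dropped or interrupted the call.
    if (ok && _serverContext.IsCancelled()) {
        _cancelled.store(true);

        if (_onCancel) {
            _onCancel();  // stop further calculations in the running task
        }
    }
}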

Img3. Processing a pending tag: handling the request and verifying cancellation.

After designing the server, we can quickly implement a solution that addresses all the mentioned issues. The first step is to add a function to the server that creates an initial handler to set up the gRPC Handle method. This function will push a completion queue, a callback, and a pointer to the service, allowing the server to manage requests and handle asynchronous processing effectively.

void Handler::initHandleRequest() { 
    auto handler = std::make_shared<HandleContext>( 
           handlerCq.get(), 
           std::bind_front(&Handler::grpcHandle, this), 
           this); 

    // We use enable_shared_from_this, so the shared_ptr
    // can be copied inside initRequest.
    handler->initRequest("Handle"); 
} 

Listing 4. Initialization of the Handle request

Next, we need to create a class named HandleContext, which inherits from RequestContext and overrides the missing functions, following the template method design pattern.

void HandleContext::finish(const QByteArray& result) { 
    Response response; 
    *response.mutable_message() = result.toStdString(); 

    if (_cancelled) { 
        _responder.Finish(response, 
                          grpc::Status::CANCELLED, 
                          (void*)&_finishTag); 
    } else { 
        _responder.Finish(response, 
                          grpc::Status::OK, 
                          (void*)&_finishTag); 
    } 
} 

void HandleContext::createNewHandler() { 
    auto handler = 
        std::make_shared<HandleContext>(_cq, _callback, _service); 

    handler->initRequest("Handle"); 
} 

void HandleContext::init(HandlerTag& pendingTag) { 
    _service->RequestHandle(&_serverContext, 
                            &_request, 
                            &_responder, 
                            _cq, 
                            _cq, 
                            (void*)&pendingTag); 
} 

Listing 5. Methods overridden by the HandleContext class

When we decide to add a new gRPC method, we can easily create a new class and override these three functions, thanks to the template method pattern. Now we need to implement the HandlerTag methods: run executes the tag's logic, and release drops the reference to the shared_ptr once the tag has been processed. We must also ensure that the same tag is never invoked twice, as that would be a bug that should never occur.

void HandlerTag::run(bool ok) { 
    if (std::exchange(_processed, true)) { 
        throw Exception(
            "Tag with address {} already processed!", 
            (void*)this); 
    } 

    if (!_context) { 
        throw Exception("Run of an uninitialized tag! {}", (void*)this); 
    } 

    _function(*_context, ok); 
} 

RequestContext* HandlerTag::release() { 
    auto* context = _context.get(); 

    // reset shared_ptr will decrement reference counter 
    _context.reset(); 

    return context; 
} 

Listing 6. HandlerTag method implementations

With all of these pieces in place, we need to implement the completion queue loop, which supports deadlines and handles other errors.

bool shouldRetry (const grpc::StatusCode& ec) {
    return ec != grpc::StatusCode::CANCELLED &&
           ec != grpc::StatusCode::DEADLINE_EXCEEDED;
}

bool expired(const grpc::StatusCode& ec) {
    return ec == grpc::StatusCode::DEADLINE_EXCEEDED;
}

void Handler::handle(grpc::ServerCompletionQueue& cq) { 
    void* tag; 
    bool ok; 

    while (cq.Next(&tag, &ok)) { 
        auto* handler = reinterpret_cast<HandlerTag*>(tag); 

        // status(), retry(), reportTimeout() and reportOtherError() are
        // additional HandlerTag helpers not shown in Listing 3.
        if (!handler->status().ok()) { 
            if (shouldRetry(handler->status().error_code())) { 
                handler->retry(); 
            } else if (expired(handler->status().error_code())) { 
                handler->reportTimeout(); 
            } else { 
                handler->reportOtherError(); 
            } 
        } 

        handler->run(ok); 

        if (!handler->release()) { 
            throw Exception(
                "Released the same handler twice! {}", tag); 
        } 
    } 
} 

Listing 7. Completion queue loop for handling requests

Comparison of Different Approaches

Summarizing the three approaches discussed in previous articles, we can evaluate their pros and cons:

Global Map with Tags as Keys and Coroutines as Values

Advantages:

  • Straightforward to implement if a high-performance thread-safe map is available.
  • No need to manage memory manually, as coroutines encapsulate necessary data on the stack and restore it during resumption.

Disadvantages:

  • Performance depends heavily on the efficiency of the thread-safe map. A slow map can significantly impact coroutine retrieval. Optimization is possible by using separate maps for each completion queue, reducing contention, and improving access speed when multiple queues are used.

Self-Allocate/Deallocate Objects

Advantages:

  • High performance since only essential data is stored and manually freed after request processing.
  • Avoids overhead from reference counting or thread-safe mechanisms.

Disadvantages:

  • Increased complexity in implementation.
  • Risk of memory leaks if objects are not properly deallocated.
  • Harder to maintain and understand, making debugging and future development more challenging.

Reference Counter Objects

Advantages:

  • Uses std::shared_ptr for automatic memory management, simplifying lifetime handling.
  • Minimizes the risk of memory leaks when implemented correctly.
  • Easier to write and maintain compared to manual memory management.

Disadvantages:

  • Slower than manual object management due to the overhead of reference counting, which often relies on thread-safe locking mechanisms.

Choosing the Right Approach

The ideal approach depends on the specific requirements of your application:

  • For high-concurrency scenarios where managing thousands of requests simultaneously is critical, coroutines are preferable to minimize thread-switching overhead.
  • For less demanding code areas, simpler implementations like manual or reference-counted object management may suffice, making the code easier to maintain and understand.

Ultimately, balancing performance, complexity, and maintainability is key. Developers can select the most suitable approach based on the trade-offs and the stress levels of different parts of the system.

Conclusion

In this series, we explored strategies for enhancing the efficiency of streaming and request handling using gRPC and coroutines. This installment focused on designing an optimized server-side solution tailored for instances handling fewer requests. By leveraging std::shared_ptr for automatic memory management and introducing mechanisms to handle request interruptions, we tackled some of the core challenges in building resilient systems.

We implemented the Template Method pattern to standardize request handling while enabling extensibility for additional gRPC methods. This design simplifies the creation of new handlers, ensuring consistent logic across request types. Furthermore, the HandlerTag class effectively manages state transitions while preventing issues like double-processing or premature memory deallocation.

To ensure responsiveness, we addressed key challenges, including error handling, managing deadlines, and interrupting long-running requests. By adopting these solutions, we prevent blocking operations, maintain system responsiveness, and handle client cancellations smoothly without risking crashes or memory leaks.

With the addition of a robust completion queue and the ability to handle asynchronous workflows, we demonstrated how to efficiently manage thousands of requests while maintaining high system performance. These techniques ensure scalability, reliability, and maintainability, making them indispensable for real-world gRPC-based systems.

This comprehensive approach lays the groundwork for future extensions and adaptations, empowering developers to build efficient and reliable distributed systems.

Mateusz Adamski, Senior C++ developer at Synerise
