• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/support/client_interceptor.h>
20 
21 #include <map>
22 
23 #include "absl/log/check.h"
24 
25 #ifdef BAZEL_BUILD
26 #include "examples/protos/keyvaluestore.grpc.pb.h"
27 #else
28 #include "keyvaluestore.grpc.pb.h"
29 #endif
30 
31 // This is a naive implementation of a cache. A new cache is for each call. For
32 // each new key request, the key is first searched in the map and if found, the
33 // interceptor fills in the return value without making a request to the server.
34 // Only if the key is not found in the cache do we make a request.
35 class CachingInterceptor : public grpc::experimental::Interceptor {
36  public:
CachingInterceptor(grpc::experimental::ClientRpcInfo * info)37   CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {}
38 
Intercept(::grpc::experimental::InterceptorBatchMethods * methods)39   void Intercept(
40       ::grpc::experimental::InterceptorBatchMethods* methods) override {
41     bool hijack = false;
42     if (methods->QueryInterceptionHookPoint(
43             grpc::experimental::InterceptionHookPoints::
44                 PRE_SEND_INITIAL_METADATA)) {
45       // Hijack all calls
46       hijack = true;
47       // Create a stream on which this interceptor can make requests
48       stub_ = keyvaluestore::KeyValueStore::NewStub(
49           methods->GetInterceptedChannel());
50       stream_ = stub_->GetValues(&context_);
51     }
52     if (methods->QueryInterceptionHookPoint(
53             grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
54       // We know that clients perform a Read and a Write in a loop, so we don't
55       // need to maintain a list of the responses.
56       std::string requested_key;
57       const keyvaluestore::Request* req_msg =
58           static_cast<const keyvaluestore::Request*>(methods->GetSendMessage());
59       if (req_msg != nullptr) {
60         requested_key = req_msg->key();
61       } else {
62         // The non-serialized form would not be available in certain scenarios,
63         // so add a fallback
64         keyvaluestore::Request req_msg;
65         auto* buffer = methods->GetSerializedSendMessage();
66         auto copied_buffer = *buffer;
67         CHECK(grpc::SerializationTraits<keyvaluestore::Request>::Deserialize(
68                   &copied_buffer, &req_msg)
69                   .ok());
70         requested_key = req_msg.key();
71       }
72 
73       // Check if the key is present in the map
74       auto search = cached_map_.find(requested_key);
75       if (search != cached_map_.end()) {
76         std::cout << requested_key << " found in map" << std::endl;
77         response_ = search->second;
78       } else {
79         std::cout << requested_key << " not found in cache" << std::endl;
80         // Key was not found in the cache, so make a request
81         keyvaluestore::Request req;
82         req.set_key(requested_key);
83         stream_->Write(req);
84         keyvaluestore::Response resp;
85         stream_->Read(&resp);
86         response_ = resp.value();
87         // Insert the pair in the cache for future requests
88         cached_map_.insert({requested_key, response_});
89       }
90     }
91     if (methods->QueryInterceptionHookPoint(
92             grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) {
93       stream_->WritesDone();
94     }
95     if (methods->QueryInterceptionHookPoint(
96             grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) {
97       keyvaluestore::Response* resp =
98           static_cast<keyvaluestore::Response*>(methods->GetRecvMessage());
99       resp->set_value(response_);
100     }
101     if (methods->QueryInterceptionHookPoint(
102             grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) {
103       auto* status = methods->GetRecvStatus();
104       *status = grpc::Status::OK;
105     }
106     // One of Hijack or Proceed always needs to be called to make progress.
107     if (hijack) {
108       // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in
109       // the hook points
110       methods->Hijack();
111     } else {
112       // Proceed is an indicator that the interceptor is done intercepting the
113       // batch.
114       methods->Proceed();
115     }
116   }
117 
118  private:
119   grpc::ClientContext context_;
120   std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_;
121   std::unique_ptr<
122       grpc::ClientReaderWriter<keyvaluestore::Request, keyvaluestore::Response>>
123       stream_;
124   std::map<std::string, std::string> cached_map_;
125   std::string response_;
126 };
127 
128 class CachingInterceptorFactory
129     : public grpc::experimental::ClientInterceptorFactoryInterface {
130  public:
CreateClientInterceptor(grpc::experimental::ClientRpcInfo * info)131   grpc::experimental::Interceptor* CreateClientInterceptor(
132       grpc::experimental::ClientRpcInfo* info) override {
133     return new CachingInterceptor(info);
134   }
135 };
136