1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpcpp/support/client_interceptor.h> 20 21 #include <map> 22 23 #include "absl/log/check.h" 24 25 #ifdef BAZEL_BUILD 26 #include "examples/protos/keyvaluestore.grpc.pb.h" 27 #else 28 #include "keyvaluestore.grpc.pb.h" 29 #endif 30 31 // This is a naive implementation of a cache. A new cache is for each call. For 32 // each new key request, the key is first searched in the map and if found, the 33 // interceptor fills in the return value without making a request to the server. 34 // Only if the key is not found in the cache do we make a request. 35 class CachingInterceptor : public grpc::experimental::Interceptor { 36 public: CachingInterceptor(grpc::experimental::ClientRpcInfo * info)37 CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {} 38 Intercept(::grpc::experimental::InterceptorBatchMethods * methods)39 void Intercept( 40 ::grpc::experimental::InterceptorBatchMethods* methods) override { 41 bool hijack = false; 42 if (methods->QueryInterceptionHookPoint( 43 grpc::experimental::InterceptionHookPoints:: 44 PRE_SEND_INITIAL_METADATA)) { 45 // Hijack all calls 46 hijack = true; 47 // Create a stream on which this interceptor can make requests 48 stub_ = keyvaluestore::KeyValueStore::NewStub( 49 methods->GetInterceptedChannel()); 50 stream_ = stub_->GetValues(&context_); 51 } 52 if (methods->QueryInterceptionHookPoint( 53 grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { 54 // We know that clients perform a Read and a Write in a loop, so we don't 55 // need to maintain a list of the responses. 56 std::string requested_key; 57 const keyvaluestore::Request* req_msg = 58 static_cast<const keyvaluestore::Request*>(methods->GetSendMessage()); 59 if (req_msg != nullptr) { 60 requested_key = req_msg->key(); 61 } else { 62 // The non-serialized form would not be available in certain scenarios, 63 // so add a fallback 64 keyvaluestore::Request req_msg; 65 auto* buffer = methods->GetSerializedSendMessage(); 66 auto copied_buffer = *buffer; 67 CHECK(grpc::SerializationTraits<keyvaluestore::Request>::Deserialize( 68 &copied_buffer, &req_msg) 69 .ok()); 70 requested_key = req_msg.key(); 71 } 72 73 // Check if the key is present in the map 74 auto search = cached_map_.find(requested_key); 75 if (search != cached_map_.end()) { 76 std::cout << requested_key << " found in map" << std::endl; 77 response_ = search->second; 78 } else { 79 std::cout << requested_key << " not found in cache" << std::endl; 80 // Key was not found in the cache, so make a request 81 keyvaluestore::Request req; 82 req.set_key(requested_key); 83 stream_->Write(req); 84 keyvaluestore::Response resp; 85 stream_->Read(&resp); 86 response_ = resp.value(); 87 // Insert the pair in the cache for future requests 88 cached_map_.insert({requested_key, response_}); 89 } 90 } 91 if (methods->QueryInterceptionHookPoint( 92 grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { 93 stream_->WritesDone(); 94 } 95 if (methods->QueryInterceptionHookPoint( 96 grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) { 97 keyvaluestore::Response* resp = 98 static_cast<keyvaluestore::Response*>(methods->GetRecvMessage()); 99 resp->set_value(response_); 100 } 101 if (methods->QueryInterceptionHookPoint( 102 grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { 103 auto* status = methods->GetRecvStatus(); 104 *status = grpc::Status::OK; 105 } 106 // One of Hijack or Proceed always needs to be called to make progress. 107 if (hijack) { 108 // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in 109 // the hook points 110 methods->Hijack(); 111 } else { 112 // Proceed is an indicator that the interceptor is done intercepting the 113 // batch. 114 methods->Proceed(); 115 } 116 } 117 118 private: 119 grpc::ClientContext context_; 120 std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_; 121 std::unique_ptr< 122 grpc::ClientReaderWriter<keyvaluestore::Request, keyvaluestore::Response>> 123 stream_; 124 std::map<std::string, std::string> cached_map_; 125 std::string response_; 126 }; 127 128 class CachingInterceptorFactory 129 : public grpc::experimental::ClientInterceptorFactoryInterface { 130 public: CreateClientInterceptor(grpc::experimental::ClientRpcInfo * info)131 grpc::experimental::Interceptor* CreateClientInterceptor( 132 grpc::experimental::ClientRpcInfo* info) override { 133 return new CachingInterceptor(info); 134 } 135 }; 136