# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for interceptors that operate on all RPC types."""

import grpc


class _GenericClientInterceptor(
    grpc.UnaryUnaryClientInterceptor,
    grpc.UnaryStreamClientInterceptor,
    grpc.StreamUnaryClientInterceptor,
    grpc.StreamStreamClientInterceptor,
):
    """Adapt one callback to all four client interceptor interfaces.

    The wrapped callback is invoked as
    ``fn(client_call_details, request_iterator, request_streaming,
    response_streaming)`` and must return a 3-tuple
    ``(call_details, request_iterator, postprocess)``.

    * For unary-request RPCs the single request is handed to the callback
      wrapped in a one-element iterator, and the first element of the
      returned iterator is what gets sent onward.
    * ``postprocess`` may be falsy (e.g. ``None``), in which case the
      continuation's result is returned untouched; otherwise it is called
      with that result and its return value is returned instead.
    """

    def __init__(self, interceptor_function):
        """Store the callback that every ``intercept_*`` method delegates to."""
        self._fn = interceptor_function

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Intercept a unary-request / unary-response RPC."""
        # Present the lone request as an iterator so the callback sees the
        # same shape of argument for every RPC arity.
        single_request = iter((request,))
        details, requests, after_call = self._fn(
            client_call_details, single_request, False, False
        )
        outcome = continuation(details, next(requests))
        if after_call:
            return after_call(outcome)
        return outcome

    def intercept_unary_stream(
        self, continuation, client_call_details, request
    ):
        """Intercept a unary-request / stream-response RPC."""
        single_request = iter((request,))
        details, requests, after_call = self._fn(
            client_call_details, single_request, False, True
        )
        outcome_it = continuation(details, next(requests))
        if after_call:
            return after_call(outcome_it)
        return outcome_it

    def intercept_stream_unary(
        self, continuation, client_call_details, request_iterator
    ):
        """Intercept a stream-request / unary-response RPC."""
        details, requests, after_call = self._fn(
            client_call_details, request_iterator, True, False
        )
        outcome = continuation(details, requests)
        if after_call:
            return after_call(outcome)
        return outcome

    def intercept_stream_stream(
        self, continuation, client_call_details, request_iterator
    ):
        """Intercept a stream-request / stream-response RPC."""
        details, requests, after_call = self._fn(
            client_call_details, request_iterator, True, True
        )
        outcome_it = continuation(details, requests)
        if after_call:
            return after_call(outcome_it)
        return outcome_it


def create(intercept_call):
    """Build a generic client interceptor around ``intercept_call``.

    ``intercept_call`` is stored as-is and invoked by each ``intercept_*``
    method of the returned interceptor.
    """
    interceptor = _GenericClientInterceptor(intercept_call)
    return interceptor