• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/compression/message_compress.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 
28 #include <zlib.h>
29 
30 #include "src/core/lib/slice/slice_internal.h"
31 
32 #define OUTPUT_BLOCK_SIZE 1024
33 
zlib_body(z_stream * zs,grpc_slice_buffer * input,grpc_slice_buffer * output,int (* flate)(z_stream * zs,int flush))34 static int zlib_body(z_stream* zs, grpc_slice_buffer* input,
35                      grpc_slice_buffer* output,
36                      int (*flate)(z_stream* zs, int flush)) {
37   int r = Z_STREAM_END; /* Do not fail on an empty input. */
38   int flush;
39   size_t i;
40   grpc_slice outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE);
41   const uInt uint_max = ~static_cast<uInt>(0);
42 
43   GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max);
44   zs->avail_out = static_cast<uInt> GRPC_SLICE_LENGTH(outbuf);
45   zs->next_out = GRPC_SLICE_START_PTR(outbuf);
46   flush = Z_NO_FLUSH;
47   for (i = 0; i < input->count; i++) {
48     if (i == input->count - 1) flush = Z_FINISH;
49     GPR_ASSERT(GRPC_SLICE_LENGTH(input->slices[i]) <= uint_max);
50     zs->avail_in = static_cast<uInt> GRPC_SLICE_LENGTH(input->slices[i]);
51     zs->next_in = GRPC_SLICE_START_PTR(input->slices[i]);
52     do {
53       if (zs->avail_out == 0) {
54         grpc_slice_buffer_add_indexed(output, outbuf);
55         outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE);
56         GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max);
57         zs->avail_out = static_cast<uInt> GRPC_SLICE_LENGTH(outbuf);
58         zs->next_out = GRPC_SLICE_START_PTR(outbuf);
59       }
60       r = flate(zs, flush);
61       if (r < 0 && r != Z_BUF_ERROR /* not fatal */) {
62         gpr_log(GPR_INFO, "zlib error (%d)", r);
63         goto error;
64       }
65     } while (zs->avail_out == 0);
66     if (zs->avail_in) {
67       gpr_log(GPR_INFO, "zlib: not all input consumed");
68       goto error;
69     }
70   }
71   if (r != Z_STREAM_END) {
72     gpr_log(GPR_INFO, "zlib: Data error");
73     goto error;
74   }
75 
76   GPR_ASSERT(outbuf.refcount);
77   outbuf.data.refcounted.length -= zs->avail_out;
78   grpc_slice_buffer_add_indexed(output, outbuf);
79 
80   return 1;
81 
82 error:
83   grpc_slice_unref_internal(outbuf);
84   return 0;
85 }
86 
zalloc_gpr(void *,unsigned int items,unsigned int size)87 static void* zalloc_gpr(void* /*opaque*/, unsigned int items,
88                         unsigned int size) {
89   return gpr_malloc(items * size);
90 }
91 
zfree_gpr(void *,void * address)92 static void zfree_gpr(void* /*opaque*/, void* address) { gpr_free(address); }
93 
zlib_compress(grpc_slice_buffer * input,grpc_slice_buffer * output,int gzip)94 static int zlib_compress(grpc_slice_buffer* input, grpc_slice_buffer* output,
95                          int gzip) {
96   z_stream zs;
97   int r;
98   size_t i;
99   size_t count_before = output->count;
100   size_t length_before = output->length;
101   memset(&zs, 0, sizeof(zs));
102   zs.zalloc = zalloc_gpr;
103   zs.zfree = zfree_gpr;
104   r = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 | (gzip ? 16 : 0),
105                    8, Z_DEFAULT_STRATEGY);
106   GPR_ASSERT(r == Z_OK);
107   r = zlib_body(&zs, input, output, deflate) && output->length < input->length;
108   if (!r) {
109     for (i = count_before; i < output->count; i++) {
110       grpc_slice_unref_internal(output->slices[i]);
111     }
112     output->count = count_before;
113     output->length = length_before;
114   }
115   deflateEnd(&zs);
116   return r;
117 }
118 
zlib_decompress(grpc_slice_buffer * input,grpc_slice_buffer * output,int gzip)119 static int zlib_decompress(grpc_slice_buffer* input, grpc_slice_buffer* output,
120                            int gzip) {
121   z_stream zs;
122   int r;
123   size_t i;
124   size_t count_before = output->count;
125   size_t length_before = output->length;
126   memset(&zs, 0, sizeof(zs));
127   zs.zalloc = zalloc_gpr;
128   zs.zfree = zfree_gpr;
129   r = inflateInit2(&zs, 15 | (gzip ? 16 : 0));
130   GPR_ASSERT(r == Z_OK);
131   r = zlib_body(&zs, input, output, inflate);
132   if (!r) {
133     for (i = count_before; i < output->count; i++) {
134       grpc_slice_unref_internal(output->slices[i]);
135     }
136     output->count = count_before;
137     output->length = length_before;
138   }
139   inflateEnd(&zs);
140   return r;
141 }
142 
copy(grpc_slice_buffer * input,grpc_slice_buffer * output)143 static int copy(grpc_slice_buffer* input, grpc_slice_buffer* output) {
144   size_t i;
145   for (i = 0; i < input->count; i++) {
146     grpc_slice_buffer_add(output, grpc_slice_ref_internal(input->slices[i]));
147   }
148   return 1;
149 }
150 
compress_inner(grpc_message_compression_algorithm algorithm,grpc_slice_buffer * input,grpc_slice_buffer * output)151 static int compress_inner(grpc_message_compression_algorithm algorithm,
152                           grpc_slice_buffer* input, grpc_slice_buffer* output) {
153   switch (algorithm) {
154     case GRPC_MESSAGE_COMPRESS_NONE:
155       /* the fallback path always needs to be send uncompressed: we simply
156          rely on that here */
157       return 0;
158     case GRPC_MESSAGE_COMPRESS_DEFLATE:
159       return zlib_compress(input, output, 0);
160     case GRPC_MESSAGE_COMPRESS_GZIP:
161       return zlib_compress(input, output, 1);
162     case GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT:
163       break;
164   }
165   gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm);
166   return 0;
167 }
168 
grpc_msg_compress(grpc_message_compression_algorithm algorithm,grpc_slice_buffer * input,grpc_slice_buffer * output)169 int grpc_msg_compress(grpc_message_compression_algorithm algorithm,
170                       grpc_slice_buffer* input, grpc_slice_buffer* output) {
171   if (!compress_inner(algorithm, input, output)) {
172     copy(input, output);
173     return 0;
174   }
175   return 1;
176 }
177 
grpc_msg_decompress(grpc_message_compression_algorithm algorithm,grpc_slice_buffer * input,grpc_slice_buffer * output)178 int grpc_msg_decompress(grpc_message_compression_algorithm algorithm,
179                         grpc_slice_buffer* input, grpc_slice_buffer* output) {
180   switch (algorithm) {
181     case GRPC_MESSAGE_COMPRESS_NONE:
182       return copy(input, output);
183     case GRPC_MESSAGE_COMPRESS_DEFLATE:
184       return zlib_decompress(input, output, 0);
185     case GRPC_MESSAGE_COMPRESS_GZIP:
186       return zlib_decompress(input, output, 1);
187     case GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT:
188       break;
189   }
190   gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm);
191   return 0;
192 }
193