• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.eventstream;
2 
3 import org.junit.jupiter.api.Test;
4 
5 import java.io.ByteArrayOutputStream;
6 import java.nio.ByteBuffer;
7 import java.util.ArrayList;
8 import java.util.Collections;
9 import java.util.List;
10 import java.util.Random;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
13 
14 import static java.util.Collections.emptyList;
15 import static org.junit.jupiter.api.Assertions.assertEquals;
16 import static org.junit.jupiter.api.Assertions.assertTrue;
17 
18 public class MessageDecoderTest {
19     long SEED = 8912374098123423L;
20 
21     @Test
testDecoder()22     public void testDecoder() throws Exception {
23         TestUtils utils = new TestUtils(SEED);
24         Random rand = new Random(SEED);
25         List<Message> expected = IntStream.range(0, 10_000)
26                                           .mapToObj(x -> utils.randomMessage())
27                                           .collect(Collectors.toList());
28         ByteArrayOutputStream baos = new ByteArrayOutputStream();
29         expected.forEach(x -> x.encode(baos));
30         ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
31 
32         List<Message> actual = new ArrayList<>();
33         MessageDecoder decoder = new MessageDecoder(actual::add);
34         while (buf.remaining() > 0) {
35             int bufSize = Math.min(1 + rand.nextInt(1024), buf.remaining());
36             byte[] bs = new byte[bufSize];
37             buf.get(bs);
38             decoder.feed(bs);
39         }
40 
41         assertEquals(expected, actual);
42     }
43 
44     @Test
testDecoder_WithOffset()45     public void testDecoder_WithOffset() throws Exception {
46         TestUtils utils = new TestUtils(SEED);
47         Random rand = new Random(SEED);
48         List<Message> expected = IntStream.range(0, 10_000)
49                                           .mapToObj(x -> utils.randomMessage())
50                                           .collect(Collectors.toList());
51         ByteArrayOutputStream baos = new ByteArrayOutputStream();
52         expected.forEach(x -> x.encode(baos));
53         byte[] data = baos.toByteArray();
54         int toRead = data.length;
55         int read = 0;
56 
57         List<Message> actual = new ArrayList<>();
58         MessageDecoder decoder = new MessageDecoder(actual::add);
59         while (toRead > 0) {
60             int length = rand.nextInt(100);
61             if (read + length > data.length) {
62                 length = data.length - read;
63             }
64             decoder.feed(data, read, length);
65             read += length;
66             toRead -= length;
67         }
68 
69         assertEquals(expected, actual);
70     }
71 
72     @Test
preludeFedFirst_DecodesCorrectly()73     public void preludeFedFirst_DecodesCorrectly() {
74         TestUtils utils = new TestUtils(SEED);
75         Message message = utils.randomMessage();
76         int messageSize = message.toByteBuffer().remaining();
77         List<Message> expected = Collections.singletonList(message);
78 
79         ByteArrayOutputStream baos = new ByteArrayOutputStream();
80         expected.forEach(x -> x.encode(baos));
81         ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
82 
83         List<Message> actual = new ArrayList<>();
84         MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
85 
86         // Feed just the prelude in it's entirety
87         byte[] bs = new byte[15];
88         buf.get(bs);
89         decoder.feed(bs);
90 
91         // No messages should be decoded yet
92         assertEquals(emptyList(), actual);
93 
94         // Feed rest of message in it's entirety.
95         bs = new byte[messageSize - 15];
96         buf.get(bs);
97         decoder.feed(bs);
98 
99         // Should have successfully decoded the one message
100         assertEquals(1, actual.size());
101     }
102 
103     @Test
preludeFedInParts_DecodesCorrectly()104     public void preludeFedInParts_DecodesCorrectly() {
105         TestUtils utils = new TestUtils(SEED);
106         Message message = utils.randomMessage();
107         int messageSize = message.toByteBuffer().remaining();
108         List<Message> expected = Collections.singletonList(message);
109 
110         ByteArrayOutputStream baos = new ByteArrayOutputStream();
111         expected.forEach(x -> x.encode(baos));
112         ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
113 
114         List<Message> actual = new ArrayList<>();
115         MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
116 
117         // Feed the prelude in parts
118         byte[] bs = new byte[7];
119         buf.get(bs);
120         decoder.feed(bs);
121 
122         // Feed rest of prelude
123         bs = new byte[8];
124         buf.get(bs);
125         decoder.feed(bs);
126 
127         // No messages should be decoded yet
128         assertEquals(emptyList(), actual);
129 
130         // Feed rest of message in it's entirety.
131         bs = new byte[messageSize - 15];
132         buf.get(bs);
133         decoder.feed(bs);
134 
135         // Should have successfully decoded the one message
136         assertEquals(1, actual.size());
137     }
138 
139     @Test
bufferNeedsToGrow()140     public void bufferNeedsToGrow() {
141         TestUtils utils = new TestUtils(SEED);
142         Message message = utils.randomMessage(8192 * 2);
143         int messageSize = message.toByteBuffer().remaining();
144         List<Message> expected = Collections.singletonList(message);
145 
146         ByteArrayOutputStream baos = new ByteArrayOutputStream();
147         expected.forEach(x -> x.encode(baos));
148         ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
149 
150         List<Message> actual = new ArrayList<>();
151         MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
152 
153         // Feed all at once
154         byte[] bs = new byte[messageSize];
155         buf.get(bs);
156         decoder.feed(bs);
157 
158         // Should have successfully decoded the one message
159         assertEquals(1, actual.size());
160     }
161 
162     @Test
multipleMessagesDoesNotGrowBuffer()163     public void multipleMessagesDoesNotGrowBuffer() {
164         TestUtils utils = new TestUtils(SEED);
165         Message message = utils.randomMessage(4096);
166         List<Message> expected = IntStream.range(0, 100)
167                                           .mapToObj(x -> message)
168                                           .collect(Collectors.toList());
169 
170         ByteArrayOutputStream baos = new ByteArrayOutputStream();
171         expected.forEach(x -> x.encode(baos));
172         ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
173 
174         List<Message> actual = new ArrayList<>();
175         MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
176 
177         // Feed all at once
178         byte[] bs = new byte[buf.capacity()];
179         buf.get(bs);
180         decoder.feed(bs);
181 
182         assertEquals(expected, actual);
183         assertEquals(8192, decoder.currentBufferSize());
184     }
185 
186     @Test
multipleLargeMessages_GrowsBufferAsNeeded()187     public void multipleLargeMessages_GrowsBufferAsNeeded() {
188         TestUtils utils = new TestUtils(SEED);
189         Message message = utils.randomMessage(9001);
190         List<Message> expected = IntStream.range(0, 100)
191                                           .mapToObj(x -> message)
192                                           .collect(Collectors.toList());
193 
194         ByteArrayOutputStream baos = new ByteArrayOutputStream();
195         expected.forEach(x -> x.encode(baos));
196         ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
197 
198         List<Message> actual = new ArrayList<>();
199         MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
200 
201         // Feed all at once
202         byte[] bs = new byte[buf.capacity()];
203         buf.get(bs);
204         decoder.feed(bs);
205 
206         assertEquals(expected, actual);
207         assertTrue(decoder.currentBufferSize() > 9001);
208     }
209 }
210