1 package software.amazon.eventstream; 2 3 import org.junit.jupiter.api.Test; 4 5 import java.io.ByteArrayOutputStream; 6 import java.nio.ByteBuffer; 7 import java.util.ArrayList; 8 import java.util.Collections; 9 import java.util.List; 10 import java.util.Random; 11 import java.util.stream.Collectors; 12 import java.util.stream.IntStream; 13 14 import static java.util.Collections.emptyList; 15 import static org.junit.jupiter.api.Assertions.assertEquals; 16 import static org.junit.jupiter.api.Assertions.assertTrue; 17 18 public class MessageDecoderTest { 19 long SEED = 8912374098123423L; 20 21 @Test testDecoder()22 public void testDecoder() throws Exception { 23 TestUtils utils = new TestUtils(SEED); 24 Random rand = new Random(SEED); 25 List<Message> expected = IntStream.range(0, 10_000) 26 .mapToObj(x -> utils.randomMessage()) 27 .collect(Collectors.toList()); 28 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 29 expected.forEach(x -> x.encode(baos)); 30 ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); 31 32 List<Message> actual = new ArrayList<>(); 33 MessageDecoder decoder = new MessageDecoder(actual::add); 34 while (buf.remaining() > 0) { 35 int bufSize = Math.min(1 + rand.nextInt(1024), buf.remaining()); 36 byte[] bs = new byte[bufSize]; 37 buf.get(bs); 38 decoder.feed(bs); 39 } 40 41 assertEquals(expected, actual); 42 } 43 44 @Test testDecoder_WithOffset()45 public void testDecoder_WithOffset() throws Exception { 46 TestUtils utils = new TestUtils(SEED); 47 Random rand = new Random(SEED); 48 List<Message> expected = IntStream.range(0, 10_000) 49 .mapToObj(x -> utils.randomMessage()) 50 .collect(Collectors.toList()); 51 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 52 expected.forEach(x -> x.encode(baos)); 53 byte[] data = baos.toByteArray(); 54 int toRead = data.length; 55 int read = 0; 56 57 List<Message> actual = new ArrayList<>(); 58 MessageDecoder decoder = new MessageDecoder(actual::add); 59 while (toRead > 0) { 60 int length = rand.nextInt(100); 61 if (read + length > data.length) { 62 length = data.length - read; 63 } 64 decoder.feed(data, read, length); 65 read += length; 66 toRead -= length; 67 } 68 69 assertEquals(expected, actual); 70 } 71 72 @Test preludeFedFirst_DecodesCorrectly()73 public void preludeFedFirst_DecodesCorrectly() { 74 TestUtils utils = new TestUtils(SEED); 75 Message message = utils.randomMessage(); 76 int messageSize = message.toByteBuffer().remaining(); 77 List<Message> expected = Collections.singletonList(message); 78 79 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 80 expected.forEach(x -> x.encode(baos)); 81 ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); 82 83 List<Message> actual = new ArrayList<>(); 84 MessageDecoder decoder = new MessageDecoder(actual::add, 8192); 85 86 // Feed just the prelude in it's entirety 87 byte[] bs = new byte[15]; 88 buf.get(bs); 89 decoder.feed(bs); 90 91 // No messages should be decoded yet 92 assertEquals(emptyList(), actual); 93 94 // Feed rest of message in it's entirety. 95 bs = new byte[messageSize - 15]; 96 buf.get(bs); 97 decoder.feed(bs); 98 99 // Should have successfully decoded the one message 100 assertEquals(1, actual.size()); 101 } 102 103 @Test preludeFedInParts_DecodesCorrectly()104 public void preludeFedInParts_DecodesCorrectly() { 105 TestUtils utils = new TestUtils(SEED); 106 Message message = utils.randomMessage(); 107 int messageSize = message.toByteBuffer().remaining(); 108 List<Message> expected = Collections.singletonList(message); 109 110 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 111 expected.forEach(x -> x.encode(baos)); 112 ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); 113 114 List<Message> actual = new ArrayList<>(); 115 MessageDecoder decoder = new MessageDecoder(actual::add, 8192); 116 117 // Feed the prelude in parts 118 byte[] bs = new byte[7]; 119 buf.get(bs); 120 decoder.feed(bs); 121 122 // Feed rest of prelude 123 bs = new byte[8]; 124 buf.get(bs); 125 decoder.feed(bs); 126 127 // No messages should be decoded yet 128 assertEquals(emptyList(), actual); 129 130 // Feed rest of message in it's entirety. 131 bs = new byte[messageSize - 15]; 132 buf.get(bs); 133 decoder.feed(bs); 134 135 // Should have successfully decoded the one message 136 assertEquals(1, actual.size()); 137 } 138 139 @Test bufferNeedsToGrow()140 public void bufferNeedsToGrow() { 141 TestUtils utils = new TestUtils(SEED); 142 Message message = utils.randomMessage(8192 * 2); 143 int messageSize = message.toByteBuffer().remaining(); 144 List<Message> expected = Collections.singletonList(message); 145 146 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 147 expected.forEach(x -> x.encode(baos)); 148 ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); 149 150 List<Message> actual = new ArrayList<>(); 151 MessageDecoder decoder = new MessageDecoder(actual::add, 8192); 152 153 // Feed all at once 154 byte[] bs = new byte[messageSize]; 155 buf.get(bs); 156 decoder.feed(bs); 157 158 // Should have successfully decoded the one message 159 assertEquals(1, actual.size()); 160 } 161 162 @Test multipleMessagesDoesNotGrowBuffer()163 public void multipleMessagesDoesNotGrowBuffer() { 164 TestUtils utils = new TestUtils(SEED); 165 Message message = utils.randomMessage(4096); 166 List<Message> expected = IntStream.range(0, 100) 167 .mapToObj(x -> message) 168 .collect(Collectors.toList()); 169 170 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 171 expected.forEach(x -> x.encode(baos)); 172 ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); 173 174 List<Message> actual = new ArrayList<>(); 175 MessageDecoder decoder = new MessageDecoder(actual::add, 8192); 176 177 // Feed all at once 178 byte[] bs = new byte[buf.capacity()]; 179 buf.get(bs); 180 decoder.feed(bs); 181 182 assertEquals(expected, actual); 183 assertEquals(8192, decoder.currentBufferSize()); 184 } 185 186 @Test multipleLargeMessages_GrowsBufferAsNeeded()187 public void multipleLargeMessages_GrowsBufferAsNeeded() { 188 TestUtils utils = new TestUtils(SEED); 189 Message message = utils.randomMessage(9001); 190 List<Message> expected = IntStream.range(0, 100) 191 .mapToObj(x -> message) 192 .collect(Collectors.toList()); 193 194 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 195 expected.forEach(x -> x.encode(baos)); 196 ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); 197 198 List<Message> actual = new ArrayList<>(); 199 MessageDecoder decoder = new MessageDecoder(actual::add, 8192); 200 201 // Feed all at once 202 byte[] bs = new byte[buf.capacity()]; 203 buf.get(bs); 204 decoder.feed(bs); 205 206 assertEquals(expected, actual); 207 assertTrue(decoder.currentBufferSize() > 9001); 208 } 209 } 210