• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/* eslint-env browser, jasmine */
16import 'jasmine';
17
18import {
19  Channel,
20  Client,
21  decode,
22  MethodStub,
23  ServiceClient,
24} from '@pigweed/pw_rpc';
25import {Status} from '@pigweed/pw_status';
26import {
27  PacketType,
28  RpcPacket,
29} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb';
30import {ProtoCollection} from 'transfer_proto_collection/generated/ts_proto_collection';
31import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';
32
33import {Manager} from './client';
34import {ProgressStats} from './transfer';
35
36const DEFAULT_TIMEOUT_S = 0.3;
37
38describe('Encoder', () => {
39  const textEncoder = new TextEncoder();
40  const textDecoder = new TextDecoder();
41  let client: Client;
42  let service: ServiceClient;
43  let sentChunks: Chunk[];
44  let packetsToSend: Uint8Array[][];
45
46  beforeEach(() => {
47    const lib = new ProtoCollection();
48    const channels: Channel[] = [new Channel(1, handleRequest)];
49    client = Client.fromProtoSet(channels, lib);
50    service = client.channel(1)!.service('pw.transfer.Transfer')!;
51
52    sentChunks = [];
53    packetsToSend = [];
54  });
55
56  function handleRequest(data: Uint8Array): void {
57    const packet = decode(data);
58    if (packet.getType() !== PacketType.CLIENT_STREAM) {
59      return;
60    }
61
62    const chunk = Chunk.deserializeBinary(packet.getPayload_asU8());
63    sentChunks.push(chunk);
64
65    if (packetsToSend.length > 0) {
66      const responses = packetsToSend.shift()!;
67      for (const response of responses) {
68        client.processPacket(response);
69      }
70    }
71  }
72
73  function receivedData(): Uint8Array {
74    let length = 0;
75    sentChunks.forEach((chunk: Chunk) => {
76      length += chunk.getData().length;
77    });
78    const data = new Uint8Array(length);
79    let offset = 0;
80    sentChunks.forEach((chunk: Chunk) => {
81      data.set(chunk.getData() as Uint8Array, offset);
82      offset += chunk.getData().length;
83    });
84    return data;
85  }
86
87  function enqueueServerError(method: MethodStub, error: Status): void {
88    const packet = new RpcPacket();
89    packet.setType(PacketType.SERVER_ERROR);
90    packet.setChannelId(1);
91    packet.setServiceId(service.id);
92    packet.setMethodId(method.id);
93    packet.setStatus(error);
94    packetsToSend.push([packet.serializeBinary()]);
95  }
96
97  function enqueueServerResponses(method: MethodStub, responses: Chunk[][]) {
98    for (const responseGroup of responses) {
99      const serializedGroup = [];
100      for (const response of responseGroup) {
101        const packet = new RpcPacket();
102        packet.setType(PacketType.SERVER_STREAM);
103        packet.setChannelId(1);
104        packet.setServiceId(service.id);
105        packet.setMethodId(method.id);
106        packet.setStatus(Status.OK);
107        packet.setPayload(response.serializeBinary());
108        serializedGroup.push(packet.serializeBinary());
109      }
110      packetsToSend.push(serializedGroup);
111    }
112  }
113
114  function buildChunk(
115    transferId: number,
116    offset: number,
117    data: string,
118    remainingBytes: number
119  ): Chunk {
120    const chunk = new Chunk();
121    chunk.setTransferId(transferId);
122    chunk.setOffset(offset);
123    chunk.setData(textEncoder.encode(data));
124    chunk.setRemainingBytes(remainingBytes);
125    return chunk;
126  }
127
128  it('read transfer basic', async () => {
129    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
130
131    const chunk1 = buildChunk(3, 0, 'abc', 0);
132    enqueueServerResponses(service.method('Read')!, [[chunk1]]);
133
134    const data = await manager.read(3);
135    expect(textDecoder.decode(data)).toEqual('abc');
136    expect(sentChunks).toHaveSize(2);
137    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
138    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
139  });
140
141  it('read transfer multichunk', async () => {
142    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
143
144    const chunk1 = buildChunk(3, 0, 'abc', 3);
145    const chunk2 = buildChunk(3, 3, 'def', 0);
146    enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]);
147
148    const data = await manager.read(3);
149    expect(data).toEqual(textEncoder.encode('abcdef'));
150    expect(sentChunks).toHaveSize(2);
151    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
152    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
153  });
154
155  it('read transfer progress callback', async () => {
156    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
157
158    const chunk1 = buildChunk(3, 0, 'abc', 3);
159    const chunk2 = buildChunk(3, 3, 'def', 0);
160    enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]);
161
162    const progress: Array<ProgressStats> = [];
163
164    const data = await manager.read(3, (stats: ProgressStats) => {
165      progress.push(stats);
166    });
167    expect(textDecoder.decode(data)).toEqual('abcdef');
168    expect(sentChunks).toHaveSize(2);
169    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
170    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
171
172    expect(progress).toEqual([
173      new ProgressStats(3, 3, 6),
174      new ProgressStats(6, 6, 6),
175    ]);
176  });
177
178  it('read transfer retry bad offset', async () => {
179    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
180
181    const chunk1 = buildChunk(3, 0, '123', 6);
182    const chunk2 = buildChunk(3, 1, '456', 3); // Incorrect offset; expecting 3
183    const chunk3 = buildChunk(3, 3, '456', 3);
184    const chunk4 = buildChunk(3, 6, '789', 0);
185
186    enqueueServerResponses(service.method('Read')!, [
187      [chunk1, chunk2],
188      [chunk3, chunk4],
189    ]);
190
191    const data = await manager.read(3);
192    expect(data).toEqual(textEncoder.encode('123456789'));
193    expect(sentChunks).toHaveSize(3);
194    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
195    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
196  });
197
198  it('read transfer retry timeout', async () => {
199    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
200
201    const chunk = buildChunk(3, 0, 'xyz', 0);
202    enqueueServerResponses(service.method('Read')!, [[], [chunk]]);
203
204    const data = await manager.read(3);
205    expect(textDecoder.decode(data)).toEqual('xyz');
206
207    // Two transfer parameter requests should have been sent.
208    expect(sentChunks).toHaveSize(3);
209    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
210    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
211  });
212
213  it('read transfer timeout', async () => {
214    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
215
216    await manager
217      .read(27)
218      .then(() => {
219        fail('Unexpected completed promise');
220      })
221      .catch(error => {
222        expect(error.id).toEqual(27);
223        expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
224        expect(sentChunks).toHaveSize(4);
225      });
226  });
227
228  it('read transfer error', async () => {
229    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
230
231    const chunk = new Chunk();
232    chunk.setStatus(Status.NOT_FOUND);
233    chunk.setTransferId(31);
234    enqueueServerResponses(service.method('Read')!, [[chunk]]);
235
236    await manager
237      .read(31)
238      .then(() => {
239        fail('Unexpected completed promise');
240      })
241      .catch(error => {
242        expect(error.id).toEqual(31);
243        expect(Status[error.status]).toEqual(Status[Status.NOT_FOUND]);
244      });
245  });
246
247  it('read transfer server error', async () => {
248    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
249
250    enqueueServerError(service.method('Read')!, Status.NOT_FOUND);
251    await manager
252      .read(31)
253      .then(data => {
254        fail('Unexpected completed promise');
255      })
256      .catch(error => {
257        expect(error.id).toEqual(31);
258        expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
259      });
260  });
261
262  it('write transfer basic', async () => {
263    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
264
265    const chunk = new Chunk();
266    chunk.setTransferId(4);
267    chunk.setOffset(0);
268    chunk.setPendingBytes(32);
269    chunk.setMaxChunkSizeBytes(8);
270
271    const completeChunk = new Chunk();
272    completeChunk.setTransferId(4);
273    completeChunk.setStatus(Status.OK);
274
275    enqueueServerResponses(service.method('Write')!, [
276      [chunk],
277      [completeChunk],
278    ]);
279
280    await manager.write(4, textEncoder.encode('hello'));
281    expect(sentChunks).toHaveSize(2);
282    expect(receivedData()).toEqual(textEncoder.encode('hello'));
283  });
284
285  it('write transfer max chunk size', async () => {
286    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
287
288    const chunk = new Chunk();
289    chunk.setTransferId(4);
290    chunk.setOffset(0);
291    chunk.setPendingBytes(32);
292    chunk.setMaxChunkSizeBytes(8);
293
294    const completeChunk = new Chunk();
295    completeChunk.setTransferId(4);
296    completeChunk.setStatus(Status.OK);
297
298    enqueueServerResponses(service.method('Write')!, [
299      [chunk],
300      [completeChunk],
301    ]);
302
303    await manager.write(4, textEncoder.encode('hello world'));
304    expect(sentChunks).toHaveSize(3);
305    expect(receivedData()).toEqual(textEncoder.encode('hello world'));
306    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hello wo'));
307    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('rld'));
308  });
309
310  it('write transfer multiple parameters', async () => {
311    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
312
313    const chunk = new Chunk();
314    chunk.setTransferId(4);
315    chunk.setOffset(0);
316    chunk.setPendingBytes(8);
317    chunk.setMaxChunkSizeBytes(8);
318
319    const chunk2 = new Chunk();
320    chunk2.setTransferId(4);
321    chunk2.setOffset(8);
322    chunk2.setPendingBytes(8);
323    chunk2.setMaxChunkSizeBytes(8);
324
325    const completeChunk = new Chunk();
326    completeChunk.setTransferId(4);
327    completeChunk.setStatus(Status.OK);
328
329    enqueueServerResponses(service.method('Write')!, [
330      [chunk],
331      [chunk2],
332      [completeChunk],
333    ]);
334
335    await manager.write(4, textEncoder.encode('data to write'));
336    expect(sentChunks).toHaveSize(3);
337    expect(receivedData()).toEqual(textEncoder.encode('data to write'));
338    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to '));
339    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write'));
340  });
341
342  it('write transfer parameters update', async () => {
343    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
344
345    const chunk = new Chunk();
346    chunk.setTransferId(4);
347    chunk.setOffset(0);
348    chunk.setPendingBytes(8);
349    chunk.setMaxChunkSizeBytes(4);
350    chunk.setType(Chunk.Type.PARAMETERS_RETRANSMIT);
351    chunk.setWindowEndOffset(8);
352
353    const chunk2 = new Chunk();
354    chunk2.setTransferId(4);
355    chunk2.setOffset(4);
356    chunk2.setPendingBytes(8);
357    chunk2.setType(Chunk.Type.PARAMETERS_CONTINUE);
358    chunk2.setWindowEndOffset(12);
359
360    const chunk3 = new Chunk();
361    chunk3.setTransferId(4);
362    chunk3.setOffset(8);
363    chunk3.setPendingBytes(8);
364    chunk3.setType(Chunk.Type.PARAMETERS_CONTINUE);
365    chunk3.setWindowEndOffset(16);
366
367    const chunk4 = new Chunk();
368    chunk4.setTransferId(4);
369    chunk4.setOffset(12);
370    chunk4.setPendingBytes(8);
371    chunk4.setType(Chunk.Type.PARAMETERS_CONTINUE);
372    chunk4.setWindowEndOffset(20);
373
374    const chunk5 = new Chunk();
375    chunk5.setTransferId(4);
376    chunk5.setOffset(16);
377    chunk5.setPendingBytes(8);
378    chunk5.setType(Chunk.Type.PARAMETERS_CONTINUE);
379    chunk5.setWindowEndOffset(24);
380
381    const chunk6 = new Chunk();
382    chunk6.setTransferId(4);
383    chunk6.setOffset(20);
384    chunk6.setPendingBytes(8);
385    chunk6.setType(Chunk.Type.PARAMETERS_CONTINUE);
386    chunk6.setWindowEndOffset(28);
387
388    const completeChunk = new Chunk();
389    completeChunk.setTransferId(4);
390    completeChunk.setStatus(Status.OK);
391
392    enqueueServerResponses(service.method('Write')!, [
393      [chunk],
394      [chunk2],
395      [chunk3],
396      [chunk4],
397      [chunk5],
398      [chunk6],
399      [completeChunk],
400    ]);
401
402    await manager.write(4, textEncoder.encode('hello this is a message'));
403    expect(receivedData()).toEqual(
404      textEncoder.encode('hello this is a message')
405    );
406    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hell'));
407    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('o th'));
408    expect(sentChunks[3].getData()).toEqual(textEncoder.encode('is i'));
409    expect(sentChunks[4].getData()).toEqual(textEncoder.encode('s a '));
410    expect(sentChunks[5].getData()).toEqual(textEncoder.encode('mess'));
411    expect(sentChunks[6].getData()).toEqual(textEncoder.encode('age'));
412  });
413
414  it('write transfer progress callback', async () => {
415    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
416
417    const chunk = new Chunk();
418    chunk.setTransferId(4);
419    chunk.setOffset(0);
420    chunk.setPendingBytes(8);
421    chunk.setMaxChunkSizeBytes(8);
422
423    const chunk2 = new Chunk();
424    chunk2.setTransferId(4);
425    chunk2.setOffset(8);
426    chunk2.setPendingBytes(8);
427    chunk2.setMaxChunkSizeBytes(8);
428
429    const completeChunk = new Chunk();
430    completeChunk.setTransferId(4);
431    completeChunk.setStatus(Status.OK);
432
433    enqueueServerResponses(service.method('Write')!, [
434      [chunk],
435      [chunk2],
436      [completeChunk],
437    ]);
438
439    const progress: Array<ProgressStats> = [];
440    await manager.write(
441      4,
442      textEncoder.encode('data to write'),
443      (stats: ProgressStats) => {
444        progress.push(stats);
445      }
446    );
447    expect(sentChunks).toHaveSize(3);
448    expect(receivedData()).toEqual(textEncoder.encode('data to write'));
449    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to '));
450    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write'));
451
452    console.log(progress);
453    expect(progress).toEqual([
454      new ProgressStats(8, 0, 13),
455      new ProgressStats(13, 8, 13),
456      new ProgressStats(13, 13, 13),
457    ]);
458  });
459
460  it('write transfer rewind', async () => {
461    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
462
463    const chunk1 = new Chunk();
464    chunk1.setTransferId(4);
465    chunk1.setOffset(0);
466    chunk1.setPendingBytes(8);
467    chunk1.setMaxChunkSizeBytes(8);
468
469    const chunk2 = new Chunk();
470    chunk2.setTransferId(4);
471    chunk2.setOffset(8);
472    chunk2.setPendingBytes(8);
473    chunk2.setMaxChunkSizeBytes(8);
474
475    const chunk3 = new Chunk();
476    chunk3.setTransferId(4);
477    chunk3.setOffset(4); // Rewind
478    chunk3.setPendingBytes(8);
479    chunk3.setMaxChunkSizeBytes(8);
480
481    const chunk4 = new Chunk();
482    chunk4.setTransferId(4);
483    chunk4.setOffset(12); // Rewind
484    chunk4.setPendingBytes(16);
485    chunk4.setMaxChunkSizeBytes(16);
486
487    const completeChunk = new Chunk();
488    completeChunk.setTransferId(4);
489    completeChunk.setStatus(Status.OK);
490
491    enqueueServerResponses(service.method('Write')!, [
492      [chunk1],
493      [chunk2],
494      [chunk3],
495      [chunk4],
496      [completeChunk],
497    ]);
498
499    await manager.write(4, textEncoder.encode('pigweed data transfer'));
500    expect(sentChunks).toHaveSize(5);
501    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('pigweed '));
502    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('data tra'));
503    expect(sentChunks[3].getData()).toEqual(textEncoder.encode('eed data'));
504    expect(sentChunks[4].getData()).toEqual(textEncoder.encode(' transfer'));
505  });
506
507  it('write transfer bad offset', async () => {
508    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
509
510    const chunk1 = new Chunk();
511    chunk1.setTransferId(4);
512    chunk1.setOffset(0);
513    chunk1.setPendingBytes(8);
514    chunk1.setMaxChunkSizeBytes(8);
515
516    const chunk2 = new Chunk();
517    chunk2.setTransferId(4);
518    chunk2.setOffset(100); // larger offset than data
519    chunk2.setPendingBytes(8);
520    chunk2.setMaxChunkSizeBytes(8);
521
522    const completeChunk = new Chunk();
523    completeChunk.setTransferId(4);
524    completeChunk.setStatus(Status.OK);
525
526    enqueueServerResponses(service.method('Write')!, [
527      [chunk1],
528      [chunk2],
529      [completeChunk],
530    ]);
531
532    await manager
533      .write(4, textEncoder.encode('small data'))
534      .then(() => {
535        fail('Unexpected succesful promise');
536      })
537      .catch(error => {
538        expect(error.id).toEqual(4);
539        expect(Status[error.status]).toEqual(Status[Status.OUT_OF_RANGE]);
540      });
541  });
542
543  it('write transfer error', async () => {
544    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
545
546    const chunk = new Chunk();
547    chunk.setTransferId(21);
548    chunk.setStatus(Status.UNAVAILABLE);
549
550    enqueueServerResponses(service.method('Write')!, [[chunk]]);
551
552    await manager
553      .write(21, textEncoder.encode('no write'))
554      .then(() => {
555        fail('Unexpected succesful promise');
556      })
557      .catch(error => {
558        expect(error.id).toEqual(21);
559        expect(Status[error.status]).toEqual(Status[Status.UNAVAILABLE]);
560      });
561  });
562
563  it('write transfer server error', async () => {
564    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
565
566    const chunk = new Chunk();
567    chunk.setTransferId(21);
568    chunk.setStatus(Status.NOT_FOUND);
569
570    enqueueServerError(service.method('Write')!, Status.NOT_FOUND);
571
572    await manager
573      .write(21, textEncoder.encode('server error'))
574      .then(() => {
575        fail('Unexpected succesful promise');
576      })
577      .catch(error => {
578        expect(error.id).toEqual(21);
579        expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
580      });
581  });
582
583  it('write transfer timeout after initial chunk', async () => {
584    const manager = new Manager(service, 0.001, 4, 2);
585
586    await manager
587      .write(22, textEncoder.encode('no server response!'))
588      .then(() => {
589        fail('unexpected succesful write');
590      })
591      .catch(error => {
592        expect(sentChunks).toHaveSize(3); // Initial chunk + two retries.
593        expect(error.id).toEqual(22);
594        expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
595      });
596  });
597
598  it('write transfer timeout after intermediate chunk', async () => {
599    const manager = new Manager(service, DEFAULT_TIMEOUT_S, 4, 2);
600
601    const chunk = new Chunk();
602    chunk.setTransferId(22);
603    chunk.setPendingBytes(10);
604    chunk.setMaxChunkSizeBytes(5);
605
606    enqueueServerResponses(service.method('Write')!, [[chunk]]);
607
608    await manager
609      .write(22, textEncoder.encode('0123456789'))
610      .then(() => {
611        fail('unexpected succesful write');
612      })
613      .catch(error => {
614        const expectedChunk1 = new Chunk();
615        expectedChunk1.setTransferId(22);
616        expectedChunk1.setType(Chunk.Type.TRANSFER_START);
617        const expectedChunk2 = new Chunk();
618        expectedChunk2.setTransferId(22);
619        expectedChunk2.setData(textEncoder.encode('01234'));
620        expectedChunk2.setType(Chunk.Type.TRANSFER_DATA);
621        const lastChunk = new Chunk();
622        lastChunk.setTransferId(22);
623        lastChunk.setData(textEncoder.encode('56789'));
624        lastChunk.setOffset(5);
625        lastChunk.setRemainingBytes(0);
626        lastChunk.setType(Chunk.Type.TRANSFER_DATA);
627
628        const expectedChunks = [
629          expectedChunk1,
630          expectedChunk2,
631          lastChunk,
632          lastChunk, // retry 1
633          lastChunk, // retry 2
634        ];
635
636        expect(sentChunks).toEqual(expectedChunks);
637
638        expect(error.id).toEqual(22);
639        expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
640      });
641  });
642
643  it('write zero pending bytes is internal error', async () => {
644    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
645
646    const chunk = new Chunk();
647    chunk.setTransferId(23);
648    chunk.setPendingBytes(0);
649
650    enqueueServerResponses(service.method('Write')!, [[chunk]]);
651
652    await manager
653      .write(23, textEncoder.encode('no write'))
654      .then(() => {
655        fail('Unexpected succesful promise');
656      })
657      .catch(error => {
658        expect(error.id).toEqual(23);
659        expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
660      });
661  });
662});
663