• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/* eslint-env browser */
16
17import {
18  Channel,
19  Client,
20  decode,
21  MethodStub,
22  ServiceClient,
23} from 'pigweedjs/pw_rpc';
24import { Status } from 'pigweedjs/pw_status';
25import {
26  PacketType,
27  RpcPacket,
28} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
29import { ProtoCollection } from 'pigweedjs/protos/collection';
30import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';
31
32import { Manager } from './client';
33import { ProgressStats } from './transfer';
34
// Timeout value passed as the second Manager constructor argument by most
// tests in this file. The _S suffix suggests seconds -- TODO confirm against
// Manager's constructor.
const DEFAULT_TIMEOUT_S = 0.3;
36
37describe('Transfer client', () => {
38  const textEncoder = new TextEncoder();
39  const textDecoder = new TextDecoder();
40  let client: Client;
41  let service: ServiceClient;
42  let sentChunks: Chunk[];
43  let packetsToSend: Uint8Array[][];
44
45  beforeEach(() => {
46    const lib = new ProtoCollection();
47    const channels: Channel[] = [new Channel(1, handleRequest)];
48    client = Client.fromProtoSet(channels, lib);
49    service = client.channel(1)!.service('pw.transfer.Transfer')!;
50
51    sentChunks = [];
52    packetsToSend = [];
53  });
54
55  function handleRequest(data: Uint8Array): void {
56    const packet = decode(data);
57    if (packet.getType() !== PacketType.CLIENT_STREAM) {
58      return;
59    }
60
61    const chunk = Chunk.deserializeBinary(packet.getPayload_asU8());
62    sentChunks.push(chunk);
63
64    if (packetsToSend.length > 0) {
65      const responses = packetsToSend.shift()!;
66      for (const response of responses) {
67        client.processPacket(response);
68      }
69    }
70  }
71
72  function receivedData(): Uint8Array {
73    let length = 0;
74    sentChunks.forEach((chunk: Chunk) => {
75      length += chunk.getData().length;
76    });
77    const data = new Uint8Array(length);
78    let offset = 0;
79    sentChunks.forEach((chunk: Chunk) => {
80      data.set(chunk.getData() as Uint8Array, offset);
81      offset += chunk.getData().length;
82    });
83    return data;
84  }
85
86  function enqueueServerError(method: MethodStub, error: Status): void {
87    const packet = new RpcPacket();
88    packet.setType(PacketType.SERVER_ERROR);
89    packet.setChannelId(1);
90    packet.setServiceId(service.id);
91    packet.setMethodId(method.id);
92    packet.setCallId(method.rpcs.nextCallId);
93    packet.setStatus(error);
94    packetsToSend.push([packet.serializeBinary()]);
95  }
96
97  function enqueueServerResponses(method: MethodStub, responses: Chunk[][]) {
98    for (const responseGroup of responses) {
99      const serializedGroup = [];
100      for (const response of responseGroup) {
101        const packet = new RpcPacket();
102        packet.setType(PacketType.SERVER_STREAM);
103        packet.setChannelId(1);
104        packet.setServiceId(service.id);
105        packet.setMethodId(method.id);
106        packet.setCallId(method.rpcs.nextCallId);
107        packet.setStatus(Status.OK);
108        packet.setPayload(response.serializeBinary());
109        serializedGroup.push(packet.serializeBinary());
110      }
111      packetsToSend.push(serializedGroup);
112    }
113  }
114
115  function buildChunk(
116    sessionId: number,
117    offset: number,
118    data: string,
119    remainingBytes: number,
120  ): Chunk {
121    const chunk = new Chunk();
122    chunk.setTransferId(sessionId);
123    chunk.setOffset(offset);
124    chunk.setData(textEncoder.encode(data));
125    chunk.setRemainingBytes(remainingBytes);
126    return chunk;
127  }
128
129  it('read transfer basic', async () => {
130    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
131
132    const chunk1 = buildChunk(3, 0, 'abc', 0);
133    enqueueServerResponses(service.method('Read')!, [[chunk1]]);
134
135    const data = await manager.read(3);
136    expect(textDecoder.decode(data)).toEqual('abc');
137    expect(sentChunks).toHaveLength(2);
138    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true);
139    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
140  });
141
142  it('read transfer multichunk', async () => {
143    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
144
145    const chunk1 = buildChunk(3, 0, 'abc', 3);
146    const chunk2 = buildChunk(3, 3, 'def', 0);
147    enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]);
148
149    const data = await manager.read(3);
150    expect(data).toEqual(textEncoder.encode('abcdef'));
151    expect(sentChunks).toHaveLength(2);
152    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true);
153    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
154  });
155
156  it('read transfer progress callback', async () => {
157    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
158
159    const chunk1 = buildChunk(3, 0, 'abc', 3);
160    const chunk2 = buildChunk(3, 3, 'def', 0);
161    enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]);
162
163    const progress: Array<ProgressStats> = [];
164
165    const data = await manager.read(3, (stats: ProgressStats) => {
166      progress.push(stats);
167    });
168    expect(textDecoder.decode(data)).toEqual('abcdef');
169    expect(sentChunks).toHaveLength(2);
170    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true);
171    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
172
173    expect(progress).toEqual([
174      new ProgressStats(3, 3, 6),
175      new ProgressStats(6, 6, 6),
176    ]);
177  });
178
179  it('read transfer retry bad offset', async () => {
180    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
181
182    const chunk1 = buildChunk(3, 0, '123', 6);
183    const chunk2 = buildChunk(3, 1, '456', 3); // Incorrect offset; expecting 3
184    const chunk3 = buildChunk(3, 3, '456', 3);
185    const chunk4 = buildChunk(3, 6, '789', 0);
186
187    enqueueServerResponses(service.method('Read')!, [
188      [chunk1, chunk2],
189      [chunk3, chunk4],
190    ]);
191
192    const data = await manager.read(3);
193    expect(data).toEqual(textEncoder.encode('123456789'));
194    expect(sentChunks).toHaveLength(3);
195    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true);
196    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
197  });
198
199  it('read transfer retry timeout', async () => {
200    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
201
202    const chunk = buildChunk(3, 0, 'xyz', 0);
203    enqueueServerResponses(service.method('Read')!, [[], [chunk]]);
204
205    const data = await manager.read(3);
206    expect(textDecoder.decode(data)).toEqual('xyz');
207
208    // Two transfer parameter requests should have been sent.
209    expect(sentChunks).toHaveLength(3);
210    expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true);
211    expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
212  });
213
214  it('read transfer timeout', async () => {
215    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
216
217    await manager
218      .read(27)
219      .then(() => {
220        fail('Unexpected completed promise');
221      })
222      .catch((error) => {
223        expect(error.id).toEqual(27);
224        expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
225        expect(sentChunks).toHaveLength(4);
226      });
227  });
228
229  it('read transfer error', async () => {
230    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
231
232    const chunk = new Chunk();
233    chunk.setStatus(Status.NOT_FOUND);
234    chunk.setTransferId(31);
235    enqueueServerResponses(service.method('Read')!, [[chunk]]);
236
237    await manager
238      .read(31)
239      .then(() => {
240        fail('Unexpected completed promise');
241      })
242      .catch((error) => {
243        expect(error.id).toEqual(31);
244        expect(Status[error.status]).toEqual(Status[Status.NOT_FOUND]);
245      });
246  });
247
248  it('read transfer server error', async () => {
249    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
250
251    enqueueServerError(service.method('Read')!, Status.NOT_FOUND);
252    await manager
253      .read(31)
254      .then((data) => {
255        fail('Unexpected completed promise');
256      })
257      .catch((error) => {
258        expect(error.id).toEqual(31);
259        expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
260      });
261  });
262
263  it('write transfer basic', async () => {
264    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
265
266    const chunk = new Chunk();
267    chunk.setTransferId(4);
268    chunk.setOffset(0);
269    chunk.setPendingBytes(32);
270    chunk.setMaxChunkSizeBytes(8);
271
272    const completeChunk = new Chunk();
273    completeChunk.setTransferId(4);
274    completeChunk.setStatus(Status.OK);
275
276    enqueueServerResponses(service.method('Write')!, [
277      [chunk],
278      [completeChunk],
279    ]);
280
281    await manager.write(4, textEncoder.encode('hello'));
282    expect(sentChunks).toHaveLength(2);
283    expect(receivedData()).toEqual(textEncoder.encode('hello'));
284  });
285
286  it('write transfer max chunk size', async () => {
287    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
288
289    const chunk = new Chunk();
290    chunk.setTransferId(4);
291    chunk.setOffset(0);
292    chunk.setPendingBytes(32);
293    chunk.setMaxChunkSizeBytes(8);
294
295    const completeChunk = new Chunk();
296    completeChunk.setTransferId(4);
297    completeChunk.setStatus(Status.OK);
298
299    enqueueServerResponses(service.method('Write')!, [
300      [chunk],
301      [completeChunk],
302    ]);
303
304    await manager.write(4, textEncoder.encode('hello world'));
305    expect(sentChunks).toHaveLength(3);
306    expect(receivedData()).toEqual(textEncoder.encode('hello world'));
307    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hello wo'));
308    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('rld'));
309  });
310
311  it('write transfer multiple parameters', async () => {
312    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
313
314    const chunk = new Chunk();
315    chunk.setTransferId(4);
316    chunk.setOffset(0);
317    chunk.setPendingBytes(8);
318    chunk.setMaxChunkSizeBytes(8);
319
320    const chunk2 = new Chunk();
321    chunk2.setTransferId(4);
322    chunk2.setOffset(8);
323    chunk2.setPendingBytes(8);
324    chunk2.setMaxChunkSizeBytes(8);
325
326    const completeChunk = new Chunk();
327    completeChunk.setTransferId(4);
328    completeChunk.setStatus(Status.OK);
329
330    enqueueServerResponses(service.method('Write')!, [
331      [chunk],
332      [chunk2],
333      [completeChunk],
334    ]);
335
336    await manager.write(4, textEncoder.encode('data to write'));
337    expect(sentChunks).toHaveLength(3);
338    expect(receivedData()).toEqual(textEncoder.encode('data to write'));
339    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to '));
340    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write'));
341  });
342
343  it('write transfer parameters update', async () => {
344    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
345
346    const chunk = new Chunk();
347    chunk.setTransferId(4);
348    chunk.setOffset(0);
349    chunk.setPendingBytes(8);
350    chunk.setMaxChunkSizeBytes(4);
351    chunk.setType(Chunk.Type.PARAMETERS_RETRANSMIT);
352    chunk.setWindowEndOffset(8);
353
354    const chunk2 = new Chunk();
355    chunk2.setTransferId(4);
356    chunk2.setOffset(4);
357    chunk2.setPendingBytes(8);
358    chunk2.setType(Chunk.Type.PARAMETERS_CONTINUE);
359    chunk2.setWindowEndOffset(12);
360
361    const chunk3 = new Chunk();
362    chunk3.setTransferId(4);
363    chunk3.setOffset(8);
364    chunk3.setPendingBytes(8);
365    chunk3.setType(Chunk.Type.PARAMETERS_CONTINUE);
366    chunk3.setWindowEndOffset(16);
367
368    const chunk4 = new Chunk();
369    chunk4.setTransferId(4);
370    chunk4.setOffset(12);
371    chunk4.setPendingBytes(8);
372    chunk4.setType(Chunk.Type.PARAMETERS_CONTINUE);
373    chunk4.setWindowEndOffset(20);
374
375    const chunk5 = new Chunk();
376    chunk5.setTransferId(4);
377    chunk5.setOffset(16);
378    chunk5.setPendingBytes(8);
379    chunk5.setType(Chunk.Type.PARAMETERS_CONTINUE);
380    chunk5.setWindowEndOffset(24);
381
382    const chunk6 = new Chunk();
383    chunk6.setTransferId(4);
384    chunk6.setOffset(20);
385    chunk6.setPendingBytes(8);
386    chunk6.setType(Chunk.Type.PARAMETERS_CONTINUE);
387    chunk6.setWindowEndOffset(28);
388
389    const completeChunk = new Chunk();
390    completeChunk.setTransferId(4);
391    completeChunk.setStatus(Status.OK);
392
393    enqueueServerResponses(service.method('Write')!, [
394      [chunk],
395      [chunk2],
396      [chunk3],
397      [chunk4],
398      [chunk5],
399      [chunk6],
400      [completeChunk],
401    ]);
402
403    await manager.write(4, textEncoder.encode('hello this is a message'));
404    expect(receivedData()).toEqual(
405      textEncoder.encode('hello this is a message'),
406    );
407    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hell'));
408    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('o th'));
409    expect(sentChunks[3].getData()).toEqual(textEncoder.encode('is i'));
410    expect(sentChunks[4].getData()).toEqual(textEncoder.encode('s a '));
411    expect(sentChunks[5].getData()).toEqual(textEncoder.encode('mess'));
412    expect(sentChunks[6].getData()).toEqual(textEncoder.encode('age'));
413  });
414
415  it('write transfer progress callback', async () => {
416    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
417
418    const chunk = new Chunk();
419    chunk.setTransferId(4);
420    chunk.setOffset(0);
421    chunk.setPendingBytes(8);
422    chunk.setMaxChunkSizeBytes(8);
423
424    const chunk2 = new Chunk();
425    chunk2.setTransferId(4);
426    chunk2.setOffset(8);
427    chunk2.setPendingBytes(8);
428    chunk2.setMaxChunkSizeBytes(8);
429
430    const completeChunk = new Chunk();
431    completeChunk.setTransferId(4);
432    completeChunk.setStatus(Status.OK);
433
434    enqueueServerResponses(service.method('Write')!, [
435      [chunk],
436      [chunk2],
437      [completeChunk],
438    ]);
439
440    const progress: Array<ProgressStats> = [];
441    await manager.write(
442      4,
443      textEncoder.encode('data to write'),
444      (stats: ProgressStats) => {
445        progress.push(stats);
446      },
447    );
448    expect(sentChunks).toHaveLength(3);
449    expect(receivedData()).toEqual(textEncoder.encode('data to write'));
450    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to '));
451    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write'));
452
453    console.log(progress);
454    expect(progress).toEqual([
455      new ProgressStats(8, 0, 13),
456      new ProgressStats(13, 8, 13),
457      new ProgressStats(13, 13, 13),
458    ]);
459  });
460
461  it('write transfer rewind', async () => {
462    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
463
464    const chunk1 = new Chunk();
465    chunk1.setTransferId(4);
466    chunk1.setOffset(0);
467    chunk1.setPendingBytes(8);
468    chunk1.setMaxChunkSizeBytes(8);
469
470    const chunk2 = new Chunk();
471    chunk2.setTransferId(4);
472    chunk2.setOffset(8);
473    chunk2.setPendingBytes(8);
474    chunk2.setMaxChunkSizeBytes(8);
475
476    const chunk3 = new Chunk();
477    chunk3.setTransferId(4);
478    chunk3.setOffset(4); // Rewind
479    chunk3.setPendingBytes(8);
480    chunk3.setMaxChunkSizeBytes(8);
481
482    const chunk4 = new Chunk();
483    chunk4.setTransferId(4);
484    chunk4.setOffset(12); // Rewind
485    chunk4.setPendingBytes(16);
486    chunk4.setMaxChunkSizeBytes(16);
487
488    const completeChunk = new Chunk();
489    completeChunk.setTransferId(4);
490    completeChunk.setStatus(Status.OK);
491
492    enqueueServerResponses(service.method('Write')!, [
493      [chunk1],
494      [chunk2],
495      [chunk3],
496      [chunk4],
497      [completeChunk],
498    ]);
499
500    await manager.write(4, textEncoder.encode('pigweed data transfer'));
501    expect(sentChunks).toHaveLength(5);
502    expect(sentChunks[1].getData()).toEqual(textEncoder.encode('pigweed '));
503    expect(sentChunks[2].getData()).toEqual(textEncoder.encode('data tra'));
504    expect(sentChunks[3].getData()).toEqual(textEncoder.encode('eed data'));
505    expect(sentChunks[4].getData()).toEqual(textEncoder.encode(' transfer'));
506  });
507
508  it('write transfer bad offset', async () => {
509    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
510
511    const chunk1 = new Chunk();
512    chunk1.setTransferId(4);
513    chunk1.setOffset(0);
514    chunk1.setPendingBytes(8);
515    chunk1.setMaxChunkSizeBytes(8);
516
517    const chunk2 = new Chunk();
518    chunk2.setTransferId(4);
519    chunk2.setOffset(100); // larger offset than data
520    chunk2.setPendingBytes(8);
521    chunk2.setMaxChunkSizeBytes(8);
522
523    const completeChunk = new Chunk();
524    completeChunk.setTransferId(4);
525    completeChunk.setStatus(Status.OK);
526
527    enqueueServerResponses(service.method('Write')!, [
528      [chunk1],
529      [chunk2],
530      [completeChunk],
531    ]);
532
533    await manager
534      .write(4, textEncoder.encode('small data'))
535      .then(() => {
536        fail('Unexpected succesful promise');
537      })
538      .catch((error) => {
539        expect(error.id).toEqual(4);
540        expect(Status[error.status]).toEqual(Status[Status.OUT_OF_RANGE]);
541      });
542  });
543
544  it('write transfer error', async () => {
545    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
546
547    const chunk = new Chunk();
548    chunk.setTransferId(21);
549    chunk.setStatus(Status.UNAVAILABLE);
550
551    enqueueServerResponses(service.method('Write')!, [[chunk]]);
552
553    await manager
554      .write(21, textEncoder.encode('no write'))
555      .then(() => {
556        fail('Unexpected succesful promise');
557      })
558      .catch((error) => {
559        expect(error.id).toEqual(21);
560        expect(Status[error.status]).toEqual(Status[Status.UNAVAILABLE]);
561      });
562  });
563
564  it('write transfer server error', async () => {
565    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
566
567    const chunk = new Chunk();
568    chunk.setTransferId(21);
569    chunk.setStatus(Status.NOT_FOUND);
570
571    enqueueServerError(service.method('Write')!, Status.NOT_FOUND);
572
573    await manager
574      .write(21, textEncoder.encode('server error'))
575      .then(() => {
576        fail('Unexpected succesful promise');
577      })
578      .catch((error) => {
579        expect(error.id).toEqual(21);
580        expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
581      });
582  });
583
584  it('write transfer timeout after initial chunk', async () => {
585    const manager = new Manager(service, 0.001, 4, 2);
586
587    await manager
588      .write(22, textEncoder.encode('no server response!'))
589      .then(() => {
590        fail('unexpected succesful write');
591      })
592      .catch((error) => {
593        expect(sentChunks).toHaveLength(3); // Initial chunk + two retries.
594        expect(error.id).toEqual(22);
595        expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
596      });
597  });
598
599  it('write transfer timeout after intermediate chunk', async () => {
600    const manager = new Manager(service, DEFAULT_TIMEOUT_S, 4, 2);
601
602    const chunk = new Chunk();
603    chunk.setTransferId(22);
604    chunk.setPendingBytes(10);
605    chunk.setMaxChunkSizeBytes(5);
606
607    enqueueServerResponses(service.method('Write')!, [[chunk]]);
608
609    await manager
610      .write(22, textEncoder.encode('0123456789'))
611      .then(() => {
612        fail('unexpected succesful write');
613      })
614      .catch((error) => {
615        const expectedChunk1 = new Chunk();
616        expectedChunk1.setTransferId(22);
617        expectedChunk1.setResourceId(22);
618        expectedChunk1.setType(Chunk.Type.START);
619        const expectedChunk2 = new Chunk();
620        expectedChunk2.setTransferId(22);
621        expectedChunk2.setData(textEncoder.encode('01234'));
622        expectedChunk2.setType(Chunk.Type.DATA);
623        const lastChunk = new Chunk();
624        lastChunk.setTransferId(22);
625        lastChunk.setData(textEncoder.encode('56789'));
626        lastChunk.setOffset(5);
627        lastChunk.setRemainingBytes(0);
628        lastChunk.setType(Chunk.Type.DATA);
629
630        const expectedChunks = [
631          expectedChunk1,
632          expectedChunk2,
633          lastChunk,
634          lastChunk, // retry 1
635          lastChunk, // retry 2
636        ];
637
638        expect(sentChunks).toEqual(expectedChunks);
639
640        expect(error.id).toEqual(22);
641        expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
642      });
643  });
644
645  it('write zero pending bytes is internal error', async () => {
646    const manager = new Manager(service, DEFAULT_TIMEOUT_S);
647
648    const chunk = new Chunk();
649    chunk.setTransferId(23);
650    chunk.setPendingBytes(0);
651
652    enqueueServerResponses(service.method('Write')!, [[chunk]]);
653
654    await manager
655      .write(23, textEncoder.encode('no write'))
656      .then(() => {
657        fail('Unexpected succesful promise');
658      })
659      .catch((error) => {
660        expect(error.id).toEqual(23);
661        expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
662      });
663  });
664});
665