1// Copyright 2022 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15/* eslint-env browser */ 16 17import { 18 Channel, 19 Client, 20 decode, 21 MethodStub, 22 ServiceClient, 23} from 'pigweedjs/pw_rpc'; 24import { Status } from 'pigweedjs/pw_status'; 25import { 26 PacketType, 27 RpcPacket, 28} from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; 29import { ProtoCollection } from 'pigweedjs/protos/collection'; 30import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb'; 31 32import { Manager } from './client'; 33import { ProgressStats } from './transfer'; 34 35const DEFAULT_TIMEOUT_S = 0.3; 36 37describe('Transfer client', () => { 38 const textEncoder = new TextEncoder(); 39 const textDecoder = new TextDecoder(); 40 let client: Client; 41 let service: ServiceClient; 42 let sentChunks: Chunk[]; 43 let packetsToSend: Uint8Array[][]; 44 45 beforeEach(() => { 46 const lib = new ProtoCollection(); 47 const channels: Channel[] = [new Channel(1, handleRequest)]; 48 client = Client.fromProtoSet(channels, lib); 49 service = client.channel(1)!.service('pw.transfer.Transfer')!; 50 51 sentChunks = []; 52 packetsToSend = []; 53 }); 54 55 function handleRequest(data: Uint8Array): void { 56 const packet = decode(data); 57 if (packet.getType() !== PacketType.CLIENT_STREAM) { 58 return; 59 } 60 61 const chunk = Chunk.deserializeBinary(packet.getPayload_asU8()); 62 sentChunks.push(chunk); 63 64 if (packetsToSend.length > 0) { 65 const responses = packetsToSend.shift()!; 66 for (const response of responses) { 67 client.processPacket(response); 68 } 69 } 70 } 71 72 function receivedData(): Uint8Array { 73 let length = 0; 74 sentChunks.forEach((chunk: Chunk) => { 75 length += chunk.getData().length; 76 }); 77 const data = new Uint8Array(length); 78 let offset = 0; 79 sentChunks.forEach((chunk: Chunk) => { 80 data.set(chunk.getData() as Uint8Array, offset); 81 offset += chunk.getData().length; 82 }); 83 return data; 84 } 85 86 function enqueueServerError(method: MethodStub, error: Status): void { 87 const packet = new RpcPacket(); 88 packet.setType(PacketType.SERVER_ERROR); 89 packet.setChannelId(1); 90 packet.setServiceId(service.id); 91 packet.setMethodId(method.id); 92 packet.setCallId(method.rpcs.nextCallId); 93 packet.setStatus(error); 94 packetsToSend.push([packet.serializeBinary()]); 95 } 96 97 function enqueueServerResponses(method: MethodStub, responses: Chunk[][]) { 98 for (const responseGroup of responses) { 99 const serializedGroup = []; 100 for (const response of responseGroup) { 101 const packet = new RpcPacket(); 102 packet.setType(PacketType.SERVER_STREAM); 103 packet.setChannelId(1); 104 packet.setServiceId(service.id); 105 packet.setMethodId(method.id); 106 packet.setCallId(method.rpcs.nextCallId); 107 packet.setStatus(Status.OK); 108 packet.setPayload(response.serializeBinary()); 109 serializedGroup.push(packet.serializeBinary()); 110 } 111 packetsToSend.push(serializedGroup); 112 } 113 } 114 115 function buildChunk( 116 sessionId: number, 117 offset: number, 118 data: string, 119 remainingBytes: number, 120 ): Chunk { 121 const chunk = new Chunk(); 122 chunk.setTransferId(sessionId); 123 chunk.setOffset(offset); 124 chunk.setData(textEncoder.encode(data)); 125 chunk.setRemainingBytes(remainingBytes); 126 return chunk; 127 } 128 129 it('read transfer basic', async () => { 130 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 131 132 const chunk1 = buildChunk(3, 0, 'abc', 0); 133 enqueueServerResponses(service.method('Read')!, [[chunk1]]); 134 135 const data = await manager.read(3); 136 expect(textDecoder.decode(data)).toEqual('abc'); 137 expect(sentChunks).toHaveLength(2); 138 expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true); 139 expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK); 140 }); 141 142 it('read transfer multichunk', async () => { 143 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 144 145 const chunk1 = buildChunk(3, 0, 'abc', 3); 146 const chunk2 = buildChunk(3, 3, 'def', 0); 147 enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]); 148 149 const data = await manager.read(3); 150 expect(data).toEqual(textEncoder.encode('abcdef')); 151 expect(sentChunks).toHaveLength(2); 152 expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true); 153 expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK); 154 }); 155 156 it('read transfer progress callback', async () => { 157 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 158 159 const chunk1 = buildChunk(3, 0, 'abc', 3); 160 const chunk2 = buildChunk(3, 3, 'def', 0); 161 enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]); 162 163 const progress: Array<ProgressStats> = []; 164 165 const data = await manager.read(3, (stats: ProgressStats) => { 166 progress.push(stats); 167 }); 168 expect(textDecoder.decode(data)).toEqual('abcdef'); 169 expect(sentChunks).toHaveLength(2); 170 expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true); 171 expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK); 172 173 expect(progress).toEqual([ 174 new ProgressStats(3, 3, 6), 175 new ProgressStats(6, 6, 6), 176 ]); 177 }); 178 179 it('read transfer retry bad offset', async () => { 180 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 181 182 const chunk1 = buildChunk(3, 0, '123', 6); 183 const chunk2 = buildChunk(3, 1, '456', 3); // Incorrect offset; expecting 3 184 const chunk3 = buildChunk(3, 3, '456', 3); 185 const chunk4 = buildChunk(3, 6, '789', 0); 186 187 enqueueServerResponses(service.method('Read')!, [ 188 [chunk1, chunk2], 189 [chunk3, chunk4], 190 ]); 191 192 const data = await manager.read(3); 193 expect(data).toEqual(textEncoder.encode('123456789')); 194 expect(sentChunks).toHaveLength(3); 195 expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true); 196 expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK); 197 }); 198 199 it('read transfer retry timeout', async () => { 200 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 201 202 const chunk = buildChunk(3, 0, 'xyz', 0); 203 enqueueServerResponses(service.method('Read')!, [[], [chunk]]); 204 205 const data = await manager.read(3); 206 expect(textDecoder.decode(data)).toEqual('xyz'); 207 208 // Two transfer parameter requests should have been sent. 209 expect(sentChunks).toHaveLength(3); 210 expect(sentChunks[sentChunks.length - 1].hasStatus()).toBe(true); 211 expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK); 212 }); 213 214 it('read transfer timeout', async () => { 215 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 216 217 await manager 218 .read(27) 219 .then(() => { 220 fail('Unexpected completed promise'); 221 }) 222 .catch((error) => { 223 expect(error.id).toEqual(27); 224 expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]); 225 expect(sentChunks).toHaveLength(4); 226 }); 227 }); 228 229 it('read transfer error', async () => { 230 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 231 232 const chunk = new Chunk(); 233 chunk.setStatus(Status.NOT_FOUND); 234 chunk.setTransferId(31); 235 enqueueServerResponses(service.method('Read')!, [[chunk]]); 236 237 await manager 238 .read(31) 239 .then(() => { 240 fail('Unexpected completed promise'); 241 }) 242 .catch((error) => { 243 expect(error.id).toEqual(31); 244 expect(Status[error.status]).toEqual(Status[Status.NOT_FOUND]); 245 }); 246 }); 247 248 it('read transfer server error', async () => { 249 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 250 251 enqueueServerError(service.method('Read')!, Status.NOT_FOUND); 252 await manager 253 .read(31) 254 .then((data) => { 255 fail('Unexpected completed promise'); 256 }) 257 .catch((error) => { 258 expect(error.id).toEqual(31); 259 expect(Status[error.status]).toEqual(Status[Status.INTERNAL]); 260 }); 261 }); 262 263 it('write transfer basic', async () => { 264 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 265 266 const chunk = new Chunk(); 267 chunk.setTransferId(4); 268 chunk.setOffset(0); 269 chunk.setPendingBytes(32); 270 chunk.setMaxChunkSizeBytes(8); 271 272 const completeChunk = new Chunk(); 273 completeChunk.setTransferId(4); 274 completeChunk.setStatus(Status.OK); 275 276 enqueueServerResponses(service.method('Write')!, [ 277 [chunk], 278 [completeChunk], 279 ]); 280 281 await manager.write(4, textEncoder.encode('hello')); 282 expect(sentChunks).toHaveLength(2); 283 expect(receivedData()).toEqual(textEncoder.encode('hello')); 284 }); 285 286 it('write transfer max chunk size', async () => { 287 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 288 289 const chunk = new Chunk(); 290 chunk.setTransferId(4); 291 chunk.setOffset(0); 292 chunk.setPendingBytes(32); 293 chunk.setMaxChunkSizeBytes(8); 294 295 const completeChunk = new Chunk(); 296 completeChunk.setTransferId(4); 297 completeChunk.setStatus(Status.OK); 298 299 enqueueServerResponses(service.method('Write')!, [ 300 [chunk], 301 [completeChunk], 302 ]); 303 304 await manager.write(4, textEncoder.encode('hello world')); 305 expect(sentChunks).toHaveLength(3); 306 expect(receivedData()).toEqual(textEncoder.encode('hello world')); 307 expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hello wo')); 308 expect(sentChunks[2].getData()).toEqual(textEncoder.encode('rld')); 309 }); 310 311 it('write transfer multiple parameters', async () => { 312 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 313 314 const chunk = new Chunk(); 315 chunk.setTransferId(4); 316 chunk.setOffset(0); 317 chunk.setPendingBytes(8); 318 chunk.setMaxChunkSizeBytes(8); 319 320 const chunk2 = new Chunk(); 321 chunk2.setTransferId(4); 322 chunk2.setOffset(8); 323 chunk2.setPendingBytes(8); 324 chunk2.setMaxChunkSizeBytes(8); 325 326 const completeChunk = new Chunk(); 327 completeChunk.setTransferId(4); 328 completeChunk.setStatus(Status.OK); 329 330 enqueueServerResponses(service.method('Write')!, [ 331 [chunk], 332 [chunk2], 333 [completeChunk], 334 ]); 335 336 await manager.write(4, textEncoder.encode('data to write')); 337 expect(sentChunks).toHaveLength(3); 338 expect(receivedData()).toEqual(textEncoder.encode('data to write')); 339 expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to ')); 340 expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write')); 341 }); 342 343 it('write transfer parameters update', async () => { 344 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 345 346 const chunk = new Chunk(); 347 chunk.setTransferId(4); 348 chunk.setOffset(0); 349 chunk.setPendingBytes(8); 350 chunk.setMaxChunkSizeBytes(4); 351 chunk.setType(Chunk.Type.PARAMETERS_RETRANSMIT); 352 chunk.setWindowEndOffset(8); 353 354 const chunk2 = new Chunk(); 355 chunk2.setTransferId(4); 356 chunk2.setOffset(4); 357 chunk2.setPendingBytes(8); 358 chunk2.setType(Chunk.Type.PARAMETERS_CONTINUE); 359 chunk2.setWindowEndOffset(12); 360 361 const chunk3 = new Chunk(); 362 chunk3.setTransferId(4); 363 chunk3.setOffset(8); 364 chunk3.setPendingBytes(8); 365 chunk3.setType(Chunk.Type.PARAMETERS_CONTINUE); 366 chunk3.setWindowEndOffset(16); 367 368 const chunk4 = new Chunk(); 369 chunk4.setTransferId(4); 370 chunk4.setOffset(12); 371 chunk4.setPendingBytes(8); 372 chunk4.setType(Chunk.Type.PARAMETERS_CONTINUE); 373 chunk4.setWindowEndOffset(20); 374 375 const chunk5 = new Chunk(); 376 chunk5.setTransferId(4); 377 chunk5.setOffset(16); 378 chunk5.setPendingBytes(8); 379 chunk5.setType(Chunk.Type.PARAMETERS_CONTINUE); 380 chunk5.setWindowEndOffset(24); 381 382 const chunk6 = new Chunk(); 383 chunk6.setTransferId(4); 384 chunk6.setOffset(20); 385 chunk6.setPendingBytes(8); 386 chunk6.setType(Chunk.Type.PARAMETERS_CONTINUE); 387 chunk6.setWindowEndOffset(28); 388 389 const completeChunk = new Chunk(); 390 completeChunk.setTransferId(4); 391 completeChunk.setStatus(Status.OK); 392 393 enqueueServerResponses(service.method('Write')!, [ 394 [chunk], 395 [chunk2], 396 [chunk3], 397 [chunk4], 398 [chunk5], 399 [chunk6], 400 [completeChunk], 401 ]); 402 403 await manager.write(4, textEncoder.encode('hello this is a message')); 404 expect(receivedData()).toEqual( 405 textEncoder.encode('hello this is a message'), 406 ); 407 expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hell')); 408 expect(sentChunks[2].getData()).toEqual(textEncoder.encode('o th')); 409 expect(sentChunks[3].getData()).toEqual(textEncoder.encode('is i')); 410 expect(sentChunks[4].getData()).toEqual(textEncoder.encode('s a ')); 411 expect(sentChunks[5].getData()).toEqual(textEncoder.encode('mess')); 412 expect(sentChunks[6].getData()).toEqual(textEncoder.encode('age')); 413 }); 414 415 it('write transfer progress callback', async () => { 416 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 417 418 const chunk = new Chunk(); 419 chunk.setTransferId(4); 420 chunk.setOffset(0); 421 chunk.setPendingBytes(8); 422 chunk.setMaxChunkSizeBytes(8); 423 424 const chunk2 = new Chunk(); 425 chunk2.setTransferId(4); 426 chunk2.setOffset(8); 427 chunk2.setPendingBytes(8); 428 chunk2.setMaxChunkSizeBytes(8); 429 430 const completeChunk = new Chunk(); 431 completeChunk.setTransferId(4); 432 completeChunk.setStatus(Status.OK); 433 434 enqueueServerResponses(service.method('Write')!, [ 435 [chunk], 436 [chunk2], 437 [completeChunk], 438 ]); 439 440 const progress: Array<ProgressStats> = []; 441 await manager.write( 442 4, 443 textEncoder.encode('data to write'), 444 (stats: ProgressStats) => { 445 progress.push(stats); 446 }, 447 ); 448 expect(sentChunks).toHaveLength(3); 449 expect(receivedData()).toEqual(textEncoder.encode('data to write')); 450 expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to ')); 451 expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write')); 452 453 console.log(progress); 454 expect(progress).toEqual([ 455 new ProgressStats(8, 0, 13), 456 new ProgressStats(13, 8, 13), 457 new ProgressStats(13, 13, 13), 458 ]); 459 }); 460 461 it('write transfer rewind', async () => { 462 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 463 464 const chunk1 = new Chunk(); 465 chunk1.setTransferId(4); 466 chunk1.setOffset(0); 467 chunk1.setPendingBytes(8); 468 chunk1.setMaxChunkSizeBytes(8); 469 470 const chunk2 = new Chunk(); 471 chunk2.setTransferId(4); 472 chunk2.setOffset(8); 473 chunk2.setPendingBytes(8); 474 chunk2.setMaxChunkSizeBytes(8); 475 476 const chunk3 = new Chunk(); 477 chunk3.setTransferId(4); 478 chunk3.setOffset(4); // Rewind 479 chunk3.setPendingBytes(8); 480 chunk3.setMaxChunkSizeBytes(8); 481 482 const chunk4 = new Chunk(); 483 chunk4.setTransferId(4); 484 chunk4.setOffset(12); // Rewind 485 chunk4.setPendingBytes(16); 486 chunk4.setMaxChunkSizeBytes(16); 487 488 const completeChunk = new Chunk(); 489 completeChunk.setTransferId(4); 490 completeChunk.setStatus(Status.OK); 491 492 enqueueServerResponses(service.method('Write')!, [ 493 [chunk1], 494 [chunk2], 495 [chunk3], 496 [chunk4], 497 [completeChunk], 498 ]); 499 500 await manager.write(4, textEncoder.encode('pigweed data transfer')); 501 expect(sentChunks).toHaveLength(5); 502 expect(sentChunks[1].getData()).toEqual(textEncoder.encode('pigweed ')); 503 expect(sentChunks[2].getData()).toEqual(textEncoder.encode('data tra')); 504 expect(sentChunks[3].getData()).toEqual(textEncoder.encode('eed data')); 505 expect(sentChunks[4].getData()).toEqual(textEncoder.encode(' transfer')); 506 }); 507 508 it('write transfer bad offset', async () => { 509 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 510 511 const chunk1 = new Chunk(); 512 chunk1.setTransferId(4); 513 chunk1.setOffset(0); 514 chunk1.setPendingBytes(8); 515 chunk1.setMaxChunkSizeBytes(8); 516 517 const chunk2 = new Chunk(); 518 chunk2.setTransferId(4); 519 chunk2.setOffset(100); // larger offset than data 520 chunk2.setPendingBytes(8); 521 chunk2.setMaxChunkSizeBytes(8); 522 523 const completeChunk = new Chunk(); 524 completeChunk.setTransferId(4); 525 completeChunk.setStatus(Status.OK); 526 527 enqueueServerResponses(service.method('Write')!, [ 528 [chunk1], 529 [chunk2], 530 [completeChunk], 531 ]); 532 533 await manager 534 .write(4, textEncoder.encode('small data')) 535 .then(() => { 536 fail('Unexpected succesful promise'); 537 }) 538 .catch((error) => { 539 expect(error.id).toEqual(4); 540 expect(Status[error.status]).toEqual(Status[Status.OUT_OF_RANGE]); 541 }); 542 }); 543 544 it('write transfer error', async () => { 545 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 546 547 const chunk = new Chunk(); 548 chunk.setTransferId(21); 549 chunk.setStatus(Status.UNAVAILABLE); 550 551 enqueueServerResponses(service.method('Write')!, [[chunk]]); 552 553 await manager 554 .write(21, textEncoder.encode('no write')) 555 .then(() => { 556 fail('Unexpected succesful promise'); 557 }) 558 .catch((error) => { 559 expect(error.id).toEqual(21); 560 expect(Status[error.status]).toEqual(Status[Status.UNAVAILABLE]); 561 }); 562 }); 563 564 it('write transfer server error', async () => { 565 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 566 567 const chunk = new Chunk(); 568 chunk.setTransferId(21); 569 chunk.setStatus(Status.NOT_FOUND); 570 571 enqueueServerError(service.method('Write')!, Status.NOT_FOUND); 572 573 await manager 574 .write(21, textEncoder.encode('server error')) 575 .then(() => { 576 fail('Unexpected succesful promise'); 577 }) 578 .catch((error) => { 579 expect(error.id).toEqual(21); 580 expect(Status[error.status]).toEqual(Status[Status.INTERNAL]); 581 }); 582 }); 583 584 it('write transfer timeout after initial chunk', async () => { 585 const manager = new Manager(service, 0.001, 4, 2); 586 587 await manager 588 .write(22, textEncoder.encode('no server response!')) 589 .then(() => { 590 fail('unexpected succesful write'); 591 }) 592 .catch((error) => { 593 expect(sentChunks).toHaveLength(3); // Initial chunk + two retries. 594 expect(error.id).toEqual(22); 595 expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]); 596 }); 597 }); 598 599 it('write transfer timeout after intermediate chunk', async () => { 600 const manager = new Manager(service, DEFAULT_TIMEOUT_S, 4, 2); 601 602 const chunk = new Chunk(); 603 chunk.setTransferId(22); 604 chunk.setPendingBytes(10); 605 chunk.setMaxChunkSizeBytes(5); 606 607 enqueueServerResponses(service.method('Write')!, [[chunk]]); 608 609 await manager 610 .write(22, textEncoder.encode('0123456789')) 611 .then(() => { 612 fail('unexpected succesful write'); 613 }) 614 .catch((error) => { 615 const expectedChunk1 = new Chunk(); 616 expectedChunk1.setTransferId(22); 617 expectedChunk1.setResourceId(22); 618 expectedChunk1.setType(Chunk.Type.START); 619 const expectedChunk2 = new Chunk(); 620 expectedChunk2.setTransferId(22); 621 expectedChunk2.setData(textEncoder.encode('01234')); 622 expectedChunk2.setType(Chunk.Type.DATA); 623 const lastChunk = new Chunk(); 624 lastChunk.setTransferId(22); 625 lastChunk.setData(textEncoder.encode('56789')); 626 lastChunk.setOffset(5); 627 lastChunk.setRemainingBytes(0); 628 lastChunk.setType(Chunk.Type.DATA); 629 630 const expectedChunks = [ 631 expectedChunk1, 632 expectedChunk2, 633 lastChunk, 634 lastChunk, // retry 1 635 lastChunk, // retry 2 636 ]; 637 638 expect(sentChunks).toEqual(expectedChunks); 639 640 expect(error.id).toEqual(22); 641 expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]); 642 }); 643 }); 644 645 it('write zero pending bytes is internal error', async () => { 646 const manager = new Manager(service, DEFAULT_TIMEOUT_S); 647 648 const chunk = new Chunk(); 649 chunk.setTransferId(23); 650 chunk.setPendingBytes(0); 651 652 enqueueServerResponses(service.method('Write')!, [[chunk]]); 653 654 await manager 655 .write(23, textEncoder.encode('no write')) 656 .then(() => { 657 fail('Unexpected succesful promise'); 658 }) 659 .catch((error) => { 660 expect(error.id).toEqual(23); 661 expect(Status[error.status]).toEqual(Status[Status.INTERNAL]); 662 }); 663 }); 664}); 665