# Owner(s): ["oncall: distributed"] # This test file contains positive tests for c10d with NCCL backend. # During the test, it is expected that ProcessGroup will not be aborted, destroyed or incur fatal error. # Please be mindful of this when adding tests here. # If you need to add tests for group creation, abort or destroy, please add tests in test_c10d_nccl.py. # There are two ways to launch tests in this file: # 1. Run this file directly with `python test_c10d_ops_nccl.py` # 2. Use multi-process launcher, e.g. `torchrun --standalone --nproc-per-node 2 test_c10d_ops_nccl.py` import math import os import sys import tempfile import torch import torch.distributed as c10d if not c10d.is_available() or not c10d.is_nccl_available(): print("c10d NCCL not available, skipping tests", file=sys.stderr) sys.exit(0) import torch.distributed as dist from torch.testing._internal.common_cuda import TEST_MULTIGPU from torch.testing._internal.common_distributed import ( init_multigpu_helper, MultiProcContinousTest, requires_nccl, ) from torch.testing._internal.common_utils import ( skip_but_pass_in_sandcastle_if, skipIfRocm, TEST_WITH_DEV_DBG_ASAN, ) if TEST_WITH_DEV_DBG_ASAN: print( "Skip ASAN as torch + multiprocessing spawn have known issues", file=sys.stderr ) sys.exit(0) class ProcessGroupNCCLOpTest(MultiProcContinousTest): @classmethod def backend_str(cls) -> str: return "nccl" @classmethod def opts(cls, high_priority_stream=False): opts = c10d.ProcessGroupNCCL.Options() opts.is_high_priority_stream = high_priority_stream return opts @property def rank_to_GPU(self): # return rank to GPU map return init_multigpu_helper(self.world_size, "nccl") @requires_nccl() @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs") def test_empty_tensors(self): pg = self.pg local_device_idx = self.rank_to_GPU[self.rank][0] xs = [torch.FloatTensor([]).cuda(local_device_idx)] pg.broadcast(xs).wait() self.assertEqual(0, xs[0].numel()) pg.allreduce(xs).wait() self.assertEqual(0, xs[0].numel()) pg.reduce(xs).wait() self.assertEqual(0, xs[0].numel()) ys = [ [ torch.FloatTensor([]).cuda(local_device_idx) for _ in range(self.world_size) ] ] pg.allgather(ys, xs).wait() for y in ys[0]: self.assertEqual(0, y.numel()) ys = [torch.FloatTensor([]).cuda(local_device_idx)] xs = [ [ torch.FloatTensor([]).cuda(local_device_idx) for _ in range(self.world_size) ] ] pg.reduce_scatter(ys, xs).wait() self.assertEqual(0, ys[0].numel()) @requires_nccl() @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs") def test_broadcast_ops(self): pg = self.pg def broadcast(xs, rootRank, rootTensor): opts = c10d.BroadcastOptions() opts.rootRank = rootRank opts.rootTensor = rootTensor work = pg.broadcast(xs, opts) work.wait() return xs # Every rank is root once for i in range(self.world_size): # Run with 1 input tensor x = torch.tensor([self.rank]).cuda(self.rank_to_GPU[self.rank][0]) output = broadcast([x], i, 0) self.assertEqual(torch.tensor([i]), output[0]) expected_tensor = torch.empty([i + 1, i + 1]).fill_(i + 1) xs = [ torch.empty([i + 1, i + 1]).fill_(-1).cuda(device=device_idx) for device_idx in self.rank_to_GPU[self.rank] ] # test with multiple input tensors (multiple gpu in one rank) for j in range(len(xs)): if self.rank == i: xs[j] = expected_tensor.cuda(device=self.rank_to_GPU[self.rank][j]) broadcast(xs, i, j) for tensor in xs: self.assertEqual(tensor, expected_tensor) @requires_nccl() @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs") def 
        pg = self.pg

        indices = torch.tensor([[0, 1]])
        values = torch.tensor([[1, 2, 0], [4, 0, 6]])
        sparse_tensor = torch.sparse_coo_tensor(indices, values, size=(2, 3)).to(
            self.rank
        )

        # sparse allreduce call is wrapped in a try catch since the c10d API is
        # only available in the nccl experimental branch
        try:
            tensor_list = [sparse_tensor]
            work = pg.allreduce(tensor_list)
            work.wait()

            # tensor_list is a list of size 1, with the allreduce output as a dense tensor
            a = torch.tensor([[2, 4, 0], [8, 0, 12]]).to(self.rank)
            self.assertEqual(tensor_list[0], a)
        except RuntimeError as e:
            if "NCCL does not support all_reduce with sparse tensors" in str(e):
                pass
            else:
                # Rethrow the exception if it's a different error
                raise

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_allreduce_ops(self):
        device_count = torch.cuda.device_count()
        pg = self.pg
        local_device_id = self.rank_to_GPU[self.rank][0]

        def allreduce(tensors, op):
            opts = c10d.AllreduceOptions()
            opts.reduceOp = op
            work = pg.allreduce(tensors, opts)
            work.wait()

        # Sum
        tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]

        allreduce(tensors, c10d.ReduceOp.SUM)

        ndev = self.world_size
        self.assertEqual(
            torch.tensor([ndev * (ndev + 1) // 2]),
            tensors[0],
        )

        # Avg (only available for NCCL 2.10+)
        if torch.cuda.nccl.version() >= (2, 10, 0):
            tensors = [torch.tensor([self.rank + 1.0]).cuda(local_device_id)]

            allreduce(tensors, c10d.ReduceOp.AVG)

            ndev = self.world_size
            self.assertEqual(
                torch.tensor([ndev * (ndev + 1.0) / (2.0 * ndev)]),
                tensors[0],
            )

        # Premul Sum
        if torch.cuda.nccl.version() >= (2, 11, 1):
            for dtype in torch.half, torch.float, torch.double:
                for factor in (
                    3.0,
                    torch.tensor([5.0], device=local_device_id, dtype=dtype),
                ):
                    tensors = [
                        torch.tensor([self.rank + 1])
                        .cuda(local_device_id)
                        .to(dtype=dtype)
                    ]

                    allreduce(tensors, c10d._make_nccl_premul_sum(factor))

                    self.assertEqual(
                        factor
                        * torch.tensor(
                            [self.world_size * (self.world_size + 1) / 2],
                            dtype=dtype,
                            device=local_device_id,
                        ),
                        tensors[0],
                    )

        # Product
        tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]

        allreduce(tensors, c10d.ReduceOp.PRODUCT)
        self.assertEqual(torch.tensor([math.factorial(self.world_size)]), tensors[0])

        # Min
        tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]

        allreduce(tensors, c10d.ReduceOp.MIN)
        self.assertEqual(torch.tensor([1]), tensors[0])

        # Max
        tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]

        allreduce(tensors, c10d.ReduceOp.MAX)
        self.assertEqual(torch.tensor([self.world_size]), tensors[0])

        for op, err in zip(
            (c10d.ReduceOp.BAND, c10d.ReduceOp.BOR, c10d.ReduceOp.BXOR),
            ("ReduceOp.BAND", "ReduceOp.BOR", "ReduceOp.BXOR"),
        ):
            with self.assertRaisesRegex(ValueError, "Cannot use " + err + " with NCCL"):
                allreduce(tensors, op)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_alltoall_ops_with_cudafree_race(self):
        pg = self.pg
        opts = c10d.AllToAllOptions()
        local_device = f"cuda:{self.rank_to_GPU[self.rank][0]}"
        torch.cuda.set_device(local_device)
        input = torch.rand(1000, 1000, device=local_device)
        output = torch.rand(1000, 1000, device=local_device)
        race_tensors = []
        # create some tensors to race with alltoall collective
        for _ in range(10):
            tmp = []
            for i in range(5):
                tmp.append(torch.rand(10 ** (3 + i), device=local_device))
            race_tensors.append(tmp)

        for i in range(10):
            race_tensors.pop()
            work = pg.alltoall_base(output, input, [], [], opts)
            # this triggers cudaFree
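            # (the popped race tensors are now unreferenced, so emptying the cache
            # can release their blocks while the alltoall may still be in flight)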
            torch.cuda.empty_cache()
            work.wait()
        torch.cuda.synchronize(device=local_device)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_allreduce_in_cudagraph(self):
        pg = self.pg
        local_device_idx = self.rank_to_GPU[self.rank][0]
        with torch.cuda.device(local_device_idx):
            xs = [torch.FloatTensor([1]).cuda(local_device_idx)]

            # single warmup
            pg.allreduce(xs).wait()
            self.assertEqual(xs[0].item(), 2)

            graph = torch.cuda.CUDAGraph()
            with torch.cuda.graph(graph):
                pg.allreduce(xs).wait()
            self.assertEqual(xs[0].item(), 2)

            graph.replay()
            graph.replay()
            self.assertEqual(xs[0].item(), 8)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @skipIfRocm()
    def test_nccl_watchdog_cudagraph(self):
        # test that the watchdog does not crash graphs with disallowed event query
        pg = self.pg
        rank = self.rank_to_GPU[self.rank][0]
        with torch.cuda.device(rank):
            for i in range(10):
                xs = [torch.FloatTensor([1]).cuda(rank)]
                ys = [torch.FloatTensor([4]).cuda(rank)]
                for _ in range(30):
                    pg.allreduce(xs[0]).wait()

                graph = torch.cuda.CUDAGraph()
                with torch.cuda.graph(graph):
                    xs[0] += 0.0
                    pg.allreduce(xs[0]).wait()
                    pg.allreduce(xs[0]).wait()
                    pg.allreduce(xs[0]).wait()
                    xs[0] += 0.0

                for _ in range(100):
                    graph.replay()

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_reduce_ops(self):
        pg = self.pg
        local_device_id = self.rank_to_GPU[self.rank][0]

        def reduce(xs, rootRank, rootTensor, op=None):
            opts = c10d.ReduceOptions()
            opts.rootRank = rootRank
            opts.rootTensor = rootTensor
            if op:
                opts.reduceOp = op
            work = pg.reduce(xs, opts)
            work.wait()

        # for every root tensor
        for rt in range(self.world_size):
            tensors = [torch.tensor([self.rank + 1]).cuda(local_device_id)]

            reduce(tensors, rt, 0)

            if self.rank == rt:
                self.assertEqual(
                    torch.tensor([self.world_size * (self.world_size + 1) // 2]),
                    tensors[0],
                )
            else:
                self.assertEqual(
                    torch.tensor([self.rank + 1]),
                    tensors[0],
                )

            for op, err in zip(
                (c10d.ReduceOp.BAND, c10d.ReduceOp.BOR, c10d.ReduceOp.BXOR),
                ("ReduceOp.BAND", "ReduceOp.BOR", "ReduceOp.BXOR"),
            ):
                with self.assertRaisesRegex(
                    ValueError, "Cannot use " + err + " with NCCL"
                ):
                    reduce(tensors, self.rank, rt, op)

            # Premul sum
            if torch.cuda.nccl.version() >= (2, 11, 1):
                for factor in (3.0, torch.tensor([5.0], device=local_device_id)):
                    if isinstance(factor, torch.Tensor):
                        factor_ref = factor.cpu().item()
                    else:
                        factor_ref = factor
                    float_tensors = [
                        torch.tensor(
                            [self.rank + 1.0], device=f"cuda:{local_device_id}"
                        )
                    ]
                    float_tensors_ref = [
                        torch.tensor(
                            [(self.rank + 1.0) * factor_ref],
                            device=f"cuda:{local_device_id}",
                        )
                    ]

                    reduce(float_tensors_ref, rt, 0)
                    reduce(float_tensors, rt, 0, c10d._make_nccl_premul_sum(factor))
                    if self.rank == rt:
                        self.assertEqual(float_tensors_ref[0], float_tensors[0])

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_allgather_ops(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]

        def allgather(output_ts, input_ts):
            work = pg.allgather(output_ts, input_ts)
            return work.wait()

        tensors = [torch.empty(2, 2).fill_(2).cuda(device=i) for i in local_device_ids]
        output_tensors = []
        expected_output = []

        output_per_gpu = (
            [torch.empty(2, 2).fill_(-1)] * len(local_device_ids) * self.world_size
        )
        expected_per_gpu = (
            [torch.empty(2, 2).fill_(2)] * len(local_device_ids) * self.world_size
        )

        for gpu in local_device_ids:
            output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
            expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
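        # each output list holds world_size * len(local_device_ids) tensors,
        # since every rank contributes one 2x2 tensor per local GPU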
        result = allgather(output_tensors, tensors)

        # Verification
        self.assertEqual(output_tensors, expected_output)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_allgather_base_ops(self):
        pg = self.pg
        local_device_id = self.rank_to_GPU[self.rank][0]

        def allgather_base(output_t, input_t):
            work = pg._allgather_base(output_t, input_t)
            work.wait()

        # allgather_base is GPU number agnostic.
        # Each rank contributes one tensor regardless of GPU counts
        tensor = torch.tensor([self.rank]).cuda(local_device_id)
        output_t = torch.empty((self.world_size), dtype=tensor.dtype).cuda(
            local_device_id
        )

        allgather_base(output_t, tensor)

        # Verification
        self.assertEqual(torch.arange(self.world_size), output_t)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_allgather_base_basics(self):
        pg = self.pg
        local_device_id = self.rank_to_GPU[self.rank][0]

        def allgather_base(output_t, input_t):
            work = pg._allgather_base(output_t, input_t)
            work.wait()

        # anticipate an error
        with self.assertRaisesRegex(
            ValueError,
            "output tensor size must be equal to world_size times input tensor size",
        ):
            tensor = torch.tensor([self.rank]).cuda(local_device_id)
            output_t = torch.empty((self.world_size + 1), dtype=tensor.dtype).cuda(
                local_device_id
            )
            # fails the check because output_t is not correctly sized
            allgather_base(output_t, tensor)

        # anticipate an error
        with self.assertRaisesRegex(
            TypeError, "output tensor must have the same type as input tensor"
        ):
            tensor = torch.tensor([self.rank], dtype=torch.float).cuda(local_device_id)
            output_t = torch.empty((self.world_size + 1), dtype=torch.long).cuda(
                local_device_id
            )
            # fails the check because the dtype is different
            allgather_base(output_t, tensor)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_gather_ops(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]
        num_gpus = len(local_device_ids)

        def gather(output_t, input_t, rootRank):
            opts = c10d.GatherOptions()
            opts.rootRank = rootRank
            if rootRank == self.rank:
                work = pg.gather(output_t, input_t, opts)
            else:
                work = pg.gather([], input_t, opts)
            work.wait()

        # init input
        tensors = []
        for device_id in local_device_ids:
            tensors.append(torch.tensor([self.rank]).cuda(device_id))

        # init output
        output_ts = []
        for idx in range(num_gpus):
            gpu_idx = local_device_ids[idx]
            output_ts.append([])
            for rank in range(self.world_size):
                output_ts[idx].append(torch.tensor([-1]).cuda(gpu_idx))

        expected = [[torch.tensor([rank]) for rank in range(self.world_size)]]
        for rank in range(self.world_size):
            gather(output_ts, tensors, rank)
            if rank == self.rank:
                self.assertEqual(expected, output_ts)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_gather_stress(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]
        num_gpus = len(local_device_ids)

        def gather(output_t, input_t, rootRank):
            opts = c10d.GatherOptions()
            opts.rootRank = rootRank
            if rootRank == self.rank:
                work = pg.gather(output_t, input_t, opts)
            else:
                work = pg.gather([], input_t, opts)
            work.wait()

        stress_length = 1000

        # init input
        tensors = []
        for i in range(stress_length):
            tensors.append([])
            for device_id in local_device_ids:
                tensors[i].append(torch.tensor([self.rank]).cuda(device_id))

        # init output
        output_ts = []
        for i in range(stress_length):
            output_ts.append([[] for _ in range(num_gpus)])
            for idx, ls in enumerate(output_ts[i]):
                gpu_idx = local_device_ids[idx]
                for _ in range(self.world_size):
                    ls.append(torch.tensor([-1]).cuda(gpu_idx))
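        # the root gathers one tensor [rank] from every rank into each per-GPU output list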
        expected = [[torch.tensor([rank]) for rank in range(self.world_size)]]
        for i in range(stress_length):
            for rank in range(self.world_size):
                gather(output_ts[i], tensors[i], rank)
                # Verification
                if rank == self.rank:
                    self.assertEqual(output_ts[i], expected)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_gather_checks(self):
        pg = self.pg
        device_id = self.rank_to_GPU[self.rank][0]

        # init input
        tensor = torch.tensor([self.rank]).cuda(device_id)

        # init output
        output_ts = []
        for rank in range(self.world_size):
            output_ts.append(torch.tensor([-1]).cuda(device_id))

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.GatherOptions()
            opts.rootRank = -1
            pg.gather([output_ts], [tensor], opts)

        with self.assertRaisesRegex(TypeError, "incompatible function arguments"):
            pg.gather([output_ts], [tensor], 0)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.world_size
            pg.gather([output_ts], [tensor], opts)

        with self.assertRaisesRegex(
            # throws error message from dispatcher
            RuntimeError,
            "There were no tensor arguments to this function",
        ):
            opts = c10d.GatherOptions()
            opts.rootRank = 0
            pg.gather([output_ts], [], opts)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_scatter_ops(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]
        num_gpus = len(local_device_ids)

        def scatter(output_t, input_t, rootRank):
            opts = c10d.ScatterOptions()
            opts.rootRank = rootRank
            if rootRank == self.rank:
                work = pg.scatter(output_t, input_t, opts)
            else:
                work = pg.scatter(output_t, [], opts)
            work.wait()

        # init output
        tensors = []
        for device_id in local_device_ids:
            tensors.append(torch.tensor([-1]).cuda(device_id))

        # init input
        scatter_list = []
        for idx in range(num_gpus):
            gpu_idx = local_device_ids[idx]
            scatter_list.append([])
            for rank in range(self.world_size):
                scatter_list[idx].append(torch.tensor([rank]).cuda(gpu_idx))

        # test each rank to scatter
        expected = [torch.tensor([self.rank])]
        for rank in range(self.world_size):
            scatter(tensors, scatter_list, rank)
            self.assertEqual(expected, tensors)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_scatter_stress(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]
        num_gpus = len(local_device_ids)

        def scatter(output_t, input_t, rootRank):
            opts = c10d.ScatterOptions()
            opts.rootRank = rootRank
            if rootRank == self.rank:
                work = pg.scatter(output_t, input_t, opts)
            else:
                work = pg.scatter(output_t, [], opts)
            work.wait()

        stress_length = 1000

        # init output
        tensors = []
        for i in range(stress_length):
            tensors.append([])
            for device_id in local_device_ids:
                tensors[i].append(torch.tensor([-1]).cuda(device_id))

        # init input
        scatter_list = []
        for i in range(stress_length):
            scatter_list.append([[] for _ in range(num_gpus)])
            for idx, ls in enumerate(scatter_list[i]):
                gpu_idx = local_device_ids[idx]
                for rank in range(self.world_size):
                    ls.append(torch.tensor([rank]).cuda(gpu_idx))

        # test each rank to scatter
        expected = [torch.tensor([self.rank])]
        for i in range(stress_length):
            for rank in range(self.world_size):
                scatter(tensors[i], scatter_list[i], rank)
                # Verification
                self.assertEqual(tensors[i], expected)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
test requires 2+ GPUs") def test_scatter_checks(self): pg = self.pg local_device_ids = self.rank_to_GPU[self.rank] num_gpus = len(local_device_ids) # init output tensors = [] for device_id in local_device_ids: tensors.append(torch.tensor([-1]).cuda(device_id)) # init input scatter_list = [] for idx in range(num_gpus): gpu_idx = local_device_ids[idx] scatter_list.append([]) for rank in range(self.world_size): scatter_list[idx].append(torch.tensor([rank]).cuda(gpu_idx)) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.ScatterOptions() opts.rootRank = -1 pg.scatter(tensors, scatter_list, opts) with self.assertRaisesRegex(TypeError, "incompatible function arguments"): pg.scatter(tensors, scatter_list, 0) with self.assertRaisesRegex(ValueError, "invalid root rank"): opts = c10d.ScatterOptions() opts.rootRank = self.world_size pg.scatter(tensors, scatter_list, opts) with self.assertRaisesRegex( # throws error message from dispatcher RuntimeError, "There were no tensor arguments to this function", ): opts = c10d.ScatterOptions() opts.rootRank = 0 pg.scatter([], scatter_list, opts) @requires_nccl() @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs") def test_reduce_scatter_base_basics(self): pg = self.pg local_device_id = self.rank_to_GPU[self.rank][0] def reduce_scatter_base(output_t, input_t): work = pg._reduce_scatter_base(output_t, input_t) work.wait() # anticipate an error with self.assertRaisesRegex( ValueError, "input tensor must be the same size as output size times world size", ): input_t = torch.tensor([self.rank]).cuda(local_device_id) output_t = torch.empty((self.world_size + 1), dtype=input_t.dtype).cuda( local_device_id ) # fails the check because output_t is not correctly sized reduce_scatter_base(output_t, input_t) # anticipate an error with self.assertRaisesRegex( TypeError, "input tensor must be the same type as the output tensor." 
            tensor = torch.tensor([self.rank], dtype=torch.float).cuda(local_device_id)
            output_t = torch.empty((self.world_size + 1), dtype=torch.long).cuda(
                local_device_id
            )
            # fails the check because the dtype is different
            reduce_scatter_base(output_t, tensor)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_reduce_scatter_ops(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]
        num_gpus = len(local_device_ids)

        def reduce_scatter(outputs, input_lists, op):
            opts = c10d.ReduceScatterOptions()
            opts.reduceOp = op
            work = pg.reduce_scatter(outputs, input_lists, opts)
            work.wait()

        output = [torch.tensor([0]).cuda(i) for i in local_device_ids]

        # GPU/rank
        #   0      [1], [2], [3], [4]
        #   1      [2], [3], [4], [5]
        #   2      [3], [4], [5], [6]
        #   3      [4], [5], [6], [7]

        # Sum
        tensor_lists = []
        input_per_gpu = []

        for i in range(self.world_size):
            input_per_gpu.append(torch.tensor([self.rank + i + 1]))

        for gpu in local_device_ids:
            tensor_lists.append([t.cuda(device=gpu) for t in input_per_gpu])

        reduce_scatter(output, tensor_lists, c10d.ReduceOp.SUM)

        for i in range(num_gpus):
            expected = torch.tensor(
                [
                    (1 + self.world_size) * self.world_size // 2
                    + self.world_size * self.rank
                ]
            )

            self.assertEqual(expected, output[i])

        # Min
        reduce_scatter(output, tensor_lists, c10d.ReduceOp.MIN)

        for i in range(num_gpus):
            expected = torch.tensor([self.rank + 1 + i])
            self.assertEqual(expected, output[i])

        # Max
        reduce_scatter(output, tensor_lists, c10d.ReduceOp.MAX)

        for i in range(num_gpus):
            expected = torch.tensor([self.rank + self.world_size + i])
            self.assertEqual(expected, output[i])

        # Product
        reduce_scatter(output, tensor_lists, c10d.ReduceOp.PRODUCT)

        # The math package doesn't have math.perm until Python 3.8, so
        # we implement a naive version here.
        def perm(n, k):
            prod_val = n
            for val in range(n - k + 1, n):
                prod_val *= val
            return prod_val

        for i in range(num_gpus):
            prod_val = perm(self.rank + self.world_size, self.world_size)

            expected = torch.tensor([prod_val])
            self.assertEqual(expected, output[i])

        # Test the input params overridden scenarios, aka, when the input is
        # a list and output is just one tensor.
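        # With a single output tensor and a flat input list, rank r receives the
        # reduction of input_list[r] across all ranks.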
        # Sum
        output_tensor = torch.empty_like(input_per_gpu[0][0]).cuda(self.rank)
        input_list = [tensor[0].cuda(self.rank) for tensor in input_per_gpu]
        pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.SUM).wait()
        expected = torch.tensor(
            (1 + self.world_size) * self.world_size // 2 + self.world_size * self.rank
        )
        self.assertEqual(expected, output_tensor)

        # Min
        pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.MIN).wait()
        expected = torch.tensor(self.rank + 1)
        self.assertEqual(expected, output_tensor)

        # Max
        pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.MAX).wait()
        expected = torch.tensor(self.rank + self.world_size)
        self.assertEqual(expected, output_tensor)

        # Product
        pg.reduce_scatter(output_tensor, input_list, c10d.ReduceOp.PRODUCT).wait()
        prod_val = self.rank + 1
        for k in range(1, self.world_size):
            prod_val = prod_val * (self.rank + 1 + k)
        expected = torch.tensor(prod_val)
        self.assertEqual(expected, output_tensor)

        if torch.cuda.nccl.version() >= (2, 11, 1):
            for factor in (3.0, torch.tensor([5.0], device=self.rank)):
                if isinstance(factor, torch.Tensor):
                    factor_ref = factor.cpu().item()
                else:
                    factor_ref = factor
                output = [t.float() for t in output]
                tensor_lists = [[t.float() for t in tl] for tl in tensor_lists]
                output_ref = [t.float() for t in output]
                tensor_lists_ref = [
                    [t.float() * factor_ref for t in tl] for tl in tensor_lists
                ]
                reduce_scatter(output, tensor_lists, c10d._make_nccl_premul_sum(factor))
                reduce_scatter(output_ref, tensor_lists_ref, c10d.ReduceOp.SUM)
                self.assertEqual(output_ref, output)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_reduce_scatter_base_ops(self):
        pg = self.pg
        local_device_id = self.rank_to_GPU[self.rank][0]

        def reduce_scatter_base(output_t, input_t):
            work = pg._reduce_scatter_base(output_t, input_t)
            work.wait()

        # reduce_scatter_base is GPU number agnostic.
        # Each rank contributes one tensor regardless of GPU counts
        output_t = torch.empty([1]).cuda(local_device_id)
        tensor = torch.arange(self.world_size, dtype=output_t.dtype).cuda(
            local_device_id
        )

        reduce_scatter_base(output_t, tensor)

        # Verification
        self.assertEqual(output_t[0], self.rank * self.world_size)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_barrier(self):
        pg = self.pg
        local_device_ids = self.rank_to_GPU[self.rank]

        def allreduce(tensors):
            opts = c10d.AllreduceOptions()
            work = pg.allreduce(tensors, opts)
            return work

        # Make the collectives operate on 1, 2, 3, ..., len(local_device_ids) GPUs
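        # (tensors_list[i - 1] holds i tensors, one per local GPU j, initialized to j + 1)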
        tensors_list = [[] for _ in range(len(local_device_ids))]
        for i in range(1, len(local_device_ids) + 1):
            for j in range(i):
                tensors_list[i - 1].append(
                    torch.tensor([j + 1]).cuda(local_device_ids[j])
                )

        works = []
        for tensors in tensors_list:
            work = allreduce(tensors)
            works.append(work)

        # Barrier will ensure that all previous work is completed
        pg.barrier().wait()

        for i in range(1, len(local_device_ids) + 1):
            for j in range(i):
                self.assertEqual(
                    torch.tensor([(j + 1) * self.world_size]), tensors_list[i - 1][j]
                )

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_send_recv(self):
        pg = self.pg
        device = self.rank_to_GPU[self.rank][0]

        # Generate the same random tensor
        torch.manual_seed(0)
        send_tensor = torch.rand(10, 10, device=device)
        if self.rank == 0:
            dist.send(send_tensor, 1)
        if self.rank == 1:
            recv_tensor = torch.rand(10, 10, device=device)
            dist.recv(recv_tensor, 0)
            self.assertEqual(send_tensor, recv_tensor)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_send_recv_complex(self):
        pg = self.pg
        device = self.rank_to_GPU[self.rank][0]

        # Generate the same random tensor
        torch.manual_seed(0)
        send_tensor = torch.rand(10, 10, dtype=torch.cfloat, device=device)
        if self.rank == 0:
            dist.send(send_tensor, 1)
        if self.rank == 1:
            recv_tensor = torch.rand(10, 10, dtype=torch.cfloat, device=device)
            dist.recv(recv_tensor, 0)
            self.assertEqual(send_tensor, recv_tensor)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_send_recv_object_list(self):
        device = self.rank_to_GPU[self.rank][0]

        val = 99 if self.rank == 0 else None
        object_list = [val] * self.world_size
        if self.rank == 0:
            dist.send_object_list(object_list, 1, device=device)
        if self.rank == 1:
            dist.recv_object_list(object_list, 0, device=device)
            self.assertEqual(object_list[0], 99)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_tensor_register_hook(self):
        os.environ["TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"] = "1"

        pg = self.pg
        local_device_id = self.rank_to_GPU[self.rank][0]

        def allgather_base(output_t, input_t):
            work = pg._allgather_base(output_t, input_t)
            work.wait()

        # allgather_base is GPU number agnostic.
        # Each rank contributes one tensor regardless of GPU counts
        tensor = torch.tensor([self.rank]).cuda(local_device_id)
        output_t = torch.empty((self.world_size), dtype=tensor.dtype).cuda(
            local_device_id
        )

        allgather_base(output_t, tensor)

        # Verification
        self.assertEqual(torch.arange(self.world_size), output_t)

        # Unset env
        del os.environ["TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"]


if __name__ == "__main__":
    rank = int(os.getenv("RANK", -1))
    world_size = int(os.getenv("WORLD_SIZE", 2))

    if rank != -1:
        # Launched with torchrun or other multi-proc launchers. Directly run the test.
        ProcessGroupNCCLOpTest.run_rank(rank, world_size)
    else:
        # Launched as a single process. Spawn subprocesses to run the tests.
        # Also need a rendezvous file for `init_process_group` purposes.
        rdvz_file = tempfile.NamedTemporaryFile(delete=False).name
        torch.multiprocessing.spawn(
            ProcessGroupNCCLOpTest.run_rank,
            nprocs=world_size,
            args=(world_size, rdvz_file),
        )