1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Generic; 21 using System.Threading.Tasks; 22 23 namespace Grpc.Core.Utils 24 { 25 /// <summary> 26 /// Extension methods that simplify work with gRPC streaming calls. 27 /// </summary> 28 public static class AsyncStreamExtensions 29 { 30 /// <summary> 31 /// Reads the entire stream and executes an async action for each element. 32 /// </summary> 33 public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) 34 where T : class 35 { 36 while (await streamReader.MoveNext().ConfigureAwait(false)) 37 { 38 await asyncAction(streamReader.Current).ConfigureAwait(false); 39 } 40 } 41 42 /// <summary> 43 /// Reads the entire stream and creates a list containing all the elements read. 44 /// </summary> 45 public static async Task<List<T>> ToListAsync<T>(this IAsyncStreamReader<T> streamReader) 46 where T : class 47 { 48 var result = new List<T>(); 49 while (await streamReader.MoveNext().ConfigureAwait(false)) 50 { 51 result.Add(streamReader.Current); 52 } 53 return result; 54 } 55 56 /// <summary> 57 /// Writes all elements from given enumerable to the stream. 58 /// Completes the stream afterwards unless close = false. 59 /// </summary> 60 public static async Task WriteAllAsync<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) 61 where T : class 62 { 63 foreach (var element in elements) 64 { 65 await streamWriter.WriteAsync(element).ConfigureAwait(false); 66 } 67 if (complete) 68 { 69 await streamWriter.CompleteAsync().ConfigureAwait(false); 70 } 71 } 72 73 /// <summary> 74 /// Writes all elements from given enumerable to the stream. 75 /// </summary> 76 public static async Task WriteAllAsync<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements) 77 where T : class 78 { 79 foreach (var element in elements) 80 { 81 await streamWriter.WriteAsync(element).ConfigureAwait(false); 82 } 83 } 84 } 85 } 86