• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Threading.Tasks;
22 
23 namespace Grpc.Core.Utils
24 {
25     /// <summary>
26     /// Extension methods that simplify work with gRPC streaming calls.
27     /// </summary>
28     public static class AsyncStreamExtensions
29     {
30         /// <summary>
31         /// Reads the entire stream and executes an async action for each element.
32         /// </summary>
33         public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
34             where T : class
35         {
36             while (await streamReader.MoveNext().ConfigureAwait(false))
37             {
38                 await asyncAction(streamReader.Current).ConfigureAwait(false);
39             }
40         }
41 
42         /// <summary>
43         /// Reads the entire stream and creates a list containing all the elements read.
44         /// </summary>
45         public static async Task<List<T>> ToListAsync<T>(this IAsyncStreamReader<T> streamReader)
46             where T : class
47         {
48             var result = new List<T>();
49             while (await streamReader.MoveNext().ConfigureAwait(false))
50             {
51                 result.Add(streamReader.Current);
52             }
53             return result;
54         }
55 
56         /// <summary>
57         /// Writes all elements from given enumerable to the stream.
58         /// Completes the stream afterwards unless close = false.
59         /// </summary>
60         public static async Task WriteAllAsync<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
61             where T : class
62         {
63             foreach (var element in elements)
64             {
65                 await streamWriter.WriteAsync(element).ConfigureAwait(false);
66             }
67             if (complete)
68             {
69                 await streamWriter.CompleteAsync().ConfigureAwait(false);
70             }
71         }
72 
73         /// <summary>
74         /// Writes all elements from given enumerable to the stream.
75         /// </summary>
76         public static async Task WriteAllAsync<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements)
77             where T : class
78         {
79             foreach (var element in elements)
80             {
81                 await streamWriter.WriteAsync(element).ConfigureAwait(false);
82             }
83         }
84     }
85 }
86