• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * FlowGraph.h
19  *
20  * Processing node and ports that can be used in a simple data flow graph.
21  * This was designed to work with audio but could be used for other
22  * types of data.
23  */
24 
25 #ifndef FLOWGRAPH_FLOW_GRAPH_NODE_H
26 #define FLOWGRAPH_FLOW_GRAPH_NODE_H
27 
28 #include <cassert>
29 #include <cstring>
30 #include <math.h>
31 #include <memory>
32 #include <sys/types.h>
33 #include <time.h>
34 #include <unistd.h>
35 #include <vector>
36 
37 // TODO Move these classes into separate files.
38 // TODO Review use of raw pointers for connect(). Maybe use smart pointers but need to avoid
39 //      run-time deallocation in audio thread.
40 
41 // Set this to 1 if using it inside the Android framework.
42 // This code is kept here so that it can be moved easily between Oboe and AAudio.
43 #ifndef FLOWGRAPH_ANDROID_INTERNAL
44 #define FLOWGRAPH_ANDROID_INTERNAL 0
45 #endif
46 
47 // Set this to a name that will prevent AAudio from calling into Oboe.
48 // AAudio and Oboe both use a version of this flowgraph package.
49 // There was a problem in the unit tests where AAudio would call a constructor
50 // in AAudio and then call a destructor in Oboe! That caused memory corruption.
51 // For more details, see Issue #930.
52 #ifndef FLOWGRAPH_OUTER_NAMESPACE
53 #define FLOWGRAPH_OUTER_NAMESPACE oboe
54 #endif
55 
56 namespace FLOWGRAPH_OUTER_NAMESPACE {
57 namespace flowgraph {
58 
59 // Default block size that can be overridden when the FlowGraphPortFloat is created.
60 // If it is too small then we will have too much overhead from switching between nodes.
61 // If it is too high then we will thrash the caches.
62 constexpr int kDefaultBufferSize = 8; // arbitrary
63 
64 class FlowGraphPort;
65 class FlowGraphPortFloatInput;
66 
67 /***************************************************************************/
68 /**
69  * Base class for all nodes in the flowgraph.
70  */
71 class FlowGraphNode {
72 public:
FlowGraphNode()73     FlowGraphNode() {}
74     virtual ~FlowGraphNode() = default;
75 
76     /**
77      * Read from the input ports,
78      * generate multiple frames of data then write the results to the output ports.
79      *
80      * @param numFrames maximum number of frames requested for processing
81      * @return number of frames actually processed
82      */
83     virtual int32_t onProcess(int32_t numFrames) = 0;
84 
85     /**
86      * If the callCount is at or after the previous callCount then call
87      * pullData on all of the upstreamNodes.
88      * Then call onProcess().
89      * This prevents infinite recursion in case of cyclic graphs.
90      * It also prevents nodes upstream from a branch from being executed twice.
91      *
92      * @param callCount
93      * @param numFrames
94      * @return number of frames valid
95      */
96     int32_t pullData(int32_t numFrames, int64_t callCount);
97 
98     /**
99      * Recursively reset all the nodes in the graph, starting from a Sink.
100      *
101      * This must not be called at the same time as pullData!
102      */
103     void pullReset();
104 
105     /**
106      * Reset framePosition counters.
107      */
108     virtual void reset();
109 
addInputPort(FlowGraphPort & port)110     void addInputPort(FlowGraphPort &port) {
111         mInputPorts.push_back(port);
112     }
113 
isDataPulledAutomatically()114     bool isDataPulledAutomatically() const {
115         return mDataPulledAutomatically;
116     }
117 
118     /**
119      * Set true if you want the data pulled through the graph automatically.
120      * This is the default.
121      *
122      * Set false if you want to pull the data from the input ports in the onProcess() method.
123      * You might do this, for example, in a sample rate converting node.
124      *
125      * @param automatic
126      */
setDataPulledAutomatically(bool automatic)127     void setDataPulledAutomatically(bool automatic) {
128         mDataPulledAutomatically = automatic;
129     }
130 
getName()131     virtual const char *getName() {
132         return "FlowGraph";
133     }
134 
getLastCallCount()135     int64_t getLastCallCount() {
136         return mLastCallCount;
137     }
138 
139 protected:
140 
141     static constexpr int64_t  kInitialCallCount = -1;
142     int64_t  mLastCallCount = kInitialCallCount;
143 
144     std::vector<std::reference_wrapper<FlowGraphPort>> mInputPorts;
145 
146 private:
147     bool     mDataPulledAutomatically = true;
148     bool     mBlockRecursion = false;
149     int32_t  mLastFrameCount = 0;
150 
151 };
152 
153 /***************************************************************************/
154 /**
155   * This is a connector that allows data to flow between modules.
156   *
157   * The ports are the primary means of interacting with a module.
158   * So they are generally declared as public.
159   *
160   */
161 class FlowGraphPort {
162 public:
FlowGraphPort(FlowGraphNode & parent,int32_t samplesPerFrame)163     FlowGraphPort(FlowGraphNode &parent, int32_t samplesPerFrame)
164             : mContainingNode(parent)
165             , mSamplesPerFrame(samplesPerFrame) {
166     }
167 
168     // Ports are often declared public. So let's make them non-copyable.
169     FlowGraphPort(const FlowGraphPort&) = delete;
170     FlowGraphPort& operator=(const FlowGraphPort&) = delete;
171 
getSamplesPerFrame()172     int32_t getSamplesPerFrame() const {
173         return mSamplesPerFrame;
174     }
175 
176     virtual int32_t pullData(int64_t framePosition, int32_t numFrames) = 0;
177 
pullReset()178     virtual void pullReset() {}
179 
180 protected:
181     FlowGraphNode &mContainingNode;
182 
183 private:
184     const int32_t    mSamplesPerFrame = 1;
185 };
186 
187 /***************************************************************************/
188 /**
189  * This port contains a 32-bit float buffer that can contain several frames of data.
190  * Processing the data in a block improves performance.
191  *
192  * The size is framesPerBuffer * samplesPerFrame).
193  */
194 class FlowGraphPortFloat  : public FlowGraphPort {
195 public:
196     FlowGraphPortFloat(FlowGraphNode &parent,
197                    int32_t samplesPerFrame,
198                    int32_t framesPerBuffer = kDefaultBufferSize
199                 );
200 
201     virtual ~FlowGraphPortFloat() = default;
202 
getFramesPerBuffer()203     int32_t getFramesPerBuffer() const {
204         return mFramesPerBuffer;
205     }
206 
207 protected:
208 
209     /**
210      * @return buffer internal to the port or from a connected port
211      */
getBuffer()212     virtual float *getBuffer() {
213         return mBuffer.get();
214     }
215 
216 private:
217     const int32_t    mFramesPerBuffer = 1;
218     std::unique_ptr<float[]> mBuffer; // allocated in constructor
219 };
220 
221 /***************************************************************************/
222 /**
223   * The results of a node's processing are stored in the buffers of the output ports.
224   */
225 class FlowGraphPortFloatOutput : public FlowGraphPortFloat {
226 public:
FlowGraphPortFloatOutput(FlowGraphNode & parent,int32_t samplesPerFrame)227     FlowGraphPortFloatOutput(FlowGraphNode &parent, int32_t samplesPerFrame)
228             : FlowGraphPortFloat(parent, samplesPerFrame) {
229     }
230 
231     virtual ~FlowGraphPortFloatOutput() = default;
232 
233     using FlowGraphPortFloat::getBuffer;
234 
235     /**
236      * Connect to the input of another module.
237      * An input port can only have one connection.
238      * An output port can have multiple connections.
239      * If you connect a second output port to an input port
240      * then it overwrites the previous connection.
241      *
242      * This not thread safe. Do not modify the graph topology from another thread while running.
243      * Also do not delete a module while it is connected to another port if the graph is running.
244      */
245     void connect(FlowGraphPortFloatInput *port);
246 
247     /**
248      * Disconnect from the input of another module.
249      * This not thread safe.
250      */
251     void disconnect(FlowGraphPortFloatInput *port);
252 
253     /**
254      * Call the parent module's onProcess() method.
255      * That may pull data from its inputs and recursively
256      * process the entire graph.
257      * @return number of frames actually pulled
258      */
259     int32_t pullData(int64_t framePosition, int32_t numFrames) override;
260 
261 
262     void pullReset() override;
263 
264 };
265 
266 /***************************************************************************/
267 
268 /**
269  * An input port for streaming audio data.
270  * You can set a value that will be used for processing.
271  * If you connect an output port to this port then its value will be used instead.
272  */
273 class FlowGraphPortFloatInput : public FlowGraphPortFloat {
274 public:
FlowGraphPortFloatInput(FlowGraphNode & parent,int32_t samplesPerFrame)275     FlowGraphPortFloatInput(FlowGraphNode &parent, int32_t samplesPerFrame)
276             : FlowGraphPortFloat(parent, samplesPerFrame) {
277         // Add to parent so it can pull data from each input.
278         parent.addInputPort(*this);
279     }
280 
281     virtual ~FlowGraphPortFloatInput() = default;
282 
283     /**
284      * If connected to an output port then this will return
285      * that output ports buffers.
286      * If not connected then it returns the input ports own buffer
287      * which can be loaded using setValue().
288      */
289     float *getBuffer() override;
290 
291     /**
292      * Write every value of the float buffer.
293      * This value will be ignored if an output port is connected
294      * to this port.
295      */
setValue(float value)296     void setValue(float value) {
297         int numFloats = kDefaultBufferSize * getSamplesPerFrame();
298         float *buffer = getBuffer();
299         for (int i = 0; i < numFloats; i++) {
300             *buffer++ = value;
301         }
302     }
303 
304     /**
305      * Connect to the output of another module.
306      * An input port can only have one connection.
307      * An output port can have multiple connections.
308      * This not thread safe.
309      */
connect(FlowGraphPortFloatOutput * port)310     void connect(FlowGraphPortFloatOutput *port) {
311         assert(getSamplesPerFrame() == port->getSamplesPerFrame());
312         mConnected = port;
313     }
314 
disconnect(FlowGraphPortFloatOutput * port)315     void disconnect(FlowGraphPortFloatOutput *port) {
316         assert(mConnected == port);
317         (void) port;
318         mConnected = nullptr;
319     }
320 
disconnect()321     void disconnect() {
322         mConnected = nullptr;
323     }
324 
325     /**
326      * Pull data from any output port that is connected.
327      */
328     int32_t pullData(int64_t framePosition, int32_t numFrames) override;
329 
330     void pullReset() override;
331 
332 private:
333     FlowGraphPortFloatOutput *mConnected = nullptr;
334 };
335 
336 /***************************************************************************/
337 
338 /**
339  * Base class for an edge node in a graph that has no upstream nodes.
340  * It outputs data but does not consume data.
341  * By default, it will read its data from an external buffer.
342  */
343 class FlowGraphSource : public FlowGraphNode {
344 public:
FlowGraphSource(int32_t channelCount)345     explicit FlowGraphSource(int32_t channelCount)
346             : output(*this, channelCount) {
347     }
348 
349     virtual ~FlowGraphSource() = default;
350 
351     FlowGraphPortFloatOutput output;
352 };
353 
354 /***************************************************************************/
355 
356 /**
357  * Base class for an edge node in a graph that has no upstream nodes.
358  * It outputs data but does not consume data.
359  * By default, it will read its data from an external buffer.
360  */
361 class FlowGraphSourceBuffered : public FlowGraphSource {
362 public:
FlowGraphSourceBuffered(int32_t channelCount)363     explicit FlowGraphSourceBuffered(int32_t channelCount)
364             : FlowGraphSource(channelCount) {}
365 
366     virtual ~FlowGraphSourceBuffered() = default;
367 
368     /**
369      * Specify buffer that the node will read from.
370      *
371      * @param data TODO Consider using std::shared_ptr.
372      * @param numFrames
373      */
setData(const void * data,int32_t numFrames)374     void setData(const void *data, int32_t numFrames) {
375         mData = data;
376         mSizeInFrames = numFrames;
377         mFrameIndex = 0;
378     }
379 
380 protected:
381     const void *mData = nullptr;
382     int32_t     mSizeInFrames = 0; // number of frames in mData
383     int32_t     mFrameIndex = 0; // index of next frame to be processed
384 };
385 
386 /***************************************************************************/
387 /**
388  * Base class for an edge node in a graph that has no downstream nodes.
389  * It consumes data but does not output data.
390  * This graph will be executed when data is read() from this node
391  * by pulling data from upstream nodes.
392  */
393 class FlowGraphSink : public FlowGraphNode {
394 public:
FlowGraphSink(int32_t channelCount)395     explicit FlowGraphSink(int32_t channelCount)
396             : input(*this, channelCount) {
397     }
398 
399     virtual ~FlowGraphSink() = default;
400 
401     FlowGraphPortFloatInput input;
402 
403     /**
404      * Dummy processor. The work happens in the read() method.
405      *
406      * @param numFrames
407      * @return number of frames actually processed
408      */
onProcess(int32_t numFrames)409     int32_t onProcess(int32_t numFrames) override {
410         return numFrames;
411     }
412 
413     virtual int32_t read(void *data, int32_t numFrames) = 0;
414 
415 protected:
416     /**
417      * Pull data through the graph using this nodes last callCount.
418      * @param numFrames
419      * @return
420      */
421     int32_t pullData(int32_t numFrames);
422 };
423 
424 /***************************************************************************/
425 /**
426  * Base class for a node that has an input and an output with the same number of channels.
427  * This may include traditional filters, eg. FIR, but also include
428  * any processing node that converts input to output.
429  */
430 class FlowGraphFilter : public FlowGraphNode {
431 public:
FlowGraphFilter(int32_t channelCount)432     explicit FlowGraphFilter(int32_t channelCount)
433             : input(*this, channelCount)
434             , output(*this, channelCount) {
435     }
436 
437     virtual ~FlowGraphFilter() = default;
438 
439     FlowGraphPortFloatInput input;
440     FlowGraphPortFloatOutput output;
441 };
442 
443 } /* namespace flowgraph */
444 } /* namespace FLOWGRAPH_OUTER_NAMESPACE */
445 
446 #endif /* FLOWGRAPH_FLOW_GRAPH_NODE_H */
447