/*
 * Copyright 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * FlowGraph.h
 *
 * Processing node and ports that can be used in a simple data flow graph.
 * This was designed to work with audio but could be used for other
 * types of data.
 */

#ifndef FLOWGRAPH_FLOW_GRAPH_NODE_H
#define FLOWGRAPH_FLOW_GRAPH_NODE_H

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <math.h>
#include <memory>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <vector>

// TODO Move these classes into separate files.
// TODO Review use of raw pointers for connect(). Maybe use smart pointers but need to avoid
//      run-time deallocation in audio thread.

// Set this to 1 if using it inside the Android framework.
// This code is kept here so that it can be moved easily between Oboe and AAudio.
#ifndef FLOWGRAPH_ANDROID_INTERNAL
#define FLOWGRAPH_ANDROID_INTERNAL 0
#endif

// Set this to a name that will prevent AAudio from calling into Oboe.
// AAudio and Oboe both use a version of this flowgraph package.
// There was a problem in the unit tests where AAudio would call a constructor
// in AAudio and then call a destructor in Oboe! That caused memory corruption.
// For more details, see Issue #930.
#ifndef FLOWGRAPH_OUTER_NAMESPACE
#define FLOWGRAPH_OUTER_NAMESPACE oboe
#endif

namespace FLOWGRAPH_OUTER_NAMESPACE {
namespace flowgraph {

// Default block size that can be overridden when the FlowGraphPortFloat is created.
// If it is too small then we will have too much overhead from switching between nodes.
// If it is too high then we will thrash the caches.
constexpr int kDefaultBufferSize = 8; // arbitrary

class FlowGraphPort;
class FlowGraphPortFloatInput;

/***************************************************************************/
/**
 * Base class for all nodes in the flowgraph.
 *
 * A node keeps a list of references to its input ports. Ports register
 * themselves through addInputPort().
 */
class FlowGraphNode {
public:
    FlowGraphNode() = default;
    virtual ~FlowGraphNode() = default;

    /**
     * Read from the input ports,
     * generate multiple frames of data then write the results to the output ports.
     *
     * @param numFrames maximum number of frames requested for processing
     * @return number of frames actually processed
     */
    virtual int32_t onProcess(int32_t numFrames) = 0;

    /**
     * If the callCount is at or after the previous callCount then call
     * pullData on all of the upstreamNodes.
     * Then call onProcess().
     * This prevents infinite recursion in case of cyclic graphs.
     * It also prevents nodes upstream from a branch from being executed twice.
     *
     * @param numFrames number of frames requested
     * @param callCount counter that is compared with the callCount of the previous call
     * @return number of frames valid
     */
    int32_t pullData(int32_t numFrames, int64_t callCount);

    /**
     * Recursively reset all the nodes in the graph, starting from a Sink.
     *
     * This must not be called at the same time as pullData!
     */
    void pullReset();

    /**
     * Reset framePosition counters.
     */
    virtual void reset();

    /**
     * Register an input port with this node.
     * Only a reference is stored, so the port must outlive the node's use of it.
     *
     * @param port input port that belongs to this node
     */
    void addInputPort(FlowGraphPort &port) {
        mInputPorts.push_back(port);
    }

    /**
     * @return true if the data is pulled through the graph automatically
     */
    bool isDataPulledAutomatically() const {
        return mDataPulledAutomatically;
    }

    /**
     * Set true if you want the data pulled through the graph automatically.
     * This is the default.
     *
     * Set false if you want to pull the data from the input ports in the onProcess() method.
     * You might do this, for example, in a sample rate converting node.
     *
     * @param automatic
     */
    void setDataPulledAutomatically(bool automatic) {
        mDataPulledAutomatically = automatic;
    }

    /**
     * @return a name for this node; the base class returns "FlowGraph"
     */
    virtual const char *getName() {
        return "FlowGraph";
    }

    /**
     * @return the stored call count; kInitialCallCount until it is updated
     */
    int64_t getLastCallCount() const {
        return mLastCallCount;
    }

protected:

    static constexpr int64_t  kInitialCallCount = -1;
    int64_t  mLastCallCount = kInitialCallCount;

    // Non-owning references to the ports registered through addInputPort().
    std::vector<std::reference_wrapper<FlowGraphPort>> mInputPorts;

private:
    bool     mDataPulledAutomatically = true;
    // NOTE(review): the two members below are not touched in this header;
    // presumably they are used by pullData()/pullReset() in the implementation file.
    bool     mBlockRecursion = false;
    int32_t  mLastFrameCount = 0;

};

/***************************************************************************/
/**
 * This is a connector that allows data to flow between modules.
 *
 * The ports are the primary means of interacting with a module.
 * So they are generally declared as public.
 *
 */
class FlowGraphPort {
public:
    /**
     * @param parent node that contains this port; only a reference is kept
     * @param samplesPerFrame number of samples in each frame
     */
    FlowGraphPort(FlowGraphNode &parent, int32_t samplesPerFrame)
            : mContainingNode(parent)
            , mSamplesPerFrame(samplesPerFrame) {
    }

    virtual ~FlowGraphPort() = default;

    // Ports are often declared public. So let's make them non-copyable.
    FlowGraphPort(const FlowGraphPort&) = delete;
    FlowGraphPort& operator=(const FlowGraphPort&) = delete;

    int32_t getSamplesPerFrame() const {
        return mSamplesPerFrame;
    }

    virtual int32_t pullData(int64_t framePosition, int32_t numFrames) = 0;

    // Does nothing by default. Derived ports may override it.
    virtual void pullReset() {}

protected:
    FlowGraphNode &mContainingNode;

private:
    const int32_t    mSamplesPerFrame = 1;
};

/***************************************************************************/
/**
 * This port contains a 32-bit float buffer that can contain several frames of data.
 * Processing the data in a block improves performance.
 *
 * The size is (framesPerBuffer * samplesPerFrame).
 */
class FlowGraphPortFloat  : public FlowGraphPort {
public:
    FlowGraphPortFloat(FlowGraphNode &parent,
                   int32_t samplesPerFrame,
                   int32_t framesPerBuffer = kDefaultBufferSize
                );

    virtual ~FlowGraphPortFloat() = default;

    int32_t getFramesPerBuffer() const {
        return mFramesPerBuffer;
    }

protected:

    /**
     * @return buffer internal to the port or from a connected port
     */
    virtual float *getBuffer() {
        return mBuffer.get();
    }

private:
    const int32_t    mFramesPerBuffer = 1;
    std::unique_ptr<float[]> mBuffer; // allocated in constructor
};

/***************************************************************************/
/**
 * The results of a node's processing are stored in the buffers of the output ports.
 */
class FlowGraphPortFloatOutput : public FlowGraphPortFloat {
public:
    FlowGraphPortFloatOutput(FlowGraphNode &parent, int32_t samplesPerFrame)
            : FlowGraphPortFloat(parent, samplesPerFrame) {
    }

    virtual ~FlowGraphPortFloatOutput() = default;

    // Make the buffer accessor public for output ports.
    using FlowGraphPortFloat::getBuffer;

    /**
     * Connect to the input of another module.
     * An input port can only have one connection.
     * An output port can have multiple connections.
     * If you connect a second output port to an input port
     * then it overwrites the previous connection.
     *
     * This is not thread safe. Do not modify the graph topology from another thread
     * while running.
     * Also do not delete a module while it is connected to another port if the graph
     * is running.
     */
    void connect(FlowGraphPortFloatInput *port);

    /**
     * Disconnect from the input of another module.
     * This is not thread safe.
     */
    void disconnect(FlowGraphPortFloatInput *port);

    /**
     * Call the parent module's onProcess() method.
     * That may pull data from its inputs and recursively
     * process the entire graph.
     * @return number of frames actually pulled
     */
    int32_t pullData(int64_t framePosition, int32_t numFrames) override;


    void pullReset() override;

};

/***************************************************************************/

/**
 * An input port for streaming audio data.
 * You can set a value that will be used for processing.
 * If you connect an output port to this port then its value will be used instead.
 */
class FlowGraphPortFloatInput : public FlowGraphPortFloat {
public:
    FlowGraphPortFloatInput(FlowGraphNode &parent, int32_t samplesPerFrame)
            : FlowGraphPortFloat(parent, samplesPerFrame) {
        // Add to parent so it can pull data from each input.
        parent.addInputPort(*this);
    }

    virtual ~FlowGraphPortFloatInput() = default;

    /**
     * If connected to an output port then this will return
     * that output ports buffers.
     * If not connected then it returns the input ports own buffer
     * which can be loaded using setValue().
     */
    float *getBuffer() override;

    /**
     * Write every value of the float buffer.
     * This value will be ignored if an output port is connected
     * to this port.
     *
     * @param value value stored in every sample of this port's own buffer
     */
    void setValue(float value) {
        // Use the port's real block size instead of assuming the default.
        const int32_t numFloats = getFramesPerBuffer() * getSamplesPerFrame();
        // Call the base class accessor explicitly so that we always fill our own
        // buffer. The overridden getBuffer() returns the connected output port's
        // buffer when connected, and writing there would corrupt upstream data
        // instead of being ignored.
        std::fill_n(FlowGraphPortFloat::getBuffer(), numFloats, value);
    }

    /**
     * Connect to the output of another module.
     * An input port can only have one connection.
     * An output port can have multiple connections.
     * This is not thread safe.
     *
     * @param port output port to read from; must have the same samplesPerFrame
     */
    void connect(FlowGraphPortFloatOutput *port) {
        assert(port != nullptr);
        assert(getSamplesPerFrame() == port->getSamplesPerFrame());
        mConnected = port;
    }

    /**
     * Disconnect from the given output port.
     *
     * @param port must be the port that is currently connected
     */
    void disconnect(FlowGraphPortFloatOutput *port) {
        assert(mConnected == port);
        (void) port; // only used by the assert
        mConnected = nullptr;
    }

    /**
     * Drop whatever connection is present.
     */
    void disconnect() {
        mConnected = nullptr;
    }

    /**
     * Pull data from any output port that is connected.
     */
    int32_t pullData(int64_t framePosition, int32_t numFrames) override;

    void pullReset() override;

private:
    // Non-owning pointer to the connected output port, or nullptr.
    FlowGraphPortFloatOutput *mConnected = nullptr;
};

/***************************************************************************/

/**
 * Base class for an edge node in a graph that has no upstream nodes.
 * It outputs data but does not consume data.
 * Where the data comes from is left to the derived class,
 * for example FlowGraphSourceBuffered.
 */
class FlowGraphSource : public FlowGraphNode {
public:
    explicit FlowGraphSource(int32_t channelCount)
            : output(*this, channelCount) {
    }

    virtual ~FlowGraphSource() = default;

    FlowGraphPortFloatOutput output;
};

/***************************************************************************/

/**
 * Base class for an edge node in a graph that has no upstream nodes.
 * It outputs data but does not consume data.
 * By default, it will read its data from an external buffer.
 */
class FlowGraphSourceBuffered : public FlowGraphSource {
public:
    explicit FlowGraphSourceBuffered(int32_t channelCount)
            : FlowGraphSource(channelCount) {}

    virtual ~FlowGraphSourceBuffered() = default;

    /**
     * Specify buffer that the node will read from.
     * Only the pointer is stored; the data is not copied.
     * The frame index is reset to the start of the buffer.
     *
     * @param data TODO Consider using std::shared_ptr.
     * @param numFrames number of frames in data
     */
    void setData(const void *data, int32_t numFrames) {
        mData = data;
        mSizeInFrames = numFrames;
        mFrameIndex = 0;
    }

protected:
    const void *mData = nullptr;
    int32_t     mSizeInFrames = 0; // number of frames in mData
    int32_t     mFrameIndex = 0; // index of next frame to be processed
};

/***************************************************************************/
/**
 * Base class for an edge node in a graph that has no downstream nodes.
 * It consumes data but does not output data.
 * This graph will be executed when data is read() from this node
 * by pulling data from upstream nodes.
 */
class FlowGraphSink : public FlowGraphNode {
public:
    explicit FlowGraphSink(int32_t channelCount)
            : input(*this, channelCount) {
    }

    virtual ~FlowGraphSink() = default;

    FlowGraphPortFloatInput input;

    /**
     * Dummy processor. The work happens in the read() method.
     *
     * @param numFrames
     * @return number of frames actually processed
     */
    int32_t onProcess(int32_t numFrames) override {
        return numFrames;
    }

    virtual int32_t read(void *data, int32_t numFrames) = 0;

protected:
    /**
     * Pull data through the graph using this nodes last callCount.
     * @param numFrames
     * @return
     */
    int32_t pullData(int32_t numFrames);
};

/***************************************************************************/
/**
 * Base class for a node that has an input and an output with the same number of channels.
 * This may include traditional filters, eg. FIR, but also include
 * any processing node that converts input to output.
 */
class FlowGraphFilter : public FlowGraphNode {
public:
    explicit FlowGraphFilter(int32_t channelCount)
            : input(*this, channelCount)
            , output(*this, channelCount) {
    }

    virtual ~FlowGraphFilter() = default;

    FlowGraphPortFloatInput input;
    FlowGraphPortFloatOutput output;
};

} /* namespace flowgraph */
} /* namespace FLOWGRAPH_OUTER_NAMESPACE */

#endif /* FLOWGRAPH_FLOW_GRAPH_NODE_H */