1 /* 2 * Copyright 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 /* 18 * FlowGraph.h 19 * 20 * Processing node and ports that can be used in a simple data flow graph. 21 * This was designed to work with audio but could be used for other 22 * types of data. 23 */ 24 25 #ifndef FLOWGRAPH_FLOW_GRAPH_NODE_H 26 #define FLOWGRAPH_FLOW_GRAPH_NODE_H 27 28 #include <cassert> 29 #include <cstring> 30 #include <math.h> 31 #include <memory> 32 #include <sys/types.h> 33 #include <time.h> 34 #include <unistd.h> 35 #include <vector> 36 37 // TODO Move these classes into separate files. 38 // TODO Review use of raw pointers for connect(). Maybe use smart pointers but need to avoid 39 // run-time deallocation in audio thread. 40 41 // Set this to 1 if using it inside the Android framework. 42 // This code is kept here so that it can be moved easily between Oboe and AAudio. 43 #ifndef FLOWGRAPH_ANDROID_INTERNAL 44 #define FLOWGRAPH_ANDROID_INTERNAL 0 45 #endif 46 47 // Set this to a name that will prevent AAudio from calling into Oboe. 48 // AAudio and Oboe both use a version of this flowgraph package. 49 // There was a problem in the unit tests where AAudio would call a constructor 50 // in AAudio and then call a destructor in Oboe! That caused memory corruption. 51 // For more details, see Issue #930. 52 #ifndef FLOWGRAPH_OUTER_NAMESPACE 53 #define FLOWGRAPH_OUTER_NAMESPACE oboe 54 #endif 55 56 namespace FLOWGRAPH_OUTER_NAMESPACE { 57 namespace flowgraph { 58 59 // Default block size that can be overridden when the FlowGraphPortFloat is created. 60 // If it is too small then we will have too much overhead from switching between nodes. 61 // If it is too high then we will thrash the caches. 62 constexpr int kDefaultBufferSize = 8; // arbitrary 63 64 class FlowGraphPort; 65 class FlowGraphPortFloatInput; 66 67 /***************************************************************************/ 68 /** 69 * Base class for all nodes in the flowgraph. 70 */ 71 class FlowGraphNode { 72 public: FlowGraphNode()73 FlowGraphNode() {} 74 virtual ~FlowGraphNode() = default; 75 76 /** 77 * Read from the input ports, 78 * generate multiple frames of data then write the results to the output ports. 79 * 80 * @param numFrames maximum number of frames requested for processing 81 * @return number of frames actually processed 82 */ 83 virtual int32_t onProcess(int32_t numFrames) = 0; 84 85 /** 86 * If the callCount is at or after the previous callCount then call 87 * pullData on all of the upstreamNodes. 88 * Then call onProcess(). 89 * This prevents infinite recursion in case of cyclic graphs. 90 * It also prevents nodes upstream from a branch from being executed twice. 91 * 92 * @param callCount 93 * @param numFrames 94 * @return number of frames valid 95 */ 96 int32_t pullData(int32_t numFrames, int64_t callCount); 97 98 /** 99 * Recursively reset all the nodes in the graph, starting from a Sink. 100 * 101 * This must not be called at the same time as pullData! 102 */ 103 void pullReset(); 104 105 /** 106 * Reset framePosition counters. 107 */ 108 virtual void reset(); 109 addInputPort(FlowGraphPort & port)110 void addInputPort(FlowGraphPort &port) { 111 mInputPorts.push_back(port); 112 } 113 isDataPulledAutomatically()114 bool isDataPulledAutomatically() const { 115 return mDataPulledAutomatically; 116 } 117 118 /** 119 * Set true if you want the data pulled through the graph automatically. 120 * This is the default. 121 * 122 * Set false if you want to pull the data from the input ports in the onProcess() method. 123 * You might do this, for example, in a sample rate converting node. 124 * 125 * @param automatic 126 */ setDataPulledAutomatically(bool automatic)127 void setDataPulledAutomatically(bool automatic) { 128 mDataPulledAutomatically = automatic; 129 } 130 getName()131 virtual const char *getName() { 132 return "FlowGraph"; 133 } 134 getLastCallCount()135 int64_t getLastCallCount() { 136 return mLastCallCount; 137 } 138 139 protected: 140 141 static constexpr int64_t kInitialCallCount = -1; 142 int64_t mLastCallCount = kInitialCallCount; 143 144 std::vector<std::reference_wrapper<FlowGraphPort>> mInputPorts; 145 146 private: 147 bool mDataPulledAutomatically = true; 148 bool mBlockRecursion = false; 149 int32_t mLastFrameCount = 0; 150 151 }; 152 153 /***************************************************************************/ 154 /** 155 * This is a connector that allows data to flow between modules. 156 * 157 * The ports are the primary means of interacting with a module. 158 * So they are generally declared as public. 159 * 160 */ 161 class FlowGraphPort { 162 public: FlowGraphPort(FlowGraphNode & parent,int32_t samplesPerFrame)163 FlowGraphPort(FlowGraphNode &parent, int32_t samplesPerFrame) 164 : mContainingNode(parent) 165 , mSamplesPerFrame(samplesPerFrame) { 166 } 167 168 // Ports are often declared public. So let's make them non-copyable. 169 FlowGraphPort(const FlowGraphPort&) = delete; 170 FlowGraphPort& operator=(const FlowGraphPort&) = delete; 171 getSamplesPerFrame()172 int32_t getSamplesPerFrame() const { 173 return mSamplesPerFrame; 174 } 175 176 virtual int32_t pullData(int64_t framePosition, int32_t numFrames) = 0; 177 pullReset()178 virtual void pullReset() {} 179 180 protected: 181 FlowGraphNode &mContainingNode; 182 183 private: 184 const int32_t mSamplesPerFrame = 1; 185 }; 186 187 /***************************************************************************/ 188 /** 189 * This port contains a 32-bit float buffer that can contain several frames of data. 190 * Processing the data in a block improves performance. 191 * 192 * The size is framesPerBuffer * samplesPerFrame). 193 */ 194 class FlowGraphPortFloat : public FlowGraphPort { 195 public: 196 FlowGraphPortFloat(FlowGraphNode &parent, 197 int32_t samplesPerFrame, 198 int32_t framesPerBuffer = kDefaultBufferSize 199 ); 200 201 virtual ~FlowGraphPortFloat() = default; 202 getFramesPerBuffer()203 int32_t getFramesPerBuffer() const { 204 return mFramesPerBuffer; 205 } 206 207 protected: 208 209 /** 210 * @return buffer internal to the port or from a connected port 211 */ getBuffer()212 virtual float *getBuffer() { 213 return mBuffer.get(); 214 } 215 216 private: 217 const int32_t mFramesPerBuffer = 1; 218 std::unique_ptr<float[]> mBuffer; // allocated in constructor 219 }; 220 221 /***************************************************************************/ 222 /** 223 * The results of a node's processing are stored in the buffers of the output ports. 224 */ 225 class FlowGraphPortFloatOutput : public FlowGraphPortFloat { 226 public: FlowGraphPortFloatOutput(FlowGraphNode & parent,int32_t samplesPerFrame)227 FlowGraphPortFloatOutput(FlowGraphNode &parent, int32_t samplesPerFrame) 228 : FlowGraphPortFloat(parent, samplesPerFrame) { 229 } 230 231 virtual ~FlowGraphPortFloatOutput() = default; 232 233 using FlowGraphPortFloat::getBuffer; 234 235 /** 236 * Connect to the input of another module. 237 * An input port can only have one connection. 238 * An output port can have multiple connections. 239 * If you connect a second output port to an input port 240 * then it overwrites the previous connection. 241 * 242 * This not thread safe. Do not modify the graph topology from another thread while running. 243 * Also do not delete a module while it is connected to another port if the graph is running. 244 */ 245 void connect(FlowGraphPortFloatInput *port); 246 247 /** 248 * Disconnect from the input of another module. 249 * This not thread safe. 250 */ 251 void disconnect(FlowGraphPortFloatInput *port); 252 253 /** 254 * Call the parent module's onProcess() method. 255 * That may pull data from its inputs and recursively 256 * process the entire graph. 257 * @return number of frames actually pulled 258 */ 259 int32_t pullData(int64_t framePosition, int32_t numFrames) override; 260 261 262 void pullReset() override; 263 264 }; 265 266 /***************************************************************************/ 267 268 /** 269 * An input port for streaming audio data. 270 * You can set a value that will be used for processing. 271 * If you connect an output port to this port then its value will be used instead. 272 */ 273 class FlowGraphPortFloatInput : public FlowGraphPortFloat { 274 public: FlowGraphPortFloatInput(FlowGraphNode & parent,int32_t samplesPerFrame)275 FlowGraphPortFloatInput(FlowGraphNode &parent, int32_t samplesPerFrame) 276 : FlowGraphPortFloat(parent, samplesPerFrame) { 277 // Add to parent so it can pull data from each input. 278 parent.addInputPort(*this); 279 } 280 281 virtual ~FlowGraphPortFloatInput() = default; 282 283 /** 284 * If connected to an output port then this will return 285 * that output ports buffers. 286 * If not connected then it returns the input ports own buffer 287 * which can be loaded using setValue(). 288 */ 289 float *getBuffer() override; 290 291 /** 292 * Write every value of the float buffer. 293 * This value will be ignored if an output port is connected 294 * to this port. 295 */ setValue(float value)296 void setValue(float value) { 297 int numFloats = kDefaultBufferSize * getSamplesPerFrame(); 298 float *buffer = getBuffer(); 299 for (int i = 0; i < numFloats; i++) { 300 *buffer++ = value; 301 } 302 } 303 304 /** 305 * Connect to the output of another module. 306 * An input port can only have one connection. 307 * An output port can have multiple connections. 308 * This not thread safe. 309 */ connect(FlowGraphPortFloatOutput * port)310 void connect(FlowGraphPortFloatOutput *port) { 311 assert(getSamplesPerFrame() == port->getSamplesPerFrame()); 312 mConnected = port; 313 } 314 disconnect(FlowGraphPortFloatOutput * port)315 void disconnect(FlowGraphPortFloatOutput *port) { 316 assert(mConnected == port); 317 (void) port; 318 mConnected = nullptr; 319 } 320 disconnect()321 void disconnect() { 322 mConnected = nullptr; 323 } 324 325 /** 326 * Pull data from any output port that is connected. 327 */ 328 int32_t pullData(int64_t framePosition, int32_t numFrames) override; 329 330 void pullReset() override; 331 332 private: 333 FlowGraphPortFloatOutput *mConnected = nullptr; 334 }; 335 336 /***************************************************************************/ 337 338 /** 339 * Base class for an edge node in a graph that has no upstream nodes. 340 * It outputs data but does not consume data. 341 * By default, it will read its data from an external buffer. 342 */ 343 class FlowGraphSource : public FlowGraphNode { 344 public: FlowGraphSource(int32_t channelCount)345 explicit FlowGraphSource(int32_t channelCount) 346 : output(*this, channelCount) { 347 } 348 349 virtual ~FlowGraphSource() = default; 350 351 FlowGraphPortFloatOutput output; 352 }; 353 354 /***************************************************************************/ 355 356 /** 357 * Base class for an edge node in a graph that has no upstream nodes. 358 * It outputs data but does not consume data. 359 * By default, it will read its data from an external buffer. 360 */ 361 class FlowGraphSourceBuffered : public FlowGraphSource { 362 public: FlowGraphSourceBuffered(int32_t channelCount)363 explicit FlowGraphSourceBuffered(int32_t channelCount) 364 : FlowGraphSource(channelCount) {} 365 366 virtual ~FlowGraphSourceBuffered() = default; 367 368 /** 369 * Specify buffer that the node will read from. 370 * 371 * @param data TODO Consider using std::shared_ptr. 372 * @param numFrames 373 */ setData(const void * data,int32_t numFrames)374 void setData(const void *data, int32_t numFrames) { 375 mData = data; 376 mSizeInFrames = numFrames; 377 mFrameIndex = 0; 378 } 379 380 protected: 381 const void *mData = nullptr; 382 int32_t mSizeInFrames = 0; // number of frames in mData 383 int32_t mFrameIndex = 0; // index of next frame to be processed 384 }; 385 386 /***************************************************************************/ 387 /** 388 * Base class for an edge node in a graph that has no downstream nodes. 389 * It consumes data but does not output data. 390 * This graph will be executed when data is read() from this node 391 * by pulling data from upstream nodes. 392 */ 393 class FlowGraphSink : public FlowGraphNode { 394 public: FlowGraphSink(int32_t channelCount)395 explicit FlowGraphSink(int32_t channelCount) 396 : input(*this, channelCount) { 397 } 398 399 virtual ~FlowGraphSink() = default; 400 401 FlowGraphPortFloatInput input; 402 403 /** 404 * Dummy processor. The work happens in the read() method. 405 * 406 * @param numFrames 407 * @return number of frames actually processed 408 */ onProcess(int32_t numFrames)409 int32_t onProcess(int32_t numFrames) override { 410 return numFrames; 411 } 412 413 virtual int32_t read(void *data, int32_t numFrames) = 0; 414 415 protected: 416 /** 417 * Pull data through the graph using this nodes last callCount. 418 * @param numFrames 419 * @return 420 */ 421 int32_t pullData(int32_t numFrames); 422 }; 423 424 /***************************************************************************/ 425 /** 426 * Base class for a node that has an input and an output with the same number of channels. 427 * This may include traditional filters, eg. FIR, but also include 428 * any processing node that converts input to output. 429 */ 430 class FlowGraphFilter : public FlowGraphNode { 431 public: FlowGraphFilter(int32_t channelCount)432 explicit FlowGraphFilter(int32_t channelCount) 433 : input(*this, channelCount) 434 , output(*this, channelCount) { 435 } 436 437 virtual ~FlowGraphFilter() = default; 438 439 FlowGraphPortFloatInput input; 440 FlowGraphPortFloatOutput output; 441 }; 442 443 } /* namespace flowgraph */ 444 } /* namespace FLOWGRAPH_OUTER_NAMESPACE */ 445 446 #endif /* FLOWGRAPH_FLOW_GRAPH_NODE_H */ 447