1 /* 2 * Copyright 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 /* 18 * FlowGraph.h 19 * 20 * Processing node and ports that can be used in a simple data flow graph. 21 * This was designed to work with audio but could be used for other 22 * types of data. 23 */ 24 25 #ifndef FLOWGRAPH_FLOW_GRAPH_NODE_H 26 #define FLOWGRAPH_FLOW_GRAPH_NODE_H 27 28 #include <cassert> 29 #include <cstring> 30 #include <math.h> 31 #include <memory> 32 #include <sys/types.h> 33 #include <time.h> 34 #include <unistd.h> 35 #include <vector> 36 37 // TODO Move these classes into separate files. 38 // TODO Review use of raw pointers for connect(). Maybe use smart pointers but need to avoid 39 // run-time deallocation in audio thread. 40 41 // Set flags FLOWGRAPH_ANDROID_INTERNAL and FLOWGRAPH_OUTER_NAMESPACE based on whether compiler 42 // flag __ANDROID_NDK__ is defined. __ANDROID_NDK__ should be defined in oboe and not aaudio. 43 44 #ifndef FLOWGRAPH_ANDROID_INTERNAL 45 #ifdef __ANDROID_NDK__ 46 #define FLOWGRAPH_ANDROID_INTERNAL 0 47 #else 48 #define FLOWGRAPH_ANDROID_INTERNAL 1 49 #endif // __ANDROID_NDK__ 50 #endif // FLOWGRAPH_ANDROID_INTERNAL 51 52 #ifndef FLOWGRAPH_OUTER_NAMESPACE 53 #ifdef __ANDROID_NDK__ 54 #define FLOWGRAPH_OUTER_NAMESPACE oboe 55 #else 56 #define FLOWGRAPH_OUTER_NAMESPACE aaudio 57 #endif // __ANDROID_NDK__ 58 #endif // FLOWGRAPH_OUTER_NAMESPACE 59 60 namespace FLOWGRAPH_OUTER_NAMESPACE::flowgraph { 61 62 // Default block size that can be overridden when the FlowGraphPortFloat is created. 63 // If it is too small then we will have too much overhead from switching between nodes. 64 // If it is too high then we will thrash the caches. 65 constexpr int kDefaultBufferSize = 8; // arbitrary 66 67 class FlowGraphPort; 68 class FlowGraphPortFloatInput; 69 70 /***************************************************************************/ 71 /** 72 * Base class for all nodes in the flowgraph. 73 */ 74 class FlowGraphNode { 75 public: 76 FlowGraphNode() = default; 77 virtual ~FlowGraphNode() = default; 78 79 /** 80 * Read from the input ports, 81 * generate multiple frames of data then write the results to the output ports. 82 * 83 * @param numFrames maximum number of frames requested for processing 84 * @return number of frames actually processed 85 */ 86 virtual int32_t onProcess(int32_t numFrames) = 0; 87 88 /** 89 * If the callCount is at or after the previous callCount then call 90 * pullData on all of the upstreamNodes. 91 * Then call onProcess(). 92 * This prevents infinite recursion in case of cyclic graphs. 93 * It also prevents nodes upstream from a branch from being executed twice. 94 * 95 * @param callCount 96 * @param numFrames 97 * @return number of frames valid 98 */ 99 int32_t pullData(int32_t numFrames, int64_t callCount); 100 101 /** 102 * Recursively reset all the nodes in the graph, starting from a Sink. 103 * 104 * This must not be called at the same time as pullData! 105 */ 106 void pullReset(); 107 108 /** 109 * Reset framePosition counters. 110 */ 111 virtual void reset(); 112 addInputPort(FlowGraphPort & port)113 void addInputPort(FlowGraphPort &port) { 114 mInputPorts.emplace_back(port); 115 } 116 isDataPulledAutomatically()117 bool isDataPulledAutomatically() const { 118 return mDataPulledAutomatically; 119 } 120 121 /** 122 * Set true if you want the data pulled through the graph automatically. 123 * This is the default. 124 * 125 * Set false if you want to pull the data from the input ports in the onProcess() method. 126 * You might do this, for example, in a sample rate converting node. 127 * 128 * @param automatic 129 */ setDataPulledAutomatically(bool automatic)130 void setDataPulledAutomatically(bool automatic) { 131 mDataPulledAutomatically = automatic; 132 } 133 getName()134 virtual const char *getName() { 135 return "FlowGraph"; 136 } 137 getLastCallCount()138 int64_t getLastCallCount() { 139 return mLastCallCount; 140 } 141 142 protected: 143 144 static constexpr int64_t kInitialCallCount = -1; 145 int64_t mLastCallCount = kInitialCallCount; 146 147 std::vector<std::reference_wrapper<FlowGraphPort>> mInputPorts; 148 149 private: 150 bool mDataPulledAutomatically = true; 151 bool mBlockRecursion = false; 152 int32_t mLastFrameCount = 0; 153 154 }; 155 156 /***************************************************************************/ 157 /** 158 * This is a connector that allows data to flow between modules. 159 * 160 * The ports are the primary means of interacting with a module. 161 * So they are generally declared as public. 162 * 163 */ 164 class FlowGraphPort { 165 public: FlowGraphPort(FlowGraphNode & parent,int32_t samplesPerFrame)166 FlowGraphPort(FlowGraphNode &parent, int32_t samplesPerFrame) 167 : mContainingNode(parent) 168 , mSamplesPerFrame(samplesPerFrame) { 169 } 170 171 virtual ~FlowGraphPort() = default; 172 173 // Ports are often declared public. So let's make them non-copyable. 174 FlowGraphPort(const FlowGraphPort&) = delete; 175 FlowGraphPort& operator=(const FlowGraphPort&) = delete; 176 getSamplesPerFrame()177 int32_t getSamplesPerFrame() const { 178 return mSamplesPerFrame; 179 } 180 181 virtual int32_t pullData(int64_t framePosition, int32_t numFrames) = 0; 182 pullReset()183 virtual void pullReset() {} 184 185 protected: 186 FlowGraphNode &mContainingNode; 187 188 private: 189 const int32_t mSamplesPerFrame = 1; 190 }; 191 192 /***************************************************************************/ 193 /** 194 * This port contains a 32-bit float buffer that can contain several frames of data. 195 * Processing the data in a block improves performance. 196 * 197 * The size is framesPerBuffer * samplesPerFrame). 198 */ 199 class FlowGraphPortFloat : public FlowGraphPort { 200 public: 201 FlowGraphPortFloat(FlowGraphNode &parent, 202 int32_t samplesPerFrame, 203 int32_t framesPerBuffer = kDefaultBufferSize 204 ); 205 206 virtual ~FlowGraphPortFloat() = default; 207 getFramesPerBuffer()208 int32_t getFramesPerBuffer() const { 209 return mFramesPerBuffer; 210 } 211 212 protected: 213 214 /** 215 * @return buffer internal to the port or from a connected port 216 */ getBuffer()217 virtual float *getBuffer() { 218 return mBuffer.get(); 219 } 220 221 private: 222 const int32_t mFramesPerBuffer = 1; 223 std::unique_ptr<float[]> mBuffer; // allocated in constructor 224 }; 225 226 /***************************************************************************/ 227 /** 228 * The results of a node's processing are stored in the buffers of the output ports. 229 */ 230 class FlowGraphPortFloatOutput : public FlowGraphPortFloat { 231 public: FlowGraphPortFloatOutput(FlowGraphNode & parent,int32_t samplesPerFrame)232 FlowGraphPortFloatOutput(FlowGraphNode &parent, int32_t samplesPerFrame) 233 : FlowGraphPortFloat(parent, samplesPerFrame) { 234 } 235 236 virtual ~FlowGraphPortFloatOutput() = default; 237 238 using FlowGraphPortFloat::getBuffer; 239 240 /** 241 * Connect to the input of another module. 242 * An input port can only have one connection. 243 * An output port can have multiple connections. 244 * If you connect a second output port to an input port 245 * then it overwrites the previous connection. 246 * 247 * This not thread safe. Do not modify the graph topology from another thread while running. 248 * Also do not delete a module while it is connected to another port if the graph is running. 249 */ 250 void connect(FlowGraphPortFloatInput *port); 251 252 /** 253 * Disconnect from the input of another module. 254 * This not thread safe. 255 */ 256 void disconnect(FlowGraphPortFloatInput *port); 257 258 /** 259 * Call the parent module's onProcess() method. 260 * That may pull data from its inputs and recursively 261 * process the entire graph. 262 * @return number of frames actually pulled 263 */ 264 int32_t pullData(int64_t framePosition, int32_t numFrames) override; 265 266 267 void pullReset() override; 268 269 }; 270 271 /***************************************************************************/ 272 273 /** 274 * An input port for streaming audio data. 275 * You can set a value that will be used for processing. 276 * If you connect an output port to this port then its value will be used instead. 277 */ 278 class FlowGraphPortFloatInput : public FlowGraphPortFloat { 279 public: FlowGraphPortFloatInput(FlowGraphNode & parent,int32_t samplesPerFrame)280 FlowGraphPortFloatInput(FlowGraphNode &parent, int32_t samplesPerFrame) 281 : FlowGraphPortFloat(parent, samplesPerFrame) { 282 // Add to parent so it can pull data from each input. 283 parent.addInputPort(*this); 284 } 285 286 virtual ~FlowGraphPortFloatInput() = default; 287 288 /** 289 * If connected to an output port then this will return 290 * that output ports buffers. 291 * If not connected then it returns the input ports own buffer 292 * which can be loaded using setValue(). 293 */ 294 float *getBuffer() override; 295 296 /** 297 * Write every value of the float buffer. 298 * This value will be ignored if an output port is connected 299 * to this port. 300 */ setValue(float value)301 void setValue(float value) { 302 int numFloats = kDefaultBufferSize * getSamplesPerFrame(); 303 float *buffer = getBuffer(); 304 for (int i = 0; i < numFloats; i++) { 305 *buffer++ = value; 306 } 307 } 308 309 /** 310 * Connect to the output of another module. 311 * An input port can only have one connection. 312 * An output port can have multiple connections. 313 * This not thread safe. 314 */ connect(FlowGraphPortFloatOutput * port)315 void connect(FlowGraphPortFloatOutput *port) { 316 assert(getSamplesPerFrame() == port->getSamplesPerFrame()); 317 mConnected = port; 318 } 319 disconnect(FlowGraphPortFloatOutput * port)320 void disconnect(FlowGraphPortFloatOutput *port) { 321 assert(mConnected == port); 322 (void) port; 323 mConnected = nullptr; 324 } 325 disconnect()326 void disconnect() { 327 mConnected = nullptr; 328 } 329 330 /** 331 * Pull data from any output port that is connected. 332 */ 333 int32_t pullData(int64_t framePosition, int32_t numFrames) override; 334 335 void pullReset() override; 336 337 private: 338 FlowGraphPortFloatOutput *mConnected = nullptr; 339 }; 340 341 /***************************************************************************/ 342 343 /** 344 * Base class for an edge node in a graph that has no upstream nodes. 345 * It outputs data but does not consume data. 346 * By default, it will read its data from an external buffer. 347 */ 348 class FlowGraphSource : public FlowGraphNode { 349 public: FlowGraphSource(int32_t channelCount)350 explicit FlowGraphSource(int32_t channelCount) 351 : output(*this, channelCount) { 352 } 353 354 virtual ~FlowGraphSource() = default; 355 356 FlowGraphPortFloatOutput output; 357 }; 358 359 /***************************************************************************/ 360 361 /** 362 * Base class for an edge node in a graph that has no upstream nodes. 363 * It outputs data but does not consume data. 364 * By default, it will read its data from an external buffer. 365 */ 366 class FlowGraphSourceBuffered : public FlowGraphSource { 367 public: FlowGraphSourceBuffered(int32_t channelCount)368 explicit FlowGraphSourceBuffered(int32_t channelCount) 369 : FlowGraphSource(channelCount) {} 370 371 virtual ~FlowGraphSourceBuffered() = default; 372 373 /** 374 * Specify buffer that the node will read from. 375 * 376 * @param data TODO Consider using std::shared_ptr. 377 * @param numFrames 378 */ setData(const void * data,int32_t numFrames)379 void setData(const void *data, int32_t numFrames) { 380 mData = data; 381 mSizeInFrames = numFrames; 382 mFrameIndex = 0; 383 } 384 385 protected: 386 const void *mData = nullptr; 387 int32_t mSizeInFrames = 0; // number of frames in mData 388 int32_t mFrameIndex = 0; // index of next frame to be processed 389 }; 390 391 /***************************************************************************/ 392 /** 393 * Base class for an edge node in a graph that has no downstream nodes. 394 * It consumes data but does not output data. 395 * This graph will be executed when data is read() from this node 396 * by pulling data from upstream nodes. 397 */ 398 class FlowGraphSink : public FlowGraphNode { 399 public: FlowGraphSink(int32_t channelCount)400 explicit FlowGraphSink(int32_t channelCount) 401 : input(*this, channelCount) { 402 } 403 404 virtual ~FlowGraphSink() = default; 405 406 FlowGraphPortFloatInput input; 407 408 /** 409 * Do nothing. The work happens in the read() method. 410 * 411 * @param numFrames 412 * @return number of frames actually processed 413 */ onProcess(int32_t numFrames)414 int32_t onProcess(int32_t numFrames) override { 415 return numFrames; 416 } 417 418 virtual int32_t read(void *data, int32_t numFrames) = 0; 419 420 protected: 421 /** 422 * Pull data through the graph using this nodes last callCount. 423 * @param numFrames 424 * @return 425 */ 426 int32_t pullData(int32_t numFrames); 427 }; 428 429 /***************************************************************************/ 430 /** 431 * Base class for a node that has an input and an output with the same number of channels. 432 * This may include traditional filters, eg. FIR, but also include 433 * any processing node that converts input to output. 434 */ 435 class FlowGraphFilter : public FlowGraphNode { 436 public: FlowGraphFilter(int32_t channelCount)437 explicit FlowGraphFilter(int32_t channelCount) 438 : input(*this, channelCount) 439 , output(*this, channelCount) { 440 } 441 442 virtual ~FlowGraphFilter() = default; 443 444 FlowGraphPortFloatInput input; 445 FlowGraphPortFloatOutput output; 446 }; 447 448 } /* namespace FLOWGRAPH_OUTER_NAMESPACE::flowgraph */ 449 450 #endif /* FLOWGRAPH_FLOW_GRAPH_NODE_H */ 451