Framework
The framework of the ADM Toolbox provides a structure for components which process ADM files to fit into.
This takes the form of a processing graph: The individual components are processes, which have input and output ports through which they communicate. These can be connected together in a graph structure, which is a collection of processes, and connections between their ports.
For example, consider a “read BW64” process with output ports for audio samples and an ADM document, and a “write BW64” process with input ports for audio samples and an ADM document. These may be connected together to form a kind of BW64 copy operation.
Processes
There are several kinds of process, shown in the following inheritance diagram:
Process is the base class for all process types. It has input and output ports, and can be added to graphs.
This is split into two types: AtomicProcess and CompositeProcess. Atomic processes actually implement some processing (i.e. they are not divisible), while composite processes just contain other processes and connections between them; the contained processes may themselves be composite or atomic.
Atomic processes are further divided into two types: FunctionalAtomicProcess and StreamingAtomicProcess.
In a functional process, the outputs are a function of the inputs: it implements a process() method, which is called once, and should read from the input ports and write to the output ports.
While functional processes are limited to operating on single pieces of data, streaming processes operate on streams of data. They implement three methods:
initialise() is called once, and can read from non-streaming input ports
process() should read from streaming input ports and write to streaming output ports, and is called repeatedly as long as any ports are not closed (see below)
finalise() is called once, and can write to non-streaming output ports
For example, a loudness meter would be a streaming process: process() would read audio samples from a streaming input port and perform the analysis, and the accumulated loudness values would be written to a non-streaming output port in finalise().
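The three-phase lifecycle described above can be illustrated with a small standalone sketch. It does not use the framework: RmsMeterSketch and measure_in_two_chunks are hypothetical names, a plain std::deque stands in for a streaming input port, and a simple RMS measurement stands in for a real loudness algorithm.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a streaming process; this is NOT the framework's
// StreamingAtomicProcess. It measures the RMS level of a mono stream.
class RmsMeterSketch {
 public:
  // called once before streaming: reset the accumulated state
  void initialise() {
    sum_squares_ = 0.0;
    count_ = 0;
  }

  // called repeatedly: consume whatever samples are currently queued
  void process(std::deque<double> &input) {
    while (!input.empty()) {
      double s = input.front();
      input.pop_front();
      sum_squares_ += s * s;
      ++count_;
    }
  }

  // called once after streaming: produce the single accumulated result
  double finalise() const {
    if (count_ == 0) return 0.0;
    return std::sqrt(sum_squares_ / static_cast<double>(count_));
  }

 private:
  double sum_squares_ = 0.0;
  std::size_t count_ = 0;
};

// Drive the three phases by hand, feeding the samples in two chunks.
inline double measure_in_two_chunks() {
  RmsMeterSketch meter;
  meter.initialise();
  std::deque<double> chunk1{3.0, -3.0};
  std::deque<double> chunk2{3.0, -3.0};
  meter.process(chunk1);
  meter.process(chunk2);
  return meter.finalise();
}
```

The result only becomes available in finalise(), after all of the samples have been seen, which is why it suits a non-streaming output.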
Ports
As alluded to above, processes can have two kinds of ports: data ports and streaming ports. Additionally, each port has a type, and can only be connected to ports of the same type. This is shown in this inheritance diagram:
Port can be used to reference any type of port, and is mainly used for making connections. DataPort and StreamPort are concrete ports of a particular type, and are mostly used inside processes. DataPortBase and StreamPortBase are interfaces used by the implementation.
Data ports hold a value of the given type. A process writing to a data port should use set_value(), while a process reading from a data port should use get_value().
The framework moves or copies this value between connected ports.
Stream ports hold a queue of values of the given type, and an eof (End Of File) flag.
Writers should call push() to write items, followed by close() to signal that no more items will be pushed.
Readers should use available() to see if there are any items in the queue, and pop() to read an item. When eof() returns true, there are no items left to read, and the writer has closed the port.
The framework moves or copies items and the eof flag between ports.
See Port Value Semantics for more detail on how data is transferred between ports.
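The stream port behaviour described above can be modelled with a queue and a closed flag. The following is a minimal standalone sketch, not the framework's StreamPort: StreamPortSketch and drain are hypothetical names, and throwing on push() after close() is an assumption made for this sketch.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical minimal stream port; the real StreamPort may differ.
template <typename T>
class StreamPortSketch {
 public:
  void push(T value) {
    // assumption for this sketch: writing after close() is an error
    if (closed_) throw std::logic_error("push() after close()");
    queue_.push_back(std::move(value));
  }

  // closing more than once has no further effect
  void close() { closed_ = true; }

  bool available() const { return !queue_.empty(); }

  T pop() {
    if (queue_.empty()) throw std::runtime_error("pop() on empty queue");
    T value = std::move(queue_.front());
    queue_.pop_front();
    return value;
  }

  // true only when the writer has closed the port and every item was read
  bool eof() const { return closed_ && queue_.empty(); }

 private:
  std::deque<T> queue_;
  bool closed_ = false;
};

// Read everything currently queued, as a reader's process() might.
template <typename T>
std::vector<T> drain(StreamPortSketch<T> &port) {
  std::vector<T> items;
  while (port.available()) items.push_back(port.pop());
  return items;
}
```

Note that eof() stays false after close() until the queue has been emptied, so a reader that checks available() before eof() never loses items.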
An Example
The graph below is for an application which normalises the levels in an audio file while retaining ADM metadata:
Red lines represent streaming connections. Processes are shown as columns with the input ports on the left, output ports on the right, and the process name in the middle.
The components are as follows:
in_path and out_path are DataSource processes which produce the input and output path names.
reader is a composite process which reads ADM metadata and samples (streaming) from a BW64 file.
writer is a composite process which writes ADM metadata and samples (streaming) to a BW64 file.
normalise is a composite process which normalises its input samples to produce some output samples.
If these composite processes are expanded it looks like this (you may have to open in a new tab…):
Here, composite processes are shown as boxes containing their constituent processes, with rounded boxes representing their input and output ports (due to the limitations of graphviz).
Zooming in, reader looks like this:
It consists of two independent processes which read the samples and ADM data, so there is no ordering constraint between them.
Writer is more complex:
libbw64 does not support editing files, so the samples and ADM metadata need to be written using the same Bw64Writer object. To do this, the audio writer process sends the writer object out of a port, which is used by the adm writer process. These could technically be merged into one atomic process, but this way the ADM metadata does not have to be available before the samples.
The normalise process looks like this:
The analyse process takes streaming audio and measures the RMS level of the whole of each channel; these are produced on a data port. These RMS levels are used by the apply process to modify the level of the input samples.
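The two passes can be sketched for a single channel as follows. This is a standalone illustration rather than the toolbox's code: rms_level and apply_gain are hypothetical names, and the target level is an assumed parameter, since the text does not say which level the samples are normalised to.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// First pass ("analyse"): RMS level of one channel, 0 for an empty channel.
inline double rms_level(const std::vector<double> &channel) {
  if (channel.empty()) return 0.0;
  double sum_squares = 0.0;
  for (double s : channel) sum_squares += s * s;
  return std::sqrt(sum_squares / static_cast<double>(channel.size()));
}

// Second pass ("apply"): scale the samples so that their RMS level becomes
// target_rms (an assumed parameter). Silent channels are returned unchanged
// to avoid dividing by zero.
inline std::vector<double> apply_gain(const std::vector<double> &channel,
                                      double measured_rms,
                                      double target_rms) {
  if (measured_rms == 0.0) return channel;
  double gain = target_rms / measured_rms;
  std::vector<double> out;
  out.reserve(channel.size());
  for (double s : channel) out.push_back(s * gain);
  return out;
}
```

The second pass needs the result of the first, which is only known once the whole channel has been read; this is the ordering constraint that the data connection between analyse and apply expresses.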
Evaluation
To evaluate the graph, the first step is to flatten it, expanding composite processes:
This exposes a problem: there is a streaming connection from audio reader to analyse and apply, but there’s a non-streaming connection between analyse and apply. Because non-streaming ports are read before streaming and written after streaming (see StreamingAtomicProcess), it’s not possible to stream between all three processes simultaneously.
To deal with this situation, buffer writer and buffer reader processes are automatically inserted to split enough streaming connections that this does not occur. The graph then looks like this:
Now audio reader, analyse and buffer writer can run together, followed by buffer reader, apply and audio writer, because there are no non-streaming connections within each of these sub-graphs.
The type of buffer writer and reader used can be specialised for each type of streaming port by specialising MakeBuffer. The default implementation buffers stream values into a std::vector, which defeats the memory savings of streaming. A specialisation is provided for audio samples which writes to a temporary wav file instead.
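To make the idea of an in-memory buffer concrete, here is a standalone sketch of a writer and reader pair that share a std::vector, used to split one streaming operation into two stages. The names BufferWriterSketch, BufferReaderSketch and two_stage_peak_normalise are hypothetical, and the peak-based scaling is only a stand-in computation; the framework's MakeBuffer and its buffer processes may be structured quite differently.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical buffer writer: records streamed values into shared storage.
template <typename T>
class BufferWriterSketch {
 public:
  explicit BufferWriterSketch(std::shared_ptr<std::vector<T>> storage)
      : storage_(std::move(storage)) {}
  void write(T value) { storage_->push_back(std::move(value)); }

 private:
  std::shared_ptr<std::vector<T>> storage_;
};

// Hypothetical buffer reader: replays the recorded values in order.
template <typename T>
class BufferReaderSketch {
 public:
  explicit BufferReaderSketch(std::shared_ptr<std::vector<T>> storage)
      : storage_(std::move(storage)) {}
  bool available() const { return next_ < storage_->size(); }
  T read() { return (*storage_)[next_++]; }

 private:
  std::shared_ptr<std::vector<T>> storage_;
  std::size_t next_ = 0;
};

// Stage 1 measures the peak while recording the stream; stage 2 replays the
// recording and scales it by the peak found in stage 1.
inline std::vector<double> two_stage_peak_normalise(
    const std::vector<double> &input) {
  auto storage = std::make_shared<std::vector<double>>();

  BufferWriterSketch<double> writer(storage);
  double peak = 0.0;
  for (double s : input) {
    peak = std::max(peak, std::fabs(s));
    writer.write(s);
  }

  BufferReaderSketch<double> reader(storage);
  std::vector<double> output;
  while (reader.available()) {
    double s = reader.read();
    output.push_back(peak > 0.0 ? s / peak : s);
  }
  return output;
}
```

The whole stream is held in memory between the two stages, which illustrates why buffering into a std::vector gives up the memory savings of streaming.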
Port Value Semantics
One output port may be connected to multiple input ports. To implement this, the value stored in the output port is copied to all but the last connected input port, and moved to the last one.
Thus, the data stored in a port should have value semantics: a copy creates a new value with the same contents, and changing one copy does not affect other copies. This is done because it’s much easier to implement sensible reference semantics on top of value semantics than the other way around.
Basic types (ints, floats etc.), POD types and STL containers meet this criterion, while std::shared_ptr and some custom classes do not.
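The copy-then-move rule can be sketched in a few lines. This is a standalone illustration with a hypothetical distribute function, in which raw pointers to plain variables stand in for the connected input ports; it is not how the framework itself is written.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Deliver one output value to several inputs: copy it into every input except
// the last, then move it into the last one so that no extra copy is made.
template <typename T>
void distribute(T value, const std::vector<T *> &inputs) {
  for (std::size_t i = 0; i < inputs.size(); ++i) {
    if (i + 1 < inputs.size())
      *inputs[i] = value;             // copy for all but the last input
    else
      *inputs[i] = std::move(value);  // move into the last input
  }
}
```

With a value type such as std::vector<int>, each input receives an independent value, so one reader can modify its copy without affecting the others. If T were a std::shared_ptr, every input would instead refer to the same underlying object, which is the situation value semantics is meant to rule out.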
To work with types like libadm documents, which are always accessed through a shared_ptr, ValuePtr is provided. This allows each reader to choose whether it wants a const or a non-const pointer, which can save copying the document in cases where all readers access the value through a const pointer, or there is only one reader which accesses the value through a non-const pointer.
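One way to picture this behaviour is a wrapper that shares the object for read-only access and only copies it when a reader asks for a mutable pointer while other references still exist. The sketch below is a hypothetical, single-threaded illustration named SharedValueSketch; it borrows the method names read() and move_or_copy() from the examples later in this document, but it is not ValuePtr's actual implementation, and the use_count() check is an assumption of this sketch.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical illustration only; the real ValuePtr may work differently.
template <typename T>
class SharedValueSketch {
 public:
  explicit SharedValueSketch(std::shared_ptr<T> value)
      : value_(std::move(value)) {}

  // Read-only access: shares the stored object and never copies it.
  std::shared_ptr<const T> read() const { return value_; }

  // Mutable access: take the stored object if nothing else references it,
  // otherwise make a deep copy. After the object has been taken, this
  // wrapper is left empty.
  std::shared_ptr<T> move_or_copy() {
    if (value_.use_count() == 1) return std::move(value_);
    return std::make_shared<T>(*value_);
  }

 private:
  std::shared_ptr<T> value_;
};
```

Copying the wrapper itself only copies the pointer, so several read-only readers share one object, while a single mutable reader takes the object without a copy.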
Other Features
This section lists features that the framework is designed to support, but which are not currently implemented.
Progress
When processing large files it would be nice to indicate the progress to the user. There are two parts to this:
Each ExecStep in a Plan represents one step of the evaluation. These should provide more information about what they are doing (e.g. a name and a list of processes it will run) so that the overall progress through the graph can be reported.
Streams should be able to optionally report their progress as a percentage. Often there will be just one process in a streaming sub-graph that knows how far through it is (e.g. a file reader), and this can be reported through a callback.
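As a rough illustration of the callback idea, the sketch below shows a reader-like loop that knows its total length and reports a percentage after each block. Since this feature is not implemented, everything here is hypothetical: read_with_progress, its parameters and the callback signature are invented for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical: work through `total` items in blocks of `block_size`, calling
// on_progress with the percentage completed after each block.
inline void read_with_progress(std::size_t total, std::size_t block_size,
                               const std::function<void(int)> &on_progress) {
  if (block_size == 0) return;  // nothing can be read in zero-sized blocks
  std::size_t done = 0;
  while (done < total) {
    std::size_t n = std::min(block_size, total - done);
    // ... a real reader would read n items here ...
    done += n;
    on_progress(static_cast<int>(done * 100 / total));
  }
}
```

Only the process that knows the total (here, the reader) needs to call the callback; the other processes in the same streaming sub-graph do not need to know about progress at all.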
Streaming ADM
A streaming ADM BW64 file can be thought of as a sequence of ADM documents with associated ranges of samples. To process these within this framework, one solution would be to allow the graph to run multiple times (once on each document). This should allow components to be shared between streaming and non-streaming uses.
Duplicating Streaming Processes
The example in the last section is wasteful, in that the samples from the original file are written to and read from a temporary file in order to break a streaming connection; it would be better if the original file could be read a second time.
Processes should be able to specify that they are safe to copy (i.e. will always produce the same output given the same inputs with no side-effects), and the framework should prefer to use this if possible to break streaming connections.
Exceptions
Processes can raise exceptions while running. Currently these are just propagated to the user, but should be wrapped in another exception that records which process they came from in order to improve error reporting.
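One possible way to do this wrapping in standard C++ is with nested exceptions (std::throw_with_nested and std::rethrow_if_nested). The sketch below is standalone and hypothetical: ProcessErrorSketch, run_wrapped and describe_failure are invented names, and nothing here is part of the framework.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical wrapper exception that records which process failed.
class ProcessErrorSketch : public std::runtime_error {
 public:
  explicit ProcessErrorSketch(const std::string &process_name)
      : std::runtime_error("error in process '" + process_name + "'"),
        process_name_(process_name) {}
  const std::string &process_name() const { return process_name_; }

 private:
  std::string process_name_;
};

// Run a process body; any exception it throws is rethrown nested inside a
// ProcessErrorSketch that carries the process name.
inline void run_wrapped(const std::string &process_name,
                        const std::function<void()> &body) {
  try {
    body();
  } catch (...) {
    std::throw_with_nested(ProcessErrorSketch(process_name));
  }
}

// Build a message from the wrapper and, where available, the original
// exception. Returns an empty string if the body did not throw.
inline std::string describe_failure(const std::string &process_name,
                                    const std::function<void()> &body) {
  try {
    run_wrapped(process_name, body);
  } catch (const ProcessErrorSketch &outer) {
    try {
      std::rethrow_if_nested(outer);
    } catch (const std::exception &inner) {
      return outer.process_name() + ": " + inner.what();
    }
    return outer.process_name();
  }
  return "";
}
```

The original exception is preserved inside the wrapper, so callers that only care about the underlying error can still recover it.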
Writing Processes
This section briefly explains how to write some different kinds of processes.
Functional Atomic Process
An example of a process that adds 1 to an integer input:
class AddOne : public FunctionalAtomicProcess {
 public:
  AddOne(const std::string &name)
      : FunctionalAtomicProcess(name),
        in(add_in_port<DataPort<int>>("in")),
        out(add_out_port<DataPort<int>>("out")) {}
  virtual void process() override {
    out->set_value(in->get_value() + 1);
  }
 private:
  DataPortPtr<int> in;
  DataPortPtr<int> out;
};
Note that:
The process name is passed in through the constructor, and should normally be passed straight to the FunctionalAtomicProcess constructor.
Ports are added through Process::add_in_port() and Process::add_out_port(), with the port type as a parameter. These are saved (as the corresponding pointer type) for use in process().
DataPort::get_value() and DataPort::set_value() are used to get and set the values of input and output ports respectively.
For heavier types, std::move() should be used in process(), as in this example with a std::vector input and output:
virtual void process() override {
  // move from the input port to avoid copying the data
  std::vector<int> value = std::move(in->get_value());
  // modify it
  value.push_back(7);
  // move to the output port to avoid copying again
  out->set_value(std::move(value));
}
For types using ValuePtr, it will look something like this for modifying a value in-place:
virtual void process() override {
  // get the wrapper
  ValuePtr<std::vector<int>> value_ptr = std::move(in->get_value());
  // extract the value; this will copy or move if it's the last user
  std::shared_ptr<std::vector<int>> value = value_ptr.move_or_copy();
  // modify it
  value->push_back(7);
  // move to the output port
  out->set_value(std::move(value));
}
Or this if read-only access is OK:
virtual void process() override {
  // get the wrapper
  ValuePtr<std::vector<int>> value_ptr = std::move(in->get_value());
  // extract a reference to the value
  std::shared_ptr<const std::vector<int>> value = value_ptr.read();
  // use it somehow
  out->set_value(value->at(0));
}
Streaming Atomic Process
An example of a process that produces a stream in which each value is one greater than the corresponding input value:
class AddOneStream : public StreamingAtomicProcess {
 public:
  AddOneStream(const std::string &name)
      : StreamingAtomicProcess(name),
        in(add_in_port<StreamPort<int>>("in")),
        out(add_out_port<StreamPort<int>>("out")) {}
  virtual void process() override {
    while (in->available())
      out->push(in->pop() + 1);
    if (in->eof())
      out->close();
  }
 private:
  StreamPortPtr<int> in;
  StreamPortPtr<int> out;
};
Note that:
All ports must be empty and closed (check with StreamPort::eof()) for the streaming to finish. Generally output ports should be closed once corresponding inputs have ended (StreamPort::eof()). It’s valid to close a port multiple times (it has no effect), so there’s no need to track if it’s been closed or not.
StreamPort::push() and StreamPort::pop() behave similarly to DataPort::set_value() and DataPort::get_value(), so the above information about std::move() and ValuePtr applies here too.
StreamPort::pop() will throw an exception if there’s nothing in the queue, so check StreamPort::available() first.
To get data from input data ports or send data to output data ports, override initialise() or finalise() respectively.
Composite Process
Here’s a composite process which chains together two of the AddOne processes defined earlier:
class AddTwo : public CompositeProcess {
 public:
  AddTwo(const std::string &name)
      : CompositeProcess(name) {
    // add ports for this process
    auto in = add_in_port<DataPort<int>>("in");
    auto out = add_out_port<DataPort<int>>("out");
    // add sub-processes
    auto p1 = add_process<AddOne>("p1");
    auto p2 = add_process<AddOne>("p2");
    // connect everything together
    connect(in, p1->get_in_port("in"));
    connect(p1->get_out_port("out"), p2->get_in_port("in"));
    connect(p2->get_out_port("out"), out);
  }
};
Ports are added using the same functions as for atomic processes.
Sub-processes are added using Graph::add_process() or Graph::register_process().
Ports are connected using Graph::connect(). Process::get_out_port() and Process::get_in_port() are used to access ports of sub-processes by name. All external ports, and ports of sub-processes, must be connected.
There’s no need to keep a reference to the ports or processes.
Building Processing Graphs
Processing graphs can be built with the Graph class, and run using evaluate(), like this:
Graph g;
auto p1 = g.add_process<AddOne>("p1");
auto p2 = g.add_process<AddOne>("p2");
// connect processes together
g.connect(p1->get_out_port("out"), p2->get_in_port("in"));
// add input
auto in = g.add_process<DataSource<int>>("input", 5);
g.connect(in, p1->get_in_port("in"));
// add output
auto out = g.add_process<DataSink<int>>("output");
g.connect(p2->get_out_port("out"), out);
// run the graph
evaluate(g);
// check the output
assert(out->get_value() == 7);
This is exactly the same API as is used for building composite processes.
Again, all ports of all processes must be connected. Inputs and outputs can be accessed by adding and connecting DataSource and DataSink processes, and unused output ports can be terminated with NullSink processes.