Program Listing for File process.hpp

Return to documentation for file (include/eat/framework/process.hpp)

#pragma once
#include <cstddef>
#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace eat::framework {

// Forward declarations of the framework types, so that the shared_ptr aliases
// below and the class definitions can refer to each other before the full
// definitions appear.
class Port;
class DataPortBase;
class StreamPortBase;
class Process;
class FunctionalAtomicProcess;
class StreamingAtomicProcess;
class AtomicProcess;
class CompositeProcess;
class Graph;

// std::shared_ptr aliases for the base framework types
using PortPtr = std::shared_ptr<Port>;
using ProcessPtr = std::shared_ptr<Process>;
using GraphPtr = std::shared_ptr<Graph>;

// std::shared_ptr aliases for the more specific port and process types
using DataPortBasePtr = std::shared_ptr<DataPortBase>;
using StreamPortBasePtr = std::shared_ptr<StreamPortBase>;
using AtomicProcessPtr = std::shared_ptr<AtomicProcess>;
using FunctionalAtomicProcessPtr = std::shared_ptr<FunctionalAtomicProcess>;
using StreamingAtomicProcessPtr = std::shared_ptr<StreamingAtomicProcess>;
using CompositeProcessPtr = std::shared_ptr<CompositeProcess>;

// A container of processes plus a PortPtr -> PortPtr map describing how their
// ports are wired together.
class Graph {
 public:
  virtual ~Graph() = default;

  // Build a process of type T from args, pass it to register_process() and
  // return it with its concrete type.
  template <typename T, typename... Args>
  std::shared_ptr<T> add_process(Args &&...args);

  // Register an existing process with this graph (defined out of line).
  ProcessPtr register_process(ProcessPtr process);

  // Connect ports a and b (defined out of line).
  void connect(const PortPtr &a, const PortPtr &b);

  // Read-only view of the stored processes.
  const std::vector<ProcessPtr> &get_processes() const { return processes; }

  // Read-only view of the stored port mapping.
  // NOTE(review): presumably keyed by a port and mapping to the port feeding
  // it -- confirm against the definition of connect().
  const std::map<PortPtr, PortPtr> &get_port_inputs() const { return port_inputs; }

 private:
  std::vector<ProcessPtr> processes;

 protected:
  // protected (not private), so derived classes can access it directly
  std::map<PortPtr, PortPtr> port_inputs;
};

// Base class of everything that can be placed in a Graph: a named object
// owning a set of named input ports and a set of named output ports.
class Process {
 public:
  explicit Process(const std::string &name);
  virtual ~Process() = default;

  // Create a port of type T called name and store it as an input port.
  // Throws std::runtime_error if an input port with that name already exists.
  template <typename T>
  std::shared_ptr<T> add_in_port(const std::string &name);

  // Create a port of type T called name and store it as an output port.
  // Throws std::runtime_error if an output port with that name already exists.
  template <typename T>
  std::shared_ptr<T> add_out_port(const std::string &name);

  // Look up the input port called name and cast it to T.
  // Throws std::runtime_error if there is no such port, or if it is not a T.
  template <typename T = Port>
  std::shared_ptr<T> get_in_port(const std::string &name) const;

  // Look up the output port called name and cast it to T.
  // Throws std::runtime_error if there is no such port, or if it is not a T.
  template <typename T = Port>
  std::shared_ptr<T> get_out_port(const std::string &name) const;

  // All ports (input and output) in a single map, kept for compatibility;
  // defined out of line.
  std::map<std::string, PortPtr> get_port_map() const;

  // Input ports, keyed by port name.
  const std::map<std::string, PortPtr> &get_in_port_map() const { return in_ports; }

  // Output ports, keyed by port name.
  const std::map<std::string, PortPtr> &get_out_port_map() const { return out_ports; }

  // Name of this process.
  const std::string &name() const { return name_; }

 private:
  std::map<std::string, PortPtr> in_ports;
  std::map<std::string, PortPtr> out_ports;
  std::string name_;
};

// Common base of the atomic process kinds declared in this file
// (FunctionalAtomicProcess and StreamingAtomicProcess). It adds nothing to
// Process apart from inheriting its constructors.
class AtomicProcess : public Process {
 public:
  using Process::Process;
};

// An atomic process with a single entry point: derived classes must implement
// process(), which is pure virtual here.
class FunctionalAtomicProcess : public AtomicProcess {
 public:
  using AtomicProcess::AtomicProcess;
  // The work done by this process; must be overridden.
  virtual void process() = 0;
};

// An atomic process with three overridable hooks. All three have empty
// default bodies, so derived classes only override the ones they need.
// NOTE(review): presumably the evaluation code calls initialise() once,
// process() repeatedly and finalise() once -- the caller is not in this file.
class StreamingAtomicProcess : public AtomicProcess {
 public:
  using AtomicProcess::AtomicProcess;
  virtual void initialise() {}
  virtual void process() {}
  virtual void finalise() {}

  // Optional progress report; the default implementation reports nothing
  // (std::nullopt).
  virtual std::optional<float> get_progress() { return std::nullopt; }
};

// A process that is itself a graph: it inherits both Process (name and ports)
// and Graph (contained processes and port mapping).
class CompositeProcess : public Process, public Graph {
 public:
  // Same signature as Graph::connect. Graph::connect is not virtual, so this
  // declaration hides it rather than overriding it. Defined out of line.
  void connect(const PortPtr &a, const PortPtr &b);

  using Process::Process;
};

// Abstract base class of all ports: a named object that can report whether
// it is compatible with another port.
class Port {
 public:
  explicit Port(const std::string &name);
  virtual ~Port() = default;
  // Name of this port.
  const std::string &name() const { return name_; }

  // Whether other is compatible with this port. The implementations in this
  // file (DataPort<T>, StreamPort<T>) return true when other can be
  // dynamically cast to the same class template instantiation.
  virtual bool compatible(const PortPtr &other) const = 0;

 private:
  std::string name_;
};

// Type-erased interface for ports holding a single value; DataPort<T> is the
// implementation in this file.
class DataPortBase : public Port {
 public:
  // methods used by evaluate to move data around
  // move_to: transfer this port's value into the given port by moving it
  virtual void move_to(DataPortBase &) = 0;
  // copy_to: transfer this port's value into the given port by copying it
  virtual void copy_to(DataPortBase &) = 0;
  using Port::Port;
};

// A port holding a single value of type T.
//
// The stored value is value-initialised, so reading it (get_value(), copy_to())
// before the first set_value() yields a well-defined T{} even for scalar T
// such as int, float or raw pointers, rather than an indeterminate value.
template <typename T>
class DataPort : public DataPortBase {
 public:
  using DataPortBase::DataPortBase;

  // True if other can be dynamically cast to DataPort<T>.
  bool compatible(const PortPtr &other) const override;

  // Assign value (perfectly forwarded) to the stored value.
  template <typename U>
  void set_value(U &&value);

  // Access the stored value.
  const T &get_value() const;
  T &get_value();

  // internals used to connect ports:

  // Move the stored value into other, which must be a DataPort<T>
  // (otherwise the dynamic_cast in the implementation throws std::bad_cast).
  void move_to(DataPortBase &other) override;
  // Copy the stored value into other, which must be a DataPort<T>
  // (otherwise the dynamic_cast in the implementation throws std::bad_cast).
  void copy_to(DataPortBase &other) override;

 private:
  // value-initialised: see the class comment
  T value{};
};

// std::shared_ptr alias for a DataPort holding a T
template <typename T>
using DataPortPtr = std::shared_ptr<DataPort<T>>;

// Type-erased interface for ports carrying a stream of values; StreamPort<T>
// is the implementation in this file, and the notes below describe what that
// implementation does.
class StreamPortBase : public Port {
 public:
  // Mark the stream as closed; StreamPort<T>::push throws after this.
  virtual void close() = 0;

  // StreamPort<T>: true once the stream is closed and no items are queued.
  virtual bool eof() = 0;

  // StreamPort<T>: true once the stream is closed, whether or not items are
  // still queued.
  virtual bool eof_triggered() = 0;

  // internals used to connect ports:

  // StreamPort<T>: push every queued item into other (by copy / by move) and
  // close other if this port is closed.
  virtual void copy_to(StreamPortBase &other) = 0;
  virtual void move_to(StreamPortBase &other) = 0;
  // StreamPort<T>: discard all queued items.
  virtual void clear() = 0;

  // Create a process with the given name that buffers this kind of stream;
  // StreamPort<T> forwards to MakeBuffer<T>.
  virtual ProcessPtr get_buffer_writer(const std::string &name) = 0;
  virtual ProcessPtr get_buffer_reader(const std::string &name) = 0;

  using Port::Port;
};

// A port carrying a FIFO stream of T values, backed by a std::deque, with a
// flag recording whether the stream has been closed.
template <typename T>
class StreamPort : public StreamPortBase {
 public:
  using StreamPortBase::StreamPortBase;

  // True if other can be dynamically cast to StreamPort<T>.
  bool compatible(const PortPtr &other) const override;

  // Append value to the queue; throws std::runtime_error once closed.
  void push(T value);
  // True if at least one item is queued.
  bool available() const;
  // Remove and return the oldest item; throws std::runtime_error if empty.
  T pop();

  // Mark the stream as closed.
  void close() override;

  // True once the stream is closed and the queue is empty.
  bool eof() override;

  // internals

  // True once the stream is closed, regardless of queued items.
  bool eof_triggered() override;
  // Push all queued items into other (by copy / by move), then close other
  // if this port is closed. other must be a StreamPort<T>.
  void copy_to(StreamPortBase &other) override;
  void move_to(StreamPortBase &other) override;
  // Drop all queued items.
  void clear() override;

  // Buffer processes for this element type, obtained from MakeBuffer<T>.
  ProcessPtr get_buffer_writer(const std::string &name) override;
  ProcessPtr get_buffer_reader(const std::string &name) override;

 private:
  std::deque<T> queue;
  bool eof_ = false;
};

// std::shared_ptr alias for a StreamPort carrying T
template <typename T>
using StreamPortPtr = std::shared_ptr<StreamPort<T>>;

// Factory for the processes used to buffer a stream of T.
// StreamPort<T>::get_buffer_writer / get_buffer_reader forward to these; the
// definitions in this file return the in-memory buffers from namespace detail.
// NOTE(review): presumably a struct template so that it can be specialised
// for particular T -- confirm.
template <typename T>
struct MakeBuffer {
  static ProcessPtr get_buffer_writer(const std::string &name);
  static ProcessPtr get_buffer_reader(const std::string &name);
};

// implementations

// Construct a T from the forwarded arguments, register it with this graph and
// return the typed pointer (the return value of register_process is unused).
template <typename T, typename... Args>
std::shared_ptr<T> Graph::add_process(Args &&...args) {
  std::shared_ptr<T> created = std::make_shared<T>(std::forward<Args>(args)...);
  register_process(created);
  return created;
}

// Create a T named `name` and store it in in_ports under the port's own name.
// Throws std::runtime_error if that key is already taken.
template <typename T>
std::shared_ptr<T> Process::add_in_port(const std::string &name) {
  std::shared_ptr<T> new_port = std::make_shared<T>(name);
  const bool inserted = in_ports.emplace(new_port->name(), new_port).second;
  if (!inserted) {
    throw std::runtime_error("duplicate input port: " + name);
  }
  return new_port;
}

// Create a T named `name` and store it in out_ports under the port's own name.
// Throws std::runtime_error if that key is already taken.
template <typename T>
std::shared_ptr<T> Process::add_out_port(const std::string &name) {
  std::shared_ptr<T> new_port = std::make_shared<T>(name);
  const bool inserted = out_ports.emplace(new_port->name(), new_port).second;
  if (!inserted) {
    throw std::runtime_error("duplicate output port: " + name);
  }
  return new_port;
}

// Find the input port called `name` and return it as a T.
// Throws std::runtime_error if no such port exists, or if the stored port
// cannot be dynamically cast to T.
template <typename T>
std::shared_ptr<T> Process::get_in_port(const std::string &name) const {
  const auto found = in_ports.find(name);
  if (found == in_ports.end()) {
    throw std::runtime_error("process has no input port named " + name);
  }

  std::shared_ptr<T> typed = std::dynamic_pointer_cast<T>(found->second);
  if (typed == nullptr) {
    throw std::runtime_error("bad port type when requesting " + name);
  }
  return typed;
}

// Find the output port called `name` and return it as a T.
// Throws std::runtime_error if no such port exists, or if the stored port
// cannot be dynamically cast to T.
template <typename T>
std::shared_ptr<T> Process::get_out_port(const std::string &name) const {
  const auto found = out_ports.find(name);
  if (found == out_ports.end()) {
    throw std::runtime_error("process has no output port named " + name);
  }

  std::shared_ptr<T> typed = std::dynamic_pointer_cast<T>(found->second);
  if (typed == nullptr) {
    throw std::runtime_error("bad port type when requesting " + name);
  }
  return typed;
}

// Compatible exactly when other can be dynamically cast to DataPort<T>
// (a null other is therefore incompatible).
template <typename T>
bool DataPort<T>::compatible(const PortPtr &other) const {
  const auto same_kind = std::dynamic_pointer_cast<DataPort<T>>(other);
  return same_kind != nullptr;
}

// Assign new_value to the stored value, forwarding it so that rvalues are
// moved and lvalues are copied.
template <typename T>
template <typename U>
void DataPort<T>::set_value(U &&new_value) {
  this->value = std::forward<U>(new_value);
}

// Read-only access to the stored value.
template <typename T>
const T &DataPort<T>::get_value() const {
  return this->value;
}

// Mutable access to the stored value.
template <typename T>
T &DataPort<T>::get_value() {
  return this->value;
}

// Move the stored value into other. The reference dynamic_cast throws
// std::bad_cast if other is not a DataPort<T>. Afterwards this port's value
// is in a moved-from state.
template <typename T>
void DataPort<T>::move_to(DataPortBase &other) {
  auto &target = dynamic_cast<DataPort<T> &>(other);
  target.set_value(std::move(value));
}

// Copy the stored value into other. The reference dynamic_cast throws
// std::bad_cast if other is not a DataPort<T>.
template <typename T>
void DataPort<T>::copy_to(DataPortBase &other) {
  auto &target = dynamic_cast<DataPort<T> &>(other);
  target.set_value(value);
}

// Compatible exactly when other can be dynamically cast to StreamPort<T>
// (a null other is therefore incompatible).
template <typename T>
bool StreamPort<T>::compatible(const PortPtr &other) const {
  const auto same_kind = std::dynamic_pointer_cast<StreamPort<T>>(other);
  return same_kind != nullptr;
}

// True if at least one item is waiting in the queue.
template <typename T>
bool StreamPort<T>::available() const {
  return !queue.empty();
}

// Append value to the back of the queue.
// Throws std::runtime_error if the port has already been closed.
template <typename T>
void StreamPort<T>::push(T value) {
  if (eof_) {
    throw std::runtime_error("push to closed queue");
  }
  queue.emplace_back(std::move(value));
}

// Remove the oldest item from the queue and return it.
// Throws std::runtime_error if the queue is empty.
template <typename T>
T StreamPort<T>::pop() {
  if (queue.empty()) {
    throw std::runtime_error("pop from empty queue");
  }
  // Moving out of front() is fine: the moved-from element is removed by
  // pop_front() on the next line and is never observed again.
  T front_item = std::move(queue.front());
  queue.pop_front();
  return front_item;
}

// True once the port has been closed and every queued item has been consumed.
template <typename T>
bool StreamPort<T>::eof() {
  return eof_ && queue.empty();
}

// Mark the port as closed; queued items are left in place, but any later
// push() throws.
template <typename T>
void StreamPort<T>::close() {
  this->eof_ = true;
}

// True once close() has been called, even if items are still queued
// (compare eof(), which also requires the queue to be empty).
template <typename T>
bool StreamPort<T>::eof_triggered() {
  return this->eof_;
}

// Push a copy of every queued item into other, then close other if this port
// is closed. This port's queue is left untouched.
// The reference dynamic_cast throws std::bad_cast if other is not a
// StreamPort<T>; push() throws std::runtime_error if other is already closed.
template <typename T>
void StreamPort<T>::copy_to(StreamPortBase &other) {
  auto &target = dynamic_cast<StreamPort<T> &>(other);
  for (const auto &element : queue) {
    target.push(element);
  }
  if (eof_) {
    target.close();
  }
}

// Move every queued item into other, then close other if this port is closed.
// The reference dynamic_cast throws std::bad_cast if other is not a
// StreamPort<T>; push() throws std::runtime_error if other is already closed.
//
// Note: this port's queue is not emptied here; it still holds the moved-from
// elements afterwards, until clear() is called.
template <typename T>
void StreamPort<T>::move_to(StreamPortBase &other) {
  auto &target = dynamic_cast<StreamPort<T> &>(other);
  for (T &element : queue) {
    target.push(std::move(element));
  }
  if (eof_) {
    target.close();
  }
}

// Discard all queued items. The closed flag is not changed.
template <typename T>
void StreamPort<T>::clear() {
  this->queue.clear();
}

// default buffering implementation, writing to an in-memory buffer
// TODO: make the output a shared_ptr or something so that it doesn't need to
// be copied?

namespace detail {

// Streaming process that collects a stream of T into a std::vector<T>.
//
// Ports:
//   in  (StreamPort<T>)            - the stream to drain
//   out (DataPort<std::vector<T>>) - receives the collected items in finalise()
template <typename T>
class InMemBufferWrite : public StreamingAtomicProcess {
 public:
  // explicit, like the Process and Port constructors, so that a std::string
  // never converts to a buffer process implicitly
  explicit InMemBufferWrite(const std::string &name)
      : StreamingAtomicProcess(name),
        in(add_in_port<StreamPort<T>>("in")),
        out(add_out_port<DataPort<std::vector<T>>>("out")) {}

  // Drain everything currently available on the input into the buffer.
  void process() override {
    while (in->available()) {
      buf.emplace_back(in->pop());
    }
  }

  // Hand the collected items to the output port without copying them.
  void finalise() override {
    out->set_value(std::move(buf));
    // a moved-from vector is only valid-but-unspecified; make it definitely
    // empty so the object stays in a known state
    buf.clear();
  }

 private:
  StreamPortPtr<T> in;
  DataPortPtr<std::vector<T>> out;

  std::vector<T> buf;
};

// Streaming process that replays a std::vector<T> as a stream of T.
//
// Ports:
//   in  (DataPort<std::vector<T>>) - the items to replay, read in initialise()
//   out (StreamPort<T>)            - receives one item per process() call and
//                                    is closed once all items have been sent
template <typename T>
class InMemBufferRead : public StreamingAtomicProcess {
 public:
  // explicit, like the Process and Port constructors, so that a std::string
  // never converts to a buffer process implicitly
  explicit InMemBufferRead(const std::string &name)
      : StreamingAtomicProcess(name),
        in(add_in_port<DataPort<std::vector<T>>>("in")),
        out(add_out_port<StreamPort<T>>("out")) {}

  // Take a private copy of the input and start replaying from the beginning.
  // The input port is left intact (copied, not moved).
  void initialise() override {
    buf = in->get_value();
    // restart from the first item if the process is initialised again
    idx = 0;
  }

  // Emit the next item, or close the output once everything has been sent.
  void process() override {
    if (idx < buf.size()) {
      // buf is a private copy and idx only moves forward, so the element is
      // never read again and can be moved out rather than copied
      out->push(std::move(buf[idx]));
      ++idx;
    } else {
      out->close();
    }
  }

  // Fraction of the buffer emitted so far, or std::nullopt for an empty
  // buffer. Public, matching the access of the base-class declaration.
  std::optional<float> get_progress() override {
    if (buf.empty()) {
      return std::nullopt;
    }
    return static_cast<float>(idx) / static_cast<float>(buf.size());
  }

 private:
  DataPortPtr<std::vector<T>> in;
  StreamPortPtr<T> out;

  std::vector<T> buf;
  std::size_t idx = 0;
};

}  // namespace detail

// Create the buffer-writer process for this element type by delegating to
// MakeBuffer<T>.
template <typename T>
ProcessPtr StreamPort<T>::get_buffer_writer(const std::string &name) {
  ProcessPtr writer = MakeBuffer<T>::get_buffer_writer(name);
  return writer;
}

// Create the buffer-reader process for this element type by delegating to
// MakeBuffer<T>.
template <typename T>
ProcessPtr StreamPort<T>::get_buffer_reader(const std::string &name) {
  ProcessPtr reader = MakeBuffer<T>::get_buffer_reader(name);
  return reader;
}

// Default reader: an in-memory buffer that replays a std::vector<T> as a
// stream.
template <typename T>
ProcessPtr MakeBuffer<T>::get_buffer_reader(const std::string &name) {
  ProcessPtr reader = std::make_shared<detail::InMemBufferRead<T>>(name);
  return reader;
}

// Default writer: an in-memory buffer that collects a stream into a
// std::vector<T>.
template <typename T>
ProcessPtr MakeBuffer<T>::get_buffer_writer(const std::string &name) {
  ProcessPtr writer = std::make_shared<detail::InMemBufferWrite<T>>(name);
  return writer;
}

}  // namespace eat::framework