// // Copyright 2012 Josh Blum // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with this program. If not, see . #ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP #define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace gras { static GRAS_FORCE_INLINE unsigned long myulround(const double x) { return (unsigned long)(x + 0.5); } static GRAS_FORCE_INLINE unsigned long long myullround(const double x) { return (unsigned long long)(x + 0.5); } struct BlockActor : Apology::Worker { BlockActor(void); ~BlockActor(void); Block *block_ptr; std::string name; //for debug ThreadPool thread_pool; //do it here so we can match w/ the handler declarations void register_handlers(void) { this->RegisterHandler(this, &BlockActor::handle_topology); this->RegisterHandler(this, &BlockActor::handle_top_alloc); this->RegisterHandler(this, &BlockActor::handle_top_active); this->RegisterHandler(this, &BlockActor::handle_top_inert); this->RegisterHandler(this, &BlockActor::handle_top_token); this->RegisterHandler(this, &BlockActor::handle_top_config); this->RegisterHandler(this, &BlockActor::handle_top_thread_group); this->RegisterHandler(this, &BlockActor::handle_input_tag); this->RegisterHandler(this, &BlockActor::handle_input_buffer); this->RegisterHandler(this, &BlockActor::handle_input_token); this->RegisterHandler(this, &BlockActor::handle_input_check); this->RegisterHandler(this, &BlockActor::handle_input_alloc); this->RegisterHandler(this, &BlockActor::handle_output_buffer); this->RegisterHandler(this, &BlockActor::handle_output_token); this->RegisterHandler(this, &BlockActor::handle_output_check); this->RegisterHandler(this, &BlockActor::handle_output_hint); this->RegisterHandler(this, &BlockActor::handle_output_alloc); this->RegisterHandler(this, &BlockActor::handle_self_kick); this->RegisterHandler(this, &BlockActor::handle_update_inputs); } //handlers void handle_topology(const Apology::WorkerTopologyMessage &, const Theron::Address); void handle_top_alloc(const TopAllocMessage &, const Theron::Address); void handle_top_active(const TopActiveMessage &, const Theron::Address); void handle_top_inert(const TopInertMessage &, const Theron::Address); void handle_top_token(const TopTokenMessage &, const Theron::Address); void handle_top_config(const GlobalBlockConfig &, const Theron::Address); void handle_top_thread_group(const SharedThreadGroup &, const Theron::Address); void handle_input_tag(const InputTagMessage &, const Theron::Address); void handle_input_buffer(const InputBufferMessage &, const Theron::Address); void handle_input_token(const InputTokenMessage &, const Theron::Address); void handle_input_check(const InputCheckMessage &, const Theron::Address); void handle_input_alloc(const InputAllocMessage &, const Theron::Address); void handle_output_buffer(const OutputBufferMessage &, const Theron::Address); void handle_output_token(const OutputTokenMessage &, const Theron::Address); void handle_output_check(const OutputCheckMessage &, const Theron::Address); void handle_output_hint(const OutputHintMessage &, const Theron::Address); void handle_output_alloc(const OutputAllocMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address); //helpers void buffer_returner(const size_t index, SBuffer &buffer); void mark_done(void); void handle_task(void); void input_fail(const size_t index); void sort_tags(const size_t index); void trim_tags(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) { if (this->inputs_done.none() or this->input_queues.all_ready()) return false; for (size_t i = 0; i < this->get_num_inputs(); i++) { if (this->inputs_done[i] and not this->input_queues.ready(i)) { return true; } } return false; } //per port properties std::vector input_items_sizes; std::vector output_items_sizes; std::vector input_configs; std::vector output_configs; size_t output_multiple_items; //keeps track of production std::vector items_consumed; std::vector items_produced; //work buffers for the classic interface size_t work_noutput_items; std::vector work_input_items; std::vector work_output_items; std::vector work_ninput_items; std::vector fcast_ninput_items; //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; ptrdiff_t work_io_ptr_mask; //track work's calls to produce and consume std::vector produce_items; std::vector consume_items; std::vector consume_called; //track the subscriber counts std::vector input_tokens; std::vector output_tokens; BitSet inputs_done; BitSet outputs_done; std::set token_pool; std::vector output_buffer_tokens; //buffer queues and ready conditions InputBufferQueues input_queues; OutputBufferQueues output_queues; //tag tracking std::vector input_tags_changed; std::vector > input_tags; Block::tag_propagation_policy_t tag_prop_policy; //interruptible thread stuff bool interruptible_work; SharedThreadGroup thread_group; boost::shared_ptr interruptible_thread; //work helpers int work_ret; inline void task_work(void) { this->work_ret = block_ptr->work(this->input_items, this->output_items); } //is the fg running? enum { BLOCK_STATE_INIT, BLOCK_STATE_LIVE, BLOCK_STATE_DONE, } block_state; long buffer_affinity; std::vector > output_allocation_hints; //rate settings bool enable_fixed_rate; double relative_rate; bool forecast_enable; bool topology_init; }; } //namespace gras #endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/