CAF 0.17.6
|
Manages a single stream with any number of in- and outbound paths. More...
#include <stream_manager.hpp>
Public Types | |
using | inbound_paths_list = std::vector< inbound_path * > |
Public Member Functions | |
stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal) | |
virtual void | handle (inbound_path *from, downstream_msg::batch &x) |
virtual void | handle (inbound_path *from, downstream_msg::close &x) |
virtual void | handle (inbound_path *from, downstream_msg::forced_close &x) |
virtual bool | handle (stream_slots, upstream_msg::ack_open &x) |
virtual void | handle (stream_slots slots, upstream_msg::ack_batch &x) |
virtual void | handle (stream_slots slots, upstream_msg::drop &x) |
virtual void | handle (stream_slots slots, upstream_msg::forced_drop &x) |
virtual void | stop (error reason=none) |
Closes all output and input paths and sends the final result to the client. | |
virtual void | shutdown () |
Marks this stream as shutting down, only allowing flushing of all related buffers of in- and outbound paths. | |
void | advance () |
Tries to advance the stream by generating more credit or by sending batches. | |
virtual void | push () |
Pushes new data to downstream actors by sending batches. | |
virtual bool | congested () const noexcept |
Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own. | |
virtual void | deliver_handshake (response_promise &rp, stream_slot slot, message handshake) |
Sends a handshake to dest . | |
virtual bool | generate_messages () |
Tries to generate new messages for the stream. | |
virtual downstream_manager & | out ()=0 |
Returns the manager for downstream communication. | |
const downstream_manager & | out () const |
Returns the manager for downstream communication. | |
virtual bool | done () const =0 |
Returns whether the manager has reached the end and can be discarded safely. | |
virtual bool | idle () const noexcept=0 |
Returns whether the manager cannot make any progress on its own at the moment. | |
virtual void | cycle_timeout (size_t cycle_nr) |
Advances time. | |
virtual void | register_input_path (inbound_path *x) |
Informs the manager that a new input path opens. | |
virtual void | deregister_input_path (inbound_path *x) noexcept |
Informs the manager that an input path closes. | |
virtual void | remove_input_path (stream_slot slot, error reason, bool silent) |
Removes an input path. | |
bool | running () const noexcept |
Returns whether this stream is neither shutting down nor has stopped. | |
bool | continuous () const noexcept |
Returns whether this stream remains open even if no in- or outbound paths exist. | |
void | continuous (bool x) noexcept |
Sets whether this stream remains open even if no in- or outbound paths exist. | |
const inbound_paths_list & | inbound_paths () const noexcept |
Returns the list of inbound paths. | |
inbound_path * | get_inbound_path (stream_slot x) const noexcept |
Returns the inbound path at slot x. | |
bool | inbound_paths_idle () const noexcept |
Queries whether all inbound paths are up-to-date and have non-zero credit. | |
scheduled_actor * | self () |
Returns the parent actor. | |
virtual int32_t | acquire_credit (inbound_path *path, int32_t desired) |
Acquires credit on an inbound path. | |
stream_slot | add_unchecked_inbound_path_impl (rtti_pair rtti) |
Adds the current sender as an inbound path. | |
![]() | |
ref_counted (const ref_counted &) | |
ref_counted & | operator= (const ref_counted &) |
void | ref () const noexcept |
Increases reference count by one. | |
void | deref () const noexcept |
Decreases reference count by one and calls request_deletion when it drops to zero. | |
bool | unique () const noexcept |
Queries whether there is exactly one reference. | |
size_t | get_reference_count () const noexcept |
![]() | |
virtual void | request_deletion (bool decremented_rc) const noexcept |
Default implementation calls `delete this`, but can be overridden in case deletion depends on some condition or the class doesn't use default new/delete. | |
Static Public Attributes | |
static constexpr int | is_continuous_flag = 0x0001 |
Configures whether this stream shall remain open even if no in- or outbound paths exist. | |
static constexpr int | is_shutting_down_flag = 0x0002 |
Denotes whether the stream is about to stop, only sending buffered elements. | |
static constexpr int | is_stopped_flag = 0x0004 |
Denotes whether the manager has stopped. | |
Protected Member Functions | |
stream_slot | assign_next_slot () |
stream_slot | assign_next_pending_slot () |
virtual void | finalize (const error &reason) |
virtual void | input_closed (error reason) |
Called when in().closed() changes to true . | |
virtual void | downstream_demand (outbound_path *ptr, long demand) |
Called whenever new credit becomes available. | |
virtual void | output_closed (error reason) |
Called when out().closed() changes to true . | |
Protected Attributes | |
scheduled_actor * | self_ |
Points to the parent actor. | |
inbound_paths_list | inbound_paths_ |
Stores non-owning pointers to all input paths. | |
long | pending_handshakes_ |
Keeps track of pending handshakes. | |
stream_priority | priority_ |
Configures the importance of outgoing traffic. | |
int | flags_ |
Stores individual flags, for continuous streaming or when shutting down. | |
![]() | |
std::atomic< size_t > | rc_ |
Related Symbols | |
(Note that these are not member symbols.) | |
using | stream_manager_ptr = intrusive_ptr< stream_manager > |
A reference counting pointer to a stream_manager . | |
![]() | |
void | intrusive_ptr_add_ref (const ref_counted *p) |
void | intrusive_ptr_release (const ref_counted *p) |
Manages a single stream with any number of in- and outbound paths.
|
virtual |
Acquires credit on an inbound path.
The calculated credit to fill our queue for two cycles is `desired`, but the manager is allowed to return any non-negative value.
stream_slot caf::stream_manager::add_unchecked_inbound_path_impl | ( | rtti_pair | rtti | ) |
Adds the current sender as an inbound path.
open_stream_msg
.
|
noexcept |
Returns whether this stream remains open even if no in- or outbound paths exist.
The default is false
. Does not keep a source alive past the point where its driver returns done() == true
.
|
virtual |
Sends a handshake to dest
.
dest != nullptr
|
virtualnoexcept |
Informs the manager that an input path closes.
inbound_path
. Returns whether the manager has reached the end and can be discarded safely.
Implemented in caf::detail::stream_distribution_tree< Policy >.
|
protectedvirtual |
Called whenever new credit becomes available.
The default implementation logs an error (sources are expected to override this hook).
Tries to generate new messages for the stream.
This member function does nothing on stages and sinks, but can trigger a source to produce more messages.
Returns whether the manager cannot make any progress on its own at the moment.
For example, a source is idle if it has filled its output buffer and there isn't any credit left.
Implemented in caf::detail::stream_distribution_tree< Policy >.
|
noexcept |
Queries whether all inbound paths are up-to-date and have non-zero credit.
A sink is idle if this function returns true
.
Called when in().closed()
changes to true
.
The default implementation does nothing.
|
pure virtual |
Returns the manager for downstream communication.
Implemented in caf::detail::stream_distribution_tree< Policy >.
Called when out().closed()
changes to true
.
The default implementation does nothing.
Pushes new data to downstream actors by sending batches.
The amount of pushed data is limited by the available credit.
|
virtual |
Informs the manager that a new input path opens.
inbound_path
. Denotes whether the manager has stopped.
Calling member functions such as stop() or abort() on it no longer has any effect.