#include <ace/Message_Queue_T.h>
template<ACE_SYNCH_DECL> class ACE_Message_Queue : public ACE_Message_Queue_Base {
public:
friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR;
typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR;ACE_Message_Queue ( size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy * = 0 );
virtual int open ( size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy * = 0 );
virtual int close (void);
virtual ~ACE_Message_Queue (void);
virtual int peek_dequeue_head ( ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0 );
virtual int enqueue_prio ( ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0 );
virtual int enqueue ( ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0 );
virtual int enqueue_tail ( ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0 );
virtual int enqueue_head ( ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0 );
virtual int dequeue ( ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0 );
virtual int dequeue_head ( ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0 );
virtual int is_full (void);
virtual int is_empty (void);
virtual size_t message_bytes (void);
virtual size_t message_length (void);
virtual size_t message_count (void);
virtual void message_bytes (size_t new_size);
virtual void message_length (size_t new_length);
virtual size_t high_water_mark (void);
virtual void high_water_mark (size_t hwm);
virtual size_t low_water_mark (void);
virtual void low_water_mark (size_t lwm);
virtual int deactivate (void);
virtual int activate (void);
virtual int deactivated (void);
virtual int notify (void);
virtual ACE_Notification_Strategy *notification_strategy (void);
virtual void notification_strategy (ACE_Notification_Strategy *s);
ACE_SYNCH_MUTEX_T &lock (void);
virtual void dump (void) const;
ACE_ALLOC_HOOK_DECLARE;
protected:
virtual int enqueue_i (ACE_Message_Block *new_item);
virtual int enqueue_tail_i (ACE_Message_Block *new_item);
virtual int enqueue_head_i (ACE_Message_Block *new_item);
virtual int dequeue_head_i (ACE_Message_Block *&first_item);
virtual int is_full_i (void);
virtual int is_empty_i (void);
virtual int deactivate_i (void);
virtual int activate_i (void);
virtual int wait_not_full_cond ( ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, ACE_Time_Value *timeout );
virtual int wait_not_empty_cond ( ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, ACE_Time_Value *timeout );
virtual int signal_enqueue_waiters (void);
virtual int signal_dequeue_waiters (void);
ACE_Message_Block *head_;
ACE_Message_Block *tail_;
size_t low_water_mark_;
size_t high_water_mark_;
size_t cur_bytes_;
size_t cur_length_;
size_t cur_count_;
int deactivated_;
ACE_Notification_Strategy *notification_strategy_;
ACE_SYNCH_MUTEX_T lock_;
ACE_SYNCH_SEMAPHORE_T not_empty_cond_;
ACE_SYNCH_SEMAPHORE_T not_full_cond_;
size_t dequeue_waiters_;
size_t enqueue_waiters_;
ACE_SYNCH_CONDITION_T not_empty_cond_;
ACE_SYNCH_CONDITION_T not_full_cond_;
private:
inline ACE_UNIMPLEMENTED_FUNC ( void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &) );
};
An ACE_Message_Queue is the central queueing facility for messages
in the ASX framework. If ACE_SYNCH_DECL is ACE_MT_SYNCH then all
operations are thread-safe. Otherwise, if it's ACE_NULL_SYNCH then
there's no locking overhead.
typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR;
typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR;
ACE_Message_Queue (
size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy * = 0
);
Initialize an ACE_Message_Queue. The high_water_mark determines how
many bytes can be stored in a queue before it's considered "full."
Supplier threads must block until the queue is no longer full. The
low_water_mark determines how many bytes must be in the queue before
supplier threads are allowed to enqueue additional
ACE_Message_Blocks. By default, the high_water_mark equals the
low_water_mark, which means that suppliers will be able to enqueue
new messages as soon as a consumer removes any message from the
queue. Making the low_water_mark smaller than the high_water_mark
forces consumers to drain more messages from the queue before
suppliers can enqueue new messages, which can minimize the "silly
window syndrome."
virtual int open (
size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy * = 0
);
Initialize an ACE_Message_Queue. The high_water_mark determines how
many bytes can be stored in a queue before it's considered "full."
Supplier threads must block until the queue is no longer full. The
low_water_mark determines how many bytes must be in the queue before
supplier threads are allowed to enqueue additional
ACE_Message_Blocks. By default, the high_water_mark equals the
low_water_mark, which means that suppliers will be able to enqueue
new messages as soon as a consumer removes any message from the
queue. Making the low_water_mark smaller than the high_water_mark
forces consumers to drain more messages from the queue before
suppliers can enqueue new messages, which can minimize the "silly
window syndrome."
virtual int close (void);
virtual ~ACE_Message_Queue (void);
For the following enqueue and dequeue methods, if timeout == 0 the
caller will block until action is possible; otherwise it will wait
until the absolute time specified in *timeout elapses. These calls
will return, however, when the queue is closed, deactivated, when a
signal occurs, or if the time specified in timeout elapses (in which
case errno = EWOULDBLOCK).
virtual int peek_dequeue_head (
ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0
);
Retrieve the first ACE_Message_Block without removing it. Note that
timeout uses absolute time rather than relative time. If the timeout
elapses without receiving a message, -1 is returned and errno is set
to EWOULDBLOCK. If the queue is deactivated, -1 is returned and errno
is set to ESHUTDOWN. Otherwise, returns -1 on failure, else the
number of items still on the queue.
virtual int enqueue_prio (
ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0
);
Enqueue an ACE_Message_Block * into the Message_Queue in accordance
with its msg_priority (0 is lowest priority). FIFO order is
maintained when messages of the same priority are inserted
consecutively. Note that timeout uses absolute time rather than
relative time. If the timeout elapses without receiving a message,
-1 is returned and errno is set to EWOULDBLOCK. If the queue is
deactivated, -1 is returned and errno is set to ESHUTDOWN. Otherwise,
returns -1 on failure, else the number of items still on the queue.
virtual int enqueue (
ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0
);
This is an alias for enqueue_prio. It's only here for backwards
compatibility and will go away in a subsequent release. Please use
enqueue_prio instead. Note that timeout uses absolute time rather
than relative time.
virtual int enqueue_tail (
ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0
);
Enqueue an ACE_Message_Block * at the end of the queue. Note that
timeout uses absolute time rather than relative time. If the timeout
elapses without receiving a message, -1 is returned and errno is set
to EWOULDBLOCK. If the queue is deactivated, -1 is returned and errno
is set to ESHUTDOWN. Otherwise, returns -1 on failure, else the
number of items still on the queue.
virtual int enqueue_head (
ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0
);
Enqueue an ACE_Message_Block * at the head of the queue. Note that
timeout uses absolute time rather than relative time. If the timeout
elapses without receiving a message, -1 is returned and errno is set
to EWOULDBLOCK. If the queue is deactivated, -1 is returned and errno
is set to ESHUTDOWN. Otherwise, returns -1 on failure, else the
number of items still on the queue.
virtual int dequeue (
ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0
);
This is an alias for the dequeue_head method.
virtual int dequeue_head (
ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0
);
Dequeue and return the ACE_Message_Block * at the head of the queue.
Note that timeout uses absolute time rather than relative time. If
the timeout elapses without receiving a message, -1 is returned and
errno is set to EWOULDBLOCK. If the queue is deactivated, -1 is
returned and errno is set to ESHUTDOWN. Otherwise, returns -1 on
failure, else the number of items still on the queue.
virtual int is_full (void);
virtual int is_empty (void);
virtual size_t message_bytes (void);
virtual size_t message_length (void);
virtual size_t message_count (void);
virtual void message_bytes (size_t new_size);
virtual void message_length (size_t new_length);
virtual size_t high_water_mark (void);
virtual void high_water_mark (size_t hwm);
virtual size_t low_water_mark (void);
virtual void low_water_mark (size_t lwm);
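Set the low water mark, which determines how many bytes must be in
the queue before supplier threads are allowed to enqueue additional
ACE_Message_Blocks.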
virtual int deactivate (void);
Deactivate the queue. Operations attempted on a deactivated queue
return -1 with errno == ESHUTDOWN. Returns WAS_INACTIVE if the queue
was inactive before the call and WAS_ACTIVE if the queue was active
before the call.
virtual int activate (void);
virtual int deactivated (void);
Returns true if deactivated_ is enabled.
virtual int notify (void);
This hook is called by enqueue_head, enqueue_tail, and enqueue_prio
when a new item is inserted into the queue. Subclasses can override
this method to perform specific notification strategies (e.g.,
signaling events for a WFMO_Reactor, notifying a Reactor, etc.). In
a multi-threaded application with concurrent consumers, there is no
guarantee that the queue will still be non-empty by the time the
notification occurs.
= Get/set the notification strategy for the Message_Queue
virtual ACE_Notification_Strategy *notification_strategy (void);
virtual void notification_strategy (ACE_Notification_Strategy *s);
ACE_SYNCH_MUTEX_T &lock (void);
Returns a reference to the lock used by the ACE_Message_Queue.
virtual void dump (void) const;
ACE_ALLOC_HOOK_DECLARE;
ACE_Message_Queue
.
virtual int enqueue_i (ACE_Message_Block *new_item);
Enqueue an ACE_Message_Block * in accordance with its priority.
virtual int enqueue_tail_i (ACE_Message_Block *new_item);
Enqueue an ACE_Message_Block * at the end of the queue.
virtual int enqueue_head_i (ACE_Message_Block *new_item);
Enqueue an ACE_Message_Block * at the head of the queue.
virtual int dequeue_head_i (ACE_Message_Block *&first_item);
Dequeue and return the ACE_Message_Block * at the head of the queue.
virtual int is_full_i (void);
virtual int is_empty_i (void);
= Implementation of the public activate and deactivate methods.
These methods assume locks are held.
virtual int deactivate_i (void);
virtual int activate_i (void);
virtual int wait_not_full_cond (
ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
ACE_Time_Value *timeout
);
virtual int wait_not_empty_cond (
ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
ACE_Time_Value *timeout
);
virtual int signal_enqueue_waiters (void);
virtual int signal_dequeue_waiters (void);
ACE_Message_Block *head_;
ACE_Message_Block *tail_;
size_t low_water_mark_;
size_t high_water_mark_;
size_t cur_bytes_;
size_t cur_length_;
size_t cur_count_;
int deactivated_;
ACE_Notification_Strategy *notification_strategy_;
ACE_SYNCH_MUTEX_T lock_;
ACE_SYNCH_SEMAPHORE_T not_empty_cond_;
ACE_SYNCH_SEMAPHORE_T not_full_cond_;
size_t dequeue_waiters_;
Number of threads waiting to dequeue a Message_Block.
size_t enqueue_waiters_;
Number of threads waiting to enqueue a Message_Block.
ACE_SYNCH_CONDITION_T not_empty_cond_;
ACE_SYNCH_CONDITION_T not_full_cond_;
inline ACE_UNIMPLEMENTED_FUNC (
void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)
);