dataqueue.cc

Go to the documentation of this file.
00001 ///
00002 /// \file       dataqueue.cc
00003 ///             FIFO queue of Data objects
00004 ///
00005 
00006 /*
00007     Copyright (C) 2008-2012, Net Direct Inc. (http://www.netdirect.ca/)
00008 
00009     This program is free software; you can redistribute it and/or modify
00010     it under the terms of the GNU General Public License as published by
00011     the Free Software Foundation; either version 2 of the License, or
00012     (at your option) any later version.
00013 
00014     This program is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00017 
00018     See the GNU General Public License in the COPYING file at the
00019     root directory of this project for more details.
00020 */
00021 
00022 #include "dataqueue.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "time.h"
00026 #include <iostream>
00027 
00028 using namespace std;
00029 
00030 namespace Barry {
00031 
00032 //////////////////////////////////////////////////////////////////////////////
00033 // DataQueue class
00034 
00035 DataQueue::DataQueue()
00036 {
00037         pthread_mutex_init(&m_waitMutex, NULL);
00038         pthread_cond_init(&m_waitCond, NULL);
00039 
00040         pthread_mutex_init(&m_accessMutex, NULL);
00041 }
00042 
00043 DataQueue::~DataQueue()
00044 {
00045         scoped_lock lock(m_accessMutex);        // FIXME - is this sane?
00046 
00047         while( m_queue.size() ) {
00048                 delete raw_pop();
00049         }
00050 }
00051 
00052 // a push without locking - adds to the back
00053 void DataQueue::raw_push(Data *data)
00054 {
00055         try {
00056                 m_queue.push_back(data);
00057         }
00058         catch(...) {
00059                 delete data;
00060                 throw;
00061         }
00062 }
00063 
00064 // a pop without locking - removes from the front, and returns it
00065 Data* DataQueue::raw_pop()
00066 {
00067         if( m_queue.size() == 0 )
00068                 return 0;
00069 
00070         Data *ret = m_queue.front();
00071         m_queue.pop_front();
00072         return ret;
00073 }
00074 
00075 //
00076 // push
00077 //
00078 /// Pushes data into the end of the queue.
00079 ///
00080 /// The queue owns this pointer as soon as the function is
00081 /// called.  In the case of an exception, it will be freed.
00082 /// Performs a thread broadcast once new data has been added.
00083 ///
00084 void DataQueue::push(Data *data)
00085 {
00086         {
00087                 scoped_lock lock(m_accessMutex);
00088                 raw_push(data);
00089         }
00090 
00091         // on success, signal
00092         scoped_lock wait(m_waitMutex);
00093         pthread_cond_broadcast(&m_waitCond);
00094 }
00095 
00096 //
00097 // pop
00098 //
00099 /// Pops the next element off the front of the queue.
00100 ///
00101 /// Returns 0 if empty.
00102 /// The queue no longer owns this pointer upon return.
00103 ///
00104 Data* DataQueue::pop()
00105 {
00106         scoped_lock lock(m_accessMutex);
00107         return raw_pop();
00108 }
00109 
00110 //
00111 // wait_pop
00112 //
00113 /// Pops the next element off the front of the queue, and
00114 /// waits until one exists if empty.  If still no data
00115 /// on timeout, returns null.
00116 /// (unlock the access mutex while waiting!)
00117 ///
00118 /// Timeout specified in milliseconds.  Default is wait forever.
00119 ///
00120 Data* DataQueue::wait_pop(int timeout)
00121 {
00122         // check if something's there already
00123         {
00124                 scoped_lock access(m_accessMutex);
00125                 if( m_queue.size() ) {
00126                         return raw_pop();
00127                 }
00128         }
00129 
00130         // nothing there, so wait...
00131 
00132         if( timeout == -1 ) {
00133                 // no timeout
00134                 int size = 0;
00135                 do {
00136                         {
00137                                 scoped_lock wait(m_waitMutex);
00138                                 pthread_cond_wait(&m_waitCond, &m_waitMutex);
00139                         }
00140 
00141                         // anything there?
00142                         scoped_lock access(m_accessMutex);
00143                         size = m_queue.size();
00144                         if( size != 0 ) {
00145                                 // already have the lock, return now
00146                                 return raw_pop();
00147                         }
00148 
00149                 } while( size == 0 );
00150         }
00151         else {
00152                 // timeout in conditional wait
00153                 struct timespec to;
00154                 scoped_lock wait(m_waitMutex);
00155                 pthread_cond_timedwait(&m_waitCond, &m_waitMutex,
00156                         ThreadTimeout(timeout, &to));
00157         }
00158 
00159         scoped_lock access(m_accessMutex);
00160         return raw_pop();
00161 }
00162 
00163 //
00164 // append_from
00165 //
00166 /// Pops all data from other and appends it to this.
00167 ///
00168 /// After calling this function, other will be empty, and
00169 /// this will contain all its data.
00170 ///
00171 /// In the case of an exception, any uncopied data will
00172 /// remain in other.
00173 ///
00174 /// This is a locking optimization, so all copying can happen
00175 /// inside one lock, instead of locking for each copy.
00176 ///
00177 void DataQueue::append_from(DataQueue &other)
00178 {
00179         scoped_lock us(m_accessMutex);
00180         scoped_lock them(other.m_accessMutex);
00181 
00182         while( other.m_queue.size() ) {
00183                 raw_push( other.m_queue.front() );
00184 
00185                 // only pop after the copy, since in the
00186                 // case of an exception we want to leave other intact
00187                 other.raw_pop();
00188         }
00189 }
00190 
00191 //
00192 // empty
00193 //
00194 /// Returns true if the queue is empty.
00195 ///
00196 bool DataQueue::empty() const
00197 {
00198         scoped_lock access(m_accessMutex);
00199         return m_queue.size() == 0;
00200 }
00201 
00202 //
00203 // size
00204 //
00205 /// Returns number of items in the queue.
00206 ///
00207 size_t DataQueue::size() const
00208 {
00209         scoped_lock access(m_accessMutex);
00210         return m_queue.size();
00211 }
00212 
00213 void DataQueue::DumpAll(std::ostream &os) const
00214 {
00215         // queue is pushed to the back, and popped from the front
00216         // (see raw_() functions) so this iterator direction will
00217         // print the packets in the order they arrived
00218         scoped_lock access(m_accessMutex);
00219         queue_type::const_iterator b = m_queue.begin(), e = m_queue.end();
00220         for( ; b != e; ++b ) {
00221                 os << **b << endl;
00222         }
00223 }
00224 
00225 } // namespace Barry
00226 
00227 
00228 #ifdef __DQ_TEST_MODE__
00229 
00230 #include <iostream>
00231 
00232 using namespace std;
00233 using namespace Barry;
00234 
00235 void *WriteThread(void *userdata)
00236 {
00237         DataQueue *dq = (DataQueue*) userdata;
00238 
00239         dq->push( new Data );
00240         dq->push( new Data );
00241         sleep(5);
00242         dq->push( new Data );
00243 
00244         return 0;
00245 }
00246 
00247 void *ReadThread(void *userdata)
00248 {
00249         DataQueue *dq = (DataQueue*) userdata;
00250 
00251         sleep(1);
00252         if( Data *d = dq->pop() ) {
00253                 cout << "Received via pop: " << d << endl;
00254                 delete d;
00255         }
00256         else {
00257                 cout << "No data in the queue yet." << endl;
00258         }
00259 
00260         while( Data *d = dq->wait_pop(5010) ) {
00261                 cout << "Received: " << d << endl;
00262                 delete d;
00263         }
00264         return 0;
00265 }
00266 
00267 int main()
00268 {
00269         DataQueue from;
00270         from.push( new Data );
00271 
00272         DataQueue dq;
00273         dq.append_from(from);
00274 
00275         pthread_t t1, t2;
00276         pthread_create(&t1, NULL, &ReadThread, &dq);
00277         pthread_create(&t2, NULL, &WriteThread, &dq);
00278 
00279         pthread_join(t2, NULL);
00280         pthread_join(t1, NULL);
00281 }
00282 
00283 #endif
00284