router.cc

Go to the documentation of this file.
00001 ///
00002 /// \file       router.cc
00003 ///             Support classes for the pluggable socket routing system.
00004 ///
00005 
00006 /*
00007     Copyright (C) 2008-2012, Net Direct Inc. (http://www.netdirect.ca/)
00008 
00009     This program is free software; you can redistribute it and/or modify
00010     it under the terms of the GNU General Public License as published by
00011     the Free Software Foundation; either version 2 of the License, or
00012     (at your option) any later version.
00013 
00014     This program is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00017 
00018     See the GNU General Public License in the COPYING file at the
00019     root directory of this project for more details.
00020 */
00021 
00022 #include "router.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "protostructs.h"
00026 #include "protocol.h"
00027 #include "usbwrap.h"
00028 #include "endian.h"
00029 #include "debug.h"
00030 #include <unistd.h>
00031 #include <iostream>
00032 #include <iomanip>
00033 
00034 using namespace std;
00035 
00036 namespace Barry {
00037 
00038 ///////////////////////////////////////////////////////////////////////////////
00039 // SocketDataHandler default methods
00040 
00041 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
00042 {
00043         // Just log the error
00044         eout("SocketDataHandler: Error: " << error.what());
00045         (void) error;
00046 }
00047 
00048 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
00049 {
00050         // Nothing to destroy
00051 }
00052 
00053 ///////////////////////////////////////////////////////////////////////////////
00054 // SocketRoutingQueue constructors
00055 
00056 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count,
00057                                         int default_read_timeout)
00058         : m_dev(0)
00059         , m_writeEp(0)
00060         , m_readEp(0)
00061         , m_interest(false)
00062         , m_seen_usb_error(false)
00063         , m_timeout(default_read_timeout)
00064         , m_continue_reading(false)
00065 {
00066         pthread_mutex_init(&m_mutex, NULL);
00067 
00068         pthread_mutex_init(&m_readwaitMutex, NULL);
00069         pthread_cond_init(&m_readwaitCond, NULL);
00070 
00071         AllocateBuffers(prealloc_buffer_count);
00072 }
00073 
00074 SocketRoutingQueue::~SocketRoutingQueue()
00075 {
00076         // thread running?
00077         if( m_continue_reading ) {
00078                 m_continue_reading = false;
00079                 pthread_join(m_usb_read_thread, NULL);
00080         }
00081 
00082         // dump all unused packets to debug output
00083         SocketQueueMap::const_iterator b = m_socketQueues.begin();
00084         for( ; b != m_socketQueues.end(); ++b ) {
00085                 DumpSocketQueue(b->first, b->second->m_queue);
00086         }
00087         if( m_default.size() ) {
00088                 ddout("(Default queue is socket 0)");
00089                 DumpSocketQueue(0, m_default);
00090         }
00091 }
00092 
00093 ///////////////////////////////////////////////////////////////////////////////
00094 // protected members
00095 
00096 //
00097 // ReturnBuffer
00098 //
00099 /// Provides a method of returning a buffer to the free queue
00100 /// after processing.  The DataHandle class calls this automatically
00101 /// from its destructor.
00102 void SocketRoutingQueue::ReturnBuffer(Data *buf)
00103 {
00104         // don't need to lock here, since m_free handles its own locking
00105         m_free.push(buf);
00106 }
00107 
00108 //
00109 // QueuePacket
00110 //
00111 /// Helper function to add a buffer to a socket queue.
00112 /// Returns false if no queue is available for that socket.
00113 //// Also empties the DataHandle on success.
00114 ///
00115 bool SocketRoutingQueue::QueuePacket(SocketId socket, DataHandle &buf)
00116 {
00117         if( m_interest ) {
00118                 // lock so we can access the m_socketQueues map safely
00119                 scoped_lock lock(m_mutex);
00120 
00121                 // search for registration of socket
00122                 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00123                 if( qi != m_socketQueues.end() ) {
00124                         qi->second->m_queue.push(buf.release());
00125                         return true;
00126                 }
00127         }
00128 
00129         return false;
00130 }
00131 
00132 bool SocketRoutingQueue::QueuePacket(DataQueue &queue, DataHandle &buf)
00133 {
00134         // don't need to lock here, since queue handles its own locking
00135         queue.push(buf.release());
00136         return true;
00137 }
00138 
00139 //
00140 // RouteOrQueuePacket
00141 //
00142 /// Same as QueuePacket, except sends the data to the callback if
00143 /// a callback is available.
00144 ///
00145 /// This function duplicates code from QueuePacket(), in order to
00146 /// optimize the mutex locking.
00147 ///
00148 bool SocketRoutingQueue::RouteOrQueuePacket(SocketId socket, DataHandle &buf)
00149 {
00150         // search for registration of socket
00151         if( m_interest ) {
00152                 // lock so we can access the m_socketQueues map safely
00153                 scoped_lock lock(m_mutex);
00154 
00155                 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00156                 if( qi != m_socketQueues.end() ) {
00157                         SocketDataHandlerPtr &sdh = qi->second->m_handler;
00158 
00159                         // is there a handler?
00160                         if( sdh ) {
00161                                 // unlock & let the handler process it
00162                                 lock.unlock();
00163                                 sdh->DataReceived(*buf.get());
00164 
00165                                 // no exceptions thrown, clear the
00166                                 // DataHandle, sending packet back to its
00167                                 // free list
00168                                 buf.reset();
00169                                 return true;
00170                         }
00171                         else {
00172                                 qi->second->m_queue.push(buf.release());
00173                                 return true;
00174                         }
00175                 }
00176         }
00177 
00178         return false;
00179 }
00180 
00181 //
00182 // SimpleReadThread()
00183 //
00184 /// Convenience thread to handle USB read activity.
00185 ///
00186 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
00187 {
00188         SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
00189 
00190         // read from USB and write to stdout until finished
00191         q->m_seen_usb_error = false;
00192         while( q->m_continue_reading ) {
00193                 try {
00194                         q->DoRead(1000);        // timeout in milliseconds
00195                 }
00196                 catch (std::runtime_error const &e) {
00197                         eout("SimpleReadThread received uncaught exception: " <<  typeid(e).name() << " what: " << e.what());
00198                 }
00199                 catch (...) {
00200                         eout("SimpleReadThread recevied uncaught exception of unknown type");
00201                 }
00202         }
00203         return 0;
00204 }
00205 
00206 void SocketRoutingQueue::DumpSocketQueue(SocketId socket, const DataQueue &dq)
00207 {
00208         // dump a record of any unused packets in the queue, for debugging
00209         if( dq.size() ) {
00210                 ddout("SocketRoutingQueue Leftovers: "
00211                         << dec << dq.size()
00212                         << " packet(s) for socket 0x"
00213                         << hex << (unsigned int) socket
00214                         << "\n"
00215                         << dq);
00216         }
00217 }
00218 
00219 
00220 ///////////////////////////////////////////////////////////////////////////////
00221 // public API
00222 
00223 // These functions connect the router to an external Usb::Device
00224 // object.  Normally this is handled automatically by the
00225 // Controller class, but are public here in case they are needed.
00226 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
00227                                         SocketDataHandlerPtr callback)
00228 {
00229         scoped_lock lock(m_mutex);
00230         m_dev = dev;
00231         m_usb_error_dev_callback = callback;
00232         m_writeEp = writeEp;
00233         m_readEp = readEp;
00234 }
00235 
00236 void SocketRoutingQueue::ClearUsbDevice()
00237 {
00238         scoped_lock lock(m_mutex);
00239         m_dev = 0;
00240         m_usb_error_dev_callback.reset();
00241         lock.unlock();
00242 
00243         // wait for the DoRead cycle to finish, so the external
00244         // Usb::Device object doesn't close before we're done with it
00245         scoped_lock wait(m_readwaitMutex);
00246         pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
00247 }
00248 
00249 bool SocketRoutingQueue::UsbDeviceReady()
00250 {
00251         scoped_lock lock(m_mutex);
00252         return m_dev != 0 && !m_seen_usb_error;
00253 }
00254 
00255 //
00256 // AllocateBuffers
00257 //
00258 /// This class starts out with no buffers, and will grow one buffer
00259 /// at a time if needed.  Call this to allocate count buffers
00260 /// all at once and place them on the free queue.  After calling
00261 /// this function, at least count buffers will exist in the free
00262 /// queue.  If there are already count buffers, none will be added.
00263 ///
00264 void SocketRoutingQueue::AllocateBuffers(int count)
00265 {
00266         int todo = count - m_free.size();
00267 
00268         for( int i = 0; i < todo; i++ ) {
00269                 // m_free handles its own locking
00270                 m_free.push( new Data );
00271         }
00272 }
00273 
00274 //
00275 // DefaultRead (both variations)
00276 //
00277 /// Returns the data for the next unregistered socket.
00278 /// Blocks until timeout or data is available.
00279 /// Returns false (or null pointer) on timeout and no data.
00280 /// With the return version of the function, there is no
00281 /// copying performed.
00282 ///
00283 /// This version performs a copy.
00284 ///
00285 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
00286 {
00287         DataHandle buf = DefaultRead(timeout);
00288         if( !buf.get() )
00289                 return false;
00290 
00291         // copy to desired buffer
00292         receive = *buf.get();
00293         return true;
00294 }
00295 
00296 ///
00297 /// This version does not perform a copy.
00298 ///
00299 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
00300 {
00301         if( m_seen_usb_error && timeout == -1 ) {
00302                 // If an error has been seen and not cleared then no
00303                 // more data will be read into the queue by
00304                 // DoRead(). Forcing the timeout to zero allows any
00305                 // data already in the queue to be read, but prevents
00306                 // waiting for data which will never arrive.
00307                 timeout = 0;
00308         }
00309 
00310         // m_default handles its own locking
00311         // Be careful with the queue timeout, since its -1 means "forever"
00312         Data *buf = m_default.wait_pop(timeout == -1 ? m_timeout : timeout);
00313         return DataHandle(*this, buf);
00314 }
00315 
00316 //
00317 // RegisterInterest
00318 //
00319 /// Register an interest in data from a certain socket.  To read
00320 /// from that socket, use the SocketRead() function from then on.
00321 ///
00322 /// Any non-registered socket goes in the default queue
00323 /// and must be read by DefaultRead()
00324 ///
00325 /// If not null, handler is called when new data is read.  It will
00326 /// be called in the same thread instance that DoRead() is called from.
00327 /// Handler is passed the DataQueue Data pointer, and so no
00328 /// copying is done.  Once the handler returns, the data is
00329 /// considered processed and not added to the interested queue,
00330 /// but instead returned to m_free.
00331 ///
00332 /// Throws std::logic_error if already registered.
00333 ///
00334 void SocketRoutingQueue::RegisterInterest(SocketId socket,
00335                                           SocketDataHandlerPtr handler)
00336 {
00337         // modifying our own std::map, need a lock
00338         scoped_lock lock(m_mutex);
00339 
00340         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00341         if( qi != m_socketQueues.end() )
00342                 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
00343 
00344         m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
00345         m_interest = true;
00346 }
00347 
00348 //
00349 // UnregisterInterest
00350 //
00351 /// Unregisters interest in data from the given socket, and discards
00352 /// any existing data in its interest queue.  Any new incoming data
00353 /// for this socket will be placed in the default queue.
00354 ///
00355 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
00356 {
00357         // modifying our own std::map, need a lock
00358         scoped_lock lock(m_mutex);
00359 
00360         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00361         if( qi == m_socketQueues.end() )
00362                 return; // nothing registered, done
00363 
00364         // dump a record of any unused packets in the queue, for debugging
00365         DumpSocketQueue(qi->first, qi->second->m_queue);
00366 
00367         // salvage all our data buffers
00368         m_free.append_from( qi->second->m_queue );
00369 
00370         // remove the QueueEntryPtr from the map
00371         m_socketQueues.erase( qi );
00372 
00373         // check the interest flag
00374         m_interest = m_socketQueues.size() > 0;
00375 }
00376 
00377 //
00378 // SocketRead
00379 //
00380 /// Reads data from the interested socket cache.  Can only read
00381 /// from sockets that have been previously registered.
00382 ///
00383 /// Blocks until timeout or data is available.
00384 ///
00385 /// Returns false (or null pointer) on timeout and no data.
00386 /// With the return version of the function, there is no
00387 /// copying performed.
00388 ///
00389 /// Throws std::logic_error if a socket was requested that was
00390 /// not previously registered.
00391 ///
00392 /// Copying is performed with this function.
00393 ///
00394 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
00395 {
00396         DataHandle buf = SocketRead(socket, timeout);
00397         if( !buf.get() )
00398                 return false;
00399 
00400         // copy to desired buffer
00401         receive = *buf.get();
00402         return true;
00403 }
00404 
00405 ///
00406 /// Copying is not performed with this function.
00407 ///
00408 /// Throws std::logic_error if a socket was requested that was
00409 /// not previously registered.
00410 ///
00411 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
00412 {
00413         QueueEntryPtr qep;
00414         DataQueue *dq = 0;
00415 
00416         // accessing our own std::map, need a lock
00417         {
00418                 scoped_lock lock(m_mutex);
00419                 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00420                 if( qi == m_socketQueues.end() )
00421                         throw std::logic_error("SocketRead requested data from unregistered socket.");
00422 
00423                 // got our queue, save the whole QueueEntryPtr (shared_ptr),
00424                 // and unlock, since we will be waiting on the DataQueue,
00425                 // not the socketQueues map
00426                 //
00427                 // This is safe, since even if UnregisterInterest is called,
00428                 // our pointer won't be deleted until our shared_ptr
00429                 // (QueueEntryPtr) goes out of scope.
00430                 //
00431                 // The remaining problem is that wait_pop() might wait
00432                 // forever if there is no timeout... c'est la vie.
00433                 // Should'a used a timeout. :-)
00434                 qep = qi->second;
00435                 dq = &qep->m_queue;
00436         }
00437 
00438         // get data from DataQueue
00439         // Be careful with the queue timeout, since its -1 means "forever"
00440         Data *buf = dq->wait_pop(timeout == -1 ? m_timeout : timeout);
00441 
00442         // specifically delete our copy of shared pointer, in a locked
00443         // environment
00444         {
00445                 scoped_lock lock(m_mutex);
00446                 qep.reset();
00447         }
00448 
00449         return DataHandle(*this, buf);
00450 }
00451 
00452 // Returns true if data is available for that socket.
00453 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
00454 {
00455         scoped_lock lock(m_mutex);
00456         SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
00457         if( qi == m_socketQueues.end() )
00458                 return false;
00459         return qi->second->m_queue.size() > 0;
00460 }
00461 
00462 //
00463 // DoRead
00464 //
00465 /// Called by the application's "read thread" to read the next usb
00466 /// packet and route it to the correct queue.  Returns after every
00467 /// read, even if a handler is associated with a queue.
00468 /// Note: this function is safe to call before SetUsbDevice() is
00469 /// called... it just doesn't do anything if there is no usb
00470 /// device to work with.
00471 ///
00472 /// Timeout is in milliseconds.
00473 //  This timeout is for the USB subsystem, so no special handling
00474 //  for it is needed... just use usbwrap's default timeout.
00475 void SocketRoutingQueue::DoRead(int timeout)
00476 {
00477         class ReadWaitSignal
00478         {
00479                 pthread_mutex_t &m_Mutex;
00480                 pthread_cond_t &m_Cond;
00481         public:
00482                 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
00483                         : m_Mutex(mut), m_Cond(cond)
00484                         {}
00485                 ~ReadWaitSignal()
00486                 {
00487                         scoped_lock wait(m_Mutex);
00488                         pthread_cond_signal(&m_Cond);
00489                 }
00490         } readwait(m_readwaitMutex, m_readwaitCond);
00491 
00492         Usb::Device * volatile dev = 0;
00493         int readEp;
00494         DataHandle buf(*this, 0);
00495 
00496         // if we are not connected to a USB device yet, just wait
00497         {
00498                 scoped_lock lock(m_mutex);
00499 
00500                 if( !m_dev || m_seen_usb_error ) {
00501                         lock.unlock();  // unlock early, since we're sleeping
00502                         // sleep only a short time, since things could be
00503                         // in the process of setup or teardown
00504                         usleep(125000);
00505                         return;
00506                 }
00507 
00508                 dev = m_dev;
00509                 readEp = m_readEp;
00510 
00511                 // fetch a free buffer
00512                 Data *raw = m_free.pop();
00513                 if( !raw )
00514                         buf = DataHandle(*this, new Data);
00515                 else
00516                         buf = DataHandle(*this, raw);
00517         }
00518 
00519         // take a chance and do the read unlocked, as this has the potential
00520         // for blocking for a while
00521         try {
00522 
00523                 Data &data = *buf.get();
00524 
00525                 if( !dev->BulkRead(readEp, data, timeout) )
00526                         return; // no data, done!
00527 
00528                 MAKE_PACKET(pack, data);
00529 
00530                 // make sure the size is right
00531                 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
00532                         return; // bad size, just skip
00533 
00534                 // extract the socket from the packet
00535                 uint16_t socket = btohs(pack->socket);
00536 
00537                 // if this is a sequence packet, handle it specially
00538                 if( Protocol::IsSequencePacket(data) ) {
00539                         // sequence.socket is a single byte
00540                         socket = pack->u.sequence.socket;
00541 
00542                         //////////////////////////////////////////////
00543                         // ALWAYS queue sequence packets, so that
00544                         // the socket code can handle SyncSend()
00545                         if( !QueuePacket(socket, buf) ) {
00546                                 // if no queue available for this
00547                                 // socket, send it to the default
00548                                 // queue
00549                                 QueuePacket(m_default, buf);
00550                         }
00551 
00552                         // done with sequence packet
00553                         return;
00554                 }
00555 
00556                 // we have data, now route or queue it
00557                 if( RouteOrQueuePacket(socket, buf) )
00558                         return; // done
00559 
00560                 // if we get here, send to default queue
00561                 QueuePacket(m_default, buf);
00562         }
00563         catch( Usb::Timeout & ) {
00564                 // this is expected... just ignore
00565         }
00566         catch( Usb::Error &ue ) {
00567                 // set the flag first, in case any of the handlers
00568                 // are able to recover from this error
00569                 m_seen_usb_error = true;
00570 
00571                 // this is unexpected, but we're in a thread here...
00572                 // Need to iterate through all the registered handlers
00573                 // calling their error callback.
00574                 // Can't be locked when calling the callback, so need
00575                 // to make a list of them first.
00576                 scoped_lock lock(m_mutex);
00577                 std::vector<SocketDataHandlerPtr> handlers;
00578                 SocketQueueMap::iterator qi = m_socketQueues.begin();
00579                 while( qi != m_socketQueues.end() ) {
00580                         SocketDataHandlerPtr &sdh = qi->second->m_handler;
00581                         // is there a handler?
00582                         if( sdh ) {
00583                                 handlers.push_back(sdh);
00584                         }
00585                         ++qi;
00586                 }
00587 
00588                 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
00589 
00590                 lock.unlock();
00591                 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
00592                 while( hi != handlers.end() ) {
00593                         (*hi)->Error(ue);
00594                         ++hi;
00595                 }
00596 
00597                 // and finally, call the specific error callback if available
00598                 if( usb_error_handler.get() ) {
00599                         usb_error_handler->Error(ue);
00600                 }
00601         }
00602 }
00603 
00604 void SocketRoutingQueue::SpinoffSimpleReadThread()
00605 {
00606         // signal that it's ok to run inside the thread
00607         if( m_continue_reading )
00608                 return; // already running
00609         m_continue_reading = true;
00610 
00611         // Start USB read thread, to handle all routing
00612         int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
00613         if( ret ) {
00614                 m_continue_reading = false;
00615                 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
00616         }
00617 }
00618 
00619 } // namespace Barry
00620