diff options
-rw-r--r-- | usrp/host/lib/legacy/circular_buffer.h | 28 | ||||
-rw-r--r-- | usrp/host/lib/legacy/circular_linked_list.h | 17 | ||||
-rw-r--r-- | usrp/host/lib/legacy/fusb_darwin.cc | 95 | ||||
-rw-r--r-- | usrp/host/lib/legacy/mld_threads.h | 11 |
4 files changed, 114 insertions, 37 deletions
diff --git a/usrp/host/lib/legacy/circular_buffer.h b/usrp/host/lib/legacy/circular_buffer.h index fa451d607..8898e4194 100644 --- a/usrp/host/lib/legacy/circular_buffer.h +++ b/usrp/host/lib/legacy/circular_buffer.h @@ -26,7 +26,9 @@ #include "mld_threads.h" #include <stdexcept> +#ifndef DO_DEBUG #define DO_DEBUG 0 +#endif #if DO_DEBUG #define DEBUG(X) do{X} while(0); @@ -82,7 +84,7 @@ public: DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, " "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I, (d_doWriteBlock ? "true" : "false"), - (d_doFullRead ? "true" : "false"))); + (d_doFullRead ? "true" : "false"));); }; ~circular_buffer () { @@ -150,7 +152,7 @@ public: int enqueue (T* buf, UInt32 bufLen_I) { DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, " "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I, - d_n_avail_write_I, d_n_avail_read_I)); + d_n_avail_write_I, d_n_avail_read_I);); if (bufLen_I > d_bufLen_I) { fprintf (stderr, "cannot add buffer longer (%ld" ") than instantiated length (%ld" @@ -173,21 +175,21 @@ public: if (bufLen_I > d_n_avail_write_I) { if (d_doWriteBlock) { while (bufLen_I > d_n_avail_write_I) { - DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n")); + DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n");); // wait will automatically unlock() the internal mutex d_writeBlock->wait (); // and lock() it here. if (d_doAbort) { d_internal->unlock (); - DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n")); + DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n");); return (2); } - DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n")); + DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n");); } } else { d_n_avail_read_I = d_bufLen_I - bufLen_I; d_n_avail_write_I = bufLen_I; - DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n")); + DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n");); retval = -1; } } @@ -233,7 +235,7 @@ public: int dequeue (T* buf, UInt32* bufLen_I) { DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, " "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I, - d_n_avail_write_I, d_n_avail_read_I)); + d_n_avail_write_I, d_n_avail_read_I);); if (!bufLen_I) throw std::runtime_error ("circular_buffer::dequeue(): " "input bufLen pointer is NULL.\n"); @@ -257,29 +259,29 @@ public: } if (d_doFullRead) { while (d_n_avail_read_I < l_bufLen_I) { - DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n");); // wait will automatically unlock() the internal mutex d_readBlock->wait (); // and lock() it here. if (d_doAbort) { d_internal->unlock (); - DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n");); return (2); } - DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n");); } } else { while (d_n_avail_read_I == 0) { - DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n");); // wait will automatically unlock() the internal mutex d_readBlock->wait (); // and lock() it here. if (d_doAbort) { d_internal->unlock (); - DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n");); return (2); } - DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n");); } } if (l_bufLen_I > d_n_avail_read_I) diff --git a/usrp/host/lib/legacy/circular_linked_list.h b/usrp/host/lib/legacy/circular_linked_list.h index 14d81ac91..e495d609b 100644 --- a/usrp/host/lib/legacy/circular_linked_list.h +++ b/usrp/host/lib/legacy/circular_linked_list.h @@ -27,7 +27,10 @@ #include <stdexcept> #define __INLINE__ inline + +#ifndef DO_DEBUG #define DO_DEBUG 0 +#endif #if DO_DEBUG #define DEBUG(X) do{X} while(0); @@ -168,17 +171,17 @@ public: d_internal->lock (); // find an available node s_node_ptr l_node = d_available; - DEBUG (fprintf (stderr, "w ")); + DEBUG (fprintf (stderr, "w ");); while (! l_node) { - DEBUG (fprintf (stderr, "x\n")); + DEBUG (fprintf (stderr, "x\n");); // the ioBlock condition will automatically unlock() d_internal d_ioBlock->wait (); // and lock() is here - DEBUG (fprintf (stderr, "y\n")); + DEBUG (fprintf (stderr, "y\n");); l_node = d_available; } DEBUG (fprintf (stderr, "::f_n_a_n: #u = %ld, node = %p\n", - num_used(), l_node)); + num_used(), l_node);); // remove this one from the current available list if (num_available () == 1) { // last one, just set available to NULL @@ -201,7 +204,7 @@ public: if (!l_node) return; d_internal->lock (); DEBUG (fprintf (stderr, "::m_n_a: #u = %ld, node = %p\n", - num_used(), l_node)); + num_used(), l_node);); // remove this node from the inUse list if (num_used () == 1) { // last one, just set inUse to NULL @@ -216,10 +219,10 @@ public: l_node->insert_before (d_available); d_n_used--; - DEBUG (fprintf (stderr, "s%ld ", d_n_used)); + DEBUG (fprintf (stderr, "s%ld ", d_n_used);); // signal the condition when new data arrives d_ioBlock->signal (); - DEBUG (fprintf (stderr, "t ")); + DEBUG (fprintf (stderr, "t ");); // unlock the mutex for thread safety d_internal->unlock (); diff --git a/usrp/host/lib/legacy/fusb_darwin.cc b/usrp/host/lib/legacy/fusb_darwin.cc index 05db29e80..737387b87 100644 --- a/usrp/host/lib/legacy/fusb_darwin.cc +++ b/usrp/host/lib/legacy/fusb_darwin.cc @@ -27,6 +27,7 @@ // tell mld_threads to NOT use omni_threads, // but rather Darwin's pthreads #define _USE_OMNI_THREADS_ +#define DO_DEBUG 0 #include <usb.h> #include "fusb.h" @@ -190,13 +191,18 @@ fusb_ephandle_darwin::start () d_endpoint, d_pipeRef, d_interface, d_interfaceRef, direction, number, interval, maxPacketSize); -// set global start boolean + // set global start boolean d_started = true; -// create the run thread, which allows OSX to process I/O separately + // lock the runBlock mutex, before creating the run thread. + // this guarantees that we can control execution between these 2 threads + d_runBlock->mutex ()->lock (); + + // create the run thread, which allows OSX to process I/O separately d_runThread = new mld_thread (run_thread, this); -// wait until the threads are -really- going + // wait until the run thread (and possibky read thread) are -really- + // going; this will unlock the mutex before waiting for a signal () d_runBlock->wait (); if (usb_debug) @@ -210,11 +216,16 @@ void fusb_ephandle_darwin::run_thread (void* arg) { fusb_ephandle_darwin* This = static_cast<fusb_ephandle_darwin*>(arg); + + // lock the run thread running mutex; if ::stop() is called, it will + // first abort() the pipe then wait for the run thread to finish, + // via a lock() on this mutex mld_mutex_ptr l_runThreadRunning = This->d_runThreadRunning; l_runThreadRunning->lock (); mld_mutex_ptr l_readRunning = This->d_readRunning; mld_condition_ptr l_readBlock = This->d_readBlock; + mld_mutex_ptr l_readBlock_mutex = l_readBlock->mutex (); bool l_input_p = This->d_input_p; @@ -237,24 +248,41 @@ fusb_ephandle_darwin::run_thread (void* arg) mld_thread_ptr l_rwThread = NULL; if (l_input_p) { + // lock the readBlock mutex, before creating the read thread. + // this guarantees that we can control execution between these 2 threads + l_readBlock_mutex->lock (); + // create the read thread, which just issues all of the starting + // async read commands, then returns l_rwThread = new mld_thread (read_thread, arg); -// wait until the the rwThread is -really- going + // wait until the the read thread is -really- going; this will + // unlock the read block mutex before waiting for a signal () l_readBlock->wait (); } -// now signal the run condition to release and finish ::start() + // now signal the run condition to release and finish ::start(). + + // lock the runBlock mutex first; this will force waiting until the + // ->wait() command is issued in ::start() + mld_mutex_ptr l_run_block_mutex = This->d_runBlock->mutex (); + l_run_block_mutex->lock (); + + // now that the lock is in place, signal the parent thread that + // things are running This->d_runBlock->signal (); -// run the loop + // release the run_block mutex, just in case + l_run_block_mutex->unlock (); + + // run the loop CFRunLoopRun (); if (l_input_p) { -// wait for read_thread () to finish + // wait for read_thread () to finish, if needed l_readRunning->lock (); l_readRunning->unlock (); } -// remove run loop stuff + // remove run loop stuff CFRunLoopRemoveSource (CFRunLoopGetCurrent (), l_cfSource, kCFRunLoopDefaultMode); @@ -262,6 +290,7 @@ fusb_ephandle_darwin::run_thread (void* arg) fprintf (stderr, "fusb_ephandle_darwin::run_thread: finished for %s.\n", l_input_p ? "read" : "write"); + // release the run thread running mutex l_runThreadRunning->unlock (); } @@ -273,13 +302,27 @@ fusb_ephandle_darwin::read_thread (void* arg) fusb_ephandle_darwin* This = static_cast<fusb_ephandle_darwin*>(arg); + // before doing anything else, lock the read running mutex. this + // mutex does flow control between this thread and the run_thread mld_mutex_ptr l_readRunning = This->d_readRunning; l_readRunning->lock (); -// signal the read condition from run_thread() to continue + // signal the read condition from run_thread() to continue + + // lock the readBlock mutex first; this will force waiting until the + // ->wait() command is issued in ::run_thread() mld_condition_ptr l_readBlock = This->d_readBlock; + mld_mutex_ptr l_read_block_mutex = l_readBlock->mutex (); + l_read_block_mutex->lock (); + + // now that the lock is in place, signal the parent thread that + // things are running here l_readBlock->signal (); + // release the run_block mutex, just in case + l_read_block_mutex->unlock (); + + // queue up all of the available read requests s_queue_ptr l_queue = This->d_queue; l_queue->iterate_start (); s_node_ptr l_node = l_queue->iterate_next (); @@ -291,14 +334,21 @@ fusb_ephandle_darwin::read_thread (void* arg) if (usb_debug) fprintf (stderr, "fusb_ephandle_darwin::read_thread: finished.\n"); + // release the read running mutex, to let the parent thread knows + // that this thread is finished l_readRunning->unlock (); } void fusb_ephandle_darwin::read_issue (s_both_ptr l_both) { - if ((! l_both) || (! d_started)) + if ((! l_both) || (! d_started)) { + if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::read_issue: Doing nothing; " + "l_both is %X; started is %s\n", (unsigned int) l_both, + d_started ? "TRUE" : "FALSE"); return; + } // set the node and buffer from the input "both" s_node_ptr l_node = l_both->node (); @@ -328,6 +378,9 @@ fusb_ephandle_darwin::read_issue (s_both_ptr l_both) "(ReadPipeAsync%s): %s", d_transferType == kUSBInterrupt ? "" : "TO", darwin_error_str (result)); + else if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::read_issue: " + "Queued %X (%ld Bytes)\n", (unsigned int) l_both, bufLen); } void @@ -347,6 +400,10 @@ fusb_ephandle_darwin::read_completed (void* refCon, fprintf (stderr, "fusb_ephandle_darwin::read_completed: " "Expected %ld bytes; read %ld.\n", l_i_size, l_size); + else if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::read_completed: " + "Read %X (%ld bytes)\n", + (unsigned int) l_both, l_size); // add this read to the transfer buffer if (l_buffer->enqueue (l_buf->buffer (), l_size) == -1) { @@ -378,7 +435,12 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes) { UInt32 l_nbytes = (UInt32) nbytes; - if (! d_started) return (0); + if (! d_started) { + if (usb_debug) + fprintf (stderr, "fusb_ephandle_darwin::write: Not yet started.\n"); + + return (0); + } while (l_nbytes != 0) { // find out how much data to copy; limited to "d_bufLenBytes" per node @@ -400,11 +462,11 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes) if (d_transferType == kUSBInterrupt) /* This is an interrupt pipe ... can't specify a timeout. */ result = d_interface->WritePipeAsync - (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes, + (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes, (IOAsyncCallback1) write_completed, (void*) l_both); else result = d_interface->WritePipeAsyncTO - (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes, 0, USB_TIMEOUT, + (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes, 0, USB_TIMEOUT, (IOAsyncCallback1) write_completed, (void*) l_both); if (result != kIOReturnSuccess) @@ -413,6 +475,10 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes) "(WritePipeAsync%s): %s", d_transferType == kUSBInterrupt ? "" : "TO", darwin_error_str (result)); + else if (usb_debug > 4) { + fprintf (stderr, "fusb_ephandle_darwin::write_thread: " + "Queued %X (%ld Bytes)\n", (unsigned int) l_both, t_nbytes); + } l_nbytes -= t_nbytes; } @@ -436,6 +502,9 @@ fusb_ephandle_darwin::write_completed (void* refCon, fprintf (stderr, "fusb_ephandle_darwin::write_completed: " "Expected %ld bytes written; wrote %ld.\n", l_i_size, l_size); + else if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::write_completed: " + "Wrote %X (%ld Bytes)\n", (unsigned int) l_both, l_size); // set buffer's # data to 0 l_buf->n_used (0); diff --git a/usrp/host/lib/legacy/mld_threads.h b/usrp/host/lib/legacy/mld_threads.h index a59a92863..b2ec65751 100644 --- a/usrp/host/lib/legacy/mld_threads.h +++ b/usrp/host/lib/legacy/mld_threads.h @@ -37,7 +37,10 @@ #include <stdexcept> #define __INLINE__ inline + +#ifndef DO_DEBUG #define DO_DEBUG 0 +#endif #if DO_DEBUG #define DEBUG(X) do{X} while(0); @@ -182,7 +185,7 @@ public: __INLINE__ mld_mutex_ptr mutex () {return (d_mutex);}; __INLINE__ void signal () { - DEBUG (fprintf (stderr, "a ")); + DEBUG (fprintf (stderr, "a ");); #ifdef _USE_OMNI_THREADS_ d_condition->signal (); @@ -193,11 +196,11 @@ public: "Error %d.\n", l_ret); } #endif - DEBUG (fprintf (stderr, "b ")); + DEBUG (fprintf (stderr, "b ");); }; __INLINE__ void wait () { - DEBUG (fprintf (stderr, "c ")); + DEBUG (fprintf (stderr, "c ");); #ifdef _USE_OMNI_THREADS_ d_condition->wait (); #else @@ -207,7 +210,7 @@ public: "Error %d.\n", l_ret); } #endif - DEBUG (printf (stderr, "d ")); + DEBUG (fprintf (stderr, "d ");); }; }; |