From 4022e568aa33f223df17b869af80f5c6565df286 Mon Sep 17 00:00:00 2001 From: michaelld Date: Sun, 13 Jan 2008 20:41:11 +0000 Subject: Merged OSX fixes for 10.5 (backwards compatible with 10.4 if not earlier) for USRP legacy fast-usb code from r7358 branch into trunk: Fixed DEBUG commands in all files. Fixed flow control between originating and spawned threads. Fixed WritePipeAsync buffer write size. Added in debugging comments to fusb code, to better track async flow. NOT YET updated for MacOS X 10.5-specific IOKit code, but everything seems to work just fine as is. git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@7417 221aa14e-8319-0410-a670-987f0aec2ac5 --- usrp/host/lib/legacy/circular_buffer.h | 28 +++++---- usrp/host/lib/legacy/circular_linked_list.h | 17 +++--- usrp/host/lib/legacy/fusb_darwin.cc | 95 +++++++++++++++++++++++++---- usrp/host/lib/legacy/mld_threads.h | 11 ++-- 4 files changed, 114 insertions(+), 37 deletions(-) (limited to 'usrp') diff --git a/usrp/host/lib/legacy/circular_buffer.h b/usrp/host/lib/legacy/circular_buffer.h index fa451d607..8898e4194 100644 --- a/usrp/host/lib/legacy/circular_buffer.h +++ b/usrp/host/lib/legacy/circular_buffer.h @@ -26,7 +26,9 @@ #include "mld_threads.h" #include +#ifndef DO_DEBUG #define DO_DEBUG 0 +#endif #if DO_DEBUG #define DEBUG(X) do{X} while(0); @@ -82,7 +84,7 @@ public: DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, " "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I, (d_doWriteBlock ? "true" : "false"), - (d_doFullRead ? "true" : "false"))); + (d_doFullRead ? "true" : "false"));); }; ~circular_buffer () { @@ -150,7 +152,7 @@ public: int enqueue (T* buf, UInt32 bufLen_I) { DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, " "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I, - d_n_avail_write_I, d_n_avail_read_I)); + d_n_avail_write_I, d_n_avail_read_I);); if (bufLen_I > d_bufLen_I) { fprintf (stderr, "cannot add buffer longer (%ld" ") than instantiated length (%ld" @@ -173,21 +175,21 @@ public: if (bufLen_I > d_n_avail_write_I) { if (d_doWriteBlock) { while (bufLen_I > d_n_avail_write_I) { - DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n")); + DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n");); // wait will automatically unlock() the internal mutex d_writeBlock->wait (); // and lock() it here. if (d_doAbort) { d_internal->unlock (); - DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n")); + DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n");); return (2); } - DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n")); + DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n");); } } else { d_n_avail_read_I = d_bufLen_I - bufLen_I; d_n_avail_write_I = bufLen_I; - DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n")); + DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n");); retval = -1; } } @@ -233,7 +235,7 @@ public: int dequeue (T* buf, UInt32* bufLen_I) { DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, " "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I, - d_n_avail_write_I, d_n_avail_read_I)); + d_n_avail_write_I, d_n_avail_read_I);); if (!bufLen_I) throw std::runtime_error ("circular_buffer::dequeue(): " "input bufLen pointer is NULL.\n"); @@ -257,29 +259,29 @@ public: } if (d_doFullRead) { while (d_n_avail_read_I < l_bufLen_I) { - DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n");); // wait will automatically unlock() the internal mutex d_readBlock->wait (); // and lock() it here. if (d_doAbort) { d_internal->unlock (); - DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n");); return (2); } - DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n");); } } else { while (d_n_avail_read_I == 0) { - DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n");); // wait will automatically unlock() the internal mutex d_readBlock->wait (); // and lock() it here. if (d_doAbort) { d_internal->unlock (); - DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n");); return (2); } - DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n")); + DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n");); } } if (l_bufLen_I > d_n_avail_read_I) diff --git a/usrp/host/lib/legacy/circular_linked_list.h b/usrp/host/lib/legacy/circular_linked_list.h index 14d81ac91..e495d609b 100644 --- a/usrp/host/lib/legacy/circular_linked_list.h +++ b/usrp/host/lib/legacy/circular_linked_list.h @@ -27,7 +27,10 @@ #include #define __INLINE__ inline + +#ifndef DO_DEBUG #define DO_DEBUG 0 +#endif #if DO_DEBUG #define DEBUG(X) do{X} while(0); @@ -168,17 +171,17 @@ public: d_internal->lock (); // find an available node s_node_ptr l_node = d_available; - DEBUG (fprintf (stderr, "w ")); + DEBUG (fprintf (stderr, "w ");); while (! l_node) { - DEBUG (fprintf (stderr, "x\n")); + DEBUG (fprintf (stderr, "x\n");); // the ioBlock condition will automatically unlock() d_internal d_ioBlock->wait (); // and lock() is here - DEBUG (fprintf (stderr, "y\n")); + DEBUG (fprintf (stderr, "y\n");); l_node = d_available; } DEBUG (fprintf (stderr, "::f_n_a_n: #u = %ld, node = %p\n", - num_used(), l_node)); + num_used(), l_node);); // remove this one from the current available list if (num_available () == 1) { // last one, just set available to NULL @@ -201,7 +204,7 @@ public: if (!l_node) return; d_internal->lock (); DEBUG (fprintf (stderr, "::m_n_a: #u = %ld, node = %p\n", - num_used(), l_node)); + num_used(), l_node);); // remove this node from the inUse list if (num_used () == 1) { // last one, just set inUse to NULL @@ -216,10 +219,10 @@ public: l_node->insert_before (d_available); d_n_used--; - DEBUG (fprintf (stderr, "s%ld ", d_n_used)); + DEBUG (fprintf (stderr, "s%ld ", d_n_used);); // signal the condition when new data arrives d_ioBlock->signal (); - DEBUG (fprintf (stderr, "t ")); + DEBUG (fprintf (stderr, "t ");); // unlock the mutex for thread safety d_internal->unlock (); diff --git a/usrp/host/lib/legacy/fusb_darwin.cc b/usrp/host/lib/legacy/fusb_darwin.cc index 05db29e80..737387b87 100644 --- a/usrp/host/lib/legacy/fusb_darwin.cc +++ b/usrp/host/lib/legacy/fusb_darwin.cc @@ -27,6 +27,7 @@ // tell mld_threads to NOT use omni_threads, // but rather Darwin's pthreads #define _USE_OMNI_THREADS_ +#define DO_DEBUG 0 #include #include "fusb.h" @@ -190,13 +191,18 @@ fusb_ephandle_darwin::start () d_endpoint, d_pipeRef, d_interface, d_interfaceRef, direction, number, interval, maxPacketSize); -// set global start boolean + // set global start boolean d_started = true; -// create the run thread, which allows OSX to process I/O separately + // lock the runBlock mutex, before creating the run thread. + // this guarantees that we can control execution between these 2 threads + d_runBlock->mutex ()->lock (); + + // create the run thread, which allows OSX to process I/O separately d_runThread = new mld_thread (run_thread, this); -// wait until the threads are -really- going + // wait until the run thread (and possibky read thread) are -really- + // going; this will unlock the mutex before waiting for a signal () d_runBlock->wait (); if (usb_debug) @@ -210,11 +216,16 @@ void fusb_ephandle_darwin::run_thread (void* arg) { fusb_ephandle_darwin* This = static_cast(arg); + + // lock the run thread running mutex; if ::stop() is called, it will + // first abort() the pipe then wait for the run thread to finish, + // via a lock() on this mutex mld_mutex_ptr l_runThreadRunning = This->d_runThreadRunning; l_runThreadRunning->lock (); mld_mutex_ptr l_readRunning = This->d_readRunning; mld_condition_ptr l_readBlock = This->d_readBlock; + mld_mutex_ptr l_readBlock_mutex = l_readBlock->mutex (); bool l_input_p = This->d_input_p; @@ -237,24 +248,41 @@ fusb_ephandle_darwin::run_thread (void* arg) mld_thread_ptr l_rwThread = NULL; if (l_input_p) { + // lock the readBlock mutex, before creating the read thread. + // this guarantees that we can control execution between these 2 threads + l_readBlock_mutex->lock (); + // create the read thread, which just issues all of the starting + // async read commands, then returns l_rwThread = new mld_thread (read_thread, arg); -// wait until the the rwThread is -really- going + // wait until the the read thread is -really- going; this will + // unlock the read block mutex before waiting for a signal () l_readBlock->wait (); } -// now signal the run condition to release and finish ::start() + // now signal the run condition to release and finish ::start(). + + // lock the runBlock mutex first; this will force waiting until the + // ->wait() command is issued in ::start() + mld_mutex_ptr l_run_block_mutex = This->d_runBlock->mutex (); + l_run_block_mutex->lock (); + + // now that the lock is in place, signal the parent thread that + // things are running This->d_runBlock->signal (); -// run the loop + // release the run_block mutex, just in case + l_run_block_mutex->unlock (); + + // run the loop CFRunLoopRun (); if (l_input_p) { -// wait for read_thread () to finish + // wait for read_thread () to finish, if needed l_readRunning->lock (); l_readRunning->unlock (); } -// remove run loop stuff + // remove run loop stuff CFRunLoopRemoveSource (CFRunLoopGetCurrent (), l_cfSource, kCFRunLoopDefaultMode); @@ -262,6 +290,7 @@ fusb_ephandle_darwin::run_thread (void* arg) fprintf (stderr, "fusb_ephandle_darwin::run_thread: finished for %s.\n", l_input_p ? "read" : "write"); + // release the run thread running mutex l_runThreadRunning->unlock (); } @@ -273,13 +302,27 @@ fusb_ephandle_darwin::read_thread (void* arg) fusb_ephandle_darwin* This = static_cast(arg); + // before doing anything else, lock the read running mutex. this + // mutex does flow control between this thread and the run_thread mld_mutex_ptr l_readRunning = This->d_readRunning; l_readRunning->lock (); -// signal the read condition from run_thread() to continue + // signal the read condition from run_thread() to continue + + // lock the readBlock mutex first; this will force waiting until the + // ->wait() command is issued in ::run_thread() mld_condition_ptr l_readBlock = This->d_readBlock; + mld_mutex_ptr l_read_block_mutex = l_readBlock->mutex (); + l_read_block_mutex->lock (); + + // now that the lock is in place, signal the parent thread that + // things are running here l_readBlock->signal (); + // release the run_block mutex, just in case + l_read_block_mutex->unlock (); + + // queue up all of the available read requests s_queue_ptr l_queue = This->d_queue; l_queue->iterate_start (); s_node_ptr l_node = l_queue->iterate_next (); @@ -291,14 +334,21 @@ fusb_ephandle_darwin::read_thread (void* arg) if (usb_debug) fprintf (stderr, "fusb_ephandle_darwin::read_thread: finished.\n"); + // release the read running mutex, to let the parent thread knows + // that this thread is finished l_readRunning->unlock (); } void fusb_ephandle_darwin::read_issue (s_both_ptr l_both) { - if ((! l_both) || (! d_started)) + if ((! l_both) || (! d_started)) { + if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::read_issue: Doing nothing; " + "l_both is %X; started is %s\n", (unsigned int) l_both, + d_started ? "TRUE" : "FALSE"); return; + } // set the node and buffer from the input "both" s_node_ptr l_node = l_both->node (); @@ -328,6 +378,9 @@ fusb_ephandle_darwin::read_issue (s_both_ptr l_both) "(ReadPipeAsync%s): %s", d_transferType == kUSBInterrupt ? "" : "TO", darwin_error_str (result)); + else if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::read_issue: " + "Queued %X (%ld Bytes)\n", (unsigned int) l_both, bufLen); } void @@ -347,6 +400,10 @@ fusb_ephandle_darwin::read_completed (void* refCon, fprintf (stderr, "fusb_ephandle_darwin::read_completed: " "Expected %ld bytes; read %ld.\n", l_i_size, l_size); + else if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::read_completed: " + "Read %X (%ld bytes)\n", + (unsigned int) l_both, l_size); // add this read to the transfer buffer if (l_buffer->enqueue (l_buf->buffer (), l_size) == -1) { @@ -378,7 +435,12 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes) { UInt32 l_nbytes = (UInt32) nbytes; - if (! d_started) return (0); + if (! d_started) { + if (usb_debug) + fprintf (stderr, "fusb_ephandle_darwin::write: Not yet started.\n"); + + return (0); + } while (l_nbytes != 0) { // find out how much data to copy; limited to "d_bufLenBytes" per node @@ -400,11 +462,11 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes) if (d_transferType == kUSBInterrupt) /* This is an interrupt pipe ... can't specify a timeout. */ result = d_interface->WritePipeAsync - (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes, + (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes, (IOAsyncCallback1) write_completed, (void*) l_both); else result = d_interface->WritePipeAsyncTO - (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes, 0, USB_TIMEOUT, + (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes, 0, USB_TIMEOUT, (IOAsyncCallback1) write_completed, (void*) l_both); if (result != kIOReturnSuccess) @@ -413,6 +475,10 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes) "(WritePipeAsync%s): %s", d_transferType == kUSBInterrupt ? "" : "TO", darwin_error_str (result)); + else if (usb_debug > 4) { + fprintf (stderr, "fusb_ephandle_darwin::write_thread: " + "Queued %X (%ld Bytes)\n", (unsigned int) l_both, t_nbytes); + } l_nbytes -= t_nbytes; } @@ -436,6 +502,9 @@ fusb_ephandle_darwin::write_completed (void* refCon, fprintf (stderr, "fusb_ephandle_darwin::write_completed: " "Expected %ld bytes written; wrote %ld.\n", l_i_size, l_size); + else if (usb_debug > 4) + fprintf (stderr, "fusb_ephandle_darwin::write_completed: " + "Wrote %X (%ld Bytes)\n", (unsigned int) l_both, l_size); // set buffer's # data to 0 l_buf->n_used (0); diff --git a/usrp/host/lib/legacy/mld_threads.h b/usrp/host/lib/legacy/mld_threads.h index a59a92863..b2ec65751 100644 --- a/usrp/host/lib/legacy/mld_threads.h +++ b/usrp/host/lib/legacy/mld_threads.h @@ -37,7 +37,10 @@ #include #define __INLINE__ inline + +#ifndef DO_DEBUG #define DO_DEBUG 0 +#endif #if DO_DEBUG #define DEBUG(X) do{X} while(0); @@ -182,7 +185,7 @@ public: __INLINE__ mld_mutex_ptr mutex () {return (d_mutex);}; __INLINE__ void signal () { - DEBUG (fprintf (stderr, "a ")); + DEBUG (fprintf (stderr, "a ");); #ifdef _USE_OMNI_THREADS_ d_condition->signal (); @@ -193,11 +196,11 @@ public: "Error %d.\n", l_ret); } #endif - DEBUG (fprintf (stderr, "b ")); + DEBUG (fprintf (stderr, "b ");); }; __INLINE__ void wait () { - DEBUG (fprintf (stderr, "c ")); + DEBUG (fprintf (stderr, "c ");); #ifdef _USE_OMNI_THREADS_ d_condition->wait (); #else @@ -207,7 +210,7 @@ public: "Error %d.\n", l_ret); } #endif - DEBUG (printf (stderr, "d ")); + DEBUG (fprintf (stderr, "d ");); }; }; -- cgit