Changeset 44699


Ignore:
Timestamp:
Apr 21, 2008, 4:22:16 PM (10 years ago)
Author:
Anthony Williams
Message:

Revamped condition variable to try and fix swallowed-notify problems (trac issue #1834)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/boost/thread/win32/condition_variable.hpp

    r44168 r44699  
    1515#include "interlocked_read.hpp"
    1616#include <boost/thread/xtime.hpp>
     17#include <vector>
     18#include <boost/intrusive_ptr.hpp>
    1719
    1820namespace boost
     
    2830            struct list_entry
    2931            {
    30                 detail::win32::handle semaphore;
    31                 long count;
     32                detail::win32::handle_manager semaphore;
     33                detail::win32::handle_manager wake_sem;
     34                long waiters;
    3235                bool notified;
     36                long references;
    3337
    3438                list_entry():
    35                     semaphore(0),count(0),notified(0)
     39                    semaphore(detail::win32::create_anonymous_semaphore(0,LONG_MAX)),
     40                    wake_sem(0),
     41                    waiters(1),notified(false),references(0)
    3642                {}
    3743
    38                 void release(unsigned count_to_release=1)
    39                 {
     44                void release(unsigned count_to_release)
     45                {
     46                    notified=true;
    4047                    detail::win32::ReleaseSemaphore(semaphore,count_to_release,0);
    4148                }
    42                
     49
     50                friend void intrusive_ptr_add_ref(list_entry * p)
     51                {
     52                    BOOST_INTERLOCKED_INCREMENT(&p->references);
     53                }
     54           
     55                friend void intrusive_ptr_release(list_entry * p)
     56                {
     57                    if(!BOOST_INTERLOCKED_DECREMENT(&p->references))
     58                    {
     59                        delete p;
     60                    }
     61                }
    4362            };
    4463
    45             BOOST_STATIC_CONSTANT(unsigned,generation_count=3);
    46 
    47             list_entry generations[generation_count];
    48             detail::win32::handle wake_sem;
     64            typedef boost::intrusive_ptr<list_entry> entry_ptr;
     65            typedef std::vector<entry_ptr> generation_list;
     66
     67            generation_list generations;
     68            detail::win32::handle_manager wake_sem;
    4969
    5070            void wake_waiters(long count_to_wake)
     
    5474            }
    5575           
    56 
    57             static bool no_waiters(list_entry const& entry)
    58             {
    59                 return entry.count==0;
    60             }
    61 
    62             void shift_generations_down()
    63             {
    64                 list_entry* const last_active_entry=std::remove_if(generations,generations+generation_count,no_waiters);
    65                 if(last_active_entry==generations+generation_count)
    66                 {
    67                     broadcast_entry(generations[generation_count-1]);
    68                 }
    69                 else
    70                 {
    71                     active_generation_count=unsigned(last_active_entry-generations)+1;
    72                 }
    73 
    74 #ifdef BOOST_MSVC
    75 #pragma warning(push)
    76 #pragma warning(disable:4996)
    77 #endif
    78                 std::copy_backward(generations,generations+active_generation_count-1,generations+active_generation_count);
    79 #ifdef BOOST_MSVC
    80 #pragma warning(pop)
    81 #endif
    82                 generations[0]=list_entry();
    83             }
    84 
    85             void broadcast_entry(list_entry& entry)
    86             {
    87                 entry.release(entry.count);
    88                 entry.count=0;
    89                 dispose_entry(entry);
    90             }
    91        
    92 
    93             void dispose_entry(list_entry& entry)
    94             {
    95                 if(entry.semaphore)
    96                 {
    97                     BOOST_VERIFY(detail::win32::CloseHandle(entry.semaphore));
    98                     entry.semaphore=0;
    99                 }
    100                 entry.notified=false;
    101             }
    102 
    10376            template<typename lock_type>
    10477            struct relocker
     
    12497                }
    12598            private:
     99                relocker(relocker&);
    126100                void operator=(relocker&);
    127101            };
    128102           
    129103
    130             template<typename lock_type>
    131             void start_wait_loop_first_time(relocker<lock_type>& locker,
    132                                             detail::win32::handle_manager& local_wake_sem)
    133             {
    134                 detail::interlocked_write_release(&total_count,total_count+1);
    135                 locker.unlock();
     104            entry_ptr get_wait_entry()
     105            {
     106                boost::lock_guard<boost::mutex> internal_lock(internal_mutex);
     107
    136108                if(!wake_sem)
    137109                {
     
    139111                    BOOST_ASSERT(wake_sem);
    140112                }
    141                 local_wake_sem=detail::win32::duplicate_handle(wake_sem);
    142                        
    143                 if(generations[0].notified)
    144                 {
    145                     shift_generations_down();
    146                 }
    147                 else if(!active_generation_count)
    148                 {
    149                     active_generation_count=1;
    150                 }
    151             }
    152 
    153             void ensure_generation_present()
    154             {
    155                 if(!generations[0].semaphore)
    156                 {
    157                     generations[0].semaphore=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
    158                     BOOST_ASSERT(generations[0].semaphore);
    159                 }
    160             }
    161            
    162             template<typename lock_type>
    163             void start_wait_loop(relocker<lock_type>& locker,
    164                                  detail::win32::handle_manager& local_wake_sem,
    165                                  detail::win32::handle_manager& sem)
    166             {
    167                 boost::mutex::scoped_lock internal_lock(internal_mutex);
    168                 if(!local_wake_sem)
    169                 {
    170                     start_wait_loop_first_time(locker,local_wake_sem);
    171                 }
    172                 ensure_generation_present();
    173                 ++generations[0].count;
    174                 sem=detail::win32::duplicate_handle(generations[0].semaphore);
    175             }
     113
     114                detail::interlocked_write_release(&total_count,total_count+1);
     115                if(generations.empty() || generations.back()->notified)
     116                {
     117                    entry_ptr new_entry(new list_entry);
     118                    new_entry->wake_sem=wake_sem.duplicate();
     119                    generations.push_back(new_entry);
     120                    return new_entry;
     121                }
     122                else
     123                {
     124                    BOOST_INTERLOCKED_INCREMENT(&generations.back()->waiters);
     125                    return generations.back();
     126                }
     127            }
     128           
     129            struct entry_manager
     130            {
     131                entry_ptr const entry;
     132                   
     133                entry_manager(entry_ptr const& entry_):
     134                    entry(entry_)
     135                {}
     136                   
     137                ~entry_manager()
     138                {
     139                    BOOST_INTERLOCKED_DECREMENT(&entry->waiters);
     140                }
     141
     142                list_entry* operator->()
     143                {
     144                    return entry.get();
     145                }
     146
     147            private:
     148                void operator=(entry_manager&);
     149                entry_manager(entry_manager&);
     150            };
     151               
    176152
    177153        protected:
     
    179155            bool do_wait(lock_type& lock,timeout wait_until)
    180156            {
    181                 detail::win32::handle_manager local_wake_sem;
    182                 detail::win32::handle_manager sem;
     157                relocker<lock_type> locker(lock);
     158               
     159                entry_manager entry=get_wait_entry();
     160
     161                locker.unlock();
     162
    183163                bool woken=false;
    184 
    185                 relocker<lock_type> locker(lock);
    186            
    187164                while(!woken)
    188165                {
    189                     start_wait_loop(locker,local_wake_sem,sem);
    190                    
    191                     if(!this_thread::interruptible_wait(sem,wait_until))
     166                    if(!this_thread::interruptible_wait(entry->semaphore,wait_until))
    192167                    {
    193168                        return false;
    194169                    }
    195170               
    196                     unsigned long const woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0);
     171                    unsigned long const woken_result=detail::win32::WaitForSingleObject(entry->wake_sem,0);
    197172                    BOOST_ASSERT(woken_result==detail::win32::timeout || woken_result==0);
    198173
     
    215190            basic_condition_variable(const basic_condition_variable& other);
    216191            basic_condition_variable& operator=(const basic_condition_variable& other);
     192
     193            static bool no_waiters(entry_ptr const& entry)
     194            {
     195                return !detail::interlocked_read_acquire(&entry->waiters);
     196            }
    217197        public:
    218198            basic_condition_variable():
     
    221201           
    222202            ~basic_condition_variable()
    223             {
    224                 for(unsigned i=0;i<generation_count;++i)
    225                 {
    226                     dispose_entry(generations[i]);
    227                 }
    228                 detail::win32::CloseHandle(wake_sem);
    229             }
    230 
    231        
     203            {}
     204
    232205            void notify_one()
    233206            {
     
    240213                    }
    241214                    wake_waiters(1);
    242                    
    243                     unsigned waiting_count=0;
    244                    
    245                     for(unsigned generation=active_generation_count;generation!=0;--generation)
    246                     {
    247                         list_entry& entry=generations[generation-1];
    248                         waiting_count+=entry.count;
    249                         if(entry.count)
    250                         {
    251                             entry.notified=true;
    252                             entry.release();
    253                             if(!--entry.count)
    254                             {
    255                                 dispose_entry(entry);
    256                                 if(generation==active_generation_count)
    257                                 {
    258                                     --active_generation_count;
    259                                 }
    260                             }
    261                         }
    262                     }
    263                     if(waiting_count<=total_count)
    264                     {
    265                         shift_generations_down();
    266                         ensure_generation_present();
    267                         generations[0].release();
    268                     }
     215
     216                    for(generation_list::iterator it=generations.begin(),
     217                            end=generations.end();
     218                        it!=end;++it)
     219                    {
     220                        (*it)->release(1);
     221                    }
     222                    generations.erase(std::remove_if(generations.begin(),generations.end(),no_waiters),generations.end());
    269223                }
    270224            }
     
    275229                {
    276230                    boost::mutex::scoped_lock internal_lock(internal_mutex);
    277                     long waiting_count=total_count;
    278                    
     231                    if(!total_count)
     232                    {
     233                        return;
     234                    }
    279235                    wake_waiters(total_count);
    280                     for(unsigned generation=active_generation_count;generation!=0;--generation)
    281                     {
    282                         list_entry& entry=generations[generation-1];
    283                         if(entry.count)
    284                         {
    285                             waiting_count-=entry.count;
    286                             broadcast_entry(entry);
    287                         }
    288                     }
    289                     if(waiting_count)
    290                     {
    291                         ensure_generation_present();
    292                         generations[0].release(waiting_count);
    293                     }
    294                     active_generation_count=0;
     236                    for(generation_list::iterator it=generations.begin(),
     237                            end=generations.end();
     238                        it!=end;++it)
     239                    {
     240                        (*it)->release(detail::interlocked_read_acquire(&(*it)->waiters));
     241                    }
     242                    wake_sem=detail::win32::handle(0);
    295243                }
    296244            }
Note: See TracChangeset for help on using the changeset viewer.