#include <queue> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/thread/tss.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <iostream> using namespace std; using namespace boost; boost::recursive_mutex io_mutex; boost::condition_variable_any cond; std::queue< int> iq; class printer { public: printer(boost::asio::io_service& io, int n) : strand_(io), timer1_(io, boost::posix_time::seconds( 1)), timer2_(io, boost::posix_time::seconds( 1)), count_(n) { timer2_.async_wait(boost::bind(&printer::enqueue, this)); timer1_.async_wait(boost::bind(&printer::dequeue, this)); } ~printer() { boost::recursive_mutex::scoped_lock lock(io_mutex); std::cout << " Final count is " << count_ << " \n "; } void dequeue() { boost::recursive_mutex::scoped_lock lock(io_mutex); while(iq.empty()) { cond.wait( lock); } int pop = 0; if (!iq.empty()) { pop = iq.front(); iq.pop(); cout<< " ------------pop "<<pop<<endl; } cond.notify_all(); timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds( 1)); timer1_.async_wait(boost::bind(&printer::dequeue, this)); } void enqueue() { boost::recursive_mutex::scoped_lock lock(io_mutex); while(iq.size() == 1000) { cond.wait( lock); } iq.push(count_);; cond.notify_all(); cout<< " ------------push "<<count_<<endl; timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds( 1)); timer1_.async_wait(boost::bind(&printer::enqueue, this)); } private: boost::asio::strand strand_; boost::asio::deadline_timer timer1_; boost::asio::deadline_timer timer2_; int count_; }; int main() { thread_group threads; boost::asio::io_service io; printer * p[ 1000]; int num = 1000; for( int i = 0; i < num; i++) { p[i] = new printer(io,i); threads.create_thread(boost::bind(&boost::asio::io_service::run, &io)); } io.run(); threads.join_all(); std::system( " pause "); return 0; }