28 #include <boost/date_time/posix_time/posix_time.hpp>
29 #include <boost/shared_ptr.hpp>
30 #include <boost/shared_array.hpp>
31 #include <boost/scoped_array.hpp>
32 #include <boost/function.hpp>
33 #include <boost/bind.hpp>
34 #include <boost/thread.hpp>
// Shorthand for the mutable iterator type of std::vector<Item>.
// `typename` is needed because the nested name is dependent here
// (presumably Item depends on a template parameter of the enclosing scope -- confirm).
71 typedef typename std::vector<Item>::iterator
iterator;
106 void start(
int instance=-1) {
m_items.front().m_statement->connection.queue(*
this, instance); }
// Pure virtual: every concrete subclass must provide commit().
// `thread` defaults to 0.
// NOTE(review): `thread` presumably selects the worker thread / connection
// slot to commit on -- confirm against the implementations.
// NOTE(review): default arguments of virtual functions are chosen by the
// static type at the call site, so overriders should repeat the same default.
129 virtual void commit(
const unsigned int thread=0)=0;
// Pure virtual: every concrete subclass must provide rollback().
// `thread` defaults to 0.
// NOTE(review): `thread` presumably selects the worker thread / connection
// slot to roll back on -- confirm against the implementations.
// NOTE(review): default arguments of virtual functions are chosen by the
// static type at the call site, so overriders should repeat the same default.
130 virtual void rollback(
const unsigned int thread=0)=0;
// Array of condition variables owned through boost::scoped_array.
// Elsewhere in this file it is indexed with the same index as `queries`:
// wakeUp[i] is waited on while holding a lock on queries[i], and is
// notified after something is pushed onto queries[i].
132 boost::scoped_array<boost::condition_variable>
wakeUp;
156 class Queries:
public std::queue<QuerySet>,
public boost::mutex {};
// Declaration only; the definition is not part of this excerpt.
//
// statement: const reference to a pointer to T (the pointee itself is not const).
// query:     taken by non-const reference.
// instance:  plain int.
// NOTE(review): `instance` presumably selects which per-thread queue receives
// the query, with a negative value meaning "pick one" -- confirm in the definition.
193 inline void queue(T*
const& statement,
QueryPar& query,
int instance);
216 boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
220 boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
221 while(m_threads<maxThreads)
224 threadsChanged.wait(threadsLock);
231 boost::lock_guard<boost::mutex> terminateLock(terminateMutex);
234 for(boost::condition_variable* i=wakeUp.get(); i<wakeUp.get()+threads(); ++i)
237 boost::unique_lock<boost::mutex> threadsLock(threadsMutex);
239 threadsChanged.wait(threadsLock);
245 boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
248 threadsChanged.notify_one();
250 boost::unique_lock<boost::mutex> terminateLock(terminateMutex, boost::defer_lock_t());
251 boost::unique_lock<boost::mutex> queriesLock(queries[
id], boost::defer_lock_t());
255 terminateLock.lock();
258 terminateLock.unlock();
261 if(!queries[
id].size())
263 wakeUp[id].wait(queriesLock);
264 queriesLock.unlock();
267 QuerySet querySet=queries[id].front();
269 queriesLock.unlock();
295 catch(
const Error& e)
303 while(!querySet.
m_commit && queries[
id].size())
305 tmpQuerySet=queries[id].front();
311 queriesLock.unlock();
318 boost::lock_guard<boost::mutex> threadsLock(threadsMutex);
321 threadsChanged.notify_one();
329 for(
unsigned int i=1; i<threads(); ++i)
331 boost::lock_guard<boost::mutex> queriesLock(queries[i]);
332 if(queries[i].size() < queries[instance].size())
337 boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
338 queries[instance].push(
QuerySet(query, statement,
true));
339 wakeUp[instance].notify_one();
349 for(
unsigned int i=1; i<threads(); ++i)
351 boost::lock_guard<boost::mutex> queriesLock(queries[i]);
352 if(queries[i].size() < queries[instance].size())
357 boost::lock_guard<boost::mutex> queriesLock(queries[instance]);
360 queries[instance].push(
QuerySet(it->m_query, it->m_statement,
false));
361 queries[instance].back().m_commit =
true;
363 wakeUp[instance].notify_one();
368 for(
iterator it=begin(); it!=end(); ++it)
369 it->m_query.cancel();
375 for(
unsigned int i=1; i<threads(); ++i)
377 boost::lock_guard<boost::mutex> queriesLock(queries[i]);
378 size += queries[i].size();