libqi-api  2.1.0.18
/home/opennao/work/master/sdk/libqi/qi/details/future.hxx
Go to the documentation of this file.
00001 #pragma once
00002 /*
00003 **  Copyright (C) 2012 Aldebaran Robotics
00004 **  See COPYING for the license
00005 */
00006 
00007 #ifndef _QI_DETAILS_FUTURE_HXX_
00008 #define _QI_DETAILS_FUTURE_HXX_
00009 
00010 #include <vector>
00011 #include <utility> // pair
00012 #include <boost/bind.hpp>
00013 #include <qi/eventloop.hpp>
00014 
00015 #include <qi/log.hpp>
00016 
00017 namespace qi {
00018 
00019   namespace detail {
00020 
00021     class FutureBasePrivate;
00022     class QI_API FutureBase {
00023     public:
00024       FutureBase();
00025       ~FutureBase();
00026 
00027       FutureState wait(int msecs) const;
00028       FutureState state() const;
00029       bool isRunning() const;
00030       bool isFinished() const;
00031       bool isCanceled() const;
00032       bool hasError(int msecs) const;
00033       bool hasValue(int msecs) const;
00034       const std::string &error(int msecs) const;
00035       void reportStart();
00036       void reset();
00037 
00038     protected:
00039       void reportValue();
00040       void reportError(const std::string &message);
00041       void reportCanceled();
00042       boost::recursive_mutex& mutex();
00043       void notifyFinish();
00044 
00045     public:
00046       FutureBasePrivate *_p;
00047     };
00048 
00049 
00050     //common state shared between a Promise and multiple Futures
00051     template <typename T>
00052     class FutureBaseTyped : public FutureBase {
00053     public:
00054       typedef typename FutureType<T>::type ValueType;
00055       FutureBaseTyped()
00056         : _value()
00057         , _async(FutureCallbackType_Async)
00058       {
00059       }
00060 
00061       bool isCancelable() const
00062       {
00063         return _onCancel;
00064       }
00065 
00066       void cancel(qi::Future<T>& future)
00067       {
00068         if (isFinished())
00069           return;
00070         if (!_onCancel)
00071           throw FutureException(FutureException::ExceptionState_FutureNotCancelable);
00072         _onCancel(Promise<T>(future));
00073       }
00074 
00075       void setOnCancel(boost::function<void (Promise<T>)> onCancel)
00076       {
00077         _onCancel = onCancel;
00078       }
00079 
00080 
00081       void callCbNotify(qi::Future<T>& future)
00082       {
00083         for(unsigned i = 0; i<_onResult.size(); ++i)
00084         {
00085           try {
00086             if (_async == FutureCallbackType_Async)
00087               getEventLoop()->post(boost::bind(_onResult[i], future));
00088             else
00089               _onResult[i](future);
00090           } catch(const qi::PointerLockException&) { // do nothing
00091           } catch(const std::exception& e) {
00092             qiLogError("qi.future") << "Exception caught in future callback "
00093                                     << e.what();
00094           } catch (...) {
00095             qiLogError("qi.future")
00096                 << "Unknown exception caught in future callback";
00097           }
00098         }
00099         notifyFinish();
00100       }
00101 
00102       void setValue(qi::Future<T>& future, const ValueType &value)
00103       {
00104         // report-ready + onResult() must be Atomic to avoid
00105         // missing callbacks/double calls in case connect() is invoked at
00106         // the same time
00107         boost::recursive_mutex::scoped_lock lock(mutex());
00108         if (!isRunning())
00109           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00110 
00111         _value = value;
00112         reportValue();
00113         callCbNotify(future);
00114       }
00115 
00116       /*
00117        * inplace api for promise
00118        */
00119       void set(qi::Future<T>& future)
00120       {
00121         // report-ready + onResult() must be Atomic to avoid
00122         // missing callbacks/double calls in case connect() is invoked at
00123         // the same time
00124         boost::recursive_mutex::scoped_lock lock(mutex());
00125         if (!isRunning())
00126           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00127 
00128         reportValue();
00129         callCbNotify(future);
00130       }
00131 
00132       void setError(qi::Future<T>& future, const std::string &message)
00133       {
00134         boost::recursive_mutex::scoped_lock lock(mutex());
00135         if (!isRunning())
00136           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00137 
00138         reportError(message);
00139         callCbNotify(future);
00140       }
00141 
00142       void setCanceled(qi::Future<T>& future) {
00143         boost::recursive_mutex::scoped_lock lock(mutex());
00144         if (!isRunning())
00145           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00146 
00147         reportCanceled();
00148         callCbNotify(future);
00149       }
00150 
00151 
00152       void connect(qi::Future<T> future,
00153           const boost::function<void (qi::Future<T>)> &s,
00154           FutureCallbackType type)
00155       {
00156         bool ready;
00157         {
00158           boost::recursive_mutex::scoped_lock lock(mutex());
00159           _onResult.push_back(s);
00160           ready = isFinished();
00161         }
00162         //result already ready, notify the callback
00163         if (ready) {
00164           if (type == FutureCallbackType_Async)
00165             getEventLoop()->post(boost::bind(s, future));
00166           else
00167           {
00168             try {
00169               s(future);
00170             } catch(const ::qi::PointerLockException&)
00171             {/*do nothing*/}
00172           }
00173         }
00174       }
00175 
00176       const ValueType &value(int msecs) const {
00177         FutureState state = wait(msecs);
00178         if (state == FutureState_Running)
00179           throw FutureException(FutureException::ExceptionState_FutureTimeout);
00180         if (state == FutureState_Canceled)
00181           throw FutureException(FutureException::ExceptionState_FutureCanceled);
00182         if (state == FutureState_FinishedWithError)
00183           throw FutureUserException(error(FutureTimeout_None));
00184         return _value;
00185       }
00186 
00187     private:
00188       friend class Promise<T>;
00189       typedef std::vector<boost::function<void (qi::Future<T>)> > Callbacks;
00190       Callbacks                _onResult;
00191       ValueType                _value;
00192       boost::function<void (Promise<T>)> _onCancel;
00193       FutureCallbackType       _async;
00194     };
00195 
00196     template <typename T>
00197     void waitForFirstHelper(qi::Promise< qi::Future<T> >& prom,
00198                             qi::Future<T>& fut,
00199                             qi::Atomic<int>* count) {
00200       if (!prom.future().isFinished() && !fut.hasError())
00201       {
00202         // An other future can trigger at the same time.
00203         // Don't bother to lock, just catch the FutureAlreadySet exception
00204         try
00205         {
00206           prom.setValue(fut);
00207         }
00208         catch(const FutureException&)
00209         {}
00210       }
00211       if (! --*count)
00212       {
00213         // I'm the last
00214         if (!prom.future().isFinished())
00215         {
00216           // same 'race' as above. between two setError, not between a value and
00217           // an error.
00218           try
00219           {
00220             prom.setValue(makeFutureError<T>("No future returned successfully."));
00221           }
00222           catch(const FutureException&)
00223           {}
00224         }
00225         delete count;
00226       }
00227     }
00228   } // namespace detail
00229 
00230   template <typename T>
00231   qi::Future<T> makeFutureError(const std::string &error, FutureCallbackType async) {
00232     qi::Promise<T> prom(async);
00233     prom.setError(error);
00234     return prom.future();
00235   }
00236 
00237   template <typename T>
00238   void waitForAll(std::vector<Future<T> >& vect) {
00239     typename std::vector< Future<T> >::iterator it;
00240     qi::FutureBarrier<T> barrier;
00241 
00242     for (it = vect.begin(); it != vect.end(); ++it) {
00243       barrier.addFuture(*it);
00244     }
00245     barrier.future().wait();
00246   }
00247 
00248   template <typename T>
00249   qi::FutureSync< qi::Future<T> > waitForFirst(std::vector< Future<T> >& vect) {
00250     typename std::vector< Future<T> >::iterator it;
00251     qi::Promise< qi::Future<T> > prom;
00252     qi::Atomic<int>* count = new qi::Atomic<int>();
00253     count->swap((int)vect.size());
00254     for (it = vect.begin(); it != vect.end(); ++it) {
00255       it->connect(boost::bind<void>(&detail::waitForFirstHelper<T>, prom, *it, count));
00256     }
00257     return prom.future();
00258   }
00259 
00260   namespace detail
00261   {
00262     template<typename FT, typename PT, typename CONV>
00263     void futureAdapter(Future<FT> f, Promise<PT> p, CONV converter)
00264     {
00265       if (f.hasError())
00266         p.setError(f.error());
00267       else if (f.isCanceled())
00268         p.setCanceled();
00269       else
00270       {
00271         try {
00272           converter(f.value(), p.value());
00273         }
00274         catch (const std::exception& e)
00275         {
00276           p.setError(std::string("futureAdapter conversion error: ") + e.what());
00277           return;
00278         }
00279         p.trigger();
00280       }
00281     }
00282 
00283     template<typename FT>
00284     void futureCancelAdapter(boost::weak_ptr<FutureBaseTyped<FT> > wf)
00285     {
00286       if (boost::shared_ptr<FutureBaseTyped<FT> > f = wf.lock())
00287         Future<FT>(f).cancel();
00288     }
00289   }
00290 
00291   template <>
00292   struct FutureValueConverter<void, void>
00293   {
00294     void operator()(void* in, void* out)
00295     {
00296     }
00297   };
00298 
00299   template <typename T>
00300   struct FutureValueConverter<T, void>
00301   {
00302     void operator()(const T& in, void* out)
00303     {
00304     }
00305   };
00306 
00307   template <typename T>
00308   struct FutureValueConverter<void, T>
00309   {
00310     void operator()(void* in, const T& out)
00311     {
00312     }
00313   };
00314 
00315   template<typename FT, typename PT>
00316   void adaptFuture(const Future<FT>& f, Promise<PT>& p)
00317   {
00318     if (f.isCancelable())
00319       p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
00320             boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
00321     const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, FutureValueConverter<FT, PT> >, _1, p,
00322       FutureValueConverter<FT, PT>()));
00323   }
00324 
00325   template<typename FT, typename PT, typename CONV>
00326   void adaptFuture(const Future<FT>& f, Promise<PT>& p, CONV converter)
00327   {
00328     if (f.isCancelable())
00329       p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
00330             boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
00331     const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, CONV>, _1, p, converter));
00332   }
00333 }
00334 
00335 #endif  // _QI_DETAILS_FUTURE_HXX_
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines