libqi-api
2.1.0.18
|
00001 #pragma once 00002 /* 00003 ** Copyright (C) 2012 Aldebaran Robotics 00004 ** See COPYING for the license 00005 */ 00006 00007 #ifndef _QI_DETAILS_FUTURE_HXX_ 00008 #define _QI_DETAILS_FUTURE_HXX_ 00009 00010 #include <vector> 00011 #include <utility> // pair 00012 #include <boost/bind.hpp> 00013 #include <qi/eventloop.hpp> 00014 00015 #include <qi/log.hpp> 00016 00017 namespace qi { 00018 00019 namespace detail { 00020 00021 class FutureBasePrivate; 00022 class QI_API FutureBase { 00023 public: 00024 FutureBase(); 00025 ~FutureBase(); 00026 00027 FutureState wait(int msecs) const; 00028 FutureState state() const; 00029 bool isRunning() const; 00030 bool isFinished() const; 00031 bool isCanceled() const; 00032 bool hasError(int msecs) const; 00033 bool hasValue(int msecs) const; 00034 const std::string &error(int msecs) const; 00035 void reportStart(); 00036 void reset(); 00037 00038 protected: 00039 void reportValue(); 00040 void reportError(const std::string &message); 00041 void reportCanceled(); 00042 boost::recursive_mutex& mutex(); 00043 void notifyFinish(); 00044 00045 public: 00046 FutureBasePrivate *_p; 00047 }; 00048 00049 00050 //common state shared between a Promise and multiple Futures 00051 template <typename T> 00052 class FutureBaseTyped : public FutureBase { 00053 public: 00054 typedef typename FutureType<T>::type ValueType; 00055 FutureBaseTyped() 00056 : _value() 00057 , _async(FutureCallbackType_Async) 00058 { 00059 } 00060 00061 bool isCancelable() const 00062 { 00063 return _onCancel; 00064 } 00065 00066 void cancel(qi::Future<T>& future) 00067 { 00068 if (isFinished()) 00069 return; 00070 if (!_onCancel) 00071 throw FutureException(FutureException::ExceptionState_FutureNotCancelable); 00072 _onCancel(Promise<T>(future)); 00073 } 00074 00075 void setOnCancel(boost::function<void (Promise<T>)> onCancel) 00076 { 00077 _onCancel = onCancel; 00078 } 00079 00080 00081 void callCbNotify(qi::Future<T>& future) 00082 { 00083 for(unsigned i = 0; i<_onResult.size(); ++i) 00084 { 00085 try { 00086 if (_async == FutureCallbackType_Async) 00087 getEventLoop()->post(boost::bind(_onResult[i], future)); 00088 else 00089 _onResult[i](future); 00090 } catch(const qi::PointerLockException&) { // do nothing 00091 } catch(const std::exception& e) { 00092 qiLogError("qi.future") << "Exception caught in future callback " 00093 << e.what(); 00094 } catch (...) { 00095 qiLogError("qi.future") 00096 << "Unknown exception caught in future callback"; 00097 } 00098 } 00099 notifyFinish(); 00100 } 00101 00102 void setValue(qi::Future<T>& future, const ValueType &value) 00103 { 00104 // report-ready + onResult() must be Atomic to avoid 00105 // missing callbacks/double calls in case connect() is invoked at 00106 // the same time 00107 boost::recursive_mutex::scoped_lock lock(mutex()); 00108 if (!isRunning()) 00109 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00110 00111 _value = value; 00112 reportValue(); 00113 callCbNotify(future); 00114 } 00115 00116 /* 00117 * inplace api for promise 00118 */ 00119 void set(qi::Future<T>& future) 00120 { 00121 // report-ready + onResult() must be Atomic to avoid 00122 // missing callbacks/double calls in case connect() is invoked at 00123 // the same time 00124 boost::recursive_mutex::scoped_lock lock(mutex()); 00125 if (!isRunning()) 00126 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00127 00128 reportValue(); 00129 callCbNotify(future); 00130 } 00131 00132 void setError(qi::Future<T>& future, const std::string &message) 00133 { 00134 boost::recursive_mutex::scoped_lock lock(mutex()); 00135 if (!isRunning()) 00136 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00137 00138 reportError(message); 00139 callCbNotify(future); 00140 } 00141 00142 void setCanceled(qi::Future<T>& future) { 00143 boost::recursive_mutex::scoped_lock lock(mutex()); 00144 if (!isRunning()) 00145 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00146 00147 reportCanceled(); 00148 callCbNotify(future); 00149 } 00150 00151 00152 void connect(qi::Future<T> future, 00153 const boost::function<void (qi::Future<T>)> &s, 00154 FutureCallbackType type) 00155 { 00156 bool ready; 00157 { 00158 boost::recursive_mutex::scoped_lock lock(mutex()); 00159 _onResult.push_back(s); 00160 ready = isFinished(); 00161 } 00162 //result already ready, notify the callback 00163 if (ready) { 00164 if (type == FutureCallbackType_Async) 00165 getEventLoop()->post(boost::bind(s, future)); 00166 else 00167 { 00168 try { 00169 s(future); 00170 } catch(const ::qi::PointerLockException&) 00171 {/*do nothing*/} 00172 } 00173 } 00174 } 00175 00176 const ValueType &value(int msecs) const { 00177 FutureState state = wait(msecs); 00178 if (state == FutureState_Running) 00179 throw FutureException(FutureException::ExceptionState_FutureTimeout); 00180 if (state == FutureState_Canceled) 00181 throw FutureException(FutureException::ExceptionState_FutureCanceled); 00182 if (state == FutureState_FinishedWithError) 00183 throw FutureUserException(error(FutureTimeout_None)); 00184 return _value; 00185 } 00186 00187 private: 00188 friend class Promise<T>; 00189 typedef std::vector<boost::function<void (qi::Future<T>)> > Callbacks; 00190 Callbacks _onResult; 00191 ValueType _value; 00192 boost::function<void (Promise<T>)> _onCancel; 00193 FutureCallbackType _async; 00194 }; 00195 00196 template <typename T> 00197 void waitForFirstHelper(qi::Promise< qi::Future<T> >& prom, 00198 qi::Future<T>& fut, 00199 qi::Atomic<int>* count) { 00200 if (!prom.future().isFinished() && !fut.hasError()) 00201 { 00202 // An other future can trigger at the same time. 00203 // Don't bother to lock, just catch the FutureAlreadySet exception 00204 try 00205 { 00206 prom.setValue(fut); 00207 } 00208 catch(const FutureException&) 00209 {} 00210 } 00211 if (! --*count) 00212 { 00213 // I'm the last 00214 if (!prom.future().isFinished()) 00215 { 00216 // same 'race' as above. between two setError, not between a value and 00217 // an error. 00218 try 00219 { 00220 prom.setValue(makeFutureError<T>("No future returned successfully.")); 00221 } 00222 catch(const FutureException&) 00223 {} 00224 } 00225 delete count; 00226 } 00227 } 00228 } // namespace detail 00229 00230 template <typename T> 00231 qi::Future<T> makeFutureError(const std::string &error, FutureCallbackType async) { 00232 qi::Promise<T> prom(async); 00233 prom.setError(error); 00234 return prom.future(); 00235 } 00236 00237 template <typename T> 00238 void waitForAll(std::vector<Future<T> >& vect) { 00239 typename std::vector< Future<T> >::iterator it; 00240 qi::FutureBarrier<T> barrier; 00241 00242 for (it = vect.begin(); it != vect.end(); ++it) { 00243 barrier.addFuture(*it); 00244 } 00245 barrier.future().wait(); 00246 } 00247 00248 template <typename T> 00249 qi::FutureSync< qi::Future<T> > waitForFirst(std::vector< Future<T> >& vect) { 00250 typename std::vector< Future<T> >::iterator it; 00251 qi::Promise< qi::Future<T> > prom; 00252 qi::Atomic<int>* count = new qi::Atomic<int>(); 00253 count->swap((int)vect.size()); 00254 for (it = vect.begin(); it != vect.end(); ++it) { 00255 it->connect(boost::bind<void>(&detail::waitForFirstHelper<T>, prom, *it, count)); 00256 } 00257 return prom.future(); 00258 } 00259 00260 namespace detail 00261 { 00262 template<typename FT, typename PT, typename CONV> 00263 void futureAdapter(Future<FT> f, Promise<PT> p, CONV converter) 00264 { 00265 if (f.hasError()) 00266 p.setError(f.error()); 00267 else if (f.isCanceled()) 00268 p.setCanceled(); 00269 else 00270 { 00271 try { 00272 converter(f.value(), p.value()); 00273 } 00274 catch (const std::exception& e) 00275 { 00276 p.setError(std::string("futureAdapter conversion error: ") + e.what()); 00277 return; 00278 } 00279 p.trigger(); 00280 } 00281 } 00282 00283 template<typename FT> 00284 void futureCancelAdapter(boost::weak_ptr<FutureBaseTyped<FT> > wf) 00285 { 00286 if (boost::shared_ptr<FutureBaseTyped<FT> > f = wf.lock()) 00287 Future<FT>(f).cancel(); 00288 } 00289 } 00290 00291 template <> 00292 struct FutureValueConverter<void, void> 00293 { 00294 void operator()(void* in, void* out) 00295 { 00296 } 00297 }; 00298 00299 template <typename T> 00300 struct FutureValueConverter<T, void> 00301 { 00302 void operator()(const T& in, void* out) 00303 { 00304 } 00305 }; 00306 00307 template <typename T> 00308 struct FutureValueConverter<void, T> 00309 { 00310 void operator()(void* in, const T& out) 00311 { 00312 } 00313 }; 00314 00315 template<typename FT, typename PT> 00316 void adaptFuture(const Future<FT>& f, Promise<PT>& p) 00317 { 00318 if (f.isCancelable()) 00319 p.setup(boost::bind(&detail::futureCancelAdapter<FT>, 00320 boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p))); 00321 const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, FutureValueConverter<FT, PT> >, _1, p, 00322 FutureValueConverter<FT, PT>())); 00323 } 00324 00325 template<typename FT, typename PT, typename CONV> 00326 void adaptFuture(const Future<FT>& f, Promise<PT>& p, CONV converter) 00327 { 00328 if (f.isCancelable()) 00329 p.setup(boost::bind(&detail::futureCancelAdapter<FT>, 00330 boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p))); 00331 const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, CONV>, _1, p, converter)); 00332 } 00333 } 00334 00335 #endif // _QI_DETAILS_FUTURE_HXX_