@Dubyoo
2016-02-26T06:54:38.000000Z
字数 6038
阅读 3303
ACE
#include <iostream>#include <string>#include "ace/ACE.h"#include "ace/Task_T.h"#include "ace/OS.h"#include "ace/Method_Request.h"#include "ace/Activation_Queue.h"using namespace std;
class Logger : public ACE_Task<ACE_MT_SYNCH>{public:Logger(){this->activate(); // 激活线程}int svc() // 该函数在另一个线程中执行{while (true){// 遍历队列,执行命令auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());if (mq->call() == -1)break;}return 0;}void LogMsg(const string& msg){cout << endl << msg << endl;ACE_OS::sleep(2);}void LogMsgActive(const string& msg);private:ACE_Activation_Queue cmdQueue;};
class LogMsgCmd : public ACE_Method_Request{public:LogMsgCmd(Logger* plog, const string& msg){this->log = plog;this->msg = msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger* log;string msg;};
// 以主动的方式记录日志void Logger::LogMsgActive(const string& msg){// 生成命令对象,插入到命令队列中cmdQueue.enqueue(new LogMsgCmd(this, msg));}
int main(){Logger log;log.LogMsgActive("Hello");ACE_OS::sleep(1);log.LogMsgActive("ABC");while (true)ACE_OS::sleep(10);return 0;}
主动对象的基本结构就是这样,然而,由于主动对象是异步调用的,又引出了如下两个新问题:
ACE_Future是表示一个会在将来被赋值的"期货"对象,可以通过ready()函数查询它是否已经被赋值。该对象创建的时候是未赋值的,后期可以通过set()函数来进行赋值,所赋的值可以通过get()函数来获取。
#include <iostream>#include "ace/Future.h"using namespace std;void get_infor(ACE_Future<string> &fu){string state = fu.ready() ? "ready" : "not ready";cout << endl << state << endl;if (fu.ready()){string value;fu.get(value);cout << "value\t" << value << endl;}}int main(){ACE_Future<string> fu; // 新创建的Future对象,未经过set(),ready()返回falseget_infor(fu);fu.set("12345"); // set()之后,ready()返回trueget_infor(fu);get_infor(fu);fu.cancel(); // 重置Future对象,重置后,ready()返回falseget_infor(fu);return 0;}
#include <iostream>#include <string>#include "ace/ACE.h"#include "ace/Task_T.h"#include "ace/OS.h"#include "ace/Method_Request.h"#include "ace/Activation_Queue.h"#include "ace/Future.h"using namespace std;
class Logger : public ACE_Task<ACE_MT_SYNCH>{public:Logger(){this->activate(); // 激活线程}int svc() // 该函数在另一个线程中执行{while (true){// 遍历队列,执行命令auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());if (mq->call() == -1)break;}return 0;}string LogMsg(const string& msg){ACE_OS::sleep(2);cout << endl << msg << endl;return msg;}void LogMsgActive(const string& msg, ACE_Future<string>* result);private:ACE_Activation_Queue cmdQueue;};
class LogMsgCmd : public ACE_Method_Request{public:LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result){this->log = plog;this->msg = msg;this->result = result;}int call(){string reply = this->log->LogMsg(msg);result->set(reply);return 0;}private:ACE_Future<string>* result;Logger* log;string msg;};
// 以主动的方式记录日志void Logger::LogMsgActive(const string& msg, ACE_Future<string>* result){// 生成命令对象,插入到命令队列中cmdQueue.enqueue(new LogMsgCmd(this, msg, result));}
// 获取结果,打印结果,返回bool表示是否获取成功bool get_info(ACE_Future<string>& fu){bool ready = fu.ready();string state = ready ? "\t-->ready" : "\t-->not ready";cout << endl << state << endl;if (ready){string value;fu.get(value);cout << "\t--->value:\t" << value << endl;}return ready;}
int main(){ACE_Future<string> result;Logger log;log.LogMsgActive("Hello", &result);while (true){if (get_info(result)) // 循环获取结果,直到获取到结果,再继续向下执行break;ACE_OS::sleep(1);}cout << endl << "cmd end" << endl;result.cancel(); // 重置Futurelog.LogMsgActive("ABC", &result);while (true){if (get_info(result)) // 再次获取结果break;ACE_OS::sleep(1);}cout << endl << "cmd end" << endl;}
这种查询模式比较简单有效,但存在一个问题:调用线程必须不断轮询ACE_Future对象以获取返回值,这样的效率比较低。可以通过观察者模式解决这个问题:在ACE_Future对象上注册一个观察者,当ACE_Future对象的值发生改变(异步命令执行完成)时主动通知该观察者,从而获取返回值。
ACE中的观察者模式可以通过ACE_Future_Observer来实现,使用方法如下:
#include "ace/Future.h"#include <string>#include <iostream>using namespace std;class MyObserver:public ACE_Future_Observer<string>{virtual void update (const ACE_Future<string> &future){string value;future.get(value);cout<<endl<<"change:\t"<<value<<endl;}};int main(int argc, char *argv[]){MyObserver obv;ACE_Future<string> fu;fu.attach(&obv);ACE_OS::sleep(3);fu.set("12345");while(true)ACE_OS::sleep(3);return 0;}
#pragma once#include <iostream>#include "ace/Task_T.h"#include "ace/OS.h"#include "ace/Method_Request.h"#include "ace/Activation_Queue.h"#include "ace/Future.h"using namespace std;// 观察者 Future Observerclass MyObserver : public ACE_Future_Observer<string>{public:void update(const ACE_Future<string>& future);};// 主动对象 Loggerclass Logger : public ACE_Task<ACE_MT_SYNCH>{public:Logger();virtual int svc(); // 该函数在另一个线程中执行string LogMsg(const string& msg);void LogMsgActive(const string& msg, ACE_Future<string>* result);private:ACE_Activation_Queue cmdQueue;};// ACE_Method_Request 是ACE提供的命令模式接口,// 命令接口调用函数为int call(),// 通过它可以把每一个操作日志的调用命令封装成一个LogMsgCmd对象class LogMsgCmd : public ACE_Method_Request{public:LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result);int call(); // 命令模式接口函数private:ACE_Future<string>* result;Logger* log;string msg;};
#include "ACEActiveObjectLogger.h"void MyObserver::update(const ACE_Future<string>& future){string value;future.get(value);cout << "Observer:\t--->value:\t" << value << endl;}Logger::Logger(){this->activate(); // 激活一个新的线程}int Logger::svc() // 新线程的启动后执行的任务{while (true){// 遍历队列,执行命令auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());if (mq->call() == -1)break;}return 0;}string Logger::LogMsg(const string& msg){ACE_OS::sleep(2); // 模拟任务处理时间cout << endl << "Handle message:\t" << msg << endl;return msg;}// 以主动的方式记录日志void Logger::LogMsgActive(const string& msg, ACE_Future<string>* result){// 生成日志的命令对象,插入到命令队列中cmdQueue.enqueue(new LogMsgCmd(this, msg, result));}LogMsgCmd::LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result): log(plog), msg(msg), result(result){}int LogMsgCmd::call(){string reply = this->log->LogMsg(msg);result->set(reply); // 处理完毕的返回值放入 ACE_Future(观察者的观察对象)return 0;}
#pragma comment(lib, "ACED.lib")#include "ACEActiveObjectLogger.h"int main(int, char*[]){MyObserver observer, observer2;ACE_Future<string> result, result2;// 注册ACE_Future到观察者result.attach(&observer);result2.attach(&observer2);Logger log;log.LogMsgActive("Hello", &result);log.LogMsgActive("ABC", &result2);while (true)ACE_OS::sleep(10);return 0;}