@Dubyoo
2016-02-26T14:54:38.000000Z
字数 6038
阅读 3015
ACE
#include <iostream>
#include <string>
#include "ace/ACE.h"
#include "ace/Task_T.h"
#include "ace/OS.h"
#include "ace/Method_Request.h"
#include "ace/Activation_Queue.h"
using namespace std;
class Logger : public ACE_Task<ACE_MT_SYNCH>
{
public:
Logger()
{
this->activate(); // 激活线程
}
int svc() // 该函数在另一个线程中执行
{
while (true)
{
// 遍历队列,执行命令
auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());
if (mq->call() == -1)
break;
}
return 0;
}
void LogMsg(const string& msg)
{
cout << endl << msg << endl;
ACE_OS::sleep(2);
}
void LogMsgActive(const string& msg);
private:
ACE_Activation_Queue cmdQueue;
};
class LogMsgCmd : public ACE_Method_Request
{
public:
LogMsgCmd(Logger* plog, const string& msg)
{
this->log = plog;
this->msg = msg;
}
int call()
{
this->log->LogMsg(msg);
return 0;
}
private:
Logger* log;
string msg;
};
// 以主动的方式记录日志
void Logger::LogMsgActive(const string& msg)
{
// 生成命令对象,插入到命令队列中
cmdQueue.enqueue(new LogMsgCmd(this, msg));
}
int main()
{
Logger log;
log.LogMsgActive("Hello");
ACE_OS::sleep(1);
log.LogMsgActive("ABC");
while (true)
ACE_OS::sleep(10);
return 0;
}
主动对象的基本结构就是这样,然而,由于主动对象是异步调用的,又引出了如下两个新问题:
ACE_Future是表示一个会在将来被赋值的"期货"对象,可以通过ready()函数查询它是否已经被赋值。该对象创建的时候是未赋值的,后期可以通过set()函数来进行赋值,所赋的值可以通过get()函数来获取。
#include <iostream>
#include "ace/Future.h"
using namespace std;
void get_infor(ACE_Future<string> &fu)
{
string state = fu.ready() ? "ready" : "not ready";
cout << endl << state << endl;
if (fu.ready())
{
string value;
fu.get(value);
cout << "value\t" << value << endl;
}
}
int main()
{
ACE_Future<string> fu; // 新创建的Future对象,未经过set(),ready()返回false
get_infor(fu);
fu.set("12345"); // set()之后,ready()返回true
get_infor(fu);
get_infor(fu);
fu.cancel(); // 重置Future对象,重置后,ready()返回false
get_infor(fu);
return 0;
}
#include <iostream>
#include <string>
#include "ace/ACE.h"
#include "ace/Task_T.h"
#include "ace/OS.h"
#include "ace/Method_Request.h"
#include "ace/Activation_Queue.h"
#include "ace/Future.h"
using namespace std;
class Logger : public ACE_Task<ACE_MT_SYNCH>
{
public:
Logger()
{
this->activate(); // 激活线程
}
int svc() // 该函数在另一个线程中执行
{
while (true)
{
// 遍历队列,执行命令
auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());
if (mq->call() == -1)
break;
}
return 0;
}
string LogMsg(const string& msg)
{
ACE_OS::sleep(2);
cout << endl << msg << endl;
return msg;
}
void LogMsgActive(const string& msg, ACE_Future<string>* result);
private:
ACE_Activation_Queue cmdQueue;
};
class LogMsgCmd : public ACE_Method_Request
{
public:
LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result)
{
this->log = plog;
this->msg = msg;
this->result = result;
}
int call()
{
string reply = this->log->LogMsg(msg);
result->set(reply);
return 0;
}
private:
ACE_Future<string>* result;
Logger* log;
string msg;
};
// 以主动的方式记录日志
void Logger::LogMsgActive(const string& msg, ACE_Future<string>* result)
{
// 生成命令对象,插入到命令队列中
cmdQueue.enqueue(new LogMsgCmd(this, msg, result));
}
// 获取结果,打印结果,返回bool表示是否获取成功
bool get_info(ACE_Future<string>& fu)
{
bool ready = fu.ready();
string state = ready ? "\t-->ready" : "\t-->not ready";
cout << endl << state << endl;
if (ready)
{
string value;
fu.get(value);
cout << "\t--->value:\t" << value << endl;
}
return ready;
}
int main()
{
ACE_Future<string> result;
Logger log;
log.LogMsgActive("Hello", &result);
while (true)
{
if (get_info(result)) // 循环获取结果,直到获取到结果,再继续向下执行
break;
ACE_OS::sleep(1);
}
cout << endl << "cmd end" << endl;
result.cancel(); // 重置Future
log.LogMsgActive("ABC", &result);
while (true)
{
if (get_info(result)) // 再次获取结果
break;
ACE_OS::sleep(1);
}
cout << endl << "cmd end" << endl;
}
这种查询模式比较简单有效,但存在一个问题:调用线程必须不断轮询ACE_Future对象以获取返回值,这样的效率比较低。可以通过观察者模式解决这个问题:在ACE_Future对象上注册一个观察者,当ACE_Future对象的值发生改变(异步命令执行完成)时主动通知该观察者,从而获取返回值。
ACE中的观察者模式可以通过ACE_Future_Observer来实现,使用方法如下:
#include "ace/Future.h"
#include <string>
#include <iostream>
using namespace std;
class MyObserver:public ACE_Future_Observer<string>
{
virtual void update (const ACE_Future<string> &future)
{
string value;
future.get(value);
cout<<endl<<"change:\t"<<value<<endl;
}
};
int main(int argc, char *argv[])
{
MyObserver obv;
ACE_Future<string> fu;
fu.attach(&obv);
ACE_OS::sleep(3);
fu.set("12345");
while(true)
ACE_OS::sleep(3);
return 0;
}
#pragma once
#include <iostream>
#include "ace/Task_T.h"
#include "ace/OS.h"
#include "ace/Method_Request.h"
#include "ace/Activation_Queue.h"
#include "ace/Future.h"
using namespace std;
// 观察者 Future Observer
class MyObserver : public ACE_Future_Observer<string>
{
public:
void update(const ACE_Future<string>& future);
};
// 主动对象 Logger
class Logger : public ACE_Task<ACE_MT_SYNCH>
{
public:
Logger();
virtual int svc(); // 该函数在另一个线程中执行
string LogMsg(const string& msg);
void LogMsgActive(const string& msg, ACE_Future<string>* result);
private:
ACE_Activation_Queue cmdQueue;
};
// ACE_Method_Request 是ACE提供的命令模式接口,
// 命令接口调用函数为int call(),
// 通过它可以把每一个操作日志的调用命令封装成一个LogMsgCmd对象
class LogMsgCmd : public ACE_Method_Request
{
public:
LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result);
int call(); // 命令模式接口函数
private:
ACE_Future<string>* result;
Logger* log;
string msg;
};
#include "ACEActiveObjectLogger.h"
void MyObserver::update(const ACE_Future<string>& future)
{
string value;
future.get(value);
cout << "Observer:\t--->value:\t" << value << endl;
}
Logger::Logger()
{
this->activate(); // 激活一个新的线程
}
int Logger::svc() // 新线程的启动后执行的任务
{
while (true)
{
// 遍历队列,执行命令
auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());
if (mq->call() == -1)
break;
}
return 0;
}
string Logger::LogMsg(const string& msg)
{
ACE_OS::sleep(2); // 模拟任务处理时间
cout << endl << "Handle message:\t" << msg << endl;
return msg;
}
// 以主动的方式记录日志
void Logger::LogMsgActive(const string& msg, ACE_Future<string>* result)
{
// 生成日志的命令对象,插入到命令队列中
cmdQueue.enqueue(new LogMsgCmd(this, msg, result));
}
LogMsgCmd::LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result)
: log(plog)
, msg(msg)
, result(result)
{
}
int LogMsgCmd::call()
{
string reply = this->log->LogMsg(msg);
result->set(reply); // 处理完毕的返回值放入 ACE_Future(观察者的观察对象)
return 0;
}
#pragma comment(lib, "ACED.lib")
#include "ACEActiveObjectLogger.h"
int main(int, char*[])
{
MyObserver observer, observer2;
ACE_Future<string> result, result2;
// 注册ACE_Future到观察者
result.attach(&observer);
result2.attach(&observer2);
Logger log;
log.LogMsgActive("Hello", &result);
log.LogMsgActive("ABC", &result2);
while (true)
ACE_OS::sleep(10);
return 0;
}