[关闭]
@Dubyoo 2016-02-26T14:54:38.000000Z 字数 6038 阅读 3015

Active Object(基于ACE的主动对象模式)

ACE


1. 基础版

include

  1. #include <iostream>
  2. #include <string>
  3. #include "ace/ACE.h"
  4. #include "ace/Task_T.h"
  5. #include "ace/OS.h"
  6. #include "ace/Method_Request.h"
  7. #include "ace/Activation_Queue.h"
  8. using namespace std;

class Logger

  1. class Logger : public ACE_Task<ACE_MT_SYNCH>
  2. {
  3. public:
  4. Logger()
  5. {
  6. this->activate(); // 激活线程
  7. }
  8. int svc() // 该函数在另一个线程中执行
  9. {
  10. while (true)
  11. {
  12. // 遍历队列,执行命令
  13. auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());
  14. if (mq->call() == -1)
  15. break;
  16. }
  17. return 0;
  18. }
  19. void LogMsg(const string& msg)
  20. {
  21. cout << endl << msg << endl;
  22. ACE_OS::sleep(2);
  23. }
  24. void LogMsgActive(const string& msg);
  25. private:
  26. ACE_Activation_Queue cmdQueue;
  27. };

class LogMsgCmd

  1. class LogMsgCmd : public ACE_Method_Request
  2. {
  3. public:
  4. LogMsgCmd(Logger* plog, const string& msg)
  5. {
  6. this->log = plog;
  7. this->msg = msg;
  8. }
  9. int call()
  10. {
  11. this->log->LogMsg(msg);
  12. return 0;
  13. }
  14. private:
  15. Logger* log;
  16. string msg;
  17. };

Logger::LogMsgActive

  1. // 以主动的方式记录日志
  2. void Logger::LogMsgActive(const string& msg)
  3. {
  4. // 生成命令对象,插入到命令队列中
  5. cmdQueue.enqueue(new LogMsgCmd(this, msg));
  6. }

main

  1. int main()
  2. {
  3. Logger log;
  4. log.LogMsgActive("Hello");
  5. ACE_OS::sleep(1);
  6. log.LogMsgActive("ABC");
  7. while (true)
  8. ACE_OS::sleep(10);
  9. return 0;
  10. }

问题:

主动对象的基本结构就是这样,然而,由于主动对象是异步调用的,又引出了如下两个新问题:


2. ACE_Future对象的用法

ACE_Future是表示一个会在将来被赋值的"期货"对象,可以通过ready()函数查询它是否已经被赋值。该对象创建的时候是未赋值的,后期可以通过set()函数来进行赋值,所赋的值可以通过get()函数来获取。
  1. #include <iostream>
  2. #include "ace/Future.h"
  3. using namespace std;
  4. void get_infor(ACE_Future<string> &fu)
  5. {
  6. string state = fu.ready() ? "ready" : "not ready";
  7. cout << endl << state << endl;
  8. if (fu.ready())
  9. {
  10. string value;
  11. fu.get(value);
  12. cout << "value\t" << value << endl;
  13. }
  14. }
  15. int main()
  16. {
  17. ACE_Future<string> fu; // 新创建的Future对象,未经过set(),ready()返回false
  18. get_infor(fu);
  19. fu.set("12345"); // set()之后,ready()返回true
  20. get_infor(fu);
  21. get_infor(fu);
  22. fu.cancel(); // 重置Future对象,重置后,ready()返回false
  23. get_infor(fu);
  24. return 0;
  25. }

3. 进阶版(加入返回值的获取)

include

  1. #include <iostream>
  2. #include <string>
  3. #include "ace/ACE.h"
  4. #include "ace/Task_T.h"
  5. #include "ace/OS.h"
  6. #include "ace/Method_Request.h"
  7. #include "ace/Activation_Queue.h"
  8. #include "ace/Future.h"
  9. using namespace std;

class Logger

  1. class Logger : public ACE_Task<ACE_MT_SYNCH>
  2. {
  3. public:
  4. Logger()
  5. {
  6. this->activate(); // 激活线程
  7. }
  8. int svc() // 该函数在另一个线程中执行
  9. {
  10. while (true)
  11. {
  12. // 遍历队列,执行命令
  13. auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());
  14. if (mq->call() == -1)
  15. break;
  16. }
  17. return 0;
  18. }
  19. string LogMsg(const string& msg)
  20. {
  21. ACE_OS::sleep(2);
  22. cout << endl << msg << endl;
  23. return msg;
  24. }
  25. void LogMsgActive(const string& msg, ACE_Future<string>* result);
  26. private:
  27. ACE_Activation_Queue cmdQueue;
  28. };

class LogMsgCmd

  1. class LogMsgCmd : public ACE_Method_Request
  2. {
  3. public:
  4. LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result)
  5. {
  6. this->log = plog;
  7. this->msg = msg;
  8. this->result = result;
  9. }
  10. int call()
  11. {
  12. string reply = this->log->LogMsg(msg);
  13. result->set(reply);
  14. return 0;
  15. }
  16. private:
  17. ACE_Future<string>* result;
  18. Logger* log;
  19. string msg;
  20. };

Logger::LogMsgActive

  1. // 以主动的方式记录日志
  2. void Logger::LogMsgActive(const string& msg, ACE_Future<string>* result)
  3. {
  4. // 生成命令对象,插入到命令队列中
  5. cmdQueue.enqueue(new LogMsgCmd(this, msg, result));
  6. }

get_info

  1. // 获取结果,打印结果,返回bool表示是否获取成功
  2. bool get_info(ACE_Future<string>& fu)
  3. {
  4. bool ready = fu.ready();
  5. string state = ready ? "\t-->ready" : "\t-->not ready";
  6. cout << endl << state << endl;
  7. if (ready)
  8. {
  9. string value;
  10. fu.get(value);
  11. cout << "\t--->value:\t" << value << endl;
  12. }
  13. return ready;
  14. }

main

  1. int main()
  2. {
  3. ACE_Future<string> result;
  4. Logger log;
  5. log.LogMsgActive("Hello", &result);
  6. while (true)
  7. {
  8. if (get_info(result)) // 循环获取结果,直到获取到结果,再继续向下执行
  9. break;
  10. ACE_OS::sleep(1);
  11. }
  12. cout << endl << "cmd end" << endl;
  13. result.cancel(); // 重置Future
  14. log.LogMsgActive("ABC", &result);
  15. while (true)
  16. {
  17. if (get_info(result)) // 再次获取结果
  18. break;
  19. ACE_OS::sleep(1);
  20. }
  21. cout << endl << "cmd end" << endl;
  22. }

问题:

这种查询模式比较简单有效,但存在一个问题:调用线程必须不断轮询ACE_Future对象以获取返回值,这样的效率比较低。可以通过观察者模式解决这个问题:在ACE_Future对象上注册一个观察者,当ACE_Future对象的值发生改变(异步命令执行完成)时主动通知该观察者,从而获取返回值。


4. 观察者 ACE_Future_Observer

ACE中的观察者模式可以通过ACE_Future_Observer来实现,使用方法如下:
  1. #include "ace/Future.h"
  2. #include <string>
  3. #include <iostream>
  4. using namespace std;
  5. class MyObserver:public ACE_Future_Observer<string>
  6. {
  7. virtual void update (const ACE_Future<string> &future)
  8. {
  9. string value;
  10. future.get(value);
  11. cout<<endl<<"change:\t"<<value<<endl;
  12. }
  13. };
  14. int main(int argc, char *argv[])
  15. {
  16. MyObserver obv;
  17. ACE_Future<string> fu;
  18. fu.attach(&obv);
  19. ACE_OS::sleep(3);
  20. fu.set("12345");
  21. while(true)
  22. ACE_OS::sleep(3);
  23. return 0;
  24. }

5. 加入观察者的主动对象

  1. #pragma once
  2. #include <iostream>
  3. #include "ace/Task_T.h"
  4. #include "ace/OS.h"
  5. #include "ace/Method_Request.h"
  6. #include "ace/Activation_Queue.h"
  7. #include "ace/Future.h"
  8. using namespace std;
  9. // 观察者 Future Observer
  10. class MyObserver : public ACE_Future_Observer<string>
  11. {
  12. public:
  13. void update(const ACE_Future<string>& future);
  14. };
  15. // 主动对象 Logger
  16. class Logger : public ACE_Task<ACE_MT_SYNCH>
  17. {
  18. public:
  19. Logger();
  20. virtual int svc(); // 该函数在另一个线程中执行
  21. string LogMsg(const string& msg);
  22. void LogMsgActive(const string& msg, ACE_Future<string>* result);
  23. private:
  24. ACE_Activation_Queue cmdQueue;
  25. };
  26. // ACE_Method_Request 是ACE提供的命令模式接口,
  27. // 命令接口调用函数为int call(),
  28. // 通过它可以把每一个操作日志的调用命令封装成一个LogMsgCmd对象
  29. class LogMsgCmd : public ACE_Method_Request
  30. {
  31. public:
  32. LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result);
  33. int call(); // 命令模式接口函数
  34. private:
  35. ACE_Future<string>* result;
  36. Logger* log;
  37. string msg;
  38. };
  1. #include "ACEActiveObjectLogger.h"
  2. void MyObserver::update(const ACE_Future<string>& future)
  3. {
  4. string value;
  5. future.get(value);
  6. cout << "Observer:\t--->value:\t" << value << endl;
  7. }
  8. Logger::Logger()
  9. {
  10. this->activate(); // 激活一个新的线程
  11. }
  12. int Logger::svc() // 新线程的启动后执行的任务
  13. {
  14. while (true)
  15. {
  16. // 遍历队列,执行命令
  17. auto_ptr<ACE_Method_Request> mq(this->cmdQueue.dequeue());
  18. if (mq->call() == -1)
  19. break;
  20. }
  21. return 0;
  22. }
  23. string Logger::LogMsg(const string& msg)
  24. {
  25. ACE_OS::sleep(2); // 模拟任务处理时间
  26. cout << endl << "Handle message:\t" << msg << endl;
  27. return msg;
  28. }
  29. // 以主动的方式记录日志
  30. void Logger::LogMsgActive(const string& msg, ACE_Future<string>* result)
  31. {
  32. // 生成日志的命令对象,插入到命令队列中
  33. cmdQueue.enqueue(new LogMsgCmd(this, msg, result));
  34. }
  35. LogMsgCmd::LogMsgCmd(Logger* plog, const string& msg, ACE_Future<string>* result)
  36. : log(plog)
  37. , msg(msg)
  38. , result(result)
  39. {
  40. }
  41. int LogMsgCmd::call()
  42. {
  43. string reply = this->log->LogMsg(msg);
  44. result->set(reply); // 处理完毕的返回值放入 ACE_Future(观察者的观察对象)
  45. return 0;
  46. }
  1. #pragma comment(lib, "ACED.lib")
  2. #include "ACEActiveObjectLogger.h"
  3. int main(int, char*[])
  4. {
  5. MyObserver observer, observer2;
  6. ACE_Future<string> result, result2;
  7. // 注册ACE_Future到观察者
  8. result.attach(&observer);
  9. result2.attach(&observer2);
  10. Logger log;
  11. log.LogMsgActive("Hello", &result);
  12. log.LogMsgActive("ABC", &result2);
  13. while (true)
  14. ACE_OS::sleep(10);
  15. return 0;
  16. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注