@Dubyoo
2016-02-29T08:28:38.000000Z
字数 6041
阅读 3776
ACE
要点:
1. 继承 ACE_Event_Handler
2. 重写(override)回调函数 handle_timeout()
3. 设置定时器 reactor()->schedule_timer( , , , )
4. 循环调用Reactor事件处理 ACE_Reactor::instance()->handle_events()
一个计时器的例子
// ACEReactorTimerTest.h#pragma once#include <iostream>#include "ace/Reactor.h"#include "ace/OS.h"#include "ace/Log_Msg.h"using namespace std;class MyTimerHandler : public ACE_Event_Handler{private:int delay;int inteval;int timerid;public:MyTimerHandler(int delay, int inteval): delay(delay), inteval(inteval){ }int open(){ACE_Time_Value delaytime(delay);ACE_Time_Value intevaltime(inteval);// Reactor中的定时器timerid = reactor()->schedule_timer(this, // 第1个参数:指定一个ACE_Event_Handler类指针,到达delaytime时,会调用该类下的handle_timeout()函数"Hello timeout", // 第2个参数:向handle_timeout(const ACE_Time_Value&, const void*)的传参,类型为const void*delaytime, // 第3个参数:延时delaytime,到达时间后调用ACE_Event_Handler下的handle_timeout()intevaltime // 第4个参数:间隔时间,每隔一段时间调用handle_timeout();若为0,则只执行一次);return timerid;}int close(){return reactor()->cancel_timer(timerid);}int handle_timeout(const ACE_Time_Value& current_time, const void* arg){time_t epoch = ((timespec_t)current_time).tv_sec;cout << "timeout arg:\t" << (char*)arg << endl;ACE_DEBUG((LM_INFO, ACE_TEXT("handle_timeout: %s\n"), ACE_OS::ctime(&epoch)));return 0;}};
// main.cpp#pragma comment(lib, "ACED.lib")#include "ACEReactorTimerTest.h"int main(int, char*[]){MyTimerHandler * timer = new MyTimerHandler(1, 3);timer->reactor(ACE_Reactor::instance());timer->open();int i = 0;while (++i){cout << "Repeat begin\t" << i << endl;ACE_Reactor::instance()->handle_events();cout << "Repeat end\t" << i << endl;}return 0;}
要点:
1. 继承 ACE_Event_Handler
2. 重写(override)回调函数 handle_*()
3. 登记到反应器 ACE_Reactor::instance()->register_handler()
4. 循环调用Reactor事件处理 ACE_Reactor::instance()->handle_events()
// MyClient.h#pragma once#include <iostream>#include "ace/Reactor.h"#include "ace/SOCK_Stream.h"#include "ace/SOCK_Connector.h"#include "ace/OS.h"using namespace std;class MyClient : public ACE_Event_Handler{public:ACE_HANDLE get_handle() const;int handle_input(ACE_HANDLE fd);int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);ACE_SOCK_STREAM& Peer();int handle_timeout(const ACE_Time_Value& current_time, const void* arg);private:ACE_SOCK_STREAM peer;};ACE_HANDLE MyClient::get_handle() const{return peer.get_handle();}int MyClient::handle_input(ACE_HANDLE fd){int recv = 0;char buffer[1024] = "";memset(buffer, 0, 1024);if ((recv = peer.recv(buffer, 1024)) <= 0)return -1;buffer[recv] = '\0';cout << "recv:\t" << buffer << endl;// peer.send(buffer, recv + 1);return 0;}int MyClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask){cout << "Connection closed." << endl;return ACE_Event_Handler::handle_close(handle, close_mask);}ACE_SOCK_STREAM& MyClient::Peer(){return peer;}int MyClient::handle_timeout(const ACE_Time_Value& current_time, const void* arg){cout << "send:\t" << (char*)arg << endl;return peer.send((char*)arg, strlen((char*)arg) + 1);}void RunClient(){MyClient client;ACE_SOCK_CONNECTOR connector;ACE_INET_Addr addr(3000, "127.0.0.1");ACE_Time_Value timeout(5, 0);if (connector.connect(client.Peer(), addr, &timeout) != 0){cout << endl << "Connect fail." << endl;}cout << "Connected." << endl;ACE_Reactor::instance()->register_handler(&client, ACE_Event_Handler::READ_MASK);ACE_Time_Value delaytime(5);ACE_Time_Value intevaltime(2);ACE_Reactor::instance()->schedule_timer(&client, "Keep alive...", delaytime, intevaltime);while (true){ACE_Reactor::instance()->handle_events();}}
// main.cpp#pragma comment(lib, "ACED.lib")#include "MyClient.h"int main(int, char*[]){RunClient();return 0;}
// MyServer.h#pragma once#include <iostream>#include "ace/Reactor.h"#include "ace/SOCK_Stream.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"using namespace std;class MyServer : public ACE_Event_Handler{public:ACE_SOCK_STREAM& Peer();int open();ACE_HANDLE get_handle() const;virtual int handle_input(ACE_HANDLE fd);virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);private:ACE_SOCK_STREAM peer;};class MyAcceptor : public ACE_Event_Handler{public:~MyAcceptor();int open(const ACE_INET_Addr& addr);ACE_HANDLE get_handle() const;virtual int handle_input(ACE_HANDLE fd);private:ACE_SOCK_Acceptor acceptor;};ACE_SOCK_STREAM& MyServer::Peer(){return peer;}int MyServer::open(){return ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);}ACE_HANDLE MyServer::get_handle() const{return peer.get_handle();}int MyServer::handle_input(ACE_HANDLE fd){int recv = 0;char buffer[1024] = "";memset(buffer, 0, 1024);ACE_INET_Addr client_addr;peer.get_remote_addr(client_addr);if ((recv = peer.recv(buffer, 1024)) > 0){buffer[recv] = '\0';cout << "recv from client(" << client_addr.get_host_addr() << ":" << client_addr.get_port_number() << "):\t" << buffer << endl;return 0;}return -1;}int MyServer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask){cout << endl << "Connection closed." << endl;return ACE_Event_Handler::handle_close(handle, close_mask);}MyAcceptor::~MyAcceptor(){this->handle_close(ACE_INVALID_HANDLE, 0);}int MyAcceptor::open(const ACE_INET_Addr& listen_addr){if (acceptor.open(listen_addr, 1) == -1){cout << endl << "Open port fail." 
<< endl;return -1;}cout << "Open port:\t" << listen_addr.get_host_addr() << ":" << listen_addr.get_port_number() << endl;return ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);}ACE_HANDLE MyAcceptor::get_handle() const{return acceptor.get_handle();}int MyAcceptor::handle_input(ACE_HANDLE fd){MyServer * server = new MyServer();if (acceptor.accept(server->Peer()) == -1){cout << endl << "Accept client fail." << endl;return -1;}ACE_INET_Addr client_addr;server->Peer().get_remote_addr(client_addr);cout << "Accept client:\t" << client_addr.get_host_addr() << ":" << client_addr.get_port_number() << endl;server->Peer().send("Hello client! You are online.", 30);ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);if (server->open() == -1)server->handle_close(ACE_INVALID_HANDLE, 0);return 0;}void RunServer(){ACE_INET_Addr server_addr(3000, "127.0.0.1");MyAcceptor acceptor;acceptor.open(server_addr);while (true){ACE_Reactor::instance()->handle_events();}}
// main.cpp#pragma comment(lib, "ACED.lib")#include "MyServer.h"int main(int, char*[]){RunServer();return 0;}