@Dubyoo
2016-02-29T16:28:38.000000Z
字数 6041
阅读 3498
ACE
要点:
1. 继承 ACE_Event_Handler
2. 重载回调函数 handle_timeout()
3. 设置定时器 reactor()->schedule_timer( , , , )
4. 循环调用Reactor事件处理 ACE_Reactor::instance()->handle_events()
一个计时器的例子
// ACEReactorTimerTest.h
#pragma once
#include <iostream>
#include "ace/Reactor.h"
#include "ace/OS.h"
#include "ace/Log_Msg.h"
using namespace std;
class MyTimerHandler : public ACE_Event_Handler
{
private:
int delay;
int inteval;
int timerid;
public:
MyTimerHandler(int delay, int inteval)
: delay(delay)
, inteval(inteval)
{ }
int open()
{
ACE_Time_Value delaytime(delay);
ACE_Time_Value intevaltime(inteval);
// Reactor中的定时器
timerid = reactor()->schedule_timer(
this, // 第1个参数:指定一个ACE_Event_Handler类指针,到达delaytime时,会调用该类下的handle_timeout()函数
"Hello timeout", // 第2个参数:向handle_timeout(const ACE_Time_Value&, const void*)的传参,类型为const void*
delaytime, // 第3个参数:延时delaytime,到达时间后调用ACE_Event_Handler下的handle_timeout()
intevaltime // 第4个参数:间隔时间,每隔一段时间调用handle_timeout();若为0,则只执行一次
);
return timerid;
}
int close()
{
return reactor()->cancel_timer(timerid);
}
int handle_timeout(const ACE_Time_Value& current_time, const void* arg)
{
time_t epoch = ((timespec_t)current_time).tv_sec;
cout << "timeout arg:\t" << (char*)arg << endl;
ACE_DEBUG((LM_INFO, ACE_TEXT("handle_timeout: %s\n"), ACE_OS::ctime(&epoch)));
return 0;
}
};
// main.cpp
#pragma comment(lib, "ACED.lib")
#include "ACEReactorTimerTest.h"
int main(int, char*[])
{
MyTimerHandler * timer = new MyTimerHandler(1, 3);
timer->reactor(ACE_Reactor::instance());
timer->open();
int i = 0;
while (++i)
{
cout << "Repeat begin\t" << i << endl;
ACE_Reactor::instance()->handle_events();
cout << "Repeat end\t" << i << endl;
}
return 0;
}
要点:
1. 继承 ACE_Event_Handler
2. 重载回调函数 handle_*()
3. 登记到反应器 ACE_Reactor::instance()->register_handler()
4. 循环调用Reactor事件处理 ACE_Reactor::instance()->handle_events()
// MyClient.h
#pragma once
#include <iostream>
#include "ace/Reactor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/OS.h"
using namespace std;
class MyClient : public ACE_Event_Handler
{
public:
ACE_HANDLE get_handle() const;
int handle_input(ACE_HANDLE fd);
int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
ACE_SOCK_STREAM& Peer();
int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
private:
ACE_SOCK_STREAM peer;
};
ACE_HANDLE MyClient::get_handle() const
{
return peer.get_handle();
}
int MyClient::handle_input(ACE_HANDLE fd)
{
int recv = 0;
char buffer[1024] = "";
memset(buffer, 0, 1024);
if ((recv = peer.recv(buffer, 1024)) <= 0)
return -1;
buffer[recv] = '\0';
cout << "recv:\t" << buffer << endl;
// peer.send(buffer, recv + 1);
return 0;
}
int MyClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
cout << "Connection closed." << endl;
return ACE_Event_Handler::handle_close(handle, close_mask);
}
ACE_SOCK_STREAM& MyClient::Peer()
{
return peer;
}
int MyClient::handle_timeout(const ACE_Time_Value& current_time, const void* arg)
{
cout << "send:\t" << (char*)arg << endl;
return peer.send((char*)arg, strlen((char*)arg) + 1);
}
void RunClient()
{
MyClient client;
ACE_SOCK_CONNECTOR connector;
ACE_INET_Addr addr(3000, "127.0.0.1");
ACE_Time_Value timeout(5, 0);
if (connector.connect(client.Peer(), addr, &timeout) != 0)
{
cout << endl << "Connect fail." << endl;
}
cout << "Connected." << endl;
ACE_Reactor::instance()->register_handler(&client, ACE_Event_Handler::READ_MASK);
ACE_Time_Value delaytime(5);
ACE_Time_Value intevaltime(2);
ACE_Reactor::instance()->schedule_timer(&client, "Keep alive...", delaytime, intevaltime);
while (true)
{
ACE_Reactor::instance()->handle_events();
}
}
// main.cpp
#pragma comment(lib, "ACED.lib")
#include "MyClient.h"
int main(int, char*[])
{
RunClient();
return 0;
}
// MyServer.h
#pragma once
#include <iostream>
#include "ace/Reactor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
using namespace std;
class MyServer : public ACE_Event_Handler
{
public:
ACE_SOCK_STREAM& Peer();
int open();
ACE_HANDLE get_handle() const;
virtual int handle_input(ACE_HANDLE fd);
virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
private:
ACE_SOCK_STREAM peer;
};
class MyAcceptor : public ACE_Event_Handler
{
public:
~MyAcceptor();
int open(const ACE_INET_Addr& addr);
ACE_HANDLE get_handle() const;
virtual int handle_input(ACE_HANDLE fd);
private:
ACE_SOCK_Acceptor acceptor;
};
ACE_SOCK_STREAM& MyServer::Peer()
{
return peer;
}
int MyServer::open()
{
return ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
}
ACE_HANDLE MyServer::get_handle() const
{
return peer.get_handle();
}
int MyServer::handle_input(ACE_HANDLE fd)
{
int recv = 0;
char buffer[1024] = "";
memset(buffer, 0, 1024);
ACE_INET_Addr client_addr;
peer.get_remote_addr(client_addr);
if ((recv = peer.recv(buffer, 1024)) > 0)
{
buffer[recv] = '\0';
cout << "recv from client(" << client_addr.get_host_addr() << ":" << client_addr.get_port_number() << "):\t" << buffer << endl;
return 0;
}
return -1;
}
int MyServer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
cout << endl << "Connection closed." << endl;
return ACE_Event_Handler::handle_close(handle, close_mask);
}
MyAcceptor::~MyAcceptor()
{
this->handle_close(ACE_INVALID_HANDLE, 0);
}
int MyAcceptor::open(const ACE_INET_Addr& listen_addr)
{
if (acceptor.open(listen_addr, 1) == -1)
{
cout << endl << "Open port fail." << endl;
return -1;
}
cout << "Open port:\t" << listen_addr.get_host_addr() << ":" << listen_addr.get_port_number() << endl;
return ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_HANDLE MyAcceptor::get_handle() const
{
return acceptor.get_handle();
}
int MyAcceptor::handle_input(ACE_HANDLE fd)
{
MyServer * server = new MyServer();
if (acceptor.accept(server->Peer()) == -1)
{
cout << endl << "Accept client fail." << endl;
return -1;
}
ACE_INET_Addr client_addr;
server->Peer().get_remote_addr(client_addr);
cout << "Accept client:\t" << client_addr.get_host_addr() << ":" << client_addr.get_port_number() << endl;
server->Peer().send("Hello client! You are online.", 30);
ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
if (server->open() == -1)
server->handle_close(ACE_INVALID_HANDLE, 0);
return 0;
}
void RunServer()
{
ACE_INET_Addr server_addr(3000, "127.0.0.1");
MyAcceptor acceptor;
acceptor.open(server_addr);
while (true)
{
ACE_Reactor::instance()->handle_events();
}
}
// main.cpp
#pragma comment(lib, "ACED.lib")
#include "MyServer.h"
int main(int, char*[])
{
RunServer();
return 0;
}