@Chiang
2020-01-06T17:48:09.000000Z
字数 5283
阅读 556
Swoole
常驻进程
swoole 扩展了生命周期
并发1万个请求从MySQL读取海量数据仅需要0.2秒
$s = microtime(true);
Co\run(function() {
for ($c = 100; $c--;) {
go(function () {
$mysql = new Swoole\Coroutine\MySQL;
$mysql->connect([
'host' => '127.0.0.1',
'user' => 'root',
'password' => 'root',
'database' => 'test'
]);
$statement = $mysql->prepare('SELECT * FROM `user`');
for ($n = 100; $n--;) {
$result = $statement->execute();
assert(count($result) > 0);
}
});
}
});
echo 'use ' . (microtime(true) - $s) . ' s';
你可以在一个事件循环上创建多个服务:TCP,HTTP,Websocket和HTTP2,并且能轻松承载上万请求。
function tcp_pack(string $data): string
{
return pack('N', strlen($data)) . $data;
}
function tcp_unpack(string $data): string
{
return substr($data, 4, unpack('N', substr($data, 0, 4))[1]);
}
$tcp_options = [
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0,
'package_body_offset' => 4
];
$server = new Swoole\WebSocket\Server('127.0.0.1', 9501, SWOOLE_BASE);
$server->set(['open_http2_protocol' => true]);
// http && http2
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
$response->end('Hello ' . $request->rawcontent());
});
// websocket
$server->on('message', function (Swoole\WebSocket\Server $server, Swoole\WebSocket\Frame $frame) {
$server->push($frame->fd, 'Hello ' . $frame->data);
});
// tcp
$tcp_server = $server->listen('127.0.0.1', 9502, SWOOLE_TCP);
$tcp_server->set($tcp_options);
$tcp_server->on('receive', function (Swoole\Server $server, int $fd, int $reactor_id, string $data) {
$server->send($fd, tcp_pack('Hello ' . tcp_unpack($data)));
});
$server->start();
不管是DNS查询抑或是发送请求和接收响应,都是协程调度的,不会产生任何阻塞。
go(function () {
// http
$http_client = new Swoole\Coroutine\Http\Client('127.0.0.1', 9501);
assert($http_client->post('/', 'Swoole Http'));
var_dump($http_client->body);
// websocket
$http_client->upgrade('/');
$http_client->push('Swoole Websocket');
var_dump($http_client->recv()->data);
});
go(function () {
// http2
$http2_client = new Swoole\Coroutine\Http2\Client('localhost', 9501);
$http2_client->connect();
$http2_request = new Swoole\Http2\Request;
$http2_request->method = 'POST';
$http2_request->data = 'Swoole Http2';
$http2_client->send($http2_request);
$http2_response = $http2_client->recv();
var_dump($http2_response->data);
});
go(function () use ($tcp_options) {
// tcp
$tcp_client = new Swoole\Coroutine\Client(SWOOLE_TCP);
$tcp_client->set($tcp_options);
$tcp_client->connect('127.0.0.1', 9502);
$tcp_client->send(tcp_pack('Swoole Tcp'));
var_dump(tcp_unpack($tcp_client->recv()));
});
通道(Channel)是协程之间通信交换数据的唯一渠道, 而协程+通道的开发组合即为著名的CSP编程模型。
在Swoole开发中,Channel常用于连接池的实现和协程并发的调度。
在以下示例中,我们并发了一千个redis请求,通常的情况下,这已经超过了Redis最大的连接数,将会抛出连接异常, 但基于Channel实现的连接池可以完美地调度请求,开发者就无需担心连接过载。
class RedisPool
{
/**@var \Swoole\Coroutine\Channel */
protected $pool;
/**
* RedisPool constructor.
* @param int $size max connections
*/
public function __construct(int $size = 100)
{
$this->pool = new \Swoole\Coroutine\Channel($size);
for ($i = 0; $i < $size; $i++) {
$redis = new \Swoole\Coroutine\Redis();
$res = $redis->connect('127.0.0.1', 6379);
if ($res == false) {
throw new \RuntimeException("failed to connect redis server.");
} else {
$this->put($redis);
}
}
}
public function get(): \Swoole\Coroutine\Redis
{
return $this->pool->pop();
}
public function put(\Swoole\Coroutine\Redis $redis)
{
$this->pool->push($redis);
}
public function close(): void
{
$this->pool->close();
$this->pool = null;
}
}
go(function () {
$pool = new RedisPool();
// max concurrency num is more than max connections
// but it's no problem, channel will help you with scheduling
for ($c = 0; $c < 1000; $c++) {
go(function () use ($pool, $c) {
for ($n = 0; $n < 100; $n++) {
$redis = $pool->get();
assert($redis->set("awesome-{$c}-{$n}", 'swoole'));
assert($redis->get("awesome-{$c}-{$n}") === 'swoole');
assert($redis->delete("awesome-{$c}-{$n}"));
$pool->put($redis);
}
});
}
});
Swoole的部分客户端实现了defer机制来进行并发,但你依然可以用协程和通道的组合来灵活地实现它。
go(function () {
// User: I need you to bring me some information back.
// Channel: OK! I will be responsible for scheduling.
$channel = new Swoole\Coroutine\Channel;
go(function () use ($channel) {
// Coroutine A: Ok! I will show you the github addr info
$addr_info = Co::getaddrinfo('github.com');
$channel->push(['A', json_encode($addr_info, JSON_PRETTY_PRINT)]);
});
go(function () use ($channel) {
// Coroutine B: Ok! I will show you what your code look like
$mirror = Co::readFile(__FILE__);
$channel->push(['B', $mirror]);
});
go(function () use ($channel) {
// Coroutine C: Ok! I will show you the date
$channel->push(['C', date(DATE_W3C)]);
});
for ($i = 3; $i--;) {
list($id, $data) = $channel->pop();
echo "From {$id}:\n {$data}\n";
}
// User: Amazing, I got every information at earliest time!
});
$id = Swoole\Timer::tick(100, function () {
echo "⚙ Do something...\n";
});
Swoole\Timer::after(500, function () use ($id) {
Swoole\Timer::clear($id);
echo "⏰ Done\n";
});
Swoole\Timer::after(1000, function () use ($id) {
if (!Swoole\Timer::exists($id)) {
echo "✅ All right!\n";
}
});
go(function () {
$i = 0;
while (true) {
Co::sleep(0.1);
echo "📝 Do something...\n";
if (++$i === 5) {
echo "🛎 Done\n";
break;
}
}
echo "🎉 All right!\n";
});
在最新版本的Swoole中,我们添加了一项新功能,使PHP原生的同步网络库一键化成为协程库。
只需在脚本顶部调用Swoole\Runtime::enableCoroutine()方法并使用php-redis,并发1万个请求从Redis读取数据仅需0.1秒!
Swoole\Runtime::enableCoroutine();
$s = microtime(true);
Co\run(function() {
for ($c = 100; $c--;) {
go(function () {
($redis = new Redis)->connect('127.0.0.1', 6379);
for ($n = 100; $n--;) {
assert($redis->get('awesome') === 'swoole');
}
});
}
});
echo 'use ' . (microtime(true) - $s) . ' s';
调用它之后,Swoole内核将替换ZendVM中的Stream函数指针,如果使用基于php_stream的扩展,则所有套接字操作都可以在运行时动态转换为协程调度的异步IO。