«

如何在php使用swoole实现TCP服务

腾逍技术 发布于 阅读:464 PHP


今天主要为大家详细介绍php如何使用swoole实现TCP服务,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以参考一下
 

这里以在Yii框架下示例

一:swoole配置TCP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
'swoole' => [
    // 日志文件路径
    'log_file' => '@console/log/swoole.log',
    // 设置swoole_server错误日志打印的等级,范围是0-5。低于log_level设置的日志信息不会抛出
    'log_level' => 1,
    // 进程的PID存储文件
    'pid_file' => '@console/log/swoole.server.pid',
 
    // HTTP协议配置
    'http' => [
        'host' => '0.0.0.0',
        'port' => '8889',
 
        // 异步任务的工作进程数量
        'task_worker_num' => 4,
    ],
    // TCP协议配置
    'tcp' => [
        'host' => '0.0.0.0',
        'port' => '14000',
 
        // 异步任务的工作进程数量
        'task_worker_num' => 4,
 
        // 启用TCP-Keepalive死连接检测
        'open_tcp_keepalive' => 1,
        // 单位秒,连接在n秒内没有数据请求,将开始对此连接进行探测
        'tcp_keepidle' => 5 * 60,
        // 探测的次数,超过次数后将close此连接
        'tcp_keepcount' => 3,
        // 探测的间隔时间,单位秒
        'tcp_keepinterval' => 60,
 
        // 心跳检测,此选项表示每隔多久轮循一次,单位为秒
        'heartbeat_check_interval' => 2 * 60,
        // 心跳检测,连接最大允许空闲的时间
        'heartbeat_idle_time' => 5 * 60,
    ]
],

二:swoole实现TCP服务基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
<?php
/**
 * @link 
 * @copyright 
 * @license 
 */
 
namespace console\swoole;
 
use Yii;
use yii\helpers\Console;
use yii\helpers\ArrayHelper;
 
/*
 * Swoole Server基类
 *
 * @author wangjian
 * @since 0.1
 */
abstract class BaseServer
{
    /**
     * @var Swoole\Server
     */
    public $swoole;
    /**
     * @var boolean DEBUG
     */
    public $debug = false;
 
    /**
     * __construct
     */
    public function __construct($httpConfig, $tcpConfig, $config = [])
    {
        $httpHost = ArrayHelper::remove($httpConfig, 'host');
        $httpPort = ArrayHelper::remove($httpConfig, 'port');
        $this->swoole = new \swoole_http_server($httpHost, $httpPort);
        $this->swoole->set(ArrayHelper::merge($config, $httpConfig));
 
        $this->swoole->on('start', [$this, 'onStart']);
        $this->swoole->on('request', [$this, 'onRequest']);
        $this->swoole->on('WorkerStart', [$this, 'onWorkerStart']);
        $this->swoole->on('WorkerStop', [$this, 'onWorkerStop']);
        $this->swoole->on('task', [$this, 'onTask']);
        $this->swoole->on('finish', [$this, 'onTaskFinish']);
 
        $this->swoole->on('shutdown', [$this, 'onShutdown']);
 
        $tcpHost = ArrayHelper::remove($tcpConfig, 'host');
        $tcpPort = ArrayHelper::remove($tcpConfig, 'port');
        $tcpServer = $this->swoole->listen($tcpHost, $tcpPort, SWOOLE_SOCK_TCP);
        $tcpServer->set($tcpConfig);
        $tcpServer->on('connect', [$this, 'onConnect']);
        $tcpServer->on('receive', [$this, 'onReceive']);
        $tcpServer->on('close', [$this, 'onClose']);
    }
 
    /*
     * 启动server
     */
    public function run()
    {
        $this->swoole->start();
    }
 
    /**
     * Server启动在主进程的主线程时的回调事件处理
     *
     * @param swoole_server $server
     */
    public function onStart(\swoole_server $server)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Server Start**\n", Console::FG_GREEN);
        $this->stdout("master_pid: ");
        $this->stdout("{$server->master_pid}\n", Console::FG_BLUE);
 
        $this->onStartHandle($server);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * 客户端与服务器建立连接后的回调事件处理
     *
     * @param swoole_server $server
     * @param integer $fd
     * @param integer $reactorId
     */
    abstract public function onConnect(\swoole_server $server, int $fd, int $reactorId);
 
    /**
     * 当服务器收到来自客户端的数据时的回调事件处理
     *
     * @param swoole_server $server
     * @param integer $fd
     * @param integer $reactorId
     * @param string $data
     */
    abstract public function onReceive(\swoole_server $server, int $fd, int $reactorId, string $data);
 
    /**
     * 当服务器收到来自客户端的HTTP请求时的回调事件处理
     *
     * @param swoole_http_request $request
     * @param swoole_http_response $response
     */
    abstract public function onRequest(\swoole_http_request $request, \swoole_http_response $response);
 
    /**
     * Worker进程/Task进程启动时发生
     *
     * @param swoole_server $server
     * @param integer $worker_id
     */
    abstract public function onWorkerStart(\swoole_server $server, int $worker_id);
 
    /**
     * Worker进程/Task进程终止时发生
     *
     * @param swoole_server $server
     * @param integer $worker_id
     */
    abstract public function onWorkerStop(\swoole_server $server, int $worker_id);
 
    /**
     * 异步任务处理
     *
     * @param swoole_server $server
     * @param integer $taskId
     * @param integer $srcWorkerId
     * @param mixed $data
     */
    abstract public function onTask(\swoole_server $server, int $taskId, int $srcWorkerId, mixed $data);
 
    /**
     * 异步任务处理完成
     *
     * @param swoole_server $server
     * @param integer $taskId
     * @param mixed $data
     */
    abstract public function onTaskFinish(\swoole_server $server, int $taskId, mixed $data);
 
    /**
     * 客户端与服务器断开连接后的回调事件处理
     *
     * @param swoole_server $server
     * @param integer $fd
     */
    abstract public function onClose(\swoole_server $server, $fd);
 
    /**
     * Server正常结束时的回调事件处理
     *
     * @param swoole_server $server
     */
    public function onShutdown(\swoole_server $server)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Server Stop**\n", Console::FG_GREEN);
        $this->stdout("master_pid: ");
        $this->stdout("{$server->master_pid}\n", Console::FG_BLUE);
 
        $this->onShutdownHandle($server);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * Server启动在主进程的主线程时的自定义事件处理
     *
     * @param swoole_server $server
     */
    protected function onStartHandle(\swoole_server $server)
    {
 
    }
 
    /**
     * Server正常结束时的自定义事件处理
     *
     * @param swoole_server $server
     */
    protected function onShutdownHandle(\swoole_server $server)
    {
 
    }
 
    /**
     * 获取请求路由
     *
     * @param swoole_http_request $request
     */
    protected function getRoute(\swoole_http_request $request)
    {
        return ltrim($request->server['request_uri'], '/');
    }
 
    /**
     * 获取请求的GET参数
     *
     * @param swoole_http_request $request
     */
    protected function getParams(\swoole_http_request $request)
    {
        return $request->get;
    }
 
    /**
     * 解析收到的数据
     *
     * @param string $data
     */
    protected function decodeData($data)
    {
        return json_decode($data, true);
    }
 
    /**
     * Before Exec
     */
    protected function beforeExec()
    {
        $startedAt = microtime(true);
        $this->stdout(date('Y-m-d H:i:s') . "\n", Console::FG_YELLOW);
        return $startedAt;
    }
 
    /**
     * After Exec
     */
    protected function afterExec($startedAt)
    {
        $duration = number_format(round(microtime(true) - $startedAt, 3), 3);
        $this->stdout("{$duration} s\n\n", Console::FG_YELLOW);
    }
 
    /**
     * Prints a string to STDOUT.
     */
    protected function stdout($string)
    {
        if (Console::streamSupportsAnsiColors(\STDOUT)) {
            $args = func_get_args();
            array_shift($args);
            $string = Console::ansiFormat($string, $args);
        }
        return Console::stdout($string);
    }
}

三:swoole操作类(继承swoole基类)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
<?php
/**
 * @link 
 * @copyright 
 * @license 
 */
 
namespace console\swoole;
 
use Yii;
use yii\db\Query;
use yii\helpers\Console;
use yii\helpers\VarDumper;
use apps\sqjc\models\WaterLevel;
use apps\sqjc\models\WaterLevelLog;
use common\models\Bayonet;
use common\models\Device;
use common\models\DeviceCategory;
 
/**
 * Swoole Server测试类
 *
 * @author wangjian
 * @since 1.0
 */
class Server extends BaseServer
{
    /**
     * @inheritdoc
     */
    public function onConnect($server, $fd, $reactorId)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Connection Open**\n", Console::FG_GREEN);
        $this->stdout("fd: ");
        $this->stdout("{$fd}\n", Console::FG_BLUE);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * @inheritdoc
     */
    public function onReceive($server, $fd, $reactorId, $data)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Received Message**\n", Console::FG_GREEN);
 
        $this->stdout("fd: ");
        $this->stdout("{$fd}\n", Console::FG_BLUE);
 
        $this->stdout("data: ");//接收的数据
        $this->stdout("{$data}\n", Console::FG_BLUE);
 
 
 
        $result = $server->send($fd, '回复消息');
 
 
 
        $this->afterExec($startedAt);
    }
 
 
    /**
     * @inheritdoc
     */
    public function onRequest($request, $response)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**HTTP Request**\n", Console::FG_GREEN);
 
        $this->stdout("fd: ");
        $this->stdout("{$request->fd}\n", Console::FG_BLUE);
 
        $response->status(200);
        $response->end('success');
 
        $this->afterExec($startedAt);
    }
 
    /**
     * @inheritdoc
     */
    public function onClose($server, $fd)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Connection Close**\n", Console::FG_GREEN);
 
        $this->stdout("fd: ");
        $this->stdout("{$fd}\n", Console::FG_BLUE);
 
        $this->afterExec($startedAt);
    }
 
 
 
    /**
     * @inheritdoc
     */
    public function onTask($server, $taskId, $srcWorkerId, $data)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("New AsyncTask: ");
        $this->stdout("{$taskId}\n", Console::FG_BLUE);
        $this->stdout("{$data}\n", Console::FG_BLUE);
 
        $server->finish($data);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * @inheritdoc
     */
    public function onWorkerStop($server, $worker_id)
    {
        // Yii::$app->db->close();
    }
    /**
     * @inheritdoc
     */
    public function onWorkerStart($server, $worker_id)
    {
        // Yii::$app->db->open();
    }
 
 
    /**
     * @inheritdoc
     */
    public function onTaskFinish($server, $taskId, $data)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("AsyncTask finished: ");
        $this->stdout("{$taskId}\n", Console::FG_BLUE);
 
        $this->afterExec($startedAt);
    }
}

四:操作TCP服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
<?php
/**
 * @link 
 * @copyright 
 * @license 
 */
 
namespace console\controllers;
 
use Yii;
use yii\helpers\Console;
use yii\helpers\FileHelper;
use yii\helpers\ArrayHelper;
use console\swoole\Server;
 
/**
 * WebSocket Server controller.
 *
 * @see https://github.com/tystudy/yii2-swoole-websocket/blob/master/README.md
 *
 * @author wangjian
 * @since 1.0
 */
class SwooleController extends Controller
{
    /**
     * @var string 监听IP
     */
    public $host;
    /**
     * @var string 监听端口
     */
    public $port;
    /**
     * @var boolean 是否以守护进程方式启动
     */
    public $daemon = false;
    /**
     * @var boolean 是否启动测试类
     */
    public $test = false;
 
    /**
     * @var array Swoole参数配置项
     */
    private $_params;
 
    /**
     * @var array Swoole参数配置项(HTTP协议)
     */
    private $_http_params;
    /**
     * @var array Swoole参数配置项(TCP协议)
     */
    private $_tcp_params;
 
    /**
     * @inheritdoc
     */
    public function beforeAction($action)
    {
        if (parent::beforeAction($action)) {
            //判断是否开启swoole拓展
            if (!extension_loaded('swoole')) {
                return false;
            }
 
            //获取swoole配置信息
            if (!isset(Yii::$app->params['swoole'])) {
                return false;
            }
            $this->_params = Yii::$app->params['swoole'];
            $this->_http_params = ArrayHelper::remove($this->_params, 'http');
            $this->_tcp_params = ArrayHelper::remove($this->_params, 'tcp');
 
            foreach ($this->_params as &$param) {
                if (strncmp($param, '@', 1) === 0) {
                    $param = Yii::getAlias($param);
                }
            }
 
            $this->_params = ArrayHelper::merge($this->_params, [
                'daemonize' => $this->daemon
            ]);
 
            return true;
        } else {
            return false;
        }
    }
    /**
     * 启动服务
     */
    public function actionStart()
    {
        if ($this->getPid() !== false) {
            $this->stdout("WebSocket Server is already started!\n", Console::FG_RED);
            return self::EXIT_CODE_NORMAL;
        }
        $server = new Server($this->_http_params, $this->_tcp_params, $this->_params);
        $server->run();
    }
 
    /**
     * 停止服务
     */
    public function actionStop()
    {
        $pid = $this->getPid();
        if ($pid === false) {
            $this->stdout("Tcp Server is already stoped!\n", Console::FG_RED);
            return self::EXIT_CODE_NORMAL;
        }
 
        \swoole_process::kill($pid);
    }
 
    /**
     * 清理日志文件
     */
    public function actionClearLog()
    {
        $logFile = Yii::getAlias($this->_params['log_file']);
        FileHelper::unlink($logFile);
    }
 
    /**
     * 获取进程PID
     *
     * @return false|integer PID
     */
    private function getPid()
    {
        $pidFile = $this->_params['pid_file'];
        if (!file_exists($pidFile)) {
            return false;
        }
 
        $pid = file_get_contents($pidFile);
        if (empty($pid)) {
            return false;
        }
 
        $pid = intval($pid);
        if (\swoole_process::kill($pid, 0)) {
            return $pid;
        } else {
            FileHelper::unlink($pidFile);
            return false;
        }
    }
 
    /**
     * @inheritdoc
     */
    public function options($actionID)
    {
        return ArrayHelper::merge(parent::options($actionID), [
            'daemon',
            'test'
        ]);
    }
 
    /**
     * @inheritdoc
     */
    public function optionAliases()
    {
        return ArrayHelper::merge(parent::optionAliases(), [
            'd' => 'daemon',
            't' => 'test',
        ]);
    }
}

以上就是php使用swoole实现TCP服务的详细内容!