成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

swoole實現(xiàn)實時推送的方法

這篇文章主要介紹swoole實現(xiàn)實時推送的方法,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

創(chuàng)新互聯(lián)建站服務項目包括豐順網(wǎng)站建設、豐順網(wǎng)站制作、豐順網(wǎng)頁制作以及豐順網(wǎng)絡營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,豐順網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務的客戶以成都為中心已經(jīng)輻射到豐順省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!

swoole+redis實現(xiàn)實時數(shù)據(jù)推送         

<?php
/**
 * ***************************************
 *            單進程保護                 *
 * ***************************************
 */
$phpSelf 			= realpath($_SERVER['PHP_SELF']);
$lockFile			= $phpSelf.'.lock';
$lockFileHandle 	= fopen($lockFile, "w");
if ($lockFileHandle == false) {
	exit("Can not create lock file $lockFile\n");
}
if (!flock($lockFileHandle, LOCK_EX + LOCK_NB)) {
	exit(date("Y-m-d H:i:s")."Process already exist.\n");
}
 
/**
 * ***************************************
 *     進入程序,定義相關(guān)配置            *
 * ***************************************
 */
set_time_limit(0);
//socket會話的超時時間,根據(jù)業(yè)務場景設置,這里設置為永不超時
//如果設置了時間,則從socket建立=>傳輸=>關(guān)閉整個過程必須在定義的時間內(nèi)完成,否則自動close該socket并拋出warning
ini_set('default_socket_timeout', -1);
$conf = array(
	'listen'  => array('host' => '0.0.0.0','port' => '8008'),
	'setting' => array(
		//程序允許的最大連接數(shù),用以設置server最大允許維持多少個TCP連接,超過該數(shù)量后,新連接將被拒絕,默認為ulimit -n的值,如果設置大于ulimit -n則強制重置為ulimit- n,如果確實需要設置超過ulimit -n的值,請修改系統(tǒng)值 vim /etc/security/limits.conf 修改nofile的值
		"max_conn"			=> 1024,
		//啟用CPU親和設置(在全異步非阻塞是可啟用),在多核的服務器中,啟用此特性會將swoole的reactor線程/worker進程綁定到固定的一個核上??梢员苊膺M程/線程的運行時在多個核之間互相切換,提高CPU Cache的命中率,如何確定綁定在了哪個核上,請參考文檔, 查看命令: taskset -p 進程id
		'open_cpu_affinity'	=> 0,
		//配置task進程數(shù)量,配置此參數(shù)后將會啟用task功能。所以Server務必要注冊onTask、onFinish3個事件回調(diào)函數(shù)。如果沒有注冊,服務器程序?qū)o法啟動.Task進程是同步阻塞的,配置方式與Worker同步模式一致。
		'task_worker_num'	=> 20,
		//設置task進程的最大任務數(shù)。一個task進程在處理完超過此數(shù)值的任務后將自動退出。這個參數(shù)是為了防止PHP進程內(nèi)存溢出。如果不希望進程自動退出可以設置為0, 默認是0
		'task_max_request'	=> 1024, 
		//設置task的數(shù)據(jù)臨時目錄,在swoole_server中,如果投遞的數(shù)據(jù)超過8192字節(jié),將啟用臨時文件來保存數(shù)據(jù)。這里的task_tmpdir就是用來設置臨時文件保存的位置。
		'task_tmpdir'		=> '/tmp/',
		//worker進程數(shù)量,根據(jù)業(yè)務代碼的模式作調(diào)整,全異步非阻塞可設置為CPU核數(shù)的1-4倍;同步阻塞,請參考文檔調(diào)整
		'worker_num'		=> 8,
		//指定swoole錯誤日志文件
		'log_file' 			=> '/tmp/log/log.txt',
		//SSL公鑰和私鑰的位置,啟用wss必須在編譯swoole時加入--enable-openssl選項
		'ssl_cert_file'		=> '/usr/local/nginx/conf/server.cer',
		'ssl_key_file'		=> '/usr/local/nginx/conf/server.key',
	),
);
 
/**
 * ***************************************
 *       初始化Redis連接                 *
 * ***************************************
 */
$redis = null;
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$GLOBALS['redis']=$redis;
 
/**
 * ***************************************
 *        腳本重啟時,清除歷史的數(shù)據(jù)     *
 * ***************************************
 */
$sArr = $redis->sMembers(REDIS_S_KEY);
if (!empty($sArr)) {
	foreach ((array)$sArr as $key => $sc) {
		$fdArr = $redis->sMembers(REDIS_S_FD.$sc);
		foreach ((array)$fdArr as $k => $fd) {
			$res1 = $redis->del(REDIS_FD_S.$fd);
		}
		$res2 = $redis->del(REDIS_S_FD.$sc);
	}
	$redis->del(REDIS_S_KEY);
}
$redis->del(REDIS_ZS_KEY);
 
/**
 * ***************************************
 *           綁定回調(diào)事件                *
 * ***************************************
 */
$ws = null;
//wss服務
$ws = new swoole_websocket_server($conf['listen']['host'], $conf['listen']['port'], SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$ws->set($conf['setting']);
 
/**
 * Server啟動在主進程的主線程回調(diào)此函數(shù)
 * 在此事件之前Swoole Server已進行了如下操作
 * 已創(chuàng)建了manager進程
 * 已創(chuàng)建了worker子進程
 * 已監(jiān)聽所有TCP/UDP端口
 * 已監(jiān)聽了定時器
 * 在onStart中創(chuàng)建的全局資源對象不能在worker進程中被使用,因為發(fā)生onStart調(diào)用時,worker進程已經(jīng)創(chuàng)建好了。新創(chuàng)建的對象在主進程內(nèi),worker進程無法訪問到此內(nèi)存區(qū)域。因此全局對象創(chuàng)建的代碼需要放置在swoole_server_start之前
 */
$ws->on('start', function ($ws) {
	swoole_set_process_name(PROCESS_NAME.'_master');
});
 
/**
 * 與onStart回調(diào)在不同進程中并行執(zhí)行的回調(diào)函數(shù)(不存在先后順序)
 * @param: $ws swoole_websocket_server object
 * @param: $wid 創(chuàng)建該進程時swoole分配的id(不是進程id)
 * 注意點:
 * 1. 此事件在worker進程/task進程啟動時發(fā)生。onWorkerStart/onStart是并發(fā)執(zhí)行的,沒有先后順序,這里創(chuàng)建的對象可以在進程生命周期內(nèi)使用
 * 2. swoole1.6.11之后task_worker中也會觸發(fā)onWorkerStart,故而在下面的處理中,加入了判斷業(yè)務類型$jobType是task還是work,如果是task則命名為****_Tasker_$id,如果是worker則命名為****_Worker_$id
 * 3. 發(fā)生PHP致命錯誤或者代碼中主動調(diào)用exit時,Worker/Task進程會退出,管理進程會重新創(chuàng)建新的進程
 * 5. 如果想使用swoole_server_reload實現(xiàn)代碼重載入,必須在workerStart中require你的業(yè)務文件,而不是在文件頭部。在onWorkerStart調(diào)用之前已包含的文件,不會重新載入代碼。
 * 6. 可以將公用的,不易變的php文件放置到onWorkerStart之前(例如上面的redis配置)。這樣雖然不能重載入代碼,但所有worker是共享的,不需要額外的內(nèi)存來保存這些數(shù)據(jù)。
 * 7. onWorkerStart之后的代碼每個worker都需要在內(nèi)存中保存一份
 */
$ws->on('workerstart', function ($ws, $wid) {
	$jobType = $ws->taskworker ? 'Tasker' : 'Worker';
	swoole_set_process_name(PROCESS_NAME.'_'.$jobType.'_'.$wid);
	$GLOBALS['ws'] = $ws; //保存server對象到全局中以待使用
	if ($jobType == 'Worker') { //在某個worker進程上綁定redis訂閱進程
		if ($wid === 0) {
            $dataRedis = null;
            $dataRedis = new Redis();
            $dataRedis->connect(REDIS_HOST_DATA, REDIS_PORT_DATA);
            $dataRedis->auth(REDIS_PWD_DATA);
            //使用psubscribe訂閱指定模式的頻道,這里*表示所有頻道
            //請注意,redis訂閱不提供區(qū)分庫(db)的功能,所以多個庫都同時在發(fā)布同一個名字的頻道時,都將被訂閱到
			$dataRedis->psubscribe(array("*"), "sendTask");
		}
	}
});
 
/**
 * 管理進程啟用時,調(diào)用該回調(diào)函數(shù)
 * 注意manager進程中不能添加定時器
 * manager進程中可以調(diào)用sendMessage接口向其他工作進程發(fā)送消息
 */
$ws->on('managerstart', function ($ws) {
	swoole_set_process_name(PROCESS_NAME.'_manage');
});
 
/**
 * swoole websocket服務特有的回調(diào)函數(shù),此函數(shù)在websocket服務器中必須定義實現(xiàn),否則websocket服務將無法啟動
 * 當服務器收到來自客戶端的數(shù)據(jù)幀時會回調(diào)此函數(shù)
 * @param: $ws為swoole_websocket_server對象,其結(jié)構(gòu)在調(diào)試時可var_dump查看
 * @param: $frame為swoole_websocket_frame對象,包含了客戶端發(fā)來的數(shù)據(jù)幀信息,包含以下四個屬性:
 * @param: $frame->fd: 客戶端的socket id,每個id對應一個客戶端,推送消息的時候需要指定
 * @param: $frame->data: 數(shù)據(jù)內(nèi)容,可以是文本內(nèi)容或者是二進制數(shù)據(jù)(圖片等),可以通過opcode的值來判斷。$data 如果是文本類型,編碼格式必然是UTF-8,這是WebSocket協(xié)議規(guī)定的
 * @param: $frame->opcode: WebSocket的OpCode類型,可以參考WebSocket協(xié)議標準文檔, WEBSOCKET_OPCODE_TEXT = 0x1 ,文本數(shù)據(jù); WEBSOCKET_OPCODE_BINARY = 0x2 ,二進制數(shù)據(jù)
 * @param: $frame->finish: 表示數(shù)據(jù)幀是否完整,一個WebSocket請求可能會分成多個數(shù)據(jù)幀進行發(fā)送
 * 注意點: 客戶端發(fā)送的ping幀不會觸發(fā)onMessage,底層會自動回復pong包
 */
$ws->on('message', function ($ws, $frame) {
    echo "Server has receive message\n";
    //接收到客戶端請求,并建立連接之后,進行相應業(yè)務的處理
    handleClientData($ws, $frame);
});
 
/**
 * 在task_worker進程內(nèi)被調(diào)用。worker進程可以使用swoole_server_task函數(shù)向task_worker進程投遞新的任務(此處使用的是taskwait)
 * 當前的Task進程在調(diào)用onTask回調(diào)函數(shù)時會將進程狀態(tài)切換為忙碌,這時將不再接收新的Task,當onTask函數(shù)返回時會將進程狀態(tài)切換為空閑然后繼續(xù)接收新的Task。
 * @param: $ws swoole_websocket_server object
 * @param: $tid task process id
 * @param: $wid from id 表示來自哪個Worker進程。$task_id和$wid組合起來才是全局唯一的,不同的worker進程投遞的任務ID可能會有相同
 * @param: $data 需要執(zhí)行的任務內(nèi)容
 * 注意點: onTask函數(shù)執(zhí)行時遇到致命錯誤退出,或者被外部進程強制kill,當前的任務會被丟棄,但不會影響其他正在排隊的Task
 */
$ws->on('task', function ($ws, $tid, $wid, $data) {
	switch ($data['cmd']) {
		case 'pushToClient': $ret = pushToClientTask($ws, $data['key'], $data['val']); break;
	}
	//1.7.2以上的版本,在onTask函數(shù)中 return字符串,表示將此內(nèi)容返回給worker進程。worker進程中會觸發(fā)onFinish函數(shù),表示投遞的task已完成。return的變量可以是任意非null的PHP變量
	return $returnContent;
	//1.7.2以前的版本,需要調(diào)用swoole_server->finish()函數(shù)將結(jié)果返回給worker進程
	// $ws->finish($data);
});
 
/**
 * 當worker進程投遞的任務在task_worker中完成時,task進程會通過$ws->finish()方法將任務處理的結(jié)果發(fā)送給worker進程。
 * @param: $ws swoole_websocket_server object
 * @param: $tid task_id
 * @param: $data 任務處理后的結(jié)果內(nèi)容
 * 注意點: task進程的onTask事件中沒有調(diào)用finish方法或者return結(jié)果,worker進程不會觸發(fā)onFinish
 *        執(zhí)行onFinish邏輯的worker進程與下發(fā)task任務的worker進程是同一個進程
 */
$ws->on('finish', function($ws, $tid, $data) {
 
});
 
/**
 * TCP客戶端連接關(guān)閉后,在worker進程中回調(diào)此函數(shù)
 * 在函數(shù)中可以做一些類似于刪除業(yè)務中與每個客戶端交互時存放的數(shù)據(jù)的操作
 * @param: $ws swoole_websocket_server object
 * @param: $fd 已關(guān)閉的fd interger
 * @param: $rid(可選),來自哪個reactor線程
 * 注意點: 
 * 1. onClose回調(diào)函數(shù)如果發(fā)生了致命錯誤,會導致連接泄漏。通過netstat命令會看到大量CLOSE_WAIT狀態(tài)的TCP連接
 * 2. 查看命令netstat -anopc | grep 端口號,可以查看到TCP接收和發(fā)送隊列是否有堆積以及TCP連接的狀態(tài)
 * 3. 無論由客戶端發(fā)起close還是服務器端主動調(diào)用$serv->close()關(guān)閉連接,都會觸發(fā)此事件。因此只要連接關(guān)閉,就一定會回調(diào)此函數(shù)
 * 4. 1.7.7+版本以后onClose中依然可以調(diào)用connection_info方法獲取到連接信息,在onClose回調(diào)函數(shù)執(zhí)行完畢后才會調(diào)用close關(guān)閉TCP連接
 * 5. 這里回調(diào)onClose時表示客戶端連接已經(jīng)關(guān)閉,所以無需執(zhí)行$server->close($fd)。代碼中執(zhí)行$serv->close($fd)會拋出PHP錯誤告警。也就是在onclose中不能再$ws->close()了.
 * 6. swoole-1.9.7版本修改了$reactorId參數(shù),當服務器主動關(guān)閉連接時,底層會設置此參數(shù)為-1,可以通過判斷$reactorId < 0來分辨關(guān)閉是由服務器端還是客戶端發(fā)起的(debug時可以使用)
 */
$ws->on('close', function ($ws, $fd) {
	$redis = new Redis();
	$redis->connect(REDIS_HOST, REDIS_PORT);
	$redis->auth(REDIS_PWD);
	$sArr = $redis->sMembers(REDIS_FD_S.$fd);
	if (!empty($sArr)) {
		foreach ((array)$sArr as $key => $sc) {
			$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
			$num = $redis->sCard(REDIS_S_FD.$sc);
			if ($num == '0') {
				$redis->sRem(REDIS_S_KEY, $sc);
				$redis->hDel(REDIS_ZS_KEY, $sc);
			}
		}
	}
	$redis->del(REDIS_FD_S.$fd);
	$redis->close();
	echo "FD $fd has closed.\n";
});
 
/**
 * 開啟swoole_websocket_server服務
 */
$ws->start();
 
 
/**
 * 接受到消息以后進行響應異步任務的執(zhí)行
 * @param: $ws swoole_websocket_sever object
 * @param: $frame swoole_websocket_frame obejct
 */
function handleClientData($ws, $frame) {
	$data = $frame->data;
	$redis = new Redis();
	$redis->connect(REDIS_HOST, REDIS_PORT);
	$redis->auth(REDIS_PWD);
	$isMembers = $redis->sIsmember(REDIS_S_FD.$sc, $frame->fd);
	if (!$isMembers) {
		$res = $redis->sAdd(REDIS_S_FD.$sc, $frame->fd);
	}
	$redis->sAdd(REDIS_FD_S.$frame->fd, $sc);
	$isMembers = $redis->sIsmember(REDIS_S_KEY, $sc);
	if (!$isMembers) { 
		$redis->sAdd(REDIS_S_KEY, $sc);
	}
}
 
 
/**
 * redis訂閱后的回調(diào)函數(shù)
 * @param: $ins instance實例
 * @param: $pattern 匹配模式
 * @param: $channel 頻道名
 * @param: $data 數(shù)據(jù)
 * 注意點: subscribe和psubscribe兩種不同的訂閱方式的回調(diào)函數(shù)的參數(shù)個數(shù)不一樣,后者多了$pattern參數(shù)
 */
function sendTask($ins, $pattern, $channel, $data) {
	//滿足一些條件后,投遞到task進程中進行推送
	$taskData = array(
		'cmd' => 'pushToClient',
		'key' => $sc,
		'val' => $data,
	);
	//請注意,taskwait是同步阻塞的,所以改腳本并不是全異步非阻塞的
	$GLOBALS['ws']->taskwait($taskData);
}
 
/**
 * 推送消息到指定的客戶端
 * @param: $ws swoole_websocket_server object
 * @param: $sc 股票代碼
 * @param: $data 要推送的數(shù)據(jù)
 */
function pushToClientTask($ws, $sc, $data) {
    $redis = new Redis();
    $redis->connect(REDIS_HOST, REDIS_PORT);
    $redis->auth(REDIS_PWD);
	$fdList = $redis->sMembers(REDIS_S_FD.$sArr[4]);
	if (!empty($fdList)) {
		foreach ((array)$fdList as $fd) {
			$res = $GLOBALS['ws']->push($fd, $data);
			echo "FD: $fd push $res.\n";
			if (!$res) { //推送失敗,即客戶端已經(jīng)斷開連接
				//從該fd訂閱的所有股票中刪除該fd
				$sArrOfFd = $redis->sMembers(REDIS_FD_S.$fd);
				if (!empty($sArrOfFd)) {
					foreach ((array)$sArrOfFd as $key => $sc) {
						$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
						$num = $redis->sCard(REDIS_S_FD.$sc);
						if ($num == '0') {
							$redis->sRem(REDIS_S_KEY, $sc);
							$redis->hDel(REDIS_ZS_KEY, $sc);
						}
					}
				}
				$redis->del(REDIS_FD_S.$fd);
			}
		}
	}
    $redis->close();
}

以上是“swoole實現(xiàn)實時推送的方法”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

文章標題:swoole實現(xiàn)實時推送的方法
標題URL:http://jinyejixie.com/article2/gpgoic.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設計全網(wǎng)營銷推廣、軟件開發(fā)、企業(yè)網(wǎng)站制作、商城網(wǎng)站網(wǎng)站導航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司
盐津县| 木兰县| 县级市| 花莲县| 镇赉县| 三穗县| 易门县| 连云港市| 祁门县| 博乐市| 安龙县| 闵行区| 两当县| 诏安县| 龙陵县| 花莲县| 闻喜县| 汝城县| 思南县| 高邑县| 江源县| 民县| 云龙县| 中宁县| 曲沃县| 太仓市| 沂水县| 仙游县| 措美县| 县级市| 惠东县| 海阳市| 本溪市| 昌黎县| 马尔康县| 大关县| 芦溪县| 邯郸市| 汕头市| 齐河县| 临泽县|