actor的調(diào)度由線程池來調(diào)度。actor是被調(diào)度對象,skynet把所有活躍的actor通過鏈表串聯(lián)起來,線程池從actor中取出相等數(shù)量的消息進行執(zhí)行,實現(xiàn)公平調(diào)度。
但是,actor消息隊列長度可能不一致,會出現(xiàn)部分actor "餓死"現(xiàn)象,skynet通過對線程池的工作線程賦予不同權(quán)重來規(guī)避這個問題。
二、調(diào)度流程源碼分析actor=隔離的運行環(huán)境+回調(diào)函數(shù)+消息隊列。
2.1、thread_worker()actor是由線程調(diào)度,所以從線程入口函數(shù)thread_worker開始。線程作為消費者,會不斷循環(huán)從消息隊列中取消息,如果沒有消息就進入等待(pthread_cond_wait)。skynet_context_message_dispatch()用來取出消息和消費消息。
/skynet-src/skynet_start.c
static void *
thread_worker(void *p) {struct worker_parm *wp = p;
int id = wp->id;
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id];
skynet_initthread(THREAD_WORKER);
struct message_queue * q = NULL;
while (!m->quit) {q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {if (pthread_mutex_lock(&m->mutex) == 0) {++ m->sleep;
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex);
-- m->sleep;
if (pthread_mutex_unlock(&m->mutex)) {fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}
2.2、struct skynet_contextstruct skynet_context保存的是actor的上下文信息。
/skynet-src/skynet_server.c
struct skynet_context {void * instance;
struct skynet_module * mod;
void * cb_ud;
skynet_cb cb;
struct message_queue *queue;
ATOM_POINTER logfile;
uint64_t cpu_cost; // in microsec
uint64_t cpu_start; // in microsec
char result[32];
uint32_t handle;
int session_id;
ATOM_INT ref;
int message_count;
bool init;
bool endless;
bool profile;
CHECKCALLING_DECL
};
/skynet-src/skynet_mq.c
struct message_queue {struct spinlock lock;
uint32_t handle;
int cap;
int head;
int tail;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};
2.3、skynet_context_message_dispatch()/skynet-src/skynet_server.c
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {if (q == NULL) {q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {struct drop_t d = {handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}
int i,n=1;
struct skynet_message msg;
for (i=0;iif (skynet_mq_pop(q,&msg)) {skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {skynet_error(ctx, "May overload, message queue length = %d", overload);
}
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) {skynet_free(msg.data);
} else {dispatch_message(ctx, &msg);
}
skynet_monitor_trigger(sm, 0,0);
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);
return q;
}
2.4、dispatch_message()dispatch_message()本質(zhì)上調(diào)用回調(diào)函數(shù)來處理消息,消息內(nèi)容作為參數(shù);這里就是真正的運行actor了。
/skynet-src/skynet_server.c
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >>MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
if (f) {skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;
int reserve_msg;
if (ctx->profile) {ctx->cpu_start = skynet_thread_time();
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
三、c語言到lua的調(diào)用過程分析了解完調(diào)度流程,那么如果在c語言的callback函數(shù)跳到lua層執(zhí)行actor呢?
lua可以調(diào)用c語言,c語言需要導入一個方法給lua使用,skynet中l(wèi)callback()就是c語言導出給lua使用的方法。
在lcallback設(shè)置回調(diào)函數(shù):skynet_callback()。
/lualib-src/lua-skynet.c
static int
lcallback(lua_State *L) {struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int forward = lua_toboolean(L, 2);
luaL_checktype(L,1,LUA_TFUNCTION);
lua_settop(L,1);
struct callback_context *cb_ctx = (struct callback_context *)lua_newuserdata(L, sizeof(*cb_ctx));
cb_ctx->L = lua_newthread(L);
lua_pushcfunction(cb_ctx->L, traceback);
lua_setuservalue(L, -2);
lua_setfield(L, LUA_REGISTRYINDEX, "callback_context");
lua_xmove(L, cb_ctx->L, 1);
if (forward) {skynet_callback(context, cb_ctx, forward_cb);
} else {skynet_callback(context, cb_ctx, _cb);
}
return 0;
}
// ...
LUAMOD_API int
luaopen_skynet_core(lua_State *L) {luaL_checkversion(L);
luaL_Reg l[] = {{"send" , lsend },
{"genid", lgenid },
{"redirect", lredirect },
{"command" , lcommand },
{"intcommand", lintcommand },
{"addresscommand", laddresscommand },
{"error", lerror },
{"harbor", lharbor },
{"callback", lcallback },
{"trace", ltrace },
{NULL, NULL },
};
// functions without skynet_context
luaL_Reg l2[] = {{"tostring", ltostring },
{"pack", luaseri_pack },
{"unpack", luaseri_unpack },
{"packstring", lpackstring },
{"trash" , ltrash },
{"now", lnow },
{"hpc", lhpc }, // getHPCounter
{NULL, NULL },
};
lua_createtable(L, 0, sizeof(l)/sizeof(l[0]) + sizeof(l2)/sizeof(l2[0]) -2);
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
struct skynet_context *ctx = lua_touserdata(L,-1);
if (ctx == NULL) {return luaL_error(L, "Init skynet context first");
}
luaL_setfuncs(L,l,1);
luaL_setfuncs(L,l2,0);
return 1;
}
當執(zhí)行l(wèi)ua的skynet.start時會調(diào)用c.callback()設(shè)置回調(diào)函數(shù)skynet.dispatch_message,skynet.dispatch_message是一個lua方法;每次消息到來就會調(diào)用lua的skynet.dispatch_message,通過它分發(fā)消息。
/lualib/skynet.lua
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func)
init_thread = nil
end)
end
總結(jié)
后言本專欄知識點是通過<零聲教育>的系統(tǒng)學習,進行梳理總結(jié)寫下文章,對c/c++linux系統(tǒng)提升感興趣的讀者,可以點擊鏈接查看詳細的服務(wù):C/C++服務(wù)器開發(fā) 。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
網(wǎng)頁名稱:skynet的actor對等調(diào)度分析-創(chuàng)新互聯(lián)
標題鏈接:http://jinyejixie.com/article10/csoigo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、網(wǎng)站導航、面包屑導航、品牌網(wǎng)站設(shè)計、關(guān)鍵詞優(yōu)化、手機網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容