本文轉(zhuǎn)自互聯(lián)網(wǎng)
成都創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供玉屏網(wǎng)站建設、玉屏做網(wǎng)站、玉屏網(wǎng)站設計、玉屏網(wǎng)站制作等企業(yè)網(wǎng)站建設、網(wǎng)頁設計與制作、玉屏企業(yè)網(wǎng)站模板建站服務,10年玉屏做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡服務。
本系列文章將整理到我在GitHub上的《Java面試指南》倉庫,更多精彩內(nèi)容請到我的倉庫里查看
https://github.com/h3pl/Java-Tutorial
喜歡的話麻煩點下Star哈
文章將同步到我的個人博客:
www.how2playlife.com
本文是微信公眾號【Java技術江湖】的《不可輕視的Java網(wǎng)絡編程》其中一篇,本文部分內(nèi)容來源于網(wǎng)絡,為了把本文主題講得清晰透徹,也整合了很多我認為不錯的技術博客內(nèi)容,引用其中了一些比較好的博客文章,如有侵權,請聯(lián)系作者。
該系列博文會告訴你如何從計算機網(wǎng)絡的基礎知識入手,一步步地學習Java網(wǎng)絡基礎,從socket到nio、bio、aio和netty等網(wǎng)絡編程知識,并且進行實戰(zhàn),網(wǎng)絡編程是每一個Java后端工程師必須要學習和理解的知識點,進一步來說,你還需要掌握Linux中的網(wǎng)絡編程原理,包括IO模型、網(wǎng)絡編程框架netty的進階原理,才能更完整地了解整個Java網(wǎng)絡編程的知識體系,形成自己的知識框架。
為了更好地總結(jié)和檢驗你的學習成果,本系列文章也會提供部分知識點對應的面試題以及參考答案。
如果對本系列文章有什么建議,或者是有什么疑問的話,也可以關注公眾號【Java技術江湖】聯(lián)系作者,歡迎你參與本系列博文的創(chuàng)作和修訂。
Selector是NIO中實現(xiàn)I/O多路復用的關鍵類。Selector實現(xiàn)了通過一個線程管理多個Channel,從而管理多個網(wǎng)絡連接的目的。
Channel代表這一個網(wǎng)絡連接通道,我們可以將Channel注冊到Selector中以實現(xiàn)Selector對其的管理。一個Channel可以注冊到多個不同的Selector中。
當Channel注冊到Selector后會返回一個SelectionKey對象,該SelectionKey對象則代表這這個Channel和它注冊的Selector間的關系。并且SelectionKey中維護著兩個很重要的屬性:interestOps、readyOps
interestOps是我們希望Selector監(jiān)聽Channel的哪些事件。
我們將我們感興趣的事件設置到該字段,這樣在selection操作時,當發(fā)現(xiàn)該Channel有我們所感興趣的事件發(fā)生時,就會將我們感興趣的事件再設置到readyOps中,這樣我們就能得知是哪些事件發(fā)生了以做相應處理。
Selector中維護3個特別重要的SelectionKey集合,分別是
下面的源碼解析會說明上面3個集合的用處
下面我們通過一段對Selector的使用流程講解來進一步深入其實現(xiàn)原理。
首先先來段Selector最簡單的使用片段
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
int port = 5566;
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if(n > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();
......
iter.remove();
}
}
}
SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現(xiàn)。
ServerSocketChannel.open();
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
SocketChannel.open();
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
Selector.open();
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
我們來進一步的了解下SelectorProvider.provider()
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
① 如果配置了“java.nio.channels.spi.SelectorProvider”屬性,則通過該屬性值load對應的SelectorProvider對象,如果構(gòu)建失敗則拋異常。
② 如果provider類已經(jīng)安裝在了對系統(tǒng)類加載程序可見的jar包中,并且該jar包的源碼目錄META-INF/services包含有一個java.nio.channels.spi.SelectorProvider提供類配置文件,則取文件中第一個類名進行l(wèi)oad以構(gòu)建對應的SelectorProvider對象,如果構(gòu)建失敗則拋異常。
③ 如果上面兩種情況都不存在,則返回系統(tǒng)默認的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
④ 隨后在調(diào)用該方法,即SelectorProvider.provider()。則返回第一次調(diào)用的結(jié)果。
不同系統(tǒng)對應著不同的sun.nio.ch.DefaultSelectorProvider
這里我們看linux下面的sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.EPollSelectorProvider();
}
}
可以看見,linux系統(tǒng)下sun.nio.ch.DefaultSelectorProvider.create(); 會生成一個sun.nio.ch.EPollSelectorProvider類型的SelectorProvider,這里對應于linux系統(tǒng)的epoll
/**
* Opens a selector.
*
* <p> The new selector is created by invoking the {@link
* java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
* of the system-wide default {@link
* java.nio.channels.spi.SelectorProvider} object. </p>
*
* @return A new selector
*
* @throws IOException
* If an I/O error occurs
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
在得到sun.nio.ch.EPollSelectorProvider后調(diào)用openSelector()方法構(gòu)建Selector,這里會構(gòu)建一個EPollSelectorImpl對象。
class EPollSelectorImpl
extends SelectorImpl
{
// File descriptors used for interrupt
protected int fd0;
protected int fd1;
// The poll object
EPollArrayWrapper pollWrapper;
// Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey;
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} catch (Throwable t) {
try {
FileDispatcherImpl.closeIntFD(fd0);
} catch (IOException ioe0) {
t.addSuppressed(ioe0);
}
try {
FileDispatcherImpl.closeIntFD(fd1);
} catch (IOException ioe1) {
t.addSuppressed(ioe1);
}
throw t;
}
}
EPollSelectorImpl構(gòu)造函數(shù)完成:
① EPollArrayWrapper的構(gòu)建,EpollArrayWapper將Linux的epoll相關系統(tǒng)調(diào)用封裝成了native方法供EpollSelectorImpl使用。
② 通過EPollArrayWrapper向epoll注冊中斷事件
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
③ fdToKey:構(gòu)建文件描述符-SelectionKeyImpl映射表,所有注冊到selector的channel對應的SelectionKey和與之對應的文件描述符都會放入到該映射表中。
EPollArrayWrapper完成了對epoll文件描述符的構(gòu)建,以及對linux系統(tǒng)的epoll指令操縱的封裝。維護每次selection操作的結(jié)果,即epoll_wait結(jié)果的epoll_event數(shù)組。
EPollArrayWrapper操縱了一個linux系統(tǒng)下epoll_event結(jié)構(gòu)的本地數(shù)組。
* typedef union epoll_data {
* void *ptr;
* int fd;
* __uint32_t u32;
* __uint64_t u64;
* } epoll_data_t;
*
* struct epoll_event {
* __uint32_t events;
* epoll_data_t data;
* };
epoll_event的數(shù)據(jù)成員(epoll_data_t data)包含有與通過epoll_ctl將文件描述符注冊到epoll時設置的數(shù)據(jù)相同的數(shù)據(jù)。這里data.fd為我們注冊的文件描述符。這樣我們在處理事件的時候持有有效的文件描述符了。
EPollArrayWrapper將Linux的epoll相關系統(tǒng)調(diào)用封裝成了native方法供EpollSelectorImpl使用。
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;
上述三個native方法就對應Linux下epoll相關的三個系統(tǒng)調(diào)用
// The fd of the epoll driver
private final int epfd;
// The epoll_event array for results from epoll_wait
private final AllocatedNativeObject pollArray;
// Base address of the epoll_event array
private final long pollArrayAddress;
// 用于存儲已經(jīng)注冊的文件描述符和其注冊等待改變的事件的關聯(lián)關系。在epoll_wait操作就是要檢測這里文件描述法注冊的事件是否有發(fā)生。
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private final Map<Integer,Byte> eventsHigh = new HashMap<>();
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
}
EPoolArrayWrapper構(gòu)造函數(shù),創(chuàng)建了epoll文件描述符。構(gòu)建了一個用于存放epoll_wait返回結(jié)果的epoll_event數(shù)組。
ServerSocketChannel.open();
返回ServerSocketChannelImpl對象,構(gòu)建linux系統(tǒng)下ServerSocket的文件描述符。
// Our file descriptor
private final FileDescriptor fd;
// fd value needed for dev/poll. This value will remain valid
// even after the value in the file descriptor object has been set to -1
private int fdVal;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
① 構(gòu)建代表channel和selector間關系的SelectionKey對象
② implRegister(k)將channel注冊到epoll中
③ k.interestOps(int) 完成下面兩個操作:
a) 會將注冊的感興趣的事件和其對應的文件描述存儲到EPollArrayWrapper對象的eventsLow或eventsHigh中,這是給底層實現(xiàn)epoll_wait時使用的。
b) 同時該操作還會將設置SelectionKey的interestOps字段,這是給我們程序員獲取使用的。
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
① 將channel對應的fd(文件描述符)和對應的SelectionKeyImpl放到fdToKey映射表中。
② 將channel對應的fd(文件描述符)添加到EPollArrayWrapper中,并強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在于之前被取消過的注冊中。)
③ 將selectionKey放入到keys集合中。
selection操作有3中類型:
① select():該方法會一直阻塞直到至少一個channel被選擇(即,該channel注冊的事件發(fā)生了)為止,除非當前線程發(fā)生中斷或者selector的wakeup方法被調(diào)用。
② select(long time):該方法和select()類似,該方法也會導致阻塞直到至少一個channel被選擇(即,該channel注冊的事件發(fā)生了)為止,除非下面3種情況任意一種發(fā)生:a) 設置的超時時間到達;b) 當前線程發(fā)生中斷;c) selector的wakeup方法被調(diào)用
③ selectNow():該方法不會發(fā)生阻塞,如果沒有一個channel被選擇也會立即返回。
我們主要來看看select()的實現(xiàn) :int n = selector.select();
public int select() throws IOException {
return select(0);
}
最終會調(diào)用到EPollSelectorImpl的doSelect
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
① 先處理注銷的selectionKey隊列
② 進行底層的epoll_wait操作
③ 再次對注銷的selectionKey隊列進行處理
④ 更新被選擇的selectionKey
先來看processDeregisterQueue():
void processDeregisterQueue() throws IOException {
Set var1 = this.cancelledKeys();
synchronized(var1) {
if (!var1.isEmpty()) {
Iterator var3 = var1.iterator();
while(var3.hasNext()) {
SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();
try {
this.implDereg(var4);
} catch (SocketException var12) {
IOException var6 = new IOException("Error deregistering key");
var6.initCause(var12);
throw var6;
} finally {
var3.remove();
}
}
}
}
}
從cancelledKeys集合中依次取出注銷的SelectionKey,執(zhí)行注銷操作,將處理后的SelectionKey從cancelledKeys集合中移除。執(zhí)行processDeregisterQueue()后cancelledKeys集合會為空。
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(Integer.valueOf(fd));
pollWrapper.remove(fd);
ski.setIndex(-1);
keys.remove(ski);
selectedKeys.remove(ski);
deregister((AbstractSelectionKey)ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
注銷會完成下面的操作:
① 將已經(jīng)注銷的selectionKey從fdToKey( 文件描述與SelectionKeyImpl的映射表 )中移除
② 將selectionKey所代表的channel的文件描述符從EPollArrayWrapper中移除
③ 將selectionKey從keys集合中移除,這樣下次selector.select()就不會再將該selectionKey注冊到epoll中監(jiān)聽
④ 也會將selectionKey從對應的channel中注銷
⑤ 最后如果對應的channel已經(jīng)關閉并且沒有注冊其他的selector了,則將該channel關閉
完成????的操作后,注銷的SelectionKey就不會出現(xiàn)先在keys、selectedKeys以及cancelKeys這3個集合中的任何一個。
接著我們來看EPollArrayWrapper.poll(timeout):
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
updateRegistrations()方法會將已經(jīng)注冊到該selector的事件(eventsLow或eventsHigh)通過調(diào)用epollCtl(epfd, opcode, fd, events); 注冊到linux系統(tǒng)中。
這里epollWait就會調(diào)用linux底層的epoll_wait方法,并返回在epoll_wait期間有事件觸發(fā)的entry的個數(shù)
再看updateSelectedKeys():
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
該方法會從通過EPollArrayWrapper pollWrapper 以及 fdToKey( 構(gòu)建文件描述符-SelectorKeyImpl映射表 )來獲取有事件觸發(fā)的SelectionKeyImpl對象,然后將SelectionKeyImpl放到selectedKey集合( 有事件觸發(fā)的selectionKey集合,可以通過selector.selectedKeys()方法獲得 )中,即selectedKeys。并重新設置SelectionKeyImpl中相關的readyOps值。
但是,這里要注意兩點:
① 如果SelectionKeyImpl已經(jīng)存在于selectedKeys集合中,并且發(fā)現(xiàn)觸發(fā)的事件已經(jīng)存在于readyOps中了,則不會使numKeysUpdated++;這樣會使得我們無法得知該事件的變化。
????這點說明了為什么我們要在每次從selectedKey中獲取到Selectionkey后,將其從selectedKey集合移除,就是為了當有事件觸發(fā)使selectionKey能正確到放入selectedKey集合中,并正確的通知給調(diào)用者。
再者,如果不將已經(jīng)處理的SelectionKey從selectedKey集合中移除,那么下次有新事件到來時,在遍歷selectedKey集合時又會遍歷到這個SelectionKey,這個時候就很可能出錯了。比如,如果沒有在處理完OP_ACCEPT事件后將對應SelectionKey從selectedKey集合移除,那么下次遍歷selectedKey集合時,處理到到該SelectionKey,相應的ServerSocketChannel.accept()將返回一個空(null)的SocketChannel。
② 如果發(fā)現(xiàn)channel所發(fā)生I/O事件不是當前SelectionKey所感興趣,則不會將SelectionKeyImpl放入selectedKeys集合中,也不會使numKeysUpdated++
select,poll,epoll都是IO多路復用的機制。I/O多路復用就是通過一種機制,一個進程可以監(jiān)視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。但select,poll,epoll本質(zhì)上都是同步I/O,因為他們都需要在讀寫事件就緒后自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需自己負責進行讀寫,異步I/O的實現(xiàn)會負責把數(shù)據(jù)從內(nèi)核拷貝到用戶空間。
epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數(shù)以百萬計的socket句柄。
在 select/poll中,進程只有在調(diào)用一定的方法后,內(nèi)核才對所有監(jiān)視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一 個文件描述符,一旦基于某個文件描述符就緒時,內(nèi)核會采用類似callback的回調(diào)機制,迅速激活這個文件描述符,當進程調(diào)用epoll_wait() 時便得到通知。(此處去掉了遍歷文件描述符,而是通過監(jiān)聽回調(diào)的的機制。這正是epoll的魅力所在。)
如果沒有大量的idle -connection或者dead-connection,epoll的效率并不會比select/poll高很多,但是當遇到大量的idle- connection,就會發(fā)現(xiàn)epoll的效率大大高于select/poll。
注意:linux下Selector底層是通過epoll來實現(xiàn)的,當創(chuàng)建好epoll句柄后,它就會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調(diào)用close()關閉,否則可能導致fd被耗盡。
先看看使用c封裝的3個epoll系統(tǒng)調(diào)用:
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
epoll_wait在調(diào)用時,在給定的timeout時間內(nèi),所監(jiān)控的句柄中有事件發(fā)生時,就返回用戶態(tài)的進程。
大概看看epoll內(nèi)部是怎么實現(xiàn)的:
當epoll_wait調(diào)用時,僅僅觀察就緒鏈表里有沒有數(shù)據(jù),如果有數(shù)據(jù)就返回,否則就sleep,超時時立刻返回。
epoll的兩種工作模式:
socket讀數(shù)據(jù)
socket寫數(shù)據(jù)
最后順便說下在Linux系統(tǒng)中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。
因為本人對計算機系統(tǒng)組成以及C語言等知識比較欠缺,因為文中相關知識點的表示也相當“膚淺”,如有不對不妥的地方望讀者指出。同時我也會繼續(xù)加強對該方面知識點的學習~
http://www.jianshu.com/p/0d497fe5484a
http://remcarpediem.com/2017/04/02/Netty源碼-三-I-O模型和Java-NIO底層原理/
圣思園netty課程
新聞名稱:Java網(wǎng)絡編程和NIO詳解7:淺談Linux中NIOSelector的實現(xiàn)原理
鏈接地址:http://jinyejixie.com/article42/ppedec.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設計公司、微信公眾號、商城網(wǎng)站、外貿(mào)建站、網(wǎng)站維護、做網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)