使用线程池服务通道
import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer;//from 时 代 Java - nowjava.com import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.List; public class Main { static ThreadPool pool = new ThreadPool(5); public static void main(String[] argv) throws Exception { System.out.println("Listening on port " + 5555); ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverChannel.socket(); Selector selector = Selector.open(); serverSocket.bind(new InetSocketAddress(5555)); serverChannel.configureBlocking(false); /* 来自 *n o w j a v a . c o m - 时 代 Java*/ serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int n = selector.select(); if (n == 0) { continue; // nothing to do } Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); registerChannel(selector, channel, SelectionKey.OP_READ); channel.write(ByteBuffer.wrap("Hi there!\r\n".getBytes())); } if (key.isReadable()) { readDataFromSocket(key); } it.remove(); } } } protected static void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception { if (channel == null) { return; } channel.configureBlocking(false); channel.register(selector, ops); } static void readDataFromSocket(SelectionKey key) throws Exception { WorkerThread worker = pool.getWorker(); if (worker == null) { return; } worker.serviceChannel(key); } } class ThreadPool { List idle = new LinkedList(); ThreadPool(int poolSize) { for (int i = 0; i < poolSize; i++) { WorkerThread thread = new WorkerThread(this); thread.setName("Worker" + (i + 1)); thread.start(); idle.add(thread); } } WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (idle.size() > 0) { worker = (WorkerThread) idle.remove(0); } } return (worker); } void returnWorker(WorkerThread worker) { synchronized (idle) { idle.add(worker); } } } class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; WorkerThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); this.interrupted(); } if (key == null) { continue; // just in case } System.out.println(this.getName() + " has been awakened"); try { drainChannel(key); } catch (Exception e) { System.out.println("Caught '" + e + "' closing channel"); try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; this.pool.returnWorker(this); } } synchronized