集册 Java实例教程 使用线程池为通道提供服务

使用线程池为通道提供服务

欢马劈雪     最近更新时间:2020-01-02 10:19:05

560
使用线程池服务通道
  

import java.io.IOException;

import java.net.InetSocketAddress;

import java.net.ServerSocket;

import java.nio.ByteBuffer;//from 时   代    Java - nowjava.com

import java.nio.channels.SelectableChannel;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.LinkedList;

import java.util.List;


public class Main {

  static ThreadPool pool = new ThreadPool(5);

  public static void main(String[] argv) throws Exception {

    System.out.println("Listening on port " + 5555);

    ServerSocketChannel serverChannel = ServerSocketChannel.open();

    ServerSocket serverSocket = serverChannel.socket();

    Selector selector = Selector.open();

    serverSocket.bind(new InetSocketAddress(5555));

    serverChannel.configureBlocking(false);
    /* 
     来自 
    *n o w j a   v  a . c o m - 时  代  Java*/

    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {

      int n = selector.select();

      if (n == 0) {

        continue; // nothing to do

      }

      Iterator it = selector.selectedKeys().iterator();

      while (it.hasNext()) {

        SelectionKey key = (SelectionKey) it.next();

        if (key.isAcceptable()) {

          ServerSocketChannel server = (ServerSocketChannel) key.channel();

          SocketChannel channel = server.accept();

          registerChannel(selector, channel, SelectionKey.OP_READ);

          channel.write(ByteBuffer.wrap("Hi there!\r\n".getBytes()));

        }

        if (key.isReadable()) {

          readDataFromSocket(key);

        }

        it.remove();

      }

    }

  }

  protected static void registerChannel(Selector selector,

      SelectableChannel channel, int ops) throws Exception {

    if (channel == null) {

      return;

    }

    channel.configureBlocking(false);

    channel.register(selector, ops);

  }


  static void readDataFromSocket(SelectionKey key) throws Exception {

    WorkerThread worker = pool.getWorker();

    if (worker == null) {

      return;

    }

    worker.serviceChannel(key);

  }

}


class ThreadPool {

  List idle = new LinkedList();

  ThreadPool(int poolSize) {

    for (int i = 0; i < poolSize; i++) {

      WorkerThread thread = new WorkerThread(this);

      thread.setName("Worker" + (i + 1));

      thread.start();

      idle.add(thread);

    }

  }

  WorkerThread getWorker() {

    WorkerThread worker = null;

    synchronized (idle) {

      if (idle.size() > 0) {

        worker = (WorkerThread) idle.remove(0);

      }

    }

    return (worker);

  }

  void returnWorker(WorkerThread worker) {

    synchronized (idle) {

      idle.add(worker);

    }

  }

}


class WorkerThread extends Thread {

  private ByteBuffer buffer = ByteBuffer.allocate(1024);

  private ThreadPool pool;

  private SelectionKey key;


  WorkerThread(ThreadPool pool) {

    this.pool = pool;

  }

  public synchronized void run() {

    System.out.println(this.getName() + " is ready");


    while (true) {

      try {

        this.wait();

      } catch (InterruptedException e) {

        e.printStackTrace();

        this.interrupted();

      }

      if (key == null) {

        continue; // just in case

      }

      System.out.println(this.getName() + " has been awakened");


      try {

        drainChannel(key);

      } catch (Exception e) {

        System.out.println("Caught '" + e + "' closing channel");

        try {

          key.channel().close();

        } catch (IOException ex) {

          ex.printStackTrace();

        }

        key.selector().wakeup();

      }

      key = null;

      this.pool.returnWorker(this);

    }

  }


  synchronized 
展开阅读全文