接前文
http://blog.itpub.net/29254281/viewspace-1344706/
http://blog.itpub.net/29254281/viewspace-1347985/
http://blog.itpub.net/29254281/viewspace-2134876/
之前的代码被大神怼了..被怒批 杂乱无章,误人子弟
我估计主要是因为面向对象的程度不够.过程化太明显了
在网上找了一个Reactor模式的例子,又改了改自己的程序.
因为Oracle太笨重了,这回干脆换了MySQL好了
改写之后的程序,在我的电脑上,使用 2线程,最大500连接的配置,性能最好。
	- 
		import java.io.IOException;  
	
- 
		import java.net.InetSocketAddress;  
	
- 
		import java.net.SocketAddress;  
	
- 
		import java.nio.ByteBuffer;  
	
- 
		import java.nio.channels.SelectionKey;  
	
- 
		import java.nio.channels.Selector;  
	
- 
		import java.nio.channels.SocketChannel;  
	
- 
		import java.nio.charset.Charset;  
	
- 
		import java.sql.Connection;  
	
- 
		import java.sql.DriverManager;  
	
- 
		import java.sql.PreparedStatement;  
	
- 
		import java.sql.SQLException;  
	
- 
		import java.sql.Timestamp;  
	
- 
		import java.util.ArrayList;  
	
- 
		import java.util.HashSet;  
	
- 
		import java.util.Iterator;  
	
- 
		import java.util.List;  
	
- 
		import java.util.Set;  
	
- 
		import java.util.concurrent.BlockingQueue;  
	
- 
		import java.util.concurrent.LinkedBlockingQueue;  
	
- 
		import java.util.concurrent.atomic.AtomicInteger;  
	
- 
		import java.util.regex.Matcher;  
	
- 
		import java.util.regex.Pattern;  
	
- 
		  
	
- 
		class Reactor implements Runnable {  
	
- 
		    public static int GETCOUNT() {  
	
- 
		        return COUNT.get();  
	
- 
		  
	
- 
		    }  
	
- 
		  
	
- 
		    public static int getQueueSize() {  
	
- 
		        return QUEUE.size();  
	
- 
		    }  
	
- 
		  
	
- 
		    private static final AtomicInteger COUNT = new AtomicInteger();  
	
- 
		    private static final AtomicInteger TASKCOUNT = new AtomicInteger();  
	
- 
		  
	
- 
		    public int startTask() {  
	
- 
		        return TASKCOUNT.incrementAndGet();  
	
- 
		    }  
	
- 
		  
	
- 
		    public int finishTask() {  
	
- 
		        return TASKCOUNT.decrementAndGet();  
	
- 
		    }  
	
- 
		  
	
- 
		    public int incrementAndGet() {  
	
- 
		        return COUNT.incrementAndGet();  
	
- 
		    }  
	
- 
		  
	
- 
		    public final Selector selector;  
	
- 
		    private static BlockingQueue QUEUE = new LinkedBlockingQueue();  
	
- 
		  
	
- 
		    public void addTask(Task task) {  
	
- 
		        try {  
	
- 
		            QUEUE.put(task);  
	
- 
		        } catch (InterruptedException e) {  
	
- 
		            e.printStackTrace();  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		    public Reactor() throws IOException {  
	
- 
		        selector = Selector.open();  
	
- 
		    }  
	
- 
		  
	
- 
		    @Override  
	
- 
		    public void run() {  
	
- 
		        try {  
	
- 
		            while (!Thread.interrupted()) {  
	
- 
		                int maxClient = 500;  
	
- 
		                Task task = null;  
	
- 
		                if (TASKCOUNT.get() < maxClient) {  
	
- 
		                    while ((task = (Task) QUEUE.poll()) != null) {  
	
- 
		                        new Connector(this, task).run();  
	
- 
		                        if (TASKCOUNT.get() > maxClient) {  
	
- 
		                            break;  
	
- 
		                        }  
	
- 
		                    }  
	
- 
		                }  
	
- 
		  
	
- 
		                selector.select();  
	
- 
		  
	
- 
		                Set selectionKeys = selector.selectedKeys();  
	
- 
		                Iterator it = selectionKeys.iterator();  
	
- 
		                // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。  
	
- 
		                while (it.hasNext()) {  
	
- 
		                    // 来一个事件 第一次触发一个accepter线程  
	
- 
		                    // 以后触发SocketReadHandler  
	
- 
		                    SelectionKey selectionKey = it.next();  
	
- 
		                    dispatch(selectionKey);  
	
- 
		                }  
	
- 
		                selectionKeys.clear();  
	
- 
		            }  
	
- 
		        } catch (IOException e) {  
	
- 
		            e.printStackTrace();  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		    /** 
	
- 
		     * 运行Acceptor或SocketReadHandler 
	
- 
		     *  
	
- 
		     * @param key 
	
- 
		     */  
	
- 
		    void dispatch(SelectionKey key) {  
	
- 
		        Runnable r = (Runnable) (key.attachment());  
	
- 
		        if (r != null) {  
	
- 
		            r.run();  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		}  
	
- 
		  
	
- 
		class Connector implements Runnable {  
	
- 
		    private Reactor reactor;  
	
- 
		    private Task task;  
	
- 
		  
	
- 
		    public Connector(Reactor reactor, Task task) {  
	
- 
		        this.reactor = reactor;  
	
- 
		        this.task = task;  
	
- 
		    }  
	
- 
		  
	
- 
		    @Override  
	
- 
		    public void run() {  
	
- 
		        try {  
	
- 
		            reactor.startTask();  
	
- 
		            task.setStarttime(System.currentTimeMillis());  
	
- 
		            SocketAddress addr = new InetSocketAddress(task.getHost(), 80);  
	
- 
		            SocketChannel socketChannel = SocketChannel.open();  
	
- 
		            socketChannel.configureBlocking(false);  
	
- 
		            socketChannel.connect(addr);  
	
- 
		  
	
- 
		            BaseHandler base = new BaseHandler();  
	
- 
		            base.setTask(task);  
	
- 
		            base.setSelector(reactor.selector);  
	
- 
		            base.setSocketChannel(socketChannel);  
	
- 
		            base.setReactor(reactor);  
	
- 
		            if (socketChannel != null)// 调用Handler来处理channel  
	
- 
		                socketChannel.register(reactor.selector, SelectionKey.OP_CONNECT, new SocketWriteHandler(base));  
	
- 
		        } catch (IOException e) {  
	
- 
		            e.printStackTrace();  
	
- 
		        }  
	
- 
		    }  
	
- 
		}  
	
- 
		  
	
- 
		class BaseHandler {  
	
- 
		    private Selector selector;  
	
- 
		    private SocketChannel socketChannel;  
	
- 
		    private Task task;  
	
- 
		    private ByteBuffer byteBuffer = ByteBuffer.allocate(2400);  
	
- 
		    private Reactor reactor;  
	
- 
		  
	
- 
		    public Reactor getReactor() {  
	
- 
		        return reactor;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setReactor(Reactor reactor) {  
	
- 
		        this.reactor = reactor;  
	
- 
		    }  
	
- 
		  
	
- 
		    public Selector getSelector() {  
	
- 
		        return selector;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setSelector(Selector selector) {  
	
- 
		        this.selector = selector;  
	
- 
		    }  
	
- 
		  
	
- 
		    public SocketChannel getSocketChannel() {  
	
- 
		        return socketChannel;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setSocketChannel(SocketChannel socketChannel) {  
	
- 
		        this.socketChannel = socketChannel;  
	
- 
		    }  
	
- 
		  
	
- 
		    public Task getTask() {  
	
- 
		        return task;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setTask(Task task) {  
	
- 
		        this.task = task;  
	
- 
		    }  
	
- 
		  
	
- 
		    public ByteBuffer getByteBuffer() {  
	
- 
		        return byteBuffer;  
	
- 
		    }  
	
- 
		}  
	
- 
		  
	
- 
		class SocketWriteHandler implements Runnable {  
	
- 
		    BaseHandler baseHandler;  
	
- 
		  
	
- 
		    public SocketWriteHandler(BaseHandler baseHandler) {  
	
- 
		        this.baseHandler = baseHandler;  
	
- 
		        ByteBuffer byteBuffer = baseHandler.getByteBuffer();  
	
- 
		        Task task = baseHandler.getTask();  
	
- 
		        try {  
	
- 
		            byteBuffer.put(("GET " + task.getCurrentPath() + " HTTP/1.0\r\n").getBytes("utf8"));  
	
- 
		            byteBuffer.put(("HOST:" + task.getHost() + "\r\n").getBytes("utf8"));  
	
- 
		            byteBuffer.put(("Accept:*/*\r\n").getBytes("utf8"));  
	
- 
		            byteBuffer.put(("\r\n").getBytes("utf8"));  
	
- 
		            byteBuffer.flip();  
	
- 
		        } catch (IOException e) {  
	
- 
		            e.printStackTrace();  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		    @Override  
	
- 
		    public void run() {  
	
- 
		        try {  
	
- 
		            while (!baseHandler.getSocketChannel().finishConnect()) {  
	
- 
		                System.out.println("Waiting Connected");  
	
- 
		            }  
	
- 
		  
	
- 
		            baseHandler.getSocketChannel().write(baseHandler.getByteBuffer());  
	
- 
		  
	
- 
		            if (baseHandler.getByteBuffer().hasRemaining()) {  
	
- 
		                baseHandler.getByteBuffer().compact();  
	
- 
		                baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_WRITE, this);  
	
- 
		                System.out.println("Continue Write");  
	
- 
		            } else {  
	
- 
		                baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_READ,  
	
- 
		                        new SocketReadHandler(baseHandler));  
	
- 
		                baseHandler.getByteBuffer().clear();  
	
- 
		            }  
	
- 
		        } catch (IOException e) {  
	
- 
		            e.printStackTrace();  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		}  
	
- 
		  
	
- 
		class SocketReadHandler implements Runnable {  
	
- 
		    Charset charset = Charset.forName("utf8");  
	
- 
		    Charset gbkcharset = Charset.forName("gbk");  
	
- 
		    BaseHandler baseHandler;  
	
- 
		  
	
- 
		    public SocketReadHandler(BaseHandler baseHandler) {  
	
- 
		        this.baseHandler = baseHandler;  
	
- 
		    }  
	
- 
		  
	
- 
		    @Override  
	
- 
		    public void run() {  
	
- 
		        try {  
	
- 
		            SocketChannel channel = baseHandler.getSocketChannel();  
	
- 
		            ByteBuffer byteBuffer = baseHandler.getByteBuffer();  
	
- 
		            Task task = baseHandler.getTask();  
	
- 
		  
	
- 
		            int length;  
	
- 
		            while ((length = channel.read(byteBuffer)) > 0) {  
	
- 
		                byteBuffer.flip();  
	
- 
		                task.getContent().append(charset.decode(charset.encode(gbkcharset.decode(byteBuffer))).toString());  
	
- 
		  
	
- 
		                byteBuffer.compact();  
	
- 
		            }  
	
- 
		            if (length == -1) {  
	
- 
		                channel.close();  
	
- 
		                task.setEndtime(System.currentTimeMillis());  
	
- 
		                baseHandler.getReactor().incrementAndGet();  
	
- 
		                baseHandler.getReactor().finishTask();  
	
- 
		                new ParseHandler(task, baseHandler.getReactor()).run();  
	
- 
		  
	
- 
		            } else {  
	
- 
		                baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_READ, this);  
	
- 
		            }  
	
- 
		        } catch (IOException e) {  
	
- 
		            e.printStackTrace();  
	
- 
		        }  
	
- 
		  
	
- 
		    }  
	
- 
		  
	
- 
		}  
	
- 
		  
	
- 
		public class Probe {  
	
- 
		    public static void main(String[] args) throws IOException, InterruptedException {  
	
- 
		        for (int i = 0; i <2; i++) {  
	
- 
		            Reactor reactor = new Reactor();  
	
- 
		            reactor.addTask(new Task("news.163.com", 80, "/index.html"));  
	
- 
		            new Thread(reactor, "ReactorThread_" + i).start();  
	
- 
		        }  
	
- 
		        long start = System.currentTimeMillis();  
	
- 
		        while (true) {  
	
- 
		            Thread.sleep(1000);  
	
- 
		            long end = System.currentTimeMillis();  
	
- 
		            float interval = ((end - start) / 1000);  
	
- 
		            int connectTotal = Reactor.GETCOUNT();  
	
- 
		  
	
- 
		            int persistenceTotal = PersistenceHandler.GETCOUNT();  
	
- 
		  
	
- 
		            int connectps = Math.round(connectTotal / interval);  
	
- 
		            int persistenceps = Math.round(persistenceTotal / interval);  
	
- 
		            System.out.print("\r连接总数:" + connectTotal + " \t每秒连接:" + connectps + "\t连接队列剩余:" + Reactor.getQueueSize()  
	
- 
		                    + " \t持久化总数:" + persistenceTotal + " \t每秒持久化:" + persistenceps + "\t持久化队列剩余:"  
	
- 
		                    + PersistenceHandler.getInstance().getSize());  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		}  
	
- 
		  
	
- 
		class Task {  
	
- 
		    private String host;  
	
- 
		    private int port;  
	
- 
		    private String currentPath;  
	
- 
		    private long starttime;  
	
- 
		    private long endtime;  
	
- 
		  
	
- 
		    private String type;  
	
- 
		    private StringBuilder content = new StringBuilder(2400);  
	
- 
		    private int state;  
	
- 
		    private boolean isValid = true;  
	
- 
		  
	
- 
		    public Task() {  
	
- 
		    }  
	
- 
		  
	
- 
		    public Task(String host, int port, String path) {  
	
- 
		        init(host, port, path);  
	
- 
		    }  
	
- 
		  
	
- 
		    public void init(String host, int port, String path) {  
	
- 
		        this.setCurrentPath(path);  
	
- 
		        this.host = host;  
	
- 
		        this.port = port;  
	
- 
		    }  
	
- 
		  
	
- 
		    public long getStarttime() {  
	
- 
		        return starttime;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setStarttime(long starttime) {  
	
- 
		        this.starttime = starttime;  
	
- 
		    }  
	
- 
		  
	
- 
		    public long getEndtime() {  
	
- 
		        return endtime;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setEndtime(long endtime) {  
	
- 
		        this.endtime = endtime;  
	
- 
		    }  
	
- 
		  
	
- 
		    public boolean isValid() {  
	
- 
		        return isValid;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setValid(boolean isValid) {  
	
- 
		        this.isValid = isValid;  
	
- 
		    }  
	
- 
		  
	
- 
		    public int getState() {  
	
- 
		        return state;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setState(int state) {  
	
- 
		        this.state = state;  
	
- 
		    }  
	
- 
		  
	
- 
		    public String getCurrentPath() {  
	
- 
		        return currentPath;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setCurrentPath(String currentPath) {  
	
- 
		        this.currentPath = currentPath;  
	
- 
		        int i = 0;  
	
- 
		        if (currentPath.indexOf("?") != -1) {  
	
- 
		            i = currentPath.indexOf("?");  
	
- 
		        } else {  
	
- 
		            if (currentPath.indexOf("#") != -1) {  
	
- 
		                i = currentPath.indexOf("#");  
	
- 
		            } else {  
	
- 
		                i = currentPath.length();  
	
- 
		            }  
	
- 
		        }  
	
- 
		        this.type = currentPath.substring(currentPath.indexOf(".") + 1, i);  
	
- 
		    }  
	
- 
		  
	
- 
		    public long getTaskTime() {  
	
- 
		        return getEndtime() - getStarttime();  
	
- 
		    }  
	
- 
		  
	
- 
		    public String getType() {  
	
- 
		        return type;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setType(String type) {  
	
- 
		        this.type = type;  
	
- 
		    }  
	
- 
		  
	
- 
		    public String getHost() {  
	
- 
		        return host;  
	
- 
		    }  
	
- 
		  
	
- 
		    public int getPort() {  
	
- 
		        return port;  
	
- 
		    }  
	
- 
		  
	
- 
		    public StringBuilder getContent() {  
	
- 
		        return content;  
	
- 
		    }  
	
- 
		  
	
- 
		    public void setContent(StringBuilder content) {  
	
- 
		        this.content = content;  
	
- 
		    }  
	
- 
		  
	
- 
		}  
	
- 
		  
	
- 
		class ParseHandler implements Runnable {  
	
- 
		    private static final Set SET = new HashSet();  
	
- 
		  
	
- 
		    PersistenceHandler persistencehandler = PersistenceHandler.getInstance();  
	
- 
		  
	
- 
		    List domainlist = new ArrayList();  
	
- 
		  
	
- 
		    Task task;  
	
- 
		  
	
- 
		    private interface Filter {  
	
- 
		        void doFilter(Task fatherTask, Task newTask, String path, Filter chain);  
	
- 
		    }  
	
- 
		  
	
- 
		    private class FilterChain implements Filter {  
	
- 
		        private List list = new ArrayList();  
	
- 
		  
	
- 
		        {  
	
- 
		            addFilter(new TwoLevel());  
	
- 
		            addFilter(new OneLevel());  
	
- 
		            addFilter(new FullPath());  
	
- 
		            addFilter(new Root());  
	
- 
		            addFilter(new Default());  
	
- 
		        }  
	
- 
		  
	
- 
		        private void addFilter(Filter filter) {  
	
- 
		            list.add(filter);  
	
- 
		        }  
	
- 
		  
	
- 
		        private Iterator it = list.iterator();  
	
- 
		  
	
- 
		        @Override  
	
- 
		        public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
	
- 
		            if (it.hasNext()) {  
	
- 
		                ((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);  
	
- 
		            }  
	
- 
		        }  
	
- 
		  
	
- 
		    }  
	
- 
		  
	
- 
		    private class TwoLevel implements Filter {  
	
- 
		  
	
- 
		        @Override  
	
- 
		        public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
	
- 
		            if (path.startsWith("../../")) {  
	
- 
		                String prefix = getPrefix(fatherTask.getCurrentPath(), 3);  
	
- 
		                newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix));  
	
- 
		            } else {  
	
- 
		                chain.doFilter(fatherTask, newTask, path, chain);  
	
- 
		            }  
	
- 
		  
	
- 
		        }  
	
- 
		    }  
	
- 
		  
	
- 
		    private class OneLevel implements Filter {  
	
- 
		  
	
- 
		        @Override  
	
- 
		        public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
	
- 
		            if (path.startsWith("../")) {  
	
- 
		                String prefix = getPrefix(fatherTask.getCurrentPath(), 2);  
	
- 
		                newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix));  
	
- 
		            } else {  
	
- 
		                chain.doFilter(fatherTask, newTask, path, chain);  
	
- 
		            }  
	
- 
		  
	
- 
		        }  
	
- 
		  
	
- 
		    }  
	
- 
		  
	
- 
		    private class FullPath implements Filter {  
	
- 
		  
	
- 
		        @Override  
	
- 
		        public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
	
- 
		            if (path.startsWith("http://")) {  
	
- 
		                Iterator it = domainlist.iterator();  
	
- 
		                boolean flag = false;  
	
- 
		                while (it.hasNext()) {  
	
- 
		                                
            
                        
 新闻标题:网页主动探测工具-使用Reactor模式
 文章网址:http://cdysf.com/article/ipsjcp.html