码迷,mamicode.com
首页 > 其他好文 > 详细

GuozhongCrawler实现的基于redis的队列

时间:2015-06-18 19:57:43      阅读:73      评论:0      收藏:0      [点我收藏+]

标签:guozhongcrawler 爬虫   分布式爬虫实现   分布式爬虫队列   

             GuozhongCrawler的分布式爬虫还在开发当中。作者首先爆出GuozhongCrawler实现的基于redis的队列,提供大家写其他分布式爬虫的参考。


package com.guozhong.queue;

import com.guozhong.request.BasicRequest;

/**
* 线程安全的可阻塞式队列接口
* @author 郭钟
* @QQ群 202568714
*
*/
public interface BlockingRequestQueue {

/**
* 检索并移除此队列的头,如果此队列为空,则返回 null。
* @return
*/
public BasicRequest poll();

/**
* 向队列中添加指定的元素。
* @param e
* @return
*/
public boolean add(BasicRequest e);

/**
* 检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待。
* @return
* @throws InterruptedException
*/
public BasicRequest take() throws InterruptedException ;

/**
* 检索,但是不移除此队列的头,如果此队列为空,则返回 null。
*/
public BasicRequest peek();

/**
* 检索,但是不移除此队列的头。 此方法与 peek 方法的惟一不同是,如果此队列为空,它会抛出一个异常。
*/
public BasicRequest element();

/**
* 从此队列移除指定元素的单个实例(如果存在)。
* @return
*/
public boolean remove(BasicRequest e);

/**
* 检索并移除此队列的头。此方法与 poll 方法的不同在于,如果此队列为空,它会抛出一个异常。 抛出: NoSuchElementException
* - 如果此队列为空。
*/
public BasicRequest remove();

/**
* 返回队列中的元素个数。
*/
public int size();

/**
* 返回队列中的是否为空
*/
public boolean isEmpty();

/**
* 清空队列所有元素
*/
public void clear();

}




实现类如下:


package com.guozhong.queue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import javax.sound.midi.VoiceStatus;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import com.guozhong.component.BinaryProcessor;
import com.guozhong.component.PageProcessor;
import com.guozhong.component.PageScript;
import com.guozhong.model.Proccessable;
import com.guozhong.page.OkPage;
import com.guozhong.page.Page;
import com.guozhong.request.BasicRequest;
import com.guozhong.request.BinaryRequest;
import com.guozhong.request.PageRequest;
import com.guozhong.request.StartContext;

/**
 * 优先级队列
 *
 * @author Administrator
 *
 */
public final class RedisRequestBlockingQueue implements BlockingRequestQueue,Serializable {

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    private JedisPool pool = null;

    private byte[] queue;
    
    
    /**
     * 给定JedisPoolConfig初始化一个队列
     * @param host
     * @param port
     * @param config
     * @param queue
     */
    public RedisRequestBlockingQueue(String host, int port, JedisPoolConfig config, String queue) {
        pool = new JedisPool(config, host, port, 15000);
        try {
            this.queue = queue.getBytes("utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }


    /**
     * 基于默认配置初始化一个队列
     * @param host
     * @param port
     * @param queue
     */
    public RedisRequestBlockingQueue(String host, int port, String queue) {
        JedisPoolConfig config = new JedisPoolConfig();

        // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
        config.setBlockWhenExhausted(true);

        // 设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
        config.setEvictionPolicyClassName("org.apache.commons.pool2.impl.DefaultEvictionPolicy");

        // 是否启用pool的jmx管理功能, 默认true
        config.setJmxEnabled(true);

        // MBean ObjectName = new
        // ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" +
        // "pool" + i); 默 认为"pool", JMX不熟,具体不知道是干啥的...默认就好.
        config.setJmxNamePrefix("pool");

        // 是否启用后进先出, 默认true
        config.setLifo(true);

        // 最大空闲连接数, 默认8个
        config.setMaxIdle(100);

        // 最大连接数, 默认8个
        config.setMaxTotal(300);

        // 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,
        // 默认-1
        config.setMaxWaitMillis(10000);

        // 逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
        config.setMinEvictableIdleTimeMillis(1800000);

        // 最小空闲连接数, 默认0
        config.setMinIdle(20);

        // 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
        config.setNumTestsPerEvictionRun(3);

        // 对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数
        // 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
        config.setSoftMinEvictableIdleTimeMillis(1800000);

        // 在获取连接的时候检查有效性, 默认false
        config.setTestOnBorrow(true);

        // 在空闲时检查有效性, 默认false
        config.setTestWhileIdle(true);

        // 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
        config.setTimeBetweenEvictionRunsMillis(20);

        pool = new JedisPool(config, host, port, 15000);
        try {
            this.queue = queue.getBytes("utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public BasicRequest remove() {
        Long leng = new OperationJedis<Long>() {
            
            @Override
            protected Long operation(Jedis jedis) throws Exception {
                return jedis.llen(queue);
            }
        }.exe();
        if (leng == 0) {
            throw new NoSuchElementException("队列长度为0");
        } else {
            return poll();
        }
    }

    @Override
    public BasicRequest poll() {
        BasicRequest basicRequest = null;

        byte[] data = new OperationJedis<byte[]>() {

            @Override
            protected byte[] operation(Jedis jedis) throws Exception {
                return jedis.rpop(queue);
            }
        }.exe();

        basicRequest = byteToObject(data);

        return basicRequest;
    }

    @Override
    public BasicRequest element() {
        BasicRequest basicRequest = null;

        byte[] data = new OperationJedis<byte[]>() {

            @Override
            protected byte[] operation(Jedis jedis) throws Exception {
                byte[] data = jedis.rpop(queue);
                if (data != null) {
                    jedis.rpush(queue, data);
                }
                return data;
            }
        }.exe();

        if (data == null) {
            throw new NoSuchElementException("队列长度为0");
        }

        basicRequest = byteToObject(data);
        return basicRequest;
    }

    
    @Override
    public BasicRequest peek() {
        BasicRequest basicRequest = null;

        byte[] data = new OperationJedis<byte[]>() {

            @Override
            protected byte[] operation(Jedis jedis) throws Exception {
                byte[] data = jedis.rpop(queue);
                if (data != null) {
                    jedis.rpush(queue, data);
                }
                return data;
            }
        }.exe();

        basicRequest = byteToObject(data);
        return basicRequest;
    }

    /**
     * 返回队列中的元素个数。
     */
    @Override
    public int size() {
        long size = new OperationJedis<Long>() {

            @Override
            protected Long operation(Jedis jedis) throws Exception {
                return jedis.llen(queue);
            }
        }.exe();
        return (int) size;
    }

    
    @Override
    public boolean isEmpty() {
        boolean isEmpty = new OperationJedis<Boolean>() {

            @Override
            protected Boolean operation(Jedis jedis) throws Exception {
                return jedis.llen(queue) == 0;
            }
        }.exe();
        return isEmpty;
    }

    @Override
    public void clear() {
        new OperationJedis<Void>() {

            @Override
            protected Void operation(Jedis jedis) throws Exception {
                jedis.del(queue);
                return null;
            }
        }.exe();
    }

    @Override
    public boolean add(final BasicRequest e) {
        new OperationJedis<Void>() {

            @Override
            protected Void operation(Jedis jedis) throws Exception {
                byte[] data = objectToByte(e);
                jedis.lpush(queue, data);
                return null;
            }
        }.exe();
        return true;
    }


    public BasicRequest take() throws InterruptedException {
        BasicRequest basicRequest = null;

        byte[] data = new OperationJedis<byte[]>() {

            @Override
            protected byte[] operation(Jedis jedis) throws Exception {
                byte[] data = null;
                while(true){
                    data = jedis.rpop(queue);
                    if(data != null){
                        break;
                    }
                    Thread.sleep(100);
                }
                return data;
            }
        }.exe();

        basicRequest = byteToObject(data);

        return basicRequest;
    }

    @Override
    public boolean remove(final BasicRequest o) {
        new OperationJedis<Void>() {

            @Override
            protected Void operation(Jedis jedis) throws Exception {
                if(o instanceof BasicRequest){
                    byte[] data = objectToByte((BasicRequest) o);
                    jedis.lrem(queue, 0, data);
                }
                return null;
            }
        }.exe();
        return true;
    }

    public byte[] objectToByte(BasicRequest obj) {
        if (obj == null) {
            return null;
        }
        byte[] bytes = null;
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = null;
        try {
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (oo != null) {
                try {
                    oo.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (bo != null) {
                try {
                    bo.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return bytes;
    }

    public BasicRequest byteToObject(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        Serializable obj = null;
        ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
        ObjectInputStream oi = null;
        try {
            oi = new ObjectInputStream(bi);
            obj = (Serializable) oi.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (oi != null) {
                try {
                    oi.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (bi != null) {
                try {
                    bi.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return (BasicRequest) obj;
    }

    @SuppressWarnings("unused")
    private abstract class OperationJedis<E> {

        protected abstract E operation(Jedis jedis) throws Exception;

        public final E exe() {
            Jedis jedis = null;
            E result = null;
            try {
                jedis = pool.getResource();
                result = operation(jedis);
            } catch (Exception e) {
                e.printStackTrace();
                if (jedis != null) {
                    pool.returnBrokenResource(jedis);
                    jedis = null;
                }
            } finally {
                if (jedis != null) {
                    pool.returnResource(jedis);
                }
            }
            return result;
        }
    }
    
}



GuozhongCrawler实现的基于redis的队列

标签:guozhongcrawler 爬虫   分布式爬虫实现   分布式爬虫队列   

原文地址:http://blog.csdn.net/u012572945/article/details/46549521

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!