webmagic之Redis调度器

爬虫redis调度器实现功能如下:

  • 待爬取url判重(列表页或详细页);
  • 待爬取url存储至本地内存;
  • 待爬取url存储至redis(列表页或详细页);
  • 待爬取url添加优先级(加入评分score,以便优先爬取);
  • 入队出队逻辑。

Talk is cheap, let me show you the code:

/**
 * Scheduler that keeps pending crawl requests in Redis, with a small in-memory queue in front.
 *
 * <p>Redis keys used by this class (all prefixed with the {@code spiderUUID}):
 * <ul>
 *   <li>{@code <spiderUUID>:dupefilter} - set of SHA1 digests of URLs already seen;</li>
 *   <li>{@code <spiderUUID>:requests} - sorted set of JSON-serialized requests, ordered by score;</li>
 *   <li>{@code <spiderUUID>:list-requests} - plain list of JSON-serialized requests;</li>
 *   <li>{@code <spiderUUID>:cluster-status} - string flag written by {@link #setClusterStartFlag}.</li>
 * </ul>
 *
 * <p>NOTE(review): when the {@link #RedisScheduler(JedisPool)} constructor is used directly,
 * {@code spiderUUID} stays {@code null} and every key is prefixed with the literal text
 * {@code "null"}. Prefer the four-argument constructor.
 */
public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
    private static final String QUEUE_PREFIX = "queue_";
    private static final String SET_PREFIX = "set_";
    private static final String ITEM_PREFIX = "item_";

    /** Score given to a request that carries no "score" extra. */
    private static final int DEFAULT_SCORE = 2000;
    /** Scores below this value are multiplied by 100 before being stored. */
    private static final int SCALED_SCORE_LIMIT = 100;

    private final JedisPool pool;
    private String spiderUUID;
    /** Written by push/poll and read by {@link #isEmpty()} without the monitor, hence volatile. */
    private volatile boolean isEmpty;
    private final BlockingQueue<Request> queue;

    /**
     * Creates a scheduler with its own connection pool.
     *
     * @param host       Redis host
     * @param port       Redis port
     * @param password   Redis password
     * @param spiderUUID prefix for every Redis key used by this scheduler
     */
    public RedisScheduler(String host, int port, String password, String spiderUUID) {
        this(new JedisPool(new JedisPoolConfig(), host, port, 2000, password));
        this.spiderUUID = spiderUUID;
    }

    /**
     * Creates a scheduler on top of an existing pool and registers itself as the duplicate remover.
     *
     * @param pool connection pool to borrow Jedis instances from
     */
    public RedisScheduler(JedisPool pool) {
        this.isEmpty = false;
        this.queue = new LinkedBlockingQueue<Request>();
        this.pool = pool;
        this.setDuplicateRemover(this);
    }

    /**
     * Clears the duplicate filter.
     *
     * <p>Deletes the {@code :dupefilter} set that {@link #isDuplicate} actually writes to. The
     * task-scoped key from {@link #getSetKey} is deleted as well, as the previous implementation
     * deleted only that key.
     *
     * @param task task whose UUID is used for the task-scoped key
     */
    public void resetDuplicateCheck(Task task) {
        Jedis jedis = this.pool.getResource();
        try {
            jedis.del(new String[]{this.dupefilterKey(), this.getSetKey(task)});
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Records the request URL in the duplicate filter and reports whether it was already there.
     *
     * @param request request to check; a true "nocheckdup" extra bypasses the filter entirely
     * @param task    unused
     * @return {@code false} if the URL is new or the check is bypassed, {@code true} otherwise
     */
    public boolean isDuplicate(Request request, Task task) {
        Boolean nocheck = (Boolean) request.getExtra("nocheckdup");
        if (nocheck != null && nocheck) {
            // Bypass requested: no Redis round trip is needed at all.
            return false;
        }

        Jedis jedis = this.pool.getResource();
        try {
            Long added = jedis.sadd(this.dupefilterKey(), new String[]{Decript.SHA1(request.getUrl())});
            // SADD reports 1 only when the member was not yet in the set.
            return added != 1L;
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Stores the request in the scored Redis queue.
     *
     * <p>The effective score is written back into the request's "score" extra before the request
     * is serialized.
     *
     * @param request request to enqueue
     * @param task    unused
     */
    protected void pushWhenNoDuplicate(Request request, Task task) {
        // Compute the score before borrowing a connection so that a bad extra cannot leak one.
        int score = this.resolveScore(request);
        request.putExtra("score", score);

        Jedis jedis = this.pool.getResource();
        try {
            jedis.zadd(this.requestsKey(), (double) score, JSON.toJSONString(request));
        } finally {
            this.pool.returnResource(jedis);
        }

        this.isEmpty = false;
    }

    /**
     * Adds a request to the in-memory queue, which {@link #poll} drains before touching Redis.
     *
     * @param request request to enqueue locally
     */
    public synchronized void pushLocal(Request request) {
        try {
            this.queue.put(request);
        } catch (InterruptedException e) {
            // Restore the flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            this.logger.warn("interrupted while adding request to local queue", e);
        }
    }

    /**
     * @return whether the {@code :list-requests} key currently exists in Redis
     */
    public synchronized Boolean listExists() {
        Jedis jedis = this.pool.getResource();
        try {
            return jedis.exists(this.listRequestsKey());
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Pops the head of the {@code :list-requests} list.
     *
     * @return the deserialized request, or {@code null} when the list is empty
     */
    public synchronized Request listPoll() {
        Jedis jedis = this.pool.getResource();
        try {
            String json = jedis.lpop(this.listRequestsKey());
            if (json == null) {
                return null;
            }
            Request request = (Request) JSON.parseObject(json, Request.class);
            this.logger.debug("poll list from queue {}", request.getUrl());
            return request;
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Writes the {@code :cluster-status} flag.
     *
     * @param value value to store; {@link #getClusterStartFlagVal} expects a decimal long
     */
    public synchronized void setClusterStartFlag(String value) {
        Jedis jedis = this.pool.getResource();
        try {
            jedis.set(this.clusterStatusKey(), value);
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Reads the {@code :cluster-status} flag as a long and deletes it.
     *
     * <p>Uses a single connection and a single GET. The previous implementation borrowed a second
     * connection for an EXISTS check and could pass {@code null} to {@code Long.parseLong} if the
     * key disappeared between the two calls.
     *
     * @return the stored value, or {@code null} when the flag is not set
     * @throws NumberFormatException if the stored value is not a decimal long (the key is kept)
     */
    public synchronized Long getClusterStartFlagVal() {
        Jedis jedis = this.pool.getResource();
        try {
            String raw = jedis.get(this.clusterStatusKey());
            if (raw == null) {
                return null;
            }
            Long timestamp = Long.valueOf(raw);
            jedis.del(this.clusterStatusKey());
            return timestamp;
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * @return whether the {@code :cluster-status} key currently exists in Redis
     */
    public synchronized Boolean getClusterStartFlag() {
        Jedis jedis = this.pool.getResource();
        try {
            return jedis.exists(this.clusterStatusKey());
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Pushes a request onto the head of the {@code :list-requests} list.
     *
     * @param request request to serialize and push
     */
    public synchronized void listPush(Request request) {
        Jedis jedis = this.pool.getResource();
        try {
            jedis.lpush(this.listRequestsKey(), new String[]{JSON.toJSONString(request)});
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Returns the next request: first from the in-memory queue, otherwise the highest-scored
     * entry of the {@code :requests} sorted set.
     *
     * @param task unused
     * @return the next request, or {@code null} when both queues are empty
     */
    public synchronized Request poll(Task task) {
        Request local = this.queue.poll();
        if (local != null) {
            return local;
        }

        Request next = null;
        Jedis jedis = this.pool.getResource();
        try {
            // Read and remove the top-scored member inside one MULTI/EXEC so that no other
            // client can pop the same member in between. Reverse rank 0 and rank -1 both
            // address the highest score.
            Transaction tx = jedis.multi();
            tx.zrevrange(this.requestsKey(), 0L, 0L);
            tx.zremrangeByRank(this.requestsKey(), -1L, -1L);

            @SuppressWarnings("unchecked")
            Iterable<String> top = (Iterable<String>) tx.exec().get(0);
            String json = null;
            for (String member : top) {
                json = member;
            }

            if (json != null) {
                next = (Request) JSON.parseObject(json, Request.class);
                this.logger.debug("poll from queue {}", next.getUrl());
            }
        } finally {
            this.pool.returnResource(jedis);
        }

        this.isEmpty = (next == null);
        return next;
    }

    /**
     * @param task task whose UUID forms the key
     * @return task-scoped set key; not written by this class, kept for subclasses
     */
    protected String getSetKey(Task task) {
        return SET_PREFIX + task.getUUID();
    }

    /**
     * @param task task whose UUID forms the key
     * @return task-scoped queue key; not written by this class, kept for subclasses
     */
    protected String getQueueKey(Task task) {
        return QUEUE_PREFIX + task.getUUID();
    }

    /**
     * Counts the requests waiting in the {@code :requests} sorted set.
     *
     * <p>The previous implementation ran LLEN on the {@link #getQueueKey} key, which this class
     * never writes. Requests held only in the in-memory queue are not included.
     *
     * @param task unused
     * @return number of members in the scored Redis queue
     */
    public int getLeftRequestsCount(Task task) {
        Jedis jedis = this.pool.getResource();
        try {
            Long size = jedis.zcard(this.requestsKey());
            return size.intValue();
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * Counts the URLs recorded in the {@code :dupefilter} set.
     *
     * <p>The previous implementation ran SCARD on the {@link #getSetKey} key, which this class
     * never writes.
     *
     * @param task unused
     * @return number of distinct URL digests seen so far
     */
    public int getTotalRequestsCount(Task task) {
        Jedis jedis = this.pool.getResource();
        try {
            Long size = jedis.scard(this.dupefilterKey());
            return size.intValue();
        } finally {
            this.pool.returnResource(jedis);
        }
    }

    /**
     * @return {@code true} if the most recent Redis poll found nothing and nothing has been
     *         pushed to Redis since
     */
    public boolean isEmpty() {
        return this.isEmpty;
    }

    /** Computes the sorted-set score for a request from its "score" and "retry" extras. */
    private int resolveScore(Request request) {
        Object rawScore = request.getExtra("score");
        if (rawScore == null) {
            return DEFAULT_SCORE;
        }

        int base = ((Number) rawScore).intValue();
        if (base < SCALED_SCORE_LIMIT) {
            return base * 100;
        }

        // Each recorded retry lowers the score by one.
        Number retries = (Number) request.getExtra("retry");
        return retries != null ? base - retries.intValue() : base;
    }

    /** Key of the sorted set holding scored, JSON-serialized requests. */
    private String requestsKey() {
        return this.spiderUUID + ":requests";
    }

    /** Key of the set holding SHA1 digests of seen URLs. */
    private String dupefilterKey() {
        return this.spiderUUID + ":dupefilter";
    }

    /** Key of the plain list holding JSON-serialized requests. */
    private String listRequestsKey() {
        return this.spiderUUID + ":list-requests";
    }

    /** Key of the cluster start flag. */
    private String clusterStatusKey() {
        return this.spiderUUID + ":cluster-status";
    }
}

其中用到的 Decript 工具类(判重时用其 SHA1 方法对 url 做摘要)如下:

/**
 * Static helpers for hex-encoded message digests, password-keyed AES and percent-style hex
 * formatting.
 *
 * <p>All text is converted to bytes as UTF-8 so that results do not depend on the platform
 * default charset.
 */
public class Decript {
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(Decript.class.getName());

    /** Kept public for source compatibility; every member is static. */
    public Decript() {
    }

    /**
     * @param decript text to hash
     * @return lower-case hex SHA-1 digest of the UTF-8 bytes of {@code decript}
     * @throws IllegalStateException if the JVM offers no SHA-1 implementation
     */
    public static String SHA1(String decript) {
        return digestHex("SHA-1", decript);
    }

    /**
     * @param decript text to hash
     * @return lower-case hex digest of the UTF-8 bytes of {@code decript}, using the algorithm
     *         registered under the name "SHA"
     * @throws IllegalStateException if the JVM offers no "SHA" implementation
     */
    public static String SHA(String decript) {
        return digestHex("SHA", decript);
    }

    /**
     * @param input text to hash
     * @return lower-case hex MD5 digest of the UTF-8 bytes of {@code input}
     * @throws IllegalStateException if the JVM offers no MD5 implementation
     */
    public static String MD5(String input) {
        return digestHex("MD5", input);
    }

    /**
     * Encrypts text with a 128-bit AES key derived from the password.
     *
     * @param content  text to encrypt (encoded as UTF-8)
     * @param password password the key is derived from
     * @return the cipher text, or {@code null} if encryption failed
     */
    public static byte[] encryptAES(String content, String password) {
        try {
            Cipher cipher = newAesCipher(Cipher.ENCRYPT_MODE, password);
            return cipher.doFinal(content.getBytes(UTF_8));
        } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
                | IllegalBlockSizeException | BadPaddingException e) {
            LOG.log(java.util.logging.Level.WARNING, "AES encryption failed", e);
            return null;
        }
    }

    /**
     * Decrypts data produced by {@link #encryptAES} with the same password.
     *
     * @param content  cipher text
     * @param password password the key is derived from
     * @return the plain bytes, or {@code null} if decryption failed (for example wrong password)
     */
    public static byte[] decryptAES(byte[] content, String password) {
        try {
            Cipher cipher = newAesCipher(Cipher.DECRYPT_MODE, password);
            return cipher.doFinal(content);
        } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
                | IllegalBlockSizeException | BadPaddingException e) {
            LOG.log(java.util.logging.Level.WARNING, "AES decryption failed", e);
            return null;
        }
    }

    /**
     * Placeholder: not implemented.
     *
     * @param key ignored
     * @return always the empty string
     */
    public static String decryptBASE64(String key) {
        return "";
    }

    /**
     * Placeholder: not implemented.
     *
     * @param key ignored
     * @return always the empty string
     */
    public static String encryptBASE64(String key) {
        return "";
    }

    /**
     * Formats each byte as {@code %XX} with upper-case hex digits, e.g. {@code {0, 10}} becomes
     * {@code "%00%0A"}.
     *
     * @param bArray bytes to format
     * @return the concatenated {@code %XX} groups; empty for an empty array
     */
    public static String bytesToHexString(byte[] bArray) {
        // Three output characters per input byte.
        StringBuilder sb = new StringBuilder(bArray.length * 3);
        for (byte b : bArray) {
            sb.append('%');
            sb.append(Character.toUpperCase(Character.forDigit((b >> 4) & 0xF, 16)));
            sb.append(Character.toUpperCase(Character.forDigit(b & 0xF, 16)));
        }
        return sb.toString();
    }

    /** Hashes the UTF-8 bytes of {@code text} and returns the digest as lower-case hex. */
    private static String digestHex(String algorithm, String text) {
        final MessageDigest digest;
        try {
            digest = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            // Returning "" here would make every input hash to the same value, so fail loudly.
            throw new IllegalStateException("Message digest algorithm not available: " + algorithm, e);
        }

        byte[] hash = digest.digest(text.getBytes(UTF_8));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16));
            hex.append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }

    /** Builds an initialized AES cipher whose key is derived deterministically from the password. */
    private static Cipher newAesCipher(int mode, String password)
            throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException {
        // SHA1PRNG is requested explicitly and seeded before first use so that the same password
        // yields the same key on every call. With new SecureRandom(seed) the platform default
        // PRNG is used, which may mix in other entropy and make decryption fail.
        SecureRandom seeded = SecureRandom.getInstance("SHA1PRNG");
        seeded.setSeed(password.getBytes(UTF_8));

        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128, seeded);
        SecretKeySpec key = new SecretKeySpec(generator.generateKey().getEncoded(), "AES");

        // NOTE(review): the bare "AES" transformation leaves mode and padding to the provider
        // default; kept unchanged so existing cipher text stays readable.
        Cipher cipher = Cipher.getInstance("AES");
        cipher.init(mode, key);
        return cipher;
    }
}

 

上一篇:Webmagic爬取网页内容时的“空格”变为“?”的问题


下一篇:spark学习第1天