爬虫 Redis 调度器实现的功能如下：
- 待爬取 URL 去重（列表页或详情页）；
- 待爬取 URL 存储至本地内存队列；
- 待爬取 URL 存储至 Redis（列表页或详情页）；
- 待爬取 URL 支持优先级（为每个 URL 设置评分 score，分值高者优先爬取）；
- 入队、出队逻辑。
Talk is cheap, show you guys the code:
/**
 * Crawler scheduler backed by Redis, with a small in-process queue in front of it.
 *
 * <p>Where pending requests live:
 * <ul>
 *   <li>{@code queue}: an in-memory {@link BlockingQueue} filled by {@link #pushLocal(Request)};
 *       {@link #poll(Task)} always drains it first.</li>
 *   <li>{@code <spiderUUID>:requests}: a Redis sorted set of JSON-serialised requests scored by
 *       priority (see {@link #pushWhenNoDuplicate(Request, Task)}); {@link #poll(Task)} pops the
 *       highest-scored member when the local queue is empty.</li>
 *   <li>{@code <spiderUUID>:list-requests}: a Redis list used by {@link #listPush(Request)} and
 *       {@link #listPoll()}. Both operate on the head (LPUSH + LPOP), so it behaves last-in,
 *       first-out.</li>
 * </ul>
 * De-duplication stores the SHA-1 of each request URL in the Redis set
 * {@code <spiderUUID>:dupefilter}. The key {@code <spiderUUID>:cluster-status} holds a cluster
 * start flag.
 *
 * <p>NOTE(review): {@link #RedisScheduler(JedisPool)} does not set {@code spiderUUID}, so every key
 * built from it starts with the literal {@code "null"}; confirm callers use the four-argument
 * constructor.
 */
public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
    private static final String QUEUE_PREFIX = "queue_";
    private static final String SET_PREFIX = "set_";
    private static final String ITEM_PREFIX = "item_";
    /** Score given to requests that carry no "score" extra. */
    private static final int DEFAULT_SCORE = 2000;

    private JedisPool pool;
    private String spiderUUID;
    /** Written under the lock by poll() and without it by pushWhenNoDuplicate(); read lock-free. */
    private volatile boolean isEmpty;
    private BlockingQueue<Request> queue;

    /**
     * Creates a scheduler with its own connection pool.
     *
     * @param host       Redis host
     * @param port       Redis port
     * @param password   Redis password
     * @param spiderUUID prefix for every Redis key this scheduler uses
     */
    public RedisScheduler(String host, int port, String password, String spiderUUID) {
        // 2000 is passed as the JedisPool timeout argument (milliseconds per the Jedis API).
        this(new JedisPool(new JedisPoolConfig(), host, port, 2000, password));
        this.spiderUUID = spiderUUID;
    }

    /**
     * Creates a scheduler on an existing pool. {@code spiderUUID} stays null (see class note).
     *
     * @param pool pool that connections are borrowed from
     */
    public RedisScheduler(JedisPool pool) {
        this.isEmpty = false;
        this.queue = new LinkedBlockingQueue<Request>();
        this.pool = pool;
        this.setDuplicateRemover(this);
    }

    /** Sorted set holding prioritised requests. */
    private String requestsKey() {
        return this.spiderUUID + ":requests";
    }

    /** Set of SHA-1 URL digests used for de-duplication. */
    private String dupefilterKey() {
        return this.spiderUUID + ":dupefilter";
    }

    /** Plain Redis list used by listPush/listPoll. */
    private String listKey() {
        return this.spiderUUID + ":list-requests";
    }

    /** String key holding the cluster start flag. */
    private String clusterStatusKey() {
        return this.spiderUUID + ":cluster-status";
    }

    /**
     * Hands a borrowed connection back. Per the Jedis docs, {@code Jedis.close()} returns a pooled
     * connection to its pool and discards it instead when it is broken, which the deprecated
     * {@code JedisPool.returnResource} did not do.
     */
    private static void release(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * Clears the de-duplication state so previously seen URLs can be crawled again.
     *
     * @param task task whose legacy {@code set_<uuid>} key is also removed
     */
    public void resetDuplicateCheck(Task task) {
        Jedis jedis = this.pool.getResource();
        try {
            // Delete the set isDuplicate() actually writes; the legacy key is kept in the call so
            // the previous side effect is preserved.
            jedis.del(dupefilterKey(), getSetKey(task));
        } finally {
            release(jedis);
        }
    }

    /**
     * Records the request URL and reports whether it had been seen before.
     *
     * @param request request to check; the extra {@code "nocheckdup" == true} bypasses the check
     * @param task    unused
     * @return {@code true} if the URL digest was already in the dupefilter set
     */
    public boolean isDuplicate(Request request, Task task) {
        if (Boolean.TRUE.equals(request.getExtra("nocheckdup"))) {
            return false;
        }
        Jedis jedis = this.pool.getResource();
        try {
            // SADD returns 1 only when the member was newly added.
            Long added = jedis.sadd(dupefilterKey(), Decript.SHA1(request.getUrl()));
            return !Long.valueOf(1L).equals(added);
        } finally {
            release(jedis);
        }
    }

    /**
     * Stores a non-duplicate request in the Redis priority set.
     * The computed score is written back into the request's {@code "score"} extra.
     */
    protected void pushWhenNoDuplicate(Request request, Task task) {
        // Compute the score before borrowing a connection, so a bad extra cannot leak it.
        int score = computeScore(request);
        request.putExtra("score", score);
        Jedis jedis = this.pool.getResource();
        try {
            jedis.zadd(requestsKey(), (double) score, JSON.toJSONString(request));
        } finally {
            release(jedis);
        }
        this.isEmpty = false;
    }

    /**
     * Derives the priority score for a request.
     *
     * <p>Rules:
     * <ul>
     *   <li>No {@code "score"} extra gives {@link #DEFAULT_SCORE}.</li>
     *   <li>A score below 100 is multiplied by 100.</li>
     *   <li>A score of 100 or more is treated as already scaled. It is reduced by the
     *       {@code "retry"} extra when one is present.</li>
     * </ul>
     */
    private static int computeScore(Request request) {
        Object rawScore = request.getExtra("score");
        if (rawScore == null) {
            return DEFAULT_SCORE;
        }
        // Number (not Integer) so a Long produced by JSON round-tripping is also accepted.
        int score = ((Number) rawScore).intValue();
        if (score < 100) {
            return score * 100;
        }
        Object retry = request.getExtra("retry");
        return retry != null ? score - ((Number) retry).intValue() : score;
    }

    /**
     * Adds a request to the in-memory queue that {@link #poll(Task)} drains first.
     *
     * @param request request to enqueue locally
     */
    public synchronized void pushLocal(Request request) {
        try {
            this.queue.put(request);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the calling thread can still react to it.
            Thread.currentThread().interrupt();
            this.logger.warn("interrupted while queueing request locally: {}", request.getUrl());
        }
    }

    /** @return whether the Redis list {@code <spiderUUID>:list-requests} exists */
    public synchronized Boolean listExists() {
        Jedis jedis = this.pool.getResource();
        try {
            return jedis.exists(listKey());
        } finally {
            release(jedis);
        }
    }

    /** @return the request popped from the head of the Redis list, or {@code null} if it is empty */
    public synchronized Request listPoll() {
        Jedis jedis = this.pool.getResource();
        String json;
        try {
            json = jedis.lpop(listKey());
        } finally {
            release(jedis);
        }
        if (json == null) {
            return null;
        }
        Request request = JSON.parseObject(json, Request.class);
        this.logger.debug("poll list from queue {}", request.getUrl());
        return request;
    }

    /** Sets the cluster start flag to {@code value}. */
    public synchronized void setClusterStartFlag(String value) {
        Jedis jedis = this.pool.getResource();
        try {
            jedis.set(clusterStatusKey(), value);
        } finally {
            release(jedis);
        }
    }

    /**
     * Reads and consumes the cluster start flag.
     *
     * @return the flag parsed as a long, or {@code null} if it was not set
     * @throws NumberFormatException if the stored flag is not a valid long
     */
    public synchronized Long getClusterStartFlagVal() {
        Jedis jedis = this.pool.getResource();
        String raw;
        try {
            // GET + DEL inside one MULTI on a single connection, so the flag is consumed
            // atomically and no second pooled connection is borrowed while this one is held.
            Transaction t = jedis.multi();
            t.get(clusterStatusKey());
            t.del(clusterStatusKey());
            raw = (String) t.exec().get(0);
        } finally {
            release(jedis);
        }
        return raw == null ? null : Long.valueOf(raw);
    }

    /** @return whether the cluster start flag key exists */
    public synchronized Boolean getClusterStartFlag() {
        Jedis jedis = this.pool.getResource();
        try {
            return jedis.exists(clusterStatusKey());
        } finally {
            release(jedis);
        }
    }

    /** Pushes a request onto the head of the Redis list {@code <spiderUUID>:list-requests}. */
    public synchronized void listPush(Request request) {
        Jedis jedis = this.pool.getResource();
        try {
            jedis.lpush(listKey(), JSON.toJSONString(request));
        } finally {
            release(jedis);
        }
    }

    /**
     * Returns the next request to crawl.
     * The local in-memory queue is tried first; otherwise the highest-scored entry of the Redis
     * priority set is removed and returned.
     *
     * @return the next request, or {@code null} when both sources are empty
     */
    public synchronized Request poll(Task task) {
        Request local = this.queue.poll();
        if (local != null) {
            return local;
        }
        Jedis jedis = this.pool.getResource();
        String json = null;
        try {
            // ZREVRANGE 0 0 reads the highest score; ZREMRANGEBYRANK -1 -1 removes that same
            // member (ascending rank -1). MULTI makes read + remove atomic across clients.
            Transaction t = jedis.multi();
            t.zrevrange(requestsKey(), 0L, 0L);
            t.zremrangeByRank(requestsKey(), -1L, -1L);
            Set<?> top = (Set<?>) t.exec().get(0);
            Iterator<?> it = top.iterator();
            if (it.hasNext()) {
                json = (String) it.next();
            }
        } finally {
            release(jedis);
        }
        Request request = null;
        if (json != null) {
            request = JSON.parseObject(json, Request.class);
            this.logger.debug("poll from queue {}", request.getUrl());
        }
        this.isEmpty = request == null;
        return request;
    }

    /** Legacy per-task set key ({@code set_<task uuid>}). */
    protected String getSetKey(Task task) {
        return SET_PREFIX + task.getUUID();
    }

    /** Legacy per-task queue key ({@code queue_<task uuid>}). */
    protected String getQueueKey(Task task) {
        return QUEUE_PREFIX + task.getUUID();
    }

    /**
     * Counts requests still waiting to be crawled.
     * The count is the Redis priority set plus the local queue.
     * It reads the sorted set that pushWhenNoDuplicate() writes; the never-written
     * {@code queue_<uuid>} list always yielded 0.
     */
    public int getLeftRequestsCount(Task task) {
        Jedis jedis = this.pool.getResource();
        long remote;
        try {
            Long size = jedis.zcard(requestsKey());
            remote = size == null ? 0L : size;
        } finally {
            release(jedis);
        }
        return (int) Math.min(Integer.MAX_VALUE, remote + this.queue.size());
    }

    /**
     * Counts distinct URLs ever accepted.
     * This is the size of the dupefilter set that isDuplicate() writes.
     */
    public int getTotalRequestsCount(Task task) {
        Jedis jedis = this.pool.getResource();
        try {
            Long size = jedis.scard(dupefilterKey());
            return size == null ? 0 : size.intValue();
        } finally {
            release(jedis);
        }
    }

    /** @return {@code true} if the last Redis poll found nothing and no push happened since */
    public boolean isEmpty() {
        return this.isEmpty;
    }
}
其中用到的摘要/加密工具类 Decript 如下：
/**
 * Static helpers for hex-encoded message digests (SHA-1, SHA, MD5), password-based AES, and
 * percent-style hex formatting of byte arrays.
 *
 * <p>All String-to-byte conversions use UTF-8. This makes digests and keys identical on every
 * JVM regardless of the platform default charset, which matters when several crawler nodes share
 * one Redis dedupe set.
 */
public class Decript {
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    public Decript() {
    }

    /** @return lowercase hex SHA-1 digest of the UTF-8 bytes of {@code decript} */
    public static String SHA1(String decript) {
        return hexDigest("SHA-1", decript);
    }

    /** @return lowercase hex digest using the JCA algorithm name "SHA" */
    public static String SHA(String decript) {
        return hexDigest("SHA", decript);
    }

    /** @return lowercase hex MD5 digest of the UTF-8 bytes of {@code input} */
    public static String MD5(String input) {
        return hexDigest("MD5", input);
    }

    /**
     * Hashes the UTF-8 bytes of {@code input} and renders the result as lowercase hex.
     *
     * @throws IllegalStateException if the JRE lacks the algorithm. The old behaviour returned
     *         "", which silently mapped every input to the same digest.
     */
    private static String hexDigest(String algorithm, String input) {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Digest algorithm not available: " + algorithm, e);
        }
        byte[] hash = digest.digest(input.getBytes(UTF_8));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16));
            hex.append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }

    /**
     * Encrypts {@code content} with an AES-128 key derived from {@code password}.
     *
     * @return the ciphertext, or {@code null} if any crypto step fails (best-effort contract)
     */
    public static byte[] encryptAES(String content, String password) {
        try {
            // NOTE(review): "AES" resolves to the provider default transformation (ECB mode in the
            // SunJCE provider). Kept for compatibility with existing ciphertexts; not suitable
            // for protecting sensitive data.
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, deriveAesKey(password));
            return cipher.doFinal(content.getBytes(UTF_8));
        } catch (java.security.GeneralSecurityException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Decrypts bytes produced by {@link #encryptAES(String, String)} with the same password.
     *
     * @return the plaintext bytes, or {@code null} if any crypto step fails
     */
    public static byte[] decryptAES(byte[] content, String password) {
        try {
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.DECRYPT_MODE, deriveAesKey(password));
            return cipher.doFinal(content);
        } catch (java.security.GeneralSecurityException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Deterministically derives a 128-bit AES key from a password.
     *
     * <p>{@code new SecureRandom(seed)} is only deterministic when the platform default PRNG is
     * SHA1PRNG. Where the default is NativePRNG (typical on Linux), OS entropy is mixed in, so
     * encrypt and decrypt produce different keys. Pinning SHA1PRNG and seeding it before first
     * use yields the same key everywhere.
     */
    private static SecretKeySpec deriveAesKey(String password) throws NoSuchAlgorithmException {
        SecureRandom random = SecureRandom.getInstance("SHA1PRNG");
        random.setSeed(password.getBytes(UTF_8));
        KeyGenerator kgen = KeyGenerator.getInstance("AES");
        kgen.init(128, random);
        return new SecretKeySpec(kgen.generateKey().getEncoded(), "AES");
    }

    /** Unimplemented stub: always returns "". */
    public static String decryptBASE64(String key) {
        return "";
    }

    /** Unimplemented stub: always returns "". */
    public static String encryptBASE64(String key) {
        return "";
    }

    /**
     * Formats bytes as percent-escaped uppercase hex, e.g. {0x0A, 0xFF} becomes "%0A%FF".
     *
     * @param bArray bytes to format (must not be null)
     */
    public static String bytesToHexString(byte[] bArray) {
        // Each byte becomes three characters: '%' plus two hex digits.
        StringBuilder sb = new StringBuilder(bArray.length * 3);
        for (byte b : bArray) {
            sb.append('%');
            sb.append(Character.toUpperCase(Character.forDigit((b >> 4) & 0xF, 16)));
            sb.append(Character.toUpperCase(Character.forDigit(b & 0xF, 16)));
        }
        return sb.toString();
    }
}