前言:
经过前面两篇文章,你想大家应该已经知道网络爬虫是怎么一回事了。这篇文章会在之前做过的事情上做一些改进,以及说明之前的做法的不足之处。
思路分析:
1.逻辑结构图
上图中展示的就是我们网络爬虫中的整个逻辑思路(调用Python解析URL,这里只作了简略的展示)。
2.思路说明:
首先,我们来把之前思路梳理一下。之前我们采用的两个队列Queue来保存已经访问过和待访问的链接列表,并采用广度优先搜索进行递归访问这些待访问的链接地址。而且这里使用的是单线程操作。在对数据库的操作中,我们添加了一个辅助字段cipher_address来进行“唯一”性保证,因为我们担心MySQL在对过长的url链接操作时会有一些不尽如人意。
我不知道上面这一段能否让你对之前我们处理Spider的做法有一个大概的了解,如果你还没有太明白这是怎么一回事。你可以访问《网络爬虫初步:从访问网页到数据解析》和《网络爬虫初步:从一个入口链接开始不断抓取页面中的网址并入库》这两篇文章进行了解。
下面我就来说明一下,之前的做法存在的问题:
1.单线程:采用单线程的做法,可以说相当不科学,尤其是对付这样一个大数据的问题。所以,我们需要采用多线程来处理问题,这里会用到多线程中的线程池。
2.数据存储方式:如果我们采用内存去保存数据,这样会有一个问题,因为数据量非常大,所以程序在运行的过种中必然会内存溢出。而事实也正是如此:
3.Url去重的方式:如果我们对Url进行MD5或是SHA1进行加密的方式进行哈希的话,这样会有一个效率的隐患。不过的确这个问题并不那么复杂。对效率的影响也很小。不过,还好Java自身就已经对String型的数据有哈希的函数可以直接调用:hashCode()
代码及说明:
linkSpider.java
public class linkSpider {
private SpiderQueue queue = null;
public void ErgodicNetworklink(String startAddress) {
if (startAddress == null) {
return;
}
SpiderBLL.insertEntry2DB(startAddress);
List modelList = new ArrayList();
queue = SpiderBLL.getAddressQueue(startAddress, 0);
if (queue.isQueueEmpty()) {
System.out.println("Your address cannot get more address.");
return;
}
ThreadPoolExecutor threadPool = getThreadPool();
int index = 0;
boolean breakFlag = false;
while (!breakFlag) {
// 待访问队列为空时的处理
if (queue.isQueueEmpty()) {
System.out.println("queue is null...");
modelList = DBBLL.getUnvisitedInfoModels(queue.MAX_SIZE);
if (modelList == null || modelList.size() == 0) {
breakFlag = true;
} else {
for (WebInfoModel webInfoModel : modelList) {
queue.offer(webInfoModel);
DBBLL.updateUnvisited(webInfoModel);
}
}
}
WebInfoModel model = queue.poll();
if (model == null) {
continue;
}
// 判断此网站是否已经访问过
if (DBBLL.isWebInfoModelExist(model)) {
// 如果已经被访问,进入下一次循环
System.out.println("已存在此网站(" + model.getName() + ")");
continue;
}
poolQueueFull(threadPool);
System.out.println("LEVEL: [" + model.getLevel() + "] NAME: " + model.getName());
SpiderRunner runner = new SpiderRunner(model.getAddress(), model.getLevel(), index++);
threadPool.execute(runner);
SystemBLL.cleanSystem(index);
// 对已访问的address进行入库
DBBLL.insert(model);
}
threadPool.shutdown();
}
private ThreadPoolExecutor getThreadPool() {
final int MAXIMUM_POOL_SIZE = 520;
final int CORE_POOL_SIZE = 500;
return new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 3, TimeUnit.SECONDS, new ArrayBlockingQueue(MAXIMUM_POOL_SIZE), new ThreadPoolExecutor.DiscardOldestPolicy());
}
private void poolQueueFull(ThreadPoolExecutor threadPool) {
while (getQueueSize(threadPool.getQueue()) >= threadPool.getMaximumPoolSize()) {
System.out.println("线程池队列已满,等3秒再添加任务");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private synchronized int getQueueSize(Queue queue) {
return queue.size();
}
class SpiderRunner implements Runnable {
private String address;
private SpiderQueue auxiliaryQueue; // 记录访问某一个网页中解析出的网址
private int index;
private int parentLevel;
public SpiderRunner(String address, int parentLevel, int index) {
this.index = index;
this.address = address;
this.parentLevel = parentLevel;
}
public void run() {
auxiliaryQueue = SpiderBLL.getAddressQueue(address, parentLevel);
System.out.println("[" + index + "]: " + address);
DBBLL.insert2Unvisited(auxiliaryQueue, index);
auxiliaryQueue = null;
}
}
}在上面的ErgodicNetworklink方法代码中,大家可以看到我们已经把使用Queue保存数据的方式改为使用数据库存储。这样做的好处就是我们不用再为OOM而烦恼了。而且,上面的代码也使用了线程池。使用多线程来执行在调用Python获得链接列表的操作。
而对于哈希Url的做法,可以参考如下关键代码:
public static void insert2Unvisited(WebInfoModel model) {
if (model == null) {
return;
}
String sql = "INSERT INTO unvisited_site(name, address, hash_address, date, visited, level) VALUES('" + model.getName() + "', '" + model.getAddress() + "', " + model.getAddress().hashCode() + ", " + System.currentTimeMillis() + ", 0, " + model.getLevel() + ");";
DBServer db = null;
try {
db = new DBServer();
db.insert(sql);
db.close();
} catch (Exception e) {
System.out.println("your sql is: " + sql);
e.printStackTrace();
} finally {
db.close();
}
}
PythonUtils.java
这个类是与Python进行交互操作的类。代码如下:
public class PythonUtils {
// Python文件的所在路径
private static final String PY_PATH = "/root/python/WeblinkSpider/html_parser.py";
private static String[] getShellArgs(String address) {
String[] shellParas = new String[3];
shellParas[0] = "python";
shellParas[1] = PY_PATH;
shellParas[2] = address.replace(""", "\"");
return shellParas;
}
private static WebInfoModel parserWebInfoModel(String info, int parentLevel) {
if (BEEStringTools.isEmptyString(info)) {
return null;
}
String[] infos = info.split("\$#\$");
if (infos.length != 2) {
return null;
}
if (BEEStringTools.isEmptyString(infos[0].trim())) {
return null;
}
if (BEEStringTools.isEmptyString(infos[1].trim()) || infos[1].trim().equals("http://") || infos[1].trim().equals("https://")) {
return null;
}
WebInfoModel model = new WebInfoModel();
model.setName(infos[0].trim());
model.setAddress(infos[1]);
model.setLevel(parentLevel + 1);
return model;
}
private static SpiderQueue getAddressQueueByPython(String[] shellParas, int parentLevel) {
if (shellParas == null) {
return null;
}
Runtime r = Runtime.getRuntime();
SpiderQueue queue = null;
try {
Process p = r.exec(shellParas);
BufferedReader bfr = new BufferedReader(new InputStreamReader(p.getInputStream()));
queue = new SpiderQueue();
String line = "";
WebInfoModel model = null;
while((line = bfr.readLine()) != null) {
// System.out.println("----------> from python: " + line);
if (BEEStringTools.isEmptyString(line.trim())) {
continue;
}
if (HttpBLL.isErrorStateCode(line)) {
break;
}
model = parserWebInfoModel(line, parentLevel);
if (model == null) {
continue;
}
queue.offer(model);
}
model = null;
line = null;
} catch (IOException e) {
e.printStackTrace();
} finally {
r = null;
}
return queue;
}
public static SpiderQueue getAddressQueueByPython(String address, int parentLevel) {
return getAddressQueueByPython(getShellArgs(address), parentLevel);
}
}
遇到的问题:
1.请使用Python2.7
因为Python2.6中HTMLParser还是有一些缺陷的,例如下图中展示的。不过在Python2.7中,这个问题就不再是问题了。
2.数据库崩溃了
数据库崩溃的原因可能是待访问的数据表中的数据过大引起的。
3.对数据库的同步操作
上面的做法是对数据库操作进行同步时出现的问题,如果不进行同步,我们会得到数据库连接数超过最大连接数的异常信息。对于这个问题有望在下篇文章中进行解决。