some monitor and JMX support #98

pull/121/head
yihua.huang 11 years ago
parent f39aa435cf
commit a5db6cf292

@ -6,6 +6,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.downloader.Downloader;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.monitor.SpiderListener;
import us.codecraft.webmagic.pipeline.CollectorPipeline;
import us.codecraft.webmagic.pipeline.ConsolePipeline;
import us.codecraft.webmagic.pipeline.Pipeline;
@ -101,6 +102,8 @@ public class Spider implements Runnable, Task {
private final AtomicInteger threadAlive = new AtomicInteger(0);
private List<SpiderListener> spiderListeners;
private final AtomicLong pageCount = new AtomicLong(0);
/**
@ -312,7 +315,9 @@ public class Spider implements Runnable, Task {
public void run() {
try {
processRequest(requestFinal);
onSuccess(requestFinal);
} catch (Exception e) {
onError(requestFinal);
logger.error("download " + requestFinal + " error", e);
} finally {
threadAlive.decrementAndGet();
@ -330,6 +335,22 @@ public class Spider implements Runnable, Task {
}
}
protected void onError(Request request) {
if (CollectionUtils.isNotEmpty(spiderListeners)){
for (SpiderListener spiderListener : spiderListeners) {
spiderListener.onError(request);
}
}
}
protected void onSuccess(Request request) {
if (CollectionUtils.isNotEmpty(spiderListeners)){
for (SpiderListener spiderListener : spiderListeners) {
spiderListener.onSuccess(request);
}
}
}
private void checkRunningStat() {
while (true) {
int statNow = stat.get();
@ -378,6 +399,7 @@ public class Spider implements Runnable, Task {
protected void processRequest(Request request) {
Page page = downloader.download(request, this);
if (page == null) {
onError(request);
sleep(site.getSleepTime());
return;
}
@ -659,4 +681,17 @@ public class Spider implements Runnable, Task {
public Site getSite() {
return site;
}
public List<SpiderListener> getSpiderListeners() {
return spiderListeners;
}
public Spider setSpiderListeners(List<SpiderListener> spiderListeners) {
this.spiderListeners = spiderListeners;
return this;
}
public Scheduler getScheduler() {
return scheduler;
}
}

@ -34,6 +34,12 @@ public abstract class AbstractDownloader implements Downloader {
return (Html) page.getHtml();
}
protected void onSuccess(Request request) {
}
protected void onError(Request request) {
}
protected Page addToCycleRetry(Request request, Site site) {
Page page = new Page();
Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);

@ -87,7 +87,9 @@ public class HttpClientDownloader extends AbstractDownloader {
String value = httpResponse.getEntity().getContentType().getValue();
charset = UrlUtils.getCharset(value);
}
return handleResponse(request, charset, httpResponse, task);
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
@ -97,6 +99,7 @@ public class HttpClientDownloader extends AbstractDownloader {
if (site.getCycleRetryTimes() > 0) {
return addToCycleRetry(request, site);
}
onError(request);
return null;
} finally {
try {

@ -0,0 +1,18 @@
package us.codecraft.webmagic.monitor;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.Scheduler;
/**
* The scheduler whose requests can be counted for monitor.
*
* @author code4crafter@gmail.com
* @since 0.5.0
*/
public interface MonitorableScheduler extends Scheduler {
public int getLeftRequestsCount(Task task);
public int getTotalRequestsCount(Task task);
}

@ -0,0 +1,14 @@
package us.codecraft.webmagic.monitor;
import us.codecraft.webmagic.Request;
/**
* @author code4crafer@gmail.com
* @since 0.5.0
*/
public interface SpiderListener {
public void onSuccess(Request request);
public void onError(Request request);
}

@ -0,0 +1,122 @@
package us.codecraft.webmagic.monitor;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.processor.example.GithubRepoPageProcessor;
import us.codecraft.webmagic.processor.example.OschinaBlogPageProcessor;
import javax.management.*;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author code4crafer@gmail.com
* @since 0.5.0
*/
public class SpiderMonitor implements SpiderMonitorMBean {
private List<SpiderStatus> spiderStatuses = new ArrayList<SpiderStatus>();
@Override
public List<SpiderStatus> getSpiders() {
return spiderStatuses;
}
@Override
public SpiderStatus getSpider() {
return spiderStatuses.get(0);
}
public void register(Spider spider) {
MonitorSpiderListener monitorSpiderListener = new MonitorSpiderListener();
if (spider.getSpiderListeners() == null) {
List<SpiderListener> spiderListeners = new ArrayList<SpiderListener>();
spiderListeners.add(monitorSpiderListener);
spider.setSpiderListeners(spiderListeners);
} else {
spider.getSpiderListeners().add(monitorSpiderListener);
}
spiderStatuses.add(new SpiderStatus(spider, monitorSpiderListener));
}
public class MonitorSpiderListener implements SpiderListener {
private final AtomicInteger successCount = new AtomicInteger(0);
private final AtomicInteger errorCount = new AtomicInteger(0);
private List<String> errorUrls = Collections.synchronizedList(new ArrayList<String>());
@Override
public void onSuccess(Request request) {
successCount.incrementAndGet();
}
@Override
public void onError(Request request) {
errorUrls.add(request.getUrl());
errorCount.incrementAndGet();
}
public AtomicInteger getSuccessCount() {
return successCount;
}
public AtomicInteger getErrorCount() {
return errorCount;
}
public List<String> getErrorUrls() {
return errorUrls;
}
}
public static void main(String[] args) throws MalformedObjectNameException,
NullPointerException, InstanceAlreadyExistsException,
MBeanRegistrationException, NotCompliantMBeanException, IOException {
int rmiPort = 1099;
SpiderMonitor spiderMonitor = new SpiderMonitor();
String jmxServerName = "TestJMXServer";
Spider oschinaSpider = Spider.create(new OschinaBlogPageProcessor()).addUrl("http://my.oschina.net/flashsword/blog").thread(2);
spiderMonitor.register(oschinaSpider);
Spider githubSpider = Spider.create(new GithubRepoPageProcessor()).addUrl("https://github.com/code4craft");
spiderMonitor.register(githubSpider);
// jdkfolder/bin/rmiregistry.exe 9999
Registry registry = LocateRegistry.createRegistry(rmiPort);
MBeanServer mbs = MBeanServerFactory.createMBeanServer(jmxServerName);
//MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName objName = new ObjectName(jmxServerName + ":name=" + "HelloWorld");
mbs.registerMBean(spiderMonitor, objName);
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + rmiPort + "/" + jmxServerName);
System.out.println("JMXServiceURL: " + url.toString());
JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
jmxConnServer.start();
for (SpiderStatus spiderStatuse : spiderMonitor.spiderStatuses) {
objName = new ObjectName(jmxServerName + ":name=" + spiderStatuse.getName());
mbs.registerMBean(spiderStatuse, objName);
}
}
}

@ -0,0 +1,14 @@
package us.codecraft.webmagic.monitor;
import java.util.List;
/**
* @author code4crafer@gmail.com
*/
public interface SpiderMonitorMBean {
public List<SpiderStatus> getSpiders();
public SpiderStatus getSpider();
}

@ -0,0 +1,52 @@
package us.codecraft.webmagic.monitor;
import us.codecraft.webmagic.Spider;
import java.util.List;
/**
* @author code4crafer@gmail.com
* @since 0.5.0
*/
public class SpiderStatus implements SpiderStatusMBean{
private final Spider spider;
private final SpiderMonitor.MonitorSpiderListener monitorSpiderListener;
public SpiderStatus(Spider spider, SpiderMonitor.MonitorSpiderListener monitorSpiderListener) {
this.spider = spider;
this.monitorSpiderListener = monitorSpiderListener;
}
public String getName() {
return spider.getUUID();
}
public int getLeftPages() {
if (spider.getScheduler() instanceof MonitorableScheduler) {
return ((MonitorableScheduler) spider.getScheduler()).getLeftRequestsCount(spider);
}
return -1;
}
public int getTotalPages() {
if (spider.getScheduler() instanceof MonitorableScheduler) {
return ((MonitorableScheduler) spider.getScheduler()).getTotalRequestsCount(spider);
}
return -1;
}
public List<String> getErrorPages() {
return monitorSpiderListener.getErrorUrls();
}
public void start() {
spider.start();
}
public void stop() {
spider.stop();
}
}

@ -0,0 +1,22 @@
package us.codecraft.webmagic.monitor;
import java.util.List;
/**
* @author code4crafer@gmail.com
* @since 0.5.0
*/
public interface SpiderStatusMBean {
public String getName();
public int getLeftPages();
public int getTotalPages();
public List<String> getErrorPages();
public void start();
public void stop();
}

@ -5,6 +5,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.monitor.MonitorableScheduler;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -15,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @author code4crafter@gmail.com
* @since 0.5.0
*/
public abstract class LocalDuplicatedRemovedScheduler implements Scheduler {
public abstract class LocalDuplicatedRemovedScheduler implements MonitorableScheduler {
protected Logger logger = LoggerFactory.getLogger(getClass());
@ -34,5 +35,10 @@ public abstract class LocalDuplicatedRemovedScheduler implements Scheduler {
return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
}
@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
protected abstract void pushWhenNoDuplicate(Request request, Task task);
}

@ -60,4 +60,9 @@ public class PriorityScheduler extends LocalDuplicatedRemovedScheduler {
}
return priorityQueueMinus.poll();
}
@Override
public int getLeftRequestsCount(Task task) {
return noPriorityQueue.size();
}
}

@ -29,4 +29,9 @@ public class QueueScheduler extends LocalDuplicatedRemovedScheduler {
public synchronized Request poll(Task task) {
return queue.poll();
}
@Override
public int getLeftRequestsCount(Task task) {
return queue.size();
}
}

@ -161,4 +161,9 @@ public class FileCacheQueueScheduler extends LocalDuplicatedRemovedScheduler {
fileCursorWriter.println(cursor.incrementAndGet());
return queue.poll();
}
@Override
public int getLeftRequestsCount(Task task) {
return queue.size();
}
}

@ -7,6 +7,7 @@ import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.monitor.MonitorableScheduler;
/**
* Use Redis as url scheduler for distributed crawlers.<br>
@ -14,7 +15,7 @@ import us.codecraft.webmagic.Task;
* @author code4crafter@gmail.com <br>
* @since 0.2.0
*/
public class RedisScheduler implements Scheduler {
public class RedisScheduler implements MonitorableScheduler {
private JedisPool pool;
@ -39,10 +40,10 @@ public class RedisScheduler implements Scheduler {
// if cycleRetriedTimes is set, allow duplicated.
Object cycleRetriedTimes = request.getExtra(Request.CYCLE_TRIED_TIMES);
// use set to remove duplicate url
if (cycleRetriedTimes != null || !jedis.sismember(SET_PREFIX + task.getUUID(), request.getUrl())) {
if (cycleRetriedTimes != null || !jedis.sismember(getSetKey(task), request.getUrl())) {
// use list to store queue
jedis.rpush(QUEUE_PREFIX + task.getUUID(), request.getUrl());
jedis.sadd(SET_PREFIX + task.getUUID(), request.getUrl());
jedis.rpush(getQueueKey(task), request.getUrl());
jedis.sadd(getSetKey(task), request.getUrl());
if (request.getExtras() != null) {
String field = DigestUtils.shaHex(request.getUrl());
String value = JSON.toJSONString(request);
@ -58,7 +59,7 @@ public class RedisScheduler implements Scheduler {
public synchronized Request poll(Task task) {
Jedis jedis = pool.getResource();
try {
String url = jedis.lpop(QUEUE_PREFIX + task.getUUID());
String url = jedis.lpop(getQueueKey(task));
if (url == null) {
return null;
}
@ -75,4 +76,34 @@ public class RedisScheduler implements Scheduler {
pool.returnResource(jedis);
}
}
protected String getSetKey(Task task) {
return SET_PREFIX + task.getUUID();
}
protected String getQueueKey(Task task) {
return QUEUE_PREFIX + task.getUUID();
}
@Override
public int getLeftRequestsCount(Task task) {
Jedis jedis = pool.getResource();
try {
Long size = jedis.llen(getQueueKey(task));
return size.intValue();
} finally {
pool.returnResource(jedis);
}
}
@Override
public int getTotalRequestsCount(Task task) {
Jedis jedis = pool.getResource();
try {
Long size = jedis.scard(getQueueKey(task));
return size.intValue();
} finally {
pool.returnResource(jedis);
}
}
}

Loading…
Cancel
Save