So one of the good designs could be implementing some kind of thread pools.
Download Source
Test Class
package com.ehsunbehravesh.asyncwebreader; import java.util.HashMap; import java.util.Observable; import java.util.Observer; public class Test2 { public static void main(String[] args) throws Exception { AsyncWebReader webReader = new AsyncWebReader(5/* threads */, new String[]{ "http://www.google.com", "http://www.yahoo.com", "http://www.live.com", "http://www.wikipedia.com", "http://www.facebook.com", "http://www.khorasannews.com", "http://www.fcbarcelona.com", "http://www.khorasannews.com", }); webReader.addObserver(new Observer() { @Override public void update(Observable o, Object arg) { if (arg instanceof Exception) { Exception ex = (Exception) arg; System.out.println(ex.getMessage()); } /*else if (arg instanceof List) { Listvals = (List ) arg; System.out.println(vals.get(0) + ": " + vals.get(1)); } */else if (arg instanceof Object[]) { Object[] objects = (Object[]) arg; HashMap result = (HashMap ) objects[0]; String[] success = (String[]) objects[1]; String[] fail = (String[]) objects[2]; System.out.println("Failds"); for (int i = 0; i < fail.length; i++) { String string = fail[i]; System.out.println(string); } System.out.println("-----------"); System.out.println("success"); for (int i = 0; i < success.length; i++) { String string = success[i]; System.out.println(string); } System.out.println("\n\nresult of Google: "); System.out.println(result.remove("http://www.google.com")); } } }); Thread t = new Thread(webReader); t.start(); t.join(); } }
AsyncWebReader Class
package com.ehsunbehravesh.asyncwebreader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Observable; import java.util.Observer; public class AsyncWebReader extends Observable implements Runnable, Observer { private int coundOfThreads; private WebReader[] readers; private int indicator; private String[] urlAddresses; private ArrayListsuccessfulAddresses; private ArrayList failedAddresses; private HashMap result; private boolean running; public AsyncWebReader(int coundOfThreads, String[] urlAddresses) throws Exception { this.coundOfThreads = coundOfThreads; this.urlAddresses = urlAddresses; if (coundOfThreads <= 0) { throw new Exception("Count of threads should be at least 1!"); } indicator = 0; } @Override public void run() { successfulAddresses = new ArrayList<>(); failedAddresses = new ArrayList<>(); result = new HashMap<>(); running = true; readers = new WebReader[Math.min(coundOfThreads, urlAddresses.length)]; for (int i = 0; i < readers.length; i++) { readers[i] = new WebReader(urlAddresses[indicator++]); readers[i].addObserver(this); new Thread(readers[i]).start(); } /* wait until all urls get fetched */ while (running && successfulAddresses.size() + failedAddresses.size() < urlAddresses.length) { try { Thread.sleep(500); } catch (InterruptedException ex) { setChanged(); notifyObservers(ex); } } String[] successfulAddressesArray = new String[successfulAddresses.size()]; successfulAddressesArray = successfulAddresses.toArray(successfulAddressesArray); String[] failedAddressesArray = new String[failedAddresses.size()]; failedAddressesArray = failedAddresses.toArray(failedAddressesArray); setChanged(); notifyObservers(new Object[] {result, successfulAddressesArray, failedAddressesArray}); } @Override public synchronized void update(Observable o, Object arg) { WebReader reader = (WebReader) o; if (arg instanceof Exception) { failedAddresses.add(reader.getUrlAddress()); } else if (arg instanceof String) { String content = (String) arg; String urlAddress = reader.getUrlAddress(); result.put(urlAddress, content); successfulAddresses.add(urlAddress); setChanged(); List list = new ArrayList<>(); list.add(urlAddress); list.add(content); notifyObservers(list); } if (indicator < urlAddresses.length) { for (int i = 0; i < readers.length; i++) { WebReader currentReader = readers[i]; if (currentReader == reader) { readers[i] = new WebReader(urlAddresses[indicator++]); readers[i].addObserver(this); new Thread(readers[i]).start(); break; } } } } public void stop() { running = false; } }
WebReader Class
package com.ehsunbehravesh.asyncwebreader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URL; import java.net.URLConnection; import java.util.Observable; /** * http://codetoearn.blogspot.com * * @author ehsun7b */ public class WebReader extends Observable implements Runnable { private String urlAddress; private boolean finished; public WebReader(String urlAddress) { this.urlAddress = urlAddress; } @Override public void run() { finished = false; InputStreamReader isr = null; try { URL url = new URL(urlAddress); URLConnection yc = url.openConnection(); isr = new InputStreamReader(yc.getInputStream()); char[] buffer = new char[1024]; int read = 0; StringBuilder doc = new StringBuilder(); while ((read = isr.read(buffer)) > 0) { doc.append(buffer, 0, read); } String content = doc.toString(); setChanged(); notifyObservers(content); } catch (Exception e) { setChanged(); notifyObservers(e); } finally { if (isr != null) { try { isr.close(); } catch (IOException ex) { setChanged(); notifyObservers(ex); } } finished = true; } } public String getUrlAddress() { return urlAddress; } public boolean isFinished() { return finished; } }
No comments:
Post a Comment