Concurrent Web Requests with Thread Pooling

If we want to fetch the content of several web URLs concurrently, we can use multithreading techniques. However, when we are going to make a lot of web requests, we must make sure that we do not consume all of the CPU's resources by starting one thread per request.
So a good design is to implement some kind of thread pool that limits how many requests run at the same time.

Download Source

Test Class

package com.ehsunbehravesh.asyncwebreader;

import java.util.HashMap;
import java.util.Observable;
import java.util.Observer;

/**
 * Command-line demo: fetches a fixed list of sites with an {@code AsyncWebReader}
 * limited to five concurrent threads and prints the final summary it publishes.
 */
public class Test2 {

  /** Maximum number of pages fetched at the same time. */
  private static final int THREADS = 5;

  public static void main(String[] args) throws Exception {
    String[] addresses = new String[]{
      "http://www.google.com",
      "http://www.yahoo.com",
      "http://www.live.com",
      "http://www.wikipedia.com",
      "http://www.facebook.com",
      "http://www.khorasannews.com",
      "http://www.fcbarcelona.com",
      "http://www.khorasannews.com",
    };

    AsyncWebReader webReader = new AsyncWebReader(THREADS, addresses);
    webReader.addObserver(new SummaryPrinter());

    // Run the reader on its own thread and block until it has finished.
    Thread worker = new Thread(webReader);
    worker.start();
    worker.join();
  }

  /**
   * Prints what the reader publishes: the message of any exception, and the
   * final {@code Object[]} summary ({result map, successful URLs, failed URLs}).
   * The per-URL {@code List} notifications are deliberately ignored.
   */
  private static final class SummaryPrinter implements Observer {

    @Override
    public void update(Observable o, Object arg) {
      if (arg instanceof Exception) {
        System.out.println(((Exception) arg).getMessage());
        return;
      }
      if (!(arg instanceof Object[])) {
        return;
      }

      Object[] summary = (Object[]) arg;
      HashMap contents = (HashMap) summary[0];
      String[] succeeded = (String[]) summary[1];
      String[] failed = (String[]) summary[2];

      System.out.println("Failds");
      printLines(failed);

      System.out.println("-----------");
      System.out.println("success");
      printLines(succeeded);

      System.out.println("\n\nresult of Google: ");
      System.out.println(contents.remove("http://www.google.com"));
    }

    /** Writes each entry on its own line, in array order. */
    private static void printLines(String[] lines) {
      for (String line : lines) {
        System.out.println(line);
      }
    }
  }
}


AsyncWebReader Class

package com.ehsunbehravesh.asyncwebreader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Observable;
import java.util.Observer;

/**
 * Fetches a list of URLs using at most a fixed number of concurrently running
 * {@link WebReader} threads.
 *
 * <p>Observers of this object receive:
 * <ul>
 *   <li>a {@code List<String>} of {@code [url, content]} each time a URL has been
 *       fetched successfully;</li>
 *   <li>an {@link InterruptedException} if the thread executing {@link #run()} is
 *       interrupted while waiting;</li>
 *   <li>finally an {@code Object[]} holding, in this order, a
 *       {@code HashMap<String, String>} of url to content, a {@code String[]} of
 *       successful URLs and a {@code String[]} of failed URLs.</li>
 * </ul>
 *
 * <p>All mutable state is guarded by this object's monitor. Observer callbacks
 * are issued while that monitor is held so that each
 * {@code setChanged()/notifyObservers()} pair stays atomic; observers should
 * therefore return quickly.
 */
public class AsyncWebReader extends Observable implements Runnable, Observer {

  private final int coundOfThreads;
  private final String[] urlAddresses;

  /* Everything below is guarded by this object's monitor. */
  private WebReader[] readers;
  private int indicator;
  private List<String> successfulAddresses;
  private List<String> failedAddresses;
  private HashMap<String, String> result;
  private boolean running;

  /**
   * @param coundOfThreads maximum number of URLs fetched at the same time; must be at least 1
   * @param urlAddresses   addresses to fetch; must not be null (the array is copied)
   * @throws Exception an {@link IllegalArgumentException} if an argument is invalid
   */
  public AsyncWebReader(int coundOfThreads, String[] urlAddresses) throws Exception {
    if (coundOfThreads <= 0) {
      throw new IllegalArgumentException("Count of threads should be at least 1!");
    }
    if (urlAddresses == null) {
      throw new IllegalArgumentException("URL addresses should not be null!");
    }
    this.coundOfThreads = coundOfThreads;
    // Defensive copy: later changes to the caller's array must not affect scheduling.
    this.urlAddresses = urlAddresses.clone();
    indicator = 0;
  }

  /**
   * Starts the first batch of readers, blocks until every URL has either
   * succeeded or failed (or until {@link #stop()} is called or the thread is
   * interrupted), then publishes the final summary to the observers.
   */
  @Override
  public void run() {
    synchronized (this) {
      successfulAddresses = new ArrayList<>();
      failedAddresses = new ArrayList<>();
      result = new HashMap<>();
      indicator = 0;
      running = true;
      readers = new WebReader[Math.min(coundOfThreads, urlAddresses.length)];

      // Started under the lock: a reader that finishes immediately has to wait in
      // update() until the whole first batch (and 'indicator') is consistent.
      for (int i = 0; i < readers.length; i++) {
        startReader(i);
      }

      /* wait until all urls get fetched */
      while (running
              && successfulAddresses.size() + failedAddresses.size()
              < urlAddresses.length) {
        try {
          // Woken by update() and stop(); the loop condition handles spurious wake-ups.
          wait();
        } catch (InterruptedException ex) {
          // Stop scheduling and fall through to publish whatever has been collected.
          running = false;
          setChanged();
          notifyObservers(ex);
          // Restore the flag so the owner of this thread can see the interruption.
          Thread.currentThread().interrupt();
        }
      }

      String[] successfulAddressesArray =
              successfulAddresses.toArray(new String[successfulAddresses.size()]);
      String[] failedAddressesArray =
              failedAddresses.toArray(new String[failedAddresses.size()]);
      // Snapshot: readers still in flight after stop() must not mutate what observers hold.
      HashMap<String, String> resultSnapshot = new HashMap<>(result);

      setChanged();
      notifyObservers(new Object[] {resultSnapshot, successfulAddressesArray, failedAddressesArray});
    }
  }

  /**
   * Callback from a {@link WebReader}: records its outcome and reuses its slot
   * for the next pending URL, if any.
   *
   * <p>A {@code String} argument is the fetched content; any other argument is
   * recorded as a failure. Notifications from a reader that does not currently
   * occupy a slot (for example a second notification from the same reader) are
   * ignored so that no URL is counted twice.
   */
  @Override
  public synchronized void update(Observable o, Object arg) {
    WebReader reader = (WebReader) o;
    int slot = slotOf(reader);
    if (slot < 0) {
      return;
    }

    String urlAddress = reader.getUrlAddress();
    String content = null;
    if (arg instanceof String) {
      content = (String) arg;
      result.put(urlAddress, content);
      successfulAddresses.add(urlAddress);
    } else {
      failedAddresses.add(urlAddress);
    }

    // Hand the slot to the next URL, or release it when nothing is left / we were stopped.
    if (running && indicator < urlAddresses.length) {
      startReader(slot);
    } else {
      readers[slot] = null;
    }
    notifyAll();

    // Observers are told last, so an exception thrown by one of them cannot
    // leave the bookkeeping above half done.
    if (content != null) {
      List<String> list = new ArrayList<>();
      list.add(urlAddress);
      list.add(content);
      setChanged();
      notifyObservers(list);
    }
  }

  /**
   * Asks {@link #run()} to stop waiting and prevents further URLs from being
   * started. Fetches already in progress are not cancelled.
   */
  public synchronized void stop() {
    running = false;
    notifyAll();
  }

  /** Creates a reader for the next pending URL, stores it in {@code slot} and starts its thread. */
  private void startReader(int slot) {
    WebReader reader = new WebReader(urlAddresses[indicator++]);
    reader.addObserver(this);
    readers[slot] = reader;
    new Thread(reader).start();
  }

  /** Returns the index of {@code reader} among the active readers, or -1 if it is not active. */
  private int slotOf(WebReader reader) {
    if (reader == null || readers == null) {
      return -1;
    }
    for (int i = 0; i < readers.length; i++) {
      if (readers[i] == reader) {
        return i;
      }
    }
    return -1;
  }
}


WebReader Class

package com.ehsunbehravesh.asyncwebreader;

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Observable;

/**
 * Downloads the content of a single URL and reports the outcome to its observers.
 *
 * <p>Each call to {@link #run()} sends exactly one notification: the page content
 * as a {@code String} on success, or the {@code Exception} that made the fetch
 * fail (including a failure to close the stream).
 *
 * http://codetoearn.blogspot.com
 *
 * @author ehsun7b
 */
public class WebReader extends Observable implements Runnable {

  /** Charset used when the response does not declare a usable one. */
  private static final String DEFAULT_CHARSET = "UTF-8";
  private static final String CHARSET_PARAMETER = "charset=";

  private final String urlAddress;
  // volatile: written by the fetching thread, read by whoever polls isFinished().
  private volatile boolean finished;

  /**
   * @param urlAddress address to fetch when {@link #run()} is invoked
   */
  public WebReader(String urlAddress) {
    this.urlAddress = urlAddress;
  }

  /**
   * Fetches the URL, notifies the observers once with the result and then marks
   * this reader as finished.
   */
  @Override
  public void run() {
    finished = false;
    Object outcome;
    try {
      outcome = fetch();
    } catch (Exception e) {
      outcome = e;
    }
    try {
      // Outside the fetch try-block on purpose: an exception thrown by an observer
      // must not be reported back to the observers as a download failure.
      setChanged();
      notifyObservers(outcome);
    } finally {
      finished = true;
    }
  }

  /** Reads the whole response body as text; the stream is closed before returning. */
  private String fetch() throws IOException {
    URLConnection connection = new URL(urlAddress).openConnection();
    // Arguments are evaluated left to right, so getInputStream() reports connection
    // errors before the Content-Type header is consulted.
    try (InputStreamReader reader = new InputStreamReader(
            connection.getInputStream(), charsetOf(connection.getContentType()))) {
      StringBuilder doc = new StringBuilder();
      char[] buffer = new char[1024];
      int read;
      while ((read = reader.read(buffer)) != -1) {
        doc.append(buffer, 0, read);
      }
      return doc.toString();
    }
  }

  /**
   * Picks the charset named by the {@code charset=} parameter of a Content-Type
   * value, falling back to UTF-8 when the header is missing, has no such
   * parameter, or names an illegal or unsupported charset.
   */
  private static java.nio.charset.Charset charsetOf(String contentType) {
    if (contentType != null) {
      for (String parameter : contentType.split(";")) {
        String trimmed = parameter.trim();
        if (trimmed.regionMatches(true, 0, CHARSET_PARAMETER, 0, CHARSET_PARAMETER.length())) {
          String name = trimmed.substring(CHARSET_PARAMETER.length()).replace("\"", "").trim();
          try {
            return java.nio.charset.Charset.forName(name);
          } catch (IllegalArgumentException ignored) {
            // Illegal or unsupported charset name: use the default instead of failing the fetch.
            break;
          }
        }
      }
    }
    return java.nio.charset.Charset.forName(DEFAULT_CHARSET);
  }

  /**
   * @return the address given to the constructor
   */
  public String getUrlAddress() {
    return urlAddress;
  }

  /**
   * @return {@code true} once {@link #run()} has completed and observers have been notified
   */
  public boolean isFinished() {
    return finished;
  }
}

No comments:

Post a Comment