Showing posts with label Multi Threading. Show all posts
Showing posts with label Multi Threading. Show all posts

Concurrent Web Requests with Thread Pooling

If we want to fetch the content of several web URLs concurrently, we can use multi-threading techniques. However, when we are going to make a lot of web requests, we should not consume all of the CPU's resources!
So one good design is to implement some kind of thread pool.

Download Source

Test Class

package com.ehsunbehravesh.asyncwebreader;

import java.util.HashMap;
import java.util.Observable;
import java.util.Observer;

/**
 * Demo driver: fetches a fixed list of sites with at most five concurrent
 * readers and prints the final summary delivered by {@code AsyncWebReader}.
 */
public class Test2 {

  public static void main(String[] args) throws Exception {
    String[] addresses = {
      "http://www.google.com",
      "http://www.yahoo.com",
      "http://www.live.com",
      "http://www.wikipedia.com",
      "http://www.facebook.com",
      "http://www.khorasannews.com",
      "http://www.fcbarcelona.com",
      "http://www.khorasannews.com",
    };

    AsyncWebReader webReader = new AsyncWebReader(5 /* threads */, addresses);

    Observer summaryPrinter = new Observer() {
      @Override
      public void update(Observable source, Object payload) {
        // Errors are reported as the exception object itself.
        if (payload instanceof Exception) {
          System.out.println(((Exception) payload).getMessage());
          return;
        }

        // Only the final summary (an Object[]) is printed; other payloads are ignored.
        if (!(payload instanceof Object[])) {
          return;
        }

        // Summary layout: [0] url -> content map, [1] successful urls, [2] failed urls.
        Object[] summary = (Object[]) payload;
        HashMap contentByUrl = (HashMap) summary[0];
        String[] succeeded = (String[]) summary[1];
        String[] failed = (String[]) summary[2];

        System.out.println("Failds");
        for (String address : failed) {
          System.out.println(address);
        }

        System.out.println("-----------");
        System.out.println("success");
        for (String address : succeeded) {
          System.out.println(address);
        }

        System.out.println("\n\nresult of Google: ");
        System.out.println(contentByUrl.remove("http://www.google.com"));
      }
    };
    webReader.addObserver(summaryPrinter);

    // Run the coordinator on its own thread and block until it has finished.
    Thread coordinator = new Thread(webReader);
    coordinator.start();
    coordinator.join();
  }
}


AsyncWebReader Class

package com.ehsunbehravesh.asyncwebreader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Observable;
import java.util.Observer;

/**
 * Fetches a list of URLs with a bounded number of concurrently running
 * {@link WebReader} threads.
 *
 * <p>Observers of this object receive:
 * <ul>
 *   <li>a {@code List} holding {@code [url, content]} each time a URL is fetched
 *       successfully;</li>
 *   <li>an {@code InterruptedException} if the thread executing {@link #run()} is
 *       interrupted while waiting;</li>
 *   <li>finally an {@code Object[]} holding
 *       {@code {HashMap<String, String> contentByUrl, String[] successful, String[] failed}}.</li>
 * </ul>
 *
 * <p>All mutable state is guarded by this object's monitor; {@code running} is
 * volatile so that {@link #stop()} never has to wait for the monitor.
 */
public class AsyncWebReader extends Observable implements Runnable, Observer {

  /** Upper bound, in milliseconds, on how long run() waits before re-checking the stop flag. */
  private static final long WAIT_INTERVAL_MILLIS = 500;

  private final int coundOfThreads;
  private final String[] urlAddresses;

  /** One slot per worker; a slot is null once its reader has reported and was not replaced. */
  private WebReader[] readers;
  /** Index of the next URL that has not been handed to a reader yet. */
  private int indicator;
  private ArrayList<String> successfulAddresses;
  private ArrayList<String> failedAddresses;
  private HashMap<String, String> result;

  private volatile boolean running;

  /**
   * @param coundOfThreads maximum number of URLs fetched at the same time; must be at least 1
   * @param urlAddresses   the URLs to fetch; must not be null (the array is copied)
   * @throws IllegalArgumentException if {@code coundOfThreads} is not positive or
   *                                  {@code urlAddresses} is null
   * @throws Exception declared for backward compatibility with existing callers
   */
  public AsyncWebReader(int coundOfThreads, String[] urlAddresses) throws Exception {
    if (coundOfThreads <= 0) {
      throw new IllegalArgumentException("Count of threads should be at least 1!");
    }
    if (urlAddresses == null) {
      throw new IllegalArgumentException("urlAddresses must not be null");
    }
    this.coundOfThreads = coundOfThreads;
    this.urlAddresses = urlAddresses.clone();
    indicator = 0;
  }

  /**
   * Starts the initial readers, waits until every URL has been reported (or
   * {@link #stop()} is called, or this thread is interrupted) and then publishes
   * the final summary to the observers.
   */
  @Override
  public void run() {
    synchronized (this) {
      successfulAddresses = new ArrayList<>();
      failedAddresses = new ArrayList<>();
      result = new HashMap<>();
      indicator = 0;
      running = true;
      readers = new WebReader[Math.min(coundOfThreads, urlAddresses.length)];

      // Started under the lock so that a reader finishing early cannot race
      // with this loop over 'indicator' and the 'readers' slots.
      for (int i = 0; i < readers.length; i++) {
        startReader(i);
      }
    }

    /* wait until all urls get fetched */
    InterruptedException interruption = null;
    synchronized (this) {
      while (running && completedCount() < urlAddresses.length) {
        try {
          // update() calls notifyAll(); the timeout lets us notice stop() as well.
          wait(WAIT_INTERVAL_MILLIS);
        } catch (InterruptedException ex) {
          // Treat interruption as a request to stop instead of silently resuming the wait.
          interruption = ex;
          running = false;
          break;
        }
      }
    }

    if (interruption != null) {
      publish(interruption);
    }

    Object[] summary;
    synchronized (this) {
      // Hand out copies so observers cannot observe or cause later mutations.
      summary = new Object[] {
        new HashMap<String, String>(result),
        successfulAddresses.toArray(new String[successfulAddresses.size()]),
        failedAddresses.toArray(new String[failedAddresses.size()])
      };
    }
    publish(summary);

    if (interruption != null) {
      // Restore the interrupt status for whoever owns this thread.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Called by a {@link WebReader} when it has finished: records the outcome and,
   * unless stopped, reuses the reader's slot for the next pending URL.
   *
   * @param o   the reader that finished; must be a {@code WebReader}
   * @param arg the fetched content ({@code String}) or the failure ({@code Exception})
   */
  @Override
  public synchronized void update(Observable o, Object arg) {
    WebReader reader = (WebReader) o;

    int slot = slotOf(reader);
    if (slot < 0) {
      // Not one of our active readers: guards against a reader that reports
      // more than once, which would otherwise be counted twice.
      return;
    }
    readers[slot] = null;

    String urlAddress = reader.getUrlAddress();
    if (arg instanceof Exception) {
      failedAddresses.add(urlAddress);
    } else if (arg instanceof String) {
      String content = (String) arg;
      result.put(urlAddress, content);
      successfulAddresses.add(urlAddress);

      List<String> list = new ArrayList<>();
      list.add(urlAddress);
      list.add(content);
      publish(list);
    }

    // After stop() no further fetches are started.
    if (running && indicator < urlAddresses.length) {
      startReader(slot);
    }

    // Wake run() so that it re-evaluates its completion condition immediately.
    notifyAll();
  }

  /**
   * Asks {@link #run()} to stop waiting and prevents further URLs from being
   * started. Readers that are already running are not cancelled.
   */
  public void stop() {
    running = false;
  }

  /** Creates a reader for the next pending URL, stores it in {@code slot} and starts it. Caller holds the lock. */
  private void startReader(int slot) {
    WebReader reader = new WebReader(urlAddresses[indicator++]);
    reader.addObserver(this);
    readers[slot] = reader;
    new Thread(reader).start();
  }

  /** Returns the slot index currently occupied by {@code reader}, or -1 if there is none. Caller holds the lock. */
  private int slotOf(WebReader reader) {
    for (int i = 0; i < readers.length; i++) {
      if (readers[i] == reader) {
        return i;
      }
    }
    return -1;
  }

  /** Number of URLs that have been reported as succeeded or failed. Caller holds the lock. */
  private int completedCount() {
    return successfulAddresses.size() + failedAddresses.size();
  }

  /** Marks this observable as changed and notifies observers as one atomic step. */
  private synchronized void publish(Object payload) {
    setChanged();
    notifyObservers(payload);
  }
}


WebReader Class

package com.ehsunbehravesh.asyncwebreader;

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Observable;

/**
 * http://codetoearn.blogspot.com
 *
 * <p>Reads the complete content behind one URL and reports the outcome to its
 * observers exactly once per {@link #run()}: the content as a {@code String} on
 * success, or the {@code Exception} that made the fetch fail.
 *
 * @author ehsun7b
 */
public class WebReader extends Observable implements Runnable {

  private final String urlAddress;
  /** Volatile because it is written by the fetching thread and read through isFinished() by others. */
  private volatile boolean finished;

  /**
   * @param urlAddress the address to fetch when {@link #run()} is executed
   */
  public WebReader(String urlAddress) {
    this.urlAddress = urlAddress;
  }

  /**
   * Fetches the URL and notifies the observers with either the content or the
   * failure. {@link #isFinished()} already returns {@code true} while the
   * observers are being notified.
   */
  @Override
  public void run() {
    finished = false;

    Object outcome;
    try {
      outcome = readContent();
    } catch (Exception e) {
      // Deliberately broad: observers must always get exactly one outcome,
      // whatever goes wrong (malformed URL, I/O error, runtime failure).
      outcome = e;
    }

    finished = true;
    setChanged();
    notifyObservers(outcome);
  }

  /**
   * Opens the URL and reads everything it delivers.
   *
   * @return the complete content as text
   * @throws IOException if the URL is malformed or reading or closing the stream fails
   */
  private String readContent() throws IOException {
    URL url = new URL(urlAddress);
    URLConnection connection = url.openConnection();

    // NOTE(review): no charset is given, so bytes are decoded with the platform default.
    try (InputStreamReader reader = new InputStreamReader(connection.getInputStream())) {
      char[] buffer = new char[1024];
      StringBuilder doc = new StringBuilder();
      int read;
      while ((read = reader.read(buffer)) != -1) {
        doc.append(buffer, 0, read);
      }
      return doc.toString();
    }
  }

  /** @return the address this reader fetches */
  public String getUrlAddress() {
    return urlAddress;
  }

  /** @return {@code true} once the most recent {@link #run()} has produced its outcome */
  public boolean isFinished() {
    return finished;
  }
}

Multi Thread TCP Socket Programming

In the previous tutorial we have learned how to implement a very basic TCP communication between a client and a server.
But as you may have noticed, that server is capable of handling only one client at a time.

So, we show you how to write a more advanced multi-threaded Server to handle several clients simultaneously.

Doing some tasks simultaneously (handling several clients) means that we should design our server in a multi-threaded fashion.

Handling a client means:
  • sending message to the client
  • receiving clients messages
And we should do this for more than one client at a time, plus we should still listen to the port for potential new clients!

Listening to the port will happen in the main Server program thread, but for handling individual clients we write the ClientHandler class, which implements the Runnable interface.

In java for creating a thread we have 2 ways:
  1. Extending Thread class (our class inherits from Thread class)
  2. Implementing Runnable interface

We choose the second way in this example since it is a cleaner approach (my opinion).

ClientHandler class
package com.blogspot.codetoearn.advancedtcpsocket.server;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * http://codetoearn.blogspot.com/
 *
 * <p>Serves one connected client: sends a welcome line, then echoes everything
 * the client sends to standard output until the client closes the connection.
 * The client socket is always closed when {@link #run()} returns.
 *
 * @author ehsun7b
 */
public class ClientHandler implements Runnable {

  private final Socket clientSocket;

  /**
   * @param clientSocket the already-connected socket of the client to serve
   */
  public ClientHandler(Socket clientSocket) {
    this.clientSocket = clientSocket;
  }

  /**
   * Runs the conversation with the client. Any failure is reported on standard
   * output; the socket is closed on both the normal and the failure path.
   */
  @Override
  public void run() {
    // try-with-resources closes the socket even when a read or write fails.
    try (Socket socket = clientSocket) {
      /* getting input and output streams of the client socket */
      OutputStream outputStream = socket.getOutputStream();
      InputStream inputStream = socket.getInputStream();

      /* sending welcome message to the client */
      String message = "Welcome! You are connected.\n";
      outputStream.write(message.getBytes());
      outputStream.flush();

      /* getting the client reply: read() returns -1 once the stream ends */
      int character;
      while ((character = inputStream.read()) != -1) {
        System.out.print((char) character);
      }
    } catch (Exception ex) {
      System.out.println("Error: " + ex.getMessage());
    }
  }
}

So in the server class, the ServerSocket object accepts any incoming connection, creates one ClientHandler object, and gives it the new socket.

Server class
package com.blogspot.codetoearn.advancedtcpsocket.server;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * http://codetoearn.blogspot.com/
 *
 * <p>Accepts TCP connections on {@link #port} and serves each client on its own
 * thread through a {@code ClientHandler}.
 *
 * @author ehsun7b
 */
public class Server {

  public static final int port = 12345;
  private ServerSocket serverSocket;

  /**
   * Opens the listening socket and accepts clients until an I/O error occurs.
   * The error is reported on standard output and the listening socket is closed
   * before this method returns.
   */
  public void listen() {
    /* creating the serverSocket object; try-with-resources releases the port on failure */
    try (ServerSocket listener = new ServerSocket(port)) {
      serverSocket = listener;

      while (true) {
        Socket socket = listener.accept();
        /* creating a client handler and give the socket to it*/
        ClientHandler clientHandler = new ClientHandler(socket);
        // One new thread per accepted connection.
        Thread thread = new Thread(clientHandler);
        thread.start();
      }
    } catch (IOException ex) {
      System.out.println("Error: " + ex.getMessage());
    }
  }

  /** Starts the server on the current thread. */
  public static void main(String[] args) {
    new Server().listen();
  }
}

And finally our client, which is a simple TCP socket client like the one we have seen in the last example. But we use a simple mechanism to send 100 random messages to the server from each Client object, to have a simple simulation.

Client class:

package com.blogspot.codetoearn.advancedtcpsocket.client;

import com.blogspot.codetoearn.advancedtcpsocket.utils.Utils;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * http://codetoearn.blogspot.com/
 *
 * <p>Simple TCP client: connects to the server, prints the first line the
 * server sends, then sends 100 random messages, one per second.
 *
 * @author ehsun7b
 */
public class Client {

  private final int port;
  private final String host;
  private final String name;
  private Socket socket;

  /**
   * @param port server port to connect to
   * @param host server host name or address
   * @param name label put in front of every message this client sends
   */
  public Client(int port, String host, String name) {
    this.port = port;
    this.host = host;
    this.name = name;
  }

  /**
   * Connects, prints the server's first line and sends the random messages.
   * Failures are reported on standard output; the socket is closed in every case.
   */
  public void Connect() {
    /* creating the socket and trying to connect; try-with-resources closes it on every path */
    try (Socket connection = new Socket(host, port)) {
      socket = connection;

      /* getting input and output streams of the socket */
      OutputStream outputStream = connection.getOutputStream();
      InputStream inputStream = connection.getInputStream();

      /* getting the server message: read up to the first newline or end of stream */
      int character = inputStream.read();
      while (character != -1 && character != '\n') {
        System.out.print((char) character);
        character = inputStream.read();
      }

      /* sending 100 random messages to the server */
      String[] messages = Utils.randomMessages(100);

      for (String message : messages) {
        String msg = name + ": " + message;
        outputStream.write(msg.getBytes());
        outputStream.flush();
        Thread.sleep(1000); // wait for 1 second
      }
    } catch (InterruptedException ex) {
      // Keep the interrupt visible to the owner of this thread instead of swallowing it.
      Thread.currentThread().interrupt();
      System.out.println("Error: " + ex.getMessage());
    } catch (Exception ex) {
      System.out.println("Error: " + ex.getMessage());
    }
  }

  /* client program: runs two clients concurrently against localhost:12345 */
  public static void main(String[] args) {
    new Thread(new Runnable() {

      @Override
      public void run() {
        new Client(12345, "localhost", "client1").Connect();
      }
    }).start();

    new Thread(new Runnable() {

      @Override
      public void run() {
        new Client(12345, "localhost", "client2").Connect();
      }
    }).start();
  }
}

Download the complete code here!