Thread pool details: ThreadPoolExecutor

First, an explanation of the ThreadPoolExecutor class:

1. [Thread pool] status:

The pool has five states: RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED.

  • Calling the shutdown() method moves the thread pool from the RUNNING state to the SHUTDOWN state.

  • Calling the shutdownNow() method moves the thread pool from the RUNNING or SHUTDOWN state to the STOP state.

Note: From either the SHUTDOWN or the STOP state, the pool first passes through the TIDYING state, and in both cases it eventually becomes TERMINATED.
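The internal states are not exposed directly, but the transitions can be observed through isShutdown() and awaitTermination(). The following is a minimal sketch; the class name PoolLifecycleDemo and the method observe() are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolLifecycleDemo {

    // Hypothetical helper: samples the observable state around shutdown().
    // Returns {isShutdown before, isShutdown after, terminated within timeout}.
    static boolean[] observe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        boolean shutdownBefore = pool.isShutdown();   // pool is RUNNING here
        pool.execute(() -> { });                      // one trivial task
        pool.shutdown();                              // RUNNING -> SHUTDOWN
        boolean shutdownAfter = pool.isShutdown();
        boolean terminated;
        try {
            // Returns true once the pool has reached TERMINATED.
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { shutdownBefore, shutdownAfter, terminated };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("isShutdown before shutdown(): " + r[0]);
        System.out.println("isShutdown after shutdown():  " + r[1]);
        System.out.println("terminated:                   " + r[2]);
    }
}
```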

2. ThreadPoolExecutor [constructor] :

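ThreadPoolExecutor inherits from AbstractExecutorService, which in turn implements the ExecutorService interface.

The full constructor takes seven parameters: corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, and handler. Next, we will explain the meaning of each of them.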

2.1) The working principle of thread pool:

  • corePoolSize : The maximum number of core threads in the thread pool

  • maximumPoolSize : The maximum number of threads that can exist in the thread pool

  • workQueue: a blocking queue for caching tasks

When the thread pool's execute() method is called to add a task, the pool makes the following decisions:

  • If fewer than corePoolSize threads are running, a new thread is created to execute the task, even if other worker threads are idle;

  • If the number of threads has reached corePoolSize and the blocking queue is not full, the task is queued without adding a new thread, and an idle thread later takes it from the queue;

  • If the blocking queue is full and the number of threads in the pool is less than maximumPoolSize, a new thread is created to execute the task;

  • If the blocking queue is full and the number of threads in the pool is equal to maximumPoolSize, the new task is rejected according to the policy specified by the handler in the constructor.
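The decision sequence above can be watched on a deliberately tiny pool. This sketch assumes corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, and the default AbortPolicy; the class name PoolGrowthDemo is illustrative. Every task blocks on a latch so that no thread becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Submits four blocking tasks and records pool size and queue size after each.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();              // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocker);
                log.add("threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            }
        } catch (RejectedExecutionException e) {
            log.add("rejected");              // queue full and pool at maximum
        } finally {
            release.countDown();              // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first task starts the core thread, the second is queued, the third forces a second (non-core) thread because the queue is full, and the fourth is rejected.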

2.2) keepAliveTime:

  • keepAliveTime : Indicates the survival time of idle threads

  • TimeUnit unit : indicates the unit of keepAliveTime

When a thread has been idle for longer than keepAliveTime, the thread pool checks whether the number of current threads is greater than corePoolSize; if it is, the thread is stopped. So after all tasks of the thread pool are done, the pool eventually shrinks to corePoolSize threads.

Note: If allowCoreThreadTimeOut(true) has been called on the pool (the default is false), any thread that stays idle for longer than keepAliveTime is stopped directly, without checking whether the number of threads is greater than corePoolSize. That is, the final number of threads can drop to 0.
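A small sketch of the note above, assuming a keepAliveTime of 200 ms and a pool of two core threads (both values are arbitrary, and KeepAliveDemo is an illustrative name). With allowCoreThreadTimeOut(true) the idle core threads are reclaimed as well; the loop polls instead of sleeping for a fixed time so that the result does not depend on exact timing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Returns {pool size right after prestart, pool size after the idle timeout}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);   // core threads may now time out too
        pool.prestartAllCoreThreads();       // start both core threads, all idle
        int before = pool.getPoolSize();

        // Wait (up to 5 s) for the idle threads to exceed keepAliveTime and exit.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("threads after prestart: " + r[0]);
        System.out.println("threads after timeout:  " + r[1]);
    }
}
```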

2.3) workQueue task queue:

  • workQueue : It determines the queuing strategy for cached tasks

  • ThreadPoolExecutor recommends three kinds of waiting queue: SynchronousQueue, LinkedBlockingQueue, and ArrayBlockingQueue.

1) Bounded queue:

  • SynchronousQueue: A blocking queue that does not store elements. Each insert operation must wait until another thread performs a matching remove operation; until then the insert remains blocked. Its throughput is usually higher than that of LinkedBlockingQueue. The static factory method Executors.newCachedThreadPool uses this queue.

  • ArrayBlockingQueue: A bounded blocking queue backed by an array. This queue orders its elements on a FIFO (first in, first out) principle. Once such a queue is created, its capacity cannot be increased. Attempting to put an element into a full queue will cause the operation to block; attempting to take an element from an empty queue will similarly block.

2) Unbounded queue:

  • LinkedBlockingQueue: A blocking queue based on a linked-list structure. A capacity may be specified; if it is not, the capacity is Integer.MAX_VALUE, so the queue is unbounded in practice (any queue or stack with "infinite" capacity really has this capacity).

  • PriorityBlockingQueue: An unbounded blocking queue whose elements are ordered by priority. Elements in the queue must implement the Comparable interface so that they can be ordered by their compareTo() method (alternatively, a Comparator can be passed to the queue's constructor). The element with the highest priority is always at the head of the queue; the ordering of elements with the same priority is not guaranteed.

Note: The effect of maximumPoolSize and keepAliveTime depends on the type of workQueue. If the queue is unbounded, tasks always fit into it, so the pool never grows beyond corePoolSize: maximumPoolSize is never triggered, and keepAliveTime naturally has no meaning either (unless core threads are allowed to time out).
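The differences between these queues show up in how the non-blocking offer() method behaves, which is also the method the thread pool uses when it tries to queue a task. The sketch below uses only standard java.util.concurrent classes; the class and method names are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueBehaviorDemo {

    // A SynchronousQueue stores nothing: offer() fails when no consumer is waiting.
    static boolean synchronousOfferWithoutConsumer() {
        return new SynchronousQueue<Integer>().offer(1);
    }

    // An ArrayBlockingQueue refuses further elements once its fixed capacity is used up.
    static boolean arrayOfferWhenFull() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1);
        return queue.offer(2);
    }

    // A LinkedBlockingQueue created without a capacity holds up to Integer.MAX_VALUE elements.
    static int linkedDefaultCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // A PriorityBlockingQueue hands out the smallest element (by compareTo) first.
    static int priorityHead() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(5);
        queue.add(1);
        queue.add(3);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(synchronousOfferWithoutConsumer());
        System.out.println(arrayOfferWhenFull());
        System.out.println(linkedDefaultCapacity() == Integer.MAX_VALUE);
        System.out.println(priorityHead());
    }
}
```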

2.4) threadFactory:

threadFactory : Specifies the factory that creates the threads. (Optional)

If no thread factory is specified, ThreadPoolExecutor uses Executors.defaultThreadFactory() to create threads. Threads created by the default factory all belong to the same thread group, have the same Thread.NORM_PRIORITY priority, are named "pool-N-thread-M" (N is the sequence number of the factory and M is the sequence number of the thread it created), and are all non-daemon threads.
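A custom factory is mostly useful for giving threads recognizable names. The following is a minimal sketch; ThreadFactoryDemo, NamedThreadFactory, and the "schema-task" prefix are hypothetical names chosen for this example, not part of the JDK.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadFactoryDemo {

    // Hypothetical factory that names threads "<prefix>-<sequence number>".
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(false);                    // same choice as the default factory
            t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    // Name of the first thread produced by the custom factory.
    static String customName() {
        return new NamedThreadFactory("schema-task").newThread(() -> { }).getName();
    }

    // Name of the first thread produced by a fresh default factory.
    static String defaultName() {
        return Executors.defaultThreadFactory().newThread(() -> { }).getName();
    }

    public static void main(String[] args) {
        System.out.println(customName());
        System.out.println(defaultName());
    }
}
```

An instance of such a factory can be passed as the threadFactory argument of the ThreadPoolExecutor constructor.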

2.5) handler rejection strategy:

handler : The policy applied when the thread pool refuses a new task, which happens when workQueue is full and the number of threads in the pool has reached maximumPoolSize. (Optional; the default is AbortPolicy.)

The JDK provides four policies: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy. The most sensible is usually the handling provided by AbortPolicy: it throws an exception (RejectedExecutionException) and leaves the handling to the developer.
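As a contrast to AbortPolicy, the sketch below uses CallerRunsPolicy, under which a rejected task is run by the thread that called execute(). The pool is assumed to have a single thread and a queue of capacity 1 so that the third task is certain to be rejected; CallerRunsDemo is an illustrative name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Returns true if the rejected task was executed on the submitting thread.
    static boolean rejectedTaskRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker);   // occupies the only thread
            pool.execute(blocker);   // fills the queue
            // Queue full and pool at maximum: the policy runs this task right here.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return Thread.currentThread().getName().equals(ranOn.get());
    }

    public static void main(String[] args) {
        System.out.println("ran on caller thread: " + rejectedTaskRanOnCaller());
    }
}
```

Because the caller is busy running the task, it cannot submit more work in the meantime, which slows down submission when the pool is saturated.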

3. Common methods:

In addition to being specified when the thread pool is created, the values of the above parameters can also be changed after creation through setter methods such as setCorePoolSize(int), setMaximumPoolSize(int), setKeepAliveTime(long, TimeUnit), setThreadFactory(ThreadFactory), and setRejectedExecutionHandler(RejectedExecutionHandler).

In addition, there are some other methods:

  • getCorePoolSize(): Returns the number of core threads, that is, the corePoolSize given in the constructor unless it has since been changed with setCorePoolSize();

  • getMaximumPoolSize(): Returns the maximum allowed number of threads, that is, the maximumPoolSize given in the constructor unless it has since been changed with setMaximumPoolSize();

  • getLargestPoolSize(): Returns the largest number of threads that have ever been in the pool at the same time (high-water mark);

  • getPoolSize(): Returns the current number of threads in the thread pool;

  • prestartAllCoreThreads(): Starts all core threads. Regardless of whether there are tasks to execute, the thread pool creates new threads until the number of threads in the pool reaches corePoolSize;

  • prestartCoreThread(): Starts a single core thread (otherwise the same as above);

  • allowCoreThreadTimeOut(true): Allows core threads to exit after being idle for keepAliveTime.
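A short sketch of several of these methods on one pool. The sizes (2 core threads, maximum 4, later changed to 3 and 6) are arbitrary values for illustration, and PoolTuningDemo is a made-up class name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolTuningDemo {

    // Returns {size at creation, threads prestarted, size after prestart,
    //          core size after setter, max size after setter, largest size seen}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        int sizeAtCreation = pool.getPoolSize();        // threads are created lazily
        int started = pool.prestartAllCoreThreads();    // number of threads started
        int sizeAfterPrestart = pool.getPoolSize();

        pool.setCorePoolSize(3);                        // parameters can change at runtime
        pool.setMaximumPoolSize(6);

        int[] result = {
            sizeAtCreation, started, sizeAfterPrestart,
            pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getLargestPoolSize()
        };
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```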

4. Executors class:

The factory methods of the Executors class are implemented on top of ThreadPoolExecutor. They include:

  • Executors.newCachedThreadPool(): an unbounded thread pool that reclaims idle threads automatically

  • Executors.newFixedThreadPool(int): a fixed-size thread pool

  • Executors.newSingleThreadExecutor(): a single background thread

They have predefined settings that suit most usage scenarios. However, the Alibaba Java development guidelines advise against using this class to create thread pools: the fixed and single-thread pools use an unbounded queue and the cached pool allows an unbounded number of threads, and either can exhaust memory under load.
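The predefined settings can be inspected by casting the returned service to ThreadPoolExecutor. Note the assumption: the cast works for newFixedThreadPool and newCachedThreadPool in OpenJDK, but the API only promises an ExecutorService, and newSingleThreadExecutor returns a wrapper for which the cast fails. ExecutorsInternalsDemo is an illustrative name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class ExecutorsInternalsDemo {

    // Formats "corePoolSize/maximumPoolSize/queue class" and shuts the pool down.
    // Assumes the service really is a ThreadPoolExecutor (true in OpenJDK for
    // the two factory methods used below, but not guaranteed by the API).
    static String describe(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String description = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()
                + "/" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return description;
    }

    static String fixed() {
        return describe(Executors.newFixedThreadPool(4));
    }

    static String cached() {
        return describe(Executors.newCachedThreadPool());
    }

    public static void main(String[] args) {
        System.out.println("fixed:  " + fixed());
        System.out.println("cached: " + cached());
    }
}
```

Building the pool with the ThreadPoolExecutor constructor instead makes the queue capacity and the maximum thread count explicit.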

Second, an introduction to the thread-pool-related interfaces:

1. ExecutorService interface:

This interface is the real thread pool interface. ThreadPoolExecutor above and ScheduledThreadPoolExecutor below are both implementation classes of this interface. Common methods of this interface:

  • Future<?> submit(Runnable task): Submits the Runnable task to the thread pool and returns a Future object. Because a Runnable has no return value, calling get() on the Future returns null;

  • <T> Future<T> submit(Callable<T> task): Submits the Callable task to the thread pool and returns a Future object; calling get() on the Future returns the return value of the Callable;

  • <T> Future<T> submit(Runnable task, T result): Submits the Runnable task to the thread pool and returns a Future object; calling get() on the Future returns the result object that was passed in as the second argument;

  • invokeAll(collection of tasks)/invokeAll(collection of tasks, long timeout, TimeUnit unit): invokeAll adds all Futures to the returned collection in the order in which their tasks appear in the task collection. This is a blocking method: it returns only when all tasks have been executed, when the calling thread is interrupted, or when the specified time limit has elapsed. When invokeAll returns, each task has either completed or been cancelled, and the client can call get/isCancelled to find out which.

  • invokeAny(collection of tasks)/invokeAny(collection of tasks, long timeout, TimeUnit unit): A blocking method that returns not a Future object but the result of one of the Callable objects in the collection; there is no guarantee which one. The difference from invokeAll is that as soon as one task completes successfully, its result is returned, and the tasks that have not completed are cancelled. It also has a timeout variant;

  • shutdown(): Stops accepting new tasks and shuts down the service after the already submitted tasks have completed;

  • shutdownNow(): Attempts to stop all running tasks, does not start the tasks still waiting in the queue (they are returned as a list), and shuts down the service;

  • isTerminated(): Tests whether all tasks have completed following a shutdown; it never returns true unless shutdown() or shutdownNow() was called first;

  • isShutdown(): Tests whether the ExecutorService has been shut down.
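The submit overloads are shown in the examples of this section; invokeAll and invokeAny are not, so here is a minimal sketch of both. The three tasks simply return 1, 2, and 3, and the class name InvokeDemo is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeDemo {

    private static List<Callable<Integer>> tasks() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> 3);
        return tasks;
    }

    // invokeAll blocks until every task is done; the Futures come back in task order.
    static List<Integer> invokeAllResults() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : pool.invokeAll(tasks())) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // invokeAny returns the result of whichever task completes successfully first.
    static int invokeAnyResult() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return pool.invokeAny(tasks());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("invokeAll: " + invokeAllResults());
        System.out.println("invokeAny: " + invokeAnyResult());
    }
}
```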

1.1) Example of submit method:

The thread pool interface has the three submit methods listed above; let's look at a concrete example of each:

1)Callable:

public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS, 
   new ArrayBlockingQueue<Runnable>(50),  
   new ThreadFactory(){ public Thread newThread(Runnable r) {
                return new Thread(r, "schema_task_pool_" + r.hashCode());
            }}, new ThreadPoolExecutor.DiscardOldestPolicy());
 
public static void callableTest() {
 int a = 1;
 //callable
 Future<Boolean> future = threadPool.submit(new Callable<Boolean>(){
  @Override
  public Boolean call() throws Exception {
   int b = a + 100;
   System.out.println(b);
   return true;
  }
 });
 try {
  System.out.println("future.get");
  Boolean boolean1 = future.get();
  System.out.println(boolean1);
 } catch (InterruptedException e) {
  System.out.println("InterruptedException...");
  e.printStackTrace();
 } catch (ExecutionException e) {
  System.out.println("execute exception...");
  e.printStackTrace();
 } 
}

2)Runnable:

public static void runnableTest() {
 int a = 1;
 //runnable
 Future<?> future1 = threadPool.submit(new Runnable(){
  @Override
  public void run() {
   int b = a + 100;
   System.out.println(b);
  }
 });
 try {
  System.out.println("future.get");
  Object x = future1.get(900,TimeUnit.MILLISECONDS);
  System.out.println(x);//null
 } catch (InterruptedException e) {
  e.printStackTrace();
 } catch (ExecutionException e) {
  System.out.println("execute exception...");
  e.printStackTrace();
 } catch (TimeoutException e) {
  e.printStackTrace();
 }
}

3)Runnable+result:

class RunnableTask implements Runnable {
 Person p;
 RunnableTask(Person p) {
  this.p = p;
 }
 
 @Override
 public void run() {
  p.setId(1);
  p.setName("Runnable Task...");
 }
}
class Person {
 private Integer id;
 private String name;
 
 public Person(Integer id, String name) {
  super();
  this.id = id;
  this.name = name;
 }
 public Integer getId() {
  return id;
 }
 public void setId(Integer id) {
  this.id = id;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 @Override
 public String toString() {
  return "Person [id=" + id + ", name=" + name + "]";
 }
}
 
public static void runnableTest2() {
 //runnable + result
 Person p = new Person(0,"person");
 Future<Person> future2 = threadPool.submit(new RunnableTask(p),p);
 try {
  System.out.println("future.get");
  Person person = future2.get();
  System.out.println(person);
 } catch (InterruptedException e) {
  e.printStackTrace();
 } catch (ExecutionException e) {
  e.printStackTrace();
 }
}

1.2) What happens when the call method of a Callable (or the run method of a Runnable) throws an exception while the thread pool executes it?

In the above examples we can see that, whether the thread pool executes a Callable or a Runnable, calling the get() method of the returned Future object requires two exceptions to be handled (three if the get(timeout) method is called), as follows:

// run on the thread pool
Future<Object> future = threadPool.submit(callable);
try {
 System.out.println("future.get");
 Object x = future.get(900,TimeUnit.MILLISECONDS);
 System.out.println(x);
} catch (InterruptedException e) {
 e.printStackTrace();
} catch (ExecutionException e) {
 System.out.println("execute exception...");
 e.printStackTrace();
} catch (TimeoutException e) {
 e.printStackTrace();
}

  • If the thread waiting in the get method is interrupted, the InterruptedException branch is entered;

  • If an exception is thrown while the task executes (in the call or run method), the ExecutionException branch is entered, and the original exception is available through getCause();

  • If the get method times out, the TimeoutException branch is entered;
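The second and third cases can be reproduced with a task that always fails and a task that never finishes in time. This is a sketch: the exception message "boom", the 100 ms timeout, and the class name FutureExceptionDemo are arbitrary choices for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureExceptionDemo {

    // The exception thrown inside call() arrives wrapped in an ExecutionException.
    static String failureCause() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Integer> failing = () -> { throw new IllegalStateException("boom"); };
        try {
            pool.submit(failing).get();
            return "no exception";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();          // the original exception
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    // get(timeout) gives up with a TimeoutException while the task is still running.
    static boolean timesOut() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        Callable<Integer> slow = () -> { release.await(); return 1; };
        try {
            pool.submit(slow).get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        } finally {
            release.countDown();                     // let the slow task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureCause());
        System.out.println("timed out: " + timesOut());
    }
}
```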

1.3) Difference between submit() and execute() methods:

In the ExecutorService and ScheduledExecutorService interfaces, both the submit() and the execute() methods submit tasks to the thread pool, but the two differ as follows:

  • The accepted parameters differ: execute accepts only a Runnable, while submit accepts both Runnable and Callable;

  • submit has a return value (a Future), while execute has none; the Future makes it easy to handle an Exception thrown by the task;

1) The internal implementation of the submit method:

In fact, there is nothing mysterious about the submit method. It wraps our task in a RunnableFuture (an interface that extends both the Runnable and Future interfaces) and then calls the execute method. Let's look at the source code:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task,  null );   // Convert to RunnableFuture, the result passed is null
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

2) Internal implementation of newTaskFor method:

The newTaskFor method simply creates and returns a new FutureTask, so all three submit methods actually convert the task into a FutureTask. If the task is a Callable, it is assigned directly; if it is a Runnable, it is first converted into a Callable and then assigned.

When the submit parameter is a Callable:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
    }

When the submit parameter is a Runnable:

    // Called in this order:
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);   // wrap the Runnable in a Callable
        this.state = NEW;
    }
    // The method below is from Executors
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    // Adapter class nested in Executors
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

After reading the source code, the mystery is revealed. Because a Future has to return a result, the internal task must be a Callable. If the task is a Runnable, it is disguised: a Callable "vest" is put around the Runnable, and the result to be returned is fixed when the wrapper is constructed.

Reference: /liuxiao723846/article/details/108024212

1.4) ScheduledExecutorService interface:

It extends ExecutorService and adds the ability to execute tasks according to a time schedule. The methods it provides mainly include:

  • schedule(task, delay, unit): Schedules the submitted Callable or Runnable task to be executed once after the given delay;

  • scheduleAtFixedRate(task, initialDelay, period, unit): Schedules the submitted Runnable task to be executed repeatedly at the specified interval;

  • scheduleWithFixedDelay(task, initialDelay, delay, unit): Schedules the submitted Runnable task to be executed repeatedly, waiting for the time specified by delay after each execution finishes;

Note: The implementation class of this interface is ScheduledThreadPoolExecutor.
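A minimal sketch of a one-shot and a periodic schedule. The delays (50 ms and 20 ms) are arbitrary, the class name ScheduledDemo is illustrative, and a latch is used to wait for three periodic runs so that the example does not rely on exact timing.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ScheduledDemo {

    static String run() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // One-shot: the Callable runs once, 50 ms from now.
            Callable<String> delayedTask = () -> "delayed";
            ScheduledFuture<String> once =
                    scheduler.schedule(delayedTask, 50, TimeUnit.MILLISECONDS);

            // Periodic: counts down every 20 ms, starting immediately.
            CountDownLatch ticks = new CountDownLatch(3);
            ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(
                    ticks::countDown, 0, 20, TimeUnit.MILLISECONDS);

            boolean threeRuns = ticks.await(5, TimeUnit.SECONDS);
            periodic.cancel(false);   // a periodic task runs until it is cancelled

            return once.get(5, TimeUnit.SECONDS) + ", three periodic runs: " + threeRuns;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```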

2. Callable interface:

Since JDK 1.5, the work to be run on a thread can be defined in the following ways:

  • Extend the Thread class and override the void run() method;

  • Implement the Runnable interface and its void run() method;

  • Implement the Callable interface and its V call() throws Exception method

1) The difference between the Callable and Runnable interfaces:

  • Callable can throw exceptions and, in conjunction with Future or FutureTask, can be used to obtain the result of asynchronous execution;

  • Runnable returns no result, and checked exceptions can only be handled inside run();

2) A Callable can be executed in the following two ways:

  • Wrap it in a FutureTask and run it through the start method of a Thread;

  • Add it to a thread pool: pass it to the pool's submit method, or wrap it in a FutureTask and pass that to execute;

Note: A Callable cannot be executed directly by a Thread;

We all know that a Callable has a return value. What if we don't need a return value but still want to use Callable?

The JDK has a Void type (capital V); the call method must then return null.

threadPool.submit(new Callable<Void>() {
    @Override
    public Void call() {
        //...
        return null;
    }
});

3) The Runnable interface can be converted into a Callable interface through the Executors tool class:

The callable method in Executors converts a Runnable into a Callable, as follows:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
}

The RunnableAdapter class was shown in the source code above. The principle is to hold the return value result as a member variable, passed in through a constructor parameter, which gives the Runnable a return value.

Example:

public static void test5() {
 Person p = new Person(0,"person");
 RunnableTask runnableTask = new RunnableTask(p); //Create the Runnable
 Callable<Person> callable = Executors.callable(runnableTask, p); //Convert
 Future<Person> future1 = threadPool.submit(callable); //Execute the Callable on the thread pool
 try {
  Person person = future1.get();
  System.out.println(person);
 } catch (InterruptedException | ExecutionException e) {
  e.printStackTrace();
 }

 //A Runnable without a result object
 Runnable runnable = new Runnable() {
  @Override
  public void run() {
   //nothing to do
  }
 };
 Callable<Object> callable2 = Executors.callable(runnable); //Convert
 Future<Object> future2 = threadPool.submit(callable2); //Execute the Callable on the thread pool
 try {
  Object o = future2.get();
  System.out.println(o); //null
 } catch (InterruptedException | ExecutionException e) {
  e.printStackTrace();
 }
}

3. Future interface:

3.1) Future is an interface used to obtain asynchronous calculation results. Common methods are:

  • boolean cancel(boolean mayInterruptIfRunning): Attempts to cancel the execution of this task. The attempt fails if the task has already completed, has already been canceled, or cannot be canceled for some other reason. If the call succeeds and the task has not yet started, the task will never run. If the task has already started, the mayInterruptIfRunning parameter determines whether the thread executing the task should be interrupted in an attempt to stop the task. After this method returns, subsequent calls to isDone() always return true. If this method returns true, subsequent calls to isCancelled() always return true.

  • boolean isCancelled(): Returns true if the task was canceled before it completed normally.

  • boolean isDone(): Returns true if the task has completed, possibly due to normal termination, exception, or cancellation, in all of these cases this method will return true.

  • V get()throws InterruptedException,ExecutionException: Get the asynchronous result, this method will block until the calculation is completed;

  • V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException: Get the asynchronous result. This method will block for the specified time until the calculation is completed. After the timeout, a timeout exception will be thrown.

Through method analysis, we also know that Future actually provides 3 functions:

  • Ability to interrupt tasks in progress;

  • Determine whether the task is completed;

  • Get the result after the task is executed.
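The first of these functions, interrupting a running task, can be sketched as follows. The task just sleeps so that it is certain to be running when cancel(true) is called; the class name FutureCancelDemo is illustrative.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureCancelDemo {

    // Returns {cancel() result, isCancelled(), isDone(), get() threw CancellationException}.
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Runnable sleeper = () -> {
            started.countDown();
            try {
                Thread.sleep(60_000);                 // stands in for long-running work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // cancel(true) ends the sleep
            }
        };
        try {
            Future<?> future = pool.submit(sleeper);
            started.await(5, TimeUnit.SECONDS);       // make sure the task is running
            boolean cancelled = future.cancel(true);  // interrupt the worker thread
            boolean getThrew = false;
            try {
                future.get();
            } catch (CancellationException e) {
                getThrew = true;                      // a cancelled task has no result
            } catch (ExecutionException e) {
                // not expected: the task does not throw
            }
            return new boolean[] { cancelled, future.isCancelled(), future.isDone(), getThrew };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```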

But Future is just an interface, so we cannot create objects of it directly. That is where its implementation class FutureTask comes in.

3.2) FutureTask class:

1) Implementation of the FutureTask class:

public class FutureTask<V> implements RunnableFuture<V> {
//...
}
 
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask implements two interfaces, Runnable and Future. Since FutureTask implements Runnable, it can either be wrapped in a Thread and executed directly, or be submitted to an ExecutorService. The execution result can then be obtained directly through the get() method, which blocks until the result is available.

So FutureTask is a Future, a Runnable, and a wrapper around a Callable (a Runnable passed to it is eventually converted into a Callable); it is a combination of the two.

2) Constructor of FutureTask:

public FutureTask(Callable<V> callable) {
 
}
 
public FutureTask(Runnable runnable, V result) {
 
}

3.3) Example: (FutureTask two constructors, and running on Thread and thread pool)

1) The Callable wrapped by FutureTask is executed on Thread and thread pool:

public static void test3() {
  int a = 1,b = 2;
  Callable<Integer> callable = new Callable<Integer>() {
   @Override
   public Integer call() throws Exception {
    return a + b;
   }
  };
  //Execute the Callable through a FutureTask
  FutureTask<Integer> futureTask = new FutureTask<>(callable);
  
  //1. Use a Thread to execute the task
  new Thread(futureTask).start();
  try {
   Integer integer = futureTask.get();
   System.out.println(integer);
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (ExecutionException e) {
   e.printStackTrace();
  }
  
  //2. Use a thread pool to execute the task
  //A FutureTask runs at most once, so a fresh one is created here
  FutureTask<Integer> futureTask2 = new FutureTask<>(callable);
  ExecutorService pool = Executors.newFixedThreadPool(1);
  pool.submit(futureTask2);
  pool.shutdown(); //shut down the pool that was actually used
  try {
   Integer integer = futureTask2.get();
   System.out.println(integer);
  } catch (InterruptedException | ExecutionException e) {
   e.printStackTrace();
  } 
 }

2) Runnable wrapped by FutureTask is executed on Thread and thread pool:

public static void test4() {
  Person p = new Person(0,"person");
  RunnableTask runnableTask = new RunnableTask(p);
  
  //Create a FutureTask to execute the Runnable
  FutureTask<Person> futureTask = new FutureTask<>(runnableTask,p);
  
  //1. Use a Thread to execute the task
  new Thread(futureTask).start();
  try {
   Person x = futureTask.get();
   System.out.println(x);
  } catch (InterruptedException | ExecutionException e) {
   e.printStackTrace();
  } 
  
  //2. Use the thread pool to execute the task
  //A FutureTask runs at most once, so a fresh one is created here
  FutureTask<Person> futureTask2 = new FutureTask<>(runnableTask,p);
  threadPool.submit(futureTask2);
  threadPool.shutdown(); //the shared pool accepts no further tasks after this
  try {
   Person y = futureTask2.get();
   System.out.println(y);
  } catch (InterruptedException | ExecutionException e) {
   e.printStackTrace();
  }
 }

Person and RunnableTask are the same as in the above example.
