Java 中線程池是為了更好地管理、維護和調用線程,那么用什么來管理或者監(jiān)控線程池中的操作呢?下面我將和大家分享關于 Java 是怎么用線程工廠來監(jiān)控線程池的內容。
ThreadFactory
線程池中的線程從哪里來呢?就是ThreadFoctory
public interface ThreadFactory {
Thread newThread(Runnable r);
}
Threadfactory里面有個接口,當線程池中需要創(chuàng)建線程就會調用該方法,也可以自定義線程工廠
public class ThreadfactoryText {
public static void main(String[] args) {
Runnable runnable=new Runnable() {
@Override
public void run() {
int num=new Random().nextInt(10);
System.out.println(Thread.currentThread().getId()+"--"+System.currentTimeMillis()+"--睡眠"+num);
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//創(chuàng)建線程池 使用自定義線程工廠 采用默認的拒絕策略
ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r);
t.setDaemon(true);//設置為守護線程,當主線程運行結束,線程池中線程也會被釋放
System.out.println("創(chuàng)建了線程"+t);
return t;
}
});
//提交五個任務
for (int i = 0; i < 5; i++) {
executorService.submit(runnable);
}
}
}
當線程提交超過五個任務時,線程池會默認拋出異常
監(jiān)控線程池
ThreadPoolExcutor提供了一組方法用于監(jiān)控線程池
int getActiveCount()//獲得線程池只當前的獲得線程數(shù)量
long getCompletedTaskCount()//返回線程池完成任務數(shù)量
int getCorePoolSize()//線程池中核心任務數(shù)量
int getLargestPoolSize() //返回線程池中曾經達到線程的最大數(shù)
int getMaximumPoolSize()//返回線程池的最大容量
int getPoolSize()//返回線程大小
BlockingQueue<Runnable> getQueue()//返回阻塞隊列
long getTaskCount()//返回線程池收到任務總數(shù)
public class Text {
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId() + "線程開始執(zhí)行--" + System.currentTimeMillis());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//創(chuàng)建線程池 使用默認線程工廠 有界隊列 采用DiscardPolicy策略
ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
//提交五個任務
for (int i = 0; i < 30; i++) {
executorService.submit(runnable);
System.out.println("當前線程核心線程數(shù)"+executorService.getCorePoolSize()+",最大線程數(shù):"+executorService.getMaximumPoolSize()+",當前線程池大?。?+executorService.getPoolSize()+"活動線程數(shù):"+executorService.getActiveCount()+",收到任務:"+executorService.getTaskCount()+"完成任務數(shù):"+executorService.getCompletedTaskCount()+"等待任務數(shù):"+executorService.getQueue().size());
TimeUnit.MILLISECONDS.sleep(500);
}
System.out.println("-------------------");
while (executorService.getActiveCount()>=0)//繼續(xù)對線程池進行檢測
{
System.out.println("當前線程核心線程數(shù)"+executorService.getCorePoolSize()+",最大線程數(shù):"+executorService.getMaximumPoolSize()+",當前線程池大小:"+executorService.getPoolSize()+"活動線程數(shù):"+executorService.getActiveCount()+",收到任務:"+executorService.getTaskCount()+"完成任務數(shù):"+executorService.getCompletedTaskCount()+"等待任務數(shù):"+executorService.getQueue().size());
Thread.sleep(1000);//每1秒檢測一次
}
}
}
當線程池大小達到了核心線程數(shù),線程會被放在等待隊列。當線程池等待隊列已滿會開啟新的線程。當當前線程大小達到最大線程數(shù),等待隊列也滿了,再提交的話會執(zhí)行DiscardPolicy策略,直接丟棄這個無法處理的任務,最后30個任務只剩下15個了。
原理如圖:
擴展線程池
有時候需要對線程池進行擴展,如在監(jiān)控每個任務開始和結束時間,或者自定義其他增強功能。
ThreadPoolExecutor線程池提供了兩個方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
線程池執(zhí)行某個任務前會執(zhí)行beforeExecute()方法,執(zhí)行后會調用afterExecute()方法
查看ThreadPoolExecutor源碼,在該類中定義了一個內部類Worker,ThreadPoolExecutor線程池的工作線程就是Worker類的實例,Worker實例在執(zhí)行時會調用beforeExecute與afterExecute方法。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
}
部分代碼已省略,線程執(zhí)行前會調用beforeExecute,執(zhí)行后會調用afterExecute方法。
擴展線程池示例
package com;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Text07 {
public static void main(String[] args) {
//定義擴展線程池 定義線程池類繼承ThreadPoolExecutor,然后重寫其他方法
ExecutorService threadPoolExecutor=
new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new LinkedBlockingDeque<>()){
//在內部類重寫開始方法
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getId()+"線程準備執(zhí)行任務"+((Mytask)r).name);
}
//在內部類重寫結束方法
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(((Mytask)r).name+"執(zhí)行完成");
}
//線程池退出
@Override
protected void terminated() {
System.out.println("線程池退出");
}
};
for (int i = 0; i < 5; i++) {
Mytask mytask=new Mytask("Thread"+i);
threadPoolExecutor.execute(mytask);
}
}
private static class Mytask implements Runnable
{
private String name;
public Mytask(String name)
{
this.name=name;
}
@Override
public void run() {
System.out.println(name+"正在被執(zhí)行"+Thread.currentThread().getId());
try {
Thread.sleep(1000);//模擬任務時長
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
優(yōu)化線程池大小
線程池大小對系統(tǒng)性能有一定影響,過大或者過小都無法方法發(fā)揮系統(tǒng)最佳性能,不需要非常精確,只要避免極大或者極小就可以了,一般來說線程池大小大姚考慮CPU數(shù)量
線程池大小=CPU數(shù)量 * 目標CPU使用率*(1+等待時間與計算時間的比)
線程池死鎖
如果線程池執(zhí)行中,任務A在執(zhí)行過程中提交了任務B,任務B添加到線程池中的等待隊列,如果A的結束需要B的執(zhí)行結果,而B線程需要等待A線程執(zhí)行完畢,就可能會使其他所有工作線程都處于等待狀態(tài),待這些任務在阻塞隊列中執(zhí)行。線程池中沒有可以對阻塞隊列進行處理的線程,就會一直等待下去照成死鎖。
適合給線程池提交相互獨立的任務,而不是彼此依賴的任務,對于彼此依賴的任務,可以考慮分別提交給不同的線程池來處理。
線程池異常信息捕獲
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Text09 {
public static void main(String[] args) {
//創(chuàng)建線程池
ExecutorService executorService=new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
//向線程池中添加兩個數(shù)相處計算的任務
for (int i = 0; i <5 ; i++) {
executorService.submit(new Text(10,i));
}
}
private static class Text implements Runnable
{
private int x;
private int y;
public Text(int x,int y)
{
this.x=x;
this.y=y;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"線程x/y結果的為"+x+"/"+y+"="+(x/y));
}
}
}
可以看到只有四條結果,實際向線程池提交了五個任務,但是當i==0時,產生了算術異常,線程池把該異常吃掉了,導致我們對該異常一無所知
解決辦法:
1.把submit改為execute
2.對線程池進行擴展,對submit進行包裝
package com;
import java.util.concurrent.*;
public class Text09 {
public static void main(String[] args) {
//創(chuàng)建線程池 使用自定義的線程池
ExecutorService executorService=new TranceThreadPoorExcuter(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
//向線程池中添加兩個數(shù)相處計算的任務
for (int i = 0; i <5 ; i++) {
executorService.submit(new Text(10,i));
}
}
public static class Text implements Runnable
{
public int x;
public int y;
public Text(int x,int y)
{
this.x=x;
this.y=y;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"線程x/y結果的為"+x+"/"+y+"="+(x/y));
}
}
//自定義線程池類 對TranceThreadPoorExcuter進行擴展
private static class TranceThreadPoorExcuter extends ThreadPoolExecutor
{
public TranceThreadPoorExcuter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//定義一個方法用于傳入兩個參數(shù) 第一個是要接受的任務 第二個是Exception
public Runnable warp(Runnable r,Exception e)
{
return new Runnable() {
@Override
public void run() {
try {
r.run();
}
catch (Exception e1)
{
e.printStackTrace();
throw e1;
}
}
};
}
//重寫submit方法
@Override
public Future<?> submit(Runnable task) {
return super.submit(warp(task,new Exception("客戶跟蹤異常")));
}
//還可以重寫excute方法
}
}
此方法使用了自定義的線程池,重寫線程池中的submit方法,在submit方法中,把要傳入的任務參數(shù)帶一個捕獲異常信息的功能就可以捕獲線程池異常。
到此本篇關于用實例代碼展示 Java 如何使用線程工廠監(jiān)控線程池的文章就介紹到這了,想要了解更多相關 Java 線程的其他內容請搜索W3Cschool以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持我們!