查看原文
其他

Java多线程之JUC

树莓蛋黄派 Java规途 2023-07-04
  • 线程和进程

    • 进程-系统调度的基本单位

    • 线程-CPU调度的基本单位

  • 并发和并行

  • Lock锁(重点)

    • Synchronized与Lock的区别

    • 生产者和消费者问题

  • 锁是什么?如何判断锁的是谁?(八锁现象)

    • 示例1:一个类对象,两个同步方法

    • 示例2:两个对象,两个同步方法,一个普通方法

    • 示例3:两个静态同步方法,两个对象

    • 示例4:一个静态同步方法,一个同步方法,一个对象

  • 集合类不安全

    • List不安全

    • set不安全

    • Map不安全

  • Callable

  • 常用的辅助类

    • CountDownLatch(辅助类)

    • CyclicBarrier

    • Semaphore

  • 读写锁 ReadWriteLock

  • 阻塞队列

    • **四组API**

    • SynchronousQueue 同步队列

  • 线程池(重点)

    • 线程池的优势

    • 三大方法

    • 七大参数

    • CPU密集型和IO密集型---定义最大线程

  • 四大函数式接口

    • **函数型接口Function**

    • **Predicate接口**

    • **Consumer消费型接口**

    • **supplier供给型接口**

  • 流式Stream计算

    • 什么是Stream流式计算?

  • ForkJoin

  • 异步回调

  • JMM

    • 关于JMM的一些约定

    • 八种操作

    • 八种操作的规则

  • Volatile

    • 保证可见性

    • 不保证原子性

    • 防止指令重排

  • 单例模式

    • 饿汉式

    • DCL懒汉式

    • 静态内部类单例模式

    • 枚举

  • 深入理解CAS

    • **CAS**

    • ABA问题

  • 原子引用

  • 各种锁的理解:

    • 公平锁、非公平锁

    • 可重入锁

    • 自旋锁

    • 死锁

什么是JUC

java.util工具包、分类

业务:普通的代码,Thread Runnable:没有返回值,效率相比Callable低

线程和进程

进程-系统调度的基本单位

  • 进程:是一个「程序」,程序的集合。
  • 一个进程往往可以包含多个线程,至少包含一个。

线程-CPU调度的基本单位

  • Java默认有两个线程,一个是main,一个是gc线程。
  • 对于Java而言:Thread,Runnable,Callable接口
  • Java本身是没有权限开启线程名,需要调用本地方法来开启线程。调用底层的C++

并发和并行

  1. 多个线程同时操作同一个资源叫做并发。
  2. 多个线程并行操作多个资源叫做并行。
  3. 并发编程的本质:「充分利用CPU的资源」

线程的6个状态:新建,可运行,阻塞,等待,计时等待,终止。

  public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */

        NEW,

        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */

        RUNNABLE,

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */

        BLOCKED,

        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         *   <li>{@link Object#wait() Object.wait} with no timeout</li>
         *   <li>{@link #join() Thread.join} with no timeout</li>
         *   <li>{@link LockSupport#park() LockSupport.park}</li>
         * </ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called {@code Object.wait()}
         * on an object is waiting for another thread to call
         * {@code Object.notify()} or {@code Object.notifyAll()} on
         * that object. A thread that has called {@code Thread.join()}
         * is waiting for a specified thread to terminate.
         */

        WAITING,

        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         *   <li>{@link #sleep Thread.sleep}</li>
         *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
         *   <li>{@link #join(long) Thread.join} with timeout</li>
         *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         * </ul>
         */

        TIMED_WAITING,

        /**
         * Thread state for a terminated thread.
         * The thread has completed execution.
         */

        TERMINATED;
    }

wait和sleep的区别:

  1. 来自不同的类(wait来自object类,sleep来自Thread类)
  2. 关于锁的释放 wait会释放锁。sleep不会释放锁。
  3. 使用的范围是不同的:sleep可以用于任何地方。wait必须在同步代码块中。
  4. wait必须要被唤醒,sleep不需要被唤醒。

Lock锁(重点)

传统的synchronized

public class SaleTickets {
    public static void main(String[] args) {
        //并发:多个线程同时操作同一个资源类,把资源类丢入线程。
        Ticket ticket = new Ticket();
        //此处需要在Tread类中传入一个Runnable接口的实现类,因此需要lambda表达式,真正的实现在lambda表达式中。
        new Thread(()->{
            for (int i = 0; i <=50;i++){
            ticket.saleTickets();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i <= 50;i++) {

            ticket.saleTickets();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i <= 50; i++) {
            ticket.saleTickets();

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
    }
}
    /**资源类Ticket ,oop(尽量不使用接口,减少耦合性)
     * */

   class  Ticket{
        /**定义票数*/
        private  int numbers=50;
        /**定义一个售票的方法
         * 这里为了避免出现问题,加了同步锁。
         * 本质上就是排队+锁
         * */

        public  synchronized void saleTickets(){
            if (numbers>0){
                System.out.println(Thread.currentThread().getName()+"抢到了第"+(numbers--)+"张票,还剩余"+numbers);
            }
        }
    }


Lock锁:这是一个接口

公平锁与非公平锁

公平锁:十分公平,需要先来后到。非公平锁:十分不公平,可以进行插队。(默认)

Lock接口的使用示例:

public class SaleTicketByLockInterface {
    public static void main(String[] args) {
        //并发:多个线程同时操作同一个资源类,把资源类丢入线程。
        Ticket1 ticket = new Ticket1();
        new Thread(()->{
            for (int i = 0; i <=50;i++ ) {

            ticket.saleTickets();}
        },"A").start();
        new Thread(()->{
            for (int i = 0; i <= 50; i++) {

            ticket.saleTickets();}
        },"B").start();
        new Thread(()->{
            for (int i = 0; i <= 50; i++) {

            ticket.saleTickets();}
        },"C").start();


    }

    }

/**这里使用了lock锁
 * 1. new ReentrantLock()
 * 2.加锁
 * 3.解锁finally
 * */

class  Ticket1{
    /**定义票数和定义锁*/
    private  int numbers=50;
    Lock lock=new ReentrantLock();
    /**定义一个售票的方法
     *
     * */

    public  void saleTickets(){
        //这里的lock必须要与try catch块一起用。unlock必须要在finally块中。
        lock.lock();
        try {
            if (numbers > 0) {
                System.out.println(Thread.currentThread().getName() + "抢到了第" + (numbers--) + "张票,还剩余" + numbers);
            }
        }finally {
            lock.unlock();
        }
    }
}

Synchronized与Lock的区别

  1. Synchronized是一个内置的关键字,而Lock是一个Java类
  2. Synchronized是无法获取判断锁的状态,而Lock可以判断是否获取到锁
  3. Synchronized会自动释放锁,但Lock必须要手动释放锁。(如果不释放锁,会导致「死锁」
  4. Synchronized来进行同步时,线程1和线程2同时运行,线程1自动获得锁。线程2等待,若线程1阻塞,线程1会等待, 同时线程2也会一直等待。Lock锁不一定会一直等待下去。
  5. Synchronized是「可重入锁」,并且不可中断的,非公平的。Lock是「可重入锁」,判断锁,非公平(但是可以自己设置)
  6. Synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码。

生产者和消费者问题

「示例:生产者和消费者问题」1.Synchronized版的生产者与消费者问题

public class ProducersAndConsumers {
    public static void main(String[] args) {
        Data data=new Data();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者1").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者1").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者1").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者2").start();


    }

}

/**定义一个需要共同操作的资源类(需要低耦合性)
 *口诀:判断等待,业务,通知
 * */

class  Data{
private int num=0;

/**进行+1操作*/
    public  synchronized void increment() throws InterruptedException {
        if (num!=0){
            //如果num不为0则生产者线程需要等待
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName() + "===>"+num);
        //如果为0同时也进行了自增,则需要重新唤起其他线程
        this.notifyAll();
    }

/**进行-1操作*/
public synchronized void decrement() throws InterruptedException {
    if (num==0){
        //如果num为0,则消费者线程需要等待,等待生产者线程进行生产
        this.wait();
    }
    num--;
    System.out.println(Thread.currentThread().getName() + "===>"+num);
    //如果num不为0,且num--进行了相应减少,则需要唤起其他线程
    this.notifyAll();

}
}

存在的问题:「虚假唤醒问题」

应该将if改为while,因为if只会判断一次

2.Lock版的生产者消费者问题 利用condition的条件对象来控制线程的等待(await)和唤醒(signalAll),

利用Lock锁实现线程的随机唤醒:

public class ProducersAndConsumersByLock {
    public static void main(String[] args) {
        Data1 data=new Data1();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者1").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者1").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者2").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者2").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者3").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者3").start();


    }

}

/**定义一个需要共同操作的资源类(需要低耦合性)
 *
 * */

class  Data1{
    private int num=0;
    /**定义锁Lock和Condition*/
    Lock lock=new ReentrantLock();
    Condition condition=lock.newCondition();

    /**进行+1操作*/
    public  void increment() throws InterruptedException {
        lock.lock();
        try {
            while (num != 0) {
                //如果num不为0则生产者线程需要等待
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() + "===>" + num);
            //如果为0同时也进行了自增,则需要重新唤起其他线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    /**进行-1操作*/
    public  void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (num == 0) {
                //如果num为0,则消费者线程需要等待,等待生产者线程进行生产
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName() + "===>" + num);
            //如果num不为0,且num--进行了相应减少,则需要唤起其他线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }
}

condition:精准地通知和唤醒线程

利用Lock锁及Condition实现指定线程唤醒

public class ProducersAndConsumersByLock {
    public static void main(String[] args) {
        Data2 data=new Data2();
        new Thread(()->{
            for (int i = 0; i <10;i++ ) {
                data.printA();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printB();
            }

        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printC();
            }
        },"C").start();
    }
}

class Data2{//线程共享的资源,用Lock锁实现
    /**定义一个可重入锁
     * */

    private Lock lock=new ReentrantLock();
    /**定义几个条件对象,用来操作线程(同步监视器)
     * 可以用多个监视器来监视多个对象*/

    private Condition condition1=lock.newCondition();
    private Condition condition2=lock.newCondition();
    private Condition condition3=lock.newCondition();
    /**设置一个标志位
     * 1 A 执行
     * 2 B 执行
     * 3 C 执行
     * */

    private  int number=1;
    /**定义三个实现方法*/
    public void printA(){
        lock.lock();
        try {
            //业务代码
            while (number!=1){
                //此时如果number不为1,执行等待。
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"正在执行");
            //执行完毕之后,需要唤醒指定的线程,此处是B,此处使用监视器2来唤醒B并将标志设置为2
            number=2;
            condition2.signal();

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printB(){
        lock.lock();
        try {
            //业务代码
            while (number!=2){
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName() + "正在执行");
            number=3;
            condition3.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printC(){
        lock.lock();
        try {
            //业务代码
            while (number!=3){
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName() + "正在执行");
            number=1;
            condition1.signal();

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

锁是什么?如何判断锁的是谁?(八锁现象)

锁的是:对象(new出来的),class

示例1:一个类对象,两个同步方法

按照顺序来执行,并且第一个sendMessages方法设置了延迟,对象只有一个iphone,此时存在竞争现象,synchronized保证方法 同步,所以先执行了发短信,再执行了打电话(因为第一个方法设置延迟,所以线程B会等待线程A执行完毕之后再执行线程B的方法)

public class Test1 {
    public static void main(String[] args) throws InterruptedException {
        //定义一个线程资源类的实例对象

        IPhone iPhone = new IPhone();
     
        new Thread(() -> {
            iPhone.sendMessages();
        },"A").start();

        //使用Timeunit使线程休眠
        TimeUnit.SECONDS.sleep(1);
        new Thread(()->{
            iPhone.Call();
        },"B").start();
    }


}
/**定义一个资源类,由多个线程来共享。
 * */

class IPhone{

    //synchronized 锁的对象就是方法的调用者(谁调用就锁谁)
    //两个方法用的是同一把对象锁(iphone),谁先拿到谁执行
    /**定义两个手机类的同步方法*/
    public synchronized  void  sendMessages(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("手机可以发短信");

    }
    public synchronized  void Call(){
        System.out.println("手机可以打电话");
    }
}

示例2:两个对象,两个同步方法,一个普通方法

此时设置了两个对象,意味着存在两把不同的锁,所以调用两个同步方法并不存在竞争关系,由于线程A设置了休眠, 所以线程B的内容会先输出,再输出线程A的内容。并且由于普通方法并不存在竞争关系,所以由于休眠的原因会依旧先输出普通方法的内容。此时如果去掉休眠,依旧会按照顺序,先输出senMessages方法。

public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        //定义2个线程资源类的实例对象
        IPhone1 iPhone1 = new IPhone1();
        IPhone1 iPhone2=new IPhone1();
        new Thread(() -> {
            try {
                iPhone1.sendMessages();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A").start();

        //使用Timeunit使线程休眠
        TimeUnit.SECONDS.sleep(1);
        new Thread(()->{
            iPhone2.call();
        },"B").start();
    }


}
/**定义一个资源类,由多个线程来共享。
 * */

class IPhone1{

    //synchronized 锁的对象就是方法的调用者(谁调用就锁谁)
    //两个方法用的是同一把对象锁(iphone),谁先拿到谁执行
    /**定义两个手机类的同步方法*/
    public synchronized  void  sendMessages() throws InterruptedException {
        //此处如果不在同步方法这里加休眠,那么先执行同步方法,如果加了休眠,则先执行普通方法
        TimeUnit.SECONDS.sleep(2);
        System.out.println("手机可以发短信");

    }
    public synchronized  void call(){
        System.out.println("手机可以打电话");
    }


   /**这里不是同步方法,不受锁的影响,不存在竞争*/
    public  void sayHello(){
        System.out.println("Hello");
    }
}

示例3:两个静态同步方法,两个对象

此时拥有两个静态同步方法,两个类对象,结果是按照顺序来执行的:因为静态同步方法锁的是类,不是具体的对象,所以不论创建 多少个对象,锁的都是唯一的class(反射),类似于两个同步方法,一个对象,按照所规定的顺序来并发执行。(所以在第一个方法没结束之前 无法执行第二个方法。即synchronized把class锁住了,其他线程无法执行。)

public class Test3 {  
public static void main(String[] args) throws InterruptedException 
//定义2个线程资源类的实例对象   
IPhone2 iPhone1 = new IPhone2();
IPhone2 iPhone2=new IPhone2();
//此时两个对象的class模板只有一个,锁的是class对象 
new Thread(() -> {   
      try {             
      iPhone1.sendMessages(); 
  } catch (InterruptedException e) {  
  e.printStackTrace(); 
        }     
  },"A").start();     
//使用Timeunit使线程休眠  
TimeUnit.SECONDS.sleep(1);
new Thread(()->{   
iPhone2.call(); 
},"B").start();  
    }
}
/**定义一个资源类,由多个线程来共享。 
* Iphone2具有唯一的class对象(反射对象) 
* */

class IPhone2{  
//synchronized 锁的对象就是方法的调用者(谁调用就锁谁)
//static是一个静态域  
//类一加载就有了class 这是一个模板,这时候锁的是class对象(该对象全局唯一) 
/**定义两个手机类的静态同步方法*/ 
public  static synchronized  void  sendMessages() throws InterruptedException {     
//此处如果不在同步方法这里加休眠,那么先执行同步方法,如果加了休眠,则先执行普通方法  TimeUnit.SECONDS.sleep(2);     
System.out.println("手机可以发短信");
    } 
public static synchronized  void call(){     
System.out.println("手机可以打电话");  
    }
}

示例4:一个静态同步方法,一个同步方法,一个对象

此时定义了一个静态同步方法,一个同步方法,一个对象。所以输出结果是先执行call,再执行了senMessages方法 因为,虽然定义了一个对象,如果是两个普通同步方法,应该是按顺序执行,但有了静态同步方法的存在,就出现了 虽然是一个类对象,但是两个方法锁的却不是同一个对象,static方法锁的是class对象,普通同步方法锁的是定义的哪个对象,所以 就不存在竞争关系,加上static同步方法设置了休眠,所以会先输出call再输出senMessages方法。

public class Test4 {  
public static void main(String[] args) throws InterruptedException {    //定义2个线程资源类的实例对象    
IPhone3 iPhone1 = new IPhone3(); 
//IPhone3 iPhone2=new IPhone3(); 
//此时两个对象的class模板只有一个,锁的是class对象 
new Thread(() -> {  
try {       
iPhone1.sendMessages();
catch (InterruptedException e) {
e.printStackTrace();   
    }    
},"A").start();  
//使用Timeunit使线程休眠    
TimeUnit.SECONDS.sleep(1); 
new Thread(()->{
iPhone1.call();  
  },"B").start();  
    }
}
/**定义一个资源类,由多个线程来共享。 
* Iphone2具有唯一的class对象(反射对象) 
* */

class IPhone3{   
/**定义了一个静态的同步方法,   
* 此方法锁的是class模板*/
   
public  static synchronized  void  sendMessages() throws InterruptedException {   
//此处如果不在同步方法这里加休眠,那么先执行同步方法,如果加了休眠,则先执行普通方法  TimeUnit.SECONDS.sleep(2); 
System.out.println("手机可以发短信");  
}  
/**一个普通的同步方法    
* 此方法锁的是Iphone3对象 
* 与上一个方法锁的对象不同。
* 因此该方法不需要去等待上一个方法的执行。*/
   
public synchronized  void call(){   
System.out.println("手机可以打电话");  
  }
}

new this 指的是具体的类对象。static Class指的是唯一的类模板。如果是同一个锁(锁的是同一个对象),那么谁先获得谁先执行 如果是不同的锁(锁的是不同的对象),就分别执行,谁有休眠,谁就最后执行。

集合类不安全

List不安全

public class ListTest {  
//并发会出现的异常:java.util.ConcurrentModificationException并发修改异常    /**解决方案:    
1.利用vector类,是一个线程安全的集合。  
2. List<String> list= Collections.synchronizedList(new ArrayList<>());使用集合工具类collections将其转换成同步的  
*3.CopyOnWrite写入时复制 COW是一种计算机的优化策略。List<String> list=new CopyOnWriteArrayList<>();    
* 多线程调用的时候,List读取的时候是固定的,但是写入的时候多线程使得写入的数据会被覆盖造成错误,所以使用CopyOnWrite来写入时进行复制。 
*CopyOnWrite相比Vector的优势:  
* CopyOnWrite没有使用Synchronized,效率较高。  
* */  
public static void main(String[] args) {      
// 1.List<String> list=new ArrayList<>(); 
// 2. List<String> list=new Vector<>();
//3.  List<String> list= Collections.synchronizedList(new ArrayList<>());       
List<String> list=new CopyOnWriteArrayList<>(); 
for (int i = 1; i <=10;i++ ) {  
    new Thread(()->{ 
    list.add(UUID.randomUUID().toString().substring(0,5));                System.out.println(list);        
    },String.valueOf(i)).start();        
      } 
    }
}

set不安全

public class SetTest {    
/**出现异常:java.util.ConcurrentModificationException
* 解决方案:    
* 1.Collections:采用集合类来实现集合的线程安全   
* Set<String> set= Collections.synchronizedSet(new HashSet<String>());  *2.使用JUC的形式来处理集合的线程安全问题 
* Set<String> set=new CopyOnWriteArraySet<>(); 
* */
   
public static void main(String[] args) {   
// Set<String> set=new HashSet<String>();  
//1.Set<String> set= Collections.synchronizedSet(new HashSet<String>());    
Set<String> set=new CopyOnWriteArraySet<>();  
for (int i = 1; i <= 30; i++) {     
new Thread(()->{  
set.add(UUID.randomUUID().toString().substring(0,5));                System.out.println(set);  
        },String.valueOf(i)).start();    
            } 
        }
}

hashset的底层是什么?

 public HashSet() {        map = new HashMap<>();    }

底层就是hashmap。set的本质就是map中的key,因为key是无序且不重复的。

Map不安全

回顾map的基本知识:

public class MapTest {   
/**  
* hashmap基础知识: 
* new HashMap<>(16,0.75);初始容量16,负载因子0.75  
* 出现异常:java.util.ConcurrentModificationException  
* 解决方案: 
* 1.调用Collection工具类   
* Map<String,String> map= Collections.synchronizedMap(new HashMap<>());  * 2.使用 Map<String,String> map= new ConcurrentHashMap();  
*   
* */
    
public static void main(String[] args) {  
//Map<String,String> map=new HashMap<>();  
// Map<String,String> map= Collections.synchronizedMap(new HashMap<>());     
Map<String,String> map= new ConcurrentHashMap();     
for (int i = 1; i <= 30; i++) {   
new Thread(()->{     
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));          
System.out.println(map); 
},String.valueOf(i)).start();  
      }   
    }
}

Callable

Callable接口类似于Runnable接口,都是为其实例可能由另一个线程执行的类设计的。

  1. 可以有返回值。
  2. 可以抛出异常。
  3. 方法不同(是call方法)
public class CallableTest {  
public static void main(String[] args) throws ExecutionException, InterruptedException 
//线程启动方法new Thread().start();由于Thread中只接受Runnable接口的实现,无法直接接受Callable,需要通过FutureTask来建立联系。
MyThread myThread=new MyThread();    
//由于myThread无法直接传入Thread中,所以需要一个适配类FutureTask.因为该类实现了Runnable接口,并且其构造函数接受Callable接口参数,
FutureTask futureTask=  new FutureTask<>(myThread); new Thread(futureTask,"A").start(); 
//由于Callable接口是有返回结果的,所以可以进行接受 ,
String s=(String) futureTask.get(); 
System.out.println(s);  
    }
  }
class MyThread implements Callable<String>{
/**MyThread类实现了Callable接口,并重写call方法 * 有返回值,并且可以抛出异常 * */  
@Override   
public String call() throws Exception {     
System.out.println("call方法被执行了");   
  return "1024";    
  }
}

上面的get方法可能会产生「阻塞」,因为获取结果的时候会需要等待,要把它放到最后。或许使用「异步通信」

如果上方调用了两条线程来执行,其结果会被缓存(即第一次执行的结果会被保存起来,直接结束第二次将要执行的线程),提高效率(即只会将相应的结果输出一次。)

常用的辅助类

CountDownLatch(辅助类)

允许一个或者多个线程等待直到其他线程中执行的一组操作完成的同步辅助。是用来计数的

public class CountDownLatchTest {   
public static void main(String[] args) throws InterruptedException {      //模拟倒计时6个数  
CountDownLatch countDownLatch= new CountDownLatch(6); 
//对其执行-1操作
countDownLatch.countDown(); 
//模拟六个线程来执行减1操作    
for (int i = 0; i < 6; i++) { 
new Thread(()->{  
System.out.println(Thread.currentThread().getName() + "走了");             countDownLatch.countDown();
//对计数器执行-1操作(相对于指定的数量,本例中是6)     
},String.valueOf(i)).start();        }    
//此时如果六个线程都走完,需要等待计数器归0,然后再向下执行。   
countDownLatch.await(); 
System.out.println("关闭");   
      }
}

原理

  1. countDownLatch.countDown(),对计数器执行-1操作。
  2. countDownLatch.await();等待计数器归0

每次有线程调用countDownLatch.countDown()进行减1,直到变为0时,唤醒await方法,再继续执行

CyclicBarrier

允许一组线程全部等待达到共同屏障点的同步辅助。加法计数器

public class CyclicBarrierTest {  
public static void main(String[] args) {      
//这里需要传入一个数值型变量和一个Runnable接口的实现,意思是达到某个数量时执行Runnable接口中的操作。   
CyclicBarrier cyclicBarrier= new CyclicBarrier(7,()->{ 
//规定了必须要走完7个线程,之后才能执行其他的操作。   
System.out.println("收集成功");     
});   
for (int i = 1; i <= 7; i++) {     
//此处的lambda表达式是无法拿到i变量的,因为Lambda 表达式中要用到的,但又未在 Lambda 表达式中声明的变量, 
// 必须声明为 final 或者是 effectively final(定义之后再也没有被改变),否则就会出现编译错误。  
final int temp=i;     
new Thread(()->{         
System.out.println(Thread.currentThread().getName() + "拿到了第"+temp+"个数字");   
//通过方法来计数      
try {           
cyclicBarrier.await();   
catch (InterruptedException e) {    
e.printStackTrace();    
catch (BrokenBarrierException e) {      
e.printStackTrace();                }  
}).start();      
    }  
System.out.println("结束");   
    }
}

规定应该达到的线程数,达到之后再进行其他操作。

Semaphore

信号量,维持一组许可证,如果有必要,每个acquire都会阻塞,直到许可证可用,才能使用。限定了一定的数量。

public class SemaphoreTest {    
public static void main(String[] args) {    
//默认线程数量,假设有3个停车位。有6辆车需要停,限流可以去使用。   
Semaphore semaphore= new Semaphore(3); 
for (int i = 0; i < 6; i++) {    
new Thread(()->{   
//两个方法,一个获得,一个释放    
try {       
semaphore.acquire();  
System.out.println(Thread.currentThread().getName() + "得到了车位");      //得到停车位之后停留一段时间     
TimeUnit.SECONDS.sleep(2); 
System.out.println(Thread.currentThread().getName() + "离开了车位");                } catch (InterruptedException e) {          
e.printStackTrace();     
}finally {             
//所有的释放操作都必须放在final中      
semaphore.release();       
  } 
},String.valueOf(i)).start(); 
      } 
    }
}

原理:

  1. acquire():获得,如果已经满了,那就等待到被释放为止
  2. release():释放(加1)操作。然后唤醒等待资源。

「适用于多个共享资源互斥使用。,并发限流」

读写锁 ReadWriteLock

它维护了一对的locks,一个用于只读,一个用于写。读的时候可以由多个线程来读,但写的时候只能由单一的线程来执行写操作。

public class ReadWriteLockTest {  
public static void main(String[] args) {   
MyCacheLock myCache=new MyCacheLock();    
// Write操作    
for (int i = 1; i <= 5; i++) {  
final  int temp=i;  
new Thread(()->{          
myCache.set(temp+"", temp+"");    
  },String.valueOf(i)).start();     
}        
//Read操作       
for (int i = 1; i <= 5; i++) {        
final  int temp=i;    
new Thread(()->{    
myCache.get(temp+"");    
},String.valueOf(i)).start();     
    } 
  }
}
/**自定义缓存*/
class MyCache{   
/**volatile表示共享资源同步,是原子性的   
* 特性是:有序性,可见性,避免重排序。
*/

private  volatile Map<String,Object> map=new HashMap<>();   
/**存方法    
* 写*/
  
public synchronized void  set(String key ,Object obj){   
System.out.println(Thread.currentThread().getName() + "写入"+key); 
map.put(key, obj);
System.out.println(Thread.currentThread().getName() + "写入完毕");  
    }   
/**取方法*/   
public void get(String key){       
System.out.println(Thread.currentThread().getName() + "读取");    
Object o=map.get(key);  
System.out.println(Thread.currentThread().getName() + "读取完毕");  
    }
}
/**定义一个加锁的缓存类*/
class MyCacheLock {   
/**volatile表示共享资源同步,是原子性的     
* 特性是:有序性,可见性,避免重排序。
*/
   
private volatile Map<String, Object> map = new HashMap<>();   
/**创建一个读写锁,更加精细的控制*/  
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();    /**存方法    
* 写*/
  
public void set(String key, Object obj) {  
//此处类似于lock锁    
readWriteLock.writeLock().lock();   
try { 
System.out.println(Thread.currentThread().getName() + "写入" + key);      map.put(key, obj); 
System.out.println(Thread.currentThread().getName() + "写入完毕");        } catch (Exception e) {            
e.printStackTrace(); 
finally {      
readWriteLock.writeLock().unlock();  
  }  
}  
/**取方法*/    
public void get(String key) {     
readWriteLock.readLock().lock();
try {         
System.out.println(Thread.currentThread().getName() + "读取");            Object o = map.get(key);   
System.out.println(Thread.currentThread().getName() + "读取完毕");        } catch (Exception e) {          
e.printStackTrace();     
finally {         
readWriteLock.readLock().unlock();   
    } 
  }
}

「读写锁的总结」ReadWriteLock

  1. 读和读 可以共存
  2. 读和写 不可以共存
  3. 写和写 不可以共存 独占锁(写锁):一次只能被一个线程占用 共享锁(读锁):多个线程可以同时占用

阻塞队列

  • 阻塞

  • 队列

阻塞队列

什么情况下需要阻塞队列?多线程并发处理和线程池

使用队列:添加或者移除

「四组API」

  1. 抛出异常
  2. 不会抛出异常,有返回值
  3. 阻塞等待
  4. 超时等待

「抛出异常的方法」

public class TestThrowsException{
/**抛出异常的队列 
*
**/

public static void test1(){    
//设置队列的大小3      
ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue<>(3);      System.out.println(arrayBlockingQueue.add("a"));    
System.out.println(arrayBlockingQueue.add("b"));   
System.out.println(arrayBlockingQueue.add("c"));     
//add操作 抛出异常:java.lang.IllegalStateException    
// System.out.println(arrayBlockingQueue.add("d"));   
System.out.println("======================");     
//表示查看队首的元素     
System.out.println(arrayBlockingQueue.element()); 
//弹出元素remove    
System.out.println(arrayBlockingQueue.remove());  
System.out.println(arrayBlockingQueue.remove());  
System.out.println(arrayBlockingQueue.remove());  
//取完之后再取会抛出异常:java.util.NoSuchElementException    
System.out.println(arrayBlockingQueue.remove());    
    }
}

「不抛出异常,有返回值的方法」

public class TestNoThrowsException{  
/**不抛出异常,但是会有返回值*/   
public static void test2() {  
ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue<>(3);      System.out.println(arrayBlockingQueue.offer("a"));    
System.out.println(arrayBlockingQueue.offer("b"));  
System.out.println(arrayBlockingQueue.offer("c"));  
//超过三个时,会返回一个布尔值,表示添加失败,没有抛出异常   
//System.out.println(arrayBlockingQueue.offer("d"));
//表示查看队列的首部元素peek   
System.out.println(arrayBlockingQueue.peek());    
System.out.println("=========================");  
//表示即将移除队列的元素     
System.out.println(arrayBlockingQueue.poll()); 
System.out.println(arrayBlockingQueue.poll()); 
System.out.println(arrayBlockingQueue.poll());   
//当取出超过三个时,会返回null值也不会抛出异常    
System.out.println(arrayBlockingQueue.poll()); 
    }
}

「阻塞,一直等待」

public class WaitQueueTest{    
/**     
* 等待  一直阻塞*/
    
public static void test3() throws InterruptedException {       
ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue<>(3);      //没有返回值,一直阻塞        
arrayBlockingQueue.put("a");    
arrayBlockingQueue.put("b");  
arrayBlockingQueue.put("c"); 
//此时再添加元素之后,会一直等待    
//  arrayBlockingQueue.put("d");
System.out.println("================="); 
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take()); 
System.out.println(arrayBlockingQueue.take());  
//此时没有元素时,再取出,会一直等待     
System.out.println(arrayBlockingQueue.take());   
  }
}

「阻塞,超时等待」

public class WaitForALongTime{    
/**等待,阻塞(等待超时)*/  
public static void test4() throws InterruptedException {   
ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue<>(3);      arrayBlockingQueue.offer("a"); 
arrayBlockingQueue.offer("b");
arrayBlockingQueue.offer("c");
//当队列满时继续添加元素,但只等待两秒,若仍旧没有位置,就会超时退出   
arrayBlockingQueue.offer("d",2, TimeUnit.SECONDS);  
System.out.println("=======================");  
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
//此时是取出元素,若超过两秒钟仍旧没有取出,就直接退出    
arrayBlockingQueue.poll(2,TimeUnit.SECONDS); 
  }
}

SynchronousQueue 同步队列

非阻塞队列 没有容量,添加一个元素,必须等待该元素取出之后才能继续添加元素,相当于容量为1。put和take来进行添加和删除操作

「同步队列」

public class SynchronousQueueTest {   
public static void main(String[] args) {    
//BlockingQueue是SynchronousQueue的父接口---同步队列  
BlockingQueue<String> synchronousQue=new SynchronousQueue<>();        //创建两个线程来测试     
new Thread(()->{     
try {      
for (int i = 1; i <=5;i++ ) {   
System.out.println(Thread.currentThread().getName() + "put"+i);          synchronousQue.put(String.valueOf(i));      

  } catch (InterruptedException e) {    
e.printStackTrace();     
    }   
},"T1").start();     
new Thread(()->{         
//为了确保上面先执行,对线程等待          
try {          
for (int i = 1; i <= 5; i++) {   
TimeUnit.SECONDS.sleep(2);    
System.out.println(Thread.currentThread().getName() + "take"+synchronousQue.take());      
}
  } catch (InterruptedException e) {     
e.printStackTrace();         
  }  
},"T2").start();    
  }
}

线程池(重点)

线程池:三大方法,七大参数,四种拒绝策略

「池化技术」:事先准备好一些资源,要用就取,不用就回归池

程序运行的本质是:消耗占用系统资源,我们需要优化资源使用 常见的池:线程池,JDBC连接池,内存池,对象池

线程池的优势

  1. 降低资源的消耗
  2. 提高响应的速度和效率
  3. 方便管理线程「线程复用,控制最大并发数,管理线程」

注意:线程池不允许使用Executors创建,而是通过ThreadPoolExecutors创建,可以使我们更加明确线程池的运行规则 ,避免资源耗尽的风险。

Executors返回的线程池对象弊端如下:1)FixedThreadPool和SingleThreadPool允许的请求队列长度最大约为21亿,可能会堆积大量的请求。导致OOM 2) CacheThreadPool允许创建的最大线程数为21亿,可能会创建大量的线程,导致OOM

三大方法

public class ThreeMethodsTest {    
/**表示一个线程池中最多有多少个线程可以执行,单线程的就只有一个线程执行,指定容量的就有容量个数的线程可以执行,   
* 缓存线程池表示可伸缩的线程数量  
* */
   
public static void main(String[] args) {     
//得到一个单一的线程池   
ExecutorService treadPool=Executors.newSingleThreadExecutor();        //创建一个固定线程大小的线程池   
ExecutorService treadPool1= Executors.newFixedThreadPool(5);   
//得到一个缓存线程池,可伸缩的     
ExecutorService treadPool2=Executors.newCachedThreadPool();      
/**使用了线程池来创建线程*/    
try {      
for (int i = 0; i < 10; i++) {      
treadPool2.execute(() -> {      
System.out.println(Thread.currentThread().getName() + "进来了");                });  
      } 
}catch (Exception e){     
e.printStackTrace();   
}finally {  
//线程池用完,程序结束,需要关闭线程池。      
treadPool2.shutdown();     
    }
  }
}

七大参数

「源码分析」

public static ExecutorService newSingleThreadExecutor() {    
return new FinalizableDelegatedExecutorService  
(new ThreadPoolExecutor(11,    
0L, TimeUnit.MILLISECONDS,                                   
new LinkedBlockingQueue<Runnable>()));
}    
public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>()); 

public static ExecutorService newCachedThreadPool() {    
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());   
}本质上是   ThreadPoolExecutor  

「ThreadPoolExecutor源码」

public ThreadPoolExecutor(int corePoolSize,//核心线程大小                              int maximumPoolSize,//最大线程大小                              long keepAliveTime,//超时无人调用,就释放                              TimeUnit unit,//超时单位                            
BlockingQueue<Runnable> workQueue,//阻塞队列                              ThreadFactory threadFactory,//线程工厂,创建线程,一般不懂                              RejectedExecutionHandler handler)//拒绝策略 {      
if (corePoolSize < 0 ||   maximumPoolSize <= 0 ||   maximumPoolSize < corePoolSize ||      keepAliveTime < 0)     
throw new IllegalArgumentException()
;     
if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();  
this.corePoolSize = corePoolSize;   
this.maximumPoolSize = maximumPoolSize;  
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);    
this.threadFactory = threadFactory;   
this.handler = handler;   
}

「手动创建线程池」

public class ThreadPoolTestForSevenParam {  
public static void main(String[] args) {     
//采用 ThreadPoolExecutor的方式来创建线程池    
ExecutorService executorService=new ThreadPoolExecutor(                2,              
5,    
3,   
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3), 
Executors.defaultThreadFactory(),    
//队列满了会和最早的进行竞争,也不会抛出异常。    
new ThreadPoolExecutor.DiscardOldestPolicy()
);   
try {          
//最大的线程承载 队列+max值,超出最大值就会抛出异常,触发拒绝策略      
for (int i = 1; i <=9; i++) {   
executorService.execute(()->{   
System.out.println(Thread.currentThread().getName() + "进来了");                });  
    }       
}catch (Exception e){      
e.printStackTrace(); 
}finally {  
executorService.shutdown();
      } 
    }
}

「四种拒绝策略」

四大拒绝策略

  1. 此处的new ThreadPoolExecutor.AbortPolicy()是默认的拒绝策略。AbortPolicy表示当队列满时,不处理了,抛出异常 new ThreadPoolExecutor.AbortPolicy()
  2. 哪里来的去哪里,将交由main线程来执行操作 new ThreadPoolExecutor.CallerRunsPolicy()
  3. 队列满时不会抛出异常,丢掉任务 new ThreadPoolExecutor.DiscardPolicy()
  4. 队列满了会和最早的进行竞争,也不会抛出异常。new ThreadPoolExecutor.DiscardOldestPolicy()

CPU密集型和IO密集型---定义最大线程

  1. 「CPU密集型」
public class TestCpu {
//获取CPU的核数
    //    System.out.println(Runtime.getRuntime().

 availableProcessors());
 //采用 ThreadPoolExecutor的方式来创建线程池
 ExecutorService executorService = new ThreadPoolExecutor(
         2,
         Runtime.getRuntime().availableProcessors(),
         3,
         TimeUnit.SECONDS,
         new LinkedBlockingQueue<>(3),
         Executors.defaultThreadFactory(),
         //队列满了会和最早的进行竞争,也不会抛出异常。
         new ThreadPoolExecutor.DiscardOldestPolicy()
 );
}
  1. 「IO密集型」判断程序中十分耗IO的线程,最大线程数设为其两倍

四大函数式接口

lambda表达式,链式编程,函数式接口,Stream流式计算

函数式接口:只有一个方法的接口,如Runnable

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface {@code Runnable} is used
     * to create a thread, starting the thread causes the object's
     * {@code run} method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method {@code run} is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */

    public abstract void run();
}

可以简化编程模型,在框架底层使用。

JUC中四大函数式接口

「函数型接口Function」

public class FunctionInterfaceTest {
 /*Function 函数型接口
 * 1.有一个输入参数,有一个输出参数
 * 2. 只要是函数式接口,就可以用lambda表达式来简化
  * * */

    public static void main(String[] args) {
        //函数型接口的使用是匿名内部类(此处的功能就是输入什么就输出什么)
        Function function=new Function<String,String>() {
            @Override
            public String apply(String str) {
                return str;
            }
        };

        Function<String ,String> function1=( str)->{return str;};

        System.out.println(function1.apply("a"));

    }
}

「Predicate接口」

断定型接口:有一个输入参数,返回值只能是布尔值。

public class PredicateInterfaceTest {
    public static void main(String[] args) {
        //实现判断字符串是否为空
        Predicate<String> predicate=new Predicate<String>() {
            @Override
            public boolean test(String str) {
                return str.isEmpty();
            }
        };

        //lambda表达式简化
        Predicate<String> predicate1=str->{return str.isEmpty();};
        System.out.println(predicate1.test("a"));

    }
}

「Consumer消费型接口」

public class ConsumerInterfaceTest {
    /**Consumer接口只有输入,没有返回值*/
    public static void main(String[] args) {
        Consumer<String>consumer=new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };
        //可以使用lambda表达式
        Consumer<String> consumer1=(String s)->{
            System.out.println(s);
        };
        consumer.accept("A");
    }
}

「supplier供给型接口」

public class SupplierInterfaceTest {
    /**Supplier供给型接口,没有输入,只有返回值*/
    public static void main(String[] args) {
        Supplier<Integer> sup=new Supplier<Integer>() {
            @Override
            public Integer get() {
                System.out.println("进get方法了");
                return 1024;
            }
        };
        //使用lambda表达式
        Supplier<Integer> sup1=()->{return 1024;};
        System.out.println(sup1.get());
    }
}

流式Stream计算

什么是Stream流式计算?

存储+计算 集合 MySQL都是存储 流用来计算

public class TestStream {
 /**题目要求:
  * 对下面的所有用户进行操作筛选
  * 1. id必须为偶数
  * 2.年龄必须大于23
  * 3.名字必须转为大写字母
  * 4.用户名字母倒着排序
  * 5.只输出一个用户*/

    public static void main(String[] args) {


        User u1 = new User(1"a"21);
        User u2 = new User(2"b"22);
        User u3 = new User(3"c"23);
        User u4 = new User(4"d"24);
        User u5 = new User(5"e"25);
        User u6 = new User(6"f"26);

        //将其放入集合(存储)中
        List<User> alist = Arrays.asList(u1, u2, u3, u4, u5, u6);

        //计算交给流
        alist.stream().filter(u->{return u.getId()%2==0;})
                .filter(u->{return u.getAge()>23;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);
    }
}

class User{
    private int id;
    private String name;
    private int age;

    public User(int id,String name,int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

ForkJoin

分支合并 什么是ForkJoin 出现于JDK1.7,主要执行一些并发任务,适合大数据的情形下来执行任务。

「特点」:工作窃取;

维护的都是「双端队列」

「ForkJoin的使用」

public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start;
    private Long end;
    /**定义一个临界值*/
    private Long temp=10000L;
    public ForkJoinDemo(Long start,Long end){
        this.start = start;
        this.end = end;

    }

    /**这是一个计算的方法*/
    @Override
    protected Long compute() {
        Long result=0L;
        if ((end-start)<temp){
            Long sum=0L;
            for (Long i=start; i < end; i++) {
                sum+=i;

            }
            return sum;
        }else {
            //使用ForkJoin计算
            Long mid=(start + end)/2;
            ForkJoinDemo forkJoinDemo1=new ForkJoinDemo(start,mid);
            //拆分任务,把任务压入线程队列
           ForkJoinTask task1= forkJoinDemo1.fork();
            ForkJoinDemo forkJoinDemo2=new ForkJoinDemo(mid+1,end);
            ForkJoinTask task2=forkJoinDemo2.fork();
            //获取最终结果
            return result=forkJoinDemo1.join()+forkJoinDemo2.join();

        }
    }

    public static void main(String[] args) {
        ForkJoinDemo task=new ForkJoinDemo(0L,10_0000_0000L);
        System.out.println(task.compute());

        System.out.println("====================");

        long start=System.currentTimeMillis();
        System.out.println(LongStream.rangeClosed(0L1_0000_0000).sum());
        long end=System.currentTimeMillis();
        System.out.println(end-start);

        //5000000050000000
        //499934463999828390
       BigInteger result=BigInteger.valueOf(5000_0000_5000_0000L).divide(BigInteger.valueOf(499934463999828390L));
        System.out.println(result);
    }
}

异步回调

Future:对将来某个要发生的事件进行建模

执行阻塞任务时不会进行持续等待。

Future接口

CompletableFuture接口

public class FutureDemo {
/**异步调用:CompletableFuture<T>
 *1.成功回调
 *2.失败回调
 * */

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.泛型采用Void,因此是无返回结果的(无返回值的异步回调)
       CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
           try {
               TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
          // int a=10/0;
           System.out.println(Thread.currentThread().getName() + "无返回结果");
       });
        System.out.println("111111111111111");
       //利用get方法来获得返回结果,阻塞获取执行结果,调用get之后会执行lambda表达式中内容,因为无返回值,所以输出get的结果是null
        completableFuture.get();

        System.out.println("================");
        //2.有返回值的异步回调 lambda表达式中传入一个供给型参数
        CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync(()->{
            int i = 10/0;
            System.out.println(Thread.currentThread().getName() + "Integer");
            return 1024;});

        //由于是有返回值的异步回调,所以当成功时需要进行相应的操作(方法内需要传入消费型接口,有参数无返回值)
        System.out.println(completableFuture1.whenComplete((t, u) -> {
            System.out.println("t的内容是" + t);//正常的返回结果(正常结果参数)
            System.out.println("u的内容是" + u);//如果有错误就会打印错误的信息(错误信息参数)
            //当失败时,会形成相应的异常,exceptionally会要求传入一个Function接口的参数,要求有参数和返回值。
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 404;//可以获取到错误的结果
            //get方法依旧是获取异步回调的结果
        }).get());
    }
}

JMM

JMM:Java内存模型,是个概念(约定),并不是真实存在的

关于JMM的一些约定

  1. 线程解锁前,必须把自己的共享变量立刻刷回主存
  2. 线程加锁前,必须读取主存中的最新值到工作区
  3. 加锁和解锁是同一把锁。

线程分为「工作内存」「主内存」。一般有8种操作:

先store后write

存在的问题:内存可见性问题

八种操作

  1. lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态
  2. unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  3. read(读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
  4. load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
  5. use(使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
  6. assign(赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
  7. store(存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
  8. write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

八种操作的规则

  1. 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
  2. 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
  3. 不允许一个线程将没有assign的数据从工作内存同步回主内存
  4. 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
  5. 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
  6. 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
  7. 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
  8. 对一个变量进行unlock操作之前,必须把此变量同步回主内存

Volatile

Volatile是JVM提供的「轻量级的同步机制」作用:

  1. 保证可见性
  2. 不保证原子性
  3. 防止指令重排

保证可见性

public class JmmDemo2 {
    //不加volatile就会形成死循环,加上volatile可以保证可见性
        private static volatile int number=0;
        public static void main(String[] args) {//main线程
            new Thread(()->{//线程1对主内存变化可见
                while (number==0){
                   //由于使用了volatile 关键字,可以使该线程对主存中变量的变化可见。
                    System.out.println(number);//此处输出的number为0,原因是主线程休眠了1秒中,但A线程还在运行,
                    // 此时的number没有被改为1
                }
            }).start();
            //让主线程休眠一秒
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            number=1;
            System.out.println(number);
        }
    }

不保证原子性

原子性:不可分割(线程A在执行任务时不能被打扰和分割,要么同时成功,要么同时失败)

public class VolatileAtomicTest1 {
    /** volatile 不保证原子性*/
        private static volatile int number=0;

        public static void add(){
            number++;
        }
    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            //创建20个线程,每个线程执行1000次add方法
            new Thread(()->{
                for (int i1 = 0; i1 < 1000; i1++) {
                    add();
                }
            }).start();
        }
        //判断存活的线程数,如果大于2说明还有线程没有执行完,并将其作出让步。
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+""+number);
    }
}

此例加上lock和Synchronized可以保证原子性,如果不加,如何保证原子性?使用原子类来解决原子性问题

「原子类操作示例」

public class VolatileAtomicTest2 {
    /**现在使用原子类的int*/
    private static volatile AtomicInteger number=new AtomicInteger(1);

    public static void add(){
            number.getAndIncrement();//执行加1方法,使用CAS来执行操作
    }
    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            //创建20个线程,每个线程执行100次add方法
            new Thread(()->{
                for (int i1 = 0; i1 < 1000; i1++) {
                    add();
                }
            }).start();
        }
        //判断存活的线程数,如果大于2说明还有线程没有执行完,并将其作出让步。
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+""+number);
    }
}

这些类的底层都是直接与操作系统挂钩。(在内存中修改值)

防止指令重排

指令重排:自己写的程序,计算机并不是按照自己写的顺序执行。过程:源代码->编译器优化的重排->指令并行重排->内存系统重排->执行「处理器在指令重排时会考虑数据之间的依赖性问题。」

「volatile可以避免指令重排」

内存屏障,CPU指令的作用:

  1. 保证特定的操作执行顺序
  2. 可以保证某些变量的内存可见性

总结:volatile可以保证「可见性」,但「不保证原子性」,由于内存屏障,可以「防止指令重排」

单例模式

饿汉式、DCL懒汉式设计模式

饿汉式

public class HungryPattern {
    /**这一部分属性在创建对象之后就加载进内存,会比较浪费空间*/
    private byte[] data1=new byte[1024];
    private byte[] data2=new byte[1024];
    private byte[] data3=new byte[1024];
    private byte[] data4=new byte[1024];
    /**单例模式最显著的特点是构造器私有*/
    private HungryPattern() {

    }

    /**构造一个对象,可以保证其是唯一的对象,单例*/
    private static  HungryPattern HUNGRY=new HungryPattern();

    /**构造一个静态方法,返回这个单例的对象
     * */

    public static HungryPattern getInstance(){
        return  HUNGRY;
    }
}


DCL懒汉式

public class LazyPattern {
    private  static boolean flag=false;
    /**单例模式,构造器私有*/
private  LazyPattern() {
    /*防止反射来破坏单例模式,可以加一重检测*/
    synchronized (LazyPattern.class){
        if (flag==false){
            flag = true;
        }else {
            //此处 表示如果flag为false,说明之前没有通过反射或其他机制来创建对象,可以继续执行,但如果不为false说明之前创建过对象,此时需要抛出异常
            throw new RuntimeException("不要试图使用反射来破坏单例模式");
        }
      /*  if (LAZY!=null){
            throw new RuntimeException("不要试图使用反射来破坏单例模式");
        }*/

    }
    System.out.println(Thread.currentThread().getName()+"启动了");
}
/**创建单一的对象*/
private static volatile   LazyPattern LAZY;

/**创建一个获取单例对象的方法
 * 由于没有进行初始化,所以需要对其进行相应的判断,如果为空,则创建一个,再返回*/

public static LazyPattern getInstance(){
    //双重检测锁模式的懒汉式单例模式 DCL
    if (LAZY==null){
        //如果单例的对象为空,将该该类的class对象锁住
        synchronized (LazyPattern.class){
            if (LAZY == null){
                LAZY=new LazyPattern();
                /**不是一个原子性操作:1.分配内存空间,2.执行构造方法,初始化对象3.将对象指向空间
                 * 可能CPU不会按照指定的顺序来执行,此时会发生错误,A线程可能按照132的顺序执行,但此时B线程也进入了,读取到对象不为空
                 * 直接返回,造成异常,因此需要利用volatile修饰对象*/

            }
        }
    }

        return LAZY;
}

//单线程下,单例是安全的

    public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
       //可以利用反射来破坏单例模式
        //首先创建一个LazyPattern的对象
       // LazyPattern instance1=LazyPattern.getInstance();
        //利用反射获取该类的私有构造器
        Constructor<LazyPattern> constructor=LazyPattern.class.getDeclaredConstructor(null);
        //关闭安全机制
        constructor.setAccessible(true);
        //利用反射构造器来创建新的对象
       LazyPattern instance2=constructor.newInstance();


        //此时如果获取了属性信号量,并对其进行相应的修改,依旧可以破坏单例模式
       Field f= LazyPattern.class.getDeclaredField("flag");
       //破坏f属性的安全检测
        f.setAccessible(true);
        f.set(flag,false);
        //创建对象后,将相应的信号量又改为false,可以继续创建新的对象,单例模式又被破坏了。
        LazyPattern instance3=constructor.newInstance();
        //如果不通过getInstance方法获取对象,都通过反射来new,依旧会破坏单例模式。
        //此时需要在类中设定一个标志信号,如果满足某个信号,说明之前已经有过一个对象了,就无法再创建,会立即抛出异常
        System.out.println(instance3);
        System.out.println(instance2);
    }
}

静态内部类单例模式

public class Holder {
    /**创建私有构造器*/
    private Holder() {

    }
/**创建一个静态方法去获得Holder的对象*/
public static Holder getInstance(){
    return InnerClass.holder;
}
    /**定义一个静态内部类
     * */

    public static class InnerClass{
        //创建外部类的对象
       private static final Holder holder= new Holder();

    }
}


以上的单例模式都不够安全,因此引出了枚举

枚举

枚举类反编译源码

public enum EnumSingleton {
    /**定义枚举常量*/
    INSTANCE;
    //枚举类的构造器是私有的

    public EnumSingleton getInstance(){
        return INSTANCE;
    }
}
class Test{
    public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        //利用枚举类来创建对象
        EnumSingleton instance1 = EnumSingleton.INSTANCE;
        //通过反射获取枚举类的构造器对象
        //Constructor<EnumSingleton> constructor=EnumSingleton.class.getDeclaredConstructor(null);
        Constructor<EnumSingleton> constructor=EnumSingleton.class.getDeclaredConstructor(String.class,int.class);
        //关闭安全检测
        constructor.setAccessible(true);
        //常见另一个枚举对象
        EnumSingleton instance2=constructor.newInstance();
        System.out.println(instance1);
        System.out.println(instance2);
        /**
         * java.lang.NoSuchMethodException异常显示我们类中没有这个无参的构造方法
         * 实际上存在的是有参数的构造方法
         * 修改完反射之后的构造器参数,之后会出现异常:Cannot reflectively create enum objects,即不能用反射来创建枚举对象*/

    }
}

深入理解CAS

CAS:期望的值达到了,就更新,否则不更新。CAS是CPU的并发原语

unSafe类是个本地方法

「自旋锁」:表示会一直循环直到达到目标值

「CAS」

public class CASDemo1 {

    //CAS:比较并学习
    public static void main(String[] args) {
        //定义一个原子类
        AtomicInteger integer=new AtomicInteger(2021);
        /**
         *     public final boolean compareAndSet(int expectedValue, int newValue)
         *     一个期望值,一个更新值*/

        //如果我期望的值达到了,就更新,否则不更新。CAS是CPU的并发原语
        System.out.println(integer.compareAndSet(20212022));
        System.out.println(integer.get());
        integer.getAndIncrement();
        System.out.println(integer.compareAndSet(20212022));
        System.out.println(integer.get());
    }
}

总结:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么执行操作,如果不是就一直执行循环。

「缺点」

  1. 底层循环会耗时
  2. 一次性只能保证一个共享变量的原子性
  3. 存在ABA问题

ABA问题

这是一种乐观锁

原子引用

原子引用:「带版本号的原子操作」---->乐观锁

Integer使用了对象缓存机制,默认范围为-128-127,「推荐使用静态工厂方法valueof()「获取对象实例,而不是new 因为valueof使用了」缓存」,而new一定会创建新的对象,分配新的内存空间。

public class AtomicReferenceTest {
    public static void main(String[] args) {
        //创建带版本号的原子引用 initialStamp:表示所带的版本号
        //AtomicStampedReference 如果泛型是包装类,要注意对象的引用问题。
     //正常情况下传入的是对象,比较的是对象地址
        AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(1,1);
        //模拟两个线程操作
        new Thread(()->{
            //获得版本号
            int stampedIndex = atomicStampedReference.getStamp();
            System.out.println("a1获得"+ stampedIndex);
            //加上延迟,保证两者版本号相同
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //对原子引用进行CAS操作,并写出原来的版本号和新的版本号
            System.out.println(atomicStampedReference.compareAndSet(12,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)+"A");
            System.out.println("a2获得"+atomicStampedReference.getStamp());

            //再将更新好的操作,重新改回去
            System.out.println(atomicStampedReference.compareAndSet(21,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)+"A");

            System.out.println("a3获得"+atomicStampedReference.getStamp());

        },"A").start();
        new Thread(()->{
            int stamp=atomicStampedReference.getStamp();
            System.out.println("b1获得"+stamp);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicStampedReference.compareAndSet(16,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)+"B");
            System.out.println("b2获得"+atomicStampedReference.getStamp());
        },"B").start();


      /*  TimeUnit unit;
        BlockingQueue workQueue;
        ThreadPoolExecutor executor=new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        for (int i = 0; i < 2; i++) {
            executor.execute(() -> {
                //获得版本号
                int stampedIndex = atomicStampedReference.getStamp();
                System.out.println(Thread.currentThread().getName() + stampedIndex);
                //加上延迟,保证两者版本号相同
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //对原子引用进行CAS操作,并写出原来的版本号和新的版本号
                atomicStampedReference.compareAndSet(2020,2022,
                        atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
                System.out.println(Thread.currentThread().getName()+atomicStampedReference.getStamp());

                //再将更新好的操作,重新改回去
                atomicStampedReference.compareAndSet(2022,2020,
                        atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1);

                System.out.println(Thread.currentThread().getName()+atomicStampedReference.getStamp());

            });*/

        }
    }

各种锁的理解:

公平锁、非公平锁

  1. 公平锁:非常公平,不准插队,讲究线程的先来后到
  2. 非公平锁:非常不公平,可以插队, 默认都是非公平锁
public class sync {
 public ReentrantLock() {
  sync = new NonfairSync();
 }
 
 //重载方法:判断是否是公平锁
 public ReentrantLock(boolean fair) {
  sync = fair ? new FairSync() : new NonfairSync();
 }
}


可重入锁

可重入锁(递归锁 ),相当于是获得了一把锁之后间接获得第二把锁

「Synchronized」

public class ReentrantLockSynchronizedTest {
    public static void main(String[] args) {
        Phone phone=new Phone();
     /**  new Thread(()->{
            phone.sendMessage();
        },"A").start();
        new Thread(()->{
            phone.sendMessage();
        },"B").start();*/


    ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,0, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    executor.execute(()->{
        phone.sendMessage();
    });
    executor.execute(()->{
        phone.sendMessage();
    });

    }
}
class Phone{
    /**定义一个发信息的方法和一个打电话的方法
     * 此时线程调用sendMessage方法之后会立即得到call方法的锁,只有等到call方法执行完毕之后才会释放锁给B执行。
     * */

    public  synchronized void sendMessage(){
        System.out.println(Thread.currentThread().getName()+"正在发信息");
        call();//call方法里也有一把锁
    }
    public synchronized  void call(){
        System.out.println(Thread.currentThread().getName()+"正在打电话");
    }
}

「Lock锁」Lock锁的加锁和解锁必须要「成对出现」

public class ReentrantLockForLockTest {
    public static void main(String[] args) {
        Phone1 p=new Phone1();
        ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,0, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        executor.execute(()->{
            p.sendMessage();
        });
        executor.execute(()->{
            p.sendMessage();
        });
    }
}
class Phone1{
    /**定义一个发信息的方法和一个打电话的方法
     * 此时线程调用sendMessage方法之后会立即得到call方法的锁,只有等到call方法执行完毕之后才会释放锁给B执行。
     * */

    Lock lock=new ReentrantLock();
    public   void sendMessage(){
        lock.lock();//与synchronized不同的是线程进入方法之后会获得两把锁,一个是sendMessage的,一个是call的,并且lock锁必须配对
        try {
            System.out.println(Thread.currentThread().getName() + "正在发信息");
            call();//call方法里也有一把锁
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public   void call(){
        lock.lock();
        try {
        System.out.println(Thread.currentThread().getName()+"正在打电话");}catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

自旋锁

spinLock

自定义锁测试

public class SpinLockTest {
    /**此时的原子引用以Thread为参数,表示锁一个线程*/
   AtomicReference<Thread> atomicReference=new AtomicReference<>();
   /**加锁的方法*/
    public void myLock(){
        //得到获得该锁的线程
        Thread thread=Thread.currentThread();
        System.out.println(thread.getName()+" myLock");

        //自旋锁
        while (atomicReference.compareAndSet(null, thread)) {
            System.out.println(Thread.currentThread().getName() + "正在自旋");
        }
    }
    /**解锁的方法*/
    public void myUnLock() {
        Thread thread=Thread.currentThread();
        System.out.println(thread.getName()+" myUnLock");
       atomicReference.compareAndSet(thread, null);
    }

    public static void main(String[] args) throws InterruptedException {
        //底层是使用自旋锁
        SpinLockTest loc=new SpinLockTest();
        new Thread(()->{
            loc.myLock();
            try {
                TimeUnit.SECONDS.sleep(1);
            }catch (Exception e) {
                System.out.println(e.getMessage());
            }finally {
                loc.myUnLock();
            }
        },"T1").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(()->{
            loc.myLock();
            try {
                TimeUnit.SECONDS.sleep(1);
            }catch (Exception e) {
                System.out.println(e.getMessage());
            }finally {
                loc.myUnLock();
            }
        },"T2").start();


    }
}

死锁

所谓死锁就是互相争夺资源

四要素(产生条件)

  1. 互斥使用,即当资源被一个线程使用(占有)时,别的线程不能使用

  2. 不可抢占,资源请求者不能强制从资源占有者手中夺取资源,资源只能由资源占用者

    主动释放

  3. 请求和保持,即当资源的请求者在请求其他的资源的同时保持对原有资源的占有

  4. 循环等待,即存在一个等待队列: P1占有P2的资源,P2占有P3的资源,P3占有P1的资源。

    这样就形成了一个等待环路。

public class DeadLockTest {
    public static void main(String[] args) {
        String LockA="A资源";
        String LockB="B资源";
        MyThread myThread1=new MyThread(LockA,LockB);
        MyThread myThread2=new MyThread(LockB,LockA);
        new Thread(myThread1,"T1").start();
        new Thread(myThread2,"T2").start();

    }
}

class  MyThread implements Runnable {
    /**定义两个共享变量*/
    String LockA;
    String LockB;
    /**定义构造方法初始化LockA和LockB*/
    public MyThread(String LockA, String LockB){
        this.LockA = LockA;
        this.LockB = LockB;
    }

    @Override
    public void run() {
        /**在Run方法中分别锁住LockA和LockB对象,嵌套使用*/
        synchronized (LockA){
            System.out.println(Thread.currentThread().getName() + " 进来了+LockA");
            System.out.println(Thread.currentThread().getName() + "已经获取了"+LockA + "正在尝试获取"+LockB);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (LockB){
                System.out.println(Thread.currentThread().getName() + " 进来了+LockB");
                System.out.println(Thread.currentThread().getName() + "已经获取了"+LockB + "正在尝试获取"+LockA);

            }
        }

    }
}

排查解决问题:

  1. 使用jps-l查看进程号
  2. 使用jstack进程号找到死锁问题




您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存