[多线程]Java多线程04_多个线程协作以及线程私有数据

一、线程间的通讯

前面说完了线程的工作、线程的安全等等,那么多个线程的协同工作,线程之间的通讯就是接下来的话题了。

线程之间的等待通知,简而言之就是一个线程做完了自己该做的一件事情,然后通知另外一个线程继续需要的业务操作,实现一个任务由多个线程异步进行从而提升软件性能。

$seq
线程A->线程A: 执行任务
Note right of 线程B: 等待线程A执行完成通知
线程A->线程B: 执行完成,通知线程B启动
Note right of 线程B: 继续执行任务
$

(一)什么时候用到

线程之间的通知需要应用在有对象级别锁的时候,比如前几个说的synchronized方法或者代码块,从而使用锁来进行通知(这时候,Objectwait()notify()以及notifyAll()方法就派上用场了)。

在锁对象调用wait()的时候,它会释放当前占有的锁,线程进入阻塞状态,等待其他线程调用该对象所的notify方法才继续占领锁运行,如果没有调用notify方法则这些线程会一直阻塞下去。如果有多个线程进入等待,则由线程规划器挑选出阻塞状态的线程继续运行。

二、线程通讯实战

(一)简单例子

创建一个线程,执行的时候进入了等待状态,而主线程在3秒以后唤醒他让他继续执行。


/** * 一个进入等待的线程 * @author liweidan */ public class WaitThread implements Runnable { private Object aLock; public WaitThread(Object aLock) { this.aLock = aLock; } @Override public void run() { try { synchronized (aLock) { System.out.println("线程开始进入等待"); aLock.wait(); System.out.println("线程被唤醒,结束了运行"); } } catch (InterruptedException e) { e.printStackTrace(); } } } public class ThreadStart { public static void main(String[] args) { startWaitDemo(); } public static void startWaitDemo() { try { /** 创建一个公共锁 */ Object aLock = new Object(); /** 通过已有的锁创建线程 */ WaitThread waitThread = new WaitThread(aLock); /** 启动线程 */ new Thread(waitThread).start(); /** aLock需要在另外一个线程的监视器中执行 */ synchronized (aLock) { /** 主线程休息3秒 */ Thread.sleep(3000L); /** 唤醒阻塞的线程 */ aLock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } } } // 结果: 线程开始进入等待 线程被唤醒,结束了运行

注意:
1. aLock一定需要是监视器状态的时候,也就是在synchronized代码块中调用;
2. wait()调用以后,线程会立即释放锁。

(二)wait()立即释放锁而notify()不立即释放

  1. wait()执行完成以后,可以看到线程直接在执行处停了下来并且释放锁。
  2. notify()执行完以后只是通知正在阻塞的线程说可以继续执行下去,但是如果notify()后面还有其他业务代码,则本线程会把后面的业务代码执行完成才算是把锁释放,这时候其他阻塞线程才会开始执行。

1. Thread生命周期

虽然说用时序图来表示不太符合常规,但是因为其箭头的清晰可见,所以我还是用时序图来表示。

线程一共有四个状态:新的线程、运行的线程、暂停的线程以及销毁的线程

$seq
新的线程->运行的线程: start()
新的线程->销毁的线程: stop()
运行的线程->暂停的线程: suspend() / sleep() / wait()
暂停的线程->运行的线程: resume()
运行的线程->运行的线程: yield()
运行的线程->销毁的线程: stop()或者run()结束
$

(三)几个需要注意的地方

  1. 当一个线程正处于wait状态的时候,调用interrupt()方法会抛出InterruptedException
  2. 方法wait(long)作用是:进入wait多少秒以后没有其他线程唤醒则该线程自动唤醒
  3. 通知过早:线程监听器在调用wait()之前就已经由其他线程调用notify(),则该通知无效,线程将一直等待下去
  4. wait()执行的条件发生变化的时候,需要注意条件的设置,以免产生错乱。

(四)生产者与消费者

生产者与消费者模式是最经典的案例了,意思就是生产者生产完数据以后要通知消费者来消费,但是如果生产者或者消费者是多个的情况下,可能出现以下几种情况:

生产者或者消费者使用notify()唤醒的可能是同类(生产者唤醒生产者,消费者唤醒消费者),那么在判断资源的时候可能出现都是空,导致所有线程都在等待,这时候程序就进行不下去了。可以使用notifyAll()来解决。

因为有几种情况:一对一,一对多,多对一。这里就只示范多对多情况,因为解决了这个最复杂的情况,其他貌似都不会太难了。

/**
 * 生产者线程
 * @author liweidan
 * @date 2017.12.18 下午4:28
 * @email toweidan@126.com
 */
public class ProducerThread implements Runnable {
    /** 资源 */
    private List<String> library;
    public ProducerThread(List<String> library) {
        this.library = library;
    }
    @Override
    public void run() {
        synchronized (library) {
            while (true) {
                if (library.isEmpty()) {
                    for (int i = 0; i < 5; i++) {
                        System.out.println("生产者生产: " + "library" + i);
                        library.add("library" + i);
                    }
                    /** 生产完成,通知全部消费者以及生产者 */
                    System.out.println("完成生产,通知全部");
                    library.notifyAll();
                } else {
                    /** 如果资源不为空则进入等待 */
                    try {
                        System.out.println("资源不为空,生产者进入等待");
                        library.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

/**
 * 消费者线程
 * @author liweidan
 * @date 2017.12.18 下午4:28
 * @email toweidan@126.com
 */
public class ConsumerThread implements Runnable {
    /** 资源 */
    private List<String> library;
    public ConsumerThread(List<String> library) {
        this.library = library;
    }
    @Override
    public void run() {
        synchronized (library) {
            while (true) {
                if (!library.isEmpty()) {
                    /** 移除最后一个 */
                    System.out.println("消费者消费: " + library.get(library.size() - 1));
                    library.remove(library.size() - 1);
                    /** 消费完成,通知全部消费者以及生产者 */
                    library.notifyAll();
                } else {
                    /** 如果资源不为空则进入等待 */
                    System.out.println("消费者:资源为空进入等待");
                    try {
                        library.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

public static void startProdAndCous() {
    List<String> lib = new ArrayList<>();
    /** 5个生产者、5个消费者 */
    for (int i = 0; i < 5; i++) {
        new Thread(new ConsumerThread(lib)).start();
        new Thread(new ProducerThread(lib)).start();
    }
}

// 结果:
......
资源不为空,生产者进入等待
消费者消费: library4
消费者消费: library3
消费者消费: library2
消费者消费: library1
消费者消费: library0
消费者:资源为空进入等待
生产者生产: library0
生产者生产: library1
生产者生产: library2
生产者生产: library3
生产者生产: library4
完成生产,通知全部
资源不为空,生产者进入等待
资源不为空,生产者进入等待
消费者消费: library4
消费者消费: library3
消费者消费: library2
消费者消费: library1
消费者消费: library0
消费者:资源为空进入等待
消费者:资源为空进入等待
......

可以看到,结果已经十分明显,可以实现资源的生产与消费。

这里还有个问题,就是唤醒的时候没有分清楚唤醒谁,在接下来的Lock可以实现。

三、线程之间数据的数据通讯

jdk提供了两对类,分别用于传输数据流以及字节流,对应IO流来学习即可,分别是:
1)PipedInputStream和PipedOutputStream
2)PipedReader和PipedWriter

/**
 * 读的线程
 * @author liweidan
 * @date 2017.12.18 下午5:08
 * @email toweidan@126.com
 */
public class ReadThread implements Runnable {
    private PipedInputStream inputStream;
    public ReadThread(PipedInputStream inputStream) {
        this.inputStream = inputStream;
    }
    @Override
    public void run() {
        try {
            while (true) {
                byte[] arr = new byte[1024];
                int len = inputStream.read(arr);
                while (len != -1) {
                    System.out.println("收到数据,开始读取数据: ");
                    String str = new String(arr, 0, len);
                    System.out.println(str);
                    len = inputStream.read(arr);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 写的线程
 * @author liweidan
 * @date 2017.12.18 下午6:00
 * @email toweidan@126.com
 */
public class WriteThread implements Runnable {
    private PipedOutputStream outputStream;
    public WriteThread(PipedOutputStream outputStream) {
        this.outputStream = outputStream;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Scanner scanner = new Scanner(System.in);
                String str = scanner.next();
                outputStream.write(str.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 
 * @author liweidan
 * @date 2017.12.18 下午6:05
 * @email toweidan@126.com
 */
public class ThreadStart {
    public static void main(String[] args) {
        try {
            PipedOutputStream outputStream = new PipedOutputStream();
            PipedInputStream inputStream = new PipedInputStream();
            inputStream.connect(outputStream);
            new Thread(new ReadThread(inputStream)).start();
            new Thread(new WriteThread(outputStream)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// 结果: 
dsasd
收到数据,开始读取数据: 
dsasd
HelloWorld
收到数据,开始读取数据: 
HelloWorld

两个重点:
1. 当inputStreamwhile里面已经”枯竭”的时候,就会继续等待,等有数据进来的时候,再把它打印出来。
2. 通过inputStream.connect(outputStream);这个方法连接两个流

PS:貌似开发中比较少用这个?

PipedReader和PipedWriter使用与字节流相似。

四、等待线程运行——join使用

作用:创建线程后,调用线程的join方法。那么调用的线程会停下来(或指定停下来多长时间),等待线程执行完成输出结果,调用线程才继续运行。

代码所示主线程创建了A线程后,等待A线程执行完成再继续执行主线程。

/**
 * Join使用
 * @author liweidan
 * @date 2017.12.19 上午9:54
 * @email toweidan@126.com
 */
public class JoinDemo implements Runnable {
    @Override
    public void run() {
        try {
            /** 随机休眠时间 */
            double v = Math.random() * 3;
            Thread.sleep((long) v);
            /** 输出结果 */
            System.out.println("wait in " + v + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

private static void startJoinDemo() {
    try {
        Thread aThread = new Thread(new JoinDemo());
        aThread.start();
        aThread.join();
        System.out.println("Main end in " + System.currentTimeMillis());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// 结果: 
wait in 1.9918097432208672ms
Main end in 1513648665010  // 可以看到Main end 总是在线程执行完成后的输出

当然也会有坑:当我们通过join指定等待时间的时候,有可能多个线程之间互相抢夺资源但是主线程抢到了发现时间已经足够了就提前给执行后面的逻辑。

在这个例子中,我把join线程修改为睡眠2秒,然后再启动其他线程,多个线程进行抢夺资源。

/**
 * Join使用
 * @author liweidan
 * @date 2017.12.19 上午9:54
 * @email toweidan@126.com
 */
public class JoinDemo implements Runnable {
    @Override
    public void run() {
        try {
            /** 随机休眠时间 */
            double v = Math.random() * 3;
            /** 指定休息2秒 */
            Thread.sleep(2000);
            /** 输出结果 */
            System.out.println("wait in " + v + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

private static void startJoinDemo02() {
    try {
        Thread aThread = new Thread(new JoinDemo());
        Thread otherThread = new Thread(() -> {
            try {
                Thread.sleep(2000L);
                System.out.println("other end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        aThread.start();
        otherThread.start();
        aThread.join(1000L);
        System.out.println("Main end in " + System.currentTimeMillis());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// 结果: 
Main end in 1513649551229 // main提前结束了。
other end
wait in 0.8592433100474021ms

运行图解:

$seq
主线程->aThread: 创建A线程
主线程->otherThread: 创建其他线程
主线程->aThread: 启动A线程
主线程->otherThread: 启动其他线程
主线程->aThread: 等待1秒
Note right of aThread: aThread等待2秒
Note right of 主线程: 主线程发现已经停够1秒,打印
$

五、线程私有变量存储: ThreadLocal和InheritableThreadLocal

我们可以这么理解,每个线程执行的空间是相互隔离,当有需要在线程内部进行数据传递的时候可以通过ThreadLocal以及InheritableThreadLocal(可以获取父线程的值)来做数据,相当于一个线程之间的小数据库,用于存储相应的数据的。像我们项目就是在每次请求进来设置当前线程的值是用户类,然后在后面的各个分层如果需要拿到用户信息,即可快速获取。

举个例子:创建两个线程,分别调用同一个方法,这个方法从ThreadLocal获取值,证明每个线程拿到的值是当前线程设置的。

/**
 * 线程私有变量
 * @author liweidan
 * @date 2017.12.19 上午10:47
 * @email toweidan@126.com
 */
public class ThreadHolder {
    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
    public static String set(String value) {
        threadLocal.set(value);
        return value;
    }
    public static String get() {
        return threadLocal.get();
    }
}

/**
 * 用于给线程调用,证明不同线程输出来的值是不一样的
 * @author liweidan
 * @date 2017.12.19 上午10:52
 * @email toweidan@126.com
 */
public class ServicceA {
    public void test() {
        String v = ThreadHolder.get();
        System.out.println("ServiceA: " + v + ", Current: " + Thread.currentThread().getName());
    }
}

/**
 * A线程
 * @author liweidan
 * @date 2017.12.19 上午10:45
 * @email toweidan@126.com
 */
public class ThreadA implements Runnable {
    private ServicceA servicceA;
    public ThreadA(ServicceA servicceA) {
        this.servicceA = servicceA;
    }
    @Override
    public void run() {
        ThreadHolder.set("AThread value");
        System.out.println(ThreadHolder.get());
        new ServicceA().test();
    }
}

/**
 * B线程
 * @author liweidan
 * @date 2017.12.19 上午10:46
 * @email toweidan@126.com
 */
public class ThreadB implements Runnable {
    private ServicceA servicceA;
    public ThreadB(ServicceA servicceA) {
        this.servicceA = servicceA;
    }
    @Override
    public void run() {
        ThreadHolder.set("BThread value");
        System.out.println(ThreadHolder.get());
        new ServicceA().test();
    }
}

// 结果
AThread value
ServiceA: AThread value, Current: Thread-0
BThread value
ServiceA: BThread value, Current: Thread-1

通过开启两个不同的线程,但是注入相同的一个对象,在对象里面获得的值都是各自线程设置的,说明这个类是可以隔离各个线程之间的值,用于存放需要经常获取的值进行计算,以便于在同一个线程当中可以随处使用。

InheritableThreadLocal则可以在线程中创建子线程然后获取到对应的父线程的值。但是需要注意的是当子线程已经获取到值的时候,父线程对值进行修改,子线程并不会响应后面的修改,所以在项目中,最好是一旦设置了这个值就不要对其作出修改,以免业务发生错误。

六、总结

  1. 线程之间通过wait()以及notify()相互通知执行,需要注意唤醒的同类以免造成软件相互等待1
  2. 线程之间通过PipedStream进行数据通讯
  3. ThreadLocal的使用
点赞