day 04 Java并发多线程

http://www.cnblogs.com/hellocsl/p/3969768.html?utm_source=tuicool&utm_medium=referral
PS:而JVM 每遇到一个线程,就为其分配一个Program Counter Register(程序计数器)VM Stack(虚拟机栈)和Native Method Stack (本地方法栈)
引用别人的博客,关于Java内存管理,博客很好

并发编程的挑战

day 04 Java并发多线程

day 04 Java并发多线程

PS: 轻量级volatile

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

---------------------------------------------------

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

PS: 上述   看清是四种   状态。   目的是为了  减小获得和释放锁的性能消耗

day 04 Java并发多线程

PS: 出现那么多的锁就是为了减少 获得锁和释放锁的 性能消耗; 而且锁只能升级不能降级

day 04 Java并发多线程

day 04 Java并发多线程

/**
* Alipay.com Inc.
* Copyright (c) 2004-2015 All Rights Reserved.
*/
package chapter02; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; /**
* 计数器
*
* @author tengfei.fangtf
* @version $Id: Snippet.java, v 0.1 2015-7-31 下午11:32:42 tengfei.fangtf Exp $
*/
public class Counter { private AtomicInteger atomicI = new AtomicInteger(0);
private int i = 0; public static void main(String[] args) {
final Counter cas = new Counter();
List<Thread> ts = new ArrayList<Thread>(600);
long start = System.currentTimeMillis();
for (int j = 0; j < 100; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
cas.count();
cas.safeCount();
}
}
});
ts.add(t);
}
for (Thread t : ts) {
t.start(); }
// 等待所有线程执行完成
for (Thread t : ts) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
} }
System.out.println(cas.i);
System.out.println(cas.atomicI.get()); //获取值
System.out.println(System.currentTimeMillis() - start);
} /**
* 使用CAS实现线程安全计数器
*/
private void safeCount() {
for (;;) {
int i = atomicI.get();
boolean suc = atomicI.compareAndSet(i, ++i);
if (suc) {
break;
}
}
} /**
* 非线程安全计数器
*/
private void count() {
i++;//一个cpu加了,但是另一个不一定加
} } PS: 原子性的那个操作一直都不会变

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

PS: main方法天生就是一个多线程

package chapter04;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean; /**
* 6-1
*/
public class MultiThread { public static void main(String[] args) {
// 获取Java线程管理MXBean
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
// 不需要获取同步的monitor和synchronizer信息,仅仅获取线程和线程堆栈信息
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
// 遍历线程信息,仅打印线程ID和线程名称信息
for (ThreadInfo threadInfo : threadInfos) {
System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());
}
}
}
day 04 Java并发多线程

PS : suspend 、resume 、stop方法

day 04 Java并发多线程

PS: 优雅的结束线程

package chapter04;

import java.util.concurrent.TimeUnit;

/**
* 6-9
*/
public class Shutdown {
public static void main(String[] args) throws Exception {
Runner one = new Runner();
Thread countThread = new Thread(one, "CountThread");
countThread.start();
// 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
TimeUnit.SECONDS.sleep(1);
countThread.interrupt();
Runner two = new Runner();
countThread = new Thread(two, "CountThread");
countThread.start();
// 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
TimeUnit.SECONDS.sleep(1);
two.cancel();
} private static class Runner implements Runnable {
private long i; private volatile boolean on = true; @Override
public void run() {
while (on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i = " + i);
} public void cancel() {
on = false;
}
}
}
day 04 Java并发多线程

PS: 当java虚拟机中存在 Daemon线程的时候,java虚拟机会退出

package chapter04;

/**
* 6-5
*/
public class Daemon { public static void main(String[] args) {
Thread thread = new Thread(new DaemonRunner());
thread.setDaemon(true);
thread.start();
} static class DaemonRunner implements Runnable {
@Override
public void run() {
try {
SleepUtils.second(100);
} finally {
System.out.println("DaemonThread finally run.");
}
}
}
}
///什么也没有执行

day 04 Java并发多线程

day 04 Java并发多线程

Java并发编程:深入剖析ThreadLocal

-------------------------------------------------------------------------------------------------------------------------

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

PS:进程通俗的讲就是一个应用程序,  太会在  内存中分配独立的运行空间
线程:它是位于进程中,负责当前进程中的某个具备独立运行资格的空间。

.1.1.    synchronized

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

package cn.itcast_01_mythread.thread.testThread;

public class MySynchronized {
public static void main(String[] args) {
final MySynchronized mySynchronized = new MySynchronized();//这才是公用的锁
final MySynchronized mySynchronized2 = new MySynchronized();
new Thread("thread1") {
public void run() {
synchronized (mySynchronized) {
try {
System.out.println(this.getName()+" start");
int i =1/0; //如果发生异常,jvm会将锁释放
Thread.sleep(5000);
System.out.println(this.getName()+"醒了");
System.out.println(this.getName()+" end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("thread2") {
public void run() {
synchronized (mySynchronized) { //争抢同一把锁时,线程1没释放之前,线程2只能等待
// synchronized (mySynchronized2) { //如果不是一把锁,可以看到两句话同时打印
System.out.println(this.getName()+" start");
System.out.println(this.getName()+" end"); }
}
}.start();
}
}
  • synchronized的缺陷

    synchronized是java中的一个关键字,也就是说是Java语言内置的特性。


如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:


     1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;


2)线程执行发生异常(他挂了),此时JVM会让线程自动释放锁。

------------------------------------------------------------
PS:也就是 释放锁有两种方式 自己释放;自己挂了。

day 04 Java并发多线程

---------------------------------------------------------------------------------------

PS :因为Synchronoied使用起来不方便,java5以后出现了Lock

day 04 Java并发多线程

PS : CountDownLatch、ReentrantLock和ReentrantReadWriteLock都是同步组件

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

package chapter05;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; /**
* 10-20
*/
public class ConditionUseCase {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
} public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}

day 04 Java并发多线程

day 04 Java并发多线程

Answer: 1.因为HashMap会照成环形数据结构,一直有next,然后就死锁了

2.HashTable使用synchronied并发效率非常低下

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

PS: Synchronizd

day 04 Java并发多线程

day 04 Java并发多线程

PS :ReentrantLock是唯一实现Lock的接口

day 04 Java并发多线程

PS:lock的用法

day 04 Java并发多线程
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class MyLockTest {
private static ArrayList<Integer> arrayList = new ArrayList<Integer>();
static Lock lock = new ReentrantLock(); // 注意这个地方,因为Lock是一个借口,通常ReentrantLock进行实现
public static <E> void main(String[] args) {
new Thread() {
public void run() {
Thread thread = Thread.currentThread(); lock.lock();//获取锁
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();//不释放就死锁了
} };
}.start(); new Thread() {
public void run() {
Thread thread = Thread.currentThread();
lock.lock();
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();
} };
}.start();
} }
PS:每次需要手动关闭锁

tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。

day 04 Java并发多线程PS:他会尝试获取锁

day 04 Java并发多线程

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; /**
* 观察现象:一个线程获得锁后,另一个线程取不到锁,不会一直等待
* @author
*
*/
public class MyTryLock { private static ArrayList<Integer> arrayList = new ArrayList<Integer>();
static Lock lock = new ReentrantLock(); // 注意这个地方
public static void main(String[] args) { new Thread() {
public void run() {
Thread thread = Thread.currentThread();
boolean tryLock = lock.tryLock();
System.out.println(thread.getName()+" "+tryLock);
if (tryLock) {
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();
}
}
};
}.start(); new Thread() {
public void run() {
Thread thread = Thread.currentThread();
boolean tryLock = lock.tryLock();
System.out.println(thread.getName()+" "+tryLock);
if (tryLock) {
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();
}
} };
}.start();
} }
PS:一旦某个线程tryLock false以后, 其他线程就获取不了 了
day 04 Java并发多线程

当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程(并非中断A的操作)。

  注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。

  因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。


package cn.itcast_01_mythread.thread.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 观察现象:如果thread-0得到了锁,阻塞。。。thread-1尝试获取锁,如果拿不到,则可以被中断等待
* @author
*
*/
public class MyInterruptibly {
private Lock lock = new ReentrantLock(); public static void main(String[] args) {
MyInterruptibly test = new MyInterruptibly();
MyThread thread0 = new MyThread(test);
MyThread thread1 = new MyThread(test);
thread0.start();
thread1.start(); try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread1.interrupt(); //如果线程1 陷入等待, 这可以让他结束等待
System.out.println("=====================");
} public void insert(Thread thread) throws InterruptedException{
lock.lockInterruptibly(); //注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出
try {
System.out.println(thread.getName()+"得到了锁");

//long startTime = System.currentTimeMillis();
//for( ; ;) {
/*if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
break;*/
//插入数据
//}
                 Thread.sleep(5000);

            }
finally {
System.out.println(Thread.currentThread().getName()+"执行finally");
lock.unlock();
System.out.println(thread.getName()+"释放了锁");
}
}
} class MyThread extends Thread {
private MyInterruptibly test = null;
public MyThread(MyInterruptibly test) {
this.test = test;
}
@Override
public void run() { try {
test.insert(Thread.currentThread());
} catch (Exception e) {
System.out.println(Thread.currentThread().getName()+"被中断");
}
} }

PS : ReadWriteLock

day 04 Java并发多线程

package cn.itcast_01_mythread.thread.lock;

/**
* 一个线程又要读又要写,用synchronize来实现的话,读写操作都只能锁住后一个线程一个线程地进行
* @author
*
*/
public class MySynchronizedReadWrite { public static void main(String[] args) {
final MySynchronizedReadWrite test = new MySynchronizedReadWrite(); new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start(); new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start(); } public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
int i=0;
while(System.currentTimeMillis() - start <= 1) {//执行时间不操过1s
i++;
if(i%4==0){
System.out.println(thread.getName()+"正在进行写操作");
}else {
System.out.println(thread.getName()+"正在进行读操作");
}
}
System.out.println(thread.getName()+"读写操作完毕");
} }

Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行写操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0读写操作完毕
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行写操作
Thread-1读写操作完毕

 
读写锁:读的时候,其他线程还可以操作,写的时候绝对不行。   里面有两个方法
day 04 Java并发多线程
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程
*
* 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
* 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
PS : 多线程就是可以同时读操作,写操作的时候不可以读,读的时候不可以写
* @author
*
*bee:读的时候多个线程可以同时操作, 写线程不能操作
*/
day 04 Java并发多线程
public class MyReentrantReadWriteLock {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) {
final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock(); new Thread(){
public void run() {
test.get(Thread.currentThread());
test.write(Thread.currentThread());
};
}.start(); new Thread(){
public void run() {
test.get(Thread.currentThread());
test.write(Thread.currentThread());
};
}.start(); } /**
* 读操作,用读锁来锁定
* @param thread
*/
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis(); while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行读操作");
}
System.out.println(thread.getName()+"读操作完毕");
} finally {
rwl.readLock().unlock();
}
} /**
* 写操作,用写锁来锁定
* @param thread
*/
public void write(Thread thread) {
rwl.writeLock().lock();;
try {
long start = System.currentTimeMillis(); while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行写操作");
}
System.out.println(thread.getName()+"写操作完毕");
} finally {
rwl.writeLock().unlock();
}
}
}

day 04 Java并发多线程

day 04 Java并发多线程

day 04 Java并发多线程

---------------------------------------------------

1.3 关于volatile的介绍

day 04 Java并发多线程

package cn.itcast_01_mythread.volatiletest;

public class TestVolatile {

	public static volatile int numb = 0;

	public static void main(String[] args) throws Exception {

		for (int i = 0; i < 100; i++) {

			new Thread(new Runnable() {

				@Override
public void run() {
for (int i = 0; i < 1000; i++) {
numb++;
}
}
}).start(); } Thread.sleep(2000);
System.out.println(numb);
} } 
PS: 因为 每当启动一个线程的时候都会创建一个栈内存,他们共享着堆空间的数据。当想要对 堆上某个数据进行操作的时候,
就会复制相应的数据到 自己的栈空间 进行操作, volatile就是为了各个线程间同步数据的问题。
这和synchronized还不太一样,synchronized是完全把数据上锁了。 volatile还用是提高数据保存的位置。

1.4 并发的执行

day 04 Java并发多线程

PS:当抢小米手机的时候,通常会异步解耦。其实就是一个修改库存的过程,成千上万个用户同时访问服务器,服务器这边会使用线程池对线程进行管理,防止创建线程过多,服务器奔溃。
同时,会使用任务消息队列 ,在java中使用 JMS规范的ActiveMQ(为了解决大并发的请求,放入编写好的消息队列中)来解决。比如,有5台手机,有100人发起请求,前五个线程可以获得手机,后面直接提示jj。这样一个解耦
  • PS: 线程是不可以无限增长的,所以用一个线程池进行管理
  • day 04 Java并发多线程
  • day 04 Java并发多线程

    day 04 Java并发多线程

package cn.itcast_01_mythread.pool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry; public class TestPool { public static void main(String[] args) throws Exception {
Future<?> submit = null;
Random random = new Random(); //创建固定数量线程池
ExecutorService exec = Executors.newFixedThreadPool(4); //创建调度线程池
//ScheduledExecutorService exec = Executors.newScheduledThreadPool(4); //用来记录各线程的返回结果
ArrayList<Future<?>> results = new ArrayList<Future<?>>(); for (int i = 0; i < 10; i++) {
//fixedPool提交线程,runnable无返回值,callable有返回值
submit = exec.submit(new TaskRunnable(i));
/*submit = exec.submit(new TaskCallable(i));*/ //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致
/*submit = exec.submit(new TaskCallable(i));*/
//对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行
//submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
//存储线程执行结果
results.add(submit); } //打印结果
for(Future f: results){
boolean done = f.isDone();
System.out.println(done?"已完成":"未完成"); //从结果的打印顺序可以看到,即使未完成,也会阻塞等待
System.out.println("线程返回future结果: " + f.get());
} exec.shutdown(); }
}
pool-1-thread-1 启动时间:1510802909
未完成
pool-1-thread-3 启动时间:1510802909
pool-1-thread-4 启动时间:1510802909
pool-1-thread-2 启动时间:1510802909
pool-1-thread-1 is working...0
线程返回future结果: null
未完成
pool-1-thread-1 启动时间:1510802909
pool-1-thread-3 is working...2
pool-1-thread-3 启动时间:1510802909
pool-1-thread-1 is working...4
pool-1-thread-1 启动时间:1510802909
pool-1-thread-2 is working...1
线程返回future结果: null
pool-1-thread-2 启动时间:1510802910
已完成
线程返回future结果: null
未完成
pool-1-thread-1 is working...6
pool-1-thread-3 is working...5
pool-1-thread-1 启动时间:1510802910
pool-1-thread-3 启动时间:1510802910
pool-1-thread-4 is working...3
线程返回future结果: null
已完成
线程返回future结果: null
已完成
线程返回future结果: null
已完成
线程返回future结果: null
未完成
pool-1-thread-3 is working...9
pool-1-thread-1 is working...8
pool-1-thread-2 is working...7
线程返回future结果: null
已完成
线程返回future结果: null
已完成
线程返回future结果: null

 PS :Runnable和Callanble的区别 

package cn.itcast_01_mythread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class ThreadPoolWithRunable { /**
* 通过线程池执行线程
* @param args
*/
public static void main(String[] args) {
//创建一个线程池
ExecutorService pool = Executors.newCachedThreadPool();
for(int i = 1; i < 5; i++){
pool.execute(new Runnable() {//提交任务
@Override
public void run() {
System.out.println("thread name: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
pool.shutdown();
} }

提交 Callable,该方法返回一个 Future 实例表示任务的状态

调用submit提交任务, 匿名Callable,重写call方法, 有返回值, 获取返回值会阻塞,一直要等到线程任务返回结果

见代码:ThreadPoolWithcallable

pool-1-thread-1 启动时间:1510803239
pool-1-thread-2 启动时间:1510803239
pool-1-thread-2 is working...1
pool-1-thread-2 启动时间:1510803239
pool-1-thread-3 启动时间:1510803239
未完成
pool-1-thread-4 启动时间:1510803239
pool-1-thread-4 is working...3
pool-1-thread-4 启动时间:1510803239
pool-1-thread-4 is working...5
pool-1-thread-4 启动时间:1510803239
pool-1-thread-1 is working...0
线程返回future结果: 0
已完成
线程返回future结果: 1
未完成
pool-1-thread-1 启动时间:1510803240
pool-1-thread-3 is working...2
线程返回future结果: 2
已完成
线程返回future结果: 3
未完成
pool-1-thread-3 启动时间:1510803240
pool-1-thread-2 is working...4
线程返回future结果: 4
pool-1-thread-2 启动时间:1510803241
已完成
线程返回future结果: 5
未完成
pool-1-thread-1 is working...7
pool-1-thread-4 is working...6
线程返回future结果: 6
已完成
线程返回future结果: 7
未完成
pool-1-thread-3 is working...8
线程返回future结果: 8
未完成
pool-1-thread-2 is working...9
线程返回future结果: 9

  

package cn.itcast_01_mythread.pool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry; public class TestPool { public static void main(String[] args) throws Exception {
Future<?> submit = null;
Random random = new Random(); //创建固定数量线程池
ExecutorService exec = Executors.newFixedThreadPool(4); //创建调度线程池
//ScheduledExecutorService exec = Executors.newScheduledThreadPool(4); //用来记录各线程的返回结果
ArrayList<Future<?>> results = new ArrayList<Future<?>>(); for (int i = 0; i < 10; i++) {
//fixedPool提交线程,runnable无返回值,callable有返回值
//submit = exec.submit(new TaskRunnable(i));
submit = exec.submit(new TaskCallable(i)); //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致
/*submit = exec.submit(new TaskCallable(i));*/
//对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行
//submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
//存储线程执行结果
results.add(submit); } //打印结果
for(Future f: results){
boolean done = f.isDone();
System.out.println(done?"已完成":"未完成"); //从结果的打印顺序可以看到,即使未完成,也会阻塞等待
System.out.println("线程返回future结果: " + f.get());
} exec.shutdown(); }
}
package cn.itcast_01_mythread.pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* callable 跟runnable的区别:
* runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值
*
* callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果
*
*/
public class ThreadPoolWithcallable { public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(4); for(int i = 0; i < 10; i++){
Future<String> submit = pool.submit(new Callable<String>(){//线程是有返回值的
@Override
public String call() throws Exception {
//System.out.println("a");
Thread.sleep(5000);
return "b--"+Thread.currentThread().getName();
}
});
//从Future中get结果,这个方法是会被阻塞的,一直要等到线程任务执行完成 才能返回结果!!!!!
所以一般不要使用,他会阻塞主线程,如果必须想要得到结果在使用
System.out.println(submit.get());,
}
pool.shutdown(); } }
package cn.itcast_01_mythread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; /**
* 列出并发包中的各种线程池
* @author
*
*/ public class ExecutorDemo { public static void main(String[] args) {
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();//单线程,任务顺序执行
//线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行,
//如果线程超过60秒内没执行,那么将被终止并从池中删除,
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
int cpuNums = Runtime.getRuntime().availableProcessors();
System.out.println(cpuNums);//CPU的核数
//在构造函数中的参数4是线程池的大小,你可以随意设置,也可以和cpu的核数量保持
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(cpuNums);
//用来调度即将执行的任务的线程池,可能是不是直接执行, 每隔多久执行一次... 策略型的
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(8); //只有一个线程,用来调度任务在指定时间执行
ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
}
}
PS:上面可以看到Runnable和callable分别是线程中的不同的实现方式。Runnable不会返回结果,而callable会返回结果,但是在拿结果的时候会阻塞。
在线程池中通过固定数量的线程进行相同的操作,有不同 的实现 。

.1.     java并发包消息队列及在开源软件中的应用

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。           类似与锁

主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

插入:

1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常,不好

2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续, 有阻塞, 放不进去就等待

读取:

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止; 阻塞, 取不到就一直等

其他

int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准, 不能保证数据的准确性

boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改    变了返回true

public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true

int drainTo(Collection<? super E> c); //移除此队列中所有可用的元素,并将它们添加到给定 collection 中。取出放到集合中

int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,指定了移   动的数量; 取出指定个数放到集合

BlockingQueue有四个具体的实现类,常用的两种实现类为:

1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

LinkedBlockingQueue和ArrayBlockingQueue区别:

LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

day 04 Java并发多线程

生产者消费者的示例代码:

见代码

package cn.itcast_02_blockingqueue.main;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import cn.itcast_02_blockingqueue.consumer.Consumer;
import cn.itcast_02_blockingqueue.producer.Producer; public class Test {
public static void main(String[] args) throws Exception {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
// BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
// 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
Consumer consumer = new Consumer(queue);
Producer producer = new Producer(queue);
for (int i = 0; i < 3; i++) {
new Thread(producer, "Producer" + (i + 1)).start();//线程就是这样启动的
        }
for (int i = 0; i < 5; i++) {
new Thread(consumer, "Consumer" + (i + 1)).start();
} new Thread(producer, "Producer" + (5)).start();
}
}
package cn.itcast_02_blockingqueue.producer;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try { System.out.println("I have made a product:"
+ Thread.currentThread().getName());
String temp = "A Product, 生产线程:"
+ Thread.currentThread().getName();
queue.put(temp);//如果队列是满的话,会阻塞当前线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package cn.itcast_02_blockingqueue.consumer;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try {
String consumer = Thread.currentThread().getName();
System.out.println(consumer);
String temp = queue.take();//如果队列为空,会阻塞当前线程
System.out.println(consumer+"get a product:"+temp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

4.kafka 和 redis的应用,Storm中用到了很多的Blockquene

day 04 Java并发多线程

PS: 比如,双十一在大屏上显示实时的消费金额,  在上面我们讲到,kafka她类似于消息队列,spout从kafka中那数据,根据业务把数据分发到各个节点blot取出来处理。把每一个任务再细分拓扑,到redis,然后再显示到屏幕上。  
比如你在淘宝上下订单,并不是保存到数据库那么简单的。

day 04 Java并发多线程

day 04 Java并发多线程

上一篇:【BZOJ4553】【TJOI2016】【HEOI2016】序列


下一篇:【环境变量】Linux 下三种方式设置环境变量