JAVA并发编程(一)JAVA线程池的使用

概述

new Thread的弊端如下:
a. 每次new Thread新建对象性能差。
b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
c. 缺乏更多功能,如定时执行、定期执行、线程中断。

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?在Java中可以通过线程池来达到这样的效果

Java 线程池

Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

无论使用哪种方式创建,最终返回的都是ThreadPoolExecutor,而方法返回是ExecutorService,ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService实现了ExecutorService这个接口,
ExecutorService这个接口继承了Executor。ThreadPoolExecutor is-aExecutorService。

ThreadPoolExecutor构造方法

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(

int corePoolSize,//线程池维护线程的最少数量

int maximumPoolSize,//线程池维护线程的最大数量

long keepAliveTime,//线程池维护线程所允许的空闲时间

TimeUnit unit, //线程池维护线程所允许的空闲时间的单位

BlockingQueue<Runnable> workQueue, //线程池所使用的缓冲队列,

                                                                 //主要使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue

ThreadFactory threadFactory,//线程工厂

RejectedExecutionHandler handler// 线程池对拒绝任务的处理策略

)

参数的详细含义:
corePoolSize:核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,是预创建线程的意思,即在没有任务到来之前就创corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

提交任务

我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。

threadsPool.execute(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub
            }
        });

我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
     Object s = future.get();
} catch (InterruptedException e) {
    // 处理中断异常
} catch (ExecutionException e) {
    // 处理无法执行任务异常
} finally {
    // 关闭线程池
    executor.shutdown();
}

终止线程池

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

如何选择合理的线程池

计算密集型,就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服配置,如果在非常好的配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的应用,完全是靠CPU的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程上下文切换,比较理想方案是:计算密集型的较理想线程数 = CPU内核线程数+1。
对于IO密集型的应用,读写数据库,读写文件都涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现,对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。比较合适的方案是:IO密集型应用比较理想线程数= CPU内核线程数*2+1。
Android中,AsyncTask的中源码:

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);


创建的线程池个数介于计算密集型和IO密集型之间,算是比较通用的。

RXjava中,对线程池创建提供了很好的控制:

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行

 

 

Android 事件总线OTTO使用说明和源码解析

一、Otto简单介绍

OTTO是Square推出的库,地址:https://github.com/square/otto

先来看看otto的官方介绍

An enhanced Guava-based event bus with emphasis on Android support.Otto is an event bus designed to decouple different parts of your application while still allowing them to communicate efficiently.Forked from Guava, Otto adds unique functionality to an already refined event bus as well as specializing it to the Android platform.

OTTO基于Guava项目的Android支持库,如果你在Android程序开发的过程中想要不同的组件之间进行有效的通信可以使用这个库。通过otto库可以。

二、Otto简单使用

1、创建一个BUS的单例。

public class AppConfig {
private static final Bus BUS = new Bus();
public static Bus getInstance() {
    return BUS;
}
}

2、在需要使用Otto的类中注册

在类创建好之后,或者需要重新注册的时候注册,一般在Activity的onCreate()或者onPause()方法中

 AppConfig.getBusInstance().register(this);

3、定义订阅方法

@Subscribe
public void onWallpaperUpdate(MyObject obj) {
    //对obj进行需要的逻辑处理
}

4、发送消息

AppConfig.getBusInstance().post(myobj);

5、解绑

注意在类销毁的时候或者暂时不需要再收消息的时候解绑,,一般在Activity的onDestroy()或者onResume()方法中

 AppConfig.getBusInstance().unregister(this);

三、Otto源码解析

1、整体结构

otto的源码结构非常简单,所有类都包含在 com.squareup.otto这一个包中,
不计内部类,只有9个类跟接口,分别是
 AnnotatedHandlerFinder 注解解析器
Bus 总线核心类
DeadEvent 没有接收者的事件
EventHandler 事件订阅者
EventProducer 事件生产者
HandlerFinder 获取接收者生产者
Produce 生产者的注解
Subscribe 订阅者注解
ThreadEnforcer对线程进行校验

2、Bus的关键属性方法分析

(1)构造函数

  public Bus() {
    this(DEFAULT_IDENTIFIER);
  }
  public Bus(String identifier) {
    this(ThreadEnforcer.MAIN, identifier);
  }
  public Bus(ThreadEnforcer enforcer) {
    this(enforcer, DEFAULT_IDENTIFIER);
  }
  public Bus(ThreadEnforcer enforcer, String identifier) {
    this(enforcer, identifier, HandlerFinder.ANNOTATED);
  }
  Bus(ThreadEnforcer enforcer, String identifier, HandlerFinder handlerFinder) {
    this.enforcer =  enforcer;
    this.identifier = identifier;
    this.handlerFinder = handlerFinder;
  }

我们通常使用Bus()这个构造方法,在整个app中创建一个单例,这样不但节省资源,更重要的是保证消息正常到达如果不是单例,用一个Bus进行了注册,而用另外一个Bus发送消息,这样订阅的方法是无法收到消息的。

enforcer是对线程进行校验,有两个取值,一个是ThreadEnforcer.ANY,另一个是ThreadEnforcer.MAIN,默认值是ThreadEnforcer.MAIN,这样只能在主线程进行消息处理。如果在非主线程注册或者发送消息,就会抛出异常
throw new IllegalStateException(“Event bus ” + bus + ” accessed from non-main thread ” + Looper.myLooper());
这点一定要注意:Otto默认构造方法创建的Bus实例只能在主线程调用
如果要在其他线程使用,就使用ThreadEnforcer.ANY,或者自定义ThreadEnforcer。
只在主线程使用,能保证简洁,不混乱。但是我们实际使用中很多时候还是要跨线程通信的。
identifier是一个标识,在Bus的toString()方法中会用到。
HandlerFinder用来解析注册的对象,默认的实现是HandlerFinder.ANNOTATED,使用注解解析

(2)对象注册

public void register(Object object) {
    if (object == null) {
      throw new NullPointerException("Object to register must not be null.");
    }
    enforcer.enforce(this);
    Map<Class<?>, EventProducer> foundProducers = handlerFinder.findAllProducers(object);
    for (Class<?> type : foundProducers.keySet()) {
      final EventProducer producer = foundProducers.get(type);
      EventProducer previousProducer = producersByType.putIfAbsent(type, producer);
      //checking if the previous producer existed
      if (previousProducer != null) {
        throw new IllegalArgumentException("Producer method for type " + type
          + " found on type " + producer.target.getClass()
          + ", but already registered by type " + previousProducer.target.getClass() + ".");
      }
      Set<EventHandler> handlers = handlersByType.get(type);
      if (handlers != null && !handlers.isEmpty()) {
        for (EventHandler handler : handlers) {
          dispatchProducerResultToHandler(handler, producer);
        }
      }
    }
    Map<Class<?>, Set<EventHandler>> foundHandlersMap = handlerFinder.findAllSubscribers(object);
    for (Class<?> type : foundHandlersMap.keySet()) {
      Set<EventHandler> handlers = handlersByType.get(type);
      if (handlers == null) {
        //concurrent put if absent
        Set<EventHandler> handlersCreation = new CopyOnWriteArraySet<EventHandler>();
        handlers = handlersByType.putIfAbsent(type, handlersCreation);
        if (handlers == null) {
            handlers = handlersCreation;
        }
      }
      final Set<EventHandler> foundHandlers = foundHandlersMap.get(type);
      if (!handlers.addAll(foundHandlers)) {
        throw new IllegalArgumentException("Object already registered.");
      }
    }
    for (Map.Entry<Class<?>, Set<EventHandler>> entry : foundHandlersMap.entrySet()) {
      Class<?> type = entry.getKey();
      EventProducer producer = producersByType.get(type);
      if (producer != null && producer.isValid()) {
        Set<EventHandler> foundHandlers = entry.getValue();
        for (EventHandler foundHandler : foundHandlers) {
          if (!producer.isValid()) {
            break;
          }
          if (foundHandler.isValid()) {
            dispatchProducerResultToHandler(foundHandler, producer);
          }
        }
      }
    }
  }

对象注册首先进行了非空校验,然后是线程的校验,对象不可为空,不可多次注册

在注册对象之后,会解析出对象对应的类的生产方法和订阅方法,订阅者解析的结果保存在handlersByType,生产者解析的结果保存在producersByType里,这两个属性定义如下
private final ConcurrentMap<Class<?>, Set<EventHandler>> handlersByType =
          new ConcurrentHashMap<Class<?>, Set<EventHandler>>();
  /** All registered event producers, index by event type. */
  private final ConcurrentMap<Class<?>, EventProducer> producersByType =
          new ConcurrentHashMap<Class<?>, EventProducer>();

handlersByType的key中保存了订阅方法的入参 参数类型,vaue中保存着订阅者具体的对象和对应方法

producersByType的key中保存了生产方法的返回值参数类型,vaue中保存着生产者具体的对象和对应方法
从源码中我们可以发现,在解析订阅者之后,如果有对应的生产者,会自动调用生产方法,并自动调用一次订阅者方法。
所谓有对应生产者,就是有Produce注解的方法返回值参数类型和有Subscribe注解的方法参数相同。
这个过程涉及到以下三个方法
public void register(Object object) 
private void dispatchProducerResultToHandler(EventHandler handler, EventProducer producer)
protected void dispatch(Object event, EventHandler wrapper)

(3)消息发送

 public void post(Object event) {
    if (event == null) {
      throw new NullPointerException("Event to post must not be null.");
    }
    enforcer.enforce(this);

    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
      Set<EventHandler> wrappers = getHandlersForEventType(eventType);

      if (wrappers != null && !wrappers.isEmpty()) {
        dispatched = true;
        for (EventHandler wrapper : wrappers) {
          enqueueEvent(event, wrapper);
        }
      }
    }
    if (!dispatched && !(event instanceof DeadEvent)) {
      post(new DeadEvent(this, event));
    }
    dispatchQueuedEvents();
  }

这里主要涉及两个属性

 private final ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>> eventsToDispatch =
      new ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>>() {
        @Override protected ConcurrentLinkedQueue<EventWithHandler> initialValue() {
          return new ConcurrentLinkedQueue<EventWithHandler>();
        }
      };

  /** True if the current thread is currently dispatching an event. */
  private final ThreadLocal<Boolean> isDispatching = new ThreadLocal<Boolean>() {
    @Override protected Boolean initialValue() {
      return false;
    }
  };

当调用  public void post(Object event)这个方法之后,首先进行线程校验,然后解析出对应的订阅者,如果有订阅者,将event放入队列中, 如果没有,就作为一个DeadEvent,对于DeadEvent注释是这样说的

  * Wraps an event that was posted, but which had no subscribers and thus could not be delivered.
* <p>Subscribing a DeadEvent handler is useful for debugging or logging, as it can detect misconfigurations in a
* system’s event distribution.
已经很明确了,就不再赘述。
Bus在分发消息之后循环从消息队列中取值,这跟android的handler消息机制很像,不过bus中的循环在消息取完之后就结束了。
protected void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy and out-of-order events. Instead, leave
    // the events to be dispatched after the in-progress dispatch is complete.
    if (isDispatching.get()) {
      return;
    }
    isDispatching.set(true);
    try {
      while (true) {
        EventWithHandler eventWithHandler = eventsToDispatch.get().poll();
        if (eventWithHandler == null) {
          break;
        }

        if (eventWithHandler.handler.isValid()) {
          dispatch(eventWithHandler.event, eventWithHandler.handler);
        }
      }
    } finally {
      isDispatching.set(false);
    }
  }

消息队列使用ThreadLocal保证了队列的独立性。同时多个线程会创建多个循环,提高了效率。发送的消息很快就就可以分发。

(4)解绑操作

public void unregister(Object object) {
    if (object == null) {
      throw new NullPointerException("Object to unregister must not be null.");
    }
    enforcer.enforce(this);
    Map<Class<?>, EventProducer> producersInListener = handlerFinder.findAllProducers(object);
    for (Map.Entry<Class<?>, EventProducer> entry : producersInListener.entrySet()) {
      final Class<?> key = entry.getKey();
      EventProducer producer = getProducerForEventType(key);
      EventProducer value = entry.getValue();

      if (value == null || !value.equals(producer)) {
        throw new IllegalArgumentException(
            "Missing event producer for an annotated method. Is " + object.getClass()
                + " registered?");
      }
      producersByType.remove(key).invalidate();
    }

跟register类似,首先是对象非空校验,然后是线程校验,然后解绑,注册跟解绑一定要成对,没有注册不可以解绑,解绑之后不可以直接再次解绑。

解绑主要是清理工作,减少不必要的内存,防止内存泄漏。解绑之后就不能再收到绑定对象相关的消息了。

3、AnnotatedHandlerFinder对注解的解析,构建生产者和订阅者

解析出来的信息存放在PRODUCERS_CACHE,SUBSCRIBERS_CACHE中,
  /** Cache event bus producer methods for each class. */
  private static final ConcurrentMap<Class<?>, Map<Class<?>, Method>> PRODUCERS_CACHE =
    new ConcurrentHashMap<Class<?>, Map<Class<?>, Method>>();

  /** Cache event bus subscriber methods for each class. */
  private static final ConcurrentMap<Class<?>, Map<Class<?>, Set<Method>>> SUBSCRIBERS_CACHE =
    new ConcurrentHashMap<Class<?>, Map<Class<?>, Set<Method>>>();

结构相同,key值是解析对象对应的class,value是方法参数类型(生产者是返回数据类型,订阅者是方法参数的数据类型)和对应方法。

解析生产者依次调用:
static Map<Class<?>, EventProducer> findAllProducers(Object listener),
private static void loadAnnotatedProducerMethods(Class<?> listenerClass,Map<Class<?>, Method> producerMethods) ,
private static void loadAnnotatedMethods(Class<?> listenerClass,Map<Class<?>, Method> producerMethods, Map<Class<?>, Set<Method>> subscriberMethods)。
解析订阅者依次调用:
static Map<Class<?>, Set<EventHandler>> findAllSubscribers(Object listener) ,
private static void loadAnnotatedSubscriberMethods(Class<?> listenerClass,Map<Class<?>, Set<Method>> subscriberMethods) ,
private static void loadAnnotatedMethods(Class<?> listenerClassMap<Class<?>, Method> producerMethods, Map<Class<?>, Set<Method>> subscriberMethods)。
最终都是调用loadAnnotatedMethods
在解析过程中,违反一些规则会抛出异常
@Subscribe注解的方法只能有一个参数
@Subscribe注解的方法必须有具体实现,不能是个接口
@Subscribe注解的方法必须是public的
@Produce注解的方法入参必须是空的
@Produce注解的方法返回值类型不能是void
@Produce注解的方法必须是具体实现,不能是个接口
@Produce注解的方法必须是public的
@Produce注解的方法相同返回值类型的方法只能有一个

四、总结

OTTO是非常轻量级的,多数实现依赖注解反射,使用过程中如果要对代码进行混淆要特别注意。
android事件总线处理还有个很好的开源框架是EventBus,EventBus稍微重量级一些,复杂一些,对应的功能更多,对线程控制更加灵活。对EventBus的详细介绍可以参照http://blog.csdn.net/robertcpp/article/details/51546714。
用RXJAVA也可以实现事件总线,以后在做详细说明。无论用哪个框架,归根到底都是一种观察者模式。可以根据项目需要选择合适的框架。当然,通过原生的广播,Handler机制,肯定也能实现。
 
 

 

Picasso的使用和源码解析

一、基本介绍

picasso是Square公司开源的一个Android图片下载缓存库,github地址https://github.com/square/picasso,可以实现图片下载和缓存功能。

Picassso的特点有:
自动将图像缓存在本地,自带内存和硬盘二级缓存功能

通过图片压缩转换以减少内存消耗
自动处理了ImageView的回收,自动取消不在视野范围内的ImageView视图资源的加载支持网络图片,drawable资源,asset资源,本地图片等多种资源加载支持调试,调用函数 Picasso.setIndicatorsEnabled(true) 可以在加载的图片左上角显示一个三形,不同的颜色代表不同的加载来源。
二、使用方法
1、gradle 配置

compile 'com.squareup.picasso:picasso:2.5.2'

2、普通图片加载方式

只需要一行代码就能完全实现图片的异步加载:

Picasso.with(context).load("http://img.my.csdn.net/uploads/201605/08/1462674108_9582.jpg").into(imageView);

3、ADAPTER 中加载图片:Adapter的重用会被自动检测到,Picasso会取消上次的加载

@Override 
public void getView(int position, View convertView, ViewGroup parent) {
  Imageview view = (ImageView) convertView;
  if (view == null) {
    view = new ImageView(context);
  }
  String url = getItem(position);
  Picasso.with(context).load(url).into(view);
}

4、图片转换:转换图片以适应布局大小并减少内存占用

 Picasso.with(context)
  .load(url)
  .resize(50, 50)
  .centerCrop()
  .into(imageView);

5、Place holders-空白或者错误占位图片:

picasso提供了两种占位图片,未加载完成或者加载发生错误的时需要一张图片作为提示。

Picasso.with(context)
  .load(url)
  .placeholder(R.drawable.placeholder)
  .error(R.drawable.placeholder_error)
.into(imageView);

6、多种资源文件的加载:

除了加载网络图片picasso还支持加载Resources, assets, files, content providers中的资源文件。

Picasso.with(context).load(R.drawable.landing_screen).into(imageview);
Picasso.with(context).load(new File(...)).into(imageview);
Picasso.with(context).load("file:///android_asset/robert.png").into(imageview);

三、源码解析

1、构建方式

Picasso使用Builder创建对象,我们一般使用public static Picasso with(Context context)方法

public static Picasso with(Context context) {
     if (singleton == null) {
         synchronized (Picasso.class) {
             if (singleton == null) {
                 singleton = new Builder(context).build();
             }
         }
     }
     return singleton;
 }

Picasso的with方法返回Picasso的单例,但是有Builderg构造器,Picasso不是严格意义上的单例模式。
多次build还是可以创建多个实例的。

使用build构建可以自定义线程池、缓存、下载器等方法。

2、资源加载方式

实现RequestHandler接口就可以定义资源加载方式,默认有7种

allRequestHandlers.add(new ResourceRequestHandler(context));//drawable资源图
allRequestHandlers.add(new ContactsPhotoRequestHandler(context));//通讯录图片
allRequestHandlers.add(new MediaStoreRequestHandler(context));//多么媒体资源库图片
allRequestHandlers.add(new ContentStreamRequestHandler(context));//Provider图片
allRequestHandlers.add(new AssetRequestHandler(context));//asset中的图片
allRequestHandlers.add(new FileRequestHandler(context));//存储设备中的图片
allRequestHandlers.add(new NetworkRequestHandler(dispatcher.downloader, stats));//网络图片

可以通过extraRequestHandlers添加新的支持方法,定义新的RequestHandler,需要实现两个方法

public abstract boolean canHandleRequest(Request data);
public abstract Result load(Request request, int networkPolicy) throws IOException;

canHandleRequest定义了在什么情况下用这种方式
load定义了加载具体加载方式
具体看一下NetworkRequestHandler的具体实现,相关解析直接加注释

class NetworkRequestHandler extends RequestHandler {
  static final int RETRY_COUNT = 2;//重试次数

  private static final String SCHEME_HTTP = "http";//识别的scheme
  private static final String SCHEME_HTTPS = "https";//识别的scheme

  private final Downloader downloader;//下载器
  private final Stats stats;//状态
 /**
  * 构造方法
  */
  public NetworkRequestHandler(Downloader downloader, Stats stats) {
    this.downloader = downloader;
    this.stats = stats;
  }

  @Override public boolean canHandleRequest(Request data) {
    String scheme = data.uri.getScheme();
    return (SCHEME_HTTP.equals(scheme) || SCHEME_HTTPS.equals(scheme));
  }

  @Override public Result load(Request request, int networkPolicy) throws IOException {
    Response response = downloader.load(request.uri, request.networkPolicy);
    if (response == null) {
      return null;
    }
    Picasso.LoadedFrom loadedFrom = response.cached ? DISK : NETWORK;
    Bitmap bitmap = response.getBitmap();
    if (bitmap != null) {
      return new Result(bitmap, loadedFrom);
    }

    InputStream is = response.getInputStream();
    if (is == null) {
      return null;
    }
    // Sometimes response content length is zero when requests are being replayed. Haven't found
    // root cause to this but retrying the request seems safe to do so.
    if (loadedFrom == DISK && response.getContentLength() == 0) {
      Utils.closeQuietly(is);
      throw new ContentLengthException("Received response with 0 content-length header.");
    }
    if (loadedFrom == NETWORK && response.getContentLength() > 0) {
      stats.dispatchDownloadFinished(response.getContentLength());
    }
    return new Result(is, loadedFrom);
  }

  @Override int getRetryCount() {
    return RETRY_COUNT;
  }

  @Override boolean shouldRetry(boolean airplaneMode, NetworkInfo info) {
    return info == null || info.isConnected();
  }

  @Override boolean supportsReplay() {
    return true;
  }
  static class ContentLengthException extends IOException {
    public ContentLengthException(String message) {
      super(message);
    }
  }
}

3、请求创建器RequestCreator

Picasso的load方法返回RequestCreator,RequestCreator有两个功能
配置加载参数。
包括placeHolder与error图片,加载图片的大小、旋转、居中等属性。
执行加载。
通过调用into(object)方法进行加载。

 public RequestCreator load(String path) {
        if (path == null) {
            return new RequestCreator(this, null, 0);
        }
        if (path.trim().length() == 0) {
            throw new IllegalArgumentException("Path must not be empty.");
        }
        return load(Uri.parse(path));
    }

into方法只能在主线程调用,否则会抛出异常。如果没有要加载的资源,请求会被取消,提高了执行效率

static void checkMain() {
  if (!isMain()) {
    throw new IllegalStateException("Method call should happen from the main thread.");
      }
}

取消操作是Picasso的方法,也只能在主线程调用

/**
 * Cancel any existing requests for the specified target {@link ImageView}.
 */
public void cancelRequest(ImageView view) {
    // checkMain() is called from cancelExistingRequest()
    if (view == null) {
        throw new IllegalArgumentException("view cannot be null.");
    }
    cancelExistingRequest(view);
}

如果target不为空,执行线程正常,就可以正常构建request,创建requestkey,requestkey使用简单的属性拼接方法
如果设置了内存缓存,那么从内存缓存中读取图片,图片不为空,直接设置图片。
如果没有设置缓存,或者从缓存中读取到的图片为空,那么创建图片获取任务,

4、执行任务Action

Action是一个抽象类,可以理解为一个加载任务,
FetchAction 缓存图片
RemoteViewsAction 给RemoteView设置图片
GetAction 同步获取图片
ImageViewAction 给ImageView设置图片

TargetAction 对Target设置图片

调用RequestCreator对应方法会对应创建所需要的Action

public void into(Target target)
public void fetch() 
public Bitmap get() throws IOException
public void into(ImageView target)
public void into(RemoteViews remoteViews, int viewId, int notificationId,Notification notification)

需要特别注意的是 public Bitmap get() throws IOException 是同步方法,其他几个是异步方法

异步方法将Action提交到Dispatcher,同步方法直接使用BitmapHunter获取图片

获取完图片,处理图片的方法是

  abstract void complete(Bitmap result, Picasso.LoadedFrom from);

不同子类对应不同实现

5、事件分发器

默认的Dispatcher创建方法

Dispatcher dispatcher = new Dispatcher(context, service, HANDLER, downloader, cache, stats);

context参数就不用多说了,service是Pecasso的异步线程池,HANDLER用来向主线程抛消息,downloader是下载器

cache是图片缓存,stats是执行状态

Dispatcher是分发器,由Picasso或Hunter来调用。Dispatcher主要方法有
dispatcherSubmit()和dispatcherCancel()
hunter中加入action便调用dispatcherSubmit(),hunter中取消action便调用dispatcherCancel()
dispatcherComplete()和dispatcherError()
加载结束时调用。均调用batch方法,不过complete操作会将bitmap加入到cache中,以便后续调用。
batch()
起缓冲作用,每隔200毫秒执行一次performBatchComplete()批处理。批处理将hunterList回调给Picasso,Picasso对每个hunter的每个action进行结果回调。

Dispatcher启动了自己的线程dispatcherThread。DispatcherHandler运行在DispatcherThread中。

RequestCreator中调用了Picasso的submit方法,将acton提交到Dispatcher,
DispatcherHandler发送了REQUEST_SUBMIT这个消息,然后在DispatcherHandler所在线程中执行了performSubmit,
在performSubmit中创建BitmapHunter,进行下载。

  void performSubmit(Action action, boolean dismissFailed) {
    if (pausedTags.contains(action.getTag())) {
      pausedActions.put(action.getTarget(), action);
      if (action.getPicasso().loggingEnabled) {
        log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(),
            "because tag '" + action.getTag() + "' is paused");
      }
      return;
    }

    BitmapHunter hunter = hunterMap.get(action.getKey());
    if (hunter != null) {
      hunter.attach(action);
      return;
    }

    if (service.isShutdown()) {
      if (action.getPicasso().loggingEnabled) {
        log(OWNER_DISPATCHER, VERB_IGNORED, action.request.logId(), "because shut down");
      }
      return;
    }

    hunter = forRequest(action.getPicasso(), this, cache, stats, action);
    hunter.future = service.submit(hunter);
    hunterMap.put(action.getKey(), hunter);
    if (dismissFailed) {
      failedActions.remove(action.getTarget());
    }

    if (action.getPicasso().loggingEnabled) {
      log(OWNER_DISPATCHER, VERB_ENQUEUED, action.request.logId());
    }
  }

6、图片获取BitmapHunter

BitmapHunter是一个Runnable,作用是获取图片。
BitmapHunter的执行流程:在run()方法中执行hunt()方法尝试获取图片,把结果交给Dispatcher回调。

  Bitmap hunt() throws IOException {
    Bitmap bitmap = null;

    if (shouldReadFromMemoryCache(memoryPolicy)) {
      bitmap = cache.get(key);
      if (bitmap != null) {
        stats.dispatchCacheHit();
        loadedFrom = MEMORY;
        if (picasso.loggingEnabled) {
          log(OWNER_HUNTER, VERB_DECODED, data.logId(), "from cache");
        }
        return bitmap;
      }
    }

    data.networkPolicy = retryCount == 0 ? NetworkPolicy.OFFLINE.index : networkPolicy;
    RequestHandler.Result result = requestHandler.load(data, networkPolicy);
    if (result != null) {
      loadedFrom = result.getLoadedFrom();
      exifOrientation = result.getExifOrientation();
      bitmap = result.getBitmap();
      ........
    }

    if (bitmap != null) {
      if (picasso.loggingEnabled) {
        log(OWNER_HUNTER, VERB_DECODED, data.logId());
      }
      stats.dispatchBitmapDecoded(bitmap);
      if (data.needsTransformation() || exifOrientation != 0) {
        synchronized (DECODE_LOCK) {
          if (data.needsMatrixTransform() || exifOrientation != 0) {
            bitmap = transformResult(data, bitmap, exifOrientation);
            if (picasso.loggingEnabled) {
              log(OWNER_HUNTER, VERB_TRANSFORMED, data.logId());
            }
          }
          if (data.hasCustomTransformations()) {
            bitmap = applyCustomTransformations(data.transformations, bitmap);
            if (picasso.loggingEnabled) {
              log(OWNER_HUNTER, VERB_TRANSFORMED, data.logId(), "from custom transformations");
            }
          }
        }
        if (bitmap != null) {
          stats.dispatchBitmapTransformed(bitmap);
        }
      }
    }

    return bitmap;
  }

7、缓存处理

内存缓存使用LruCache,实现是在LruCache.java中,动态分配缓存大小,大小为可用内存的1/7,

磁盘缓存使用了网络框架的缓存方案。

8、网络请求处理

定义了三种下载器,分别用于okhttp3,okhttp,urlconneciton

static Downloader createDefaultDownloader(Context context) {
    if (SDK_INT >= GINGERBREAD) {
      try {
        Class.forName("okhttp3.OkHttpClient");
        return OkHttp3DownloaderCreator.create(context);
      } catch (ClassNotFoundException ignored) {
      }
      try {
        Class.forName("com.squareup.okhttp.OkHttpClient");
        return OkHttpDownloaderCreator.create(context);
      } catch (ClassNotFoundException ignored) {
      }
    }
    return new UrlConnectionDownloader(context);
  }

9、线程池优化

PicassoExecutorService根据网络状况,使用不同的线程池,命中次数多的,优先级高

    switch (info.getType()) {
      case ConnectivityManager.TYPE_WIFI:
      case ConnectivityManager.TYPE_WIMAX:
      case ConnectivityManager.TYPE_ETHERNET:
        setThreadCount(4);
        break;
      case ConnectivityManager.TYPE_MOBILE:
        switch (info.getSubtype()) {
          case TelephonyManager.NETWORK_TYPE_LTE:  // 4G
          case TelephonyManager.NETWORK_TYPE_HSPAP:
          case TelephonyManager.NETWORK_TYPE_EHRPD:
            setThreadCount(3);
            break;
          case TelephonyManager.NETWORK_TYPE_UMTS: // 3G
          case TelephonyManager.NETWORK_TYPE_CDMA:
          case TelephonyManager.NETWORK_TYPE_EVDO_0:
          case TelephonyManager.NETWORK_TYPE_EVDO_A:
          case TelephonyManager.NETWORK_TYPE_EVDO_B:
            setThreadCount(2);
            break;
          case TelephonyManager.NETWORK_TYPE_GPRS: // 2G
          case TelephonyManager.NETWORK_TYPE_EDGE:
            setThreadCount(1);
            break;
          default:
            setThreadCount(DEFAULT_THREAD_COUNT);
        }
        break;
      default:
        setThreadCount(DEFAULT_THREAD_COUNT);
    }