Executor 為執行緒以及在系統上使用的資源,提供更進一步的控制機制。
Executor 的功能
- 建立 worker thread 的 pool 及 queue,控制等待執行的任務數量
- 檢查讓執行緒異常結束的錯誤
- 等待執行緒結束,並擷取結果
- 以固定順序批次執行執行緒,並擷取結果
- 啟用背景執行緒,讓使用者能快一點取得結果
SimpleExecutor
針對每一個任務建立一個執行緒。
import java.util.concurrent.Executor;
/**
 * Executor that runs every submitted task on its own, newly created thread.
 */
public class SimpleExecutor implements Executor {

    /**
     * Creates a fresh thread for the given task and starts it.
     *
     * @param runnable the task to run on the new thread
     */
    @Override
    public void execute(Runnable runnable) {
        Thread worker = new Thread(runnable);
        worker.start();
    }
}
SerialExecutor
以任務佇列 ArrayDeque 儲存所有任務,用 FIFO 的順序執行任務。
/**
 * Executor that runs submitted tasks one at a time, in submission (FIFO) order.
 *
 * <p>Tasks are wrapped and queued in an {@link ArrayDeque}; each wrapper is handed to
 * {@code THREAD_POOL_EXECUTOR} and, when it finishes, schedules the next queued wrapper.
 */
private static class SerialExecutor implements Executor {
    /** Wrapped tasks waiting for their turn; accessed only from synchronized methods. */
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    /** Wrapper most recently handed to the pool, or null if the queue was empty at the last poll. */
    Runnable mActive;

    /**
     * Queues the task and, if nothing is currently active, starts it right away.
     *
     * @param runnable the task to run after all previously submitted tasks
     */
    @Override
    public synchronized void execute(final Runnable runnable) {
        mTasks.offer(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } finally {
                    // Always chain to the next task, even if this one threw.
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    /** Takes the next queued wrapper, if any, and hands it to the thread pool. */
    protected synchronized void scheduleNext() {
        mActive = mTasks.poll();
        if (mActive != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
ThreadPool
為避免 APP 建立太多 Thread,物件的建立與銷毀浪費了系統的資源,所以可使用 ThreadPool。Pool 的大小可以由 processors 數量來決定。
Runtime.getRuntime().availableProcessors();
ThreadPool 的參數
- 固定尺寸: 由 Executors.newFixedThreadPool(n) 產生
- 動態尺寸: 由 Executors.newCachedThreadPool() 產生,pool 會隨著任務數量多寡變大或縮小
- 單執行緒: 由 Executors.newSingleThreadExecutor() 產生
具有自訂執行緒特性的固定尺寸 ThreadPool sample:
class LowPriorityThreadFactory implements ThreadFactory {
private static final String TAG = "LowPriorityThreadFactory";
private static int count = 1;
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("LowPrio " + count++);
t.setPriority(4);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
@Override
public void uncaughtException(Thread t, Throwable e)
{
Log.d(TAG, "Thread = " + t.getName() + ", error = " + e.getMessage());
}
});
return t;
}
}
追蹤 ThreadPool
/**
 * ThreadPoolExecutor with three threads and a LinkedBlockingQueue that logs its
 * lifecycle hooks and counts the tasks currently being executed.
 */
public class TaskTrackingThreadPool extends ThreadPoolExecutor {
    private static final String TAG = "CustomThreadPool";
    /** Number of tasks that have passed beforeExecute but not yet afterExecute. */
    private final AtomicInteger mTaskCount = new AtomicInteger(0);

    /** Creates the pool with core and maximum size 3 and a LinkedBlockingQueue work queue. */
    public TaskTrackingThreadPool() {
        super(3, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /** Logs the worker thread's name and counts the task as running. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        Log.d(TAG, "beforeExecute - thread = " + t.getName());
        mTaskCount.getAndIncrement();
    }

    /** Logs the current thread and the task's throwable (if any), then uncounts the task. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Separator added so the thread name and the throwable do not run together in the log.
        Log.d(TAG, "afterExecute - thread = " + Thread.currentThread().getName() + ", t = " + t);
        mTaskCount.getAndDecrement();
    }

    /** Logs the name of the thread on which the terminated hook runs. */
    @Override
    protected void terminated() {
        super.terminated();
        Log.d(TAG, "terminated - thread = " + Thread.currentThread().getName());
    }

    /**
     * @return the number of tasks currently between beforeExecute and afterExecute
     */
    public int getNbrOfTasks() {
        return mTaskCount.get();
    }
}
Sample
利用 invokeAll 讓兩個 worker thread 同時執行兩個獨立的任務,在兩個都完成之後,再將結果合併在一起。
public class InvokeActivity extends Activity {
private static final String TAG = "InvokeActivity";
private TextView textStatus;
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_invoke);
textStatus = (TextView) findViewById(R.id.text_status);
}
public void onButtonClick(View v) {
// 讓 UI thread 分擔 invokeAll 的執行器
SimpleExecutor simpleExecutor = new SimpleExecutor();
simpleExecutor.execute(new Runnable() {
@Override
public void run() {
// 存放兩個獨立任務
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
return getFirstPartialDataFromNetwork();
}
});
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
return getSecondPartialDataFromNetwork();
}
});
// 使用兩個 threads 執行任務
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
try {
Log.d(TAG, "invokeAll");
List<Future<String>> futures = executor.invokeAll(tasks);
Log.d(TAG, "invokeAll after");
final String mashedData = mashupResult(futures);
textStatus.post(new Runnable() {
@Override
public void run() {
textStatus.setText(mashedData);
}
});
Log.d(TAG, "mashedData = " + mashedData);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
});
}
private String getFirstPartialDataFromNetwork() {
Log.d(TAG, "ProgressReportingTask 1 started");
SystemClock.sleep(10000);
Log.d(TAG, "ProgressReportingTask 1 done");
return "MockA";
}
private String getSecondPartialDataFromNetwork() {
Log.d(TAG, "ProgressReportingTask 2 started");
SystemClock.sleep(2000);
Log.d(TAG, "ProgressReportingTask 2 done");
return "MockB";
}
private String mashupResult(List<Future<String>> futures) throws ExecutionException, InterruptedException {
StringBuilder builder = new StringBuilder();
for (Future<String> future : futures) {
builder.append(future.get());
// 以 future.get 分同步取得結果
}
return builder.toString();
}
}
當 Activity 建立後,就會在背景下載多個圖片
public class ECSImageDownloaderActivity extends Activity {
private static final String TAG = "ECSImageDownloaderActivity";
private LinearLayout layoutImages;
private class ImageDownloadTask implements Callable<Bitmap> {
@Override
public Bitmap call() throws Exception {
return downloadRemoteImage();
}
private Bitmap downloadRemoteImage() {
SystemClock.sleep((int) (5000f - new Random().nextFloat() * 5000f));
return BitmapFactory.decodeResource(ECSImageDownloaderActivity.this.getResources(), R.drawable.ic_launcher);
}
}
// 代表產生結果的任務的 Callable instance
private class DownloadCompletionService extends ExecutorCompletionService {
private ExecutorService mExecutor;
public DownloadCompletionService(ExecutorService executor) {
super(executor);
mExecutor = executor;
}
public void shutdown() {
mExecutor.shutdown();
}
public boolean isTerminated() {
return mExecutor.isTerminated();
}
}
// 由完成 queue 取得任務執行結果的 consumer thread
private class ConsumerThread extends Thread {
private DownloadCompletionService mEcs;
private ConsumerThread(DownloadCompletionService ecs) {
this.mEcs = ecs;
}
@Override
public void run() {
super.run();
try {
// 如果執行器被終止,所有任務都已經完成
while(!mEcs.isTerminated()) {
Future<Bitmap> future = mEcs.poll(1, TimeUnit.SECONDS);
if (future != null) {
addImage(future.get());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_ecs_image_downloader);
layoutImages = (LinearLayout) findViewById(R.id.layout_images);
DownloadCompletionService ecs = new DownloadCompletionService(Executors.newCachedThreadPool());
new ConsumerThread(ecs).start();
for (int i = 0; i < 5; i++) {
ecs.submit(new ImageDownloadTask());
}
ecs.shutdown();
}
private void addImage(final Bitmap image) {
runOnUiThread(new Runnable() {
@Override
public void run() {
ImageView iv = new ImageView(ECSImageDownloaderActivity.this);
iv.setImageBitmap(image);
layoutImages.addView(iv);
}
});
}
}
沒有留言:
張貼留言