2016/2/22

Android Thread 溝通方式


  1. 單向資料管道 pipe
  2. 共用記憶體
  3. BlockingQueue: Producer-Consumer Pattern
  4. Message Queue
  5. 將任務回傳給 Thread

可參考作者的 sample code
EAT sample code chap4


pipe


sample code


範例中用 PipedWrite, PipedReader 建立單向資料傳遞。worker thread 可持續讀取 UI thread 產生的文字資料。


public class PipeExampleActivity extends Activity {

    PipedReader r;
    PipedWriter w;

    private Thread workerThread;

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        r = new PipedReader();
        w = new PipedWriter();

        try {
            w.connect(r);
        } catch (IOException e) {
            e.printStackTrace();
        }

        workerThread = new Thread(new TextHandlerTask(r));
        workerThread.start();
        
        // w.write(); 可持續傳送字串到 TextHandlerTask 另一個讀取資料的 Thread
    }

    private static class TextHandlerTask implements Runnable {
        private final PipedReader reader;

        public TextHandlerTask(PipedReader reader){
            this.reader = reader;
        }
        @Override
        public void run() {
            while(!Thread.currentThread().isInterrupted()){
                try {
                    int i;
                    while((i = reader.read()) != -1){
                        char c = (char) i;
                        //ADD TEXT PROCESSING LOGIC HERE
                        Log.d(TAG, "char = " + c);
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


共用記憶體



如果物件屬於以下這幾種,就會被儲存在共用記憶體中,物件的 reference 會儲存在執行緒的 stack。


  1. 實例成員變數
  2. 類別成員變數
  3. 方法中宣告的物件

執行緒的信號通知機制


synchronized ReentranLock ReentrantReadWriteLock
Blocking to wait Object.wait() Object.wait(timeout) Condition.await() Condition.await(timeout) Condition.await() Condition.await(timeout)
以信號通知 blocking 的執行緒 Object.notify() Object.notifyAll() Condition.signal() Condition.signalAll() Condition.signal() Condition.signalAll()

BlockingQueue: Producer-Consumer Pattern



public class ConsumerProducer {
    private final int LIMIT=10;
    private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(LIMIT);
    
    private void produce throws InterruptedException {
        int value=0;
        while(true) {
            blockingQueue.put(value);
        }
    }
    
    public void consume throws InterruptedException {
        while(true) {
            int value = blockingQueue.take();
        }
    }
}

Message Queue


Message Queue 是最適合用在 Android APP 的溝通機制。



  1. Insert: 利用連接到 consumer thread 的 Handler,producer thread 將訊息插入 queue
  2. Get: consumer thread 會執行 Looper,從 queue 裡面取得訊息
  3. Deliver: Handler 負責處理 consumer thread 的訊息。Thread 可以有多個 Handler instance 來處理訊息,Looper 可確保訊息被送到正確的 Handler。

UI thread 是唯一預設就與 Looper 相關連的執行緒, UI Looper 跟其他 Looper 有些差異
1. 它可以在任何地方被存取: Looper.getMainLooper()
2. 不能被終止: Looper.quit 會丟出 RuntimeException
3. 執行時,透過 Looper.prepareMainLooper() 把 Looper 關聯到 UI thread,每個 APP 都只能做一次


基本的訊息發送範例


public class LooperActivity extends Activity {

    LooperThread mLooperThread;

    // 定義 worker thread
    private static class LooperThread extends Thread {

        public Handler mHandler;

        public void run() {
            // 將 Looper 與 worker thread 連結在一起
            Looper.prepare();
            // 設定 Handler,讓 producer 可以插入訊息
            mHandler = new Handler() {
                // 當訊息被送到 worker thread 時的 callback
                public void handleMessage(Message msg) {
                    if(msg.what == 0) {
                        doLongRunningOperation();
                    }
                }
            };
            // blocking 呼叫,讓 message queue 可發送訊息給 consumer thread
            Looper.loop();
        }

        private void doLongRunningOperation() {
            // Add long running operation here.
        }
    }

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_looper);
        // 啟動 worker thread
        mLooperThread = new LooperThread();
        mLooperThread.start();
    }

    public void onClick(View v) {
        if (mLooperThread.mHandler != null) {
            // 初始化 message
            Message msg = mLooperThread.mHandler.obtainMessage(0);
            // 發送訊息mLooperThread.mHandler.sendMessage(msg);
        }
    }

    protected void onDestroy() {
        super.onDestroy();
        // 終止 worker thead,讓 Looper.loop 結束 blocking
        mLooperThread.mHandler.getLooper().quit();
    }
}

如果有一段時間沒有訊息,就會進入 IdleHandler


public class ConsumeAndQuitThreadActivity extends Activity {


    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        final ConsumeAndQuitThread consumeAndQuitThread = new ConsumeAndQuitThread();
        consumeAndQuitThread.start();
        
        // 模擬多個 threads 隨機插入訊息
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        SystemClock.sleep(new Random().nextInt(10));
                        consumeAndQuitThread.enqueueData(i);
                    }
                }
            }).start();
        }
    }

    private static class ConsumeAndQuitThread extends Thread implements MessageQueue.IdleHandler {

        private static final String THREAD_NAME = "ConsumeAndQuitThread";

        public Handler mConsumerHandler;
        private boolean mIsFirstIdle = true;

        public ConsumeAndQuitThread() {
            super(THREAD_NAME);
        }

        @Override
        public void run() {
            Looper.prepare();

            mConsumerHandler = new Handler() {
                @Override
                public void handleMessage(Message msg) {
                    // Consume data
                }
            };
            // 在啟動背景執行緒時,註冊 IdleHandler
            Looper.myQueue().addIdleHandler(this);
            Looper.loop();
        }


        @Override
        public boolean queueIdle() {
            if (mIsFirstIdle) {
                // 不處理第一次的 idle
                mIsFirstIdle = false;
                return true;
            }
            // 發生 idle 時,終止該執行緒
            mConsumerHandler.getLooper().quit();
            return false;
        }

        public void enqueueData(int i) {
            mConsumerHandler.sendEmptyMessage(i);
        }
    }
}

以下是資料訊息


Message.obtain(Handler h);
Message.obtain(Handler h, int what);
Message.obtain(Handler h, int what, Object o);
Message.obtain(Handler h, int what, int arg1, int arg2);
Message.obtain(Handler h, int what, int arg1, int arg2, Object o);

將資料訊息插入 message queue


boolean sendMessage(Message msg)
boolean sendMessageAtFrontOfQueue(Message msg)
boolean sendMessageAtTime(Message msg, long uptimeMillis)
boolean sendMessageDelayed(Message msg, long delayMillis)

// 簡單的訊息
boolean sendEmptyMessage(int what)
boolean sendEmptyMessageAtTime(int what, long uptimeMillis)
boolean sendEmptyMessageDelayed(int what, long delayMillis)

以下是任務訊息


//產生訊息時,同時指定 handler
Message m = Message.obtain(handler, runnable);
m.sendToTarget();

將任務訊息插入 message queue


boolean post(Runnable r)
boolean postAtFrontOfQueue(Runnable r)
boolean postAtTime(Runnable r, Object token, long uptimeMillis)
boolean post(Runnable r, long uptimeMillis)
boolean postDelayed(Runnable r, long delayMillis)

雙向傳遞訊息的範例


public class HandlerExampleActivity extends Activity {

    private final static int SHOW_PROGRESS_BAR = 1;
    private final static int HIDE_PROGRESS_BAR = 0;
    private BackgroundThread mBackgroundThread;

    private TextView mText;
    private Button mButton;
    private ProgressBar mProgressBar;

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_handler_example);

        // 當 activity 建立時,就啟動背景執行緒,處理來自 UI thread 的任務
        mBackgroundThread = new BackgroundThread();
        mBackgroundThread.start();

        mText = (TextView) findViewById(R.id.text);
        mProgressBar = (ProgressBar) findViewById(R.id.progress);
        mButton = (Button) findViewById(R.id.button);
        mButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                // 點擊按鈕,將任務傳送給背景執行緒
                mBackgroundThread.doWork();
            }
        });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // 停止背景執行緒
        mBackgroundThread.exit();
    }

    private final Handler mUiHandler = new Handler() {
        public void handleMessage(Message msg) {

            switch(msg.what) {
                case SHOW_PROGRESS_BAR:
                    mProgressBar.setVisibility(View.VISIBLE);
                    break;
                case HIDE_PROGRESS_BAR:
                    mText.setText(String.valueOf(msg.arg1));
                    mProgressBar.setVisibility(View.INVISIBLE);
                    break;
            }
        }
    };

    private class BackgroundThread extends Thread {

        private Handler mBackgroundHandler;

        public void run() {
            // 將 looper 與這個執行緒關聯起來
            Looper.prepare();
            // 此 Handler 只處理 Runnable
            mBackgroundHandler = new Handler();
            Looper.loop();
        }

        public void doWork() {
            mBackgroundHandler.post(new Runnable() {
                @Override
                public void run() {
                    // 建立只傳送 what 的 Message 物件,讓 Progress Bar 更新進度
                    Message uiMsg = mUiHandler.obtainMessage(SHOW_PROGRESS_BAR, 0,
                            0, null);
                    // 傳送訊息
                    mUiHandler.sendMessage(uiMsg);

                    Random r = new Random();
                    int randomInt = r.nextInt(5000);
                    SystemClock.sleep(randomInt);

                    // 建立物件,用來移除 progress bar
                    uiMsg = mUiHandler.obtainMessage(HIDE_PROGRESS_BAR, randomInt,
                            0, null);
                    mUiHandler.sendMessage(uiMsg);
                }
            });
        }

        public void exit() {
            mBackgroundHandler.getLooper().quit();
        }
    }
}

Handler 也可以利用 Callback interface 建立


public class HandlerCallbackActivity extends Activity implements Handler.Callback {

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_handler_callback);
    }

    @Override
    public boolean handleMessage(Message msg) {
        switch (msg.what) {
            case 1:
                msg.what = 11;
                return true;
            default:
                msg.what = 22;
                return false;
        }
    }

    public void onHandlerCallback(View v) {
        Handler handler = new Handler(this) {
            @Override
            public void handleMessage(Message msg) {
                // Process message
            }
        };
        // 插入訊息,該訊息會被 Callback 攔截
        handler.sendEmptyMessage(1);
        handler.sendEmptyMessage(2);
    }
}

觀察 message queue 的方法


為目前的 message queue 產生 snapshot


public class MQDebugActivity extends Activity {

    private static final String TAG = "EAT";
    Handler mWorkerHandler;

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_mq_debug);

        Thread t = new Thread() {
            @Override
            public void run() {
                Looper.prepare();
                mWorkerHandler = new Handler() {
                    @Override
                    public void handleMessage(Message msg) {
                        Log.d(TAG, "handleMessage - what = " + msg.what);
                    }
                };
                Looper.loop();
            }
        };
        t.start();
    }

    // Called on button click, i.e. from the UI thread.
    public void onClick(View v) {
        mWorkerHandler.sendEmptyMessageDelayed(1, 2000);
        mWorkerHandler.sendEmptyMessage(2);
        mWorkerHandler.obtainMessage(3, 0, 0, new Object()).sendToTarget();
        mWorkerHandler.sendEmptyMessageDelayed(4, 300);
        mWorkerHandler.postDelayed(new Runnable() {
            @Override
            public void run() {
                Log.d(TAG, "Execute");
            }
        }, 400);
        mWorkerHandler.sendEmptyMessage(5);

        mWorkerHandler.dump(new LogPrinter(Log.DEBUG, TAG), "");
    }
}

也可以追蹤 message queue 的處理


Looper.myLopper().setMessageLogging(new LogPrinter(Log.DEBUG, TAG))

與 UI Thread 溝通


訊息透過 UI Thread 的 Looper 傳遞給 UI Thread,這個 Looper 可透過 Looper.getMainLooper() 取得。


Runnable task = new Runnable() {...};
new Handler(Looper.getMainLooper()).post(task);

UI thread 發布給自己的任務訊息,可繞過訊息傳遞機制,並透過 Activity.runOnUiThread(Runnable ) 立刻被執行。


可在 Application 中透過 Thread ID 的方式識別 UI thread


public class TestApplication extends Application {
    private long mUiThreadId;
    private Handler mUiHandler;
    
    public void onCreate() {
        super.onCreate();
        mUiThreadId = Thread.currentThread().getId();
        mUiHandler = new Handler();
    }
    
    public void customRunOnUiThread(Runnable action) {
        if( Thread.currentThread().getId() != mUiThreadId ) {
            mUiHandler.post(action);
        } else {
            action.run();
        }
    }
}

Reference


Android 高效能多執行緒

2016/2/15

Android 的執行緒

Android 的執行緒是以 Linux pthread 及 Java Thread 實作,從APP的角度來看,有 UI、Binder、Background 執行緒。


分類


  1. UI Thread
    在 APP 開始時被啟動,在 Linux process 生命週期內保持存活,UI Thread 是 APP 的 Main Thread,用來執行 Android Component 以及更新 UI
  2. Binder Thread
    用在不同 process 的 thread 之間互相溝通,每一個 process 會維護一個 thread pool,處理從其他 process 進來的 requests,包含系統服務、Intent、Content Provider、Service
  3. Background Thread
    APP 建立的執行緒都是背景執行緒,他是 UI Thread 的 child

adb shell ps


用以下的指令可以查看 android device 上的 process 資訊。


# 顯示 thread 資訊
adb shell ps -t
# 顯示 fg, bg
adb shell ps -P

APP 裡面的執行緒跟 Linux 的 niceness value 相對應,可以用這兩個方式調整 Thread priority


java.lang.Thread
    setPriority(int priority)

android.os.Process
    Process.setThreadPriority(int priority)
    Process.setThreadPriority(int threadId, int priority)

Thread.setPriority(int) Linux niceness
1 (Thread.MIN_PRIORITY) 19
2 16
3 13
4 10
5 (Thread.NORM_PRIORITY) 0
6 -2
7 -4
8 -5
9 -6
10 -8

Android 控制群組


對 APP 來說最重要的兩個控制群組是 foreground 及 background,foreground 表示目前的 APP 是在前景,可視狀態,按下 Home 就會讓 APP 裡面所有的 Thread 都變成 background group。


如果用以下指令,可降低 Thread priority 並讓這個 thread 永遠都在 background group。


Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)

Android 的 IPC


當 thread 之間沒有共用的記憶體區塊時,android 透過 binder framework 進行 IPC。


Linux 的 IPC 有: 信號、管道、訊息佇列、semaphore與共用記憶體,而 Android RPC 已經被 binder framework 取代。


RPC 的步驟為
1. marshalling: 封裝方法及資料
2. 將 marshal 的資料傳送到遠端 process
3. 在遠端 process 進行 unmarshalling,並執行 RPC
4. 將處理結果 return value 回傳給原始的 process


Binder


binder 讓 APP 在不同 process 的 thread 之間互相傳遞資料及進行 RPC,資料以 android.os.Parcel 物件組成,內容包含參數以及實作 android.os.Parcelable 介面的自訂物件,由 transact() 直到 onTransact() 到伺服器行程。


Transaction Thread Pool 可同時處理 16個 RPC。


IPC 是雙向的,也可以設定 IBinder.FLAG_ONEWAY 進行單向沒有回傳值的 RPC。



AIDL


AIDL: Android Interface Definition Language
當 process 想要開放功能給其他 process 使用時,可編輯 .aidl 檔案,然後產生 IPC 的 java codes。



AIDL 透過 Proxy 及 Stub 進行 RPC 的通訊過程如下



  1. 同步 RPC
    在 .aidl 定義介面,然後用工具產生 Proxy 與 Stub。

interface ISynchronous {
    String getThreadName()
}

由於 Stub 可能會發生:(1) 執行時間很久 (2) Blocking (3) 呼叫共用資料的狀況,最好在客戶端要用 worker thread 進行 RPC 呼叫,而不要用 UI thread。


  1. 非同步 RPC

可定義整個介面都是非同步的 oneway


oneway interface IAsynchronousInterface {
    void method1();
    void method2();
}

也可以只定義某個方法是 oneway


interface IAsynchronousInterface {
    oneway void method1();
    void method2();
}

使用 binder 進行訊息傳遞


在不同 process 的 thread 可透過 binder 傳遞訊息



sample code


WorkerThreadService 是在伺服器行程中執行,與客戶端的 Activity 溝通,Service 會實作 Messenger,將它傳遞給 Activity,而 Activity 會回傳 Message 物件給 Service。


public class WorkerThreadService extends Service {

    private static final String TAG = "WorkerThreadService";
    WorkerThread mWorkerThread;
    Messenger mWorkerMessenger;

    @Override
    public void onCreate() {
        super.onCreate();
        // 產生 Service 時,就建立 worker thread,bind 過來的客戶端也是使用 worker thread
        mWorkerThread = new WorkerThread();
        mWorkerThread.start();
    }

    // Worker thread has prepared a looper and handler.
    private void onWorkerPrepared() {
        Log.d(TAG, "onWorkerPrepared");
        mWorkerMessenger = new Messenger(mWorkerThread.mWorkerHandler);
        synchronized(this) {
            notifyAll();
        }
    }

    // 繫結過來的客戶端會收到 messenger 的 IBinder 物件,客戶端才能跟 server 溝通
    public IBinder onBind(Intent intent) {
        Log.d(TAG, "onBind");
        synchronized (this) {
            while (mWorkerMessenger == null) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Empty
                }
            }
        }
        return mWorkerMessenger.getBinder();
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        mWorkerThread.quit();
    }

    private class WorkerThread extends Thread {

        Handler mWorkerHandler;

        @Override
        public void run() {
            Looper.prepare();
            mWorkerHandler = new Handler() {
                // worker thread 的 message handler
                @Override
                public void handleMessage(Message msg) {
                    switch (msg.what) {
                        case 1:
                            try {
                                // 雙向訊息溝通的機制,將新的 Message 傳回 Activity
                                msg.replyTo.send(Message.obtain(null, msg.what, 0, 0));
                            } catch (RemoteException e) {
                                Log.e(TAG, e.getMessage());
                            }
                            break;
                        case 2:
                            Log.d(TAG, "Message received");
                            break;
                    }

                }
            };
            onWorkerPrepared();
            Looper.loop();
        }

        public void quit() {
            mWorkerHandler.getLooper().quit();
        }
    }
}

  1. 單向訊息

在客戶端,Activity繫結到伺服器行程中的 service 並傳遞訊息


public class MessengerOnewayActivity extends Activity {

    private boolean mBound = false;
    private Messenger mRemoteService = null;

    private ServiceConnection mRemoteConnection = new ServiceConnection() {

        public void onServiceConnected(ComponentName className, IBinder service) {
                // 連上 service,由伺服器端回傳的 binder 建立 Messenger
            mRemoteService = new Messenger(service);
            mBound = true;
        }

        public void onServiceDisconnected(ComponentName className) {
            mRemoteService = null;
            mBound = false;
        }
    };

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_messenger_client);
    }

    public void onBindClick(View v) {
          // 繫結到遠端服務
        Intent intent = new Intent("com.eat.chapter5.ACTION_BIND");
        bindService(intent, mRemoteConnection, Context.BIND_AUTO_CREATE);

    }

    public void onUnbindClick(View v) {
        if (mBound) {
            unbindService(mRemoteConnection);
            mBound = false;
        }

    }

    public void onSendClick(View v) {
        if (mBound) {
            try {
                // 點擊按鈕時傳遞訊息
                mRemoteService.send(Message.obtain(null, 2, 0, 0));
            } catch (RemoteException e) {
                // Empty
            }
        }
    }
}

  1. 雙向訊息

傳遞的 messenge 在 Message.replyTo 裡面存放指向 Messenger 的 reference,可透過它建立雙向溝通機制。


public class MessengerTwowayActivity extends Activity {
    private static final String TAG = "MessengerTwowayActivity";
    private boolean mBound = false;
    private Messenger mRemoteService = null;

    private ServiceConnection mRemoteConnection = new ServiceConnection() {

        public void onServiceConnected(ComponentName className, IBinder service) {
            mRemoteService = new Messenger(service);
            mBound = true;
        }

        public void onServiceDisconnected(ComponentName className) {
            mRemoteService = null;
            mBound = false;
        }
    };

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_messenger_client);
    }

    public void onBindClick(View v) {
        Intent intent = new Intent("com.eat.chapter5.ACTION_BIND");
        bindService(intent, mRemoteConnection, Context.BIND_AUTO_CREATE);
    }

    public void onUnbindClick(View v) {
        if (mBound) {
            unbindService(mRemoteConnection);
            mBound = false;
        }

    }

    public void onSendClick(View v) {
        if (mBound) {
            try {
                Message msg = Message.obtain(null, 1, 0, 0);
                // 建立被傳送到遠端的 Messenger,此 Messenger 存放目前執行緒的 Handler reference,它會執行來自其他 process 的訊息
                msg.replyTo = new Messenger(new Handler() {
                    @Override
                    public void handleMessage(Message msg) {
                        Log.d(TAG, "Message sent back - msg.what = " + msg.what);
                    }
                });
                mRemoteService.send(msg);
            } catch (RemoteException e) {
                Log.e(TAG, e.getMessage());
            }
        }
    }
}

Reference


Android 高效能多執行緒

2016/2/1

SMACK: Spark、Mesos、Akka、Cassandra 和 Kafka


對於新的名詞 SMACK,以下是一些網站的資訊,對於 Scala Akka,我們還算比較熟悉一些,Spark 在大數據的運算處理效能也已經被驗證了。對 Cassandra 就比較不知道特殊的定位在哪,目前只認為是一種 Java 實作的 NoSQL DB,但在 driver 的部分,並沒有感覺到有什麼優勢。還不熟悉 Mesos 跟 Kafka,Kafka 相較於 RabbitMQ 來說,定位在輕量級的 MQ 系統。


2016年是IT翻轉的關鍵一年


  1. Docker: 建立應用程式層級的虛擬隔離環境

  2. Microservices: 不同於過去只靠單一應用系統(Monolithic Application)來提供各種應用功能,微服務架構是一種以大量微型服務來組合成一套應用系統的架構。

  3. DevOps: 各種DevOps工具可以以Docker為溝通基礎,來建立相互搭配的機制,甚至更進一步能成為一個軟體開發生產線,就像是一間高度自動化的軟體工廠(Software Factory),在設計完成後,只需少數幾人就能維運來生產自家的軟體產品。

  4. SMACK(Spark、Mesos、Akka、Cassandra和Kafka)是矽谷最夯的大資料架構,這個以解決Fast Data串流資料為目標的架構


【大資料2016趨勢分析】超夯Big Data新架構SMACK崛起


一位瑞典大資料開發者Anton Kirillov把這5項大資料技術組合稱作SMACK架構,在2015年9月於斯德哥爾摩Big Data Meetup中提出。


在SMACK架構中,首先,採用Spark分散式引擎用來快速處理大規模資料,並用Mesos管理叢集資源。


Akka則是以Scala語言寫出的Actor模型庫,可用來建構一個能在JVM上執行的高同步、分散式、能自動容錯,並以彈性訊息驅動的應用。

儲存層使用Cassandra分散式資料庫,採用Key-Value資料儲存架構。

Kafka是一套分散式訊息提交系統,可以預先將進來的資料集合起來,讓多個Consumer進行批次資料讀取。


LEARN EVERYTHING YOU NEED TO KNOW ABOUT SCALA AND BIG DATA


How SMACK makes big data faster


Akka: a toolkit and runtime aimed at simplifying the construction of concurrent and distributed applications on the Java Virtual Machine (JVM). The only element that is not an Apache Software project.


Cassandra: a NoSQL database management system


Kafka: a distributed messaging system originally developed by LinkedIn


Mesos: a cluster management system co-created by Matei Zaharia who also co-created Spark


Spark: a general-purpose big data processing platform that generally runs in-memory


SMACK stands for Spark, Mesos, Akka, Cassandra and Kafka. These are all open-source, mostly Apache Software projects (Akka is not).


Akka, Spark and Cassandra take data from Kafka into the data layer - Cassandra handles the operational data, while Spark provides near real time analysis of that data. Mesos is tasked with orchestrating the components and managing all the resources used by each of them.


深入淺出Mesos(一):為軟件定義數據中心而生的操作系統


Mesos的起源於Google的數據中心資源管理系統Borg。Twitter從Google的Borg系統中得到啟發,然後就開發一個類似的資源管理系統來幫助他們擺脫可怕的“失敗之鯨”。


Mesos實現了兩級調度架構,它可以管理多種類型的應用程序。


第一級調度是Master的守護進程,管理Mesos集群中所有節點上運行的Slave守護進程。集群由物理服務器或虛擬服務器組成,用於運行應用程序的任務,比如Hadoop和MPI作業。


第二級調度由被稱作Framework的“組件”組成。Framework包括調度器(Scheduler)和執行器(Executor)進程,其中每個節點上都會運行執行器。Mesos能和不同類型的Framework通信,每種Framework由相應的應用集群管理。


Kafka剖析(一):Kafka背景及架構介紹


Kafka是由LinkedIn開發的一個分佈式的消息系統,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎,使用Scala編寫,它以可水平擴展和高吞吐率而被廣泛使用。


Spark擊敗Hadoop刷新資料排序世界記錄


Spark僅以不到30分鐘就完成排序多達100 TB的資料量,打破了此前由另一個大資料分析工具 Hadoop保有72分鐘的世界記錄。


如何在 CentOS 安裝 Observium


參考官方在 CentOS6CentOS7 安裝 Observium 的文件,以下記錄如何在 CentOS 安裝 Observium。


安裝相關套件


因應 Observium 的套件相依性,安裝一些需要的套件,以下沒有將 MySQL 列進去,假設已經安裝好了。


yum -y install wget httpd php php-mysql php-gd php-posix php-mcrypt php-pear php-pear.noarch \
vixie-cron net-snmp net-snmp-utils fping MySQL-python rrdtool subversion \
jwhois ipmitool graphviz ImageMagick

有的文章說,要監控 VM 還需要安裝 libvirt


yum -y install libvirt

安裝 observium


直接下載 observium community version,解壓縮就好了。


cd /opt

wget http://www.observium.org/observium-community-latest.tar.gz
tar zxvf observium-community-latest.tar.gz

然後建立 observium MySQL DB 以及帳號。


mysql -u root -p

CREATE DATABASE observium DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
GRANT ALL PRIVILEGES ON observium.* TO 'observium'@'localhost' IDENTIFIED BY 'password';

切換到 observium 目錄,處理設定檔,填寫 fping 以及 mysql db 帳號及密碼。


cd observium

cp config.php.default config.php
vi /opt/observium/config.php

config.php 異動的內容如下


# 修改 db username, password
$config['db_user'] = 'observium';
$config['db_pass'] = 'password';

# 新增一行 fping 設定
$config['fping'] = "/usr/sbin/fping";

呼叫此 php 程式,安裝 observium 資料庫


php /opt/observium/includes/update/update.php

建立 logs 以及 rrd 的目錄


# log dir
mkdir /opt/observium/logs
chown apache:apache /opt/observium/logs

# rrd dir
mkdir /opt/observium/rrd
chown apache:apache rrd

設定 observium 虛擬網站的網頁目錄


vi /etc/httpd/conf.d/observium.conf

<VirtualHost *:80>
    DocumentRoot /opt/observium/html/
    ServerName  observium.domain.com
    CustomLog /opt/observium/logs/access_log combined
    ErrorLog /opt/observium/logs/error_log
    <Directory "/opt/observium/html/">
        AllowOverride All
        Options FollowSymLinks MultiViews
    </Directory>
</VirtualHost>

建立帳號,level 10 是 admin


## Add a first user, use level of 10 for admin:
cd /opt/observium
./adduser.php admin password 10

新增一個要監控的機器,hostname 的地方一定只能填 hostname,不能填 IP,如果沒有 domain name,就自訂一個 hostname,並把 hostname ip 對應填寫到 /etc/hosts 裡面。


後面的 public 是 SNMP 的 community string,v2c 是 SNMP 版本號碼,所以這個部分的設定的前提,是被監控的機器要先將 snmpd 設定好。


## Add a first device to monitor, hostname 的地方不能填 ip
./add_device.php hostname public v2c

這裡是要自動 discover 機器的功能。


## Do an initial discovery and polling run to populate the data for the new device:
./discovery.php -h all
./poller.php -h all

調整 cronjob


新增一個檔案 /etc/cron.d/observium,內容如下


33  */6   * * *   root    /opt/observium/discovery.php -h all >> /dev/null 2>&1
*/5 *      * * *   root    /opt/observium/discovery.php -h new >> /dev/null 2>&1
*/5 *      * * *   root    /opt/observium/poller-wrapper.py 2 >> /dev/null 2>&1

以指令要求 crond 重新讀取設定檔


/etc/init.d/crond reload

將 observium 的結果 email 出來的方法


因為 observium 並沒有自動產生報表送出結果的功能,參考這個網頁 [http://blog.pdurante.com/2014/04/11/configure-email-reports-with-observium/] 的方法,可以用 script 的方式,透過 rrdtool 取得 observium 的流量圖。


方法簡述如下:首先找到想要 email 的流量圖,在 observium 的網頁上,點擊 "RRD COMMAND" 取得產生這個圖片的 RRD 指令,接下來就能利用 script 的方式,自動執行 script 並將結果 email 給特定的人員。


note


#!/bin/bash
## 昨天
start_time=`date +%Y-%m-%d --date="-1 day"`
start_time_s=`date +%s -d ${start_time}`

## 昨天 +86400 seconds
end_time_s=$((start_time_s+86400))
echo "start_time="${start_time}", start_time_s="${start_time_s}", end_time_s="${end_time_s}

## 上週一
start_week=`date +%Y-%m-%d -d 'last monday'`
start_week_s=`date +%s -d ${start_week}`
## 這週一
end_week=`date +%Y-%m-%d -d 'monday'`
end_week_s=`date +%s -d ${end_week}`
echo "start_week="${start_week}", start_week_s="${start_week_s}
echo "  end_week="${end_week}",   end_week_s="${end_week_s}

## 上個月 1 日
start_month=`date +%Y-%m-01 --date="-1 month"`
start_month_s=`date +%s -d ${start_month}`
## 這個月 1 日
end_month=`date +%Y-%m-01`
end_month_s=`date +%s -d ${end_month}`

echo "start_month="${start_month}", start_month_s="${start_month_s}
echo "  end_month="${end_month}",   end_month_s="${end_month_s}

mkdir -p ~/chart/${start_time}

# cpu
# memory
# eth0