2020/12/7

數位訊號 DSP

數位訊號處理系統 DSP,也就是將輸入的數位訊號,轉換成另一種數位訊號,\(y[n]= T(x[n])\)

DSP 系統分類

根據數位訊號處理性質可分為

  • 靜態 Static /動態 Dynamic System

    靜態系統 static system 就是輸出訊號只根據目前輸入訊號計算得來,也稱為 Memoryless System

    例如:Scalar Multiplication, \(y[n] = α*x[n]\),其中 α 為 scaling factor,可調整音量大小

    或是 \(y[n] = (x[n])^2\) 可計算功率 Power

​ 動態系統 dynamic system,就是輸出訊號根據過去、現在、未來的數位訊號計算得來,也稱為 Memory System

​ 例如: Moving Average, \(y[n] = \frac{1}{3}(x[n-1]+x[n]+x[n+1])\) 或是 \(y[n] = \frac{1}{3}(x[n-2]+x[n-1]+x[n])\)

  • 線性 Linear /非線性 Nonlinear System

    線性原則就是數位訊號先進行 Linear Combination,再經過 DSP 處理,其結果跟分別對數位訊號進行 DSP 處理,再進行 Linear Combination 的結果一樣

    線性系統: \(T(α*x_1[n]+𝛽*x_2[n]) = α*T(x_1[n]) + 𝛽*T(x_2[n])\)

    ​ ex: Moving Average \(y[n] = \frac{1}{3}(x[n-1]+x[n]+x[n+1])\)

    非線性系統:\(T(α*x_1[n]+𝛽*x_2[n]) ≠ α*T(x_1[n]) + 𝛽*T(x_2[n])\)

    ​ ex: 功率計算 \(y[n] = (x[n])^2\)

  • 時間不變性 Time-Invariant / 時變性 Time-Varying System

    時間不變性 Time-Invariant System 就是 DSP 運算方式不會隨著時間而改變。如果 \(y[n]=T(x[n])\) 則 \(y[n-n_0] = T(x[n-n_0])\)

    ​ ex: Scalar Multiplication

    時變性系統,DSP 運算方式隨著時間改變。因運算方式根據目前觀察到的數位訊號隨時更新,因此具有適應性

  • 因果 Causal / 非因果 Non-Causal System

    因果 Causal System,輸出訊號只根據目前與過去的數位訊號運算而得來

    ​ ex: Moving Average \(y[n] = \frac{1}{3}(x[n-2]+x[n-1]+x[n])\)

    非因果 Non-Causal System,輸出訊號會根據目前、過去及未來的數位訊號運算而得來

    ​ ex: Moving Average, \(y[n] = \frac{1}{3}(x[n-1]+x[n]+x[n+1])\)

    因果系統符合 Real-Time Application 的需求,因此 DSP 通常使用因果系統 Causal System 設計

  • 穩定 Stable / 不穩定 Non-Stable System

    穩定 Stable System,就是系統具有 Bounded Input - Bounded Output (BIBO) 的特性。當訊號落在限定範圍,則輸出訊號也會落在限定範圍內

    當 \( |x[n]| < B_x 則 |y[n]| < B_x \)

    ex: Scalar Multiplication, \(y[n] = α*x[n]\),如果 \(|x[n]|<1\) 則 \(|y[n|< α\)

    不穩定 Non-Stable System,就是系統不具有 Bounded Input - Bounded Output (BIBO) 的特性。

Linear Time-Invariant System (LTI) 線性時間不變性系統,是最具代表性的 DSP

DSP 基本運算

  • Scalar Multiplication

    \(y[n] = α \cdot x[n]\),其中 α 為縮放因子 scaling factor

  • 加/減法

    \( y[n]=x_1[n] ± x_2[n] \)

  • 乘法

    \( y[n]=x_1[n] \cdot x_2[n] \)

  • 時間延遲 time delay

    \( y[n]=x[n-n_0]\)

    當 \(n_0=1\) ,則 \(y[n]=x[n-1]\),又稱為單位延遲 unit-delay

  • Downsampling 降低取樣率,也稱為 Decimation

    \(y[n] = x[2n]\) 將 sample rate 降為一半

    最簡單的方式,是每兩個 sample,直接選用其中一個,但如果 N 比較大,可以用 N 個 sample 的平均值,作為新的取樣結果

  • Upsampling 增加取樣率

    通常是用內插法 interpolation 計算

    \(y[n] = x[n/2]\) 將 sample rate 兩倍

    最簡單的方法,稱為 Zero-Order Hold 內插法。也就是複製前一個 sample value。如果 N 比較大,可改用 Linear interpolation、Polynomimal Interpolation、Spline Interpolation

  • Resampling

    前面的 upsampling/downsampling 都是整數倍,如果是非整數倍,則稱為 resampling。

    Nyquist-Shannon 取樣定理:如果週期函數 x(t) 不包含高於 B cps(次/秒)的頻率,那麼,一系列小於 1/(2B) 秒的x(t)函數值將會受到前一個週期的x(t)函數值影響。因此 2B 樣本/秒或更高的取樣頻率將能使函數不受干擾。相對的,對於一個給定的取樣頻率 fs,完全重構的頻帶限制為 Bfs/2。

    因為在 downsampling 時,可能會發生無法滿足 Nyquist-Shannon 取樣定理的狀況,而發生取樣後訊號的頻率重疊(即高於取樣頻率一半的頻率成分將被重建成低於取樣頻率一半的訊號),也就是混疊 aliasing。

    解決方式是在 downsampling 前,先對原始訊號進行前處理,例如:low-pass filtering,降低原始數位訊號的最高頻率後,再進行 downsampling

    resampling 後的波形必須跟原始訊號的波形相似


import numpy as np
import scipy.signal as signal
import sys

def scalar_multiplication(x, alpha):
    y = alpha * x
    return y

def add(x1, x2):
    y = x1 + x2
    return y

def multiplication(x1, x2):
    y = x1 * x2
    return y

def time_delay(x, n0):
    y = x
    for i in range( n0 ):
        y = np.insert ( y, 0, 0 )   # 在0的位置插入 1 個 0
    return y

def downsampling( x, method = 1 ):
    N = int( len( x ) / 2 )
    y = np.zeros( N )

    if method == 1:             # Decimation
        for n in range( N ):
            y[n] = x[2*n]
    else:                       # Average
        for n in range( N ):
            y[n] = ( x[2*n] + x[2*n+1] ) / 2

    return y

def upsampling( x, method = 1 ):
    N = len( x ) * 2
    y = np.zeros( N )

    if method == 1:             # Zero-Order Hold
        for n in range( N ):
            y[n] = x[int( n / 2 )]
    else:                       # Linear Interpolation
        for n in range( N ):
            if int( n / 2 ) == n / 2:
                y[n] = x[int( n / 2 )]
            else:
                n1 = int( n / 2 )
                n2 = n1 + 1
                if n2 < len( x ):
                    y[n] = ( x[n1] + x[n2] ) / 2
                else:
                    y[n] = x[n1] / 2

    return y

# 利用 Scipy 的 signal.resample 處理
def resampling( x, sampling_rate ):
    num = int( len(x) * sampling_rate )
    y = signal.resample( x, num )
    return y

def main():
    np.set_printoptions(formatter={'all': lambda x: str(x)+","})

    x = np.array ( [ 1, 2, 4, 3, 2, 1, 1 ] )
    x1 = np.array ( [ 1, 2, 4, 3, 2, 1, 1 ] )
    x2 = np.array ( [ 0, 0, 1, 2, 4, 0, -1 ] )

    alpha = 2
    y = scalar_multiplication(x, alpha)
    print ( str(x) + " * " + str(alpha) + " -> " + str(y) )

    y = add(x1, x2)
    print ( str(x1) + " + " + str(x2) + " -> " + str(y) )

    y = multiplication(x1, x2)
    print ( str(x1) + " * " + str(x2) + " -> " + str(y) )

    n0 = 2
    y = time_delay(x, n0)
    print ( str(x) + " time_delay " + str(n0) + " -> " + str(y) )

    # downsampling
    y1 = downsampling( x, 1 )
    y2 = downsampling( x, 2 )
    # upsampling
    z1 = upsampling( x, 1 )
    z2 = upsampling( x, 2 )

    print ("")
    print ("original                        : " + str(x) )
    print ("downsampling Decimation         : " + str(y1))
    print ("downsampling Average            : " + str(y2))
    print ("upsampling Zero-Order Hold      : " + str(z1))
    print ("upsampling Linear Interpolation : " + str(z2))

    y = resampling( x, 1.5 )
    print ("")
    print ( str(x) + " resampling 1.5 -> " + str(y) )

if __name__ == "__main__":
    sys.exit(main())

執行結果

[1, 2, 4, 3, 2, 1, 1,] * 2 -> [2, 4, 8, 6, 4, 2, 2,]
[1, 2, 4, 3, 2, 1, 1,] + [0, 0, 1, 2, 4, 0, -1,] -> [1, 2, 5, 5, 6, 1, 0,]
[1, 2, 4, 3, 2, 1, 1,] * [0, 0, 1, 2, 4, 0, -1,] -> [0, 0, 4, 6, 8, 0, -1,]
[1, 2, 4, 3, 2, 1, 1,] time_delay 2 -> [0, 0, 1, 2, 4, 3, 2, 1, 1,]

original                        : [1, 2, 4, 3, 2, 1, 1,]
downsampling Decimation         : [1.0, 4.0, 2.0,]
downsampling Average            : [1.5, 3.5, 1.5,]
upsampling Zero-Order Hold      : [1.0, 1.0, 2.0, 2.0, 4.0, 4.0, 3.0, 3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0,]
upsampling Linear Interpolation : [1.0, 1.5, 2.0, 3.0, 4.0, 3.5, 3.0, 2.5, 2.0, 1.5, 1.0, 1.0, 1.0, 0.5,]

[1, 2, 4, 3, 2, 1, 1,] resampling 1.5 -> [1.0000000000000007, 1.384837322270305, 3.027762174254682,
 4.024749771849889, 3.3105260815101465, 2.3971667816643563,
 1.8264208230878693, 1.0876017695626832, 0.8352909211473022,
 1.1056443546527661,]

wav DSP

讀取 wav file 的資料,進行 downsampling, upsampling, resampling 後,刻意以原本的 sample rate 數值,寫入新的 wav file,聽起來就像是將原始 wav 進行加速或別的特效處理。


import numpy as np
import wave
from scipy.io import wavfile
import scipy.signal as signal
import struct
import sys

class WavFileParams:
    def __init__(self, filename, num_channels, sampwidth, fs, num_frames, comptype, compname):
        self.filename = filename

        self.num_channels = num_channels                # 通道數
        self.sampwidth = sampwidth                      # 樣本寬度
        self.fs = fs                                    # 取樣頻率(Hz)
        self.num_frames = num_frames                    # 音框數 = 樣本數
        self.comptype = comptype                        # 壓縮型態
        self.compname = compname                        # 壓縮名稱

def write_wav_file(wav_params, digital_signel_points):
    wav_file = wave.open( wav_params.filename, 'w' )
    wav_file.setparams(( wav_params.num_channels, wav_params.sampwidth, wav_params.fs, wav_params.num_frames, wav_params.comptype, wav_params.compname ))

    for s in digital_signel_points :
        # 以 struct.pack 將 int 資料轉換為 16 bits
        wav_file.writeframes( struct.pack( 'h', int ( s ) ) )

    wav_file.close( )

def downsampling( x, method = 1 ):
    N = int( len( x ) / 2 )
    y = np.zeros( N )

    if method == 1:             # Decimation
        for n in range( N ):
            y[n] = x[2*n]
    else:                       # Average
        for n in range( N ):
            y[n] = ( x[2*n] + x[2*n+1] ) / 2

    return y

def upsampling( x, method = 1 ):
    N = len( x ) * 2
    y = np.zeros( N )

    if method == 1:             # Zero-Order Hold
        for n in range( N ):
            y[n] = x[int( n / 2 )]
    else:                       # Linear Interpolation
        for n in range( N ):
            if int( n / 2 ) == n / 2:
                y[n] = x[int( n / 2 )]
            else:
                n1 = int( n / 2 )
                n2 = n1 + 1
                if n2 < len( x ):
                    y[n] = ( x[n1] + x[n2] ) / 2
                else:
                    y[n] = x[n1] / 2

    return y

# 利用 Scipy 的 signal.resample 處理
def resampling( x, sampling_rate ):
    num = int( len(x) * sampling_rate )
    y = signal.resample( x, num )
    return y

def main():
    input_wav_file = "input.wav"
    output_wav_file_prefix = "output"

    wav = wave.open( input_wav_file, 'rb' )
    num_channels = wav.getnchannels( )  # 通道數
    sampwidth    = wav.getsampwidth( )  # 樣本寬度
    fs           = wav.getframerate( )  # 取樣頻率(Hz)
    num_frames   = wav.getnframes( )    # 音框數 = 樣本數
    comptype     = wav.getcomptype( )   # 壓縮型態
    compname     = wav.getcompname( )   # 無壓縮
    wav.close( )

    sampling_rate, x = wavfile.read( input_wav_file )   # 輸入訊號

    # downsampling Decimation
    y = downsampling( x, 1 )
    wav_params = WavFileParams(
        filename = output_wav_file_prefix+"_downsampling_Decimation.wav",       # 檔案名稱

        num_channels = num_channels,    # 通道數
        sampwidth = sampwidth,          # 樣本寬度
        fs = fs,                        # 取樣頻率(Hz)
        num_frames = num_frames,        # 音框數 = 樣本數
        comptype = comptype,            # 壓縮型態
        compname = compname             # 無壓縮
    )
    write_wav_file(wav_params, y);

    # downsampling Average
    y = downsampling( x, 2 )
    wav_params = WavFileParams(
        filename = output_wav_file_prefix+"_downsampling_Average.wav",      # 檔案名稱

        num_channels = num_channels,    # 通道數
        sampwidth = sampwidth,          # 樣本寬度
        fs = fs,                        # 取樣頻率(Hz)
        num_frames = num_frames,        # 音框數 = 樣本數
        comptype = comptype,            # 壓縮型態
        compname = compname             # 無壓縮
    )
    write_wav_file(wav_params, y);

    # upsampling Zero-Order Hold
    y = upsampling( x, 1 )
    wav_params = WavFileParams(
        filename = output_wav_file_prefix+"_upsampling_Zero-OrderHold.wav",     # 檔案名稱

        num_channels = num_channels,    # 通道數
        sampwidth = sampwidth,          # 樣本寬度
        fs = fs,                        # 取樣頻率(Hz)
        num_frames = num_frames,        # 音框數 = 樣本數
        comptype = comptype,            # 壓縮型態
        compname = compname             # 無壓縮
    )
    write_wav_file(wav_params, y);

    # upsampling Linear Interpolation
    y = upsampling( x, 2 )
    wav_params = WavFileParams(
        filename = output_wav_file_prefix+"_upsampling_LinearInterpolation.wav",        # 檔案名稱

        num_channels = num_channels,    # 通道數
        sampwidth = sampwidth,          # 樣本寬度
        fs = fs,                        # 取樣頻率(Hz)
        num_frames = num_frames,        # 音框數 = 樣本數
        comptype = comptype,            # 壓縮型態
        compname = compname             # 無壓縮
    )
    write_wav_file(wav_params, y);

    # resampling
    sampling_rate = 1.5
    y = resampling( x, sampling_rate )
    wav_params = WavFileParams(
        filename = output_wav_file_prefix+"_resampling.wav",        # 檔案名稱

        num_channels = num_channels,    # 通道數
        sampwidth = sampwidth,          # 樣本寬度
        fs = fs,                        # 取樣頻率(Hz)
        num_frames = num_frames,        # 音框數 = 樣本數
        comptype = comptype,            # 壓縮型態
        compname = compname             # 無壓縮
    )
    write_wav_file(wav_params, y);

if __name__ == "__main__":
    sys.exit(main())

振幅調變 AM

振幅調變 Amplitude Modulation (AM),或簡稱調幅,定義為

\(y(t)=x(t)*cos(2*pi*f_c*t)\) 其中 \(f_c\) 為載波頻率 carrier frequency,x(t) 為輸入訊號, y(t) 為輸出訊號

import numpy as np
import wave
from scipy.io import wavfile
import struct
import sys

def AM( x, f, fs ):
    t = np.zeros( len( x ) )
    for i in range( len( x ) ):
        t[i] = i / fs
    carrier = np.cos( 2 * np.pi * f * t )
    return x * carrier

def main( ):
    # infile  = input( "Input File: " )
    # outfile = input( "Output File: " )
    infile = "input.wav"
    outfile = "output.wav"

    wav = wave.open( infile, 'rb' )
    num_channels = wav.getnchannels( )  # 通道數
    sampwidth    = wav.getsampwidth( )  # 樣本寬度
    fs           = wav.getframerate( )  # 取樣頻率(Hz)
    num_frames   = wav.getnframes( )    # 音框數 = 樣本數
    comptype     = wav.getcomptype( )   # 壓縮型態
    compname     = wav.getcompname( )   # 無壓縮
    wav.close( )

    sampling_rate, x = wavfile.read( infile )   # 輸入訊號

    # AM
    # fc = eval( input( "Enter carrier frequency (Hz): " ) )
    fc = 1000
    y = AM( x, fc, fs )

    # output
    wav_file = wave.open( outfile, 'w' )
    wav_file.setparams(( num_channels, sampwidth, fs, num_frames, comptype, compname ))

    for s in y:
        wav_file.writeframes( struct.pack( 'h', int ( s ) ) )

    wav_file.close( )

if __name__ == "__main__":
    sys.exit(main())

References

數位訊號處理:Python程式實作(附範例光碟)(第二版)

沒有留言:

張貼留言