2021/3/8

頻率域 DSP

頻率域的數位訊號處理 Digital Signal Processing in Frequency Domain

到目前為止,DSP 是以離散時間域的數學運算為主軸,例如:卷積運算、移動平均濾波器、FIR 濾波器、IIR 濾波器。本章是以頻率域的數位訊號處理為主軸

頻率域 DSP 的系統方塊圖

處理步驟如下:

  1. 輸入的數位訊號 x[n],取離散時間傅立葉轉換 DTFT,結果為

    \(X(e^{jω}) = F\{x[n]\}\)

    在DSP 實務中,則是採用離散傅立葉轉換 (Discrete Fourier Transform, DFT),即

    \(X[k] = \sum_{n=0}^{N-1}x[n]e^{-j2 \pi kn/N}, k=0,1,...,N-1\)

    為了縮短處理時間,通常採用快速傅立葉轉換 Fast Fourier Transform, FFT 的演算法進行運算。注意運算結果為複數陣列

  2. 根據系統輸入/輸出的目的或規格,進行濾波器的設計,藉以產生頻率響應,即

    \(H(e^{jω}) = F\{h[n]\}\)

    在 DSP 實務中,根據濾波器的種類,設計濾波器的頻率響應 Frequency Response,通常頻率響應在特定頻率範圍的響應值(或強度值),介於 0~1 之間

  3. 套用濾波器,通常使用卷積定理,藉以產生輸出訊號的頻率域表示法

    \(Y(e^{jω}) = H(e^{jω})X(e^{jω})\)

    在 DSP 實務中,運用陣列的點對點乘法 (Pointwise Multiplication) 運算,達到濾波的效果,即

    \(Y[k] = H[k] \cdot X[k]\)

    結果通常是複數陣列

  4. 最後,取反離散時間傅立葉轉換(Inverse DTFT),即可得到輸出訊號

    \(y[n] = F^{-1}\{Y(e^{jω})\}\)

    在 DSP 實務中,採用反離散傅立葉轉換 Inverse DFT,即

    \(y[n] = \frac{1}{N} \sum_{k=0}^{N-1} Y[k]e^{j2 \pi kn/N}, n=0,1,2,...,N-1\)

    在此,也是使用反快速傅立葉轉換 Inverse FFT,縮短處理時間。由於處理後的結果為複數,通常是取實數部分(忽略虛數),作為輸出訊號,必要時,進一步將數值作正規化處理,藉以控制輸出訊號的數值範圍

理想濾波器

理想濾波器在離散時間域的脈衝響應是有限數列,因此無法在離散時間域實現理想濾波器,然而透過頻率域 DSP,可實現理想濾波器。

ex: 輸入訊號諧波定義如下

\(x(t) = cos(2 \pi \cdot 10 \cdot t)+ cos(2 \pi \cdot 20 \cdot t) + cos(2 \pi \cdot 30 \cdot t)\)

取樣頻率 \(f_s=500Hz\),試套用以下理想濾波器

(1) 理想低通濾波器,截止頻率 \(f_c=15Hz\)

(2) 理想高通濾波器,截止頻率 \(f_c=15Hz\)

(3) 理想帶通濾波器,截止頻率 \(f_1=15Hz, f_2=25Hz\)

(4) 理想帶阻濾波器,截止頻率 \(f_1=15Hz, f_2=25Hz\)

import numpy as np
from numpy.fft import fft, fftshift, ifft, fftfreq
import matplotlib.pyplot as plt

def ideal_lowpass_filtering( x, cutoff, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( -cutoff, cutoff + 1 ):
        H[i] = 1
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_highpass_filtering( x, cutoff, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( -cutoff, cutoff + 1 ):
        H[i] = 1
    H = 1 - H
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_bandpass_filtering( x, f1, f2, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( f1, f2 + 1 ):
        H[i] = 1
    for i in range( -f1, -f2 - 1, -1 ):
        H[i] = 1
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_bandstop_filtering( x, f1, f2, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( f1, f2 + 1 ):
        H[i] = 1
    for i in range( -f1, -f2 - 1, -1 ):
        H[i] = 1
    H = 1 - H
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_allpass_filtering( x ):
    X = fft( x )
    Y = X
    y = ifft( Y )
    y = y.real
    return y

def main( ):
    # print( "DSP in Frequency Domain" )
    # print( "(1) Ideal Lowpass Filtering" )
    # print( "(2) Ideal Highpass Filtering" )
    # print( "(3) Ideal Bandpass Filtering" )
    # print( "(4) Ideal Bandstop Filtering" )
    # print( "(5) Ideal Allpass Filtering" )

    # choice = eval( input( "Please enter your choice: " ) )

    # if choice == 1 or choice == 2:
    #   fc = eval( input( "Please enter cutoff frequency(Hz): " ) )

    # if choice == 3 or choice == 4:
    #   f1 = eval( input( "Please enter frequency f1(Hz): " ) )
    #   f2 = eval( input( "Please enter frequency f2(Hz): " ) )

    fc = 15

    f1 = 15
    f2 = 25

    # 取樣頻率
    fs = 500
    t = np.linspace( 0, 1, fs, endpoint = False )
    # 原始訊號 (諧波)
    x = np.cos( 2 * np.pi * 10 * t ) + np.cos( 2 * np.pi * 20 * t ) + np.cos( 2 * np.pi * 30 * t )

    # fftfreq: 離散傅立葉轉換的採樣頻率
    # fftshift: 將 0 移到正中心
    f = fftshift( fftfreq( fs, 1 / fs ) )
    Xm = abs( fftshift( fft( x ) ) )

    # 替換中文字型,處理 matplotlib 中文 label 問題
    # https://blog.csdn.net/gmr2453929471/article/details/78655834
    plt.rcParams['font.sans-serif'] = ['Heiti TC'] # 步驟一(替換sans-serif字型)
    plt.rcParams['axes.unicode_minus'] = False  # 步驟二(解決座標軸負數的負號顯示問題)

    # 輸入訊號圖形
    plt.figure( 1 )
    plt.subplot(121)
    plt.plot( x )
    plt.xlabel( 't (second) (輸入訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(122)
    plt.plot( f, Xm )
    plt.xlabel( 'f (輸入訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    # if choice == 1:
    #   y = ideal_lowpass_filtering( x, fc, fs )
    # elif choice == 2:
    #   y = ideal_highpass_filtering( x, fc, fs )
    # elif choice == 3:
    #   y = ideal_bandpass_filtering( x, f1, f2, fs )
    # elif choice == 4:
    #   y = ideal_bandstop_filtering( x, f1, f2, fs )
    # else:
    #   y = ideal_allpass_filtering( x )

    # ideal_lowpass_filtering
    y = ideal_lowpass_filtering( x, fc, fs )
    Ym = abs( fftshift( fft( y ) ) )

    plt.figure( 2 )
    plt.subplot(221)
    plt.plot( y )
    plt.xlabel( 't (second) (ideal_lowpass 輸出訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(222)
    plt.plot( f, Ym )
    plt.xlabel( 'f  (ideal_lowpass 輸出訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    # ideal_highpass_filtering
    y = ideal_highpass_filtering( x, fc, fs )
    Ym = abs( fftshift( fft( y ) ) )

    plt.figure( 2 )
    plt.subplot(223)
    plt.plot( y )
    plt.xlabel( 't (second) (ideal_highpass 輸出訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(224)
    plt.plot( f, Ym )
    plt.xlabel( 'f  (ideal_highpass 輸出訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    # ideal_bandpass_filtering
    y = ideal_bandpass_filtering( x, f1, f2, fs )
    Ym = abs( fftshift( fft( y ) ) )
    plt.figure( 3 )
    plt.subplot(221)
    plt.plot( y )
    plt.xlabel( 't (second) (ideal_bandpass 輸出訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(222)
    plt.plot( f, Ym )
    plt.xlabel( 'f  (ideal_bandpass 輸出訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    # ideal_bandstop_filtering
    y = ideal_bandstop_filtering( x, f1, f2, fs )
    Ym = abs( fftshift( fft( y ) ) )
    plt.subplot(223)
    plt.plot( y )
    plt.xlabel( 't (second) (ideal_bandstop 輸出訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(224)
    plt.plot( f, Ym )
    plt.xlabel( 'f  (ideal_bandstop 輸出訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    # ideal_allpass_filtering
    y = ideal_allpass_filtering( x )
    Ym = abs( fftshift( fft( y ) ) )
    plt.figure( 4 )
    plt.subplot(221)
    plt.plot( y )
    plt.xlabel( 't (second) (ideal_allpass 輸出訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(222)
    plt.plot( f, Ym )
    plt.xlabel( 'f  (ideal_allpass 輸出訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    plt.show( )

main( )

輸入訊號有 10, 20, 30 Hz 三種頻率

ideal lowpass filter:抑制超過 15Hz 的頻率,輸出只剩下 10Hz 的頻率分量

ideal highpass filter:抑制 15Hz 以下的頻率,輸出 20, 30 Hz 的頻率分量

ideal bandpass filter:允許 15~25 Hz 的頻率分量,剩下 20 Hz

ideal bandstop filter:阻絕 15~25 Hz 的頻率分量,剩下 10, 30 Hz

allpass filter:全部都通過

頻譜平移

Spectrum Shifting 就是將輸入訊號在頻率域中平移,產生輸出訊號,也稱為頻率平移 (frequency shifting)

如果是聲音訊號經過頻譜平移,可改變聲音的頻率,因此,常被稱為音頻改變,或音高改變(Pitch Change)

回顧傅立葉轉換的第二平移定理(或頻率平移定理)

\(F\{f(t) \cdot e^{jω_0t}\} = F(ω-ω_0)\),其中 \(ω_0 > 0 \) 是平移的角頻率。 因此函數 f 的傅立葉轉換,在頻率域的平移,結果是原函數另外乘上一個複數指標函數。

本章採用的方法是在頻率域中進行平移


ex: 輸入弦波訊號的定義:\(x(t)=cos(2 \pi \cdot 10 \cdot t)\) ,且取樣頻率 \(f_s = 500Hz\),套用頻譜平移 (Spectrum Shifting) 技術,平移 20Hz 的頻率

import numpy as np
from numpy.fft import fft, fftshift, ifft, fftfreq
import matplotlib.pyplot as plt

def spectrum_shifting( x, shift, fs ):
    X = fft( x )
    N = fs
    N_half = int( fs / 2 )
    Y = np.zeros( N, dtype = 'complex' )
    for i in range( N_half ):
        if i + shift >= 0 and i + shift <= N_half:
            Y[i + shift] = X[i]
    for i in range( N_half + 1, fs ):
        if i - shift >= N_half + 1 and i - shift < N:
            Y[i - shift] = X[i]
    y = ifft( Y )
    y = y.real
    return y

def main( ):
    fs = 500
    # 輸入弦波訊號
    t = np.linspace( 0, 1, fs, endpoint = False )
    x = np.cos( 2 * np.pi * 50 * t )

    # 平移
    y = spectrum_shifting( x, -30, fs )

    f = fftshift( fftfreq( fs, 1 / fs ) )
    Xm = abs( fftshift( fft( x ) ) )
    Ym = abs( fftshift( fft( y ) ) )

    # 替換中文字型,處理 matplotlib 中文 label 問題
    # https://blog.csdn.net/gmr2453929471/article/details/78655834
    plt.rcParams['font.sans-serif'] = ['Heiti TC'] # 步驟一(替換sans-serif字型)
    plt.rcParams['axes.unicode_minus'] = False  # 步驟二(解決座標軸負數的負號顯示問題)

    plt.subplot(221)
    plt.plot( x )
    plt.xlabel( 't (second) (輸入訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(222)
    plt.plot( f, Xm )
    plt.xlabel( 'f (輸入訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    plt.subplot(223)
    plt.plot( y )
    plt.xlabel( 't (second) (平移後輸出訊號)' )
    plt.ylabel( 'Amplitude' )

    plt.subplot(224)
    plt.plot( f, Ym )
    plt.xlabel( 'f (平移後輸出訊號頻譜)' )
    plt.ylabel( 'Magnitude' )

    plt.show( )

main( )

輸入訊號的頻率分量為 10Hz,平移 20 Hz 後,輸出為 30 Hz

語音的頻率域 DSP

上面的範例輸入訊號時間長度 1s,取樣率 500Hz,FFT 運算還可在有限時間內處理完畢。

但 DSP 實務上,輸入訊號的樣本數很多,例如一般來說歌曲的取樣率 44.1kHz,時間長度 3mins,單通道語音的樣本數為 \(44100*60*3=7938000\)

如果對整個輸入訊號進行 FFT,運算量過大。

因此數位語音 DSP 實務中,通常是將原始的輸入訊號,切割為多個獨立的 segments,每個 segments 包含 N 個樣本數,對每個 segment 分別進行頻率域 DSP,再重組成輸出訊號的結果。

wav ideal filtering

import numpy as np
import wave
from scipy.io.wavfile import read, write
import struct
from numpy.fft import fft, fftshift, ifft

def ideal_lowpass_filtering( x, cutoff, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( -cutoff, cutoff + 1 ):
        H[i] = 1
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_highpass_filtering( x, cutoff, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( -cutoff, cutoff + 1 ):
        H[i] = 1
    H = 1 - H
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_bandpass_filtering( x, f1, f2, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( f1, f2 + 1 ):
        H[i] = 1
    for i in range( -f1, -f2 - 1, -1 ):
        H[i] = 1
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_bandstop_filtering( x, f1, f2, fs ):
    X = fft( x )
    H = np.zeros( fs )
    for i in range( f1, f2 + 1 ):
        H[i] = 1
    for i in range( -f1, -f2 - 1, -1 ):
        H[i] = 1
    H = 1 - H
    Y = H * X
    y = ifft( Y )
    y = y.real
    return y

def ideal_allpass_filtering( x ):
    X = fft( x )
    Y = X
    y = ifft( Y )
    y = y.real
    return y

def dsp(choise, x, fs):
    y = np.zeros( x.size )
    n = int( x.size / fs ) + 1
    N = fs
    for iter in range( n ):
        xx = np.zeros( N )
        yy = np.zeros( N )
        for i in range( iter * N, ( iter + 1 ) * N ):
            if i < x.size:
                xx[i - iter * N] = x[i]

        # yy = ideal_lowpass_filtering( xx, 2000, fs )
        if choise == 0:
            yy = ideal_lowpass_filtering( xx, 2000, fs )
        elif choise == 1:
            yy = ideal_highpass_filtering( xx, 2000, fs )
        elif choise == 2:
            yy = ideal_bandpass_filtering(xx, 2000, 3000, fs)
        elif choise == 3:
            yy = ideal_bandstop_filtering(xx, 2000, 3000, fs)
        else:
            yy = ideal_allpass_filtering(xx)

        for i in range( iter * N, ( iter + 1 ) * N ):
            if i < x.size:
                y[i] = yy[i - iter * N]

    return y

def write_wav_file(y, filename, num_channels, sampwidth, fs, num_frames, comptype, compname):
    wav_file = wave.open( filename, 'w' )
    wav_file.setparams(( num_channels, sampwidth, fs, num_frames, comptype, compname ))

    for s in y:
        wav_file.writeframes( struct.pack( 'h', int ( s ) ) )

    wav_file.close( )

def main( ):
    # infile  = input( "Input File: " )
    # outfile = input( "Output File: " )
    infile = "r2d2.wav"

    # ----------------------------------------------------
    #  輸入模組
    # ----------------------------------------------------
    wav = wave.open( infile, 'rb' )
    num_channels = wav.getnchannels( )  # 通道數
    sampwidth    = wav.getsampwidth( )  # 樣本寬度
    fs           = wav.getframerate( )  # 取樣頻率(Hz)
    num_frames   = wav.getnframes( )    # 音框數 = 樣本數
    comptype     = wav.getcomptype( )   # 壓縮型態
    compname     = wav.getcompname( )   # 無壓縮
    wav.close( )

    sampling_rate, x = read( infile )   # 輸入訊號

    # ----------------------------------------------------
    #  DSP 模組
    # ----------------------------------------------------
    # y = np.zeros( x.size )
    # n = int( x.size / fs ) + 1
    # N = fs
    # for iter in range( n ):
    #   xx = np.zeros( N )
    #   yy = np.zeros( N )
    #   for i in range( iter * N, ( iter + 1 ) * N ):
    #       if i < x.size:
    #           xx[i - iter * N] = x[i]

    #   yy = ideal_lowpass_filtering( xx, 2000, fs )

    #   for i in range( iter * N, ( iter + 1 ) * N ):
    #       if i < x.size:
    #           y[i] = yy[i - iter * N]

    # ----------------------------------------------------
    #  輸出模組
    # ----------------------------------------------------
    # wav_file = wave.open( outfile, 'w' )
    # wav_file.setparams(( num_channels, sampwidth, fs, num_frames, comptype, compname ))

    # for s in y:
    #   wav_file.writeframes( struct.pack( 'h', int ( s ) ) )

    # wav_file.close( )
    y = dsp(0, x, fs)
    write_wav_file(y, "01_ideal_lowpass_filtering.wav", num_channels, sampwidth, fs, num_frames, comptype, compname)

    y = dsp(1, x, fs)
    write_wav_file(y, "02_ideal_highpass_filtering.wav", num_channels, sampwidth, fs, num_frames, comptype, compname)

    y = dsp(2, x, fs)
    write_wav_file(y, "03_ideal_bandpass_filtering.wav", num_channels, sampwidth, fs, num_frames, comptype, compname)

    y = dsp(3, x, fs)
    write_wav_file(y, "04_ideal_bandstop_filtering.wav", num_channels, sampwidth, fs, num_frames, comptype, compname)

    y = dsp(4, x, fs)
    write_wav_file(y, "05_ideal_allpass_filtering.wav", num_channels, sampwidth, fs, num_frames, comptype, compname)

main( )

頻譜平移

原始的輸入訊號,必須先切割成多個獨立的 segments,每個片段包含 N 個樣本,再分別對每個片段進行頻譜平移

import numpy as np
import wave
from scipy.io.wavfile import read, write
import struct
from numpy.fft import fft, fftshift, ifft

def spectrum_shifting( x, shift, fs ):
    X = fft( x )
    N = fs
    N_half = int( fs / 2 )
    Y = np.zeros( N, dtype = 'complex' )
    for i in range( N_half ):
        if i + shift >= 0 and i + shift <= N_half:
            Y[i + shift] = X[i]
    for i in range( N_half + 1, fs ):
        if i - shift >= N_half + 1 and i - shift < N:
            Y[i - shift] = X[i]
    y = ifft( Y )
    y = y.real
    return y

def main( ):
    # infile  = input( "Input File: " )
    # outfile = input( "Output File: " )
    infile  = "r2d2.wav"
    outfile = "r2d2_spectrum_shifting.wav"

    # ----------------------------------------------------
    #  輸入模組
    # ----------------------------------------------------
    wav = wave.open( infile, 'rb' )
    num_channels = wav.getnchannels( )  # 通道數
    sampwidth    = wav.getsampwidth( )  # 樣本寬度
    fs           = wav.getframerate( )  # 取樣頻率(Hz)
    num_frames   = wav.getnframes( )    # 音框數 = 樣本數
    comptype     = wav.getcomptype( )   # 壓縮型態
    compname     = wav.getcompname( )   # 無壓縮
    wav.close( )

    sampling_rate, x = read( infile )   # 輸入訊號

    # ----------------------------------------------------
    #  DSP 模組
    # ----------------------------------------------------
    y = np.zeros( x.size )
    n = int( x.size / fs ) + 1
    N = fs
    for iter in range( n ):
        xx = np.zeros( N )
        yy = np.zeros( N )
        for i in range( iter * N, ( iter + 1 ) * N ):
            if i < x.size:
                xx[i - iter * N] = x[i]

        yy = spectrum_shifting( xx, 500, fs )

        for i in range( iter * N, ( iter + 1 ) * N ):
            if i < x.size:
                y[i] = yy[i - iter * N]

    # ----------------------------------------------------
    #  輸出模組
    # ----------------------------------------------------
    wav_file = wave.open( outfile, 'w' )
    wav_file.setparams(( num_channels, sampwidth, fs, num_frames, comptype, compname ))

    for s in y:
        wav_file.writeframes( struct.pack( 'h', int ( s ) ) )

    wav_file.close( )

main( )

References

數位訊號處理:Python程式實作(附範例光碟)(第二版)

沒有留言:

張貼留言