Animated Realtime Spectrograph with Scrolling Waterfall Display in Python

Warning: This post is several years old and the author has marked it as poor quality (compared to more recent posts). It has been left intact for historical reasons, but but its content (and code) may be inaccurate or poorly written.

Update: More modern Python data visualization code and examples can be found on GitHub: https://github.com/swharden/Python-GUI-examples

spectrogram scrollbars

My project is coming along nicely. This isn’t an incredibly robust spectrograph program, but it sure gets the job done quickly and easily. The code below will produce a real time scrolling spectrograph entirely with Python! It polls the microphone (or default recording device), should work on any OS, and can be adjusted for vertical resolution / FFT frequency discretion resolution. It has some simple functions for filtering (check out the de-trend filter!) and might serve as a good start to a spectrograph / frequency analysis project. It took my a long time to reach this point! I’ve worked with Python before, and dabbled with the Python Imaging Library (PIL), but this is my first experience with real time linear data analysis and high-demand multi-threading. I hope it helps you. Below are screenshots of the program (two running at the same time) listening to the same radio signals (mostly Morse code) with standard output and with the “de-trending filter” activated.

nofilter
filter
import pyaudio
import scipy
import struct
import scipy.fftpack

from Tkinter import *
import threading
import time, datetime
import wckgraph
import math

import Image, ImageTk
from PIL import ImageOps
from PIL import ImageChops
import time
import random
import threading
import scipy


#ADJUST RESOLUTION OF VERTICAL FFT
bufferSize=2**11
#bufferSize=2**8

#ADJUSTS AVERAGING SPEED NOT VERTICAL RESOLUTION
#REDUCE HERE IF YOUR PC CANT KEEP UP
sampleRate=24000
#sampleRate=64000

p = pyaudio.PyAudio()
chunks=[]
ffts=[]
def stream():
        global chunks, inStream, bufferSize
        while True:
                chunks.append(inStream.read(bufferSize))

def record():
        global w, inStream, p, bufferSize
        inStream = p.open(format=pyaudio.paInt16,channels=1,
                rate=sampleRate,input=True,frames_per_buffer=bufferSize)
        threading.Thread(target=stream).start()
        #stream()

def downSample(fftx,ffty,degree=10):
        x,y=[],[]
        for i in range(len(ffty)/degree-1):
                x.append(fftx[i*degree+degree/2])
                y.append(sum(ffty[i*degree:(i+1)*degree])/degree)
        return [x,y]

def smoothWindow(fftx,ffty,degree=10):
        lx,ly=fftx[degree:-degree],[]
        for i in range(degree,len(ffty)-degree):
                ly.append(sum(ffty[i-degree:i+degree]))
        return [lx,ly]

def smoothMemory(ffty,degree=3):
        global ffts
        ffts = ffts+[ffty]
        if len(ffts)< =degree: return ffty ffts=ffts[1:] return scipy.average(scipy.array(ffts),0) def detrend(fftx,ffty,degree=10): lx,ly=fftx[degree:-degree],[] for i in range(degree,len(ffty)-degree): ly.append((ffty[i]-sum(ffty[i-degree:i+degree])/(degree*2)) *2+128) #ly.append(fft[i]-(ffty[i-degree]+ffty[i+degree])/2) return [lx,ly] def graph(): global chunks, bufferSize, fftx,ffty, w if len(chunks)>0:
                data = chunks.pop(0)
                data=scipy.array(struct.unpack("%dB"%(bufferSize*2),data))
                ffty=scipy.fftpack.fft(data)
                fftx=scipy.fftpack.rfftfreq(bufferSize*2, 1.0/sampleRate)
                fftx=fftx[0:len(fftx)/4]
                ffty=abs(ffty[0:len(ffty)/2])/1000
                ffty1=ffty[:len(ffty)/2]
                ffty2=ffty[len(ffty)/2::]+2
                ffty2=ffty2[::-1]
                ffty=ffty1+ffty2
                ffty=(scipy.log(ffty)-1)*120
                fftx,ffty=downSample(fftx,ffty,2)
                updatePic(fftx,ffty)
                reloadPic()

        if len(chunks)>20:
                print "falling behind...",len(chunks)

def go(x=None):
        global w,fftx,ffty
        print "STARTING!"
        threading.Thread(target=record).start()
        while True:
                #record()
                graph()


def updatePic(datax,data):
     global im, iwidth, iheight
     strip=Image.new("L",(1,iheight))
     if len(data)>iheight:
             data=data[:iheight-1]
     #print "MAX FREQ:",datax[-1]
     strip.putdata(data)
     #print "%03d, %03d" % (max(data[-100:]), min(data[-100:]))
     im.paste(strip,(iwidth-1,0))
     im=im.offset(-1,0)
     root.update()

def reloadPic():
     global im, lab
     lab.image = ImageTk.PhotoImage(im)
     lab.config(image=lab.image)


root = Tk()
im=Image.open('./ramp.tif')
im=im.convert("L")
iwidth,iheight=im.size
im=im.crop((0,0,500,480))
#im=Image.new("L",(100,1024))
iwidth,iheight=im.size
root.geometry('%dx%d' % (iwidth,iheight))
lab=Label(root)
lab.place(x=0,y=0,width=iwidth,height=iheight)
go()

UPDATE! I’m not going to post the code for this yet (it’s very messy) but I got this thing to display a spectrograph on a canvas. What’s the advantage of that? Huge, massive spectrographs (thousands of pixels in all directions) can now be browsed in real time using scrollbars, and when you scroll it doesn’t stop recording, and you don’t lose any data! Super cool.

spectrogram scrollbars