FM音源方式
FM音源のレシピのコツ(自分で作る場合)
このシンセサイザーにおける各パラメータの音への影響は以下のとおりです。
- Mod Ratio (倍音構成)
  - 1.0, 2.0, 3.0...(整数): 倍音が整数比で並ぶため、きれいに調和した楽器的な音になります。
  - 1.41, 2.5, 3.14...(非整数): 金属音、鐘、ノイズっぽい音になります。
  - 0.5: 1オクターブ下の音が混ざり、太くなります。
  - 1.5: 「ド」に対して「ソ」(完全5度上)が混ざり、パワーコードのような響きになります。
- Mod Index (音の明るさ・激しさ)
  - 0.0: 純粋なサイン波(「ポー」という時報のような音)。
  - 1.0 ~ 3.0: 心地よいFMトーン(エレピやベース)。
  - 5.0 ~ 10.0: ギラギラした音、「ビヨーン」という音。
  - 10.0以上: ノイズに近い破壊的な音(Gainを下げないと耳が痛くなります)。
- Attack / Release
  - Pad系: Attackを0.5以上にすると、ふわっと立ち上がります。
  - Bass/Bell系: Attackを0.01(最速)にして、Releaseで余韻を調整します。

デスクトップ アナログ+Soundfont版(Python)
# コード名: py_synth_v43_midi_monitor
# バージョン: 43.0 (MIDI Monitor Edition)
# Description: 受信したMIDIデータをリアルタイムでログ表示するモニター機能を追加。音切れ防止・ベロシティ補正済み。
import os
os.environ['OMP_NUM_THREADS'] = '1'
import sys
import time
import threading
import gc
import math
import numpy as np
import rtmidi
import fluidsynth
import tkinter as tk
from tkinter import ttk
from tkinter import filedialog
from contextlib import contextmanager
# --- ライブラリ ---
try:
import sounddevice as sd
except ImportError:
sys.exit("Error: pip install sounddevice")
try:
import mido
except ImportError:
sys.exit("Error: pip install mido")
try:
from scipy.signal import lfilter
except ImportError:
sys.exit("Error: pip install scipy")
# --- Settings ---
# Substring looked for in output-device names when the startup code picks the audio device.
TARGET_DEVICE_KEYWORD = "USB"
SAMPLE_RATE = 48000  # audio sample rate in Hz, also handed to FluidSynth
# Frames per audio callback; ADSREnvelope.get_val() also uses it as its time step.
BLOCK_SIZE = 4096
CHANNELS = 2
DTYPE = 'int16'
MASTER_GAIN_DEFAULT = 0.5  # initial value of SynthParams.master_gain
SF2_PATH = "/usr/share/sounds/sf2/FluidR3_GM.sf2"  # SoundFont loaded by FluidManager.init_synth()
# Global run flag: the MIDI loop, player thread and audio callback stop working once it is False.
is_running = True
# Placeholders shown in the status bar built by create_panel().
AUDIO_DEVICE_NAME = "Searching..."
AUDIO_DEVICE_ID = None
MIDI_PORT_NAME = "Searching..."
# --- GM instrument names ---
# 128 display names, indexed by program number (0-127); used for the instrument label
# and for the program-change line in the MIDI monitor.
GM_INSTRUMENTS = [
"Grand Piano", "Bright Piano", "E.Grand Piano", "Honky-tonk", "E.Piano 1", "E.Piano 2", "Harpsichord", "Clavinet",
"Celesta", "Glockenspiel", "Music Box", "Vibraphone", "Marimba", "Xylophone", "Tubular Bells", "Dulcimer",
"Drawbar Organ", "Percussive Organ", "Rock Organ", "Church Organ", "Reed Organ", "Accordion", "Harmonica", "Tango Accordion",
"Guit(Nylon)", "Guit(Steel)", "Guit(Jazz)", "Guit(Clean)", "Guit(Muted)", "Guit(Overdrive)", "Guit(Distortion)", "Guit(Harmonics)",
"Acoustic Bass", "E.Bass(Finger)", "E.Bass(Pick)", "Fretless Bass", "Slap Bass 1", "Slap Bass 2", "Synth Bass 1", "Synth Bass 2",
"Violin", "Viola", "Cello", "Contrabass", "Tremolo Strings", "Pizzicato Strings", "Orchestral Harp", "Timpani",
"Strings 1", "Strings 2", "SynthStrings 1", "SynthStrings 2", "Choir Aahs", "Voice Oohs", "Synth Voice", "Orchestra Hit",
"Trumpet", "Trombone", "Tuba", "Muted Trumpet", "French Horn", "Brass Section", "Synth Brass 1", "Synth Brass 2",
"Soprano Sax", "Alto Sax", "Tenor Sax", "Baritone Sax", "Oboe", "English Horn", "Bassoon", "Clarinet",
"Piccolo", "Flute", "Recorder", "Pan Flute", "Blown Bottle", "Shakuhachi", "Whistle", "Ocarina",
"Lead 1 (square)", "Lead 2 (saw)", "Lead 3 (calliope)", "Lead 4 (chiff)", "Lead 5 (charang)", "Lead 6 (voice)", "Lead 7 (fifths)", "Lead 8 (bass+lead)",
"Pad 1 (new age)", "Pad 2 (warm)", "Pad 3 (polysynth)", "Pad 4 (choir)", "Pad 5 (bowed)", "Pad 6 (metallic)", "Pad 7 (halo)", "Pad 8 (sweep)",
"FX 1 (rain)", "FX 2 (soundtrack)", "FX 3 (crystal)", "FX 4 (atmosphere)", "FX 5 (brightness)", "FX 6 (goblins)", "FX 7 (echoes)", "FX 8 (sci-fi)",
"Sitar", "Banjo", "Shamisen", "Koto", "Kalimba", "Bag pipe", "Fiddle", "Shanai",
"Tinkle Bell", "Agogo", "Steel Drums", "Woodblock", "Taiko Drum", "Melodic Tom", "Synth Drum", "Reverse Cymbal",
"Guit Fret Noise", "Breath Noise", "Seashore", "Bird Tweet", "Telephone Ring", "Helicopter", "Applause", "Gunshot"
]
@contextmanager
def ignore_stderr():
    """Silence file descriptor 2 (stderr) for the duration of the ``with`` body.

    fd 2 is pointed at ``os.devnull`` and restored afterwards, even when the
    body raises. Setting up the redirection is best-effort: if it fails, the
    body simply runs with stderr untouched.

    Exceptions raised inside the ``with`` body propagate unchanged. The setup
    and the ``yield`` are deliberately kept in separate ``try`` blocks so the
    generator never yields twice.
    """
    saved_fd = None
    try:
        sys.stderr.flush()
        saved_fd = os.dup(2)
        devnull_fd = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(devnull_fd, 2)
        finally:
            # fd 2 now refers to devnull (or the dup2 failed); either way the
            # temporary descriptor is no longer needed.
            os.close(devnull_fd)
    except (OSError, ValueError, AttributeError):
        # Redirection could not be established: release what we grabbed and
        # fall through to run the body without silencing.
        if saved_fd is not None:
            os.close(saved_fd)
            saved_fd = None
    try:
        yield
    finally:
        if saved_fd is not None:
            os.dup2(saved_fd, 2)
            os.close(saved_fd)
class SynthParams:
    """Mutable container for every user-adjustable synth setting.

    All attributes are plain values initialised to their defaults here.
    """

    def __init__(self):
        # Engine selection and output level
        self.engine_mode = "ANALOG"
        self.master_gain = MASTER_GAIN_DEFAULT

        # Oscillator and LFO
        self.waveform = "sawtooth"
        self.lfo_rate = 5.0
        self.lfo_depth = 0.0

        # Master filter
        self.filter_mode = "LPF"
        self.cutoff_base = 1000.0
        self.resonance = 0.5
        self.vcf_env_amt = 4000.0

        # Filter envelope (ADSR)
        self.vcf_attack = 0.1
        self.vcf_decay = 0.3
        self.vcf_sustain = 0.4
        self.vcf_release = 0.5

        # Amplitude envelope (ADSR)
        self.amp_attack = 0.05
        self.amp_decay = 0.2
        self.amp_sustain = 0.6
        self.amp_release = 0.5

        # SoundFont engine
        self.sf_program = 0
        self.sf_vol = 1.0
        self.digital_filter_on = True
# Single shared settings object; the DSP classes, MIDI handlers and GUI callbacks
# in this file read and write its attributes directly.
params = SynthParams()
class FluidManager:
    """Wrapper around one ``fluidsynth.Synth`` used as the SoundFont engine.

    Every playback method is a no-op until :meth:`init_synth` has succeeded,
    so callers never have to check whether FluidSynth is usable.
    """

    def __init__(self):
        self.fs = None      # the fluidsynth.Synth, created by init_synth()
        self.ready = False  # True only once the SoundFont has been loaded

    def init_synth(self):
        """Create the synth, load ``SF2_PATH`` and select bank 0 / preset 0 on channel 0.

        Any failure is printed and leaves ``ready`` False; nothing is raised.
        """
        try:
            self.fs = fluidsynth.Synth()
            self.fs.setting('synth.sample-rate', float(SAMPLE_RATE))
            self.fs.setting('synth.gain', 1.0)
            self.fs.setting('synth.polyphony', 32)
            self.fs.setting('synth.reverb.active', 0)
            self.fs.setting('synth.chorus.active', 0)
            sfid = self.fs.sfload(SF2_PATH)
            if sfid == -1:
                # sfload reports failure with -1 instead of raising; without
                # this check a missing SoundFont would still mark us ready.
                raise RuntimeError(f"could not load SoundFont: {SF2_PATH}")
            self.fs.program_select(0, sfid, 0, 0)
            self.ready = True
            print("FluidSynth Ready.")
        except Exception as e:
            print(f"FluidSynth Error: {e}")
            self.ready = False

    def get_samples(self, n):
        """Return the next *n* frames as an int16 array.

        When the engine is not ready this is ``n * 2`` zeros (silence, two
        values per frame); otherwise it is whatever ``fs.get_samples(n)``
        renders, converted to int16.
        """
        if not self.ready:
            return np.zeros(n * 2, dtype=np.int16)
        return np.array(self.fs.get_samples(n), dtype=np.int16)

    def note_on(self, n, v):
        """Start note *n* with velocity *v* on channel 0 (ignored if not ready)."""
        if self.ready:
            self.fs.noteon(0, n, v)

    def note_off(self, n):
        """Stop note *n* on channel 0 (ignored if not ready)."""
        if self.ready:
            self.fs.noteoff(0, n)

    def change_program(self, p):
        """Switch channel 0 to program *p* (ignored if not ready)."""
        if self.ready:
            self.fs.program_change(0, int(p))
# Shared SoundFont engine instance; init_synth() is called on it by the startup code.
fluid_engine = FluidManager()
def design_biquad(mode, fc, q, fs):
    """Compute normalised biquad coefficients for a second-order filter.

    Args:
        mode: "LPF", "HPF" or "BPF"; any other value yields a pass-through.
        fc: centre/cutoff frequency, in the same unit as *fs*.
        q: quality factor (must be non-zero).
        fs: sample rate.

    Returns:
        ``(b, a)``: numerator and denominator coefficients, each divided by
        ``1 + alpha`` so that ``a[0] == 1``. For an unknown *mode* the plain
        lists ``[1, 0, 0], [1, 0, 0]`` are returned.
    """
    omega = 2*math.pi*fc/fs
    alpha = math.sin(omega)/(2*q)
    cos_w = math.cos(omega)
    gain = 1+alpha

    if mode == "LPF":
        numerator = [(1-cos_w)/2, 1-cos_w, (1-cos_w)/2]
    elif mode == "HPF":
        numerator = [(1+cos_w)/2, -(1+cos_w), (1+cos_w)/2]
    elif mode == "BPF":
        numerator = [alpha, 0, -alpha]
    else:
        return [1, 0, 0], [1, 0, 0]

    denominator = [1+alpha, -2*cos_w, 1-alpha]
    return np.array(numerator)/gain, np.array(denominator)/gain
class ADSREnvelope:
    """Linear ADSR envelope that advances by one audio block per ``get_val()`` call.

    The timing values are read from the shared ``params`` object on every
    call: the ``amp_*`` set when ``is_amp`` is true, otherwise the ``vcf_*`` set.
    """

    def __init__(self, is_amp=True):
        self.state = 'IDLE'
        self.level = 0.0
        self.is_amp = is_amp

    def trigger(self):
        """Enter the attack stage (the current level is kept as the starting point)."""
        self.state = 'ATTACK'

    def release(self):
        """Enter the release stage."""
        self.state = 'RELEASE'

    def get_val(self):
        """Advance the envelope by ``BLOCK_SIZE`` samples and return the new level."""
        prefix = "amp" if self.is_amp else "vcf"
        attack = getattr(params, prefix + "_attack")
        decay = getattr(params, prefix + "_decay")
        sustain = getattr(params, prefix + "_sustain")
        rel = getattr(params, prefix + "_release")

        # Duration of one audio block in seconds.
        block_seconds = float(BLOCK_SIZE)/SAMPLE_RATE
        stage = self.state

        if stage == 'ATTACK':
            self.level += block_seconds/max(attack, 0.001)
            if self.level >= 1.0:
                self.level = 1.0
                self.state = 'DECAY'
        elif stage == 'DECAY':
            # Falls from 1.0 towards the sustain level over `decay` seconds.
            self.level -= (block_seconds/max(decay, 0.001))*(1.0-sustain)
            if self.level <= sustain:
                self.level = sustain
                self.state = 'SUSTAIN'
        elif stage == 'SUSTAIN':
            self.level = sustain
        elif stage == 'RELEASE':
            self.level -= block_seconds/max(rel, 0.001)
            if self.level <= 0.0:
                self.level = 0.0
                self.state = 'IDLE'
        return self.level
class StereoMasterFilter:
    """Stateful biquad applied to the interleaved stereo int16 mix."""

    def __init__(self):
        # lfilter state: 2 delay elements x 2 channels, carried between blocks.
        self.zi = np.zeros((2, 2))

    def process(self, sig_stereo_int16, cut, res):
        """Filter one interleaved stereo block and return it as int16.

        Args:
            sig_stereo_int16: flat int16 array, reshaped here to ``(-1, 2)``.
            cut: requested cutoff; limited to 50 .. 0.45 * SAMPLE_RATE.
            res: resonance amount, mapped to ``q = 0.7 + res * 9``.
        """
        normalised = sig_stereo_int16.astype(np.float32) / 32768.0

        fc = max(50.0, min(cut, SAMPLE_RATE * 0.45))
        q = 0.7 + (res * 9.0)
        b, a = design_biquad(params.filter_mode, fc, q, SAMPLE_RATE)

        per_channel = normalised.reshape(-1, 2)
        filtered, self.zi = lfilter(b, a, per_channel, axis=0, zi=self.zi)

        rescaled = filtered.flatten() * 32768.0
        return np.clip(rescaled, -32768, 32767).astype(np.int16)
class Oscillator:
    """Phase-accumulating oscillator producing one block of the selected waveform."""

    def __init__(self):
        self.p = 0.0  # phase carried over between blocks, in cycles (0..1)
        self.f = 0.0  # frequency; 0 means silent

    def set_f(self, f):
        """Set the oscillator frequency."""
        self.f = f

    def get(self, n, lfo):
        """Return *n* samples in the range -1..1.

        Args:
            n: number of samples to generate.
            lfo: modulation value for this block; it shifts the frequency by
                ``lfo * params.lfo_depth * 10``.
        """
        if self.f == 0:
            return np.zeros(n)

        freq = self.f + (lfo*params.lfo_depth*10.0)
        phase = self.p + np.arange(1, n + 1, dtype=float)*(freq/SAMPLE_RATE)
        self.p = phase[-1] % 1.0
        phase %= 1.0

        shape = params.waveform
        if shape == 'sawtooth':
            return 2.0*(phase-0.5)
        if shape == 'square':
            return np.sign(np.sin(2*np.pi*phase))
        if shape == 'sine':
            return np.sin(2*np.pi*phase)
        # Any other name falls back to a triangle wave.
        return 2.0*np.abs(2.0*(phase-0.5))-1.0
class LFO:
    """Sine low-frequency oscillator running at ``params.lfo_rate`` Hz.

    ``p`` holds the phase in cycles (0..1) at the start of the next block.
    Accumulating phase, rather than counting samples and wrapping at
    ``SAMPLE_RATE``, keeps the output continuous for non-integer rates and
    when the rate changes while running.
    """

    def __init__(self):
        self.p = 0.0

    def get(self, n):
        """Return the next *n* samples of the LFO sine (values in -1..1)."""
        cycles_per_sample = params.lfo_rate / SAMPLE_RATE
        phase = self.p + np.arange(n) * cycles_per_sample
        # Only the fractional part matters for sin(); wrapping keeps p small.
        self.p = (self.p + n * cycles_per_sample) % 1.0
        return np.sin(2*np.pi*phase)
# Shared DSP objects used by the audio callback and the MIDI handlers: one oscillator,
# one LFO, an amplitude envelope (is_amp=True) and a filter envelope (is_amp=False).
vco=Oscillator(); lfo=LFO(); amp=ADSREnvelope(True); vcf=ADSREnvelope(False)
master_filter = StereoMasterFilter()
# Note number currently held in ANALOG mode; None when no note is held.
active_note=None
# --- GUI Manager with MIDI Log ---
class GUIManager:
    """Holds the Tk widgets/variables and the helpers used to update them.

    All widget attributes start as ``None`` and are filled in by
    ``create_panel()``. The ``set_*``/``update_*``/``log_midi`` helpers check
    for that and defer the actual widget call through ``root.after_idle``,
    so they can be invoked before the window exists without raising.
    """

    # Maximum number of lines kept in the MIDI monitor ("Recent 50").
    MAX_LOG_LINES = 50

    def __init__(self):
        self.root = None
        self.btn_engine = None
        self.instr_label = None
        self.btn_filter = None
        self.disp_a = None
        self.disp_d = None
        self.disp_s = None
        self.disp_r = None
        self.var_vol = None
        self.var_cut = None
        self.var_res = None
        self.gate_lamp = None
        self.scope_canvas = None
        self.lbl_filename = None
        self.lbl_wave = None
        self.var_dig_flt = None
        self.btn_scope = None
        self.scope_mode = "OFF"
        self.last_scope_time = 0
        self.log_text = None  # MIDI log widget

    def _post(self, callback):
        """Queue *callback* on the Tk event loop; failures to queue are ignored."""
        try:
            self.root.after_idle(callback)
        except Exception:
            # Best-effort UI update (e.g. the window is being torn down).
            pass

    def set_gate(self, on_off):
        """Light (truthy) or dim (falsy) the gate lamp."""
        if self.root and self.gate_lamp:
            color = "#FF0000" if on_off else "#440000"
            self._post(lambda: self.gate_lamp.itemconfig("lamp", fill=color))

    def update_slider(self, target_var, value):
        """Set a Tk variable bound to a slider to *value*."""
        if self.root and target_var:
            self._post(lambda: target_var.set(value))

    def update_wave_label(self, name):
        """Show the waveform *name* (upper-cased) in the wave label."""
        if self.root and self.lbl_wave:
            self._post(lambda: self.lbl_wave.config(text=f"WAVE: {name.upper()}"))

    def update_filename(self, text):
        """Show *text* in the MIDI file label."""
        if self.root and self.lbl_filename:
            self._post(lambda: self.lbl_filename.config(text=text))

    def log_midi(self, text):
        """Append one line to the MIDI monitor."""
        if self.root and self.log_text:
            self._post(lambda: self._append_log(text))

    def _append_log(self, text):
        """Insert *text* at the end of the log and trim it to MAX_LOG_LINES."""
        if not self.log_text:
            return
        self.log_text.config(state='normal')
        self.log_text.insert(tk.END, text + "\n")
        self.log_text.see(tk.END)
        # 'end-1c' sits on the empty line after the last entry, so the number
        # of entries is that line number minus one.
        line_count = int(self.log_text.index('end-1c').split('.')[0]) - 1
        excess = line_count - self.MAX_LOG_LINES
        if excess > 0:
            self.log_text.delete('1.0', f'{excess + 1}.0')
        self.log_text.config(state='disabled')

    def toggle_engine(self):
        """Switch between the ANALOG and DIGITAL engines and relabel the button."""
        if params.engine_mode == "ANALOG":
            params.engine_mode = "DIGITAL"
            self.btn_engine.config(text="MODE: DIGITAL (SoundFont)", bg="#44AAEE")
        else:
            params.engine_mode = "ANALOG"
            self.btn_engine.config(text="MODE: ANALOG (Scipy)", bg="#EEAA44")

    def toggle_filter(self):
        """Cycle the master filter mode LPF -> BPF -> HPF -> LPF."""
        modes = ["LPF", "BPF", "HPF"]
        next_mode = modes[(modes.index(params.filter_mode) + 1) % 3]
        params.filter_mode = next_mode
        if self.btn_filter:
            self.btn_filter.config(text=f"FILTER: {next_mode}")

    def toggle_scope(self):
        """Cycle the scope display OFF -> WAVE -> FFT -> OFF."""
        if self.scope_mode == "OFF":
            self.scope_mode = "WAVE"
            self.btn_scope.config(text="SCOPE: WAVE", bg="black", fg="#00FF00")
        elif self.scope_mode == "WAVE":
            self.scope_mode = "FFT"
            self.btn_scope.config(text="SCOPE: FFT", bg="#224422", fg="#00FF00")
        else:
            self.scope_mode = "OFF"
            self.btn_scope.config(text="SCOPE: OFF (Stable)", bg="#444444", fg="#AAAAAA")
            if self.scope_canvas:
                self.scope_canvas.delete("all")

    def draw_scope_scheduled(self, data):
        """Redraw the scope canvas from *data* (int16 samples) in the current mode.

        Does nothing when the scope is OFF or the window is not built yet.
        Drawing errors are swallowed so a bad frame never stops the GUI timer.
        """
        if self.scope_mode == "OFF" or not self.root or not self.scope_canvas:
            return
        try:
            w, h = 250, 100
            mid = h/2
            self.scope_canvas.delete("all")
            disp_data = data / 32768.0
            pts = []
            if self.scope_mode == "WAVE":
                self.scope_canvas.create_line(0, mid, w, mid, fill="#003300", dash=(2, 4))
                frames = len(disp_data) // 2
                step = max(1, frames // w)
                for i in range(w):
                    # Every other sample, i.e. one channel of the interleaved data.
                    idx = int(i * step) * 2
                    if idx < len(disp_data):
                        pts.extend([i, mid - disp_data[idx] * 40.0])
                if len(pts) > 2:
                    self.scope_canvas.create_line(pts, fill="#00FF00", width=1.5)
            else:
                window = np.hanning(len(disp_data))
                fft_res = np.abs(np.fft.rfft(disp_data * window))
                fft_res = fft_res / len(disp_data) * 100.0
                num_bins = min(len(fft_res), 300)
                for i in range(num_bins):
                    x = i * (w / num_bins)
                    mag = np.clip(fft_res[i] * h * 0.8, 0, h)
                    pts.extend([x, h - mag])
                if len(pts) > 2:
                    self.scope_canvas.create_line(pts, fill="#00FFFF", width=1.5)
        except Exception:
            # Best-effort drawing; skip this frame.
            pass

    def set_digital_filter(self):
        """Copy the "filter in digital mode" checkbox state into params."""
        if self.var_dig_flt:
            params.digital_filter_on = self.var_dig_flt.get()

    def change_instr_relative(self, delta):
        """Step the SoundFont program by *delta* (wrapping within 0-127) and update the label."""
        new_prog = (params.sf_program + delta) % 128
        params.sf_program = new_prog
        fluid_engine.change_program(new_prog)
        name = GM_INSTRUMENTS[new_prog]
        if self.instr_label:
            self.instr_label.config(text=f"{new_prog}: {name}", fg="cyan")
# Shared GUI state holder; its widget attributes are populated by create_panel().
gui = GUIManager()
# --- Audio Callback ---
# Copy of the most recent output block, read by the scope drawing timer.
last_audio_data = np.zeros(1024, dtype=np.int16)


def sd_callback(outdata, frames, time_info, status):
    """Audio output callback: render one block into *outdata*.

    The active engine (analog oscillator or SoundFont) is rendered, optionally
    passed through the master filter, scaled by the master gain, stored in
    ``last_audio_data`` for the scope and written to *outdata* as
    ``(frames, 2)`` int16.
    """
    global last_audio_data

    if status:
        print(f"Stat: {status}", file=sys.stderr)
    if not is_running:
        outdata.fill(0)
        return

    mode = params.engine_mode
    analog = mode == "ANALOG"

    if analog:
        # The LFO is reduced to its mean over the block and used as one modulation value.
        mono = vco.get(frames, np.mean(lfo.get(frames))) * amp.get_val()
        analog_pcm = (np.repeat(mono, 2) * 0.8 * 32767).astype(np.int16)
    else:
        analog_pcm = np.zeros(frames * 2, dtype=np.int16)

    # FluidSynth is rendered in both modes; its output is discarded in ANALOG mode.
    digital_pcm = fluid_engine.get_samples(frames)
    if analog:
        digital_pcm = np.zeros_like(digital_pcm)

    summed = analog_pcm.astype(np.int32) + digital_pcm.astype(np.int32)
    mixed = np.clip(summed, -32768, 32767).astype(np.int16)

    if analog:
        use_filter = True
    elif mode == "DIGITAL":
        use_filter = params.digital_filter_on
    else:
        use_filter = False

    # The filter envelope is advanced every block, whether or not it is used.
    env_level = vcf.get_val()
    if use_filter:
        cutoff = params.cutoff_base + (params.vcf_env_amt * env_level)
        block = master_filter.process(mixed, cutoff, params.resonance)
    else:
        block = mixed

    block = (block * params.master_gain).astype(np.int16)

    if len(last_audio_data) != len(block):
        last_audio_data = np.zeros_like(block)
    last_audio_data[:] = block
    outdata[:] = block.reshape(-1, 2)
def gui_update_loop():
    """Redraw the scope and re-arm itself every 100 ms while the app is running."""
    if not (is_running and gui.root):
        return
    gui.draw_scope_scheduled(last_audio_data)
    gui.root.after(100, gui_update_loop)
def handle_cc(n, v):
    """Apply control number *n* with value *v* (0-127 scale) to the synth parameters.

    Mapping: 8 = master gain, 7 = waveform select, 1 = LFO rate, 2 = LFO depth,
    3/4/5/6 = filter envelope attack/decay/sustain/release. Other numbers are
    ignored. GUI sliders and labels are updated where one exists.
    """
    norm = v/127.0

    if n == 8:
        params.master_gain = norm
        gui.update_slider(gui.var_vol, norm)
    elif n == 7:
        shapes = ["sawtooth", "square", "sine", "triangle"]
        chosen = shapes[min(3, int(norm*4))]
        params.waveform = chosen
        gui.update_wave_label(chosen)
    elif n == 1:
        params.lfo_rate = 0.1+norm*19.9
    elif n == 2:
        params.lfo_depth = norm
    elif n == 3:
        params.vcf_attack = 0.01+norm*2
        gui.update_slider(gui.disp_a, params.vcf_attack)
    elif n == 4:
        params.vcf_decay = 0.01+norm*2
        gui.update_slider(gui.disp_d, params.vcf_decay)
    elif n == 5:
        params.vcf_sustain = norm
        gui.update_slider(gui.disp_s, params.vcf_sustain)
    elif n == 6:
        params.vcf_release = 0.01+norm*3
        gui.update_slider(gui.disp_r, params.vcf_release)
def midi_loop(midi_in):
    """Poll *midi_in* about once per millisecond and dispatch each message.

    Messages shorter than two bytes are ignored; a missing third byte is
    passed on as 0.
    """
    while is_running:
        incoming = midi_in.get_message()
        if incoming:
            data_bytes, _delta = incoming
            if len(data_bytes) >= 2:
                third = data_bytes[2] if len(data_bytes) > 2 else 0
                process_midi_event(data_bytes[0], data_bytes[1], third)
        time.sleep(0.001)
def get_note_name(note_num):
    """Return the note name with octave for a MIDI note number (60 -> "C4")."""
    pitch_classes = ('C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B')
    octave_index, pitch_index = divmod(note_num, 12)
    return f"{pitch_classes[pitch_index]}{octave_index - 1}"
def process_midi_event(status, data1, data2):
    """Handle one MIDI message and mirror it in the MIDI monitor.

    Args:
        status: status byte; only the upper nibble (message type) is used.
        data1: first data byte (note, controller or program number).
        data2: second data byte (velocity or controller value).

    Note on/off drive the analog voice or FluidSynth depending on
    ``params.engine_mode``; control changes go to ``handle_cc``; program
    changes are applied in DIGITAL mode only. This function is called from
    worker threads, so every widget update is deferred to the Tk event loop.
    """
    global active_note
    st = status & 0xF0
    log_msg = ""

    if st == 0x90 and data2 > 0:  # Note On
        n = data1
        # Velocity correction: boost by 40, capped at the MIDI maximum.
        v = min(data2 + 40, 127)
        log_msg = f"Note On : {get_note_name(n)} ({n}) vel:{v}"
        if params.engine_mode == "ANALOG":
            vco.set_f(440.0*(2**((n-69)/12.0)))
            amp.trigger()
            vcf.trigger()
            active_note = n
            gui.set_gate(True)
        else:
            fluid_engine.note_on(n, v)
            vcf.trigger()
            gui.set_gate(True)
    elif (st == 0x80) or (st == 0x90 and data2 == 0):  # Note Off
        n = data1
        log_msg = f"Note Off: {get_note_name(n)} ({n})"
        if params.engine_mode == "ANALOG":
            # Monophonic voice: only the most recently played note releases it.
            if n == active_note:
                amp.release()
                vcf.release()
                active_note = None
                gui.set_gate(False)
        else:
            fluid_engine.note_off(n)
            vcf.release()
            gui.set_gate(False)
    elif st == 0xB0:  # Control Change
        log_msg = f"Control : #{data1} val:{data2}"
        handle_cc(data1, data2)
    elif st == 0xC0:  # Program Change
        known = data1 < len(GM_INSTRUMENTS)
        log_msg = f"Prog Chg: {data1} ({GM_INSTRUMENTS[data1] if known else '?'})"
        if params.engine_mode == "DIGITAL":
            params.sf_program = data1
            fluid_engine.change_program(data1)
            name = GM_INSTRUMENTS[data1] if known else "Unknown"
            if gui.root and gui.instr_label is not None:
                label_text = f"{data1}: {name}"
                try:
                    # Defer to the GUI thread, like the other GUI helpers do.
                    gui.root.after_idle(
                        lambda: gui.instr_label.config(text=label_text, fg="cyan"))
                except Exception:
                    # Best-effort UI update (window may be closing).
                    pass

    if log_msg:
        # Prefix the decoded text with the raw bytes in hex.
        raw_str = f"[{status:02X} {data1:02X} {data2:02X}]"
        gui.log_midi(f"{raw_str} {log_msg}")
class MidiPlayer:
    """Plays a user-selected MIDI file through ``process_midi_event`` on a worker thread."""

    def __init__(self):
        self.filename = None     # path chosen in load_file()
        self.is_playing = False  # also serves as the stop flag for the thread
        self.thread = None

    def load_file(self):
        """Ask the user for a MIDI file and remember its path."""
        f = filedialog.askopenfilename(filetypes=[("MIDI Files", "*.mid"), ("All Files", "*.*")])
        if f:
            self.filename = f
            short_name = os.path.basename(f)
            gui.update_filename(f"File: {short_name}")
            print(f"Loaded: {f}")

    def play(self):
        """Start playback in a daemon thread; no-op without a file or while playing."""
        if not self.filename or self.is_playing:
            return
        self.is_playing = True
        self.thread = threading.Thread(target=self._play_thread, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop playback and silence whatever is still sounding."""
        self.is_playing = False
        process_midi_event(0xB0, 120, 0)
        if params.engine_mode == "DIGITAL":
            for i in range(128):
                fluid_engine.note_off(i)
        elif params.engine_mode == "ANALOG" and active_note is not None:
            # Release the held analog note; otherwise the voice would stay in
            # its sustain stage after the player stops sending note-offs.
            process_midi_event(0x80, active_note, 0)

    def _play_thread(self):
        """Thread body: stream the file's messages in real time until done or stopped."""
        try:
            mid = mido.MidiFile(self.filename, clip=True)
            print(f"Start playing: {self.filename}")
            for msg in mid.play():
                if not self.is_playing or not is_running:
                    break
                if msg.type == 'note_on':
                    process_midi_event(0x90, msg.note, msg.velocity)
                elif msg.type == 'note_off':
                    process_midi_event(0x80, msg.note, msg.velocity)
                elif msg.type == 'control_change':
                    process_midi_event(0xB0, msg.control, msg.value)
                elif msg.type == 'program_change':
                    process_midi_event(0xC0, msg.program, 0)
            self.is_playing = False
            print("Finished.")
        except Exception as e:
            print(f"Error: {e}")
            self.is_playing = False
# Shared MIDI file player; wired to the Load/Play/Stop buttons in create_panel().
midi_player = MidiPlayer()
def cleanup_and_exit():
    """Shut everything down and exit the process.

    Clears the run flag, stops the MIDI player, closes the audio stream and
    the MIDI port if those module globals exist, destroys the window and
    finally calls ``sys.exit()``.
    """
    global is_running
    print("\nShutting down...")
    is_running = False
    midi_player.stop()

    # `stream` and `mi` are created by the startup code, so they may be absent.
    module_names = globals()
    if 'stream' in module_names:
        stream.stop()
        stream.close()
    if 'mi' in module_names:
        mi.close_port()

    if gui.root:
        gui.root.destroy()
    print("Bye!")
    sys.exit()
def create_panel():
    """Build the main window, store its widgets on ``gui`` and return the Tk root.

    Layout, top to bottom: status bar, gate lamp / engine toggle / quit,
    a left pane (scope and master filter), a right pane (instrument, MIDI
    file player, filter envelope, volume) and the MIDI monitor. The scope
    refresh timer is armed before returning.
    """
    root = tk.Tk()
    gui.root = root
    root.title("Synth v43 (MIDI Monitor)")
    root.geometry("640x820")
    style = ttk.Style()
    style.theme_use('clam')
    root.protocol("WM_DELETE_WINDOW", cleanup_and_exit)

    # --- Status bar ---
    tk.Label(root, text=f"Audio: {AUDIO_DEVICE_NAME} (ID:{AUDIO_DEVICE_ID}) | MIDI: {MIDI_PORT_NAME}", bg="#333333", fg="white", font=("Arial", 9)).pack(fill=tk.X, side=tk.TOP)

    # --- Tk variables mirrored from params ---
    gui.disp_a = tk.DoubleVar(value=params.vcf_attack)
    gui.disp_d = tk.DoubleVar(value=params.vcf_decay)
    gui.disp_s = tk.DoubleVar(value=params.vcf_sustain)
    gui.disp_r = tk.DoubleVar(value=params.vcf_release)
    gui.var_vol = tk.DoubleVar(value=params.master_gain)
    gui.var_cut = tk.DoubleVar(value=params.cutoff_base)
    gui.var_res = tk.DoubleVar(value=params.resonance)
    gui.var_dig_flt = tk.BooleanVar(value=params.digital_filter_on)

    # --- Top row: gate lamp, engine toggle, quit ---
    top = tk.Frame(root, bg="black", pady=5)
    top.pack(fill=tk.X)
    cv = tk.Canvas(top, width=30, height=30, bg="black", highlightthickness=0)
    cv.pack(side=tk.LEFT, padx=10)
    gui.gate_lamp = cv
    cv.create_oval(5, 5, 25, 25, fill="#440000", outline="gray", tags="lamp")
    gui.btn_engine = tk.Button(top, text="MODE: ANALOG (Scipy)", bg="#EEAA44", fg="white", font=("Arial", 12, "bold"), command=gui.toggle_engine)
    gui.btn_engine.pack(side=tk.LEFT, padx=100)
    tk.Button(top, text="QUIT", bg="#FF4444", fg="white", font=("Arial", 10, "bold"), command=cleanup_and_exit).pack(side=tk.RIGHT, padx=10)

    main = ttk.Frame(root, padding=5)
    main.pack(fill=tk.BOTH, expand=True)

    # --- Left pane: scope and master filter ---
    left = ttk.LabelFrame(main, text="Global Filter & Scope", padding=5)
    left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    sc_frame = tk.Frame(left, bg="black", bd=2, relief=tk.SUNKEN)
    sc_frame.pack(pady=5)
    gui.scope_canvas = tk.Canvas(sc_frame, width=250, height=100, bg="black", highlightthickness=0)
    gui.scope_canvas.pack()
    gui.lbl_wave = tk.Label(left, text="WAVE: SAWTOOTH", bg="#222222", fg="cyan", font=("Arial", 10, "bold"))
    gui.lbl_wave.pack(fill=tk.X)
    gui.btn_scope = tk.Button(left, text="SCOPE: OFF (Stable)", bg="#444444", fg="#AAAAAA", font=("Arial", 8), command=gui.toggle_scope)
    gui.btn_scope.pack(fill=tk.X, pady=2)
    gui.btn_filter = tk.Button(left, text="FILTER: LPF", bg="#4444AA", fg="white", font=("Arial", 10, "bold"), command=gui.toggle_filter)
    gui.btn_filter.pack(fill=tk.X, pady=5)

    def ms(parent, caption, var, upper, attr):
        """Add a labelled slider that writes its value to ``params.<attr>``."""
        row = ttk.Frame(parent)
        row.pack(fill=tk.X)
        ttk.Label(row, text=caption, width=5).pack(side=tk.LEFT)
        ttk.Scale(row, from_=50 if attr == "cutoff_base" else 0, to=upper, variable=var, command=lambda x: setattr(params, attr, float(x))).pack(side=tk.LEFT, fill=tk.X, expand=True)

    ms(left, "Cut", gui.var_cut, 5000, "cutoff_base")
    ms(left, "Res", gui.var_res, 0.9, "resonance")

    # --- Right pane: instrument, file player, filter envelope, volume ---
    right = ttk.LabelFrame(main, text="Digital / Player", padding=5)
    right.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
    gui.instr_label = tk.Label(right, text="0: Grand Piano", font=("Arial", 14, "bold"), fg="cyan", bg="#222222", width=20)
    gui.instr_label.pack(pady=5)
    instr_btns = ttk.Frame(right)
    instr_btns.pack(fill=tk.X, pady=2)
    tk.Button(instr_btns, text="< Prev", width=10, command=lambda: gui.change_instr_relative(-1)).pack(side=tk.LEFT, padx=5, expand=True)
    tk.Button(instr_btns, text="Next >", width=10, command=lambda: gui.change_instr_relative(1)).pack(side=tk.LEFT, padx=5, expand=True)
    fk = ttk.Frame(right)
    fk.pack(fill=tk.X, pady=5)
    cb = ttk.Checkbutton(fk, text="Filter Enable (Digital Mode)", variable=gui.var_dig_flt, command=gui.set_digital_filter)
    cb.pack(anchor=tk.CENTER)

    mp = ttk.LabelFrame(right, text="MIDI File Player", padding=5)
    mp.pack(fill=tk.X, pady=5)
    gui.lbl_filename = tk.Label(mp, text="No File Selected", font=("Arial", 8))
    gui.lbl_filename.pack(fill=tk.X)
    btn_box = ttk.Frame(mp)
    btn_box.pack(fill=tk.X)
    tk.Button(btn_box, text="Load", command=midi_player.load_file, bg="#DDDDDD").pack(side=tk.LEFT, fill=tk.X, expand=True)
    tk.Button(btn_box, text="Play", command=midi_player.play, bg="#AAFFAA").pack(side=tk.LEFT, fill=tk.X, expand=True)
    tk.Button(btn_box, text="Stop", command=midi_player.stop, bg="#FFAAAA").pack(side=tk.LEFT, fill=tk.X, expand=True)

    ttk.Label(right, text="Filter ADSR").pack(pady=2)

    def me(caption, var, stage, lower=0.01, upper=2.0):
        """Add a filter-envelope slider writing to ``params.vcf_<stage>``."""
        row = ttk.Frame(right)
        row.pack(fill=tk.X)
        ttk.Label(row, text=caption, width=3).pack(side=tk.LEFT)
        ttk.Scale(row, from_=lower, to=upper, variable=var, command=lambda x: setattr(params, f"vcf_{stage}", float(x))).pack(side=tk.LEFT, fill=tk.X, expand=True)

    me("A", gui.disp_a, "attack")
    me("D", gui.disp_d, "decay")
    # Sustain is a level, not a time: the envelope peaks at 1.0 and the MIDI
    # mapping for it also produces 0..1, so the slider uses that range.
    me("S", gui.disp_s, "sustain", 0.0, 1.0)
    me("R", gui.disp_r, "release")

    ttk.Label(right, text="Vol (K8)").pack(pady=2)
    ttk.Scale(right, from_=0, to=1, variable=gui.var_vol, command=lambda x: handle_cc(8, float(x)*127)).pack(fill=tk.X)

    # --- MIDI monitor ---
    log_f = ttk.LabelFrame(root, text="MIDI Monitor (Recent 50)", padding=5)
    log_f.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    gui.log_text = tk.Text(log_f, height=6, state='disabled', bg="black", fg="#00FF00", font=("Consolas", 9))
    gui.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    sb = ttk.Scrollbar(log_f, orient="vertical", command=gui.log_text.yview)
    sb.pack(side=tk.RIGHT, fill=tk.Y)
    gui.log_text.config(yscrollcommand=sb.set)

    gui.root.after(100, gui_update_loop)
    return root
if __name__ == "__main__":
    # Startup order: SoundFont engine, MIDI input, audio output, then the GUI.
    fluid_engine.init_synth()

    # --- MIDI input: first port that is not "Midi Through", else a virtual port ---
    mi = rtmidi.MidiIn()
    ports = mi.get_ports()
    found_idx = -1
    for i, p_name in enumerate(ports):
        if "Midi Through" not in p_name:
            found_idx = i
            break
    if found_idx != -1:
        mi.open_port(found_idx)
        # Recorded so the status bar shows the real port instead of "Searching...".
        MIDI_PORT_NAME = ports[found_idx]
        print(f"MIDI Connected: {ports[found_idx]}")
    else:
        mi.open_virtual_port("PySynth")
        MIDI_PORT_NAME = "PySynth (virtual)"
        print("MIDI: Virtual Port")
    threading.Thread(target=midi_loop, args=(mi,), daemon=True).start()

    # --- Audio output: first output device whose name contains the keyword ---
    print("\n--- Audio Hardware Scan ---")
    devs = sd.query_devices()
    target_id = None
    for i, d in enumerate(devs):
        if d['max_output_channels'] > 0 and TARGET_DEVICE_KEYWORD in d['name']:
            target_id = i
            AUDIO_DEVICE_NAME = d['name']
            AUDIO_DEVICE_ID = i
            print(f"--> FOUND TARGET: {d['name']} (ID: {i})")
            break
    if target_id is None:
        target_id = sd.default.device[1]
        AUDIO_DEVICE_NAME = "Default (Target not found)"
        # Keep the status bar in sync with the device actually opened.
        AUDIO_DEVICE_ID = target_id
        print("--> Using Default Device")

    stream = sd.OutputStream(
        device=target_id,
        samplerate=SAMPLE_RATE,
        channels=CHANNELS,
        dtype=DTYPE,
        blocksize=BLOCK_SIZE,
        latency='high',
        callback=sd_callback
    )
    stream.start()
    print(f"Stream Started: {SAMPLE_RATE}Hz / {BLOCK_SIZE} frames")

    root = create_panel()
    try:
        root.mainloop()
    except (KeyboardInterrupt, SystemExit):
        # Normal ways out (Ctrl-C, or sys.exit() from cleanup_and_exit).
        pass
    except Exception as exc:
        # Still exit quietly, but leave a trace of what ended the GUI loop.
        print(f"GUI loop ended with error: {exc}")
ST7789版(Python)
# コード名: py_synth_v60_arpeggiator
# バージョン: 60.0 (Chord Arpeggiator)
# Description: Page 8にアルペジエーター機能(BPM/Pattern)を追加。
import os
os.environ['OMP_NUM_THREADS'] = '1'
import sys
import time
import threading
import math
import subprocess
import random
import numpy as np
import rtmidi
import fluidsynth
import digitalio
import board
import mido
from PIL import Image, ImageDraw, ImageFont
from adafruit_rgb_display import st7789
# --- ライブラリチェック ---
try:
import sounddevice as sd
except ImportError:
sys.exit("Error: pip install sounddevice")
try:
from scipy.signal import lfilter
except ImportError:
sys.exit("Error: pip install scipy")
# --- Settings ---
TARGET_DEVICE_KEYWORD = "USB"  # presumably matched against audio device names — usage is outside this section
SAMPLE_RATE = 48000  # audio sample rate in Hz, also handed to FluidSynth
# Frames per audio block; ADSREnvelope.get_val() uses it as its time step.
BLOCK_SIZE = 256
CHANNELS = 2
DTYPE = 'int16'
MASTER_GAIN_DEFAULT = 0.5  # initial value of SynthParams.master_gain
SF2_PATH = "/usr/share/sounds/sf2/FluidR3_GM.sf2"  # SoundFont loaded by FluidManager.init_synth()
MIDI_DIR = "./midi"  # directory MidiPlayer scans for *.mid files (created if missing)
# LCD Pins
CS_PIN = board.D8
DC_PIN = board.D27
RST_PIN = board.D17
BAUDRATE = 24000000  # NOTE(review): presumably the display bus clock in Hz — confirm where it is used
# Global run flag checked by the worker loops in this file.
is_running = True
last_audio_data = np.zeros(BLOCK_SIZE, dtype=np.int16)
midi_logs = []
last_drum_name = ""
# --- Chord definitions ---
# Selected through SynthParams.chord_type_idx. "offsets" look like semitone intervals
# above the played note (e.g. Major = 0, 4, 7) — the code applying them is outside this section.
CHORD_TYPES = [
{"name": "OFF", "offsets": [0]},
{"name": "Major", "offsets": [0, 4, 7]},
{"name": "Minor", "offsets": [0, 3, 7]},
{"name": "Maj7", "offsets": [0, 4, 7, 11]},
{"name": "Min7", "offsets": [0, 3, 7, 10]},
{"name": "Sus4", "offsets": [0, 5, 7]},
{"name": "5th(Pwr)", "offsets": [0, 7]},
{"name": "Octave", "offsets": [0, 12]}
]
# --- Arpeggio patterns ---
# Indexed by SynthParams.arp_pattern_idx; index 0 ("OFF") disables the arpeggiator loop.
ARP_PATTERNS = ["OFF", "UP", "DOWN", "RAND"]
# --- GM instrument names ---
# 128 display names, one per program number (0-127).
GM_INSTRUMENTS = [
"Grand Piano", "Bright Piano", "E.Grand Piano", "Honky-tonk", "E.Piano 1", "E.Piano 2", "Harpsichord", "Clavinet",
"Celesta", "Glockenspiel", "Music Box", "Vibraphone", "Marimba", "Xylophone", "Tubular Bells", "Dulcimer",
"Drawbar Organ", "Percussive Organ", "Rock Organ", "Church Organ", "Reed Organ", "Accordion", "Harmonica", "Tango Accordion",
"Guit(Nylon)", "Guit(Steel)", "Guit(Jazz)", "Guit(Clean)", "Guit(Muted)", "Guit(Overdrive)", "Guit(Distortion)", "Guit(Harmonics)",
"Acoustic Bass", "E.Bass(Finger)", "E.Bass(Pick)", "Fretless Bass", "Slap Bass 1", "Slap Bass 2", "Synth Bass 1", "Synth Bass 2",
"Violin", "Viola", "Cello", "Contrabass", "Tremolo Strings", "Pizzicato Strings", "Orchestral Harp", "Timpani",
"Strings 1", "Strings 2", "SynthStrings 1", "SynthStrings 2", "Choir Aahs", "Voice Oohs", "Synth Voice", "Orchestra Hit",
"Trumpet", "Trombone", "Tuba", "Muted Trumpet", "French Horn", "Brass Section", "Synth Brass 1", "Synth Brass 2",
"Soprano Sax", "Alto Sax", "Tenor Sax", "Baritone Sax", "Oboe", "English Horn", "Bassoon", "Clarinet",
"Piccolo", "Flute", "Recorder", "Pan Flute", "Blown Bottle", "Shakuhachi", "Whistle", "Ocarina",
"Lead 1 (square)", "Lead 2 (saw)", "Lead 3 (calliope)", "Lead 4 (chiff)", "Lead 5 (charang)", "Lead 6 (voice)", "Lead 7 (fifths)", "Lead 8 (bass+lead)",
"Pad 1 (new age)", "Pad 2 (warm)", "Pad 3 (polysynth)", "Pad 4 (choir)", "Pad 5 (bowed)", "Pad 6 (metallic)", "Pad 7 (halo)", "Pad 8 (sweep)",
"FX 1 (rain)", "FX 2 (soundtrack)", "FX 3 (crystal)", "FX 4 (atmosphere)", "FX 5 (brightness)", "FX 6 (goblins)", "FX 7 (echoes)", "FX 8 (sci-fi)",
"Sitar", "Banjo", "Shamisen", "Koto", "Kalimba", "Bag pipe", "Fiddle", "Shanai",
"Tinkle Bell", "Agogo", "Steel Drums", "Woodblock", "Taiko Drum", "Melodic Tom", "Synth Drum", "Reverse Cymbal",
"Guit Fret Noise", "Breath Noise", "Seashore", "Bird Tweet", "Telephone Ring", "Helicopter", "Applause", "Gunshot"
]
# --- Drum map ---
# Note number -> drum name for notes 36-51; other notes have no entry.
DRUM_MAP = {
36: "Bass Drum 1", 37: "Side Stick", 38: "Acoustic Snare", 39: "Hand Clap",
40: "Electric Snare", 41: "Low Floor Tom", 42: "Closed Hi-Hat", 43: "High Floor Tom",
44: "Pedal Hi-Hat", 45: "Low Tom", 46: "Open Hi-Hat", 47: "Low-Mid Tom",
48: "Hi-Mid Tom", 49: "Crash Cymbal 1", 50: "High Tom", 51: "Ride Cymbal 1"
}
class SynthParams:
    """Mutable container for every adjustable synth and UI setting.

    All attributes are plain values initialised to their defaults here.
    """

    def __init__(self):
        # Engine selection and output level
        self.engine_mode = "ANALOG"
        self.master_gain = MASTER_GAIN_DEFAULT

        # Oscillator and LFO
        self.waveform = "sawtooth"
        self.lfo_rate = 5.0
        self.lfo_depth = 0.0

        # Master filter
        self.filter_mode = "LPF"
        self.cutoff_base = 1000.0
        self.resonance = 0.5
        self.vcf_env_amt = 4000.0

        # Filter envelope (ADSR)
        self.vcf_attack = 0.1
        self.vcf_decay = 0.3
        self.vcf_sustain = 0.4
        self.vcf_release = 0.5

        # Amplitude envelope (ADSR)
        self.amp_attack = 0.05
        self.amp_decay = 0.2
        self.amp_sustain = 0.6
        self.amp_release = 0.5

        # SoundFont engine and its effect sends
        self.sf_program = 0
        self.sf_vol = 1.0
        self.reverb_send = 40
        self.chorus_send = 0

        # Chord and arpeggiator
        self.chord_type_idx = 0
        self.arp_pattern_idx = 0  # 0 = OFF
        self.arp_bpm = 120.0

        # UI state
        self.page_index = 0
        self.gate_status = False
# Single shared settings object; the classes and helpers in this file read and write it directly.
params = SynthParams()
# --- Helper Functions for System Info ---
def get_ip_address():
    """Return the first address reported by ``hostname -I``.

    Returns an empty string when the command prints nothing, and "Unknown"
    when the command cannot be run or fails.
    """
    try:
        # Argument list instead of a shell pipeline: the "first field" part
        # is done in Python.
        output = subprocess.check_output(["hostname", "-I"]).decode("utf-8")
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        return "Unknown"
    addresses = output.split()
    return addresses[0] if addresses else ""
def get_cpu_temp(path="/sys/class/thermal/thermal_zone0/temp"):
    """Return the value stored in *path* divided by 1000, as a float.

    Args:
        path: file holding a single number; defaults to the thermal zone 0
            sysfs entry.

    Returns:
        The scaled reading, or 0.0 when the file cannot be read or does not
        contain a number.
    """
    try:
        with open(path, "r") as f:
            return float(f.read()) / 1000.0
    except (OSError, ValueError):
        return 0.0
def get_disk_usage():
    """Return root-filesystem usage as ``used/size (percent)`` taken from ``df -h /``.

    Returns "?" when the command cannot be run or fails.
    """
    # Fixed command string with no external input, so running it through the
    # shell for the pipe is acceptable here.
    cmd = "df -h / | awk 'NR==2{print $3 \"/\" $2 \" (\" $5 \")\"}'"
    try:
        return subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        return "?"
def get_ram_usage():
    """Return memory usage as ``used/totalMB (percent%)`` taken from ``free -m``.

    Returns "?" when the command cannot be run or fails.
    """
    # Fixed command string with no external input, so running it through the
    # shell for the pipe is acceptable here.
    cmd = "free -m | awk 'NR==2{printf \"%s/%sMB (%.0f%%)\", $3,$2,$3*100/$2 }'"
    try:
        return subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        return "?"
# --- MIDI Player Class ---
class MidiPlayer:
    """Plays ``*.mid`` files found in ``MIDI_DIR`` through ``process_midi_event``.

    ``files`` always holds at least one entry; when nothing is available it is
    the single placeholder "(No Files)".
    """

    def __init__(self):
        self.files = []
        self.selected_idx = 0
        self.is_playing = False
        self.stop_signal = False
        self.thread = None
        self.scan_files()

    def scan_files(self):
        """Refresh ``files`` from ``MIDI_DIR``, creating the directory if it is missing."""
        found = []
        if os.path.exists(MIDI_DIR):
            found = sorted(f for f in os.listdir(MIDI_DIR) if f.lower().endswith(".mid"))
        else:
            try:
                os.makedirs(MIDI_DIR)
                print(f"Created {MIDI_DIR}")
            except OSError:
                # Best-effort: without the directory there is nothing to play.
                pass
        # Assigned in one step so readers never see a half-built list.
        self.files = found or ["(No Files)"]
        # A rescan may shrink the list; keep the selection inside it.
        self.selected_idx = min(self.selected_idx, len(self.files) - 1)

    def select_file(self, norm_val):
        """Select a file from a normalised position (0.0 = first, 1.0 = last)."""
        count = len(self.files)
        self.selected_idx = min(count - 1, int(norm_val * count))

    def play(self):
        """Start playing the selected file in a daemon thread.

        No-op while already playing or when there are no files.
        """
        if self.is_playing:
            return
        if self.files[0] == "(No Files)":
            return
        self.stop_signal = False
        self.is_playing = True
        self.thread = threading.Thread(target=self._play_thread, daemon=True)
        self.thread.start()

    def stop(self):
        """Request the playback thread to stop and send CC 120 and CC 123 on all 16 channels."""
        if self.is_playing:
            self.stop_signal = True
        for ch in range(16):
            process_midi_event(0xB0 + ch, 120, 0)
            process_midi_event(0xB0 + ch, 123, 0)

    def _play_thread(self):
        """Thread body: stream the selected file's messages in real time."""
        try:
            # Built inside the try so a bad index cannot leave is_playing stuck.
            filename = os.path.join(MIDI_DIR, self.files[self.selected_idx])
            mid = mido.MidiFile(filename)
            print(f"Playing: {filename}")
            # File playback needs the SoundFont engine.
            if params.engine_mode == "ANALOG":
                params.engine_mode = "DIGITAL1"
            for msg in mid.play():
                if self.stop_signal or not is_running:
                    break
                if msg.type == 'note_on':
                    process_midi_event(0x90 | msg.channel, msg.note, msg.velocity)
                elif msg.type == 'note_off':
                    process_midi_event(0x80 | msg.channel, msg.note, msg.velocity)
                elif msg.type == 'control_change':
                    process_midi_event(0xB0 | msg.channel, msg.control, msg.value)
                elif msg.type == 'program_change':
                    process_midi_event(0xC0 | msg.channel, msg.program, 0)
            print("Playback Finished.")
        except Exception as e:
            print(f"MIDI Error: {e}")
        finally:
            self.is_playing = False
            self.stop_signal = False
# Shared MIDI file player; constructing it scans MIDI_DIR once.
midi_player = MidiPlayer()
class FluidManager:
    """Wrapper around one ``fluidsynth.Synth`` used as the SoundFont engine.

    Every playback method is a no-op until :meth:`init_synth` has succeeded,
    so callers never have to check whether FluidSynth is usable.
    """

    def __init__(self):
        self.fs = None      # the fluidsynth.Synth, created by init_synth()
        self.ready = False  # True only once the SoundFont has been loaded

    def init_synth(self):
        """Create the synth, load ``SF2_PATH`` and set up channels 0 and 9.

        Channel 0 gets bank 0 / preset 0. Channel 9 tries bank 128 and falls
        back to bank 120 if that selection reports failure. The reverb and
        chorus send levels from ``params`` are applied to channel 0. Any
        failure is printed and leaves ``ready`` False; nothing is raised.
        """
        try:
            self.fs = fluidsynth.Synth()
            self.fs.setting('synth.sample-rate', float(SAMPLE_RATE))
            self.fs.setting('synth.gain', 1.0)
            self.fs.setting('synth.polyphony', 64)
            self.fs.setting('synth.reverb.active', 'yes')
            self.fs.setting('synth.chorus.active', 'yes')
            sfid = self.fs.sfload(SF2_PATH)
            if sfid == -1:
                # sfload reports failure with -1 instead of raising; without
                # this check a missing SoundFont would still mark us ready.
                raise RuntimeError(f"could not load SoundFont: {SF2_PATH}")
            self.fs.program_select(0, sfid, 0, 0)
            res = self.fs.program_select(9, sfid, 128, 0)
            if res != 0:
                self.fs.program_select(9, sfid, 120, 0)
            self.fs.program_change(9, 0)
            self.fs.cc(0, 91, params.reverb_send)
            self.fs.cc(0, 93, params.chorus_send)
            self.ready = True
            print("FluidSynth Ready (Reverb/Chorus ON).")
        except Exception as e:
            print(f"FluidSynth Error: {e}")
            self.ready = False

    def get_samples(self, n):
        """Return the next *n* frames as an int16 array.

        When the engine is not ready this is ``n * 2`` zeros (silence, two
        values per frame); otherwise it is whatever ``fs.get_samples(n)``
        renders, converted to int16.
        """
        if not self.ready:
            return np.zeros(n * 2, dtype=np.int16)
        return np.array(self.fs.get_samples(n), dtype=np.int16)

    def note_on(self, ch, n, v):
        """Start note *n* with velocity *v* on channel *ch* (ignored if not ready)."""
        if self.ready:
            self.fs.noteon(ch, n, v)

    def note_off(self, ch, n):
        """Stop note *n* on channel *ch* (ignored if not ready)."""
        if self.ready:
            self.fs.noteoff(ch, n)

    def change_program(self, ch, p):
        """Switch channel *ch* to program *p* (ignored if not ready)."""
        if self.ready:
            self.fs.program_change(ch, int(p))

    def send_cc(self, ch, cc, val):
        """Send control change *cc* with value *val* on channel *ch* (ignored if not ready)."""
        if self.ready:
            self.fs.cc(ch, cc, int(val))
# Shared SoundFont engine instance used by the arpeggiator and the other helpers in this file.
fluid_engine = FluidManager()
# --- Arpeggiator ---
class Arpeggiator:
    """Background arpeggiator that steps through the held notes on channel 0.

    A daemon thread started in ``__init__`` plays one note per eighth note at
    ``params.arp_bpm``, in the order chosen by ``params.arp_pattern_idx``
    (see ``ARP_PATTERNS``). While the pattern is "OFF" or no notes are set,
    the thread only makes sure its last note is switched off.
    """

    def __init__(self):
        self.current_notes = []  # [(note, velocity)], sorted by note
        self.playing_note = None
        self.step_idx = 0
        self.last_step_time = 0
        threading.Thread(target=self._loop, daemon=True).start()

    def set_notes(self, notes_with_vel):
        """Replace the notes to arpeggiate.

        Args:
            notes_with_vel: iterable of ``(note, velocity)`` pairs. It is
                copied, so the caller's list is neither kept nor reordered.
        """
        # sorted() builds a new list, so the worker thread only ever sees a
        # complete, already ordered list.
        self.current_notes = sorted(notes_with_vel, key=lambda x: x[0])

    def _loop(self):
        """Thread body: run until the global ``is_running`` flag is cleared."""
        while is_running:
            mode = ARP_PATTERNS[params.arp_pattern_idx]

            # Arpeggiator idle: silence our own note and wait.
            if mode == "OFF" or not self.current_notes:
                if self.playing_note is not None:
                    fluid_engine.note_off(0, self.playing_note)
                    self.playing_note = None
                time.sleep(0.01)
                continue

            # Eighth-note step length; the floor guards against a zero or
            # negative BPM ending the thread with a division error.
            step_duration = 60.0 / max(params.arp_bpm, 1.0) / 2.0
            now = time.time()
            if now - self.last_step_time >= step_duration:
                if self.playing_note is not None:
                    fluid_engine.note_off(0, self.playing_note)
                    self.playing_note = None

                # Work on a snapshot: set_notes() may swap the list at any time.
                notes = self.current_notes[:]
                if not notes:
                    continue
                if mode == "DOWN":
                    notes.reverse()
                elif mode == "RAND":
                    random.shuffle(notes)
                # "UP" uses the stored ascending order as is.

                self.step_idx %= len(notes)
                n, v = notes[self.step_idx]
                fluid_engine.note_on(0, n, v)
                self.playing_note = n

                if mode == "RAND":
                    self.step_idx = random.randint(0, len(notes)-1)
                else:
                    self.step_idx += 1
                self.last_step_time = now
            time.sleep(0.005)
# Module-level arpeggiator; constructing it starts its daemon worker thread.
arp = Arpeggiator()
def design_biquad(mode, fc, q, fs):
    """Return normalised biquad coefficients ``(b, a)`` for the given mode.

    ``mode`` is "LPF", "HPF" or "BPF"; any other value yields the pass-through
    pair ``([1, 0, 0], [1, 0, 0])`` as plain lists. ``fc`` is the corner
    frequency, ``q`` the quality factor and ``fs`` the sample rate. For the
    three filter modes both results are NumPy arrays divided by ``1 + alpha``.
    """
    omega = 2 * math.pi * fc / fs
    alpha = math.sin(omega) / (2 * q)
    cos_w = math.cos(omega)
    norm = 1 + alpha

    numerators = {
        "LPF": [(1 - cos_w) / 2, 1 - cos_w, (1 - cos_w) / 2],
        "HPF": [(1 + cos_w) / 2, -(1 + cos_w), (1 + cos_w) / 2],
        "BPF": [alpha, 0, -alpha],
    }
    numerator = numerators.get(mode)
    if numerator is None:
        return [1, 0, 0], [1, 0, 0]

    denominator = [1 + alpha, -2 * cos_w, 1 - alpha]
    return np.array(numerator) / norm, np.array(denominator) / norm
class ADSREnvelope:
    """Block-rate ADSR envelope driven by the shared ``params`` object.

    ``is_amp`` selects which parameter set is read on every ``get_val`` call:
    the ``amp_*`` values when true, the ``vcf_*`` values otherwise.
    """

    def __init__(self, is_amp=True):
        self.state = 'IDLE'
        self.level = 0.0
        self.is_amp = is_amp

    def trigger(self):
        """Enter the attack stage; the current level is kept."""
        self.state = 'ATTACK'

    def release(self):
        """Enter the release stage."""
        self.state = 'RELEASE'

    def get_val(self):
        """Advance the envelope by one audio block and return its level."""
        if self.is_amp:
            attack, decay, sustain, rel = (
                params.amp_attack, params.amp_decay,
                params.amp_sustain, params.amp_release,
            )
        else:
            attack, decay, sustain, rel = (
                params.vcf_attack, params.vcf_decay,
                params.vcf_sustain, params.vcf_release,
            )

        # One call covers BLOCK_SIZE samples worth of time.
        block_seconds = float(BLOCK_SIZE) / SAMPLE_RATE
        stage = self.state

        if stage == 'ATTACK':
            self.level += block_seconds / max(attack, 0.001)
            if self.level >= 1.0:
                self.level = 1.0
                self.state = 'DECAY'
        elif stage == 'DECAY':
            self.level -= (block_seconds / max(decay, 0.001)) * (1.0 - sustain)
            if self.level <= sustain:
                self.level = sustain
                self.state = 'SUSTAIN'
        elif stage == 'SUSTAIN':
            self.level = sustain
        elif stage == 'RELEASE':
            self.level -= block_seconds / max(rel, 0.001)
            if self.level <= 0.0:
                self.level = 0.0
                self.state = 'IDLE'
        return self.level
class StereoMasterFilter:
    """Biquad filter applied to interleaved stereo int16 audio.

    ``zi`` holds the filter state handed to ``lfilter`` and is carried over
    from one ``process`` call to the next.
    """

    def __init__(self):
        self.zi = np.zeros((2, 2))

    def process(self, sig_stereo_int16, cut, res):
        """Filter one interleaved block and return it as int16.

        ``cut`` is clamped to the range 50 .. 0.45 * SAMPLE_RATE and ``res``
        is mapped to a Q of ``0.7 + res * 9.0``. The filter type comes from
        ``params.filter_mode``.
        """
        normalized = sig_stereo_int16.astype(np.float32) / 32768.0
        ceiling = SAMPLE_RATE * 0.45
        fc = max(50.0, min(cut, ceiling))
        q = 0.7 + (res * 9.0)
        b, a = design_biquad(params.filter_mode, fc, q, SAMPLE_RATE)

        # Two columns, filtered along the row axis.
        frames = normalized.reshape(-1, 2)
        filtered, self.zi = lfilter(b, a, frames, axis=0, zi=self.zi)

        rescaled = filtered.flatten() * 32768.0
        return np.clip(rescaled, -32768, 32767).astype(np.int16)
class Oscillator:
    """Single oscillator: ``p`` is the phase in cycles, ``f`` the frequency."""

    def __init__(self):
        self.p = 0.0
        self.f = 0.0

    def set_f(self, f):
        """Set the oscillator frequency; 0 makes ``get`` return silence."""
        self.f = f

    def get(self, n, lfo):
        """Return ``n`` samples of the waveform named by ``params.waveform``.

        ``lfo`` is scaled by ``params.lfo_depth * 10.0`` and added to the
        frequency. Names other than 'sawtooth', 'square' and 'sine' produce
        the triangle shape.
        """
        if self.f == 0:
            return np.zeros(n)

        freq = self.f + (lfo * params.lfo_depth * 10.0)
        # Sample k (1-based) sits k phase increments after the stored phase.
        steps = np.arange(1, n + 1, dtype=float)
        phase = self.p + steps * (freq / SAMPLE_RATE)
        self.p = phase[-1] % 1.0
        phase %= 1.0

        shape = params.waveform
        if shape == 'sawtooth':
            return 2.0 * (phase - 0.5)
        if shape == 'square':
            return np.sign(np.sin(2 * np.pi * phase))
        if shape == 'sine':
            return np.sin(2 * np.pi * phase)
        return 2.0 * np.abs(2.0 * (phase - 0.5)) - 1.0
class LFO:
    """Sine LFO whose rate is read from ``params.lfo_rate`` on every call.

    ``p`` is the running phase in cycles, kept in [0, 1). Accumulating phase
    instead of a sample counter keeps the sine continuous when the rate is
    not a whole number of Hz and when the rate changes between calls; a
    counter that wraps once per second jumps in both situations.
    """

    def __init__(self):
        self.p = 0.0

    def get(self, n):
        """Return the next ``n`` LFO samples in the range -1 .. 1."""
        increment = params.lfo_rate / SAMPLE_RATE  # cycles per sample
        phases = self.p + np.arange(n) * increment
        self.p = (self.p + n * increment) % 1.0
        return np.sin(2 * np.pi * phases)
# Analog-engine singletons: one oscillator, one LFO, and two envelopes
# (amp reads the amp_* parameters, vcf reads the vcf_* parameters).
vco=Oscillator(); lfo=LFO(); amp=ADSREnvelope(True); vcf=ADSREnvelope(False)
# Filter applied to the mixed output by the audio callback.
master_filter = StereoMasterFilter()
# Note number currently driving the analog voice, or None when no note is held.
active_note=None
# --- Audio Callback ---
def sd_callback(outdata, frames, time_info, status):
    """Audio stream callback: render, mix, filter and write one block.

    The analog voice (ANALOG mode only) is summed with the FluidSynth output,
    passed through the master filter in ANALOG and DIGITAL1 modes, scaled by
    the master gain, stored in ``last_audio_data`` and written to ``outdata``.
    """
    global last_audio_data
    if not is_running:
        outdata.fill(0)
        return

    # Analog voice, duplicated to both channels of the interleaved buffer.
    if params.engine_mode == "ANALOG":
        lfo_level = np.mean(lfo.get(frames))
        voiced = vco.get(frames, lfo_level) * amp.get_val()
        analog = (np.repeat(voiced, 2) * 0.8 * 32767).astype(np.int16)
    else:
        analog = np.zeros(frames * 2, dtype=np.int16)

    # Sum in 32-bit to avoid wrap-around, then clamp back to the int16 range.
    sampled = fluid_engine.get_samples(frames)
    summed = analog.astype(np.int32) + sampled.astype(np.int32)
    mixed = np.clip(summed, -32768, 32767).astype(np.int16)

    use_filter = params.engine_mode in ("ANALOG", "DIGITAL1")

    # The filter envelope advances on every block, filtered or not.
    env_level = vcf.get_val()
    if use_filter:
        cutoff = params.cutoff_base + (params.vcf_env_amt * env_level)
        shaped = master_filter.process(mixed, cutoff, params.resonance)
    else:
        shaped = mixed

    shaped = (shaped * params.master_gain).astype(np.int16)

    # Keep a copy of the block, reallocating when the block length changes.
    if len(last_audio_data) != len(shaped):
        last_audio_data = np.zeros_like(shaped)
    last_audio_data[:] = shaped
    outdata[:] = shaped.reshape(-1, 2)
# --- Display Manager ---
class DisplayManager:
    """Draws the synth status pages and pushes them to the ST7789 display.

    Pages are rendered into a 320x240 RGB image and rotated by 90 degrees
    before being sent to the panel. ``update`` draws the header and the page
    selected by ``params.page_index``.
    """

    def __init__(self):
        spi = board.SPI()
        cs = digitalio.DigitalInOut(CS_PIN)
        dc = digitalio.DigitalInOut(DC_PIN)
        rst = digitalio.DigitalInOut(RST_PIN)
        self.disp = st7789.ST7789(
            spi, cs=cs, dc=dc, rst=rst,
            baudrate=BAUDRATE,
            width=240, height=320,
            x_offset=0, y_offset=0,
            rotation=0
        )
        self.width = 320
        self.height = 240
        self.image = Image.new("RGB", (self.width, self.height))
        self.draw = ImageDraw.Draw(self.image)
        try:
            self.font_s = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
            self.font_m = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 18)
            self.font_l = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 24)
        except (OSError, ImportError):
            # Font file missing or FreeType support unavailable: use the
            # built-in bitmap font for all three sizes.
            self.font_s = ImageFont.load_default()
            self.font_m = ImageFont.load_default()
            self.font_l = ImageFont.load_default()
        self.last_sys_update = 0
        self.cache_ip = ""
        self.cache_disk = ""
        self.cache_ram = ""

    def update(self):
        """Redraw the header plus the selected page and send it to the panel."""
        self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)
        gate_color = (255, 0, 0) if params.gate_status else (50, 0, 0)
        self.draw.ellipse((5, 5, 20, 20), fill=gate_color)
        page_names = ["MAIN", "LFO", "ADSR/FLT", "VISUAL", "LOG", "PLAYER", "SYSTEM", "EFFECT", "CHORD"]
        page = params.page_index
        title = f"{page_names[page]} (PG:{page+1}/9)"
        self.draw.text((30, 5), title, font=self.font_m, fill=(200, 200, 200))
        self.draw.line((0, 30, self.width, 30), fill=(100, 100, 100))
        renderers = (
            self._draw_page_main,
            self._draw_page_lfo,
            self._draw_page_adsr,
            self._draw_page_scope,
            self._draw_page_log,
            self._draw_page_player,
            self._draw_page_system,
            self._draw_page_effect,
            self._draw_page_chord,
        )
        if 0 <= page < len(renderers):
            renderers[page]()
        self.disp.image(self.image.rotate(90, expand=True))

    def _draw_page_main(self):
        """MAIN page: engine mode, waveform or program name, drum hit, volume."""
        mode = params.engine_mode
        if mode == "ANALOG":
            mode_color = (255, 165, 0)
            txt = f"WAVE: {params.waveform.upper()}"
        else:
            # All SoundFont modes show the program; only the colour differs.
            digital_colors = {"DIGITAL1": (0, 191, 255), "DIGITAL2": (100, 255, 100)}
            mode_color = digital_colors.get(mode, (255, 100, 255))
            p = params.sf_program
            name = GM_INSTRUMENTS[p] if 0 <= p < len(GM_INSTRUMENTS) else "?"
            txt = f"PROG {p}: {name}"
        self.draw.text((10, 40), f"MODE: {mode}", font=self.font_l, fill=mode_color)
        self.draw.text((10, 80), txt, font=self.font_m, fill=(255, 255, 255))
        if "DIGITAL" in mode:
            self.draw.text((10, 110), "Use K3 to Select Prog", font=self.font_s, fill=(100, 100, 255))
        if mode == "DIGITAL2":
            self.draw.text((180, 45), "(Filt OFF)", font=self.font_s, fill=(100, 255, 100))
        elif mode == "DIGITAL3":
            self.draw.text((180, 45), "(Fixed Vel)", font=self.font_s, fill=(255, 100, 255))
        if last_drum_name:
            self.draw.text((10, 135), f"Drum: {last_drum_name}", font=self.font_m, fill=(255, 100, 100))
        vol_pct = int(params.master_gain * 100)
        self.draw.text((10, 160), f"VOL: {vol_pct}%", font=self.font_m, fill=(0, 255, 0))
        bar_w = 180
        self.draw.rectangle((10, 185, 10+bar_w, 200), outline=(0, 255, 0), fill=None)
        self.draw.rectangle((10, 185, 10+int(bar_w*params.master_gain), 200), fill=(0, 255, 0))

    def _draw_page_lfo(self):
        """LFO page: rate, depth and an animated sine preview."""
        self.draw.text((10, 40), f"RATE: {params.lfo_rate:.1f} Hz", font=self.font_l, fill=(255, 255, 0))
        self.draw.text((10, 70), f"DEPTH: {int(params.lfo_depth*100)} %", font=self.font_l, fill=(0, 255, 255))
        self.draw.text((10, 100), "Knob K1: Rate", font=self.font_s, fill=(200, 200, 200))
        self.draw.text((10, 120), "Knob K2: Depth", font=self.font_s, fill=(200, 200, 200))
        cy = 180
        w = 280
        phase = time.time() * params.lfo_rate * 2.0 * math.pi
        # Named `amplitude` so the module-level `amp` envelope is not shadowed.
        amplitude = (params.lfo_depth * 25.0) + 2.0
        points = [
            (20 + i, cy + math.sin(phase + (i * 0.1)) * amplitude)
            for i in range(w)
        ]
        if len(points) > 1:
            self.draw.line(points, fill=(255, 0, 255), width=2)
        self.draw.rectangle((20, cy-30, 20+w, cy+30), outline=(50, 50, 50))

    def _draw_page_adsr(self):
        """ADSR/FLT page: filter envelope values, cutoff, resonance and a graph."""
        self.draw.text((10, 40), f"A:{params.vcf_attack:.2f} D:{params.vcf_decay:.2f}", font=self.font_m, fill=(255, 255, 255))
        self.draw.text((10, 65), f"S:{params.vcf_sustain:.2f} R:{params.vcf_release:.2f}", font=self.font_m, fill=(255, 255, 255))
        self.draw.text((10, 90), f"CUT: {int(params.cutoff_base)}", font=self.font_m, fill=(255, 255, 0))
        self.draw.text((160, 90), f"RES: {params.resonance:.1f}", font=self.font_m, fill=(255, 0, 255))
        self.draw.text((10, 115), "K1:A K2:D K3:S K4:R", font=self.font_s, fill=(150, 150, 150))
        self.draw.text((10, 130), "K5:Cut K7:Res", font=self.font_s, fill=(150, 150, 150))
        gx, gy, gw, gh = 20, 220, 280, 80
        # A fixed 0.5 s hold segment is drawn between decay and release.
        total_t = params.vcf_attack + params.vcf_decay + params.vcf_release + 0.5
        scale_x = gw / max(total_t, 1.0)
        x0, y0 = gx, gy
        xa = x0 + (params.vcf_attack * scale_x)
        ya = gy - gh
        xd = xa + (params.vcf_decay * scale_x)
        ys = gy - (params.vcf_sustain * gh)
        xr_start = xd + 0.5 * scale_x
        xr_end = xr_start + (params.vcf_release * scale_x)
        pts = [(x0, y0), (xa, ya), (xd, ys), (xr_start, ys), (xr_end, y0)]
        self.draw.line(pts, fill=(0, 255, 0), width=3)
        self.draw.line((gx, gy, gx+gw, gy), fill=(100, 100, 100))

    def _draw_page_scope(self):
        """VISUAL page: waveform trace on top, 64-bin spectrum below."""
        step = 4
        scale_y = 60.0
        mid_y = 80
        points = []
        data = last_audio_data[::2]  # every second sample of the stored block
        for x in range(0, self.width, step):
            idx = int((x / self.width) * len(data))
            if idx < len(data):
                val = data[idx] / 32768.0
                points.append((x, mid_y - (val * scale_y)))
        if len(points) > 1:
            self.draw.line(points, fill=(0, 255, 0), width=2)
        self.draw.text((5, 140), "Waveform (Up) / Spectrum (Down)", font=self.font_s, fill=(150, 150, 150))
        # FFT spectrum is best effort: a failure only skips the bars. Catching
        # Exception (not everything) lets Ctrl-C still reach the main loop.
        try:
            fft_data = np.abs(np.fft.rfft(data))
            display_len = min(len(fft_data), 64)
            bar_width = self.width / display_len
            bottom_y = 230
            max_height = 80.0
            for i in range(display_len):
                mag = 0 if i == 0 else fft_data[i]  # bin 0 is not drawn
                h = min(max_height, (mag / 400000.0) * max_height)
                x1 = i * bar_width
                y1 = bottom_y - h
                x2 = x1 + bar_width - 1
                y2 = bottom_y
                r = int(255 * (i / display_len))
                b = 255 - r
                self.draw.rectangle((x1, y1, x2, y2), fill=(r, 100, b))
        except Exception:
            pass

    def _draw_page_log(self):
        """LOG page: the eight most recent MIDI log lines."""
        y = 40
        for line in midi_logs[-8:]:
            self.draw.text((5, y), line, font=self.font_s, fill=(0, 255, 255))
            y += 20

    def _draw_page_player(self):
        """PLAYER page: playback state and a three-row file list."""
        self.draw.text((10, 40), "MIDI File Player", font=self.font_l, fill=(0, 255, 255))
        status_txt = "PLAYING >>" if midi_player.is_playing else "STOPPED ||"
        status_col = (0, 255, 0) if midi_player.is_playing else (255, 100, 100)
        self.draw.text((10, 70), status_txt, font=self.font_l, fill=status_col)
        self.draw.text((10, 110), "K1: Select File", font=self.font_s, fill=(200, 200, 200))
        self.draw.text((10, 130), "K2: Play(Right)/Stop(Left)", font=self.font_s, fill=(200, 200, 200))
        start_y = 160
        files = midi_player.files
        idx = midi_player.selected_idx
        # Keep the selection in the middle row, clamped at both list ends.
        window_start = max(0, min(idx - 1, len(files) - 3))
        for i in range(window_start, min(len(files), window_start + 3)):
            fname = files[i]
            y = start_y + (i - window_start) * 25
            prefix = "> " if i == idx else " "
            col = (255, 255, 0) if i == idx else (200, 200, 200)
            if len(fname) > 20:
                fname = fname[:18] + ".."
            self.draw.text((10, y), prefix + fname, font=self.font_m, fill=col)

    def _draw_page_system(self):
        """SYSTEM page: IP, CPU temperature, RAM and disk usage."""
        now = time.time()
        # IP, disk and RAM readings are refreshed at most every two seconds.
        if now - self.last_sys_update > 2.0:
            self.cache_ip = get_ip_address()
            self.cache_disk = get_disk_usage()
            self.cache_ram = get_ram_usage()
            self.last_sys_update = now
        self.draw.text((10, 40), "IP Address:", font=self.font_m, fill=(200, 200, 200))
        self.draw.text((10, 60), self.cache_ip, font=self.font_l, fill=(0, 255, 255))
        temp = get_cpu_temp()
        t_col = (0, 255, 0)
        if temp > 60:
            t_col = (255, 255, 0)
        if temp > 70:
            t_col = (255, 0, 0)
        self.draw.text((10, 90), f"CPU Temp: {temp:.1f} C", font=self.font_m, fill=t_col)
        self.draw.text((10, 120), "RAM Usage:", font=self.font_s, fill=(200, 200, 200))
        self.draw.text((10, 135), self.cache_ram, font=self.font_m, fill=(255, 255, 255))
        self.draw.text((10, 160), "Disk Usage:", font=self.font_s, fill=(200, 200, 200))
        self.draw.text((10, 175), self.cache_disk, font=self.font_m, fill=(255, 255, 255))

    def _draw_page_effect(self):
        """EFFECT page: reverb and chorus send levels (0-127) with bars."""
        self.draw.text((10, 40), "Global Effects", font=self.font_l, fill=(255, 0, 255))
        self.draw.text((10, 80), f"Reverb Send: {params.reverb_send}", font=self.font_m, fill=(255, 255, 0))
        bar_w = 200
        self.draw.rectangle((10, 105, 10+bar_w, 115), outline=(100, 100, 100))
        self.draw.rectangle((10, 105, 10+int(bar_w*(params.reverb_send/127.0)), 115), fill=(255, 255, 0))
        self.draw.text((10, 140), f"Chorus Send: {params.chorus_send}", font=self.font_m, fill=(0, 255, 255))
        self.draw.rectangle((10, 165, 10+bar_w, 175), outline=(100, 100, 100))
        self.draw.rectangle((10, 165, 10+int(bar_w*(params.chorus_send/127.0)), 175), fill=(0, 255, 255))
        self.draw.text((10, 200), "K1: Reverb K2: Chorus", font=self.font_s, fill=(200, 200, 200))

    def _draw_page_chord(self):
        """CHORD page: chord type, arpeggiator BPM and pattern."""
        self.draw.text((10, 40), "Chord & Arpeggiator", font=self.font_l, fill=(255, 100, 100))
        # Index 0 of each list is drawn in grey, any other selection in colour.
        idx = params.chord_type_idx
        c_name = CHORD_TYPES[idx]["name"]
        col = (100, 100, 100) if idx == 0 else (255, 255, 0)
        self.draw.text((10, 70), f"TYPE: {c_name}", font=self.font_m, fill=col)
        self.draw.text((10, 100), f"BPM: {int(params.arp_bpm)}", font=self.font_m, fill=(0, 255, 255))
        p_idx = params.arp_pattern_idx
        p_name = ARP_PATTERNS[p_idx]
        col_p = (100, 100, 100) if p_idx == 0 else (255, 0, 255)
        self.draw.text((10, 130), f"ARP: {p_name}", font=self.font_m, fill=col_p)
        self.draw.text((10, 180), "K1/Pad:Type K2:BPM", font=self.font_s, fill=(150, 150, 150))
        self.draw.text((10, 200), "K3:Arp Pattern", font=self.font_s, fill=(150, 150, 150))
# Placeholder; the __main__ section assigns the DisplayManager instance.
disp_mgr = None
# --- MIDI Handling ---
def handle_cc(n, v):
    """Apply controller ``n`` with value ``v`` (0-127) to the synth state.

    CC 6 selects the display page and CC 8 sets the master gain on every
    page. All other controllers are interpreted according to the currently
    selected page.
    """
    norm = v / 127.0

    # K6: page select across the nine pages.
    if n == 6:
        pg = min(int(norm * 9), 8)
        if params.page_index != pg:
            params.page_index = pg
        return

    if n == 8:
        params.master_gain = norm
        return

    page = params.page_index

    if page == 0:
        if n == 7:
            # Slots 0-3 pick an analog waveform, 4-6 the SoundFont modes.
            slot = min(6, int(norm * 7))
            digital_modes = {4: "DIGITAL1", 5: "DIGITAL2", 6: "DIGITAL3"}
            if slot in digital_modes:
                params.engine_mode = digital_modes[slot]
            else:
                params.engine_mode = "ANALOG"
                params.waveform = ["sawtooth", "square", "sine", "triangle"][slot]
        elif n == 3 and "DIGITAL" in params.engine_mode:
            if params.sf_program != v:
                params.sf_program = v
                fluid_engine.change_program(0, v)

    elif page == 1:
        if n == 1:
            params.lfo_rate = 0.1 + norm * 19.9
        elif n == 2:
            params.lfo_depth = norm

    elif page == 2:
        if n == 1:
            params.vcf_attack = 0.01 + norm * 2
        elif n == 2:
            params.vcf_decay = 0.01 + norm * 2
        elif n == 3:
            params.vcf_sustain = norm
        elif n == 4:
            params.vcf_release = 0.01 + norm * 3
        elif n == 5:
            params.cutoff_base = 50.0 + norm * 5000.0
        elif n == 7:
            params.resonance = norm * 0.95

    elif page == 5:
        if n == 1:
            midi_player.select_file(norm)
        elif n == 2:
            if v > 64:
                midi_player.play()
            else:
                midi_player.stop()

    elif page == 7:  # EFFECT
        if n == 1:
            params.reverb_send = v
            fluid_engine.send_cc(0, 91, v)
        elif n == 2:
            params.chorus_send = v
            fluid_engine.send_cc(0, 93, v)
        elif n == 3:
            params.master_gain = norm

    elif page == 8:  # CHORD & ARP
        if n == 1:
            params.chord_type_idx = min(int(norm * len(CHORD_TYPES)), len(CHORD_TYPES) - 1)
        elif n == 2:
            params.arp_bpm = 60.0 + (norm * 240.0)  # 60-300
        elif n == 3:
            params.arp_pattern_idx = min(int(norm * len(ARP_PATTERNS)), len(ARP_PATTERNS) - 1)
def get_note_name(note_num):
    """Return the note name with octave for a MIDI note number (60 -> "C4")."""
    pitch_classes = ('C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B')
    group, pitch_class = divmod(note_num, 12)
    # Octave numbering is offset by one, so note 0 is "C-1".
    return f"{pitch_classes[pitch_class]}{group - 1}"
def add_log(msg):
    """Append ``msg`` to ``midi_logs``, keeping at most the 20 newest entries."""
    midi_logs.append(msg)
    if len(midi_logs) > 20:
        del midi_logs[0]
# Dispatch one MIDI message, given as its status byte and two data bytes, to
# the sound engines, the chord/arpeggiator logic, the CC handler and the log.
# The upper status nibble selects the handler (0x90 note-on, 0x80 note-off,
# 0xB0 control change, 0xC0 program change); the lower nibble is the channel.
# NOTE(review): the indentation of this function was lost in this copy of the
# file, so the comments below describe statements, not their exact nesting.
def process_midi_event(status, data1, data2):
global active_note, last_drum_name
# st: message type (upper nibble); ch: MIDI channel 0-15 (lower nibble).
st = status & 0xF0
ch = status & 0x0F
log_str = ""
# Note On
# A note-on with velocity 0 is not handled here; it falls through to the
# note-off branch further down.
if st == 0x90 and data2 > 0:
n, v = data1, data2
# Pad Chord Select (Only in Chord Page)
# Notes 36-43 and 44-51 both map to chord indices 0-7 and return without
# sounding a note.
# NOTE(review): assumes CHORD_TYPES has at least 8 entries - confirm.
if params.page_index == 8:
if 36 <= n <= 43:
idx = n - 36
params.chord_type_idx = idx
log_str = f"Chord Type: {CHORD_TYPES[idx]['name']}"
add_log(log_str)
return
elif 44 <= n <= 51:
idx = n - 44
params.chord_type_idx = idx
log_str = f"Chord Type: {CHORD_TYPES[idx]['name']}"
add_log(log_str)
return
# Drum Check
# Channel 9, or notes 36-51 on channel 0, are sent to FluidSynth channel 9
# with the velocity raised to at least 64.
is_drum = (ch == 9) or (36 <= n <= 51 and ch == 0)
if is_drum:
target_ch = 9
v_drum = max(v, 64)
fluid_engine.note_on(target_ch, n, v_drum)
d_name = DRUM_MAP.get(n, "Drum")
last_drum_name = d_name
log_str = f"Ch{ch} Pad: {d_name}"
add_log(log_str)
return
# Normal Note
# Velocity shaping: DIGITAL3 on channel 0 uses a fixed 102; otherwise the
# incoming velocity is boosted by 40 and capped at 127.
if ch == 0 and params.engine_mode == "DIGITAL3": v = 102
else: v = min(127, v + 40)
log_str = f"Ch{ch} NoteOn: {n}"
# Chord Mode Processing (Only for Ch0)
if ch == 0:
# 1. Generate Notes
# chord_type_idx 0 plays the single note; any other index expands the note
# by that chord's offsets, dropping results outside 0-127.
notes_to_play = []
if params.chord_type_idx > 0:
offsets = CHORD_TYPES[params.chord_type_idx]["offsets"]
for off in offsets:
chord_n = n + off
if 0 <= chord_n <= 127: notes_to_play.append((chord_n, v))
else:
notes_to_play.append((n, v))
# 2. Check ARP status
# With a pattern selected the notes are handed to the arpeggiator instead
# of being played directly.
is_arp = (params.arp_pattern_idx > 0)
if is_arp:
arp.set_notes(notes_to_play)
# For Analog Scope visual only
if params.engine_mode == "ANALOG": active_note=n; params.gate_status=True
else: params.gate_status=True
else:
# Normal Chord Play
for (cn, cv) in notes_to_play:
fluid_engine.note_on(ch, cn, cv)
# ANALOG mode also sets the oscillator to the played note's frequency
# (A4 = note 69 = 440 Hz) and triggers both envelopes; other modes only
# trigger the filter envelope.
if params.engine_mode == "ANALOG":
vco.set_f(440.0*(2**((n-69)/12.0))); amp.trigger(); vcf.trigger(); active_note=n; params.gate_status=True
else:
vcf.trigger(); params.gate_status=True
# Note Off
elif (st == 0x80) or (st == 0x90 and data2 == 0):
n = data1
# Pad releases on the chord page are ignored, mirroring the note-on path.
if params.page_index == 8 and (36 <= n <= 51): return
is_drum = (ch == 9) or (36 <= n <= 51 and ch == 0)
if is_drum:
fluid_engine.note_off(9, n)
return
log_str = f"Ch{ch} NoteOff: {n}"
if ch == 0:
is_arp = (params.arp_pattern_idx > 0)
if is_arp:
# Stop Arp
# Any channel-0 note-off clears the arpeggiator's whole note list.
arp.set_notes([])
if params.engine_mode == "ANALOG":
if n == active_note: amp.release(); vcf.release(); active_note=None; params.gate_status=False
else:
vcf.release(); params.gate_status=False
else:
# Normal Note Off
# NOTE(review): the released notes are recomputed from the chord type
# selected now; if it changed since note-on, other notes may be released
# than were started - confirm.
if params.chord_type_idx > 0:
offsets = CHORD_TYPES[params.chord_type_idx]["offsets"]
for off in offsets:
chord_n = n + off
if 0 <= chord_n <= 127: fluid_engine.note_off(ch, chord_n)
else:
fluid_engine.note_off(ch, n)
if params.engine_mode == "ANALOG":
if n == active_note: amp.release(); vcf.release(); active_note=None; params.gate_status=False
else:
vcf.release(); params.gate_status=False
# Control change: channel 0 goes to the page-dependent handler, every other
# channel is forwarded to FluidSynth unchanged.
elif st == 0xB0:
if ch == 0:
handle_cc(data1, data2)
log_str = f"CC #{data1} Val:{data2}"
else:
fluid_engine.send_cc(ch, data1, data2)
# Program change: applied when a DIGITAL engine is active or the message is
# not on channel 0.
elif st == 0xC0:
if "DIGITAL" in params.engine_mode or ch != 0:
params.sf_program = data1
fluid_engine.change_program(ch, data1)
log_str = f"Ch{ch} PC: {data1}"
# Only channel-0 messages that produced a log string reach the log here.
if log_str and ch == 0: add_log(log_str)
def midi_loop(midi_in):
    """Poll ``midi_in`` and forward messages until ``is_running`` is falsy.

    Messages shorter than two bytes are dropped; for two-byte messages the
    second data byte is passed as 0.
    """
    while is_running:
        packet = midi_in.get_message()
        if packet:
            data, _delta = packet
            if len(data) >= 2:
                second = data[2] if len(data) > 2 else 0
                process_midi_event(data[0], data[1], second)
        # Pause for one millisecond between polls.
        time.sleep(0.001)
# --- Main ---
# Script entry point: create the display, pick an audio device, start
# FluidSynth, the MIDI reader thread and the audio stream, then refresh the
# display until Ctrl-C is pressed.
if __name__ == "__main__":
print("Initializing Synth (v60: Arpeggiator)...")
disp_mgr = DisplayManager()
print("Searching for audio device with keyword: 'USB'...")
# Take the first output-capable device whose name contains the keyword.
devs = sd.query_devices()
usb_idx = None
for i, d in enumerate(devs):
if TARGET_DEVICE_KEYWORD in d['name'] and d['max_output_channels'] > 0:
usb_idx = i
print(f"Found Target Device: {d['name']} (Index: {i})")
break
# usb_idx stays None when nothing matched and is passed on as the device.
if usb_idx is None:
print("Warning: USB Device not found. Using system default.")
fluid_engine.init_synth()
# Open the first MIDI input whose name does not contain "Midi Through";
# when there is none, create a virtual port named "PySynth".
mi=rtmidi.MidiIn()
ports = mi.get_ports(); found_idx = -1
for i, p_name in enumerate(ports):
if "Midi Through" not in p_name: found_idx = i; break
if found_idx != -1: mi.open_port(found_idx); print(f"MIDI Connected: {ports[found_idx]}")
else: mi.open_virtual_port("PySynth"); print("MIDI: Virtual Port")
threading.Thread(target=midi_loop,args=(mi,),daemon=True).start()
print("Starting Audio Stream...")
stream = sd.OutputStream(
device=usb_idx,
channels=CHANNELS,
samplerate=SAMPLE_RATE,
dtype=DTYPE,
blocksize=BLOCK_SIZE,
latency='high',
callback=sd_callback
)
stream.start()
print("System Running.")
# Display loop: one update per 50 ms at most (about 20 frames per second).
try:
while True:
start = time.time()
disp_mgr.update()
elapsed = time.time() - start
if elapsed < 0.05:
time.sleep(0.05 - elapsed)
except KeyboardInterrupt:
print("\nStopping...")
# Clearing is_running ends the polling loops that test it.
is_running = False
midi_player.stop()
stream.stop()
mi.close_port()
print("Bye!")

デスクトップ版 ポリフォニックアナログシンセサイザー(Python)
# code_name: raspi4_polysynth_v9_effects
# Version: 9.0.0
# Description: Multicore + MIDI Player + FX (Chorus/Delay)
# Hardware: Raspberry Pi 4, USB DAC, AKAI MPK Mini (CC 1-8)
import tkinter as tk
from tkinter import ttk
from tkinter import filedialog
import numpy as np
import sounddevice as sd
import mido
import threading
import time
import sys
import multiprocessing
import queue
import os
# --- Visualization Imports ---
import matplotlib
matplotlib.use("TkAgg")
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
# --- System Config ---
SAMPLE_RATE = 48000  # audio sample rate passed to the output stream
BLOCK_SIZE = 1024    # frames per audio callback block
POLYPHONY = 6        # number of SynthVoice objects created by PolySynth
CHANNELS = 2         # output channels of the audio stream
VIZ_FPS = 30         # NOTE(review): presumably the plot refresh rate; its use is outside this section
# --- Shared Default Parameters ---
# Copied by both the audio process and the GUI as their starting parameter set.
DEFAULT_PARAMS = {
'waveform': 'square',      # 'square', 'saw', 'tri' or 'sine'
'pulse_width': 0.5,        # square-wave duty cycle
'cutoff': 1.0,             # scales the voice tone gain
'resonance': 0.0,          # values above 0.1 add a cubic term to the oscillator
'amp_attack': 0.05,        # seconds
'amp_release': 0.5,        # seconds
'lfo_rate': 5.0,           # Hz
'lfo_amount': 0.0,         # depth of pulse-width and cutoff modulation
'volume': 0.8,             # master volume applied before the limiter
'velocity_sense': 1.0,     # below 0.5 every note is played at velocity 127
'chorus_mix': 0.0, # 0.0 - 1.0
'delay_mix': 0.0 # 0.0 - 1.0
}
# --- MIDI CC Mapping (changed for FX) ---
# Maps MIDI controller numbers 1-8 to parameter names in DEFAULT_PARAMS.
CC_MAP = {
1: 'cutoff', 2: 'resonance', 3: 'amp_attack', 4: 'amp_release',
5: 'pulse_width',
6: 'chorus_mix', # Old: lfo_rate -> New: Chorus
7: 'delay_mix', # Old: lfo_amount -> New: Delay
8: 'volume'
}
# ==========================================
# DSP Logic: Synthesis
# ==========================================
class ADSR:
    """Attack/release amplitude envelope evaluated sample by sample."""

    IDLE, ATTACK, RELEASE = 0, 1, 4

    def __init__(self):
        self.state = self.IDLE
        self.level = 0.0

    def trigger(self):
        """Start (or restart) the attack stage from the current level."""
        self.state = self.ATTACK

    def release(self):
        """Start the release stage."""
        self.state = self.RELEASE

    def process_block(self, size, attack_t, release_t):
        """Return ``size`` envelope samples and advance the envelope state.

        ``attack_t`` and ``release_t`` are durations in seconds; each is
        converted to a per-sample step using at least one sample. During
        attack the level is held at 1.0 once reached; release ends in IDLE
        at 0.0.
        """
        rise = 1.0 / max(1, int(attack_t * SAMPLE_RATE))
        fall = 1.0 / max(1, int(release_t * SAMPLE_RATE))

        env = np.zeros(size)
        for idx in range(size):
            if self.state == self.ATTACK:
                self.level = min(self.level + rise, 1.0)
            elif self.state == self.RELEASE:
                self.level -= fall
                if self.level <= 0.0:
                    self.level = 0.0
                    self.state = self.IDLE
            env[idx] = self.level
        return env
class SynthVoice:
    """One synthesizer voice: an oscillator shaped by an ADSR envelope."""

    def __init__(self):
        self.active = False
        self.note = 0
        self.phase = 0.0
        self.freq = 440.0
        self.velocity = 0
        self.amp_env = ADSR()

    def trigger(self, note, velocity):
        """Start playing MIDI ``note`` with ``velocity`` in 0-127."""
        self.active = True
        self.note = note
        self.velocity = velocity / 127.0
        # Equal temperament with note 69 at 440 Hz.
        self.freq = 440.0 * (2.0 ** ((note - 69) / 12.0))
        self.phase = 0.0
        self.amp_env.trigger()

    def release(self):
        """Move the envelope into its release stage."""
        self.amp_env.release()

    def get_audio_block(self, size, p, lfo_val):
        """Render ``size`` samples using parameter dict ``p`` and ``lfo_val``.

        An inactive voice returns zeros. The voice marks itself inactive
        once its envelope is IDLE after rendering the block.
        """
        if not self.active:
            return np.zeros(size)

        step = self.freq / SAMPLE_RATE
        wrapped = (self.phase + np.arange(size) * step) % 1.0
        self.phase = (self.phase + size * step) % 1.0

        shape = p['waveform']
        if shape == 'square':
            # The LFO modulates the duty cycle within 5 % .. 95 %.
            duty = np.clip(p['pulse_width'] + (lfo_val * p['lfo_amount'] * 0.2), 0.05, 0.95)
            osc = np.where(wrapped < duty, 1.0, -1.0)
        elif shape == 'tri':
            osc = 2.0 * np.abs(2.0 * wrapped - 1.0) - 1.0
        elif shape == 'sine':
            osc = np.sin(2.0 * np.pi * wrapped)
        else:
            # 'saw' and any unrecognised name use the ramp.
            osc = 2.0 * wrapped - 1.0

        cutoff_mod = np.clip(p['cutoff'] + (lfo_val * p['lfo_amount'] * 0.5), 0.1, 1.0)
        tone_gain = 0.6 + (0.4 * cutoff_mod)  # Gain Boosted

        if p['resonance'] > 0.1:
            osc += (osc * osc * osc * p['resonance'] * 2.0)
            osc = np.clip(osc, -1.5, 1.5)

        envelope = self.amp_env.process_block(size, p['amp_attack'], p['amp_release'])
        if self.amp_env.state == ADSR.IDLE:
            self.active = False
        return osc * envelope * self.velocity * tone_gain
class PolySynth:
    """Fixed pool of ``POLYPHONY`` voices sharing one sine LFO."""

    def __init__(self):
        self.voices = [SynthVoice() for _ in range(POLYPHONY)]
        self.lfo_phase = 0.0

    def note_on(self, note, vel):
        """Trigger the first inactive voice; do nothing when all are busy."""
        free_voice = next((voice for voice in self.voices if not voice.active), None)
        if free_voice is not None:
            free_voice.trigger(note, vel)

    def note_off(self, note):
        """Release every active voice that is playing ``note``."""
        for voice in self.voices:
            if voice.active and voice.note == note:
                voice.release()

    def get_block(self, frames, current_params):
        """Return the mono sum of all active voices for ``frames`` samples."""
        lfo_step = current_params['lfo_rate'] / SAMPLE_RATE
        lfo_out = np.sin(2.0 * np.pi * (self.lfo_phase + np.arange(frames) * lfo_step))
        self.lfo_phase = (self.lfo_phase + frames * lfo_step) % 1.0

        mix = np.zeros(frames)
        for voice in self.voices:
            if voice.active:
                mix += voice.get_audio_block(frames, current_params, lfo_out)
        # Note: Master Volume and Limiter moved to EffectProcessor for consistency
        return mix
# ==========================================
# DSP Logic: Effects (New)
# ==========================================
class EffectProcessor:
    """Mono chorus and feedback delay followed by volume, limiter and stereo out."""

    def __init__(self):
        # Chorus state: ring buffer, write position and LFO phase in cycles.
        self.chorus_buffer_size = 2048
        self.chorus_buffer = np.zeros(self.chorus_buffer_size)
        self.chorus_ptr = 0
        self.chorus_lfo_phase = 0.0
        # Delay state: a 0.4 second ring buffer.
        self.delay_len = int(0.4 * SAMPLE_RATE)
        self.delay_buffer = np.zeros(self.delay_len)
        self.delay_ptr = 0

    def process(self, input_signal, params):
        """Apply the effects to a mono block and return an (n, 2) float32 array.

        ``params`` supplies 'chorus_mix', 'delay_mix' and 'volume'. An effect
        is skipped entirely, including its buffer update, while its mix is
        at or below 0.01.
        """
        output = input_signal.copy()

        # 1. Chorus: a copy of the input read back with a modulated delay.
        chorus_amount = params['chorus_mix']
        if chorus_amount > 0.01:
            count = len(input_signal)
            write_pos = np.arange(count) + self.chorus_ptr
            np.put(self.chorus_buffer, write_pos % self.chorus_buffer_size, input_signal)

            rate_hz = 1.5        # fixed chorus LFO rate
            depth_samples = 150  # modulation depth in samples
            base_delay = 300     # centre delay in samples
            lfo_step = rate_hz / SAMPLE_RATE
            lfo_phases = self.chorus_lfo_phase + np.arange(count) * lfo_step
            wobble = np.sin(2.0 * np.pi * lfo_phases) * depth_samples
            self.chorus_lfo_phase = (self.chorus_lfo_phase + count * lfo_step) % 1.0

            # Read positions are truncated to integers (no interpolation).
            read_pos = (write_pos - base_delay - wobble).astype(int)
            wet = self.chorus_buffer[read_pos % self.chorus_buffer_size]

            output = output * (1.0 - 0.5 * chorus_amount) + wet * chorus_amount
            self.chorus_ptr = (self.chorus_ptr + count) % self.chorus_buffer_size

        # 2. Delay: read and write the same slots, one buffer length apart in time.
        delay_amount = params['delay_mix']
        if delay_amount > 0.01:
            count = len(output)
            slots = (np.arange(count) + self.delay_ptr) % self.delay_len
            echo = self.delay_buffer[slots]
            output = output + echo * delay_amount

            feedback = 0.5
            # The dry input, not the chorused signal, is fed into the delay line.
            np.put(self.delay_buffer, slots, input_signal + (echo * feedback))
            self.delay_ptr = (self.delay_ptr + count) % self.delay_len

        # 3. Master volume and hard limiter.
        output *= params['volume']
        np.clip(output, -0.95, 0.95, out=output)

        # Both channels carry the same signal.
        return np.column_stack((output, output)).astype(np.float32)
# ==========================================
# AUDIO PROCESS
# ==========================================
# Body of the audio process: builds a PolySynth and an EffectProcessor, starts
# a MIDI listener thread, and opens an output stream whose callback drains
# control_queue, renders one block, applies the effects and offers the left
# channel to viz_queue. Parameter changes that arrive over MIDI are reported
# on feedback_queue. device_id is passed to sd.OutputStream as the device.
# NOTE(review): the indentation of this function was lost in this copy of the
# file, so the comments below describe statements, not their exact nesting.
def audio_engine_process(control_queue, viz_queue, feedback_queue, device_id):
print(f"[Audio Process] PID: {multiprocessing.current_process().pid} Started.")
synth = PolySynth()
fx = EffectProcessor() # ★ FX Engine
# Local parameter set, updated from both MIDI CCs and 'set_param' commands.
current_params = DEFAULT_PARAMS.copy()
# Starts a note; a velocity_sense below 0.5 forces velocity 127.
def apply_note_on(note, velocity):
vel = velocity
if current_params['velocity_sense'] < 0.5:
vel = 127
synth.note_on(note, vel)
# Thread body: opens one MIDI input and handles its messages as they arrive.
def midi_listener():
ports = mido.get_input_names()
target = None
# NOTE(review): there is no break, so the last matching port is used, and
# "Midi Through" counts as a match - confirm that this is intended.
for p in ports:
if any(x in p for x in ["MPK", "Midi Through", "AKAI", "USB Audio"]): target = p
if not target and ports: target = ports[0]
if target:
print(f"[Audio Process] MIDI Connected: {target}")
with mido.open_input(target) as inport:
for msg in inport:
if msg.type == 'note_on' and msg.velocity > 0:
apply_note_on(msg.note, msg.velocity)
elif msg.type == 'note_off' or (msg.type == 'note_on' and msg.velocity==0):
synth.note_off(msg.note)
elif msg.type == 'control_change':
cc = msg.control
val = msg.value / 127.0
if cc in CC_MAP:
key = CC_MAP[cc]
# FX params usually linear 0-1
if key in ['chorus_mix', 'delay_mix', 'volume', 'resonance', 'lfo_amount']:
new_val = val
# Others need scaling
elif key == 'cutoff': new_val = 0.1 + (val * 0.9)
elif key == 'lfo_rate': new_val = 0.1 + val * 20.0
elif key == 'pulse_width': new_val = 0.05 + (val * 0.9)
elif key == 'amp_attack': new_val = 0.01 + val * 2.0
elif key == 'amp_release': new_val = 0.1 + val * 5.0
else: new_val = val
current_params[key] = new_val
# Report the change on feedback_queue as ('midi_update', key, value).
feedback_queue.put(('midi_update', key, new_val))
t_midi = threading.Thread(target=midi_listener, daemon=True)
t_midi.start()
# Audio stream callback.
def callback(outdata, frames, time_info, status):
# Drain every pending control message; queue.Empty ends the loop.
try:
while True:
msg = control_queue.get_nowait()
cmd = msg[0]
if cmd == 'set_param': current_params[msg[1]] = msg[2]
elif cmd == 'note_on': apply_note_on(msg[1], msg[2])
elif cmd == 'note_off': synth.note_off(msg[1])
elif cmd == 'stop': raise sd.CallbackAbort
except queue.Empty: pass
# 1. Synthesize (Mono)
raw_sig = synth.get_block(frames, current_params)
# 2. Apply Effects & Stereo Out
final_sig = fx.process(raw_sig, current_params)
# Zero-fill the remainder when fewer frames were produced than requested.
if len(final_sig) < len(outdata):
outdata[:len(final_sig)] = final_sig
outdata[len(final_sig):] = 0
else:
outdata[:] = final_sig
try:
# Visualize L channel
viz_queue.put_nowait(final_sig[:, 0].copy())
except queue.Full: pass
# Keep the stream open; the loop sleeps in one-second steps until interrupted.
try:
with sd.OutputStream(device=device_id, channels=CHANNELS,
callback=callback, samplerate=SAMPLE_RATE,
blocksize=BLOCK_SIZE):
while True: time.sleep(1)
except KeyboardInterrupt: pass
except Exception as e: print(f"[Audio] Error: {e}")
# ==========================================
# GUI PROCESS
# ==========================================
class SynthGUI:
# Store the three queues, copy the default parameters, configure the window,
# then build the widgets and plots and start the two periodic methods.
def __init__(self, root, control_queue, viz_queue, feedback_queue):
self.root = root
self.control_queue = control_queue
self.viz_queue = viz_queue
self.feedback_queue = feedback_queue
self.params = DEFAULT_PARAMS.copy()
# NOTE(review): presumably blocks slider callbacks while the GUI applies
# incoming MIDI updates; its use is outside this section - confirm.
self.suppress_send = False
self.midi_thread = None
self.midi_playing = False
self.midi_file_path = ""
self.root.title("Raspi4 PolySynth v9.0 - FX Edition")
self.root.geometry("950x580") # leave a little extra height
self.root.configure(bg="#2b2b2b")
# Closing the window goes through on_close.
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
self.sliders = {}
self.view_mode = "SCOPE"
self.setup_ui()
self.setup_plots()
self.update_plot()
self.check_feedback_loop()
# Build the window layout: a control column on the left, a frame stored as
# self.right_frame on the right, and VIEW / EXIT buttons at the bottom of the
# control column.
def setup_ui(self):
# Dark colours for the ttk widgets, on top of the 'clam' theme.
style = ttk.Style()
style.theme_use('clam')
style.configure("TLabelframe", background="#2b2b2b", foreground="white")
style.configure("TLabelframe.Label", background="#2b2b2b", foreground="#00ff00")
style.configure("TCheckbutton", background="#2b2b2b", foreground="white")
main_container = tk.Frame(self.root, bg="#2b2b2b")
main_container.pack(fill="both", expand=True)
left_frame = tk.Frame(main_container, bg="#2b2b2b", width=380)
left_frame.pack(side="left", fill="y", padx=5, pady=5)
self.right_frame = tk.Frame(main_container, bg="#101010")
self.right_frame.pack(side="right", fill="both", expand=True, padx=5, pady=5)
# Synth controls first, then the player controls, both in the left column.
self.create_controls(left_frame)
self.create_player_controls(left_frame)
btn_frame = tk.Frame(left_frame, bg="#2b2b2b")
btn_frame.pack(side="bottom", fill="x", pady=5)
# VIEW calls toggle_view; EXIT calls on_close.
self.view_btn = tk.Button(btn_frame, text="VIEW", command=self.toggle_view,
bg="#0066cc", fg="white", font=("Arial", 9, "bold"), height=1)
self.view_btn.pack(side="left", fill="x", expand=True, padx=2)
exit_btn = tk.Button(btn_frame, text="EXIT", command=self.on_close,
bg="#cc0000", fg="white", font=("Arial", 9, "bold"), height=1)
exit_btn.pack(side="right", fill="x", expand=True, padx=2)
def create_controls(self, parent):
groups = [
("OSCILLATOR [K5]", [("Pulse Width", "pulse_width", 0.05, 0.95)]),
("FILTER / TONE [K1, K2]", [("Cutoff", "cutoff", 0.1, 1.0), ("Resonance", "resonance", 0, 1.0)]),
("MODULATION [Slider]", [("Rate", "lfo_rate", 0.1, 20.0), ("Amount", "lfo_amount", 0.0, 1.0)]),
("ENVELOPE [K3, K4]", [("Attack", "amp_attack", 0.01, 2.0), ("Release", "amp_release", 0.1, 5.0)]),
# ★ New Effects Section
("EFFECTS [K6, K7]", [("Chorus Mix", "chorus_mix", 0.0, 1.0), ("Delay Mix", "delay_mix", 0.0, 1.0)]),
("MASTER [K8]", [("Volume", "volume", 0.0, 1.0)])
]
osc_frame = ttk.LabelFrame(parent, text="OSCILLATOR [K5]")
osc_frame.pack(fill="x", pady=2)
wave_sub = tk.Frame(osc_frame, bg="#2b2b2b")
wave_sub.pack(side="top", fill="x", pady=2)
self.wave_var = tk.StringVar(value=self.params['waveform'])
for label, val in [("Sqr", "square"), ("Saw", "saw"), ("Tri", "tri"), ("Sin", "sine")]:
ttk.Radiobutton(wave_sub, text=label, value=val, variable=self.wave_var,
command=self.on_wave_change).pack(side="left", padx=5)
for g_name, ctrls in groups:
if g_name.startswith("OSC"): frame = osc_frame
elif g_name.startswith("MASTER"):
frame = ttk.LabelFrame(parent, text=g_name)
frame.pack(fill="x", pady=2)
self.vel_sense_var = tk.IntVar(value=int(self.params['velocity_sense']))
cb = ttk.Checkbutton(frame, text="Velocity Sense", variable=self.vel_sense_var,
command=self.on_vel_sense_change)
cb.pack(anchor="w", padx=5, pady=0)
else:
frame = ttk.LabelFrame(parent, text=g_name)
frame.pack(fill="x", pady=2)
for label, key, min_v, max_v in ctrls:
f = tk.Frame(frame, bg="#2b2b2b")
f.pack(fill="x", padx=5, pady=0)
tk.Label(f, text=label, width=12, anchor="w", bg="#2b2b2b", fg="white", font=("Arial", 8)).pack(side="left")
s = tk.Scale(f, from_=min_v, to=max_v, orient="horizontal", resolution=0.01,
bg="#404040", fg="white", highlightthickness=0, width=10,
command=lambda v, k=key: self.on_slider_change(k, v))
s.set(self.params[key])
s.pack(side="right", fill="x", expand=True)
self.sliders[key] = s
def create_player_controls(self, parent):
player_frame = ttk.LabelFrame(parent, text="MIDI PLAYER")
player_frame.pack(fill="x", pady=5)
self.lbl_file = tk.Label(player_frame, text="No file loaded", bg="#2b2b2b", fg="#aaaaaa", anchor="w", font=("Arial", 8))
self.lbl_file.pack(fill="x", padx=5, pady=0)
btn_box = tk.Frame(player_frame, bg="#2b2b2b")
btn_box.pack(fill="x", pady=2)
tk.Button(btn_box, text="LOAD", command=self.load_midi_file,
bg="#444444", fg="white", width=8, font=("Arial", 8)).pack(side="left", padx=2)
tk.Button(btn_box, text="PLAY", command=self.play_midi,
bg="#008800", fg="white", width=6, font=("Arial", 8)).pack(side="left", padx=2)
tk.Button(btn_box, text="STOP", command=self.stop_midi,
bg="#880000", fg="white", width=6, font=("Arial", 8)).pack(side="left", padx=2)
def load_midi_file(self):
path = filedialog.askopenfilename(filetypes=[("MIDI Files", "*.mid"), ("All Files", "*.*")])
if path:
self.midi_file_path = path
self.lbl_file.config(text=os.path.basename(path))
def play_midi(self):
if not self.midi_file_path: return
if self.midi_playing: return
self.midi_playing = True
self.midi_thread = threading.Thread(target=self._midi_worker, daemon=True)
self.midi_thread.start()
def stop_midi(self):
self.midi_playing = False
def _midi_worker(self):
try:
mid = mido.MidiFile(self.midi_file_path)
for msg in mid.play():
if not self.midi_playing: break
if msg.type == 'note_on' and msg.velocity > 0:
self.control_queue.put(('note_on', msg.note, msg.velocity))
elif msg.type == 'note_off' or (msg.type == 'note_on' and msg.velocity == 0):
self.control_queue.put(('note_off', msg.note))
except Exception: pass
self.midi_playing = False
def setup_plots(self):
self.fig = Figure(figsize=(5, 3.2), dpi=100, facecolor="#101010")
self.ax1 = self.fig.add_subplot(211)
self.ax2 = self.fig.add_subplot(212)
self.canvas = FigureCanvasTkAgg(self.fig, master=self.right_frame)
self.canvas.get_tk_widget().pack(fill="both", expand=True)
self.line1, = self.ax1.plot([], [], color="#00ff00", lw=1)
self.line2, = self.ax2.plot([], [], color="#00ffff", lw=1)
self.x_wave = np.arange(0, BLOCK_SIZE, 4)
self.x_freq = np.fft.rfftfreq(BLOCK_SIZE, 1/SAMPLE_RATE)
self.setup_scope_view()
def setup_scope_view(self):
self.ax1.clear(); self.ax2.clear()
self.ax1.set_facecolor("#000000"); self.ax2.set_facecolor("#000000")
self.ax1.set_title("Waveform", color="white", fontsize=8)
self.ax2.set_title("Spectrum", color="white", fontsize=8)
self.ax1.tick_params(labelsize=6)
self.ax2.tick_params(labelsize=6)
self.ax1.set_ylim(-1.1, 1.1); self.ax1.set_xlim(0, BLOCK_SIZE)
self.ax2.set_ylim(0, 0.5); self.ax2.set_xlim(0, 5000)
self.ax1.grid(True, color="#333333"); self.ax2.grid(True, color="#333333")
self.line1, = self.ax1.plot(self.x_wave, np.zeros(len(self.x_wave)), color="#00ff00", lw=1)
self.line2, = self.ax2.plot(self.x_freq, np.zeros(len(self.x_freq)), color="#00ffff", lw=1)
self.fig.tight_layout()
self.canvas.draw()
def setup_param_view(self):
self.ax1.clear(); self.ax2.clear()
self.ax1.set_facecolor("#000000"); self.ax2.set_facecolor("#000000")
self.ax1.set_title("ADSR Setting", color="white", fontsize=8)
self.ax2.set_title("VCF Response", color="white", fontsize=8)
self.ax1.tick_params(labelsize=6)
self.ax2.tick_params(labelsize=6)
self.ax1.set_ylim(0, 1.1); self.ax1.set_xlim(0, 3.0)
self.ax2.set_ylim(0, 1.2); self.ax2.set_xlim(20, 10000)
self.ax2.set_xscale('log')
self.ax1.grid(True, color="#333333"); self.ax2.grid(True, color="#333333")
self.line1, = self.ax1.plot([], [], color="#ffcc00", lw=2)
self.line2, = self.ax2.plot([], [], color="#ff00ff", lw=2)
self.fig.tight_layout()
self.canvas.draw()
def on_slider_change(self, key, value):
if self.suppress_send: return
val = float(value)
self.params[key] = val
self.control_queue.put(('set_param', key, val))
def on_wave_change(self):
val = self.wave_var.get()
self.params['waveform'] = val
self.control_queue.put(('set_param', 'waveform', val))
def on_vel_sense_change(self):
val = float(self.vel_sense_var.get())
self.params['velocity_sense'] = val
self.control_queue.put(('set_param', 'velocity_sense', val))
def toggle_view(self):
if self.view_mode == "SCOPE":
self.view_mode = "PARAM"
self.setup_param_view()
else:
self.view_mode = "SCOPE"
self.setup_scope_view()
def check_feedback_loop(self):
try:
while not self.feedback_queue.empty():
msg = self.feedback_queue.get_nowait()
if msg[0] == 'midi_update':
key, val = msg[1], msg[2]
self.suppress_send = True
if key in self.sliders:
self.sliders[key].set(val)
self.params[key] = val
self.suppress_send = False
except queue.Empty: pass
self.root.after(50, self.check_feedback_loop)
def update_plot(self):
if self.view_mode == "SCOPE":
try:
raw_audio = None
while not self.viz_queue.empty():
raw_audio = self.viz_queue.get_nowait()
if raw_audio is not None:
ds_audio = raw_audio[::4]
if len(ds_audio) == len(self.x_wave):
self.line1.set_ydata(ds_audio * 3.0)
window = np.hanning(len(raw_audio))
fft_res = np.fft.rfft(raw_audio * window)
fft_mag = np.abs(fft_res) / len(raw_audio) * 15.0
if len(fft_mag) == len(self.x_freq):
self.line2.set_ydata(fft_mag)
self.canvas.draw_idle()
except queue.Empty: pass
else:
att, rel = self.params['amp_attack'], self.params['amp_release']
t_adsr = np.linspace(0, 3.0, 300)
y_adsr = np.zeros_like(t_adsr)
hold = 0.5
for i, t in enumerate(t_adsr):
if t < att: y_adsr[i] = t/att if att>0 else 1
elif t < att+hold: y_adsr[i] = 1
elif t < att+hold+rel: y_adsr[i] = 1 - (t-(att+hold))/rel
self.line1.set_data(t_adsr, y_adsr)
cutoff = self.params['cutoff']
res = self.params['resonance']
fc = 50 + cutoff * 8000
f_vcf = np.logspace(np.log10(20), np.log10(10000), 200)
q = 0.7 + res * 4.0
ratio = f_vcf / fc
y_vcf = np.clip(1.0 / np.sqrt((1-ratio**2)**2 + (ratio/q)**2), 0, 1.2)
self.line2.set_data(f_vcf, y_vcf)
self.canvas.draw_idle()
self.root.after(int(1000/VIZ_FPS), self.update_plot)
def on_close(self):
self.midi_playing = False
self.control_queue.put(('stop', None, None))
self.root.quit()
self.root.destroy()
def find_audio_device_safe(keyword="USB", devices=None):
    """Return the index of the output device to use.

    Lists every device that has output channels and picks the one whose name
    contains ``keyword``.  When several match, the last one wins.  When none
    match, the sounddevice default output device is returned.

    Args:
        keyword: Substring (case-sensitive) to look for in device names.
        devices: Optional sequence of device dicts with ``'name'`` and
            ``'max_output_channels'`` keys; defaults to ``sd.query_devices()``.
    """
    if devices is None:
        devices = sd.query_devices()
    print("\n--- Audio Devices ---")
    target_id = None
    for i, dev in enumerate(devices):
        if dev['max_output_channels'] <= 0:
            continue  # input-only device
        print(f"ID {i}: {dev['name']}")
        if keyword in dev['name']:
            target_id = i
    if target_id is not None:
        return target_id
    # sd.default.device is an (input, output) pair; index 1 is the output.
    return sd.default.device[1]
if __name__ == "__main__":
    # Start the audio engine in its own process, then run the Tk GUI here.
    multiprocessing.freeze_support()
    control_q = multiprocessing.Queue()
    viz_q = multiprocessing.Queue(maxsize=2)  # small: the GUI only draws the newest block
    feedback_q = multiprocessing.Queue()
    dev_id = find_audio_device_safe()
    print(f"\nTarget Audio Device ID: {dev_id}")
    p_audio = multiprocessing.Process(
        target=audio_engine_process,
        args=(control_q, viz_q, feedback_q, dev_id),
        daemon=True,
    )
    p_audio.start()
    try:
        root = tk.Tk()
        app = SynthGUI(root, control_q, viz_q, feedback_q)
        root.mainloop()
    except KeyboardInterrupt:
        pass
    finally:
        print("Terminating...")
        # Closing the window queues a 'stop' message; give the engine a moment
        # to exit by itself (assumes it honours 'stop' — TODO confirm) before
        # killing it.
        p_audio.join(timeout=1.0)
        if p_audio.is_alive():
            p_audio.terminate()
            p_audio.join()
    # Kept outside ``finally``: exiting from inside it would discard any
    # in-flight exception and always report success.
    sys.exit(0)

FM音源版(未完成 Python)
# py_fm_synth_rpi4_v12.1_sync.py
# Version: 12.1.0
# Description: 4-OP FM with full GUI-Preset-Operator synchronization.
import sounddevice as sd
import numpy as np
import threading
import tkinter as tk
from tkinter import ttk
import mido
import time
import sys
import multiprocessing
from queue import Empty
# --- Settings ---
SAMPLE_RATE = 48000       # audio sample rate in Hz
BLOCK_SIZE = 1024         # frames per audio block
CHANNELS = 2              # output channels
MAX_VOICES = 3            # number of voices allocated by the audio process
PLOT_INTERVAL_MS = 100    # delay between GUI plot refreshes


def _op(idx, ratio, atk, rel):
    """Build the parameter dict of one operator."""
    return {'idx': idx, 'ratio': ratio, 'atk': atk, 'rel': rel}


# --- 4-OP presets ---
# Every preset lists its operators in the order Op1, Op2, Op3, Op4.
PRESETS = {
    "4-OP Grand Piano": {
        'algo': 1, 'master_gain': 0.55, 'fb': 0.4,
        'ops': [
            _op(1.0, 1.0, 0.01, 1.2),
            _op(2.2, 1.003, 0.01, 0.5),
            _op(0.8, 7.0, 0.01, 0.2),
            _op(2.0, 14.0, 0.01, 0.1),
        ],
    },
    "4-OP Trumpet": {
        'algo': 1, 'master_gain': 0.4, 'fb': 0.7,
        'ops': [
            _op(1.0, 1.0, 0.08, 0.2),
            _op(3.5, 1.0, 0.1, 0.2),
            _op(1.0, 2.0, 0.1, 0.2),
            _op(0.5, 3.0, 0.1, 0.2),
        ],
    },
    "4-OP Pipe Organ": {
        'algo': 3, 'master_gain': 0.4, 'fb': 0.0,
        'ops': [
            _op(1.0, 1.0, 0.05, 0.2),
            _op(0.5, 2.0, 0.05, 0.2),
            _op(0.3, 4.0, 0.05, 0.2),
            _op(0.2, 0.5, 0.05, 0.2),
        ],
    },
}

# Parameter ranges as (min, max); used to map normalised 0..1 knob values to
# parameter values and back.
RANGE = dict(
    idx=(0.0, 20.0),
    ratio=(0.5, 15.0),
    atk=(0.01, 2.0),
    rel=(0.01, 2.0),
    fb=(0.0, 0.95),
    master_gain=(0.0, 1.0),
)
# ==========================================
# Audio Process (Independent Core)
# ==========================================
class FMVoice4Op:
    """One voice of the 4-operator FM synthesizer.

    A voice renders one note, block by block, into a shared mix buffer.
    ``params`` passed to :meth:`add_to_buffer` is a dict with the keys
    ``'algo'`` (1, 2 or 3), ``'fb'``, ``'use_fixed_vel'``, ``'fixed_vel_val'``
    and ``'ops'`` (four dicts with ``'idx'``, ``'ratio'``, ``'atk'``, ``'rel'``).

    Algorithms (operators numbered from 1):
        1: Op4 -> Op3 -> Op2 -> Op1, only Op1 is heard.
        2: Op4 -> Op3 and Op2 -> Op1, Op1 and Op3 are heard.
        3: all four operators are heard, none is modulated.
    """

    # 0-based indices of the operators that reach the mix, per algorithm.
    _CARRIERS = {1: (0,), 2: (0, 2), 3: (0, 1, 2, 3)}
    # A released voice is freed once every carrier envelope is below this.
    _SILENCE_LEVEL = 0.001

    def __init__(self, sample_rate, block_size):
        self.sample_rate = sample_rate
        self.block_size = block_size
        self.active, self.gate, self.finished = False, False, True
        self.note = 0
        self.velocity = 0.0
        self.phases = np.zeros(4)
        self.env_levels = np.zeros(4)
        self.prev_fb_out = 0.0
        self.carrier_buf = np.zeros(block_size, dtype=np.float32)
        # Per-block time base; constant, so built once instead of per call.
        self._t = np.arange(block_size) / sample_rate

    def trigger(self, note, velocity):
        """Start (or restart) the voice on MIDI ``note`` at ``velocity``."""
        self.note, self.velocity = note, velocity
        self.gate, self.active, self.finished = True, True, False

    def _advance_envelopes(self, params):
        """Move each operator envelope one block on and return the four ramps.

        Each envelope covers, per block, the fraction ``block_time / time`` of
        its remaining distance to the target.  The fraction is capped at 1.0:
        with a time shorter than one block (e.g. atk = 0.01 s at 1024 frames /
        48 kHz) the uncapped factor is above 2 and the level would overshoot
        the target further on every block.
        """
        if self.gate:
            target = params['fixed_vel_val'] if params['use_fixed_vel'] else self.velocity
        else:
            target = 0.0
        block_seconds = self.block_size / self.sample_rate
        ramps = []
        for i in range(4):
            op = params['ops'][i]
            seconds = op['atk'] if self.gate else op['rel']
            start = self.env_levels[i]
            if seconds <= block_seconds:
                # Also covers zero / negative times without dividing by them.
                end = target
            else:
                end = start + (target - start) * (block_seconds / seconds)
            ramp = np.linspace(start, end, self.block_size)
            self.env_levels[i] = ramp[-1]
            ramps.append(ramp)
        return ramps

    def add_to_buffer(self, mix_buffer, params):
        """Render one block of this voice and add it to ``mix_buffer`` in place."""
        if self.finished:
            return
        envs = self._advance_envelopes(params)
        algo = params['algo']
        ops = params['ops']

        # Free the voice only when every audible operator has faded out, not
        # just Op1, so longer release tails in algorithms 2 and 3 are kept.
        carriers = self._CARRIERS.get(algo, (0,))
        if not self.gate and max(self.env_levels[c] for c in carriers) < self._SILENCE_LEVEL:
            self.active, self.finished = False, True
            return

        f0 = 440.0 * (2 ** ((self.note - 69) / 12.0))
        two_pi = 2 * np.pi
        op_outs = [np.zeros(self.block_size) for _ in range(4)]
        # Highest operator first so every modulator exists before it is used.
        for i in reversed(range(4)):
            freq = f0 * ops[i]['ratio']
            modulated = (algo == 1 and i < 3) or (algo == 2 and i in (0, 2))
            if modulated:
                mod_in = op_outs[i + 1] * ops[i + 1]['idx'] * envs[i + 1]
            else:
                mod_in = 0.0
            # Only Op4 has feedback: its last sample of the previous block,
            # applied as a constant phase offset for this block.
            fb_val = (self.prev_fb_out * params['fb'] * 2.0) if i == 3 else 0.0
            sig = np.sin(two_pi * freq * self._t + self.phases[i] + mod_in + fb_val)
            op_outs[i] = sig
            if i == 3:
                self.prev_fb_out = sig[-1]
            # Carry the phase over so consecutive blocks join without a jump.
            self.phases[i] = (self.phases[i] + two_pi * freq * self.block_size / self.sample_rate) % two_pi

        if algo == 1:
            # Op1's 'idx' acts as its output level here.
            final_out = op_outs[0] * ops[0]['idx'] * envs[0]
        elif algo == 2:
            final_out = (op_outs[0] * envs[0] + op_outs[2] * envs[2]) * 0.5
        elif algo == 3:
            final_out = sum(op_outs[i] * envs[i] * 0.25 for i in range(4))
        else:
            final_out = np.zeros(self.block_size)  # unknown algorithm: silence
        np.add(mix_buffer, final_out, out=mix_buffer)
def audio_process_entry(cmd_q, mon_q):
    """Run the audio engine; meant to be the target of a separate process.

    Commands are read from ``cmd_q`` as ``(cmd, key, val)`` tuples:
    ``'note_on'`` (note, velocity), ``'note_off'`` (note, unused),
    ``'param'`` (name or ``'ops.<op index>.<field>'``, value) and
    ``'preset'`` (preset name, unused).  Decimated output blocks are pushed
    to ``mon_q`` for display.  The function never returns.
    """
    from copy import deepcopy

    # Initial state
    current_params = deepcopy(PRESETS["4-OP Grand Piano"])
    current_params.update({'use_fixed_vel': False, 'fixed_vel_val': 1.0})
    voices = [FMVoice4Op(SAMPLE_RATE, BLOCK_SIZE) for _ in range(MAX_VOICES)]
    mix_buffer = np.zeros(BLOCK_SIZE, dtype=np.float32)

    # Use the first output device whose name contains "USB", otherwise the
    # default output device.
    devices = sd.query_devices()
    target_id = sd.default.device[1]
    for dev_index, dev in enumerate(devices):
        if dev['max_output_channels'] > 0 and "USB" in dev['name']:
            target_id = dev_index
            break

    def apply_command(cmd, key, val):
        """Apply one queued command to the voices or the parameter set."""
        if cmd == 'note_on':
            # Take the first idle voice; the note is dropped if all are busy.
            free_voice = next((voice for voice in voices if not voice.active), None)
            if free_voice is not None:
                free_voice.trigger(key, val)
        elif cmd == 'note_off':
            for voice in voices:
                if voice.note == key:
                    voice.gate = False
        elif cmd == 'param':
            if '.' in key:
                # Operator parameter addressed as "ops.<index>.<field>".
                _, op_index, field = key.split('.')
                current_params['ops'][int(op_index)][field] = val
            else:
                current_params[key] = val
        elif cmd == 'preset':
            # Deep copy so later edits never write into PRESETS itself.
            current_params.update(deepcopy(PRESETS[key]))

    def callback(outdata, frames, time_info, status):
        """sounddevice callback: apply pending commands, then render a block."""
        while True:
            try:
                cmd, key, val = cmd_q.get_nowait()
            except Empty:
                break
            apply_command(cmd, key, val)

        mix_buffer.fill(0.0)
        sounding = [voice for voice in voices if voice.active]
        for voice in sounding:
            voice.add_to_buffer(mix_buffer, current_params)
        np.multiply(mix_buffer, current_params['master_gain'], out=mix_buffer)
        np.clip(mix_buffer, -1.0, 1.0, out=mix_buffer)
        # Same mono signal on both output channels.
        outdata[:, 0] = mix_buffer
        outdata[:, 1] = mix_buffer
        # Publish a block only while something sounds and the previous one
        # has been consumed.
        if mon_q.empty() and sounding:
            mon_q.put_nowait(mix_buffer[::8].copy())

    stream = sd.OutputStream(device=target_id, channels=CHANNELS, samplerate=SAMPLE_RATE,
                             blocksize=BLOCK_SIZE, latency='high', callback=callback)
    with stream:
        # Keep the process (and therefore the stream) alive.
        while True:
            time.sleep(1)
# ==========================================
# Main Process (GUI & MIDI)
# ==========================================
class SynthGUI:
    """Main window of the 4-OP FM synth: operator/preset selection and plots.

    ``cmd_q`` carries ``(cmd, key, val)`` commands to the audio process and
    ``mon_q`` delivers decimated audio blocks for the waveform / spectrum
    plots.  ``gui_params`` mirrors the parameter set on the GUI side so the
    knob display can follow operator and preset changes.
    """

    def __init__(self, root, cmd_q, mon_q):
        from copy import deepcopy

        self.root, self.cmd_q, self.mon_q = root, cmd_q, mon_q
        self.root.title("FM Synth v12.1 (Full Sync)")
        # GUI-side copy of the current parameters (used for synchronisation).
        self.gui_params = deepcopy(PRESETS["4-OP Grand Piano"])

        # --- Layout ---
        self.top = ttk.Frame(root, padding=5)
        self.top.pack(side=tk.TOP, fill=tk.X)
        self.mid = ttk.Frame(root, padding=5)
        self.mid.pack(side=tk.TOP, fill=tk.X)
        self.left = ttk.Frame(root, padding=10)
        self.left.pack(side=tk.LEFT, fill=tk.Y)
        self.right = ttk.Frame(root, padding=10)
        self.right.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

        self._build_operator_selector()
        self._build_global_settings()
        self._build_knobs()
        self._build_settings()
        self._build_plots()

        self.sync_gui_to_current_op()
        self.update_plot()

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def _build_operator_selector(self):
        """Radio buttons choosing which operator knobs K1-K4 refer to."""
        op_sel_frame = ttk.LabelFrame(self.top, text="Edit Operator Select", padding=5)
        op_sel_frame.pack(side=tk.LEFT, padx=5)
        self.op_var = tk.IntVar(value=0)
        for i in range(4):
            ttk.Radiobutton(op_sel_frame, text=f"Op{i+1}", variable=self.op_var, value=i,
                            command=self.sync_gui_to_current_op).pack(side=tk.LEFT, padx=10)

    def _build_global_settings(self):
        """Algorithm slider (1..3)."""
        glob_frame = ttk.LabelFrame(self.top, text="Global Settings", padding=5)
        glob_frame.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)
        self.algo_var = tk.DoubleVar(value=1.0)
        ttk.Label(glob_frame, text="Algo:").pack(side=tk.LEFT)
        ttk.Scale(glob_frame, from_=1, to=3, variable=self.algo_var,
                  command=self.on_algo_change).pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)

    def _build_knobs(self):
        """Eight vertical 0..1 sliders showing knobs K1-K8, with labels."""
        k_frame = ttk.LabelFrame(self.mid, text="K1-K8 Knobs Sync", padding=5)
        k_frame.pack(fill=tk.X)
        self.knob_vars = {}
        self.knob_labels = {}
        for i in range(1, 9):
            f = ttk.Frame(k_frame)
            f.pack(side=tk.LEFT, padx=10, expand=True)
            ttk.Label(f, text=f"K{i}", font=('Arial', 8, 'bold')).pack()
            v = tk.DoubleVar(value=0)
            # from_=1.0 / to=0.0 puts the maximum at the top of the slider.
            s = ttk.Scale(f, from_=1.0, to=0.0, orient=tk.VERTICAL, length=120, variable=v)
            s.pack(pady=2)
            self.knob_vars[i] = v
            lbl = ttk.Label(f, text="-", font=('Arial', 8), foreground="blue")
            lbl.pack()
            self.knob_labels[i] = lbl

    def _build_settings(self):
        """Preset selector, velocity-fix checkbox and EXIT button."""
        s_frame = ttk.LabelFrame(self.left, text="Settings", padding=5)
        s_frame.pack(fill=tk.X)
        self.preset_var = tk.StringVar(value="4-OP Grand Piano")
        self.combo = ttk.Combobox(s_frame, textvariable=self.preset_var,
                                  values=list(PRESETS.keys()), state="readonly")
        self.combo.pack(fill=tk.X, pady=5)
        self.combo.bind("<<ComboboxSelected>>", self.on_preset_select)
        self.v_fix = tk.BooleanVar(value=False)
        ttk.Checkbutton(s_frame, text="Velocity Fix", variable=self.v_fix,
                        command=self._on_velocity_fix_toggle).pack(anchor=tk.W)
        ttk.Button(self.left, text="EXIT", command=sys.exit).pack(pady=40)

    def _build_plots(self):
        """Waveform (top) and spectrum (bottom) plots embedded in the window."""
        from matplotlib.figure import Figure
        from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg

        self.fig = Figure(figsize=(5, 3), dpi=80)
        self.ax_w = self.fig.add_subplot(211)
        self.ax_f = self.fig.add_subplot(212)
        self.fig.subplots_adjust(hspace=0.6)
        # Monitor blocks arrive decimated by 8, so BLOCK_SIZE // 8 samples
        # and BLOCK_SIZE // 16 + 1 rfft bins.
        self.line_w, = self.ax_w.plot(np.zeros(BLOCK_SIZE // 8))
        self.ax_w.set_ylim(-1.1, 1.1)
        self.line_f, = self.ax_f.plot(np.zeros(BLOCK_SIZE // 16 + 1))
        self.ax_f.set_ylim(0, 100)
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.right)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    # ------------------------------------------------------------------
    # Callbacks
    # ------------------------------------------------------------------
    def _on_velocity_fix_toggle(self):
        """Forward the "Velocity Fix" checkbox state to the audio process."""
        self.cmd_q.put(('param', 'use_fixed_vel', self.v_fix.get()))

    def on_algo_change(self, v):
        """Handle the Algo slider; ``v`` is its value as a string or number.

        The value is rounded to the nearest algorithm and clamped to 1..3, so
        each algorithm owns a band of the slider instead of algorithm 3 being
        reachable only at the far end.
        """
        val = min(3, max(1, int(round(float(v)))))
        self.gui_params['algo'] = val
        self.cmd_q.put(('param', 'algo', val))

    def on_preset_select(self, e):
        """Load the selected preset into the GUI mirror and the audio process."""
        from copy import deepcopy

        name = self.preset_var.get()
        self.gui_params = deepcopy(PRESETS[name])
        self.cmd_q.put(('preset', name, 0))
        # Refresh every knob display and the Algo slider.
        self.sync_gui_to_current_op()
        self.algo_var.set(float(self.gui_params['algo']))

    @staticmethod
    def _algo_to_knob(algo):
        """Map algorithm 1..3 onto the 0..1 knob scale (1 -> 0.0, 3 -> 1.0)."""
        return (algo - 1) / 2.0

    def sync_gui_to_current_op(self):
        """Update knob labels and positions after an operator or preset change."""
        op_idx = self.op_var.get()
        p = self.gui_params['ops'][op_idx]

        # Labels
        labels = {1: f"Op{op_idx+1} Idx", 2: f"Op{op_idx+1} Ratio", 3: f"Op{op_idx+1} Atk",
                  4: f"Op{op_idx+1} Rel", 5: "Feedback", 6: "Algo", 7: "-", 8: "Master Gain"}
        for i, text in labels.items():
            self.knob_labels[i].config(text=text)

        # Knob positions: convert parameter values back to the 0..1 scale.
        def norm(val, key):
            lo, hi = RANGE[key]
            return (val - lo) / (hi - lo)

        for knob, key in ((1, 'idx'), (2, 'ratio'), (3, 'atk'), (4, 'rel')):
            self.knob_vars[knob].set(norm(p[key], key))
        self.knob_vars[5].set(norm(self.gui_params['fb'], 'fb'))
        # Algo is 1..3, not a 0..1 quantity, so it needs its own mapping.
        self.knob_vars[6].set(self._algo_to_knob(self.gui_params['algo']))
        self.knob_vars[8].set(norm(self.gui_params['master_gain'], 'master_gain'))

    def update_plot(self):
        """Draw the newest monitor block, then reschedule itself."""
        try:
            data = None
            try:
                # Drain the queue; only the newest block is drawn.
                while True:
                    data = self.mon_q.get_nowait()
            except Empty:
                pass
            # Skip blocks whose length does not match the plotted lines.
            if data is not None and len(data) == len(self.line_w.get_ydata()):
                self.line_w.set_ydata(data)
                windowed = data * np.hanning(len(data))
                fft = 20 * np.log10(np.abs(np.fft.rfft(windowed)) + 1e-6) + 60
                self.line_f.set_ydata(fft)
                self.canvas.draw_idle()
        finally:
            # Keep polling even if one frame failed to draw.
            self.root.after(PLOT_INTERVAL_MS, self.update_plot)
def midi_loop(cmd_q, gui):
    """Forward incoming MIDI notes and knob moves (CC 1-8) to the synth.

    Opens the first input port whose name contains "AKAI" or "MIDI" and
    blocks on it; meant to run on a background thread.  Notes go straight to
    ``cmd_q``.  Control changes 1-8 are mapped to parameters, written to
    ``gui.gui_params``, sent to ``cmd_q`` and mirrored on the GUI knobs.

    CC map: 1-4 = idx / ratio / atk / rel of the operator selected in the GUI,
    5 = feedback, 6 = algorithm, 7 = unassigned, 8 = master gain.

    NOTE(review): ``gui.op_var.get()`` and ``gui.root.after()`` are called
    from this thread, as in the original; confirm that is acceptable for the
    Tk build in use.
    """
    ports = mido.get_input_names()
    target = next((p for p in ports if "AKAI" in p or "MIDI" in p), None)
    if not target:
        # Say why there will be no MIDI input instead of returning silently.
        print(f"[MIDI] No AKAI/MIDI input port found (available: {ports})")
        return

    def denorm(n, key):
        """Scale a 0..1 value into the parameter range of ``key``."""
        lo, hi = RANGE[key]
        return lo + n * (hi - lo)

    # CC number -> per-operator parameter it controls.
    op_param_by_cc = {1: 'idx', 2: 'ratio', 3: 'atk', 4: 'rel'}

    with mido.open_input(target) as port:
        for msg in port:
            if msg.type == 'note_on' and msg.velocity > 0:
                cmd_q.put(('note_on', msg.note, msg.velocity / 127.0))
            elif msg.type in ('note_off', 'note_on'):
                # A note_on with velocity 0 is a note_off.
                cmd_q.put(('note_off', msg.note, 0))
            elif msg.type == 'control_change' and 1 <= msg.control <= 8:
                cc = msg.control
                norm = msg.value / 127.0
                if cc in op_param_by_cc:
                    name = op_param_by_cc[cc]
                    op = gui.op_var.get()
                    val = denorm(norm, name)
                    gui.gui_params['ops'][op][name] = val
                    cmd_q.put(('param', f'ops.{op}.{name}', val))
                elif cc == 5:
                    val = denorm(norm, 'fb')
                    gui.gui_params['fb'] = val
                    cmd_q.put(('param', 'fb', val))
                elif cc == 6:
                    # Round to the nearest of the three algorithms so each one
                    # owns a band of the knob; flooring made algorithm 3
                    # reachable only at the maximum value.
                    val = int(round(1 + norm * 2))
                    gui.gui_params['algo'] = val
                    cmd_q.put(('param', 'algo', val))
                    gui.root.after(0, lambda v=val: gui.algo_var.set(float(v)))
                elif cc == 8:
                    val = norm
                    gui.gui_params['master_gain'] = val
                    cmd_q.put(('param', 'master_gain', val))
                # Mirror the knob position on screen (defaults freeze the
                # current values for the deferred call).
                gui.root.after(0, lambda c=cc, n=norm: gui.knob_vars[c].set(n))
if __name__ == "__main__":
    # Force the 'spawn' start method before any queue or process is created.
    multiprocessing.set_start_method('spawn', force=True)

    cmd_q = multiprocessing.Queue()
    mon_q = multiprocessing.Queue()

    # Audio engine in its own process.
    proc = multiprocessing.Process(
        target=audio_process_entry,
        args=(cmd_q, mon_q),
        daemon=True,
    )
    proc.start()

    # GUI on the main thread, MIDI input on a daemon thread.
    root = tk.Tk()
    root.geometry("950x700")
    app = SynthGUI(root, cmd_q, mon_q)
    midi_thread = threading.Thread(target=midi_loop, args=(cmd_q, app), daemon=True)
    midi_thread.start()
    root.mainloop()