initial commit

parth aranke 2024-05-28 20:37:29 +05:30
commit 2095d0f5ff
93 changed files with 465 additions and 0 deletions

amp_net.pth (new binary file, not shown)

divideStereo.py (new file)

@@ -0,0 +1,113 @@
from pydub import AudioSegment
import numpy as np
from scipy.io.wavfile import write
import time

# Load the audio file
audio = AudioSegment.from_file("sitare.wav")

# Ensure the audio is stereo
if audio.channels != 2:
    raise ValueError("The audio file must be stereo.")


# Function to convert audio samples from int16 to float32
def int16_to_float32(samples):
    return samples.astype(np.float32) / 32768.0


# Function to convert audio samples from float32 to int16
def float32_to_int16(samples):
    return (samples * 32768).astype(np.int16)


# Function to calculate the average amplitude of float32 samples, normalized by max_amp
def calculate_average_amplitude(float_samples, max_amp):
    if max_amp == 0:
        return 0
    # Average of the absolute sample values
    abs_samples = np.abs(float_samples)
    avg_amplitude = np.mean(abs_samples)
    return avg_amplitude / max_amp


def audiosegment_to_numpy_array(audio_segment):
    # Extract raw data from the AudioSegment
    raw_data = audio_segment.raw_data
    # Number of channels and sample width in bytes
    num_channels = audio_segment.channels
    sample_width = audio_segment.sample_width
    # Create a numpy array from the raw data
    dtype = np.int16 if sample_width == 2 else np.int8
    audio_data = np.frombuffer(raw_data, dtype=dtype)
    # Reshape the array based on the number of channels
    audio_data = audio_data.reshape((-1, num_channels))
    return audio_data


def numpy_array_to_audio_segment(samples, sample_width=2, frame_rate=44100, channels=2):
    # Convert float32 input to interleaved int16 before building the byte buffer
    if samples.shape[0] == 2 and samples.dtype == np.float32:
        # Channel-major (2, n) input: interleave the two channels
        samples = np.stack((samples[0], samples[1]), axis=-1)
        samples = (samples * 32767).astype(np.int16)
        sample_width = 2
    elif samples.dtype == np.float32:
        samples = (samples * 32767).astype(np.int16)
        sample_width = 2
    raw_data = samples.tobytes()
    # Create an AudioSegment from the raw byte data
    audio_segment = AudioSegment(
        data=raw_data,
        sample_width=sample_width,
        frame_rate=frame_rate,
        channels=channels
    )
    return audio_segment


def add_stereo(segments, seg_length, sample_rate):
    # Overlay all segments into a single stereo mix and return it as float32 samples
    final_audio = numpy_array_to_audio_segment(segments[0], frame_rate=sample_rate, sample_width=4)
    for segment in segments[1:]:
        final_audio = final_audio.overlay(numpy_array_to_audio_segment(segment, frame_rate=sample_rate, sample_width=4))
    return int16_to_float32(audiosegment_to_numpy_array(final_audio))


# Function to split the audio into num_parts parts with different panning and calculate the average amplitude of each
def split_stereo(segment, num_parts=100, max_amp=0, sr=44100):
    audio = numpy_array_to_audio_segment(segment, frame_rate=sr)
    # Panning step: the range -1 to 1 divided into num_parts steps
    pan_step = 2 / (num_parts - 1)
    avg_amplitudes = []
    audios = []
    for i in range(num_parts):
        # Panning value for this part
        pan_value = -1 + i * pan_step
        # Apply panning to the audio
        panned_audio = audio.pan(pan_value)
        # Export panned audio to raw data
        raw_data = panned_audio.raw_data
        # Convert raw data to a numpy array (int16)
        samples = np.frombuffer(raw_data, dtype=np.int16)
        # Reshape to (number_of_samples, 2) since the audio is stereo
        samples = samples.reshape((-1, 2))
        # Convert int16 to float32
        float_samples = int16_to_float32(samples)
        # Average amplitude for the current part
        avg_amplitude = calculate_average_amplitude(float_samples, max_amp)
        avg_amplitudes.append(avg_amplitude)
        audios.append(float_samples)
    return avg_amplitudes, np.array(audios)
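
A minimal sketch, separate from the file above, of how split_stereo and add_stereo fit together: split a stereo clip into panned parts, inspect the normalized average amplitudes, then overlay the parts back into one mix. It assumes sitare.wav (the clip the module already loads at import time) is present; num_parts=10 matches what entry.py uses.

# Round-trip sketch: split a stereo clip into panned parts, then overlay them again.
# Assumes sitare.wav exists, since importing divideStereo loads it at module level.
import numpy as np
import librosa
from divideStereo import split_stereo, add_stereo

clip, sr = librosa.load("sitare.wav", sr=None, mono=False, dtype=np.float32)  # shape (2, n)
max_amp = np.max(np.abs(clip))

# 10 panned copies and the normalized average amplitude of each
amps, parts = split_stereo(segment=clip, num_parts=10, max_amp=max_amp, sr=sr)
print([round(float(a), 3) for a in amps])

# Overlay the panned parts back into a single float32 stereo array of shape (n, 2)
mix = add_stereo(parts, seg_length=parts.shape[1], sample_rate=sr)
print(mix.shape)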

entry.py (new file)

@@ -0,0 +1,134 @@
import librosa
import numpy as np
import torch
import os
from scipy.io.wavfile import write, read
from model import BigramLanguageModel
from divideStereo import split_stereo, add_stereo


class ProceesAudio():
    audio_data = []
    final_audio = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    target_audio = []

    def _split_audio_s(self, file_path, segment_length=0.5, overlap=0):
        print(file_path)
        # Load the stereo audio file
        audio, sr = librosa.load(file_path, sr=None, mono=False, dtype=np.float32)
        # Segment and overlap lengths in samples
        segment_samples = int(segment_length * sr)
        overlap_samples = int(segment_samples * overlap)
        # Split the stereo audio into segments while preserving both channels
        segments = []
        for start in range(0, audio.shape[1], segment_samples - overlap_samples):
            segment = audio[:, start:start + segment_samples]
            if segment.shape[1] == segment_samples:
                segments.append(segment)
        return segments, sr

    def _split_audio(self, file_path, segment_length=0.1, overlap=0):
        # Mono variant of _split_audio_s
        audio, sr = librosa.load(file_path, sr=None)
        segment_samples = int(segment_length * sr)
        overlap_samples = int(segment_samples * overlap)
        segments = []
        for start in range(0, len(audio), segment_samples - overlap_samples):
            segment = audio[start:start + segment_samples]
            if len(segment) == segment_samples:
                segments.append(segment)
        return segments, sr

    def _calculate_average_amplitude(self, segments, sr, n_fft=2048, hop_length=256, num_frequency_bands=100):
        # for segment in segments:
        #     stft = librosa.stft(segment, n_fft=n_fft, hop_length=hop_length)
        #     magnitude = np.abs(stft)
        #     max_amplitude = max(max_amplitude, np.max(magnitude))
        ret = []
        audios = []
        max_amp = 0
        # Global peak amplitude across all segments, used for normalization
        for segment in segments:
            max_amp = max(max_amp, np.max(np.abs(segment)))
        # Per-segment average amplitude of each of the 10 panned parts
        for segment in segments:
            amps, _ = split_stereo(segment=segment, max_amp=max_amp, sr=sr, num_parts=10)
            ret.append(amps)
        return ret, audios

    def _process_main(self, file_path):
        segments, sr = self._split_audio_s(file_path)
        amps, _ = self._calculate_average_amplitude(segments, sr)
        for amp in amps:
            self.audio_data.append(amp)

    def _get_output_amps(self, input_amps, index):
        model = BigramLanguageModel()
        model.to("cpu", dtype=float)
        model.load_state_dict(torch.load('amp_net.pth', map_location=torch.device('cpu')))
        return model.generate(
            torch.tensor(input_amps[:index + 1]).view(1, index + 1, len(input_amps[0])).to(self.device),
            len(input_amps) - index + 1
        )

    def make_smooth(self, audio, gain, prev_gain):
        # Ramp from the previous gain to the new gain over the first samples to avoid clicks
        smooth_index = 1000
        mult_arr_in = np.linspace(prev_gain, gain, num=smooth_index)
        for i in range(smooth_index):
            audio[:smooth_index][i] *= mult_arr_in[i]
        audio[smooth_index:] *= gain
        return audio

    def perform_modulation(self, file, index, output_file_name, num_frequency_bands=100):
        segments, sr = self._split_audio_s(file)
        max_amp = 0
        for segment in segments:
            max_amp = max(max_amp, np.max(np.abs(segment)))
        x, _ = self._calculate_average_amplitude(segments=segments, sr=sr)
        # print(x.shape)
        y = self._get_output_amps(x, index)
        modified_segs = []
        prev_gains = np.ones(10)
        for segment, mod in zip(segments, y[0]):
            _, audios = split_stereo(segment=segment, max_amp=max_amp, sr=sr, num_parts=10)
            final_audios = []
            curr_gains = []
            for audio, target_amp, i in zip(audios, mod, range(10)):
                # Guard against silent parts before computing the gain
                mean_amp = np.mean(np.abs(audio))
                if mean_amp == 0:
                    gain = 0
                else:
                    gain = (target_amp.item() / mean_amp) * max_amp
                    if gain <= 50:
                        gain = gain / 50
                    else:
                        gain = 1
                audio = self.make_smooth(audio, gain, prev_gains[i])
                curr_gains.append(gain)
                final_audios.append(audio)
            prev_gains = curr_gains
            modified_seg = add_stereo(final_audios, len(final_audios[0]), sample_rate=sr)
            modified_segs.append(modified_seg)
        modified_segs = np.concatenate(modified_segs)
        write(output_file_name, rate=sr, data=modified_segs.astype(np.float32))

    def get_training_data(self, file_path, data_dir):
        for song in os.listdir(data_dir):
            self.audio_data = []
            self._process_main(os.path.join(data_dir, song))
            audio_data = np.array(self.audio_data)
            tensor_data = torch.tensor(audio_data, dtype=torch.float32)
            torch.save(tensor_data, file_path + '/' + song + '.data.pt')


pa = ProceesAudio()
# pa.get_training_data('extracted_data', 'songs')
pa.perform_modulation('sitare.wav', 0, 'output.wav')

model.py (new file)

@@ -0,0 +1,191 @@
import torch
import torch.nn as nn
from torch.nn import functional as F
import os

# hyperparameters
batch_size = 32 # how many independent sequences will we process in parallel?
block_size = 5 # what is the maximum context length for predictions?
max_iters = 2500
eval_interval = 50
learning_rate = 1e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 10
n_head = 5
n_layer = 5
dropout = 0.2
# ------------

torch.manual_seed(1337)

B = 1
T = 40
C = 10
vocab_size = 10


class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size, block_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)   # (B,T,C)
        q = self.query(x) # (B,T,C)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * C**-0.5 # (B, T, C) @ (B, C, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,C)
        out = wei @ v # (B, T, T) @ (B, T, C) -> (B, T, C)
        return out


class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size, block_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size, block_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out


class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)


class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head, block_size):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size, block_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


class InterBlock(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head, block_size):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size, block_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x.view(B*T, C, 1)
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        x = x.view(B, T, C)
        return x


# super simple bigram model
class BigramLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.pos_emb_inter = nn.Embedding(10, 1)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head, block_size=block_size) for _ in range(n_layer)])
        self.interBlocks = nn.Sequential(*[InterBlock(1, n_head=1, block_size=1) for _ in range(n_layer)])
        self.l1 = nn.Linear(n_embd, 1000)
        self.l2 = nn.Linear(1000, 1000)
        self.l3 = nn.Linear(1000, n_embd)
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.lm_head2 = nn.Linear(n_embd, vocab_size)
        self.lm_head3 = nn.Linear(n_embd, vocab_size)
        self.lm_head4 = nn.Linear(n_embd, vocab_size)
        self.tanh = nn.Tanh()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, idx, targets=None):
        B, T, C = idx.shape

        # idx is a (B, T, C) tensor of per-part amplitude values
        # tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        # pos_emb_inter = self.pos_emb_inter(torch.arange(C, device=device))
        x = idx.view(B,T,C)
        # x = x + pos_emb_inter
        x = idx + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        # x = self.interBlocks(x) # (B,T,C)
        # x = self.l1(x)
        # x = self.softmax(x)
        # x = self.l2(x)
        # x = self.softmax(x)
        # x = self.l3(x)
        # x = self.softmax(x)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)
        logits = self.softmax(logits)

        if targets is None:
            loss = None
        else:
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is a (B, T, C) tensor of amplitude vectors for the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size steps
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # B, C = logits.shape
            # # apply softmax to get probabilities
            # probs = F.softmax(logits, dim=-1) # (B, C)
            # # sample from the distribution
            # idx_next = torch.multinomial(probs, num_samples=100) # (B, 1)
            # append the predicted amplitude vector to the running sequence
            logits = logits.view(1, B, C)
            idx = torch.cat((idx, logits), dim=1) # (B, T+1, C)
        return idx
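
Unlike a token-level language model, this BigramLanguageModel consumes (B, T, 10) tensors of per-part amplitudes directly, and generate appends one 10-dimensional softmax vector per step instead of sampling token ids. A small shape check on random inputs (values are illustrative only), run on the module-level device:

# Shape sketch: the model takes (B, T, 10) amplitude vectors, not integer token ids.
# Random values are used only to illustrate the expected tensor shapes.
import torch
from model import BigramLanguageModel, block_size, device

m = BigramLanguageModel().to(device)
x = torch.rand(1, block_size, 10, device=device)  # (B, T, C) amplitudes in [0, 1)

logits, loss = m(x)        # loss is None because no targets are given
print(logits.shape)        # torch.Size([1, 5, 10])

out = m.generate(x, max_new_tokens=3)  # appends 3 predicted amplitude vectors
print(out.shape)           # torch.Size([1, 8, 10])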

songs/kaaa.pem (new file)

@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAlENig2vVPOU1QxLJRXEhSMvsn3Eo+TqwTLN0yyV+1Noketv/
oGYFuN9bAYjfQPFTYi2YoXf2xRTfKrkgco6VEbv8jFY5JK0b9B2Lao+J4bSpO4YI
Permk26ri8fpp5DO1H6HLtH3Ff7H03jbyN7TBgFY3EggTgBxVXG116DfX9TgwO0N
Ap9fi8F4baFJ7O1YSnFrNYHzLHwfuRwHw5+vkU7qWIgndN/NX4OWZ3PTJIhduxVJ
9SO2h+KFG1xXao2anKs1zy41XOP8Auj7zwcupteB1l1BvrnGwlmeuIwSt8jJc9JQ
sS/mGdFtBaBQNG2B7LV0boXQTh7xD9LSvBHx0QIDAQABAoIBADRk+9NlsB2tAn2J
jgc0S4bmSeCupMQusU7pvERuI7wkuu+P/243n54NkKOiMwNIRSdU5bNqKZLMJXF7
qrPRH2z8+eq+LcOASXijWDl4MEnLBmM6on7/HCzN12G2bCQDUJRXgn5RRPhI6WUa
ONFrrTdaEoUGGCR4k5SWcqSY/Vdn4QWOwDoyjDc3jVEEaIvTz/DNlWJ+wFBwK3jA
7ugIVj2Tjjab6yE6i98mlqe54I5fEDibzP/uoZ7yY9Ad+bChNbPIPnpvZ2S9mMLO
efQhIHosyKAeZpllzgfTFJNrZymUnkSfM3znh/kn4skfl7D5jhFF0X5NPh5JmYh7
A6GGVIECgYEA1x4dnyyIenARKF853w/BTJ1HFW5TQPE+40Av2NIMxMC+LiGdZxeF
1SMhWSmked3slrIJwJ107DBXiLrSvHFLe92NeQRcEC8gNlVpWr8rl9f/AtrDS9G3
HkMYuP2yBea25UTnIVDUccCpmxmqyxdkXaTSODxuX9WFcpQbZxi0eqUCgYEAsHCs
hjtlymIf2/GOqY6Zem0VCztrgPxcfyMZMnhY1sO5MYj0dZX4pGMWkZ7l+7dFOMH+
Hy/wNc8kYORz66K+SrqKZBGs5Y2Q3XXitGYhOD3af7l3wMhaNWO9cbp/jzGA3t0m
hO38bbVjSehlSMyNWkd9sFhOezhnW3FR8p427r0CgYBoKBjHnVRnlCN6nMD0lDhP
1Ec6xTRiWWzNs6J3JR0xlvF+fKAt7mHKlTvsZBFTQHTxcjrunE6knbOEbJDk1SZr
TxbU5Zt96pN6O1jLq4x79QVwaGPvLhev+2TZDV1JJjCgSgBvTSfP/C0jAAaGV0kt
XkS7HeeDV2tJ0xSBkqCwUQKBgAPj6Bnpk1v7/+HUmKWqbD3jy9P8kmk8pzlyZ6rS
Nns9w4aRE3rIPLw7JxM84qtDJjaTKGw9nVlCYPX184xdgT6akFSWgy5oiaooYEBE
BDV+P6QTTc2lXT/SHCo2x3s7xzCixaemq6srla3afuk1AWTdMKtDty9GVhkRDZKA
5IstAoGBAKYrpHiJzCbkErYFNM+iCOtt3VaHPmCArpsl1IbE5fnBmINTkTHYL9nc
TIzl1tLZEnfeK+dvuwVOGvj9/IK/gSYLbhMkEhGxci6cAO4oxiVGljnkMiXtHlCq
Vau3tFk0etyUfFckN1BGH/SoyORsqJjXnDGbZ2JbaYcdQnK9+avr
-----END RSA PRIVATE KEY-----