# smodifier/server.py
# 2024-06-06 23:21:18 +05:30
#
# 157 lines
# 4.9 KiB
# Python

from flask import Flask, request, send_file, jsonify
import io
from scipy.io import wavfile
import numpy as np
import librosa
from Utils import ProceesAudio
import soundfile as sf
import datetime
import boto3
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
# Flask application object; the @app.route decorators in this module register
# their view functions on it.
app = Flask(__name__)
# Cap the size of incoming request bodies (uploaded songs) at 100 MiB.
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100 MB limit
# AWS S3 configuration.
# No credentials or region are passed here, so boto3 resolves them from its
# default configuration sources.
s3 = boto3.client('s3')
# Bucket that receives every processed WAV uploaded by this service.
BUCKET_NAME = 'songsbucketparth'
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: answer GET /health with a small JSON body and HTTP 200."""
    body = {"status": "ok"}
    return jsonify(body), 200
# NOTE: this is a plain helper, not a view. It used to carry
# @app.route('/process_and_upload', methods=['POST']); Flask invokes views
# without positional arguments, so every request to that URL raised TypeError.
def upload_to_s3(processed_data, userId, type="final", sample_rate=44100):
    """Encode audio as WAV in memory, upload it to S3 and return a download URL.

    Args:
        processed_data: Sample array handed to ``soundfile.write``.
        userId: Identifier embedded in the S3 object key.
        type: Free-form label embedded in the object key (callers in this
            module use "part" and "final"). Defaults to "final" so that
            callers which omit it no longer fail with TypeError.
        sample_rate: Sample rate written into the WAV header. Defaults to
            44100, the value that was previously hard-coded.

    Returns:
        A pre-signed GET URL for the uploaded object, valid for one hour.
    """
    import uuid  # local import: only this helper needs it

    # Write the samples into an in-memory buffer as a WAV file.
    buffer = io.BytesIO()
    sf.write(buffer, processed_data, sample_rate, format='WAV')
    buffer.seek(0)  # rewind so the upload starts at the first byte

    # The timestamp only has one-second resolution, so several uploads made
    # within the same second (e.g. a batch of "part" uploads) would share a
    # key and overwrite each other. The random suffix keeps every key unique.
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    unique_suffix = uuid.uuid4().hex
    s3_key = f'uploads/processed_audio_{userId}_{timestamp}_{type}_{unique_suffix}.wav'

    s3.upload_fileobj(buffer, BUCKET_NAME, s3_key, ExtraArgs={'ContentType': 'audio/wav'})

    # Hand back a time-limited link instead of making the object public.
    return s3.generate_presigned_url(
        'get_object',
        Params={
            'Bucket': BUCKET_NAME,
            'Key': s3_key,
        },
        ExpiresIn=3600,  # URL expiration time in seconds (1 hour)
    )
def int16_to_float32(samples):
    """Cast ``samples`` to float32 and divide by 32768.

    For 16-bit PCM input this maps the integer range onto [-1.0, 1.0).
    ``samples`` must provide ``astype`` (a NumPy array or NumPy scalar).
    """
    as_float = samples.astype(np.float32)
    return as_float / 32768.0
def _pcm_to_float32(samples):
    """Scale WAV samples of any integer or float dtype to float32 around zero."""
    kind = samples.dtype.kind
    if kind == 'f':
        # Floating-point WAV data is already normalised; only unify the dtype.
        return samples.astype(np.float32)
    info = np.iinfo(samples.dtype)
    if kind == 'u':
        # Unsigned PCM (8-bit WAV) is offset-binary: silence sits at mid-range.
        midpoint = (info.max + 1) / 2
        return (samples.astype(np.float32) - midpoint) / midpoint
    # Signed PCM: divide by the magnitude of the most negative value
    # (32768 for int16), giving values in [-1.0, 1.0).
    return samples.astype(np.float32) / float(-info.min)


def get_audio_data(audio_bytes):
    """Decode WAV bytes into a channel-first float32 array.

    Args:
        audio_bytes: Raw contents of a WAV file.

    Returns:
        ``(sample_rate, data)`` where ``data`` is a C-contiguous float32 array
        of shape ``(2, n_frames)``: row 0 is the first channel and row 1 the
        second. Mono input is duplicated into both rows (it previously raised
        IndexError); channels beyond the second are ignored, as before.
    """
    sample_rate, data = wavfile.read(io.BytesIO(audio_bytes))

    # wavfile.read yields a 1-D array for single-channel files.
    if data.ndim == 1:
        data = data[:, np.newaxis]
    if data.shape[1] == 1:
        data = np.repeat(data, 2, axis=1)

    # Convert all frames in one vectorised step instead of a per-frame Python
    # loop, then flip (frames, channels) into (channels, frames).
    first_two_channels = _pcm_to_float32(data[:, :2])
    return sample_rate, np.ascontiguousarray(first_two_channels.T)
def process_audio_bytes(audio_bytes, index):
    """Modulate an uploaded WAV, upload the results and build the JSON reply.

    Args:
        audio_bytes: Raw contents of the uploaded WAV file.
        index: Forwarded unchanged to ``ProceesAudio.perform_modulation``.

    Returns:
        A Flask JSON response with these keys:
        ``file_url``  - pre-signed URL of the full processed audio,
        ``array``     - coarse amplitude envelope of the processed audio
                        (mean absolute value over roughly 150 windows),
        ``part_amps`` - ``part_amps`` from the modulation step, transposed so
                        that entry ``i`` holds value ``i`` of every item,
        ``part_urls`` - pre-signed URLs of the first ten part audios.
    """
    num_parts = 10          # number of part audios / amplitude columns reported
    envelope_points = 150   # target number of windows in the amplitude envelope

    # Read the audio file from bytes.
    sample_rate, data = get_audio_data(audio_bytes=audio_bytes)
    pa = ProceesAudio()
    processed_data, part_amps, part_audios = pa.perform_modulation(
        data=data, sr=sample_rate, index=index
    )

    # Upload each part on its own so the client can fetch them individually.
    part_urls = [
        upload_to_s3(
            processed_data=np.array(part_audios[i], dtype=np.float32),
            userId="parth",
            type="part",
        )
        for i in range(num_parts)
    ]

    processed_data = np.concatenate(processed_data)
    file_url = upload_to_s3(processed_data=processed_data, userId="parth", type="final")

    # The window size must be at least 1: with fewer than 150 samples the
    # integer division yields 0, and range() rejects a zero step.
    step = max(1, len(processed_data) // envelope_points)
    arr_to_show = [
        float(np.mean(np.abs(processed_data[start:start + step])))
        for start in range(0, len(processed_data), step)
    ]

    # Transpose part_amps and convert to plain floats for JSON serialisation.
    ret = [[float(part[i]) for part in part_amps] for i in range(num_parts)]

    return jsonify({"file_url": file_url, "array": arr_to_show, "part_amps": ret, "part_urls": part_urls})
@app.route('/modify', methods=['POST'])
def modify():
    """Handle POST /modify: process the uploaded song and return the JSON result.

    Expects a multipart form with a ``song`` file and an ``index`` field.
    Replies with HTTP 400 and a short text message when the file is missing;
    otherwise returns the response built by ``process_audio_bytes`` with a
    permissive CORS header added.
    """
    if 'song' not in request.files:
        return 'No file part', 400

    file = request.files['song']
    # A file field submitted without a file has an empty filename. Testing for
    # falsiness also covers a missing (None) filename, which previously fell
    # through every branch and made the view return None.
    if not file.filename:
        return 'No selected file', 400

    # NOTE(review): form values arrive as str (None when the field is absent)
    # and are forwarded unconverted - confirm what perform_modulation expects.
    index = request.form.get('index')

    response = process_audio_bytes(file.read(), index)
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response
@app.route('/manMod', methods=['POST'])
def manual_modify():
    """Handle POST /manMod: apply caller-supplied amplitudes to the uploaded song.

    Expects a multipart form with a ``song`` file and a ``mod_arr`` field.
    Replies with HTTP 400 and a short text message when the file is missing;
    otherwise returns JSON with ``file_url`` (pre-signed URL of the processed
    audio) and ``array`` (coarse amplitude envelope, mean absolute value over
    roughly 150 windows), plus a permissive CORS header.
    """
    if 'song' not in request.files:
        return 'No file part', 400

    file = request.files['song']
    # Falsiness covers both an empty and a missing (None) filename; the latter
    # previously fell through every branch and made the view return None.
    if not file.filename:
        return 'No selected file', 400

    # NOTE(review): mod_arr is the raw form string (None when absent) and is
    # passed on unparsed - confirm the format change_amps expects.
    mod_arr = request.form.get('mod_arr')

    sample_rate, data = get_audio_data(audio_bytes=file.read())
    pa = ProceesAudio()
    processed_data = pa.change_amps(data=data, new_amps=mod_arr, sr=sample_rate)

    # upload_to_s3 requires a type label; omitting it raised TypeError.
    file_url = upload_to_s3(processed_data=processed_data, userId="parth", type="manual")

    # The window size must be at least 1: with fewer than 150 samples the
    # integer division yields 0, and range() rejects a zero step.
    step = max(1, len(processed_data) // 150)
    arr_to_show = [
        float(np.mean(np.abs(processed_data[start:start + step])))
        for start in range(0, len(processed_data), step)
    ]

    response = jsonify({"file_url": file_url, "array": arr_to_show})
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response
if __name__ == '__main__':
    # Launch Flask's built-in server when the file is executed directly.
    # NOTE(review): debug=True enables the interactive debugger and reloader;
    # confirm this entry point is never used for a publicly reachable deployment.
    run_options = {'debug': True, 'port': 80}
    app.run(**run_options)