147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
from flask import Flask, request, send_file, jsonify
|
|
import io
|
|
from scipy.io import wavfile
|
|
import numpy as np
|
|
import librosa
|
|
from Utils import ProceesAudio
|
|
import soundfile as sf
|
|
import datetime
|
|
import boto3
|
|
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
|
|
|
|
app = Flask(__name__)
|
|
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100 MB limit
|
|
|
|
# AWS S3 Configuration
|
|
s3 = boto3.client('s3')
|
|
BUCKET_NAME = 'songsbucketparth'
|
|
|
|
@app.route('/health', methods=['GET'])
|
|
def health_check():
|
|
return jsonify({"status": "ok"}), 200
|
|
|
|
@app.route('/process_and_upload', methods=['POST'])
|
|
def upload_to_s3(processed_data, userId):
|
|
# Create an in-memory bytes buffer
|
|
buffer = io.BytesIO()
|
|
|
|
# Write processed data to the buffer as a WAV file
|
|
sf.write(buffer, processed_data, 44100, format='WAV')
|
|
buffer.seek(0) # Rewind the buffer
|
|
|
|
# Generate a unique file name
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
|
s3_key = f'uploads/processed_audio_{userId}_{timestamp}.wav'
|
|
|
|
# Upload the buffer to AWS S3
|
|
s3.upload_fileobj(buffer, BUCKET_NAME, s3_key, ExtraArgs={'ContentType': 'audio/wav'})
|
|
|
|
# Generate a pre-signed URL for the uploaded file
|
|
presigned_url = s3.generate_presigned_url(
|
|
'get_object',
|
|
Params={
|
|
'Bucket': BUCKET_NAME,
|
|
'Key': s3_key
|
|
},
|
|
ExpiresIn=3600 # URL expiration time in seconds (1 hour)
|
|
)
|
|
|
|
return presigned_url
|
|
|
|
def int16_to_float32(samples):
|
|
return samples.astype(np.float32) / 32768.0
|
|
|
|
def get_audio_data(audio_bytes):
|
|
sample_rate, data = wavfile.read(io.BytesIO(audio_bytes))
|
|
left = []
|
|
right = []
|
|
for frame in data:
|
|
frame = int16_to_float32(frame)
|
|
left.append(frame[0])
|
|
right.append(frame[1])
|
|
data = np.array([left, right], dtype=np.float32)
|
|
return sample_rate, data
|
|
|
|
def process_audio_bytes(audio_bytes, index):
|
|
# Read the audio file from bytes
|
|
sample_rate, data = get_audio_data(audio_bytes=audio_bytes)
|
|
|
|
pa = ProceesAudio()
|
|
|
|
processed_data, part_amps = pa.perform_modulation(data=data, sr=sample_rate, index=index)
|
|
file_url = upload_to_s3(processed_data=processed_data, userId="parth")
|
|
arr_to_show = []
|
|
|
|
print(len(processed_data))
|
|
|
|
acc_factor = int(len(processed_data)/150)
|
|
|
|
for i in range(0, len(processed_data), acc_factor):
|
|
arr_to_show.append(np.mean(np.abs(processed_data[i:i+acc_factor])))
|
|
|
|
for i in range(len(arr_to_show)):
|
|
arr_to_show[i] = float(arr_to_show[i])
|
|
|
|
ret = []
|
|
for i in range(10):
|
|
temp = []
|
|
for part in part_amps:
|
|
temp.append(float(part[i]))
|
|
ret.append(temp)
|
|
|
|
return jsonify({"file_url": "", "array": arr_to_show, "part_amps": ret})
|
|
|
|
@app.route('/modify', methods=['POST'])
|
|
def modify():
|
|
if 'song' not in request.files:
|
|
return 'No file part', 400
|
|
file = request.files['song']
|
|
index = request.form.get('index') # Assuming index is sent as a form field
|
|
if file.filename == '':
|
|
return 'No selected file', 400
|
|
if file:
|
|
# Read file bytes
|
|
file_bytes = file.read()
|
|
|
|
# Process the audio bytes with the index parameter
|
|
response = process_audio_bytes(file_bytes, index)
|
|
response.headers.add('Access-Control-Allow-Origin', '*')
|
|
return response
|
|
|
|
@app.route('/manMod', methods=['POST'])
|
|
def manual_modify():
|
|
if 'song' not in request.files:
|
|
return 'No file part', 400
|
|
file = request.files['song']
|
|
index = request.form.get('index') # Assuming index is sent as a form field
|
|
mod_arr = request.form.get('mod_arr')
|
|
if file.filename == '':
|
|
return 'No selected file', 400
|
|
if file:
|
|
# Read file bytes
|
|
file_bytes = file.read()
|
|
# Process the audio bytes with the index parameter
|
|
sample_rate, data = get_audio_data(audio_bytes=file_bytes)
|
|
pa = ProceesAudio()
|
|
processed_data = pa.change_amps(data=data, new_amps=mod_arr, sr=sample_rate)
|
|
|
|
file_url = upload_to_s3(processed_data=processed_data, userId="parth")
|
|
arr_to_show = []
|
|
|
|
acc_factor = int(len(processed_data)/150)
|
|
|
|
for i in range(0, len(processed_data), acc_factor):
|
|
arr_to_show.append(np.mean(np.abs(processed_data[i:i+acc_factor])))
|
|
|
|
for i in range(len(arr_to_show)):
|
|
arr_to_show[i] = float(arr_to_show[i])
|
|
|
|
response = jsonify({"file_url": file_url, "array": arr_to_show})
|
|
|
|
response.headers.add('Access-Control-Allow-Origin', '*')
|
|
return response
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, port=80)
|