"""Flask service that modulates an uploaded WAV file and stores the result in S3."""

import datetime
import io

import boto3
import librosa
import numpy as np
import soundfile as sf
from flask import Flask, jsonify, request, send_file
from flask_jwt_extended import (
    JWTManager,
    create_access_token,
    get_jwt_identity,
    jwt_required,
)
from scipy.io import wavfile

from Utils import ProceesAudio

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100 MB limit

# AWS S3 Configuration
s3 = boto3.client('s3')
BUCKET_NAME = 'songsbucketparth'

# Approximate number of points in the amplitude envelope sent to the client.
_ENVELOPE_TARGET_POINTS = 150


@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: always answers ``{"status": "ok"}`` with HTTP 200."""
    return jsonify({"status": "ok"}), 200


# NOTE: this helper used to be registered as the '/process_and_upload' route.
# Flask calls view functions without positional arguments, so every request to
# that URL raised TypeError (HTTP 500). It is a plain helper, not an endpoint.
def upload_to_s3(processed_data, userId, sample_rate=44100):
    """Encode audio as WAV in memory, upload it to S3 and return a download URL.

    Args:
        processed_data: Audio samples in a layout accepted by ``soundfile.write``.
        userId: Identifier embedded in the S3 object key.
        sample_rate: Sample rate written into the WAV header. Defaults to
            44100, the value that was previously hard-coded.

    Returns:
        A pre-signed ``get_object`` URL for the uploaded file, valid for one hour.
    """
    # Create an in-memory bytes buffer and write the WAV data into it
    buffer = io.BytesIO()
    sf.write(buffer, processed_data, sample_rate, format='WAV')
    buffer.seek(0)  # Rewind so the upload starts at the first byte

    # NOTE(review): the timestamp has one-second resolution, so two uploads for
    # the same user within the same second map to the same key.
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    s3_key = f'uploads/processed_audio_{userId}_{timestamp}.wav'

    # Upload the buffer to AWS S3
    s3.upload_fileobj(
        buffer,
        BUCKET_NAME,
        s3_key,
        ExtraArgs={'ContentType': 'audio/wav'},
    )

    # Generate a pre-signed URL for the uploaded file
    presigned_url = s3.generate_presigned_url(
        'get_object',
        Params={
            'Bucket': BUCKET_NAME,
            'Key': s3_key,
        },
        ExpiresIn=3600,  # URL expiration time in seconds (1 hour)
    )

    return presigned_url


def int16_to_float32(samples):
    """Convert samples to float32 and divide by 32768.0.

    NOTE(review): the scaling is only meaningful for int16 input; other WAV
    sample formats are not rescaled correctly by this function.
    """
    return samples.astype(np.float32) / 32768.0


def _decode_stereo_wav(audio_bytes):
    """Parse WAV bytes into ``(sample_rate, data)`` with data shaped (2, n_samples).

    Raises:
        ValueError: If the bytes are not a readable WAV file or the file has
            fewer than two channels.
    """
    sample_rate, frames = wavfile.read(io.BytesIO(audio_bytes))
    if frames.ndim != 2 or frames.shape[1] < 2:
        raise ValueError("Expected a WAV file with at least two channels")

    # Keep the first two channels (left, right) and lay them out channel-first,
    # exactly as the former per-frame loop produced them.
    stereo = int16_to_float32(frames[:, :2])
    return sample_rate, np.ascontiguousarray(stereo.T)


def process_audio_bytes(audio_bytes, index):
    """Modulate uploaded WAV bytes, upload the result and describe it as JSON.

    Args:
        audio_bytes: Raw contents of the uploaded WAV file.
        index: Value forwarded unchanged to ``ProceesAudio.perform_modulation``.

    Returns:
        A Flask JSON response. On success it carries ``file_url`` (pre-signed
        S3 URL), ``array`` (coarse amplitude envelope) and ``part_amps``. If
        the upload cannot be decoded as a stereo WAV, the response carries an
        ``error`` message and status 400.
    """
    try:
        sample_rate, data = _decode_stereo_wav(audio_bytes)
    except ValueError as exc:
        # Bad client input: answer 400 instead of crashing with a 500.
        response = jsonify({"error": str(exc)})
        response.status_code = 400
        return response

    pa = ProceesAudio()
    processed_data, part_amps = pa.perform_modulation(
        data=data, sr=sample_rate, index=index
    )

    # NOTE(review): uploads with the default 44100 Hz header regardless of the
    # input's sample rate -- confirm whether perform_modulation resamples.
    file_url = upload_to_s3(processed_data=processed_data, userId="parth")

    # Mean absolute amplitude per window along the first axis. The window size
    # is clamped to 1 because a zero step makes range() raise ValueError when
    # processed_data has fewer than _ENVELOPE_TARGET_POINTS entries.
    acc_factor = max(1, len(processed_data) // _ENVELOPE_TARGET_POINTS)
    arr_to_show = [
        float(np.mean(np.abs(processed_data[start:start + acc_factor])))
        for start in range(0, len(processed_data), acc_factor)
    ]

    return jsonify({
        "file_url": file_url,
        "array": arr_to_show,
        "part_amps": float(part_amps),
    })


@app.route('/modify', methods=['POST'])
def modify():
    """Handle a multipart upload: process the ``song`` file and return JSON.

    Expects a file field named ``song`` and an optional form field ``index``.
    Returns 400 with a plain-text message when the file part is missing or has
    no filename; otherwise returns the response from ``process_audio_bytes``
    with a permissive CORS header added.
    """
    if 'song' not in request.files:
        return 'No file part', 400

    file = request.files['song']
    index = request.form.get('index')  # Assuming index is sent as a form field

    # Covers both an empty and a missing filename; the latter used to fall
    # through and return None, which Flask reports as a server error.
    if not file.filename:
        return 'No selected file', 400

    # Process the uploaded bytes with the index parameter
    response = process_audio_bytes(file.read(), index)
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response


if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive Werkzeug debugger;
    # do not expose this entry point outside local development.
    app.run(debug=True, port=80)