google cloud storage presigned url is not working
I followed the exmaple implementation for AWS s3 to create signedUrl. Instead of AWS, I want to use Google cloud storage. But unfortunately, I am getting "'HTTP PUT failed, RetryException: PUT'" error in the errorDetail field of the response.
I executed a PUT request on returning the URL in the "output" section of the response. It worked. The error message is very poorly designed and doesn't provide any other details or directions to follow.
It would be nice to get support on this:
Function to generate signed_url:
def get_presigned_url(bucket_name, path, operation, expires_in=3600):
"""
Generates a presigned URL for a Google Cloud Storage object.
:param bucket_name: The name of the GCS bucket.
:param path: The name of the blob in the GCS bucket.
:param operation: The operation ('read' or 'write').
:param expires_in: Expiration time in seconds.
:return: A presigned URL.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(path)
if operation == 'read':
url = blob.generate_signed_url(
version="v4",
expiration=timedelta(seconds=expires_in),
method="GET"
)
elif operation == 'write':
url = blob.generate_signed_url(
version="v4",
expiration=timedelta(seconds=expires_in),
method="PUT",
content_type='application/octet-stream'
)
else:
raise ValueError("Operation must be 'read' or 'write'.")
return url
def api_call(image: Image) -> Image:
image_bytes = imageToBytes(image)
original_file_name = f"raw_images/original.png"
upload_bytes_to_gcs(bucket, image_bytes, original_file_name)
new_original_file_name = f"processed_images/original.png"
input_scr_url = get_presigned_url(bucket, original_file_name, 'read')
output_dest_url = get_presigned_url(bucket, new_original_file_name, 'write')
headers = {"Authorization": f"Bearer {token}", "x-api-key": client_id}
payload = {
"inputs": {
"href": input_scr_url,
"storage": "external"
},
"outputs": [
{
"href": output_dest_url,
"storage": "external",
"type": "image/png"
}
]
}
resp = invoke_autotone(autotone_url, headers, payload)
status = poll_status(resp, headers)
print(status)