-
Notifications
You must be signed in to change notification settings - Fork 58
Description
Related
import datetime
import boto3
import requests
AWS_ACCESS_KEY_ID = "AKEXAMPLES3S"
AWS_SECRET_ACCESS_KEY = "SKEXAMPLES3S"
AWS_S3_ENDPOINT_URL = "http://127.0.0.1:8014"
AWS_STORAGE_BUCKET_NAME = "test-bucket-name"
RETENTION = "retention"
EXPIRE_FAST = "expire_fast"
TEMP_MARKER = "temp"
service = "s3"
region = "us-east-1"
t = datetime.datetime.now(datetime.UTC)
algorithm = "AWS4-HMAC-SHA256"
credential_scope = "/".join([t.strftime("%Y%m%d"), region, service, "aws4_request"])
key = "test-key/my_file.txt"
def get_tag_xml(key, value):
    """Return an S3 ``<Tagging>`` XML document containing a single tag.

    Args:
        key: Tag name. Converted with ``str()`` and XML-escaped.
        value: Tag value. Converted with ``str()`` and XML-escaped.

    Returns:
        The tagging document as a single-line string.
    """
    # Local import keeps this helper self-contained in the script.
    from xml.sax.saxutils import escape

    # Escape &, < and > so arbitrary tag text cannot break the XML structure.
    safe_key = escape(str(key))
    safe_value = escape(str(value))
    return (
        "<Tagging><TagSet><Tag>"
        f"<Key>{safe_key}</Key><Value>{safe_value}</Value>"
        "</Tag></TagSet></Tagging>"
    )
# Form fields the uploader has to submit together with the file.
fields = {
    "x-amz-algorithm": algorithm,
    "x-amz-credential": credential_scope,
    "x-amz-date": t.isoformat(),
    "tagging": get_tag_xml(RETENTION, EXPIRE_FAST),
    "success_action_status": "201",
}

# Policy conditions: one exact-match entry per form field (same order as
# above), followed by the bucket and key-prefix restrictions.
# NOTE(review): `key` ("test-key/my_file.txt") does not begin with
# TEMP_MARKER ("temp") -- confirm whether that mismatch is intentional.
conditions = [{name: value} for name, value in fields.items()]
conditions.append({"bucket": AWS_STORAGE_BUCKET_NAME})
conditions.append(["starts-with", "$key", TEMP_MARKER])

# S3 client pointed at the configured endpoint with the example credentials.
client = boto3.client(
    service_name="s3",
    endpoint_url=AWS_S3_ENDPOINT_URL,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

# Presigned POST valid for one hour (3600 seconds).
presigned = client.generate_presigned_post(
    AWS_STORAGE_BUCKET_NAME,
    key,
    Fields=fields,
    Conditions=conditions,
    ExpiresIn=3600,
)

# Demonstrate how another Python program can use the presigned URL to upload a file
object_name = "a.txt"
with open(object_name, "rb") as f:
    # The file handle must still be open while requests streams the upload.
    files = {"file": (object_name, f)}
    http_response = requests.post(
        presigned["url"],
        data=presigned["fields"],
        files=files,
    )

print(f"File upload HTTP status code: {http_response.status_code}")
print(http_response.content)
Metadata
Metadata
Assignees
Labels
bug: Something isn't working