| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- from datetime import datetime,timedelta
- from adm.constants import CTS as cts
- from django.core.files.uploadedfile import InMemoryUploadedFile
- from django.core.files.base import ContentFile
- from storages.backends.gcloud import GoogleCloudStorage
- from google.oauth2 import service_account
- import os
- import json
- class MediaStorage():
- def __init__(self, container=None):
- self.storage = None
- self.container = container
- if cts.STORAGE_TYPE == "GCP":
- if self.container is None:
- self.container = cts.GCP_ST_INFODRIVERS
- self.storage = GoogleCloudStorage()
- baseDir = os.path.dirname(os.path.realpath(__file__))
- if cts.AMBIENTE_EC2=="PROD":
- pathCredentials = baseDir + "/auth/credentials_gcp_storage.json"
- else:
- pathCredentials = baseDir + "/auth/credentials_gcp_stg_storage.json"
- with open(pathCredentials) as credentials:
- self.credentials = json.load(credentials)
- self.storage.credentials = service_account.Credentials.from_service_account_file(pathCredentials)
- self.storage.project_id = cts.GCP_ST_ACCESS_ID
- self.storage.bucket_name = cts.GCP_ST_ACCESS_KEY_ID
- self.location = self.container + "\\"
- def exists(self, path_midia_bucket):
- return self.storage.exists(path_midia_bucket)
- def delete(self, path_midia_bucket):
- self.storage.delete(path_midia_bucket)
- def url(self, path_midia_bucket,expire=None):
- ret = ""
- if expire is None:
- expire = 600
- elif expire==-1:
- expire = None
- if cts.STORAGE_TYPE == "GCP":
- if expire is None:
- self.storage.expiration = None
- else:
- self.storage.expiration = datetime.utcnow() + timedelta(seconds=expire)
- ret = self.storage.url(path_midia_bucket)
- else:
- ret = self.storage.url(path_midia_bucket,expire)
- return ret
- def save(self, path_midia_bucket, content,max_length=None):
- if cts.STORAGE_TYPE == "GCP":
- if isinstance(content,InMemoryUploadedFile):
- path_midia_bucket = path_midia_bucket.replace("/","\\")
- arrTMP = path_midia_bucket.split("\\")
- path_midia_bucket = path_midia_bucket.replace(arrTMP[0]+"\\",cts.GCP_ST_GENERAL + "\\")
- if self.exists(path_midia_bucket):
- self.delete(path_midia_bucket)
- self.storage.save(path_midia_bucket, content.file)
- else:
- try:
- self.storage.save(path_midia_bucket, content)
- except:
- self.storage.save(path_midia_bucket, ContentFile(content))
-
- def generate_filename(self,filename):
- return filename
-
- def list(self, prefix=None):
- # Get blobs in specific subirectory
- blobs = list(self.storage.bucket.list_blobs(prefix=prefix))
- return blobs
|