Skip to main content
Version: 2.1.0

Sample: Sensitive archiving Storage

This tutorial presents a sample application built with Astran.api. The goal of this demo is to develop an application that automatically backs up sensitive data to secure storage.

Introduction

We have data that we want to back up securely. The idea is simple: once the data is created in, or copied to, the temporary folder, it is sent to and stored securely on Astran's platform via a secured API.

Once the file is uploaded, it is deleted from the temporary folder. We can verify that it has been correctly saved via the Astran web governance console, called cockpit.

schema.png

Video demonstration

In this section, you can watch a video that demonstrates and explains the demo.

Automatic Sensitive Data Backup

Requirements

To develop our sample we used:

  • Python Programming
  • Watchdog PyPI
  • HTTPS requests

Astran.API only accepts requests from registered users, so valid credentials (e-mail address and password) are required.

API call modelization

We started by creating an api.py file in which we defined a class that wraps all the API calls needed for this tutorial:

  • Login
  • Token refreshing
  • Get information about my user
  • Get information about documents
  • Upload a new document
# api.py

"""Split API v2.1 client module."""

import os
import urllib
import urllib.parse
from typing import Any, Dict, List

import requests


class Credential:
    """Split API v2.1 credentials.

    Plain container for the three values sent to the token endpoint by
    ``API.login`` and ``API.refresh``.
    """

    def __init__(self, client_id: str, username: str, password: str) -> None:
        """Store the credentials unchanged.

        Args:
            client_id: Client identifier sent as ``client_id``.
            username: User name sent as ``username`` (the tutorial uses the
                e-mail address of the registered user).
            password: Password sent as ``password``.
        """
        self.username = username
        self.password = password
        self.client_id = client_id


class API:
    """Split API v2.1 client.

    Holds the API base URL, the OpenID Connect token endpoint of the realm
    and the current access/refresh tokens. ``login`` must be called before
    any method that sends the ``Authorization`` header, otherwise the bearer
    token is still ``None``.
    """

    def __init__(
        self, api_url: str, auth_url: str, credential: "Credential", realm: str
    ) -> None:
        """Build the endpoint URLs; no request is sent here.

        Args:
            api_url: URL of the API host. Its path is replaced by
                ``/api/v2.1``.
            auth_url: URL of the authentication host. Its path is replaced
                by the token endpoint of ``realm``.
            credential: Credentials used by ``login`` and ``refresh``.
            realm: Realm name inserted into the token endpoint path.
        """
        self.api_url = urllib.parse.urljoin(api_url, "/api/v2.1")
        self.auth_url = urllib.parse.urljoin(
            auth_url, f"/auth/realms/{realm}/protocol/openid-connect/token"
        )
        self.credential = credential
        self.realm = realm
        # Both tokens stay None until login() succeeds.
        self._access_token = None
        self._refresh_token = None

    def _with_auth(self) -> Dict[str, str]:
        """Return a new header dict carrying the current bearer token."""
        return {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
        }

    def _request_tokens(self, payload: Dict[str, Any]) -> None:
        """POST ``payload`` to the token endpoint and store the returned tokens.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
        """
        response = requests.post(self.auth_url, data=payload)

        # Stop here on an error status so stale tokens are never overwritten
        # with the content of an error body.
        response.raise_for_status()

        tokens = response.json()
        self._access_token = tokens.get("access_token")
        self._refresh_token = tokens.get("refresh_token")

    def login(self):
        """Allow you to login with the openid-connect protocol.

        Sends the stored credentials with the ``password`` grant type and
        keeps the access token and refresh token from the response.
        """
        self._request_tokens(
            {
                "grant_type": "password",
                "username": self.credential.username,
                "password": self.credential.password,
                "client_id": self.credential.client_id,
            }
        )

    def refresh(self):
        """Allow you to refresh your token with the openid-connect protocol.

        Sends the stored refresh token with the ``refresh_token`` grant type
        and replaces both stored tokens with the ones from the response.
        """
        self._request_tokens(
            {
                "grant_type": "refresh_token",
                "refresh_token": self._refresh_token,
                "client_id": self.credential.client_id,
            }
        )

    def user_me(self) -> Dict[str, Any]:
        """
        Allow you to get all information about your user.
        Cf https://api.s5.astran.io/docs/#/User/GetUserMe

        Returns the decoded JSON body; raises ``requests.HTTPError`` on an
        error status.
        """
        req = requests.get(f"{self.api_url}/users/me", headers=self._with_auth())
        req.raise_for_status()

        # If no exception return the response body
        return req.json()

    def list_children(self, document_id: str) -> List[Dict[str, Any]]:
        """
        Allow to list children document from parent document_id.
        Cf https://api.s5.astran.io/docs/#/File/getFileChildren

        Returns the decoded JSON body; raises ``requests.HTTPError`` on an
        error status.
        """
        req = requests.get(
            f"{self.api_url}/files/{document_id}/children", headers=self._with_auth()
        )
        req.raise_for_status()
        return req.json()

    def get_document_from_path(
        self, doc_path: str, children: bool = True, metadata: bool = True
    ) -> Dict[str, Any]:
        """
        Allow you to get document from its path.
        Cf https://api.s5.astran.io/docs/#/Filesystem/getFile

        Args:
            doc_path: Path of the document, appended as-is after
                ``/filesystem/``.
            children: Sent as the ``children`` query parameter.
            metadata: When true, an ``Accept: application/json`` header is
                added to the request.

        Returns the decoded JSON body; raises ``requests.HTTPError`` on an
        error status.
        """
        headers = self._with_auth()

        if metadata:
            headers["Accept"] = "application/json"

        # NOTE(review): the body is always decoded as JSON, even when
        # metadata is False - confirm what the API returns in that case.
        req = requests.get(
            f"{self.api_url}/filesystem/{doc_path}?children={children}",
            # Send the dict built above: a fresh _with_auth() here would
            # silently drop the Accept header.
            headers=headers,
        )
        req.raise_for_status()
        return req.json()

    def upload(
        self, parent_id: str, local_file_path: str, is_major: bool = True
    ) -> Dict[str, Any]:
        """
        Allow you to upload a local file to Astran.API.
        Cf https://api.s5.astran.io/docs/#/File/CreateFileChildByFileID

        Args:
            parent_id: ID of the document under which the file is created.
            local_file_path: Path of the local file; its base name is sent as
                the ``name`` query parameter.
            is_major: Sent as the ``isMajor`` query parameter.

        Returns the decoded JSON body; raises ``OSError`` if the file cannot
        be opened and ``requests.HTTPError`` on an error status.
        """
        params = {"name": os.path.basename(local_file_path), "isMajor": is_major}
        with open(local_file_path, "rb") as file:
            # The whole file is read into memory and sent as the raw body.
            req = requests.post(
                f"{self.api_url}/files/{parent_id}/children",
                headers=self._with_auth(),
                params=params,
                data=file.read(),
            )
        req.raise_for_status()
        return req.json()

Get destination folder ID

In our case, we want to store all the files in a specific folder HR_Backup that we already created. We need to get the id of the specified folder.

First, we need the rootDocumentId since HR_Backup is created in the rootFolder.

# main.py
from api import API, Credential

if __name__ == "__main__":
    # Define my credentials for the authentication
    cred = Credential("my-client-id", "my-email@exemple.com", "my-strong-password")

    # Instantiate the API client.
    # "{my-realm}" is a literal placeholder (these are not f-strings):
    # replace it by hand with your own realm name.
    api = API(
        "https://{my-realm}.api.s5.astran.io",  # API URL under your realm
        "https://{my-realm}.auth.astran.io",  # Authentication URL under your realm
        cred,  # Your credentials
        "my-realm",  # Your realm name
    )

    try:
        # Login
        api.login()

        # Retrieve user information
        user_me = api.user_me()

        # Print the ID of the user's root document
        root_id = user_me["member"]["rootDocumentId"]
        print(root_id)
    except Exception as err:
        # Tutorial-level handling: report any failure (network error, HTTP
        # error status, missing key) instead of showing a traceback.
        print(f"Cannot get user informations {err}")

Once we get the rootDocumentId we can get the Id of the specific folder.

# main.py
from api import API, Credential

if __name__ == "__main__":
    # Define my credentials for the authentication
    cred = Credential("my-client-id", "my-email@exemple.com", "my-strong-password")

    # Instantiate the API client.
    # "{my-realm}" is a literal placeholder (these are not f-strings):
    # replace it by hand with your own realm name.
    api = API(
        "https://{my-realm}.api.s5.astran.io",  # API URL under your realm
        "https://{my-realm}.auth.astran.io",  # Authentication URL under your realm
        cred,  # Your credentials
        "my-realm",  # Your realm name
    )

    try:
        # Login
        api.login()

        # Retrieve user information
        user_me = api.user_me()

        # Print the ID of the user's root document
        root_id = user_me["member"]["rootDocumentId"]
        print(root_id)

        # Keep only the children of the root document named "HR_Backup".
        # The result is a list: empty when no child has that name.
        root_children = api.list_children(root_id)
        doc = [child for child in root_children if child["name"] == "HR_Backup"]
        print(doc)
    except Exception as err:
        # Tutorial-level handling: report any failure (network error, HTTP
        # error status, missing key) instead of showing a traceback.
        print(f"Cannot get user informations {err}")

We can get the same result but in a shorter way with filesystem search:

# main.py
from api import API, Credential

if __name__ == "__main__":
    # Define my credentials for the authentication
    cred = Credential("my-client-id", "my-email@exemple.com", "my-strong-password")

    # Instantiate the API client.
    # "{my-realm}" is a literal placeholder (these are not f-strings):
    # replace it by hand with your own realm name.
    api = API(
        "https://{my-realm}.api.s5.astran.io",  # API URL under your realm
        "https://{my-realm}.auth.astran.io",  # Authentication URL under your realm
        cred,  # Your credentials
        "my-realm",  # Your realm name
    )

    try:
        # Login
        api.login()

        # Get document metadata with the filesystem path.
        # The second argument is `children`; False is sent as the
        # `children` query parameter.
        doc = api.get_document_from_path("/HR_Backup", False)
        print(doc)
    except Exception as err:
        # Tutorial-level handling: report any failure instead of showing a
        # traceback.
        print(f"Cannot get document informations {err}")

Monitoring Temporary Folder

Once connected, and knowing where we must send our documents, we can focus on monitoring the temporary folder and detecting the creation of a file so that it can be backed up securely.

We start by importing the needed libraries:

import os
import time
from unicodedata import name
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from os import access

We define our class.

# event.py

from api import API, Credential


class MyEventHandler(FileSystemEventHandler):
    """Upload every file created in the watched folder, then delete it."""

    def on_created(self, event):
        """Back up the newly created file to the ``/HR_Backup`` folder.

        A new API client is created and logged in for each event. The local
        file is removed only after the upload call returned without raising;
        on any failure the file is left in place and the error is printed.

        Args:
            event: The creation event; ``event.src_path`` is the path of the
                created entry.
        """
        print("created")

        # Creation events are also emitted for directories; a directory
        # cannot be opened and uploaded as a file, so skip it.
        if event.is_directory:
            return

        # Define my credentials for the authentication
        cred = Credential("my-client-id", "my-email@exemple.com", "my-strong-password")

        # Instantiate the API client.
        # "{my-realm}" is a literal placeholder (these are not f-strings):
        # replace it by hand with your own realm name.
        api = API(
            "https://{my-realm}.api.s5.astran.io",  # API URL under your realm
            "https://{my-realm}.auth.astran.io",  # Authentication URL under your realm
            cred,  # Your credentials
            "my-realm",  # Your realm name
        )

        try:
            # Login
            api.login()

            # Get document metadata with the filesystem path
            doc = api.get_document_from_path("/HR_Backup", False)

            # Upload document to API
            uploaded_doc = api.upload(doc["ID"], event.src_path)

            # Reached only when the upload did not raise.
            os.remove(event.src_path)
        except Exception as err:
            print(f"Cannot upload document {event.src_path} {err}")

Here is the main function.

# event_main.py
if __name__ == "__main__":
    event_handler = MyEventHandler()
    path = "../TemporaryFolder"

    # Watch the temporary folder and all of its sub-folders.
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()

    try:
        print("Monitoring")
        # Keep the main thread alive until Ctrl+C is pressed.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        print("Done")

    # Wait for the observer thread to finish before exiting.
    observer.join()