HEAppE Easy Access
To explain how to use HEAppE Middleware easily, a step-by-step guide describing the job submission flow was created. HEAppE provides a REST API, which ensures universal access for client applications. A client application can therefore be written in an arbitrary programming language.
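For illustration, the REST API can be called without any client library at all. The following minimal sketch uses the Python requests package; the route is inferred from the Swagger-generated client used later in this guide and may differ between HEAppE deployments, so treat it as an assumption rather than a fixed contract.

# Hedged sketch of raw REST access; the route below is an assumption
# inferred from the generated client method names used later in this guide.
import requests

payload = {"Credentials": {"Username": "${username}", "Password": "${password}"}}
r = requests.post(
    "${HEAppE_URL}/heappe/UserAndLimitationManagement/AuthenticateUserPassword",
    json=payload,
)
print(r.text)  # on success, the response contains the session token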
Jupyter notebook
Jupyter notebooks have been created for easy access to the HPC infrastructure via HEAppE Middleware. Thanks to the pre-prepared docker-compose.yml file, the user can run the client applications easily, independent of the user's platform. If the client is familiar with the Python programming language, it is recommended to choose the Python Jupyter kernel. Currently, the Jupyter notebooks support Python, Java, and C# kernels. Each Jupyter notebook contains executable cells with Python code (see the Python section below, or download the pre-prepared Jupyter Playbook).
version: '3.4'
services:
  jupyternotebookextension:
    image: janecekkrenek/jupyter-notebook-kernels-extension
    ports:
      - 8888:8888
    volumes:
      - ./Notebooks:/home/jovyan/work
    networks:
      - external_network
    environment:
      TZ: "Europe/Prague"
networks:
  external_network:
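With the file above saved as docker-compose.yml, running `docker compose up` in the same directory should start the notebook server; thanks to the port mapping above, the interface is then reachable at http://localhost:8888. Jupyter typically prints the access URL, including its login token, in the container log.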
Python
- Client installation
For easy usage of HEAppE Middleware, it is necessary to install HEAppEC from GitHub.
- Example:
import sys
!{sys.executable} -m pip install git+https://github.com/It4innovations/heappeac.git
- Client application setup
- There are four variables that the user must set to configure the client application successfully:
configuration.host (HEAppE instance URL)
projectIdentificator (HPC project)
username
password
You should receive these values from your contact on the HEAppE team. Fill them in as strings (i.e., written within quotation marks). If everything goes successfully, you should see the "HEAppE instance Python client prepared" message.
- Example:
import json
import os
import time
from io import StringIO
from pathlib import Path

import paramiko
from paramiko import SSHClient
from scp import SCPClient

import heappeac as hp

configuration = hp.Configuration()
configuration.host = "${HEAppE_URL}"
projectIdentificator = "${hpc_project}"
username = "${username}"
password = "${password}"

api_instance = hp.ApiClient(configuration)
print("HEAppE instance Python client prepared")
- Available HEAppE command templates
HEAppE cluster information
The example below returns information about the available clusters, queues, and templates. The most important values you need to find in this output are:
- Id of the cluster you want to use, later ClusterId
- Id of the queue on the given cluster, later ClusterNodeTypeId
- Id of the command template under the selected node, later CommandTemplateId
You will need these values at job creation. Additionally, the output contains other useful information about the given queues and templates. For a queue, you can find, for example, how many nodes are available, what the maximal walltime in seconds is, and how many cores are available on one node. For the templates, the main thing of concern is the template parameter names.
- Example:
print("Fetching cluster info...") lac_body = { "_preload_content": False } ciEndpoint = hp.ClusterInformationApi(api_instance) r = ciEndpoint.heappe_cluster_information_list_available_clusters_get(**lac_body) r_data = json.loads(r.data) print(json.dumps(r_data, indent = 3))
- Authentication
Before using the HEAppE API, you must authenticate yourself. Authentication provides a session token, which is then valid for 20 minutes. The session token is stored in the session_code variable.
- Example:
print(f"Authenticating to HEAppE instance through {username}...") cred = { "_preload_content": False, "body": { "Credentials": { "Username": username, "Password": password } } } ulmEndpoint = hp.UserAndLimitationManagementApi(api_instance) r = ulmEndpoint.heappe_user_and_limitation_management_authenticate_user_password_post(**cred) session_code = json.loads(r.data) print(f"Session code: {session_code}")
- Available project
At this point, you will obtain the specification of the HPC computational project that is needed for job creation.
- Example:
print("Fetching available computational projects...") lproj_body = { "_preload_content": False, "SessionCode": session_code } r = ulmEndpoint.heappe_user_and_limitation_management_projects_for_current_user_get(**lproj_body) r_data = json.loads(r.data) project = next(f["Project"] for f in r_data if f["Project"]["AccountingString"] == projectIdentificator) print(json.dumps(project, indent = 3))
- Job specification
At this point, you will create the job specification. Before running the job, it is important to specify its parameters. This specification consists of two parts: the job specification and the task specification.
Note
There are many parameters that can be set up, and you can see all of them in the cell with the JobSpecification example, but the ones that are not necessary for common runs are commented out.
In the job specification, you will choose which computational project's resources should be used (the Project field), choose which cluster you want to use by setting ClusterId, and set FileTransferMethodId. Optionally, you can specify the Name of the job, which will assign this name to the job in HEAppE, and you can also set up an email notification for when the job starts, finishes, or is aborted.
In the task specification, you define the tasks that should be executed. Each task is one computational job in the sense of the HPC cluster, i.e., one job submission to the batch scheduler. There are exceptions where one task can consist of multiple submissions, such as a task longer than the walltime limit of the queue, but we will not consider that here. If you are interested in these advanced capabilities, please contact the HEAppE team for support.
Generally, users want to define just one task, and this is what we use in our example. In the task you can specify:
- Name, which stands for the name of the job that will be sent to the cluster scheduler
- MinCores and MaxCores, which stand for how many cores you need for the application [1]
- Walltime, which specifies for how many seconds the nodes should be reserved [2]
- ClusterNodeTypeId, which stands for the Id of the NodeType that you noted previously
- CommandTemplateId, which stands for the Id of the CommandTemplate you want to use that belongs to the selected NodeType
- TemplateParameterValues, which contains the values of the parameters for the given HEAppE Template. It consists of a list of parameter values, each described by two items:
  - CommandParameterIdentifier - in other words, the parameter name as given in the script
  - ParameterValue - a string which contains the parameter value
- TaskParalizationParameters, which set up the number of MPI processes and OpenMP threads that should be used [3] (see the sketch after the job specification example below):
  - MPIProcesses - equivalent to the batch scheduler's mpiprocs parameter. It sets the number of MPI processes on one node, even if more nodes are allocated; each node will have this number of unique MPI processes. Therefore, if the computational node has 128 cores and you want 1 MPI process per core, this parameter should be 128 [4]
  - OpenMPThreads - the number of OpenMP threads on one node. Commonly, OpenMPThreads * MPIProcesses equals the number of cores available on the node, but many other combinations may be used depending on the use case, e.g. 18 [5]
  - MaxCores - the number of cores used for the given setting. In our case, when we have only one setting for all the cores, this number should equal the MaxCores given in the task description, e.g. 128
Footnotes:
- Example:
print("Creating job template...") job_spec_body = { "_preload_content": False, "body": { "JobSpecification": { "Name": "job_1", #"NotificationEmail": "string", #"PhoneNumber": "string", #"NotifyOnAbort": true, #"NotifyOnFinish": true, #"NotifyOnStart": true, "ClusterId": 2, "FileTransferMethodId": 2, "ProjectId": project["Id"], #"WaitingLimit": 0, #"IsExtraLong": true, "EnvironmentVariables": [], "Tasks": [ { "Name": "task_1", "MinCores": 1, "MaxCores": 128, "Priority": 4, "WalltimeLimit": 600, #"PlacementPolicy": "string" #"RequiredNodes": [ # "string" # ], #"ClusterTaskSubdirectory": "string", #"CpuHyperThreading": true, #"JobArrays": "string", #"IsExclusive": true, #"IsRerunnable": true, #"StandardInputFile": "stdin", "StandardOutputFile": "stdout", "StandardErrorFile": "stderr", "ProgressFile": "stdprog", "LogFile": "stdlog", "ClusterNodeTypeId": 8, "CommandTemplateId": 3, #"TaskParalizationParameters": [ #{ #"MPIProcesses": 128, #"OpenMPThreads": 1, #"MaxCores": 128 #} #], #"EnvironmentVariables": [ #{ #"Name": "string" #"Value": "string" #} #], #"DependsOn": [ # "string" # ], "TemplateParameterValues": [ { "CommandParameterIdentifier": "inputParam", "ParameterValue": "testValue" } ] } ] }, "SessionCode": session_code } } jmEndpoint = hp.JobManagementApi(api_instance) r = jmEndpoint.heappe_job_management_create_job_post(**job_spec_body) r_data = json.loads(r.data) job_id = r_data["Id"] tasks = r_data["Tasks"] print(f"Job ID: {job_id}")
- File transfer
In case you want to transfer data to the computation folder, you can do it in multiple ways, each with its own advantages and shortcomings:
1. Do it in the HEAppE template by transferring data from the cluster to the HEAppE folder; you can use copy (cp) or link (ln).
2. Transfer the data from your local computer through the HEAppE API via SCP/SFTP.
The second alternative is shown in the code example. You can define data_input_path, which should be the path to the folder you want to copy to the HEAppE job folder. The folder structure will then be accessible from the working directory of the job.
- Example:
# Set the data input path
data_input_path = "./data_transfer/input/example"

# Copying files from input folder to job folder on HPC cluster
print("Preparation of copying files ...")
ft_body = {
    "_preload_content": False,
    "body": {
        "SubmittedJobInfoId": job_id,
        "SessionCode": session_code
    }
}
ftEndpoint = hp.FileTransferApi(api_instance)
r = ftEndpoint.heappe_file_transfer_request_file_transfer_post(**ft_body)
jobtransfer = json.loads(r.data)

print("Copying files...")
ssh = SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey_file = StringIO(jobtransfer["Credentials"]["PrivateKey"])
match jobtransfer["FileTransferCipherType"]:
    case 1:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
    case 2:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
    case 3:
        pkey = paramiko.ECDSAKey.from_private_key(pkey_file)
    case 4:
        pkey = paramiko.ECDSAKey.from_private_key(pkey_file)
    case _:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
ssh.connect(jobtransfer["ServerHostname"], username=jobtransfer["Credentials"]["Username"], pkey=pkey)
base_path = jobtransfer["SharedBasepath"]
with SCPClient(ssh.get_transport()) as scp:
    for task in tasks:
        task_id = str(task["Id"])
        scp.put(data_input_path, os.path.join(base_path, task_id))
ssh.close()

ft_body = {
    "_preload_content": False,
    "body": {
        "SubmittedJobInfoId": job_id,
        "PublicKey": jobtransfer["Credentials"]["PublicKey"],
        "SessionCode": session_code
    }
}
r = ftEndpoint.heappe_file_transfer_close_file_transfer_post(**ft_body)
r_data = json.loads(r.data)
print("Copying input files finished.")
- Job submission
At this point, everything should be ready to submit the job. The next code example submits the job created in the previous step and reports the job status. The states a job can take are:
1 - Configuring
2 - Submitted
4 - Queued
8 - Running
16 - Finished
32 - Failed
64 - Cancelled
128 - Waiting for free service cluster account (Only in case when cluster account rotation is disabled)
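As a small convenience, the numeric states can be mapped to readable names. The JOB_STATES dictionary below is our own helper, not part of the HEAppE client; it could, for instance, replace the numeric state print in the polling loop of the example that follows.

# Hypothetical helper mapping HEAppE numeric job states to the names above
JOB_STATES = {
    1: "Configuring",
    2: "Submitted",
    4: "Queued",
    8: "Running",
    16: "Finished",
    32: "Failed",
    64: "Cancelled",
    128: "Waiting for free service cluster account",
}
print(JOB_STATES.get(8, "Unknown"))  # -> "Running"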
- Example:
print(f"Submitting job {job_id}...") submit_body = { "_preload_content": False, "body": { "CreatedJobInfoId": job_id, "SessionCode": session_code } } r = jmEndpoint.heappe_job_management_submit_job_put(**submit_body) r_data = json.loads(r.data) print(f"Waiting for job {job_id} to finish...") gcji_body = { "_preload_content": False, "SessionCode": session_code, "SubmittedJobInfoId": job_id } while True: r = jmEndpoint.heappe_job_management_current_info_for_job_get(**gcji_body) r_data = json.loads(r.data) state = r_data["State"] if r_data["State"] == 16: print(f"The job has finished.") break if r_data["State"] == 32: print(f"The job has failed.") break if r_data["State"] == 64: print(f"The job has canceled.") break print(f"Waiting for job {job_id} to finish... current state: {state}") time.sleep(30)
- Download the data
Now it is possible to download data from the job folder to your computer in a similar manner to the upload.
Note
It is necessary to define the path to which the files are downloaded (in our example, it is data_transfer).
- Example:
# Set the data output path
data_output_path = "./data_transfer/output"

print("Fetching generated files...")
ft_body = {
    "_preload_content": False,
    "body": {
        "SubmittedJobInfoId": job_id,
        "SessionCode": session_code
    }
}
ftEndpoint = hp.FileTransferApi(api_instance)
r = ftEndpoint.heappe_file_transfer_request_file_transfer_post(**ft_body)
jobtransfer = json.loads(r.data)

lchjf_body = {
    "_preload_content": False,
    "SessionCode": session_code,
    "SubmittedJobInfoId": job_id
}
r = ftEndpoint.heappe_file_transfer_list_changed_files_for_job_get(**lchjf_body)
r_data = json.loads(r.data)
producedFiles = [os.path.normpath(path["FileName"]) for path in r_data]
print(f"Files changed during job execution: {producedFiles}")

print("Fetching the files...")
ssh = SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey_file = StringIO(jobtransfer["Credentials"]["PrivateKey"])
match jobtransfer["FileTransferCipherType"]:
    case 1:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
    case 2:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
    case 3:
        pkey = paramiko.ECDSAKey.from_private_key(pkey_file)
    case 4:
        pkey = paramiko.ECDSAKey.from_private_key(pkey_file)
    case _:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
ssh.connect(jobtransfer["ServerHostname"], username=jobtransfer["Credentials"]["Username"], pkey=pkey)
base_path = jobtransfer["SharedBasepath"]
with SCPClient(ssh.get_transport()) as scp:
    for fn in producedFiles:
        specpath = fn[1:]
        Path(os.path.dirname(f"{data_output_path}/{job_id}/{specpath}")).mkdir(parents=True, exist_ok=True)
        print(f"{data_output_path}/{job_id}/{specpath}")
        print(os.path.join(base_path, specpath))
        scp.get(os.path.join(base_path, specpath), f"{data_output_path}/{job_id}/{specpath}")
ssh.close()

ft_body = {
    "_preload_content": False,
    "body": {
        "SubmittedJobInfoId": job_id,
        "PublicKey": jobtransfer["Credentials"]["PublicKey"],
        "SessionCode": session_code
    }
}
r = ftEndpoint.heappe_file_transfer_close_file_transfer_post(**ft_body)
r_data = json.loads(r.data)
print("\n")
print(", ".join(producedFiles) + " fetched")
Cleanup
When you have downloaded the data and the job is finished, it is necessary to clean up after the HEAppE job manually. That can be done by executing the example code, which deletes all data related to the job and the HEAppE job folder itself.
- Example:
print(f"Removing job {job_id}...") ft_body = { "_preload_content": False, "body": { "SubmittedJobInfoId": job_id, "SessionCode": session_code } } r = jmEndpoint.heappe_job_management_delete_job_delete(**ft_body) r_data = json.loads(r.data) print(r_data)
Job resources reporting
Executing the code example returns information about the executed job, such as the time of submission, the time of job completion, and the used core-hours.
- Example:
print("Fetching resource usage report...") rur_body = { "_preload_content": False, "SessionCode": session_code, "JobId": job_id } jrEndpoint = hp.JobReportingApi(api_instance) r = jrEndpoint.heappe_job_reporting_resource_usage_report_for_job_get(**rur_body) r_data = json.loads(r.data) print(json.dumps(r_data["Clusters"], indent = 3))