HEAppE Easy Access

To explain how to use HEAppE Middleware easily, a step-by-step setup describing the job submission flow has been created. HEAppE provides a REST API, which ensures universal access for client applications. A client application can be written in an arbitrary programming language.
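Because the REST API is language-agnostic, a request can be assembled with nothing but a standard library. The sketch below builds (without sending) a password-authentication request in Python. The endpoint path is an assumption inferred from the Python client method name used later on this page, and the host and credentials are placeholders; check all of them against your own HEAppE instance.

```python
import json
import urllib.request


def build_auth_request(base_url, username, password):
    """Build (but do not send) a password-authentication request."""
    # Assumption: the endpoint path mirrors the Python client method name.
    url = base_url.rstrip("/") + "/heappe/UserAndLimitationManagement/AuthenticateUserPassword"
    payload = {"Credentials": {"Username": username, "Password": password}}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder host and credentials, for illustration only.
request = build_auth_request("https://heappe.example.org", "alice", "secret")
print(request.get_method(), request.full_url)
```

The same request could be produced from any language with an HTTP client, which is the point of exposing the middleware through REST.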

Jupyter notebook

Jupyter notebooks have been created for easy access to the HPC infrastructure via HEAppE Middleware. Thanks to the pre-prepared docker-compose.yml file, the user can run the client applications easily, independent of the user's platform. If the user is familiar with the Python programming language, the Python Jupyter kernel is recommended. Currently, the Jupyter notebooks support the Python, Java, and C# Jupyter kernels. Each Jupyter notebook contains executable cells with Python code (see the Python section below, or download the pre-prepared Jupyter Playbook).

docker-compose.yml
version: '3.4'

services:
  jupyternotebookextension:
    image: janecekkrenek/jupyter-notebook-kernels-extension
    ports:
      - 8888:8888
    volumes:
      - ./Notebooks:/home/jovyan/work
    networks:
      - external_network
    environment:
      TZ: "Europe/Prague"

networks:
  external_network:

Python

  1. Client installation

    To use HEAppE Middleware easily, it is necessary to install the HEAppE Python client (heappeac) from GitHub.

    Example:
    import sys
    
    !{sys.executable} -m pip install git+https://github.com/It4innovations/heappeac.git
    
  2. Client application setup
    There are four variables that the user must set to configure the client application successfully.
    • configuration.host (HEAppE instance URL),

    • projectIdentificator (HPC project),

    • username

    • password

    You should receive these from your contact on the HEAppE team. Fill them in as strings (each value should be written within quotation marks). If everything went well, you should see the message HEAppE instance Python client prepared.

    Example:
    import json
    import os
    import time
    from io import StringIO
    from pathlib import Path
    
    import paramiko
    from paramiko import SSHClient
    from scp import SCPClient
    
    import heappeac as hp
    
    configuration = hp.Configuration()
    configuration.host = "${HEAppE_URL}"
    projectIdentificator = "${hpc_project}"
    username = "${username}"
    password = "${password}"
    api_instance = hp.ApiClient(configuration)
    
    print("HEAppE instance Python client prepared")
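Hard-coding credentials in a notebook is easy to get wrong when the notebook is shared. As a minimal sketch, the four settings could instead be read from environment variables. The variable names and the load_settings helper below are hypothetical and not part of HEAppE or heappeac.

```python
import os


def load_settings(env=os.environ):
    """Collect the four client settings from environment variables."""
    # Hypothetical variable names; choose whatever fits your deployment.
    names = {
        "host": "HEAPPE_URL",
        "projectIdentificator": "HEAPPE_PROJECT",
        "username": "HEAPPE_USERNAME",
        "password": "HEAPPE_PASSWORD",
    }
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[var] for key, var in names.items()}


# Demonstration with an in-memory mapping instead of the real environment.
demo_env = {
    "HEAPPE_URL": "https://heappe.example.org",
    "HEAPPE_PROJECT": "PROJECT-00-0",
    "HEAPPE_USERNAME": "alice",
    "HEAPPE_PASSWORD": "secret",
}
settings = load_settings(demo_env)
print(settings["host"])
```

The returned values can then be assigned to configuration.host, projectIdentificator, username, and password exactly as in the example above.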
    
  3. Available HEAppE command templates

    The example below returns HEAppE cluster information: the available clusters, queues, and command templates. The most important values you need to find in this output are:

    • Id of the cluster you want to use, later ClusterId

    • Id of the queue on the given cluster, later ClusterNodeTypeId

    • Id of the command template under the selected node, later CommandTemplateId

    You will need these values when creating the job. The output also contains other useful information about the queues and templates. For a queue, you can find, for example, how many nodes are available, the maximum walltime in seconds, and how many cores are available on one node. For the templates, the main point of interest is the template parameter names.

    Example:
    print("Fetching cluster info...")
    lac_body = {
        "_preload_content": False
    }
    
    ciEndpoint = hp.ClusterInformationApi(api_instance)
    r = ciEndpoint.heappe_cluster_information_list_available_clusters_get(**lac_body)
    r_data = json.loads(r.data)
    print(json.dumps(r_data, indent = 3))
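Picking the three Ids out of a long JSON dump by eye is error-prone, so a small helper can list the candidate combinations. The sketch below assumes that each cluster carries a NodeTypes list and that each node type carries a CommandTemplates list. These field names are assumptions, as is the sample data; compare them with the output printed by your instance and adjust the keys if the nesting differs.

```python
def list_template_ids(clusters):
    """Return (ClusterId, ClusterNodeTypeId, CommandTemplateId) triples."""
    triples = []
    for cluster in clusters:
        # Assumed key names; adjust them to match the real response.
        for node_type in cluster.get("NodeTypes", []):
            for template in node_type.get("CommandTemplates", []):
                triples.append((cluster["Id"], node_type["Id"], template["Id"]))
    return triples


# Made-up sample shaped like the assumed response, not real cluster data.
sample = [
    {
        "Id": 2,
        "Name": "ExampleCluster",
        "NodeTypes": [
            {
                "Id": 8,
                "Name": "ExampleQueue",
                "CommandTemplates": [{"Id": 3, "Name": "ExampleTemplate"}],
            }
        ],
    }
]
for cluster_id, node_type_id, template_id in list_template_ids(sample):
    print(f"ClusterId={cluster_id} ClusterNodeTypeId={node_type_id} CommandTemplateId={template_id}")
```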
    
  4. Authentication

    Before using the HEAppE API, you must authenticate. Authentication provides a session token, which can then be used for 20 minutes. The session token is stored in the session_code variable.

    Example:
    print(f"Authenticating to HEAppE instance through {username}...")
    cred = {
        "_preload_content": False,
        "body": {
            "Credentials": {
                "Username": username,
                "Password": password
            }
        }
    }
    
    ulmEndpoint = hp.UserAndLimitationManagementApi(api_instance)
    r = ulmEndpoint.heappe_user_and_limitation_management_authenticate_user_password_post(**cred)
    session_code = json.loads(r.data)
    print(f"Session code: {session_code}")
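Since the session token is only valid for 20 minutes, a long-running notebook may need to authenticate again. The sketch below tracks the age of a token on the client side so that the caller can decide when to re-authenticate. SessionTracker and its safety margin are hypothetical helpers, not part of heappeac, and the server remains the authority on whether a token is still valid.

```python
import time

SESSION_LIFETIME_SECONDS = 20 * 60  # validity period stated above


class SessionTracker:
    """Remember when a session code was issued and report whether it is stale."""

    def __init__(self, session_code, clock=time.monotonic):
        self.session_code = session_code
        self._clock = clock
        self._issued_at = clock()

    def is_expired(self, safety_margin=60):
        # Treat the token as expired slightly early to avoid racing the server.
        age = self._clock() - self._issued_at
        return age >= SESSION_LIFETIME_SECONDS - safety_margin


# Demonstration with a fake clock so that no real waiting is needed.
fake_now = [0.0]
tracker = SessionTracker("example-session-code", clock=lambda: fake_now[0])
print(tracker.is_expired())
fake_now[0] = 19 * 60 + 30
print(tracker.is_expired())
```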
    
  5. Available project

    At this point, you will obtain the specification of the HPC computational project, which is needed for job creation.

    Example:
    print("Fetching available computational projects...")
    lproj_body = {
        "_preload_content": False,
        "SessionCode": session_code
    }
    
    r = ulmEndpoint.heappe_user_and_limitation_management_projects_for_current_user_get(**lproj_body)
    r_data = json.loads(r.data)
    project = next(f["Project"] for f in r_data if f["Project"]["AccountingString"] == projectIdentificator)
    print(json.dumps(project, indent = 3))
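The next(...) expression in the example raises a bare StopIteration when no project matches projectIdentificator, which is hard to diagnose. A minimal sketch of a lookup with a clearer error message follows; find_project is a hypothetical helper, and the sample entries are made up, using only the Project, Id, and AccountingString fields that the example relies on.

```python
def find_project(entries, accounting_string):
    """Return the project whose AccountingString matches, or raise a clear error."""
    for entry in entries:
        project = entry["Project"]
        if project["AccountingString"] == accounting_string:
            return project
    available = sorted(entry["Project"]["AccountingString"] for entry in entries)
    raise LookupError(f"Project {accounting_string!r} not found; available: {available}")


# Made-up sample in the shape the example iterates over.
sample = [
    {"Project": {"Id": 1, "AccountingString": "PROJECT-00-1"}},
    {"Project": {"Id": 2, "AccountingString": "PROJECT-00-2"}},
]
print(find_project(sample, "PROJECT-00-2")["Id"])
```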
    
  6. Job specification

    At this point, you will create the job specification. Before running the job, it is important to specify its parameters. The specification consists of two parts: the job specification and the task specification.

    Note

    Many parameters can be set. All of them appear in the cell with the JobSpecification example, but those that are not necessary for common runs are commented out.

    In the job specification, you choose which computational project's resources should be used in the ProjectId field, choose the cluster by setting ClusterId, and set FileTransferMethodId. Optionally, you can specify the Name of the job, which assigns this job name in HEAppE, and you can set up email notifications for when the job starts, finishes, or is aborted.

    In the task specification, you define the tasks that should be executed. Each task is one computational job in the sense of the HPC cluster, which means it is one job submission to the batch scheduler. There are exceptions where one task can consist of multiple submissions, such as a task longer than the walltime limit of the queue, but we will not consider them here. If you are interested in advanced capabilities, please contact the HEAppE team for support.

    Generally, users want to define just one task and this will be used in our example. In the task you can specify:

    • Name, which is the name of the job that will be sent to the cluster scheduler

    • MinCores and MaxCores, which state how many cores the application needs [1]

    • WalltimeLimit, which specifies for how many seconds the nodes should be reserved [2]

    • ClusterNodeTypeId stands for the Id of the NodeType that you marked previously

    • CommandTemplateId stands for the Id of CommandTemplate you want to use that belongs to the selected NodeType

    • TemplateParameterValues contain the values of the parameters for the given HEAppE Template. It consists of a list of parameter values which are described by two items:
      • CommandParameterIdentifier - in other words parameter name as given in the script

      • ParameterValue - a string which contains the parameter value

    • TaskParalizationParameters - these parameters set the number of MPI processes and OpenMP threads that should be used [3]
      • MPIProcesses - this is equivalent to the batch scheduler mpiprocs parameter. It sets the number of MPI processes on one node, even if more nodes are allocated; each node will have this number of unique MPI processes. Therefore, if the computational node has 128 cores and you want 1 MPI process per core, this parameter should be 128 [4]

      • OpenMPThreads - the number of OpenMP threads on one node. Commonly, OpenMPThreads * MPIProcesses equals the number of cores available on a node, but many other combinations may be used depending on the use case, e.g. 18 [5]

      • MaxCores - the number of cores used for the given setting. In our case, with only one setting for all the cores, this number should equal the MaxCores given in the task description, e.g. 128
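The relationship between the three parallelization values can be sketched as a small calculation for the common case where MPI processes times OpenMP threads fills one node. The paralization_parameters helper below is hypothetical; it only encodes the rule of thumb described in the list and is not a requirement of HEAppE.

```python
def paralization_parameters(cores_per_node, openmp_threads, max_cores):
    """Derive one TaskParalizationParameters entry for a hybrid MPI/OpenMP run."""
    if cores_per_node % openmp_threads != 0:
        raise ValueError("openmp_threads must divide cores_per_node evenly")
    return {
        # MPI processes per node, so that processes * threads fills the node.
        "MPIProcesses": cores_per_node // openmp_threads,
        "OpenMPThreads": openmp_threads,
        # Should match MaxCores in the task description.
        "MaxCores": max_cores,
    }


pure_mpi = paralization_parameters(cores_per_node=128, openmp_threads=1, max_cores=128)
hybrid = paralization_parameters(cores_per_node=128, openmp_threads=8, max_cores=128)
print(pure_mpi)
print(hybrid)
```

With 128 cores per node, one thread per process gives 128 MPI processes per node, while 8 threads per process gives 16.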

    Foot notes:

Example:
print("Creating job template...")
job_spec_body = {
    "_preload_content": False,
    "body": {
        "JobSpecification": {
            "Name": "job_1",
            #"NotificationEmail": "string",
            #"PhoneNumber": "string",
            #"NotifyOnAbort": True,
            #"NotifyOnFinish": True,
            #"NotifyOnStart": True,
            "ClusterId": 2,
            "FileTransferMethodId": 2,
            "ProjectId": project["Id"],
            #"WaitingLimit": 0,
            #"IsExtraLong": True,
            "EnvironmentVariables": [],
            "Tasks": [
                {
                    "Name": "task_1",
                    "MinCores": 1,
                    "MaxCores": 128,
                    "Priority": 4,
                    "WalltimeLimit": 600,
                    #"PlacementPolicy": "string",
                    #"RequiredNodes": [
                    #   "string"
                    # ],
                    #"ClusterTaskSubdirectory": "string",
                    #"CpuHyperThreading": True,
                    #"JobArrays": "string",
                    #"IsExclusive": True,
                    #"IsRerunnable": True,
                    #"StandardInputFile": "stdin",
                    "StandardOutputFile": "stdout",
                    "StandardErrorFile": "stderr",
                    "ProgressFile": "stdprog",
                    "LogFile": "stdlog",
                    "ClusterNodeTypeId": 8,
                    "CommandTemplateId": 3,
                    #"TaskParalizationParameters": [
                        #{
                            #"MPIProcesses": 128,
                            #"OpenMPThreads": 1,
                            #"MaxCores": 128
                        #}
                    #],
                    #"EnvironmentVariables": [
                        #{
                            #"Name": "string",
                            #"Value": "string"
                        #}
                    #],
                    #"DependsOn": [
                    #   "string"
                    # ],
                    "TemplateParameterValues": [
                        {
                            "CommandParameterIdentifier": "inputParam",
                            "ParameterValue": "testValue"
                        }
                    ]
                }
            ]
        },
        "SessionCode": session_code
    }
}

jmEndpoint = hp.JobManagementApi(api_instance)
r = jmEndpoint.heappe_job_management_create_job_post(**job_spec_body)
r_data = json.loads(r.data)
job_id = r_data["Id"]
tasks = r_data["Tasks"]
print(f"Job ID: {job_id}")
  7. File transfer

    In case you want to transfer data to the computation folder, you can do it in multiple ways, each with its own advantages and shortcomings.

    • Do it in the HEAppE template by transferring data from the cluster to the HEAppE folder. You can use copy (cp) or link (ln).

    • Transfer the data from your local computer through HEAppE API via SCP/SFTP.

    The second alternative is done in the code example. You can define the data_input_path which should be the path to the folder you want to copy to the HEAppE job folder. The folder structure then will be accessible from the working directory of the job.

    Example:
    # Set the data input path
    data_input_path = "./data_transfer/input/example"
    
    # Copying files from input folder to job folder on HPC cluster
    print("Preparation of copying files ...")
    ft_body = {
        "_preload_content": False,
        "body": {
            "SubmittedJobInfoId": job_id,
            "SessionCode": session_code
        }
    }
    
    ftEndpoint = hp.FileTransferApi(api_instance)
    r = ftEndpoint.heappe_file_transfer_request_file_transfer_post(**ft_body)
    jobtransfer = json.loads(r.data)
    
    print("Copying files...")
    ssh = SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    pkey_file = StringIO(jobtransfer["Credentials"]["PrivateKey"])
    # Cipher types 3 and 4 are loaded as ECDSA keys; every other type is loaded as an RSA key
    if jobtransfer["FileTransferCipherType"] in (3, 4):
        pkey = paramiko.ECDSAKey.from_private_key(pkey_file)
    else:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
    
    ssh.connect(jobtransfer["ServerHostname"], username=jobtransfer["Credentials"]["Username"], pkey=pkey)
    base_path = jobtransfer["SharedBasepath"]
    with SCPClient(ssh.get_transport()) as scp:
        for task in tasks:
            task_id = str(task["Id"])
            # recursive=True is needed because data_input_path is a folder
            scp.put(data_input_path, os.path.join(base_path, task_id), recursive=True)
    ssh.close()
    
    
    ft_body = {
        "_preload_content": False,
        "body": {
            "SubmittedJobInfoId": job_id,
            "PublicKey": jobtransfer["Credentials"]["PublicKey"],
            "SessionCode": session_code
        }
    }
    
    r = ftEndpoint.heappe_file_transfer_close_file_transfer_post(**ft_body)
    r_data = json.loads(r.data)
    print("Copying input files finished.")
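The upload example joins remote paths with os.path.join, which uses the path separator of the machine running the client. Assuming the cluster uses POSIX-style paths (an assumption; the source does not state it), posixpath.join keeps forward slashes on any client platform. remote_task_dir is a hypothetical helper, and the base path is made up.

```python
import posixpath


def remote_task_dir(base_path, task_id):
    """Join the shared base path and a task Id using forward slashes only."""
    return posixpath.join(base_path, str(task_id))


# Made-up base path for illustration.
print(remote_task_dir("/scratch/heappe/jobs/42", 7))
```

This only matters when the client runs on Windows; inside the provided Docker image both functions behave the same.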
    
  8. Job submission

    At this point, everything should be ready to submit the job. The next code example submits the job created in the previous steps and reports the job status. The job can be in one of the following states:

    • 1 - Configuring

    • 2 - Submitted

    • 4 - Queued

    • 8 - Running

    • 16 - Finished

    • 32 - Failed

    • 64 - Cancelled

    • 128 - Waiting for free service cluster account (only when cluster account rotation is disabled)
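The numeric codes above can be wrapped in an enumeration so that client code compares names rather than bare numbers. This is a minimal sketch: the member names and the is_terminal helper are labels chosen here for readability, while the numeric values come from the list above.

```python
from enum import IntEnum


class JobState(IntEnum):
    CONFIGURING = 1
    SUBMITTED = 2
    QUEUED = 4
    RUNNING = 8
    FINISHED = 16
    FAILED = 32
    CANCELLED = 64
    WAITING_FOR_SERVICE_ACCOUNT = 128


# States after which the job status will not change anymore.
TERMINAL_STATES = frozenset({JobState.FINISHED, JobState.FAILED, JobState.CANCELLED})


def is_terminal(state_code):
    """Return True when the numeric state is Finished, Failed, or Cancelled."""
    return JobState(state_code) in TERMINAL_STATES


print(JobState(8).name, is_terminal(8))
print(JobState(16).name, is_terminal(16))
```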

    Example:
    print(f"Submitting job {job_id}...")
    submit_body = {
        "_preload_content": False,
        "body":
            {
                "CreatedJobInfoId": job_id,
                "SessionCode": session_code
            }
    }
    
    r = jmEndpoint.heappe_job_management_submit_job_put(**submit_body)
    r_data = json.loads(r.data)
    
    
    print(f"Waiting for job {job_id} to finish...")
    gcji_body = {
        "_preload_content": False,
        "SessionCode": session_code,
        "SubmittedJobInfoId": job_id
    }
    
    while True:
        r = jmEndpoint.heappe_job_management_current_info_for_job_get(**gcji_body)
        r_data = json.loads(r.data)
        state = r_data["State"]
        if state == 16:
            print("The job has finished.")
            break
        if state == 32:
            print("The job has failed.")
            break
        if state == 64:
            print("The job has been canceled.")
            break
        print(f"Waiting for job {job_id} to finish... current state: {state}")
        time.sleep(30)
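The loop above runs until the job reaches a final state, however long that takes. A minimal sketch of the same polling pattern with an upper time limit follows. wait_for_job is a hypothetical helper; fetch_state stands in for a function that calls the HEAppE API and returns the numeric State, and here it is replaced by canned values so that the sketch runs without a server.

```python
import time

TERMINAL_STATES = {16, 32, 64}  # Finished, Failed, Cancelled


def wait_for_job(fetch_state, interval=30, timeout=3600, sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_state() until a terminal state appears or the timeout passes."""
    deadline = clock() + timeout
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"Job still in state {state} after {timeout} seconds")
        sleep(interval)


# Demonstration with canned states and a no-op sleep instead of a real HEAppE call.
states = iter([2, 4, 8, 8, 16])
final_state = wait_for_job(lambda: next(states), sleep=lambda seconds: None)
print(final_state)
```

For jobs that run longer than the session token lifetime, the caller would also need to authenticate again between polls.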
    
  9. Download the data

    Now you can download data from the job folder to your computer in a similar manner to the upload.

    Note

    It is necessary to define the path for the downloaded files (in our example, ./data_transfer/output).

    Example:
    # Set the data output path
    data_output_path = "./data_transfer/output"
    
    print("Fetching generated files...")
    ft_body = {
        "_preload_content": False,
        "body": {
            "SubmittedJobInfoId": job_id,
            "SessionCode": session_code
        }
    }
    
    ftEndpoint = hp.FileTransferApi(api_instance)
    r = ftEndpoint.heappe_file_transfer_request_file_transfer_post(**ft_body)
    jobtransfer = json.loads(r.data)
    
    lchjf_body = {
        "_preload_content": False,
        "SessionCode": session_code,
        "SubmittedJobInfoId": job_id
    }
    
    r = ftEndpoint.heappe_file_transfer_list_changed_files_for_job_get(**lchjf_body)
    r_data = json.loads(r.data)
    producedFiles = [os.path.normpath(path["FileName"]) for path in r_data]
    print(f"Files changed during job execution: {producedFiles}")
    print("Fetching the files...")
    
    ssh = SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    pkey_file = StringIO(jobtransfer["Credentials"]["PrivateKey"])
    # Cipher types 3 and 4 are loaded as ECDSA keys; every other type is loaded as an RSA key
    if jobtransfer["FileTransferCipherType"] in (3, 4):
        pkey = paramiko.ECDSAKey.from_private_key(pkey_file)
    else:
        pkey = paramiko.RSAKey.from_private_key(pkey_file)
    
    ssh.connect(jobtransfer["ServerHostname"], username=jobtransfer["Credentials"]["Username"], pkey=pkey)
    base_path = jobtransfer["SharedBasepath"]
    
    with SCPClient(ssh.get_transport()) as scp:
        for fn in producedFiles:
            specpath = fn[1:]
            Path(os.path.dirname(f"{data_output_path}/{job_id}/{specpath}")).mkdir(parents=True, exist_ok=True)
            print(f"{data_output_path}/{job_id}/{specpath}")
            print(os.path.join(base_path, specpath))
            scp.get(os.path.join(base_path, specpath), f"{data_output_path}/{job_id}/{specpath}")
    ssh.close()
    
    ft_body = {
        "_preload_content": False,
        "body": {
            "SubmittedJobInfoId": job_id,
            "PublicKey": jobtransfer["Credentials"]["PublicKey"],
            "SessionCode": session_code
        }
    }
    
    r = ftEndpoint.heappe_file_transfer_close_file_transfer_post(**ft_body)
    r_data = json.loads(r.data)
    print("\n")
    print(", ".join(producedFiles) + " fetched")
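The download loop builds each local path by stripping the first character of the reported file name and formatting a string. A minimal sketch of the same mapping with pathlib follows; it additionally rejects names containing "..", which is a defensive choice made here rather than something the source requires. local_target is a hypothetical helper, and the file name is made up.

```python
from pathlib import Path, PurePosixPath


def local_target(output_dir, job_id, remote_name):
    """Map a job-relative remote file name to a path under the local output folder."""
    # Drop leading slashes so the name is treated as relative to the output folder.
    relative = PurePosixPath(remote_name.lstrip("/"))
    if ".." in relative.parts:
        raise ValueError(f"Refusing suspicious file name: {remote_name!r}")
    return Path(output_dir, str(job_id), *relative.parts)


# Made-up file name for illustration.
target = local_target("./data_transfer/output", 42, "/7/stdout")
print(target.as_posix())
```

Calling target.parent.mkdir(parents=True, exist_ok=True) before the download would create the folder structure, as the example does with Path(...).mkdir.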
    
  10. Cleanup

    When the data is downloaded and the job has finished, you must manually clean up after the HEAppE job by executing the example code. This deletes all data related to the job, as well as the HEAppE job folder itself.

    Example:
    print(f"Removing job {job_id}...")
    ft_body = {
        "_preload_content": False,
        "body": {
            "SubmittedJobInfoId": job_id,
            "SessionCode": session_code
        }
    }
    
    r = jmEndpoint.heappe_job_management_delete_job_delete(**ft_body)
    r_data = json.loads(r.data)
    print(r_data)
    
  11. Job resources reporting

    Executing the code example returns information about the executed job, such as the time of submission, the time of job end, and the core-hours used.

    Example:
    print("Fetching resource usage report...")
    rur_body = {
        "_preload_content": False,
        "SessionCode": session_code,
        "JobId": job_id
    }
    
    jrEndpoint = hp.JobReportingApi(api_instance)
    r = jrEndpoint.heappe_job_reporting_resource_usage_report_for_job_get(**rur_body)
    r_data = json.loads(r.data)
    print(json.dumps(r_data["Clusters"], indent = 3))