
Generate & Submit Task

The most efficient way to learn Parallec is to review the example code.

Also review the Javadoc.

APIs on Request Generation

First, let's look at the APIs that are protocol independent.

  • setTargetHostsFrom*() is required to set the target hosts; see Set Target Hosts below for details.
  • setConcurrency() controls how fast or slowly the requests are sent.

For details on configs, please check here.

API | Required | Default If Not Set | Details
setTargetHostsFrom*() | Required | No default; must be set | Sets the target host list from a string, a list, line-by-line text, a JSONPath, or a CMS query, from a local or remote source.
setProtocol() | Optional | HTTP | Normally there is no need to set this, because prepare*() already sets it. For HTTPS, you need to set it explicitly.
setConcurrency() | Optional | concurrencyDefault (1000) | The concurrency level. A low level sends requests slowly: for example, you may send 100,000 requests, but at a low rate.
setConfig() | Optional | default values | Configs for various timeouts, whether to auto save responses, whether to enable the response, and so on. This can set all task-level overridable configurations.
setEnableCapacityAwareTaskScheduler() | Optional | false (not enabled) | When enabled, tasks wait in a queue until there is capacity to run them, which helps accommodate heavy traffic. See Capacity aware scheduler below.
async() | Optional | false (sync mode) | Runs the parallel task asynchronously. You can check later whether the task has completed.

Capacity aware scheduler

When enabled, a task is pushed to the wait queue instead of being executed immediately. A daemon thread (created by newSingleThreadScheduledExecutor) checks every 0.5 seconds whether there is capacity to run a task from the wait queue.

This protects the application when there are multiple concurrent ParallelTasks, each with a high concurrency requirement.

This scheduler is disabled by default. It is unnecessary unless the load is high or tasks are submitted outside our control (e.g., when Parallec serves as a server).
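The wait-queue mechanism described above can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not Parallec's implementation: the class `CapacityAwareSchedulerSketch`, its `capacity` limit, and measuring capacity as the number of running tasks are all hypothetical. Only the wait queue and the 0.5 second check by a daemon thread from `newSingleThreadScheduledExecutor` come from the text.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: tasks wait in a queue until fewer than `capacity` tasks are running.
class CapacityAwareSchedulerSketch {
    private final Queue<Runnable> waitQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger running = new AtomicInteger();
    private final int capacity; // assumed capacity measure: max tasks running at once
    private final ExecutorService workers = Executors.newCachedThreadPool();
    private final ScheduledExecutorService checker =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "capacity-checker");
                t.setDaemon(true); // daemon thread, as described above
                return t;
            });

    CapacityAwareSchedulerSketch(int capacity) {
        this.capacity = capacity;
        // Every 500 ms, start as many waiting tasks as the capacity allows.
        checker.scheduleAtFixedRate(this::drain, 0, 500, TimeUnit.MILLISECONDS);
    }

    // Instead of running immediately, the task is pushed to the wait queue.
    void submit(Runnable task) {
        waitQueue.add(task);
    }

    // Runs only on the single checker thread; workers only ever decrement `running`.
    private void drain() {
        while (running.get() < capacity) {
            Runnable task = waitQueue.poll();
            if (task == null) {
                return;
            }
            running.incrementAndGet();
            workers.execute(() -> {
                try {
                    task.run();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
    }

    void shutdown() {
        checker.shutdownNow();
        workers.shutdown();
    }
}
```

With a capacity of 1, three submitted tasks run one after another, each picked up on a later 500 ms check. Parallec's own scheduler is switched on per task through the builder option listed in the table above.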

APIs on Response Handling

API | Required | Default If Not Set | Details
handleInWorker() / handleInManager() | Optional | handleInManager() | See Response Handler Location below for details.
setResponseContext() | Optional | An empty hashmap | Useful for passing arbitrary objects to and from the response handler, e.g. an Elasticsearch or Kafka client, or a hashmap to store, process, or aggregate the responses.
execute() | Required | n/a | The key function that executes the parallel task. It first validates the required data, then executes.
setAutoSaveLogToLocal() | Optional | false | Automatically saves logs to the local file system. By default the logs are written to the "userdata/task/logs" folder. Note that it is the user's responsibility to clean up these logs.
setSaveResponseToTask() | Optional | false | If true, saves the response to the ParallelTask object. By default, only the status code is saved.
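To illustrate what a response context is for, here is a JDK-only sketch. `ContextHandler` is a hypothetical stand-in that roughly mirrors the shape of a response handler callback (one response plus a shared `Map<String, Object>` context); `SingleResponse`, `ResponseContextSketch`, and the `"countByStatus"` key are also assumptions made for the example, not Parallec types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one response: the target host and its status code.
class SingleResponse {
    final String host;
    final int statusCode;

    SingleResponse(String host, int statusCode) {
        this.host = host;
        this.statusCode = statusCode;
    }
}

// Hypothetical stand-in for a handler that receives each response plus the shared context.
interface ContextHandler {
    void onCompleted(SingleResponse res, Map<String, Object> responseContext);
}

class ResponseContextSketch {
    // Calls the handler once per response, always passing the same context map.
    static void run(List<SingleResponse> responses, Map<String, Object> responseContext,
                    ContextHandler handler) {
        for (SingleResponse res : responses) {
            handler.onCompleted(res, responseContext);
        }
    }

    // Example handler: aggregates a count of responses per status code into the context.
    static final ContextHandler COUNT_BY_STATUS = (res, ctx) -> {
        @SuppressWarnings("unchecked")
        Map<Integer, Integer> counts = (Map<Integer, Integer>)
                ctx.computeIfAbsent("countByStatus", key -> new HashMap<Integer, Integer>());
        counts.merge(res.statusCode, 1, Integer::sum);
    };
}
```

The caller owns the map and reads the aggregate after the run, which is the same division of labor that passing a hashmap through setResponseContext() enables.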
Response Handler Location

This determines where, and therefore when, the user-defined response handler's onCompleted() function is called.

  • Handler in manager (default): the response handler is called in the manager, in sequence, after the response is passed back from the worker to the manager (after aggregation). Be cautious about long blocking operations in onCompleted(): every response goes through this point, so one slow call can block the whole flow.
  • Handler in worker: the handler's onCompleted() is called in the operation worker, in parallel, before aggregation. Be cautious about concurrency and locking if you save responses to a common data store. Also, when choosing the concurrency level, take into account the time needed to handle each response.
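The practical difference between the two locations is whether the handler runs on one thread or many. In this JDK-only sketch, `HandlerLocationSketch` is hypothetical, a fixed thread pool stands in for the workers, and each "response" is faked as the host name's length. The worker variant calls the handler from several threads at once, so the shared store must be thread-safe; the manager variant handles responses one at a time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the "response" for each host is faked as the host name's length.
class HandlerLocationSketch {
    // Handle in worker: the handler runs on worker threads in parallel, before aggregation,
    // so the shared store must be thread-safe (ConcurrentHashMap.merge is atomic).
    static Map<String, Integer> handleInWorker(List<String> hosts, int concurrency)
            throws InterruptedException {
        Map<String, Integer> store = new ConcurrentHashMap<>();
        ExecutorService workers = Executors.newFixedThreadPool(concurrency);
        for (String host : hosts) {
            workers.execute(() -> store.merge("totalLength", host.length(), Integer::sum));
        }
        workers.shutdown();
        if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return store;
    }

    // Handle in manager: responses are handled one at a time on a single thread,
    // so a plain HashMap is enough, but a slow handler delays every later response.
    static Map<String, Integer> handleInManager(List<String> hosts) {
        Map<String, Integer> store = new HashMap<>();
        for (String host : hosts) {
            store.merge("totalLength", host.length(), Integer::sum);
        }
        return store;
    }
}
```

Replacing the `ConcurrentHashMap` in the worker variant with a plain `HashMap` would make the total unreliable, which is the locking concern noted for handler-in-worker mode.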

Set Target Hosts

Parallec provides flexible ways to input multiple target hosts: from a list, a string, line-by-line text, a JSON path, or a CMS query, from local files or remote URLs.

Use the following .setTargetHostsFrom*() functions to set the target hosts.

From Java List
.setTargetHostsFromList(Arrays.asList("www.jeffpei.com", "www.restcommander.com"));
From Java String

From a single string, with host names separated by whitespace.

.setTargetHostsFromString("www.jeffpei.com www.restcommander.com");
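Splitting a whitespace-separated string into hosts can be sketched as follows. This only illustrates the input format: `splitHosts` is a hypothetical helper, not Parallec's parser, and treating any run of spaces, tabs, or newlines as a single separator is an assumption.

```java
import java.util.Arrays;
import java.util.List;

class HostStringSketch {
    // Splits on any run of whitespace; leading and trailing whitespace is ignored.
    static List<String> splitHosts(String hosts) {
        String trimmed = hosts.trim();
        if (trimmed.isEmpty()) {
            return List.of();
        }
        return Arrays.asList(trimmed.split("\\s+"));
    }
}
```

For example, `splitHosts("www.jeffpei.com www.restcommander.com")` yields the same two hosts as the setTargetHostsFromList example above.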
From Line by Line Text

From a local file containing host names line by line. Relative or absolute paths are both supported.

.setTargetHostsFromLineByLineText("userdata/sample_target_hosts_top100_old.txt",
                         HostsSourceType.LOCAL_FILE)

You can also load such a file from a web URL. (This will use the Apache server.)

.setTargetHostsFromLineByLineText("http://www.restcommander.com/docs/sample_target_hosts_top100.txt",
                         HostsSourceType.URL)
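A line-by-line hosts file holds one host name per line. A minimal reader is sketched below; `readHosts` is a hypothetical helper, and trimming each line and skipping blank lines are assumptions about sensible handling, not documented Parallec behavior. It takes a `Reader`, so the same logic applies to a local file (`Files.newBufferedReader(path)`) or a URL stream (`new InputStreamReader(url.openStream())`).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

class LineByLineHostsSketch {
    // Reads one host per line, trimming whitespace and skipping empty lines.
    static List<String> readHosts(Reader source) throws IOException {
        List<String> hosts = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String host = line.trim();
                if (!host.isEmpty()) {
                    hosts.add(host);
                }
            }
        }
        return hosts;
    }
}
```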
From Json Path

JsonPath is useful for extracting a list of host names from a JSON file.

Here is a sample JSON file that contains host names.

As long as the values are separated by whitespace.

String jsonPath = "$.sample.small-target-hosts[*].hostName";

.setTargetHostsFromJsonPath(jsonPath,
                "http://parallec.github.io/userdata/sample_target_hosts_json_path.json", HostsSourceType.URL);

You may also load such JSON files from the local file system.

From YiDB/CMS Query

YiDB, a.k.a. CMS (internally, the Configuration Management System), may store the cloud topology information.

Here is a sample CMS query result that contains host names.

Parallec automatically loads target hosts from a CMS query and handles pagination for you.

public final String URL_CMS_QUERY_MULTI_PAGE = "http://parallec.github.io/cms/repositories/cmsdb/branches/main/query/sample_cms_query_results_multi_page_1.json";

.setTargetHostsFromCmsQueryUrl(URL_CMS_QUERY_MULTI_PAGE);
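Pagination handling follows a common pattern: fetch a page, collect its host names, and follow the link to the next page until none is left. The sketch below shows only that loop. `Page`, `PageFetcher`, and the `nextUrl` field are hypothetical; the actual CMS result format and how Parallec discovers the next page are not described here.

```java
import java.util.ArrayList;
import java.util.List;

class CmsPaginationSketch {
    // Hypothetical parsed page: host names plus the URL of the next page (null on the last page).
    static class Page {
        final List<String> hostNames;
        final String nextUrl;

        Page(List<String> hostNames, String nextUrl) {
            this.hostNames = hostNames;
            this.nextUrl = nextUrl;
        }
    }

    // Hypothetical fetcher: loads and parses one page from a URL.
    interface PageFetcher {
        Page fetch(String url);
    }

    // Follows next-page links from the first URL and collects every host name in order.
    static List<String> loadAllHosts(String firstUrl, PageFetcher fetcher) {
        List<String> hosts = new ArrayList<>();
        String url = firstUrl;
        while (url != null) {
            Page page = fetcher.fetch(url);
            hosts.addAll(page.hostNames);
            url = page.nextUrl; // stop when there is no next page
        }
        return hosts;
    }
}
```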

Timeout in URLs

ParallecGlobalConfig defines the timeouts used when loading target hosts from a remote URL. You may change them before use.

/** The url connection connect timeout millis. Used when load target host from URL/CMS*/
public static int urlConnectionConnectTimeoutMillis = 6000;

/** The url connection read timeout millis. Used when load target host from URL/CMS*/
public static int urlConnectionReadTimeoutMillis = 15000;
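For context, this is how connect and read timeouts of this kind are usually applied to a `java.net.URLConnection`. Whether Parallec uses `URLConnection` internally is an assumption, and `TimeoutConfigSketch` is hypothetical; its two static fields only mirror the names and defaults above. Opening the connection object does not contact the server, so the timeouts can be set before `connect()`.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLConnection;

class TimeoutConfigSketch {
    // Mirrors the names and defaults shown above (a hypothetical copy, not ParallecGlobalConfig).
    static int urlConnectionConnectTimeoutMillis = 6000;
    static int urlConnectionReadTimeoutMillis = 15000;

    // Creates (but does not yet connect) a URLConnection with both timeouts applied.
    static URLConnection open(String url) throws IOException {
        URLConnection conn = URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(urlConnectionConnectTimeoutMillis);
        conn.setReadTimeout(urlConnectionReadTimeoutMillis);
        return conn;
    }
}
```

Because the fields are static, changing one before loading hosts affects every connection opened afterwards.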

APIs on HTTP

Most of the APIs that set HTTP properties are named setHttp*(), except setAsyncHttpClient(), which overrides the AsyncHttpClient used.

Set HTTP Method, URL and Protocol

  • prepareHttp*() sets the URL and the HTTP method. For example, prepareHttpGet("/index.html") means an HTTP GET to http://[targethost]:[port]/index.html.
  • For HTTPS, call setProtocol(RequestProtocol.HTTPS).
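The full request URL combines the protocol, the target host, the port, and the path given to prepareHttp*(). The helper below is a hypothetical sketch of that composition using `java.net.URI`; Parallec's own URL building is not shown in this document. The port 80 in the example matches the documented default of setHttpPort().

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: composes a request URL from its parts.
class RequestUrlSketch {
    static String buildUrl(String protocol, String targetHost, int port, String path)
            throws URISyntaxException {
        // URI(scheme, userInfo, host, port, path, query, fragment)
        return new URI(protocol, null, targetHost, port, path, null, null).toString();
    }
}
```

For example, `buildUrl("http", "www.parallec.io", 80, "/index.html")` returns `http://www.parallec.io:80/index.html`.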

Set HTTP Header

Here is sample code that sets HTTP headers with the help of ParallecHeader.

.setHttpHeaders(new ParallecHeader().addPair("content-type", "application/json").addPair("key", "value"))

API | Required | Default If Not Set | Details
prepareHttp*() | Required | n/a | The starting point. Sets the HTTP method (e.g. GET/POST/PUT/DELETE) and the URL.
setHttpHeaders() | Optional | empty | Adds headers using ParallecHeader. See the example above.
setHttpEntityBody() | Optional | empty | The entity body, e.g. a POST body for the request.
setHttpPort() | Optional | 80 | Sets the HTTP port.
setHttpPollerProcessor() | Optional | empty | Sets the HTTP poller processor for handling async APIs; this call also enables pollable mode automatically. For details, check here.
setAsyncHttpClient() | Optional | The embedded fast client from HttpClientStore | Overrides the client with your customized one for this task.
saveResponseHeaders() | Optional | false | Saves HTTP response headers. Please check ResponseHeaderMeta and the example.

APIs on SSH

The APIs to set SSH properties are named with setSsh*().

API | Required | Default If Not Set | Details
prepareSsh() | Required | n/a | Starting point for SSH. Sets the protocol to SSH.
setSshCommandLine() | Required | n/a | The command to execute.
setSshPort() | Optional | 22 | The SSH port.
setSshUserName() | Required | n/a | The user name for login.
setSshLoginType() | Optional | empty | The login type: key based or password based.
setSshPassword() | Optional | n/a | The SSH login password. Also sets the login type to password automatically.
setSshConnectionTimeoutMillis() | Optional | 5000 ms | Connection timeout. Defaults to 5000 ms in the global config.
setSshPrivKeyRelativePath() | Optional | n/a | The path must be relative to the project, e.g. "userdata/fake-privkey.txt". This API assumes the private key has no passphrase. Also sets the login type to key based automatically.
setSshPrivKeyRelativePathWtihPassphrase() | Optional | n/a | The path must be relative to the project. Takes a private key path and its passphrase. Also sets the login type to key based automatically.
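The table notes that setting a password or a private-key path also sets the login type. The class below is a hypothetical sketch of that "credential setter also picks the login type" behavior, with its own `LoginType` enum and setter names; it is not Parallec's builder.

```java
// Hypothetical sketch of SSH settings where credential setters also choose the login type.
class SshMetaSketch {
    enum LoginType { PASSWORD, KEY }

    int port = 22;          // documented default SSH port
    LoginType loginType;    // null until a credential is set
    String password;
    String privKeyRelativePath;
    String passphrase;

    SshMetaSketch setPassword(String password) {
        this.password = password;
        this.loginType = LoginType.PASSWORD; // a password implies password-based login
        return this;
    }

    SshMetaSketch setPrivKeyRelativePath(String path) {
        this.privKeyRelativePath = path;
        this.passphrase = null;         // this variant assumes no passphrase
        this.loginType = LoginType.KEY; // a key path implies key-based login
        return this;
    }

    SshMetaSketch setPrivKeyRelativePathWithPassphrase(String path, String passphrase) {
        setPrivKeyRelativePath(path);
        this.passphrase = passphrase;
        return this;
    }
}
```

The design choice is that the last credential set wins, so callers rarely need to set the login type explicitly.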

APIs on PING

The APIs to set Ping properties are named with setPing*().

Details of the two implementation modes can be found in PingProvider.java.

API | Required | Default If Not Set | Details
preparePing() | Required | n/a | Starting point for ping. Sets the protocol to "Ping".
setPingMode() | Optional | InetAddress | Process or InetAddress based; defaults to InetAddress mode. InetAddress mode requires root privilege.
setPingTimeoutMillis() | Optional | 500 | The timeout in milliseconds.
setPingNumRetries() | Optional | 1 | The number of retries.
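Here is a sketch of an InetAddress-style check with retries, assuming it is built on `java.net.InetAddress.isReachable(int timeout)`. The JDK documents that call as using ICMP echo when the process has the privilege, and otherwise trying a TCP connection to port 7. `PingSketch` is hypothetical, and whether the retry count includes the first attempt is an assumption: here it does not, so 1 retry means up to 2 attempts.

```java
import java.io.IOException;
import java.net.InetAddress;

class PingSketch {
    // One reachability attempt; abstracted so the retry logic can be tested without a network.
    interface Attempt {
        boolean reachable(String host, int timeoutMillis) throws IOException;
    }

    // JDK-based attempt: ICMP echo if privileged, otherwise a TCP probe to port 7.
    static final Attempt INET_ADDRESS =
            (host, timeoutMillis) -> InetAddress.getByName(host).isReachable(timeoutMillis);

    // Tries once, then up to numRetries more times; returns true on the first success.
    static boolean ping(String host, int timeoutMillis, int numRetries, Attempt attempt) {
        for (int i = 0; i <= numRetries; i++) {
            try {
                if (attempt.reachable(host, timeoutMillis)) {
                    return true;
                }
            } catch (IOException e) {
                // Treat lookup or network errors as a failed attempt and retry.
            }
        }
        return false;
    }
}
```

With the documented defaults, a real call would be `PingSketch.ping("www.parallec.io", 500, 1, PingSketch.INET_ADDRESS)`.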

APIs on TCP

The APIs to set TCP properties are named with setTcp*().

API | Required | Default If Not Set | Details
prepareTcp() | Required | n/a | Starting point for a TCP request. Sets the protocol to "TCP" and sets the request string.
setTcpPort() | Required | n/a | The port number the server listens on.
setTcpConnectTimeoutMillis() | Optional | 2000 | The connection timeout in milliseconds.
setTcpIdleTimeoutSec() | Optional | 5 | The idle timeout, in seconds, after which the channel closes the connection.
setTcpChannelFactory() | Optional | The default factory | If not set, the default one in TcpSshPingResourceStore is used.
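The TCP request flow can be pictured with a plain blocking `java.net.Socket`: connect with a connect timeout, write the request string, and read the reply, with the socket read timeout standing in for the idle timeout. This is a JDK sketch, not Parallec's implementation (Parallec works with a channel factory, per setTcpChannelFactory() in the table above); `TcpSketch`, the trailing newline, and reading a single reply line are assumptions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class TcpSketch {
    // Sends one request line; returns the first reply line, or null if the server closes first.
    static String send(String host, int port, String request,
                       int connectTimeoutMillis, int idleTimeoutSec) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
            // SO_TIMEOUT bounds how long a read may block, standing in for the idle timeout.
            socket.setSoTimeout(idleTimeoutSec * 1000);
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            out.write(request + "\n");
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            return in.readLine();
        }
    }
}
```

With the documented defaults, a call would pass 2000 ms for the connect timeout and 5 seconds for the idle timeout.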

APIs on UDP

The APIs to set UDP properties are named with setUdp*().

API | Required | Default If Not Set | Details
prepareUdp() | Required | n/a | Starting point for a UDP request. Sets the protocol to "UDP" and sets the request string.
setUdpPort() | Required | n/a | The port number the server listens on.
setUdpIdleTimeoutSec() | Optional | 2 | The idle timeout, in seconds, after which the channel closes the connection. Similar to a read timeout.
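Because UDP has no connection, a request is one datagram out and, ideally, one back; the idle timeout then bounds how long to wait for the reply, much like the read timeout the table mentions. A JDK sketch with `DatagramSocket` (the `UdpSketch` class and the single-datagram reply are assumptions, not Parallec's implementation):

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

class UdpSketch {
    // Sends the request as one datagram and waits up to idleTimeoutSec for one reply.
    // Returns null if nothing arrives in time.
    static String send(String host, int port, String request, int idleTimeoutSec) throws IOException {
        byte[] payload = request.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setSoTimeout(idleTimeoutSec * 1000);
            socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(host), port));
            byte[] buf = new byte[65507]; // maximum UDP payload over IPv4
            DatagramPacket reply = new DatagramPacket(buf, buf.length);
            try {
                socket.receive(reply);
            } catch (SocketTimeoutException e) {
                return null;
            }
            return new String(reply.getData(), 0, reply.getLength(), StandardCharsets.UTF_8);
        }
    }
}
```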

APIs on Variable Replacement for Heterogeneous Requests

When the protocol is HTTP, the request's entity body, URL, and headers can be written as a template, with variables denoted as "$VariableName".

More complex replacement samples are available in the test cases.

Different requests to different target hosts

Here is an example of hitting 3 different APIs on 3 different servers. $JOB_ID is the variable being replaced, and the API to use is setReplacementVarMapNodeSpecific(). Complete sample code is here.

Map<String, StrStrMap> replacementVarMapNodeSpecific = new HashMap<String, StrStrMap>();
replacementVarMapNodeSpecific.put("www.parallec.io",
        new StrStrMap().addPair("JOB_ID", "job_a"));
replacementVarMapNodeSpecific.put("www.jeffpei.com",
        new StrStrMap().addPair("JOB_ID", "job_b"));
replacementVarMapNodeSpecific.put("www.restcommander.com",
        new StrStrMap().addPair("JOB_ID", "job_c"));

pc.prepareHttpGet("/$JOB_ID.html")
        .setTargetHostsFromString(
                "www.parallec.io www.jeffpei.com www.restcommander.com")
        .setReplacementVarMapNodeSpecific(replacementVarMapNodeSpecific)
        .execute(new ParallecResponseHandler() {...}...
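Conceptually, each host has its own variable map, and the template is filled in per host before the request is sent. A JDK-only sketch: `resolvePerHost` is a hypothetical helper, and a plain `Map<String, String>` stands in for `StrStrMap`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NodeSpecificReplacementSketch {
    // For each host, replaces every "$NAME" in the template with that host's value for NAME.
    // Naive literal replacement: avoid variable names that are prefixes of each other.
    static Map<String, String> resolvePerHost(String template, List<String> hosts,
                                              Map<String, Map<String, String>> varsByHost) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String host : hosts) {
            String resolved = template;
            for (Map.Entry<String, String> var : varsByHost.getOrDefault(host, Map.of()).entrySet()) {
                resolved = resolved.replace("$" + var.getKey(), var.getValue());
            }
            result.put(host, resolved);
        }
        return result;
    }
}
```

With the map from the example above, www.parallec.io gets `/job_a.html`, www.jeffpei.com gets `/job_b.html`, and www.restcommander.com gets `/job_c.html`.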


Different requests to the same target host

Here is an example of hitting 2 different APIs on the same target host. $ZIP is the variable being replaced, and the API to use is setReplaceVarMapToSingleTargetSingleVar(String variable, List replaceList, String uniformTargetHost). More complex replacement APIs are available in the Javadoc. Complete sample code is here.

pc.prepareHttpGet("/userdata/sample_weather_$ZIP.txt")
    .setReplaceVarMapToSingleTargetSingleVar("ZIP",
        Arrays.asList("95037","48824"), "www.parallec.io")
    .setResponseContext(responseContext)
    .execute(new ParallecResponseHandler() {...}...
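Here the variable takes several values for one host, so one template expands into several requests to the same host. A hypothetical sketch of that expansion (not Parallec's code):

```java
import java.util.ArrayList;
import java.util.List;

class SingleTargetReplacementSketch {
    // Produces one request path per replacement value; all of them go to the same host.
    static List<String> expand(String template, String variable, List<String> values) {
        List<String> paths = new ArrayList<>();
        for (String value : values) {
            paths.add(template.replace("$" + variable, value));
        }
        return paths;
    }
}
```

With the values above, www.parallec.io receives GET requests for `/userdata/sample_weather_95037.txt` and `/userdata/sample_weather_48824.txt`.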

Regular Expression Response Filter

When defining the response handler, you can use FilterRegex, a very simple regular-expression-based filter class, to extract strings.

For example, given a ResponseOnSingleTask res, we can parse the response body:

String extractedString = new FilterRegex(
    ".*<td>JobProgress</td>\\s*<td>(.*?)</td>.*")
    .filter(res.getResponseContent());        
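A filter of this kind can be sketched with `java.util.regex`: the whole response must match the pattern, and the first capture group is returned. Whether FilterRegex uses the DOTALL flag (so that `.` also spans line breaks in multi-line HTML) is an assumption in this sketch, as is returning null when nothing matches; `RegexFilterSketch` is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexFilterSketch {
    // Returns capture group 1 if the entire input matches the regex, otherwise null.
    static String filter(String regex, String input) {
        // DOTALL lets ".*" cross line breaks in multi-line responses (an assumption here).
        Matcher m = Pattern.compile(regex, Pattern.DOTALL).matcher(input);
        return m.matches() ? m.group(1) : null;
    }
}
```

For an HTML body containing `<td>JobProgress</td>` followed by `<td>45%</td>`, the regex above extracts `45%`.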

Async APIs and Auto Progress Polling

Motivation

In many RESTful services today, a job such as "create compute" or "download package" may take an indefinite amount of time, so these APIs are normally designed to be asynchronous: they immediately return a job ID, which you can use to poll the job's progress and check its status. To achieve job-level concurrency, it is essential to define a poller, which describes how to poll progress and when to stop. A poller has the following attributes; please refer to the Javadoc for more details.

  • The regex to get jobId
  • The regex to get the progress
  • The progress polling API (a template with the jobId)
  • The job completion regex
  • The job failure regex
  • The polling interval
Sample Server

Please check this example for the complete code. Suppose we have a job submission API that returns a job ID, as in this sample server:

//submit job 
HTTP POST: /submitJob return: {"status": "/status/01218499-a5fe-47cf-a0a8-8e9b106c5219", "progress": 0}

//poll progress 
HTTP GET: /status/{JobID}
Sample Poller
// Initialize the poller
String pollerType = "CronusAgentPoller";
String successRegex = ".*\"progress\"\\s*:\\s*(100).*}";
String failureRegex = ".*\"error\"\\s*:\\s*(.*).*}";
String jobIdRegex = ".*\"/status/(.*?)\".*";
String progressRegex = ".*\"progress\"\\s*:\\s*([0-9]*).*}";
int progressStuckTimeoutSeconds = 600;
int maxPollError = 5;
long pollIntervalMillis = 2000L;
String jobIdPlaceHolder = "$JOB_ID";
String pollerRequestTemplate = "/status/" + jobIdPlaceHolder;

HttpPollerProcessor httpPollerProcessor = new HttpPollerProcessor(
        pollerType, successRegex, failureRegex, jobIdRegex,
        progressRegex, progressStuckTimeoutSeconds, pollIntervalMillis,
        pollerRequestTemplate, jobIdPlaceHolder, maxPollError);

To enable the poller defined above, call .setHttpPollerProcessor(httpPollerProcessor) when building the task. Parallec will then automatically poll the task progress until it succeeds or fails, enabling job-level concurrency.
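To show how the sample regexes fit together, here is a simplified polling loop. It extracts the job ID from the submission response, builds the status path from the template, then polls until the success or failure regex matches. `PollerSketch` and its `fetch` function are hypothetical, and so are the DOTALL flag and the rule that reaching `maxPollError` errors ends the loop. The loop also omits the progress-stuck timeout that HttpPollerProcessor takes. The regexes and the template are copied from the sample poller.

```java
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PollerSketch {
    static final String SUCCESS_REGEX = ".*\"progress\"\\s*:\\s*(100).*}";
    static final String FAILURE_REGEX = ".*\"error\"\\s*:\\s*(.*).*}";
    static final String JOB_ID_REGEX = ".*\"/status/(.*?)\".*";
    static final String PROGRESS_REGEX = ".*\"progress\"\\s*:\\s*([0-9]*).*}";
    static final String JOB_ID_PLACEHOLDER = "$JOB_ID";
    static final String POLLER_REQUEST_TEMPLATE = "/status/" + JOB_ID_PLACEHOLDER;

    // Returns capture group 1 if the whole text matches, else null.
    static String group1(String regex, String text) {
        Matcher m = Pattern.compile(regex, Pattern.DOTALL).matcher(text);
        return m.matches() ? m.group(1) : null;
    }

    // Polls fetch(statusPath) until success or failure; fetch may throw to signal a poll error.
    static String poll(String submitResponse, Function<String, String> fetch,
                       long pollIntervalMillis, int maxPollError) throws InterruptedException {
        String jobId = group1(JOB_ID_REGEX, submitResponse);
        if (jobId == null) {
            return "FAILED: no job id";
        }
        String statusPath = POLLER_REQUEST_TEMPLATE.replace(JOB_ID_PLACEHOLDER, jobId);
        int pollErrors = 0;
        while (true) {
            String body;
            try {
                body = fetch.apply(statusPath);
            } catch (RuntimeException e) {
                if (++pollErrors >= maxPollError) {
                    return "FAILED: too many poll errors";
                }
                Thread.sleep(pollIntervalMillis);
                continue;
            }
            if (group1(SUCCESS_REGEX, body) != null) {
                return "SUCCESS";
            }
            String error = group1(FAILURE_REGEX, body);
            if (error != null) {
                return "FAILED: " + error;
            }
            // Progress is only reported here; stuck-progress detection is omitted.
            System.out.println(statusPath + " progress=" + group1(PROGRESS_REGEX, body));
            Thread.sleep(pollIntervalMillis);
        }
    }
}
```

Against the sample server, the submission response yields the job ID `01218499-a5fe-47cf-a0a8-8e9b106c5219`, so the sketch polls `/status/01218499-a5fe-47cf-a0a8-8e9b106c5219` until the progress reaches 100.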