Punch


The punch node lets you run a punchlet on incoming Dataset rows. Say you receive a dataset as follows:

| column1 | column2 | column3 |
| ------- | ------- | ------- |
| "hello" | true    | 17      |
| ...     | ...     | ...     |

The punchlet will receive a JSON document like this:

  {
    "column1" : "hello",
    "column2" : true,
    "column3" : 17
  }

The punchlet can produce one or several additional columns. Here is a punchlet that simply adds two columns:

{
  [column4] = [column1] + " world";
  [column5] = [column3] * 2;
}

This will produce the following rows:

| column1 | column2 | column3 | column4       | column5 |
| ------- | ------- | ------- | ------------- | ------- |
| "hello" | true    | 17      | "hello world" | 34      |
| ...     | ...     | ...     | ...           | ...     |

Tips

You can use the ''' special HJSON tag to include punchlet code in a more readable, unescaped way. An example follows.
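
For instance, instead of escaping the punchlet as a single JSON string, you can write it inline between ''' markers (a minimal sketch of the settings block used in the examples below):

  settings:
  {
    punchlet_code:
    '''
    {
      [column4] = [column1] + " world";
    }
    '''
  }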


Examples

Use-cases

Our "hello world" punchline configuration.

beginner_use_case.punchline

{
  type: punchline
  runtime: spark
  version: "6.0"
  tenant: mytenant
  dag: [
      {
        type: punch
        component: punch
        settings:
        {
          punchlet_code: 
          '''
          {
            [column4] = [column1] + " world";
            [column5] = [column3] * 2;
          }
          '''
          output_columns: [
            {
              type: string
              field: column4
            }
            {
              type: integer
              field: column5
            }
          ]
        }
        subscribe:
        [
          {
            component: input
            stream: documents
          }
        ]
        publish:
        [
          {
            stream: documents
          }
        ]
      }
  ]
}

Run beginner_use_case.punchline using the command below:

CONF=beginner_use_case.punchline
punchlinectl start -p $CONF

What did we achieve?

We ran a punchlet on each incoming row and declared the columns it adds through the output_columns property. We can also select a subset of the input columns through the input_columns property for later use, as sketched below.
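
The beginner example above does not set input_columns; here is a minimal sketch of how it could look (assuming input_columns takes the list of column names to expose, as suggested by the Parameters section below; the names are illustrative):

  settings:
  {
    # expose only these two columns to the punchlet
    input_columns: [
      column1
      column3
    ]
    punchlet_code:
    '''
    {
      [column4] = [column1] + " world";
    }
    '''
  }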

Generating Several Rows

Your punchlet can output an array of values instead of a single JSON document. In that case, one row is generated in the output dataset for each array element.

intermediate_use_case.punchline

{
  type: punchline
  runtime: spark
  version: "6.0"
  tenant: mytenant
  dag: [
      {
        type: punch
        component: punch
        settings:
        {
          punchlet_code: 
          '''
          {
            Tuple results;
            for (int i = 0; i < 3; i++) {
              Tuple tmp;
              tmp:[column4] = [column1] + " " + i;
              tmp:[column5] = [column3] + i;
              results.append(tmp);
            }
            // this is a notation to overwrite the top level document
            root:/ = results;
          }
          '''
          output_columns: [
            {
              type: string
              field: column4
            }
            {
              type: integer
              field: column5
            }
          ]
        }
        subscribe:
        [
          {
            component: input
            stream: documents
          }
        ]
        publish:
        [
          {
            stream: documents
          }
        ]
      }
  ]
}

Let's execute it with the command below:

CONF=intermediate_use_case.punchline
punchlinectl start -p $CONF

Expected output:

| column1 | column2 | column3 | column4   | column5 |
| ------- | ------- | ------- | --------- | ------- |
| "hello" | true    | 17      | "hello 0" | 17      |
| "hello" | true    | 17      | "hello 1" | 18      |
| "hello" | true    | 17      | "hello 2" | 19      |

Resources

Output documents are appended to the output dataset. For each input document, the punch node can provide either a single output document or several output documents (by providing an array as the root tuple).

You can also provide external resources by adding them to the resources setting. Those resources are accessible in the punchlet code through the following Java function:

/**
* Return a provided resource
* @param resourceName name of the resource (subscription name or "resources" map key)
* @param resourceType type of the resource
* @return the resource
*/
public <T> T getResource(String resourceName, Class<T> resourceType)
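
For example, a constant String resource declared in the resources map under the key my_resource (a hypothetical key) could be fetched from within the punchlet like this:

  {
    // fetch the resource declared under the "my_resource" key (hypothetical)
    String greeting = getResource("my_resource", String.class);
    [greeting] = greeting;
  }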

Warning

You must use this node instead of punch_stage if you need a resource provided by another node during punchlet execution.

advanced_use_case.punchline

{
  type: file_model_input
  component: model_loader
  settings:
  {
    file_path: model.bin
  }
  publish: [
    {
      stream: model
    }
  ]
  subscribe: []
}
{
  type: punch
  component: punch
  settings:
  {
    punchlet_code:
    '''
    {
      [base64] = [name].encodeBase64();
      [decade] = [age] % 10;
      [pipelineModel] = getResource("resource_1", PipelineModel.class);
      [my_resource] = getResource("resource_2", String.class);
      // Do something with my resources
      print(root);
    }
    '''
    output_columns: [
      {
        type: string
        field: base64
      }
      {
        type: integer
        field: decade
      }
    ]
    resources: {
      resource_1: model_loader_model
      resource_2: hello
    }
  }
  subscribe:
  [
    {
      component: input
      stream: data
    }
    {
      component: model_loader
      stream: model
    }
  ]
  publish:
  [
    {
      stream: data
    }
  ]
}

Info

As you can see, we use two resources of different types. The first one is computed within the job by the file_model_input node. To use it, set the resource value to component_stream (here model_loader_model) and, in the punchlet, use the type returned by the file_model_input node (here PipelineModel.class). The second one is a constant resource of String type; you can use any type (Integer, String, ...) in your punchlet.

Parameters

Common Settings

| Name               | Type         | Mandatory | Default value | Description |
| ------------------ | ------------ | --------- | ------------- | ----------- |
| punchlet_code      | String       | false     | "{}"          | Punchlet code. Overrides punchlet_code_file. |
| punchlet_code_file | String       | false     | NONE          | Punchlet code file, readable from the driver. |
| input_columns      | String       | false     | NONE          | If not set, all the dataset row columns are visible to the punchlet; set input_columns to narrow the exposed columns. |
| output_columns     | List of Json | false     | NONE          | List of additional columns, i.e. the ones added by the punchlet. |
| resources          | Map of Json  | false     | NONE          | Map of resources provided during punchlet execution. |
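
As an alternative to inlining the code, the table above mentions punchlet_code_file. A minimal sketch (the file path is illustrative):

  settings:
  {
    # path to a punchlet file readable from the driver
    punchlet_code_file: ./punchlets/my_punchlet.punch
  }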

Advanced Settings

No advanced settings