Punch

Overview

The punch node lets you run a punchlet on incoming Dataset rows. Say you receive a dataset as follows:

|  column1 | column2 | column3 |
|  "hello" | true    | 17      |

The punchlet receives each row as a JSON document such as:

  {
    "column1" : "hello",
    "column2" : true,
    "column3" : 17
  }

The punchlet can produce one or several additional columns. Here is a punchlet that simply adds two columns:

{
  [column4] = [column1] + " world";
  [column5] = [column3] * 2;
}

This will produce the following results:

|  column1 | column2 | column3 | column4       | column5 |
|  "hello" | true    | 17      | "hello world" |  34     |
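To make the arithmetic concrete, here is a minimal plain-Java sketch of what the punchlet above does to one row. It does not use the Punch API: the class `AddColumnsSketch` and its `apply` method are hypothetical names, and the row is modelled as an ordinary `Map`, which is only an assumption about how to picture the JSON document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the punch node; not part of the Punch API.
class AddColumnsSketch {

    // Mimics the punchlet: reads column1 and column3, adds column4 and column5.
    static Map<String, Object> apply(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>(row); // input columns are kept
        out.put("column4", row.get("column1") + " world");
        out.put("column5", ((Integer) row.get("column3")) * 2);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("column1", "hello");
        row.put("column2", true);
        row.put("column3", 17);
        // prints {column1=hello, column2=true, column3=17, column4=hello world, column5=34}
        System.out.println(apply(row));
    }
}
```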

Runtime Compatibility

  • PySpark :
  • Spark :

Examples

Basic

{
  type: punchline
  runtime: spark
  version: "6.0"
  tenant: mytenant
  dag: [
      {
        type: punch
        component: punch
        settings:
        {
          punchlet_code: 
          '''
          {
            [column4] = [column1] + " world";
            [column5] = [column3] * 2;
          }
          '''
          output_columns: [
            {
              type: string
              field: column4
            }
            {
              type: integer
              field: column5
            }
          ]
        }
        subscribe:
        [
          {
            component: input
            stream: documents
          }
        ]
        publish:
        [
          {
            stream: documents
          }
        ]
      }
  ]
}

Generating Several Rows

Your punchlet can output an array of documents instead of a single JSON document. In that case, one row is generated in the output dataset for each element of the array.

{
  type: punchline
  runtime: spark
  version: "6.0"
  tenant: mytenant
  dag: [
      {
        type: punch
        component: punch
        settings:
        {
          punchlet_code: 
          '''
          {
            Tuple results;
            for (int i = 0; i < 3; i++) {
              Tuple tmp;
              tmp:[column4] = [column1] + " " + i;
              tmp:[column5] = [column3] + i;
              results.append(tmp);
            }
            // this notation overwrites the top-level document
            root:/ = results;
          }
          '''
          output_columns: [
            {
              type: string
              field: column4
            }
            {
              type: integer
              field: column5
            }
          ]
        }
        subscribe:
        [
          {
            component: input
            stream: documents
          }
        ]
        publish:
        [
          {
            stream: documents
          }
        ]
      }
  ]
}

The expected output is as follows:

|  column1 | column2 | column3 | column4    | column5 |
|  "hello" | true    | 17      | "hello 0"  | 17      |
|  "hello" | true    | 17      | "hello 1"  | 18      |
|  "hello" | true    | 17      | "hello 2"  | 19      |
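The one-to-many behaviour can be pictured with a short plain-Java sketch. It is an emulation under assumptions, not the Punch implementation: `RowExpansionSketch` and `expand` are hypothetical names, rows are modelled as `Map` instances, and a space is inserted before the index so that the values match the expected output shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical emulation of the one-to-many behaviour; not the Punch API.
class RowExpansionSketch {

    // Returns three output rows for one input row, like the punchlet loop.
    static List<Map<String, Object>> expand(Map<String, Object> row) {
        List<Map<String, Object>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Map<String, Object> out = new LinkedHashMap<>(row); // input columns are kept
            out.put("column4", row.get("column1") + " " + i);
            out.put("column5", ((Integer) row.get("column3")) + i);
            results.add(out);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("column1", "hello");
        row.put("column2", true);
        row.put("column3", 17);
        // One line is printed per generated row.
        for (Map<String, Object> out : expand(row)) {
            System.out.println(out);
        }
    }
}
```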

Resources

Output documents are appended to the output dataset. For each input document, the punchlet can provide either a single output document or several output documents (by providing an array as the root tuple).

You can also provide external resources by adding them to the resources setting. These resources are accessible in the punchlet code through the following Java function:

/**
* Return a provided resource
* @param resourceName name of the resource (subscription name or "resources" map key)
* @param resourceType type of the resource
* @return the resource
*/
public <T> T getResource(String resourceName, Class<T> resourceType)
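
The signature above suggests a typed lookup by name. The following sketch shows one way such a contract could behave; `ResourceRegistrySketch` and `register` are hypothetical, and the error handling (an exception for an unknown name, a `ClassCastException` for a wrong type) is an assumption, since the real implementation is internal to Punch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry illustrating the getResource contract; not Punch code.
class ResourceRegistrySketch {

    private final Map<String, Object> resources = new HashMap<>();

    // Stores a resource under a name (assumed helper, not in the Punch API).
    void register(String resourceName, Object value) {
        resources.put(resourceName, value);
    }

    // Same shape as the documented function: look up by name, return as the requested type.
    <T> T getResource(String resourceName, Class<T> resourceType) {
        Object value = resources.get(resourceName);
        if (value == null) {
            throw new IllegalArgumentException("unknown resource: " + resourceName);
        }
        // Class.cast throws ClassCastException when the stored value has another type.
        return resourceType.cast(value);
    }

    public static void main(String[] args) {
        ResourceRegistrySketch registry = new ResourceRegistrySketch();
        registry.register("resource_2", "hello");
        String value = registry.getResource("resource_2", String.class);
        System.out.println(value); // prints hello
    }
}
```

Passing the expected class explicitly lets the caller receive a typed value without an unchecked cast in the punchlet code.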

Warning

You must use this node instead of punch_stage if you need to provide a resource from another node during punchlet execution.

{
    type: file_model_input
    component: model_loader
    settings:
    {
        file_path: model.bin
    }
    publish: [
        {
            stream: model
        }
    ]
    subscribe: []
}
{
    type: punch
    component: punch
    settings:
    {
        punchlet_code: 
        '''
        {   
            [base64] = [name].encodeBase64();
            [decade] = [age] % 10;
            [pipelineModel] = getResource("resource_1",PipelineModel.class);
            [my_resource] = getResource("resource_2",String.class);
            // Do something with my resources
            print(root);
        }
        '''
        output_columns: [
            {
                type: string
                field: base64
            }
            {
                type: integer
                field: decade
            }

        ]
        resources: {
            resource_1: model_loader_model
            resource_2 : hello
        }

    }
    subscribe:
    [
        {
            component: input
            stream: data
        }
        {
            component: model_loader
            stream: model
        }

    ]
    publish:
    [
        {
            stream: data
        }
    ]
}

Info

This example uses two resources of two different types. The first one is computed within the job by the file_model_input node. To use it, set the value of the resource to component_stream (here: model_loader_model) and request, within the punchlet, the type returned by the file_model_input node (here: PipelineModel.class). The second one is a constant resource of type String; you can define a constant of any type (Integer, String, ...) in order to use it in your punchlet.
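
The naming rule can be sketched in plain Java. This is an illustration under assumptions, not Punch code: `ResourceBindingSketch`, `subscriptionName` and `classify` are hypothetical names, and the rule "a value equal to a subscribed component_stream name is a stream resource, anything else is a constant" is an interpretation of the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the component_stream naming rule; not Punch code.
class ResourceBindingSketch {

    // Builds the name used to reference a subscribed stream, e.g. model_loader_model.
    static String subscriptionName(String component, String stream) {
        return component + "_" + stream;
    }

    // Labels each entry of the resources setting as "stream" or "constant" (assumed rule).
    static Map<String, String> classify(Map<String, String> resources, Set<String> subscriptionNames) {
        Map<String, String> kinds = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : resources.entrySet()) {
            boolean fromStream = subscriptionNames.contains(entry.getValue());
            kinds.put(entry.getKey(), fromStream ? "stream" : "constant");
        }
        return kinds;
    }

    public static void main(String[] args) {
        Map<String, String> resources = new LinkedHashMap<>();
        resources.put("resource_1", "model_loader_model");
        resources.put("resource_2", "hello");
        Set<String> subscriptions = Set.of(
                subscriptionName("input", "data"),
                subscriptionName("model_loader", "model"));
        // prints {resource_1=stream, resource_2=constant}
        System.out.println(classify(resources, subscriptions));
    }
}
```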

Parameters

| Name | Type | Mandatory | Default value | Description |
| punchlet_code | String | false | "{}" | Punchlet code. Overrides punchlet_code_file. |
| punchlet_code_file | String | false | NONE | Punchlet code file readable from the driver. |
| input_columns | String | false | NONE | If not set, all the dataset row columns are visible to the punchlet. Define input_columns to narrow the set of exposed columns. |
| output_columns | List of Json | false | NONE | List of additional columns, i.e. the ones added by the punchlet. |
| resources | Map | false | NONE | Map of resources provided during punchlet execution. |
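
As an illustration of the input_columns behaviour described in the table, here is a minimal plain-Java sketch. It rests on assumptions: `InputColumnsSketch` and `visibleColumns` are hypothetical names, the setting is modelled as a list of column names (the table only gives its type as String), and a missing setting is modelled as `null`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of column narrowing; not Punch code.
class InputColumnsSketch {

    // Returns the columns a punchlet would see: all of them when inputColumns is null.
    static Map<String, Object> visibleColumns(Map<String, Object> row, List<String> inputColumns) {
        if (inputColumns == null) {
            return new LinkedHashMap<>(row);
        }
        Map<String, Object> visible = new LinkedHashMap<>();
        for (String name : inputColumns) {
            if (row.containsKey(name)) {
                visible.put(name, row.get(name));
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("column1", "hello");
        row.put("column2", true);
        row.put("column3", 17);
        // prints {column1=hello, column3=17}
        System.out.println(visibleColumns(row, Arrays.asList("column1", "column3")));
    }
}
```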