Punch

Overview

The punch node lets you run a punchlet on incoming Dataset rows. Say you receive a dataset as follows:

| column1 | column2 | column3 |
| ------- | ------- | ------- |
| "hello" | true    | 17      |

The punchlet will receive a JSON document like this:

  {
    "column1" : "hello",
    "column2" : true,
    "column3" : 17
  }

The punchlet can produce one or several additional columns. Here is a punchlet that simply adds two columns:

{
  [column4] = [column1] + " world";
  [column5] = [column3] * 2;
}

This produces the following result (note that column5 is 17 * 2 = 34):

| column1 | column2 | column3 | column4       | column5 |
| ------- | ------- | ------- | ------------- | ------- |
| "hello" | true    | 17      | "hello world" | 34      |

Runtime Compatibility

  • PySpark
  • Spark

Examples

Basic

---
type: punchline
runtime: spark
version: '6.0'
dag:
- type: punch
  component: punch
  settings:
    punchlet_code: |-
      {
        [column4] = [column1] + " world";
        [column5] = [column3] * 2;
      }
    output_columns:
    - type: string
      field: column4
    - type: integer
      field: column5
  subscribe:
  - component: input
    stream: documents
  publish:
  - stream: documents

Generating Several Rows

Your punchlet can output an array of values instead of a single JSON document. In that case, one output row is generated per array element.

---
type: punchline
runtime: spark
version: '6.0'
dag:
- type: punch
  component: punch
  settings:
    punchlet_code: |-
      {
        Tuple results;
        for (int i = 0; i < 3; i++) {
          Tuple tmp;
          tmp:[column4] = [column1] + " " + i;
          tmp:[column5] = [column3] + i;
          results.append(tmp);
        }
        // this notation overwrites the top-level document
        root:/ = results;
      }
    output_columns:
    - type: string
      field: column4
    - type: integer
      field: column5
  subscribe:
  - component: input
    stream: documents
  publish:
  - stream: documents

The expected output is as follows:

| column1 | column2 | column3 | column4   | column5 |
| ------- | ------- | ------- | --------- | ------- |
| "hello" | true    | 17      | "hello 0" | 17      |
| "hello" | true    | 17      | "hello 1" | 18      |
| "hello" | true    | 17      | "hello 2" | 19      |

Resources

Output documents are appended to the output dataset. For each input document, the punchlet can produce either a single output document or several (by providing an array as the root tuple).

You can also provide external resources by adding them in the resources setting. Those resources are accessible in the punchlet code through the following Java function:

/**
* Return a provided resource
* @param resourceName name of the resource (subscription name or "resources" map key)
* @param resourceType type of the resource
* @return the resource
*/
public <T> T getResource(String resourceName, Class<T> resourceType)
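
For instance, fetching a constant resource in a punchlet looks like this (a minimal sketch; resource_2 is assumed to be declared under the resources setting, as in the complete example below):

{
  // "resource_2" is assumed to be a constant String declared in the resources setting
  [my_resource] = getResource("resource_2", String.class);
}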

Warning

You must use this node instead of punch_stage if you need to provide a resource from another node during punchlet execution.

---
type: punchline
runtime: spark
version: "6.0"
dag:
- type: file_model_input
  component: model_loader
  settings:
    file_path: model.bin
  publish:
  - stream: model
  subscribe: []
- type: punch
  component: punch
  settings:
    punchlet_code: | 
      {
        [base64] = [name].encodeBase64();
        [decade] = [age] % 10;
        [pipelineModel] = getResource("resource_1", PipelineModel.class);
        [my_resource] = getResource("resource_2", String.class);
        // Do something with my resources
        print(root);
      }
    output_columns:
    - type: string
      field: base64
    - type: integer
      field: decade
    resources:
      resource_1: model_loader_model
      resource_2: hello
  subscribe:
  - component: input
    stream: data
  - component: model_loader
    stream: model
  publish:
  - stream: data

Info

As you can see, we use two resources of two different types. The first one is computed within the job by the file_model_input node. To use it, set the resource value to component_stream (here: model_loader_model) and, in the punchlet, request the type returned by the file_model_input node (here: PipelineModel.class). The second one is a constant resource of String type; you can declare a constant of any type (Integer, String, ...) and use it in your punchlet.

Parameters

| Name | Type | Mandatory | Default value | Description |
| ---- | ---- | --------- | ------------- | ----------- |
| punchlet_code | String | false | "{}" | Punchlet code. Overrides punchlet_code_file. |
| punchlet_code_file | String | false | NONE | Punchlet code file, readable from the driver. |
| input_columns | String | false | NONE | If not set, all the dataset row columns are visible to the punchlet; define input_columns to narrow the set of exposed columns. |
| output_columns | List of Json | false | NONE | List of additional columns, i.e. the ones added by the punchlet. |
| resources | List of Json | false | NONE | Map of resources provided during punchlet execution. |
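
As a usage note, here is a minimal sketch of the punchlet_code_file variant. The path ./punchlets/enrich.punch is hypothetical; any punchlet file readable from the driver will do.

---
type: punchline
runtime: spark
version: '6.0'
dag:
- type: punch
  component: punch
  settings:
    # hypothetical path: must be readable from the driver
    punchlet_code_file: ./punchlets/enrich.punch
    output_columns:
    - type: string
      field: column4
  subscribe:
  - component: input
    stream: documents
  publish:
  - stream: documents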