
PostgreSQL Output

The PostgreSQL Output takes as parameters the name of the table into which you want to insert data, together with the information required to connect to the database holding that table (host, database name, etc.).

This bolt inserts data by executing an SQL query that is dynamically constructed from the parameters you provide to the bolt in your topology.
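For intuition, given the column_names settings shown in the simple example below, the statement the bolt prepares is roughly of the following shape (an illustrative sketch only; the exact SQL generated by the bolt may differ):

```sql
-- Illustrative sketch: a parameterized insert for column_names
-- ["text1:cast", "text2:cast"]; the placeholders are bound from
-- the subscribed Storm fields.
INSERT INTO mytopology_table (text1, text2)
VALUES (?, ?);
```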

Inserting geospatial data using PostGIS geospatial functions is also supported.

Examples can be found below.

Requirements

The database and the table you want to work on must already exist, and you must have a user account with the necessary privileges (read and write). Use the geospatial insertion syntax that matches your PostgreSQL version.

Table Name: mytopology_table
Database Name: mytopology

SQL Query to create table:

create table mytopology_table (text1 TEXT, text2 TEXT);
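If the account your topology connects with is not the table owner, it may also need explicit read and write grants. A minimal sketch, assuming a hypothetical role named punch_user:

```sql
-- 'punch_user' and its password are hypothetical; adapt to your setup.
CREATE USER punch_user WITH PASSWORD 'changeme';
GRANT SELECT, INSERT ON mytopology_table TO punch_user;
```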

Output result (After inserting data):

text1            | text2
-----------------|-----------------
this is a text 1 | this is a text22
this is a text 3 | this is a text44
this is a text 5 | this is a text66
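To verify the inserted rows yourself, query the table from a psql session:

```sql
-- Inspect what the bolt has written
SELECT text1, text2 FROM mytopology_table;
```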

Run with punchlinectl

Simple example

{
  "dag": [
    {
      "type": "generator_input",
      "settings": {
        "messages": [
          { "logs": {  "text1": "this is a text 1", "text2": "this is a text22" }},
          { "logs": {  "text1": "this is a text 3", "text2": "this is a text44" }},
          { "logs": {  "text1": "this is a text 5", "text2": "this is a text66" }}
        ]
      },
      "storm_settings": {
        "component": "generator"
      }
    }
  ],
  "bolts": [
    {
      "type": "postgresql_bolt",
      "settings": {
        "hosts": ["localhost:5432"],
        "username": "yuechun",
        "password": "yuechun",
        "database": "mytopology",
        "table": "mytopology_table",
        "bulk_size": 3,
        "column_names": [
          "text1:cast",
          "text2:cast"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "postgres",
        "subscribe": [
          {
            "component": "generator",
            "stream": "logs",
            "fields": 
              [
                "text1",
                "text2"
              ]
          }
        ]
      }
    }
  ]
}

GeoSpatial topology example 1 (PostgreSQL only):

create table prt_test6 (
                          locLsu_locOneBounce_Point point,
                          suivi text );
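PostgreSQL's native point type accepts literals of the form '(x, y)', which is exactly the string the punchlet below builds. A manually written insert against this table would look like:

```sql
-- Equivalent manual insert using the native point literal syntax
INSERT INTO prt_test6 (locLsu_locOneBounce_Point, suivi)
VALUES ('(0.3355544357664034, 1.230478161628614)', 'UPDATE');
```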

Punchlet

// @test(encoding=json) {"logs":{"log":{"activity":-1,"altUpdate":false,"birthDate":1689,"cycleStartTime":16384,"lastActiveAt":20479,"locLsu":{"bigAxisM":644000,"distLsu":0,"locOneBounce":{"latitude":0.3355544357664034,"longitude":1.230478161628614},"quality":16,"smallAxisM":224769},"measure":{"azimuth":1.720853603154011,"stdErr":0.017453292519943295},"modulationEnum":"FSK_31","noteQualiteModulation":1,"paramFreq":{"bandwidth":1900,"centerFreq":1001000},"sensorId":2,"suivi":"UPDATE","trackId":20}}}
{
    /* Convert a Map into a dotted flatMap, example:
       {"a":{"b":"c"}} will become {"a_b":"c"}

       In our case, we convert the input JSON into
       proper Storm fields
    */

    [logs] = toFlatTuple().nestedSeparator("_").on([logs][log]);

    // check if the fields exist
    if ([logs][locLsu_locOneBounce_latitude] && [logs][locLsu_locOneBounce_longitude]) {

      [logs][locLsu_locOneBounce_Point] = "(" + [logs][locLsu_locOneBounce_latitude] + ", " + [logs][locLsu_locOneBounce_longitude] + ")";

    // otherwise assign placeholder values to avoid inserting null
    } else {

      [logs][locLsu_locOneBounce_Point] = "(0, 0)";

    }

    // for inserting a polygon of 3 points

    /*
    Data should be streamed in the format below:

    [logs][poly] = "((1, 1), (2, 2), (3, 3))"
    */

    //print([logs]);
}

Topology

{
  "dag": [
    {
      "type": "file_spout",
      "settings": {
        "read_file_from_start": true,
        "path": "./a_csv.csv"
      },
      "storm_settings": {
        "component": "file_spout",
        "publish": [
          {"stream": "logs", "fields": ["log"]}
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punchlet_node",
      "settings": {
        "punchlet": "./punchlets/convert_to_storm_fields_postgres.punch"
      },
      "storm_settings": {
        "executors": 1,
        "component": "fieldConverter",
        "subscribe": [
          {
            "component": "file_spout",
            "stream": "logs",
            "grouping": "localOrShuffle"
          }
        ],
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "locLsu_locOneBounce_Point",
              "suivi"
            ]
          }
        ]
      }
    },
    {
      "type": "postgresql_bolt",
      "settings": {
        "hosts": ["localhost:5432"],
        "username": "postgres",
        "password": "postgres",
        "database": "mytopology",
        "table": "prt_test6",
        "pool_name": "myPoolConnection",
        "pool_size": 10,
        "column_names": [
          "locLsu_locOneBounce_Point:cast",
          "suivi:cast"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "postgres",
        "subscribe": [
          {
            "component": "fieldConverter",
            "stream": "logs"
          }
        ]
      }
    }
  ]
}

GeoSpatial topology example 2 (PostgreSQL and PostGIS):

Below is an example of inserting data using the ST_GeomFromText and POINT functions provided by PostGIS. 4326 is the SRID (spatial reference system identifier) of WGS 84, used here as our geospatial reference.
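For reference, the statement that ends up being executed is equivalent to the following (the table and column names here are illustrative, and the PostGIS extension must be installed on the database):

```sql
-- Illustrative: 'tracks' and 'pos' are hypothetical names.
-- CREATE EXTENSION postgis;
-- CREATE TABLE tracks (pos geometry(Point, 4326), suivi TEXT);
INSERT INTO tracks (pos, suivi)
VALUES (ST_GeomFromText('POINT(1.230478161628614 0.3355544357664034)', 4326), 'UPDATE');
```

Note that POINT takes longitude first, then latitude, matching the order used in the punchlet below.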

// @test(encoding=json) {"logs":{"log":{"activity":-1,"altUpdate":false,"birthDate":1689,"cycleStartTime":16384,"lastActiveAt":20479,"locLsu":{"bigAxisM":644000,"distLsu":0,"locOneBounce":{"latitude":0.3355544357664034,"longitude":1.230478161628614},"quality":16,"smallAxisM":224769},"measure":{"azimuth":1.720853603154011,"stdErr":0.017453292519943295},"modulationEnum":"FSK_31","noteQualiteModulation":1,"paramFreq":{"bandwidth":1900,"centerFreq":1001000},"sensorId":2,"suivi":"UPDATE","trackId":20}}}
{
    /* Convert a Map into a dotted flatMap, example:
       {"a":{"b":"c"}} will become {"a_b":"c"}

       In our case, we convert the input JSON into
       proper Storm fields
    */

    [logs] = toFlatTuple().nestedSeparator("_").on([logs][log]);

    // Manage 'POINT' datatype
    if ([logs][locLsu_locOneBounce_latitude] && [logs][locLsu_locOneBounce_longitude]) {
      [logs][locLsu_locOneBounce_point] = "ST_GeomFromText('POINT(" + [logs][locLsu_locOneBounce_longitude] + " " + [logs][locLsu_locOneBounce_latitude] + ")', 4326)";
    } else {
      [logs][locLsu_locOneBounce_point] = "ST_GeomFromText('POINT(0 0)', 4326)";
    }

    [logs][date_detected] = date("yyyy-MM-dd' 'HH:mm:ss").get(); 
    [logs][date_of_insertion] = "now()";
}

In your topology configuration, the function parameter is used for locLsu_locOneBounce_point:

        ...

        "column_names": [
          "activity:cast",
          "altUpdate:cast",
          "birthDate:cast",
          "cycleStartTime:cast",
          "lastActiveAt:cast",
          "locLsu_bigAxisM:cast",
          "locLsu_distLsu:cast",
          "locLsu_locOneBounce_latitude:cast",
          "locLsu_locOneBounce_longitude:cast",
          "locLsu_locOneBounce_point:function",
          "locLsu_quality:cast",
          "locLsu_smallAxisM:cast",
          "measure_azimuth:cast",
          "measure_stdErr:cast",
          "modulationEnum:cast",
          "noteQualiteModulation:cast",
          "paramFreq_bandwidth:cast",
          "paramFreq_centerFreq:cast",
          "sensorId:cast",
          "suivi:cast",
          "trackId:cast",
          "date_detected:cast",
          "date_of_insertion:cast"
        ]

        ...
  • For more details about PostgreSQL native geometry syntax, refer to the link below

    https://www.postgresql.org/docs/9.3/datatype-geometric.html

Configuration(s)

  • hosts: List of String

    Description: Database server addresses (host:port).

  • username: String

    Description: A user who has full access to the database and table into which you want to insert data.

  • password: String

    Description: Password of the corresponding username.

  • database: String

    Description: The name of the database you want to work on.

  • column_names: List of String

    Description: A list of the table column names. Each value is delimited by a colon: on the left-hand side, the name of the column; on the right-hand side, what should be done with the value before inserting. Two types are currently available: cast (cast the data to the appropriate column type) and function (use the given value as an SQL function call). The names must be listed in the same order as in the statement used to create the table (ex: create table hello (value1 text, value2 text); -> ["value1:cast", "value2:function"]).

  • pool_name: String

    Description: The name to assign to the connection pool.

  • pool_size: Integer

    Description: The maximum number of concurrent connections allowed to the database.
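To make the cast/function distinction in column_names concrete, here is an illustrative sketch (not the bolt's exact generated SQL): a cast column has its tuple value converted to the column type, while a function column has its value injected into the statement as an SQL expression:

```sql
-- For column_names ["value1:cast", "value2:function"], with tuple
-- values 'hello' and 'now()', the executed statement is roughly:
INSERT INTO hello (value1, value2)
VALUES ('hello'::text, now());
```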

Additional configuration options will be added over time.