Skip to content

MapdBolt

The MapdBolt insert bolt takes as parameters the name of the table into which you want to insert data, and the information required to connect to the database that contains that table (host, port, database name, credentials, etc.).

This bolt inserts data by executing an SQL query that is built dynamically from the parameters you provide to the bolt in your topology.

An example can be found below:

Requirements

The database and the table you want to work on must already exist. You must have a user account with the necessary privileges (read and write). To insert geospatial objects, follow the MapD query syntax for the MapD version you are running.

An example can be found below:

Table Name: flights
Database Name: mapd

SQL Query to create table:

1
-- Target table for the MapdBolt example; the bolt's "column_names" list
-- must follow this exact column order.
CREATE TABLE flights (
    arr_timestamp TIMESTAMP,
    dep_timestamp TIMESTAMP,
    uniquecarrier VARCHAR(50)
);

Output result (After inserting data):

arr_timestamp dep_timestamp uniquecarrier
2017-04-23 06:30:00 2017-04-23 06:30:00 test
2017-04-23 06:30:00 2017-04-23 06:30:00 test
2017-04-23 06:30:00 2017-04-23 06:30:00 test

Run with punchplatform-topology.sh

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
{
  "spouts": [
    {
      "type": "generator_spout",
      "spout_settings": {
        "messages": [
          { "logs": {  "dep_timestamp": "2017-04-23 06:30:00", "uniquecarrier": "test", "arr_timestamp": "2017-04-23 06:30:00" }},
          { "logs": {  "uniquecarrier": "test", "arr_timestamp": "2017-04-23 06:30:00", "dep_timestamp": "2017-04-23 06:30:00" }},
          { "logs": {  "arr_timestamp": "2017-04-23 06:30:00", "dep_timestamp": "2017-04-23 06:30:00", "uniquecarrier": "test" }}
        ]
      },
      "storm_settings": {
        "component": "generator",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "arr_timestamp",
              "dep_timestamp",
              "uniquecarrier"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "mapd_bolt",
      "bolt_settings": {
        "host": "localhost",
        "username": "mapd",
        "port": 9091,
        "password": "HyperInteractive",
        "database": "mapd",
        "table": "flights",
        "bulk_size": 1,
        "insert_type": "thrift_transport",
        "column_names": [
          "arr_timestamp",
          "dep_timestamp",
          "uniquecarrier"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "mapd",
        "subscribe": [
          {
            "component": "generator",
            "stream": "logs"
          }
        ]
      }
    }
  ]
}

GeoSpatial topology example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- Target table for the geospatial example. Column names and order must match
-- the "column_names" list of the mapd_bolt, which in turn matches the
-- flattened (underscore-joined) field names emitted by the punchlet.
CREATE TABLE prt_test6 (
    activity                      REAL,
    altUpdate                     TEXT,  -- NOTE(review): sample input carries this as a JSON boolean; confirm TEXT is intended
    birthDate                     REAL,
    cycleStartTime                REAL,
    lastActiveAt                  REAL,
    locLsu_bigAxisM               REAL,
    locLsu_distLsu                REAL,
    locLsu_locOneBounce_latitude  FLOAT,
    locLsu_locOneBounce_longitude FLOAT,
    locLsu_locOneBounce_Point     POINT, -- receives a WKT "POINT(...)" string
    locLsu_quality                REAL,
    locLsu_smallAxisM             REAL,
    measure_azimuth               FLOAT,
    measure_stdErr                FLOAT,
    modulationEnum                TEXT,
    noteQualiteModulation         REAL,
    paramFreq_bandwidth           REAL,
    paramFreq_centerFreq          REAL,
    sensorId                      REAL,
    suivi                         TEXT,
    trackId                       REAL
);

Punchlet

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// @test(encoding=json) {"logs":{"log":{"activity":-1,"altUpdate":false,"birthDate":1689,"cycleStartTime":16384,"lastActiveAt":20479,"locLsu":{"bigAxisM":644000,"distLsu":0,"locOneBounce":{"latitude":0.3355544357664034,"longitude":1.230478161628614},"quality":16,"smallAxisM":224769},"measure":{"azimuth":1.720853603154011,"stdErr":0.017453292519943295},"modulationEnum":"FSK_31","noteQualiteModulation":1,"paramFreq":{"bandwidth":1900,"centerFreq":1001000},"sensorId":2,"suivi":"UPDATE","trackId":20}}}
{
    /* Flatten the nested [logs][log] map into a flat tuple, joining nested
       keys with "_", for example:
       {"a":{"b":"c"}} becomes {"a_b":"c"}

       In our case, this turns the input JSON into individual Storm fields
       whose names match the target table's columns.
    */
    [logs] = toFlatTuple().nestedSeparator("_").on([logs][log]);

    // Build the point geometry only when both coordinates are present.
    if ([logs][locLsu_locOneBounce_latitude] && [logs][locLsu_locOneBounce_longitude]) {

      // WKT points are "POINT(x y)": longitude (x) first, then latitude (y).
      [logs][locLsu_locOneBounce_Point] = "POINT(" + [logs][locLsu_locOneBounce_longitude] + " " + [logs][locLsu_locOneBounce_latitude] + ")";

    } else {

      // Write a placeholder into the same Point field so that the column
      // never receives null while inserting.
      [logs][locLsu_locOneBounce_Point] = "POINT(0 0)";

    }

    // Inserting a polygon works the same way: stream it as a WKT string.
    // A WKT ring is closed, i.e. its last point repeats the first one, e.g.:
    //
    //   [logs][poly] = "POLYGON((1 2,3 4,5 6,1 2))"

    //print([logs]);
}

Topology

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
{
  "spouts": [
    {
      "type": "file_spout",
      "spout_settings": {
        "read_file_from_start": true,
        "path": "./a_csv.csv"
      },
      "storm_settings": {
        "component": "file_spout",
        "publish": [
          {"stream": "logs", "fields": ["log"]}
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punch_bolt",
      "bolt_settings": {
        "punchlet": "./punchlets/convert_to_storm_fields_mapd.punch"
      },
      "storm_settings": {
        "executors": 1,
        "component": "fieldConverter",
        "subscribe": [
          {
            "component": "file_spout",
            "stream": "logs",
            "grouping": "localOrShuffle"
          }
        ],
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "activity",
              "altUpdate",
              "birthDate",
              "cycleStartTime",
              "lastActiveAt",
              "locLsu_bigAxisM",
              "locLsu_distLsu",
              "locLsu_locOneBounce_latitude",
              "locLsu_locOneBounce_longitude",
              "locLsu_locOneBounce_Point",
              "locLsu_quality",
              "locLsu_smallAxisM",
              "measure_azimuth",
              "measure_stdErr",
              "modulationEnum",
              "noteQualiteModulation",
              "paramFreq_bandwidth",
              "paramFreq_centerFreq",
              "sensorId",
              "suivi",
              "trackId"
            ]
          }
        ]
      }
    },
    {
      "type": "mapd_bolt",
      "bolt_settings": {
        "host": "localhost",
        "username": "mapd",
        "port": 9091,
        "password": "HyperInteractive",
        "database": "mytopology",
        "table": "prt_test6",
        "bulk_size": 3,
        "insert_type": "thrift_transport",
        "column_names": [
          "activity",
          "altUpdate",
          "birthDate",
          "cycleStartTime",
          "lastActiveAt",
          "locLsu_bigAxisM",
          "locLsu_distLsu",
          "locLsu_locOneBounce_latitude",
          "locLsu_locOneBounce_longitude",
          "locLsu_locOneBounce_Point",
          "locLsu_quality",
          "locLsu_smallAxisM",
          "measure_azimuth",
          "measure_stdErr",
          "modulationEnum",
          "noteQualiteModulation",
          "paramFreq_bandwidth",
          "paramFreq_centerFreq",
          "sensorId",
          "suivi",
          "trackId"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "mapd",
        "subscribe": [
          {
            "component": "fieldConverter",
            "stream": "logs"
          }
        ]
      }
    }
  ]
}

Configuration(s)

  • host: String

    Description: Hostname or IP address of the database server (e.g. localhost).

  • port: Integer

    Description: Database server Port.

  • username: String

    Description: A user who has full access on the database and table on which we want to insert data.

  • password: String

    Description: Password of the corresponding username.

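  • database: String

    Description: The name of the database you want to work on.

  • table: String

    Description: The name of the table into which data is inserted. The table must already exist in the database.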

  • bulk_size: Integer

    Description: Takes an integer between 1 and 1000 (note: JDBC insertion currently does not support bulk_size > 1).

  • insert_type: String

    Description: Accepted values are jdbc (work in progress) or thrift_transport.

  • column_names: List of String

    Description: The list of the table's column names. The names must be listed in the same order as in the CREATE TABLE statement used to create the table (e.g. create table hello (value1 text, value2 text); -> ["value1", "value2"]).

Additional configurations might be added with time