Postgresql Output¶
The Postgresql Output takes as parameters the name of the table into which you want to insert data, plus the information required to connect to the database where that table is found (host, database name, etc.).
This bolt will insert data by executing an SQL query that will be dynamically constructed based on the parameters you provided to this bolt in your topology.
Insertion of geospatial data by using Postgis geospatial functions is also supported.
An example can be found below:
Requirements
The database and the table on which you want to work must already be created. You must have a user account with the necessary privileges (read and write).
Please use the geospatial-object insertion syntax that matches your PostgreSQL version.
Table Name
: mytopology_table
Database Name
: mytopology
SQL Query to create table
:
create table mytopology_table (text1 TEXT, text2 TEXT);
Output result (After inserting data):¶
text1 | text2 |
---|---|
this is a text 1 | this is a text 22 |
this is a text 3 | this is a text 44 |
this is a text 5 | this is a text 66 |
Run with punchlinectl¶
Simple example
{
"dag": [
{
"type": "generator_input",
"settings": {
"messages": [
{ "logs": { "text1": "this is a text 1", "text2": "this is a text 22" }},
{ "logs": { "text1": "this is a text 3", "text2": "this is a text 44" }},
{ "logs": { "text1": "this is a text 5", "text2": "this is a text 66" }}
]
},
"storm_settings": {
"component": "generator"
}
}
],
"bolts": [
{
"type": "postgresql_bolt",
"settings": {
"hosts": ["localhost:5432"],
"username": "yuechun",
"password": "yuechun",
"database": "mytopology",
"table": "mytopology_table",
"bulk_size": 3,
"column_names": [
"text1:cast",
"text2:cast"
]
},
"storm_settings": {
"executors": 1,
"component": "postgres",
"subscribe": [
{
"component": "generator",
"stream": "logs",
"fields":
[
"text1",
"text2"
]
}
]
}
}
]
}
GeoSpatial topology example 1 (PostgreSQL only):
create table prt_test6 (
locLsu_locOneBounce_Point point,
suivi text );
Punchlet
// @test(encoding=json) {"logs":{"log":{"activity":-1,"altUpdate":false,"birthDate":1689,"cycleStartTime":16384,"lastActiveAt":20479,"locLsu":{"bigAxisM":644000,"distLsu":0,"locOneBounce":{"latitude":0.3355544357664034,"longitude":1.230478161628614},"quality":16,"smallAxisM":224769},"measure":{"azimuth":1.720853603154011,"stdErr":0.017453292519943295},"modulationEnum":"FSK_31","noteQualiteModulation":1,"paramFreq":{"bandwidth":1900,"centerFreq":1001000},"sensorId":2,"suivi":"UPDATE","trackId":20}}}
{
/* Convert a Map into a dotted flatMap, example:
{"a":{"b":"c"}} will become {"a_b":"c"}
In our case, we convert the input JSON into
proper Storm fields
*/
[logs] = toFlatTuple().nestedSeparator("_").on([logs][log]);
// check if fields exists
if ([logs][locLsu_locOneBounce_latitude] && [logs][locLsu_locOneBounce_longitude]) {
[logs][locLsu_locOneBounce_Point] = "(" + [logs][locLsu_locOneBounce_latitude] + ", " + [logs][locLsu_locOneBounce_longitude] + ")";
// else assign fake values to prevent null while inserting
} else {
[logs][locLsu_locOneBounce_Point] = "(0, 0)";
}
// for inserting polygon of 3 points
/*
Data should be streamed in the format below:
[logs][poly] = "((1, 1), (2, 2), (3, 3))"
*/
//print([logs]);
}
Topology
{
"dag": [
{
"type": "file_spout",
"settings": {
"read_file_from_start": true,
"path": "./a_csv.csv"
},
"storm_settings": {
"component": "file_spout",
"publish": [
{"stream": "logs", "fields": ["log"]}
]
}
}
],
"bolts": [
{
"type": "punchlet_node",
"settings": {
"punchlet": "./punchlets/convert_to_storm_fields_postgres.punch"
},
"storm_settings": {
"executors": 1,
"component": "fieldConverter",
"subscribe": [
{
"component": "file_spout",
"stream": "logs",
"grouping": "localOrShuffle"
}
],
"publish": [
{
"stream": "logs",
"fields": [
"locLsu_locOneBounce_Point",
"suivi"
]
}
]
}
},
{
"type": "postgresql_bolt",
"settings": {
"hosts": ["localhost:5432"],
"username": "postgres",
"password": "postgres",
"database": "mytopology",
"table": "prt_test6",
"pool_name": "myPoolConnection",
"pool_size": 10,
"column_names": [
"locLsu_locOneBounce_Point:cast",
"suivi:cast"
]
},
"storm_settings": {
"executors": 1,
"component": "postgres",
"subscribe": [
{
"component": "fieldConverter",
"stream": "logs"
}
]
}
}
]
}
GeoSpatial topology example 2 (PostgreSQL and Postgis):
Below is an example of inserting data using the ST_GeomFromText
and POINT
functions provided by postgis. 4326 is the SRID being used for our geospatial reference.
// @test(encoding=json) {"logs":{"log":{"activity":-1,"altUpdate":false,"birthDate":1689,"cycleStartTime":16384,"lastActiveAt":20479,"locLsu":{"bigAxisM":644000,"distLsu":0,"locOneBounce":{"latitude":0.3355544357664034,"longitude":1.230478161628614},"quality":16,"smallAxisM":224769},"measure":{"azimuth":1.720853603154011,"stdErr":0.017453292519943295},"modulationEnum":"FSK_31","noteQualiteModulation":1,"paramFreq":{"bandwidth":1900,"centerFreq":1001000},"sensorId":2,"suivi":"UPDATE","trackId":20}}}
{
/* Convert a Map into a dotted flatMap, example:
{"a":{"b":"c"}} will become {"a_b":"c"}
In our case, we convert the input JSON into
proper Storm fields
*/
[logs] = toFlatTuple().nestedSeparator("_").on([logs][log]);
// Manage 'POINT' datatype
if ([logs][locLsu_locOneBounce_latitude] && [logs][locLsu_locOneBounce_longitude]) {
[logs][locLsu_locOneBounce_point] = "ST_GeomFromText('POINT(" + [logs][locLsu_locOneBounce_longitude] + " " + [logs][locLsu_locOneBounce_latitude] + ")', 4326)";
} else {
[logs][locLsu_locOneBounce_point] = "ST_GeomFromText('POINT(0 0)', 4326)";
}
[logs][date_detected] = date("yyyy-MM-dd' 'HH:mm:ss").get();
[logs][date_of_insertion] = "now()";
}
In your topology configuration, the function
parameter is used for locLsu_locOneBounce_point.
...
"column_names": [
"activity:cast",
"altUpdate:cast",
"birthDate:cast",
"cycleStartTime:cast",
"lastActiveAt:cast",
"locLsu_bigAxisM:cast",
"locLsu_distLsu:cast",
"locLsu_locOneBounce_latitude:cast",
"locLsu_locOneBounce_longitude:cast",
"locLsu_locOneBounce_point:function",
"locLsu_quality:cast",
"locLsu_smallAxisM:cast",
"measure_azimuth:cast",
"measure_stdErr:cast",
"modulationEnum:cast",
"noteQualiteModulation:cast",
"paramFreq_bandwidth:cast",
"paramFreq_centerFreq:cast",
"sensorId:cast",
"suivi:cast",
"trackId:cast",
"date_detected:cast",
"date_of_insertion:cast"
]
...
-
For more details about PostgreSQL native geometry syntax, refer to the link below
https://www.postgresql.org/docs/9.3/datatype-geometric.html
Configuration(s)¶
-
hosts
: String. Description: Database server URLs.
-
username
: String. Description: A user who has full access to the database and table into which we want to insert data.
-
password
: String. Description: Password of the corresponding username.
-
database
: String. Description: The name of the database you want to work on.
-
column_names
: List of String. Description: A list of the table column names. Each value is delimited by
:
. On the left-hand side is the name of the column, and on the right-hand side what should be done before inserting, i.e. cast the data to the appropriate column type or use the given value as a function. Currently, two types are available: cast
and function
. The names provided should be in the same order as in the query statement used to create the table (ex: create table hello (value1 text, value2 text); -> ["value1:cast", "value2:function"]). -
pool_name
: String. Description: The name of the connection pool to assign.
-
pool_size
: Integer. Description: The number of concurrent connections allowed on the database table.
Additional configurations will be added with time