OmniSci Output
The OmniSci output takes as parameters the name of the table into which you want to insert data, together with the information required to connect to the database hosting that table (host, database name, etc.).
This bolt inserts data by executing an SQL query that is dynamically constructed from the parameters you provide to it in your topology.
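For instance, with the flights table used in the examples below, each received tuple would be turned into an insert statement along the following lines (an illustrative sketch; the actual query is built at runtime from the configured column_names and the subscribed fields):

insert into flights (arr_timestamp, dep_timestamp, uniquecarrier)
values ('2017-04-23 06:30:00', '2017-04-23 06:30:00', 'test');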
Requirements
The database and the table you want to work on must already exist, and you must have a user account with the necessary privileges (read and write).
For the insertion of geospatial objects, please follow the query syntax matching your MapD/OmniSci version.
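As an illustration, geospatial columns are typically fed WKT literals; a minimal sketch, assuming a hypothetical table geo_test with a single point column (WKT writes points as POINT(longitude latitude)):

create table geo_test (pt point);
insert into geo_test values ('POINT(2.35 48.85)');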
An example can be found below:
Table Name
: flights
Database Name
: mapd
SQL Query to create table
:
create table flights(arr_timestamp timestamp, dep_timestamp timestamp, uniquecarrier varchar(50));
Output result (after inserting data):
arr_timestamp | dep_timestamp | uniquecarrier
---|---|---
2017-04-23 06:30:00 | 2017-04-23 06:30:00 | test
2017-04-23 06:30:00 | 2017-04-23 06:30:00 | test
2017-04-23 06:30:00 | 2017-04-23 06:30:00 | test
Run with punchlinectl
{
"dag": [
{
"type": "generator_input",
"component": "generator",
"settings": {
"messages": [
{ "logs": { "dep_timestamp": "2017-04-23 06:30:00", "uniquecarrier": "test", "arr_timestamp": "2017-04-23 06:30:00" }},
{ "logs": { "uniquecarrier": "test", "arr_timestamp": "2017-04-23 06:30:00", "dep_timestamp": "2017-04-23 06:30:00" }},
{ "logs": { "arr_timestamp": "2017-04-23 06:30:00", "dep_timestamp": "2017-04-23 06:30:00", "uniquecarrier": "test" }}
]
}
},
{
"type": "omnisci_output",
"component": "mapd",
"settings": {
"hosts": ["localhost:9091"],
"username": "mapd",
"password": "HyperInteractive",
"database": "mapd",
"table": "flights",
"bulk_size": 1,
"insert_type": "thrift_transport",
"column_names": [
"arr_timestamp",
"dep_timestamp",
"uniquecarrier"
]
},
"subscribe": [
{
"component": "generator",
"stream": "logs",
"fields":
[
"arr_timestamp",
"dep_timestamp",
"uniquecarrier"
]
}
]
}
]
}
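After running the topology, you can verify the inserted rows from your OmniSci/MapD SQL client, for example:

select * from flights;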
Geospatial topology example:
create table prt_test6 (
activity real,
altUpdate text,
birthDate real,
cycleStartTime real,
lastActiveAt real,
locLsu_bigAxisM real,
locLsu_distLsu real,
locLsu_locOneBounce_latitude float,
locLsu_locOneBounce_longitude float,
locLsu_locOneBounce_Point point,
locLsu_quality real,
locLsu_smallAxisM real,
measure_azimuth float,
measure_stdErr float,
modulationEnum text,
noteQualiteModulation real,
paramFreq_bandwidth real,
paramFreq_centerFreq real,
sensorId real,
suivi text,
trackId real );
Punchlet
// @test(encoding=json) {"logs":{"log":{"activity":-1,"altUpdate":false,"birthDate":1689,"cycleStartTime":16384,"lastActiveAt":20479,"locLsu":{"bigAxisM":644000,"distLsu":0,"locOneBounce":{"latitude":0.3355544357664034,"longitude":1.230478161628614},"quality":16,"smallAxisM":224769},"measure":{"azimuth":1.720853603154011,"stdErr":0.017453292519943295},"modulationEnum":"FSK_31","noteQualiteModulation":1,"paramFreq":{"bandwidth":1900,"centerFreq":1001000},"sensorId":2,"suivi":"UPDATE","trackId":20}}}
{
/* Convert a Map into a dotted flatMap, example:
{"a":{"b":"c"}} will become {"a_b":"c"}
In our case, we convert the input JSON into
proper Storm fields
*/
[logs] = toFlatTuple().nestedSeparator("_").on([logs][log]);
// check that the coordinate fields exist
if ([logs][locLsu_locOneBounce_latitude] && [logs][locLsu_locOneBounce_longitude]) {
    // WKT expects POINT(longitude latitude)
    [logs][locLsu_locOneBounce_Point] = "POINT(" + [logs][locLsu_locOneBounce_longitude] + " " + [logs][locLsu_locOneBounce_latitude] + ")";
} else {
    // otherwise assign a fake value to prevent a null during insertion
    [logs][locLsu_locOneBounce_Point] = "POINT(0 0)";
}
// to insert a polygon, the data should be streamed in the format below:
/*
[logs][poly] = "POLYGON((1 2,3 4,5 6,7 8,9 10))"
*/
//print([logs]);
}
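The WKT strings built this way are what the geospatial column types expect at insertion time. Below is a minimal sketch, assuming a hypothetical table with a single polygon column (note that a WKT ring should be closed, i.e. its last point repeats its first):

create table poly_test (poly polygon);
insert into poly_test values ('POLYGON((1 2,3 4,5 6,1 2))');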
Topology
{
"dag": [
{
"component": "file_input",
"type": "file_input",
"settings": {
"read_file_from_start": true,
"path": "./a_csv.csv"
},
"publish": [
{"stream": "logs", "fields": ["log"]}
]
},
{
"component": "fieldConverter",
"type": "punchlet_node",
"settings": {
"punchlet": "./punchlets/convert_to_storm_fields_mapd.punch"
},
"subscribe": [
{
"component": "prtspout",
"stream": "logs",
"grouping": "localOrShuffle"
}
],
"publish": [
{
"stream": "logs",
"fields": [
"activity",
"altUpdate",
"birthDate",
"cycleStartTime",
"lastActiveAt",
"locLsu_bigAxisM",
"locLsu_distLsu",
"locLsu_locOneBounce_latitude",
"locLsu_locOneBounce_longitude",
"locLsu_locOneBounce_Point",
"locLsu_quality",
"locLsu_smallAxisM",
"measure_azimuth",
"measure_stdErr",
"modulationEnum",
"noteQualiteModulation",
"paramFreq_bandwidth",
"paramFreq_centerFreq",
"sensorId",
"suivi",
"trackId"
]
}
]
},
{
"component": "mapd",
"type": "omnisci_output",
"settings": {
"hosts": ["localhost:9091"],
"username": "mapd",
"password": "HyperInteractive",
"database": "mytopology",
"table": "prt_test6",
"bulk_size": 3,
"insert_type": "thrift_transport",
"column_names": [
"activity",
"altUpdate",
"birthDate",
"cycleStartTime",
"lastActiveAt",
"locLsu_bigAxisM",
"locLsu_distLsu",
"locLsu_locOneBounce_latitude",
"locLsu_locOneBounce_longitude",
"locLsu_locOneBounce_Point",
"locLsu_quality",
"locLsu_smallAxisM",
"measure_azimuth",
"measure_stdErr",
"modulationEnum",
"noteQualiteModulation",
"paramFreq_bandwidth",
"paramFreq_centerFreq",
"sensorId",
"suivi",
"trackId"
]
},
"subscribe": [
{
"component": "fieldConverter",
"stream": "logs"
}
]
}
]
}
Configuration(s)
- hosts
: String. Description: the database server URLs.
- port
: Integer. Description: the database server port.
- username
: String. Description: a user who has full access to the database and table into which data is inserted.
- password
: String. Description: the password of the corresponding username.
- database
: String. Description: the name of the database you want to work on.
- table
: String. Description: the name of the table into which data is inserted.
- bulk_size
: Integer. Description: takes an integer between 1 and 1000 (note: JDBC insertion currently does not support bulk_size > 1).
- insert_type
: String. Description: takes the value jdbc (WIP) or thrift_transport.
- column_names
: List of String. Description: the list of the table column names, given in the same order as in the statement used to create the table (e.g. create table hello (value1 text, value2 text); -> ["value1", "value2"]).

Additional configuration parameters might be added over time.