diff --git a/Makefile.PL b/Makefile.PL index cef356a..0959a7e 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -20,6 +20,18 @@ WriteMakefile( PREREQ_PM => { #'ABC' => 1.6, #'Foo::Bar::Module' => 5.0401, + 'Attribute::Handlers'=>0, + 'Clone'=>0, + 'InfluxDB::LineProtocol'=>0, + 'List::MoreUtils'=>0, + 'JSON::MaybeXS'=>0, + 'LWP::Protocol::http'=>0, + 'LWP::Protocol::https'=>0, + 'LWP::UserAgent'=>0, + 'Object::Result'=>0, + 'Sub::Delete'=>0, + 'Switch'=>0, + 'URI'=>0 }, dist => { COMPRESS => 'gzip -9f', SUFFIX => 'gz', }, clean => { FILES => 'InfluxDB-HTTP-*' }, diff --git a/README.pod b/README.pod index 0daf797..3b8171f 100644 --- a/README.pod +++ b/README.pod @@ -41,14 +41,15 @@ attribute C: Furthermore, all result objects provide access to the C object that is returned by InfluxDB in the attribute C. -=head2 new host => 'localhost', port => 8086, timeout => 600 +=head2 new(host=>'localhost', port=>8086, username=>'zutana', password=>'se¢retØ', ssl=>1, ssl_verify=>0, ssl_opts=>{}, timeout=>600) -Passing C, C and/or C is optional, defaulting to the InfluxDB defaults or -to 3 minutes for the timeout. The timeout is in seconds. +Passing C, C, C, C, C, C, C, and/or +C is optional, defaulting to the InfluxDB defaults or to 3 minutes for the timeout. +The timeout is in seconds. Returns an instance of InfluxDB::HTTP. -=head2 ping +=head2 ping() Pings the InfluxDB instance configured in the constructor (i.e. by C and C). @@ -62,7 +63,7 @@ is part of the HTTP response from the pinged InfluxDB instance. my $ping = $influx->ping(); print $ping->version if ($ping); -=head2 query query, database => "DATABASE", chunk_size => CHUNK_SIZE, epoch => "ns" +=head2 query(query, database=>"DATABASE", chunk_size=>CHUNK_SIZE, epoch=>"ns") Used to query the InfluxDB instance. All parameters but the first one are optional. The C parameter can either be a String or a Perl ArrayRef of Strings, where every String @@ -74,21 +75,26 @@ hash. Additionally the attribute C provides the request identifier a the HTTP reponse headers by InfluxDB. This can for example be useful for correlating requests with log files. -=head2 write measurement, database => "DATABASE", precision => "ns", retention_policy => "RP" +=head2 write(data, database=>"DATABASE", precision=>"ns", retention_policy=>"RP", measurement=>"foo", tag_fields=>("tag1", ...)) -Writes data into InfluxDB. The parameter C can either be a String or an -ArrayRef of Strings, where each String contains one valid InfluxDB LineProtocol -statement. All of those mesaurements are then sent to InfluxDB and the specified -database. The returned object evaluates to true if the write was successful, and otherwise -to false. +Writes data into InfluxDB. The 1ˢᵗ parameter C can either be a String or an +ArrayRef of Strings or Hashes, where each String contains one valid InfluxDB LineProtocol +statement and each Hash represents a point. All of those mesaurements are then sent to +InfluxDB and the specified database. The returned object evaluates to true if the +write was successful, and otherwise to false. -The optional argument precision can be given if a precsion different than "ns" is used in +The optional argument "C" can be given if a precsion different than "ns" is used in the line protocol. InfluxDB docs suggest that using a coarser precision than ns can save space and processing. In many cases "s" or "m" might do. -The optional argument retention_policy can be used to specify a retention policy other than +The optional argument "C" can be used to specify a retention policy other than the default retention policy of the selected database. +The optional argument "C" is required if the 1ˢᵗ parameter C is an ArrayRef +and contains elements other than Strings. + +The optional argument "C" specifies which field(s) will be stored as tag(s). + =head2 get_lwp_useragent Returns the internally used LWP::UserAgent instance for possible modifications @@ -96,7 +102,7 @@ Returns the internally used LWP::UserAgent instance for possible modifications =head1 AUTHOR -Raphael Seebacher, C<< >> +Raphael Seebacher, C<< >>, modificada por sam80180 C<< >> =head1 BUGS diff --git a/lib/InfluxDB/HTTP.pm b/lib/InfluxDB/HTTP.pm index d3689b3..e66154b 100644 --- a/lib/InfluxDB/HTTP.pm +++ b/lib/InfluxDB/HTTP.pm @@ -1,18 +1,45 @@ package InfluxDB::HTTP; use strict; +use utf8; use warnings; require Exporter; our @ISA = qw(Exporter); -our @EXPORT_OK = (); +my %constants; +BEGIN { # http://www.perlmonks.org/?node_id=1072731 + %constants = ( + PRECISION_NANOSECONDS => 'ns', + PRECISION_MICROSECONDS_U => "µ", + PRECISION_MICROSECONDS => 'u', + PRECISION_MILLISECONDS => 'ms', + PRECISION_SECONDS => 's', + PRECISION_MINUTES => 'm', + PRECISION_HOURS => 'h', + PRECISION_RFC3339 => "rfc3339", + + ENDPOINT_QUERY => "query", + ENDPOINT_WRITE => "write", + ENDPOINT_PING => "ping" + ); +} # end BEGIN +use constant \%constants; +our @EXPORT_OK = (keys(%constants), parseResults, toValidQueryPrecision, validatePrecision); our @EXPORT = (); +use Attribute::Handlers; +use Clone 'clone'; +use InfluxDB::LineProtocol qw(); +use List::MoreUtils qw(); use JSON::MaybeXS; +use LWP::Protocol::http; +use LWP::Protocol::https; use LWP::UserAgent; use Object::Result; +use Sub::Delete; +use Switch; use URI; our $VERSION = '0.04'; @@ -22,19 +49,39 @@ sub new { my %args = ( host => 'localhost', port => 8086, + username=>undef, + password=>undef, + ssl=>1, + ssl_verify=>0, + ssl_opts=>{}, timeout => 180, + _prev_write_precision=>undef, # no va a cargar 'InfluxDB::LineProtocol::data2line' tan frecuentemente @_, ); - my ($host, $port, $timeout) = @args{'host', 'port', 'timeout'}; + my ($host, $port, $username, $password, $ssl, $ssl_verify, $ssl_opts, $timeout) = @args{'host', 'port', "username", "password", "ssl", "ssl_verify", "ssl_opts", 'timeout'}; my $self = { host => $host, port => $port, + username=>$username, + password=>$password, + ssl=>$ssl, + ssl_verify=>$ssl_verify, + ssl_opts=>$ssl_opts }; my $ua= LWP::UserAgent->new(); $ua->agent("InfluxDB-HTTP/$VERSION"); $ua->timeout($timeout); + if ($ssl) { + my $opts0 = {}; + if (!$ssl_verify) { + $opts0 = {verify_hostname=>0 ,SSL_verify_mode=>0x00}; # https://stackoverflow.com/a/338550 + } # fin if + my $opts = ({%$opts0, %$ssl_opts}, ); + $self->{ssl_opts} = $opts; + $ua->ssl_opts(%$opts); + } # fin if $self->{lwp_user_agent} = $ua; bless $self, $class; @@ -49,8 +96,12 @@ sub get_lwp_useragent { sub ping { my ($self) = @_; - my $uri = $self->_get_influxdb_http_api_uri('ping'); - my $response = $self->{lwp_user_agent}->head($uri->canonical()); + my $uri = $self->_get_influxdb_http_api_uri(ENDPOINT_PING); + $uri->query_form( + ($self->{username} ? ('u'=>$self->{username}) : ()), + ($self->{password} ? ('p'=>$self->{password}) : ()) + ); + my $response = $self->get_lwp_useragent()->head($uri->canonical()); if (! $response->is_success()) { my $error = $response->message(); @@ -59,7 +110,7 @@ sub ping { error { return $error; } { return "Error pinging InfluxDB: $error"; } { return; } - } + }; } my $version = $response->header('X-Influxdb-Version'); @@ -68,32 +119,35 @@ sub ping { version { return $version; } { return "Ping successful: InfluxDB version $version"; } { return 1; } - } + }; } sub query { my $self = shift; my $query = shift; - my %args = (epoch => 'ns', @_); - my ($database, $chunk_size, $epoch) = @args{'database', 'chunk_size', 'epoch'}; + my $argEpoch = "epoch"; + my %args = ($argEpoch => PRECISION_RFC3339, @_); + my ($database, $chunk_size, $epoch) = @args{'database', 'chunk_size', $argEpoch}; - die "Missing argument 'query'" if !$query; - die "Argument epoch '$epoch' is not one of (h,m,s,ms,u,ns)" if $epoch !~ /^(h|m|s|ms|u|ns)$/; + die("Missing 1st argument 'query'.") if (!$query); + die(sprintf("Argument ${epoch} '%s' is not one of (%s, %s, %s, %s, %s, %s, %s, %s).", $epoch, PRECISION_HOURS, PRECISION_MINUTES, PRECISION_SECONDS, PRECISION_MILLISECONDS, PRECISION_MICROSECONDS, PRECISION_MICROSECONDS_U, PRECISION_NANOSECONDS, PRECISION_RFC3339)) if (!&validatePrecision($epoch)); if (ref($query) eq 'ARRAY') { $query = join(';', @$query); } - my $uri = $self->_get_influxdb_http_api_uri('query'); - + my $uri = $self->_get_influxdb_http_api_uri(ENDPOINT_QUERY); + my $param_epoch = &toValidQueryPrecision($epoch, ENDPOINT_QUERY); $uri->query_form( q => $query, ($database ? (db => $database) : ()), + ($self->{username} ? ('u'=>$self->{username}) : ()), + ($self->{password} ? ('p'=>$self->{password}) : ()), ($chunk_size ? (chunk_size => $chunk_size) : ()), - ($epoch ? (epoch => $epoch) : ()) + ($param_epoch ? (epoch => $param_epoch) : ()) ); - my $response = $self->{lwp_user_agent}->post($uri->canonical()); + my $response = $self->get_lwp_useragent()->post($uri->canonical()); chomp(my $content = $response->content()); @@ -115,7 +169,7 @@ sub query { request_id { return $response->header('Request-Id'); } { return "Returned data: $content"; } { return 1; } - } + }; } } else { @@ -127,32 +181,68 @@ sub query { error { return $error; } { return "Error executing query: $error"; } { return; } - } + }; } sub write { my $self = shift; - my $measurement = shift; - my %args = @_; - my ($database, $precision, $retention_policy) = @args{'database', 'precision', 'retention_policy'}; - - die "Missing argument 'measurement'" if !$measurement; - die "Missing argument 'database'" if !$database; - die "Argument precision '$precision' is set and not one of (h,m,s,ms,u,ns)" if $precision && $precision !~ /^(h|m|s|ms|u|ns)$/; - - if (ref($measurement) eq 'ARRAY') { - $measurement = join("\n", @$measurement); + my $points = shift; + my $argDatabase = 'database'; + my $argMeasurement = "measurement"; + my $argPrecision = 'precision'; + my $argRetentionPolicy = "retention_policy"; + my $argTagCols = "tag_fields"; + my %args = ( + $argDatabase=>undef, + $argMeasurement=>undef, + $argPrecision=>PRECISION_NANOSECONDS, + $argRetentionPolicy=>undef, + $argTagCols=>undef, + @_ + ); + my ($database, $measurement, $precision, $retention_policy, $tagCols) = @args{$argDatabase, $argMeasurement, $argPrecision, $argRetentionPolicy, $argTagCols}; + + die("Missing argument 1st argument (points|line protocols).") if (!$points); + die("Missing argument '${argDatabase}'.") if (!$database); + die(sprintf("Argument ${argPrecision} '%s' is not one of (%s, %s, %s, %s, %s, %s, %s, %s).", $precision, PRECISION_HOURS, PRECISION_MINUTES, PRECISION_SECONDS, PRECISION_MILLISECONDS, PRECISION_MICROSECONDS, PRECISION_MICROSECONDS_U, PRECISION_NANOSECONDS, PRECISION_RFC3339)) if (defined($precision) && !&validatePrecision($precision)); + + my $strLineProtocols = $points; + my $param_precision = &toValidQueryPrecision($precision, ENDPOINT_WRITE); + if (ref($points) eq 'ARRAY') { + if (!defined($self->{_prev_write_precision}) || $self->{_prev_write_precision} ne $precision) { + delete_sub "get_ts" if (exists(&get_ts)); + $self->{_prev_write_precision} = $precision; + InfluxDB::LineProtocol->import(("data2line", "precision=".($param_precision eq PRECISION_MICROSECONDS ? "us" : $param_precision))); + } # fin if + $strLineProtocols = join("\n", (map { + my $isString = ($_ & ~$_); # https://www.perlmonks.org/?node_id=791677 + if (!$isString && !defined($measurement)) { die("Parámetro «${argMeasurement}» no encontrado."); } # fin if + my $copied = $_; + my $tags = {}; + if (defined($tagCols) && ref($tagCols) eq 'ARRAY') { + $copied = clone($_); + my @aTagCols = @$tagCols; + for my $i (0 .. $#aTagCols) { + if (!defined($copied->{$aTagCols[$i]})) { next; } # fin if + $tags->{$aTagCols[$i]} = $copied->{$aTagCols[$i]}; + delete $copied->{$aTagCols[$i]}; + } # fin for + } # fin if + ($isString ? $_ : data2line($measurement, $copied, $tags)); + } @$points)); } - my $uri = $self->_get_influxdb_http_api_uri('write'); + my $uri = $self->_get_influxdb_http_api_uri(ENDPOINT_WRITE); $uri->query_form( db => $database, - ($precision ? (precision => $precision) : ()), + ($self->{username} ? ('u'=>$self->{username}) : ()), + ($self->{password} ? ('p'=>$self->{password}) : ()), + ($param_precision ? (precision => $param_precision) : ()), ($retention_policy ? (rp => $retention_policy) : ()) ); - my $response = $self->{lwp_user_agent}->post($uri->canonical(), Content => $measurement); + my $response = $self->get_lwp_useragent()->post($uri->canonical(), Content => $strLineProtocols); chomp(my $content = $response->content()); @@ -184,7 +274,7 @@ sub _get_influxdb_http_api_uri { my $uri = URI->new(); - $uri->scheme('http'); + $uri->scheme(($self->{ssl} ? LWP::Protocol::https::socket_type() : LWP::Protocol::http::socket_type())); $uri->host($self->{host}); $uri->port($self->{port}); $uri->path($endpoint); @@ -192,6 +282,69 @@ sub _get_influxdb_http_api_uri { return $uri; } +sub UNIVERSAL::MétodoEstático :ATTR(CODE) { # https://metacpan.org/release/Attribute-Static/source/lib/Attribute/Static.pm + my ($package, $symbol, $referent, $attr, $data, $phase) = @_; + my $meth = *{$symbol}{NAME}; + no warnings 'redefine'; + *{$symbol} = sub { + my $check_invocant = sub { # https://stackoverflow.com/a/35283547 + my $thing = shift; + my $caller = caller(); + return (!defined($thing) || !ref($thing) || !blessed($thing) || !$thing->isa($caller) ? 0 : $thing); + }; + if ($check_invocant->($_[0])) { shift @_; } # fin if + goto &$referent; + }; +} # fin UNIVERSAL::MétodoEstático() + +sub validatePrecision :MétodoEstático { + my ($strPrecision) = @_; + if (!defined($strPrecision)) { return 0; } # fin if + my $pattern = sprintf("^(%s|%s|%s|%s|%s|%s|%s|%s)\$", PRECISION_HOURS, PRECISION_MINUTES, PRECISION_SECONDS, PRECISION_MILLISECONDS, PRECISION_MICROSECONDS, PRECISION_MICROSECONDS_U, PRECISION_NANOSECONDS, PRECISION_RFC3339); + return ($strPrecision !~ /$pattern/ ? 0 : 1); +} # fin validatePrecision() + +sub toValidQueryPrecision :MétodoEstático { + my ($strPrecision, $endpoint) = @_; + $endpoint //= ENDPOINT_QUERY; + switch ($endpoint) { + case ENDPOINT_QUERY { + switch ($strPrecision) { + case PRECISION_RFC3339 { return undef; } # fin case + case PRECISION_MICROSECONDS_U { return PRECISION_MICROSECONDS; } # fin case + } # fin switch + } # fin case + case ENDPOINT_WRITE { + switch ($strPrecision) { + case PRECISION_RFC3339 { return PRECISION_NANOSECONDS; } # fin case + case PRECISION_MICROSECONDS_U { return PRECISION_MICROSECONDS; } # fin case + } # fin switch + } # fin case + } # fin switch + return $strPrecision; +} # fin toValidQueryPrecision() + +sub parseResults :MétodoEstático { # https://github.com/ajgb/anyevent-influxdb/blob/master/lib/AnyEvent/InfluxDB.pm#L591 + my ($data) = @_; + return [ + map { + my $res = $_; + my $cols = $res->{columns}; + my $values = $res->{values}; + +{ + name => $res->{name}, + values => [ + map { + +{ + List::MoreUtils::zip(@$cols, @$_) + }; + } @{$values || []} + ] + }; + } @{$data->[0]->{series} || []} + ]; +} # fin parseResults() + 1; __END__ @@ -239,14 +392,15 @@ attribute C: Furthermore, all result objects provide access to the C object that is returned by InfluxDB in the attribute C. -=head2 new host => 'localhost', port => 8086, timeout => 600 +=head2 new(host=>'localhost', port=>8086, username=>'zutana', password=>'se¢retØ', ssl=>1, ssl_verify=>0, ssl_opts=>{}, timeout=>600) -Passing C, C and/or C is optional, defaulting to the InfluxDB defaults or -to 3 minutes for the timeout. The timeout is in seconds. +Passing C, C, C, C, C, C, C, and/or +C is optional, defaulting to the InfluxDB defaults or to 3 minutes for the timeout. +The timeout is in seconds. Returns an instance of InfluxDB::HTTP. -=head2 ping +=head2 ping() Pings the InfluxDB instance configured in the constructor (i.e. by C and C). @@ -260,7 +414,7 @@ is part of the HTTP response from the pinged InfluxDB instance. my $ping = $influx->ping(); print $ping->version if ($ping); -=head2 query query, database => "DATABASE", chunk_size => CHUNK_SIZE, epoch => "ns" +=head2 query(query, database=>"DATABASE", chunk_size=>CHUNK_SIZE, epoch=>"ns") Used to query the InfluxDB instance. All parameters but the first one are optional. The C parameter can either be a String or a Perl ArrayRef of Strings, where every String @@ -272,29 +426,34 @@ hash. Additionally the attribute C provides the request identifier a the HTTP reponse headers by InfluxDB. This can for example be useful for correlating requests with log files. -=head2 write measurement, database => "DATABASE", precision => "ns", retention_policy => "RP" +=head2 write(data, database=>"DATABASE", precision=>"ns", retention_policy=>"RP", measurement=>"foo", tag_fields=>("tag1", ...)) -Writes data into InfluxDB. The parameter C can either be a String or an -ArrayRef of Strings, where each String contains one valid InfluxDB LineProtocol -statement. All of those mesaurements are then sent to InfluxDB and the specified -database. The returned object evaluates to true if the write was successful, and otherwise -to false. +Writes data into InfluxDB. The 1ˢᵗ parameter C can either be a String or an +ArrayRef of Strings or Hashes, where each String contains one valid InfluxDB LineProtocol +statement and each Hash represents a point. All of those mesaurements are then sent to +InfluxDB and the specified database. The returned object evaluates to true if the +write was successful, and otherwise to false. -The optional argument precision can be given if a precsion different than "ns" is used in +The optional argument "C" can be given if a precsion different than "ns" is used in the line protocol. InfluxDB docs suggest that using a coarser precision than ns can save space and processing. In many cases "s" or "m" might do. -The optional argument retention_policy can be used to specify a retention policy other than +The optional argument "C" can be used to specify a retention policy other than the default retention policy of the selected database. -=head2 get_lwp_useragent +The optional argument "C" is required if the 1ˢᵗ parameter C is an ArrayRef +and contains elements other than Strings. + +The optional argument "C" specifies which field(s) will be stored as tag(s). + +=head2 get_lwp_useragent() Returns the internally used LWP::UserAgent instance for possible modifications (e.g. to configure an HTTP proxy). =head1 AUTHOR -Raphael Seebacher, C<< >> +Raphael Seebacher, C<< >>, modificada por sam80180 C<< >> =head1 BUGS