详细介绍如何使用Java语言对天翼云时序数据库Influx版实例进行连接指引。
构建要求
Java SDK工具包需在Java 1.8+环境下运行。
获取并装载SDK
天翼云时序数据库Influx版实例用户可以在【实例信息-文档下载】页下载SDK包。并将下载的.jar包依赖引入到项目工程中。
使用SDK创建连接
SDK采用HTTP API方式连接。为方便用户操作,天翼云为提供SDK替代原生的HTTP API。
InfluxDBClient是操作InfluxDB的客户端操作类,使用InfluxDB的SDK前首先需要创建InfluxDBClient的实例对象,可以通过InfluxDBClientFactory工厂类里面提供的8种方法来创建InfluxDBClient实例对象。
其中,8种创建InfluxDBClient实例对象的方法可以概括为三类:
- 第一类是方法中通过传递不同的配置参数来创建InfluxDBClient实例对象。
- 第二类是通过InfluxDBClientOptions配置类来创建InfluxDBClient实例对象。
- 第三类是方法中不传递任何参数而是通过读取根目录下的influx2.properties配置文件中的配置项来创建InfluxDBClient实例对象。
注意InfluxDBClient实例对象使用完成之后记得调用influxDBClient.close()方法进行及时关闭。
第一类示例代码
方法1 : create(String url, char[] token, String org, String bucket)
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8383", "my-token".toCharArray(), "test", "myBucket");
方法2 : create(String url, char[] token, String org)
InfluxDBClient client = InfluxDBClientFactory.create"http://localhost:8383", "test");
方法3 : create(String url, char[] token)
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8383", "my-token".toCharArray());
方法4: create(String url, String username, char[] password)
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8383", "my-username", "my-password".toCharArray());
方法5: create(String connectionString)
//可以使用连接字符串构建客户端,该连接字符串包含URL地址及相关参数。
InfluxDBClient client = InfluxDBClientFactory.create("http://localhost:8383?readTimeout=1000&writeTimeout=3000&connectTimeout=2000&logLevel=HEADERS&token=my-token&bucket=my-bucket&org=test");
其中,URL中参数支持以下选项:
属性名 | 默认值 | 描述说明 |
---|---|---|
org | - | 写入和查询的默认目标组织 |
bucket | - | 写入的默认目标存储桶 |
token | - | 用于授权的令牌 |
logLevel | NONE | 日志级别 |
readTimeout | 10000ms | 读取超时时间 |
writeTimeout | 10000ms | 写超时时间 |
connectTimeout | 10000ms | 套接字超时时间 |
第二类示例代码
首先创建InfluxDBClientOptions对象,然后放入InfluxDBClientFactory类的create方法中即可创建InfluxDBClient实例对象。
//创建配置对象
InfluxDBClientOptions options = InfluxDBClientOptions.builder().url(url).org("-").authenticateToken("my-token".toCharArray()).bucket("my-bucket").build();
//创建InfluxDBClient的实例对象
InfluxDBClient client = InfluxDBClientFactory.create(options);
第三类示例代码
不传递任何参数而是通过读取根目录下的influx2.properties配置文件中的配置项来创建InfluxDBClient实例对象。
InfluxDBClient client = InfluxDBClientFactory.create();
行协议基本概念
行协议是一种纯文本数据,只要数据符合行协议格式,就可以使用Java SDK或前往控制台页面写入。
行协议的格式如下:
<measurement>[,<tag_key1>=<tag_value1>[,<tag_key2>=<tag_value2>]] <field_key1>=<field_value1>[,<field_key2>=<field_value2>] [<timestamp>]
行协议遵循以下要点:
- 一行数据中,最多包含4部分数据,分别为mesurement、tags、fields、timestamp。
- tags和timestamp可选。
- 仅可存在一个mesurement,可以存在多个tags和fields。
- mesurement和tags之间以英文逗号,分隔,tag和tag之间也以英文逗号,分隔。
- mesurement/tags和fields之间以英文空格符 分隔,fields和timestamp之间以英文空格符 分隔,但field和field之间英文逗号,分隔。
示例:
# 不带tag
myMeasurement myField=value 1690279633098000000
# 带tag
myMeasurement,tag=tag_value myField1=value1,myField2=value2 1690279633098000000
注意: 行协议格式数据采用\n分隔符来分割一行数据。
行协议组成元素
上文提到,一条行协议数据由4个部分组成,接下来详细介绍各部分元素。
Measurement
(必需)将要往Bucket内写入的数据表名。表名区分大小写。
Tag集
(可选)Tag是Influx中的一个概念,用于区分一个表里面的不同时间线数据。具体来说,要插入的点对应的field_name和Tag共同构成了一条时间线。
Field集
(必需)以键值对形式表示的点数。紧随在所有的点数都应至少包含一个Field键值对。
时间戳
(可选)表示要写入点数对应的时间戳,默认情况下,该值为纳秒级。若要写入其他精度的时间戳,需要指定其精度。若未指定时间戳,则取当前本地系统时间为值。
异步非阻塞写入
异步非阻塞写入数据方式需要创建WriteApi对象,支持使用行协议Line Protocol、数据点Data Point和对象POJO写入数据,支持批量写入数据。
对象写入
可以通过POJO对象的形式异步非阻塞写入数据到特定的Bucket。
示例代码:
//定义POJO对象类
@Measurement(name = "temperature")
public static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
//配置客户端influxDBClient
//写入数据
try (WriteApi writeApi = influxDBClient.getWriteApi()){
//构建POJO对象
Temperature temperature = new Temperature();
temperature.location = "south";
temperature.value = 62D;
temperature.time = Instant.now();
//写入对象数据
writeApi.writeMeasurement(WritePrecision.NS, temperature);
}
//关闭客户端
influxDBClient.close();
数据点写入
可以通过Data Point数据点的形式异步非阻塞写入数据到特定的Bucket。
示例代码:
//配置客户端
influxDBClient
//写入数据
try (WriteApi writeApi = influxDBClient.getWriteApi()){
//构建数据点Point
Point point = Point.measurement("temperature")
.addTag("location","west")
.addField("value",55D)
.time(Instant.now().toEpochMilli(),WritePrecision.MS);
//写入Point数据
writeApi.writePoint(point);
}
//关闭客户端
influxDBClient.close();
行协议写入
可以通过Line Protocol行协议的形式异步非阻塞写入数据到特定的Bucket。
示例代码:
//配置客户端
influxDBClient
//写入数据
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
//定义record
String record = "temperature,location=north value=60.0";
//写入行数据
writeApi.writeRecord(WritePrecision.NS,record);
}
//关闭客户端
influxDBClient.close();
批量写入
通过构建WriteOptions配置对象并将其作为参数传入客户端创建WriteApi对象的方法中,从而实现对数据批量写入的支持。
示例代码:
//配置客户端influxDBClient
//构建WriteOptions配置对象,参数根据实际需要灵活调整。
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(10_000)
.bufferLimit(500)
.flushInterval(500)
.jitterInterval(1_000)
.retryInterval(2_000)
.maxRetries(5)
.maxRetryDelay(250_123)
.exponentialBase(2)
.writeScheduler(Schedulers.computation())
.backpressureStrategy(BackpressureOverflowStrategy.ERROR).build();
//创建WriteApi 对象
WriteApi api = influxDBClient.getWriteApi(writeOptions );
//写入数据
……
//关闭客户端
influxDBClient.close();
同步阻塞写入
同步阻塞写入数据方式需要创建WriteApiBlocking对象,支持使用行协议Line Protocol、数据点Data Point和对象POJO写入数据。
对象写入
可以通过POJO对象的形式同步阻塞写入数据到特定的Bucket。
示例代码:
//定义POJO对象类
@Measurement(name = "temperature")
public static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
//配置客户端influxDBClient
//创建WriteApiBlocking对象
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
//写入数据
try{
//构建POJO对象
Temperature temperature = new Temperature();
temperature.location = "south";
temperature.value = 62D;
temperature.time = Instant.now();
//写入对象数据
writeApi.writeMeasurement(WritePrecision.NS, temperature);
}
//关闭客户端
influxDBClient.close();
数据点写入
可以通过Data Point数据点的形式同步阻塞写入数据到特定的Bucket。
示例代码:
//配置客户端influxDBClient
//创建WriteApiBlocking对象
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
//写入数据
try{
//构建数据点Point
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55D)
.time(Instant.now().toEpochMilli(),WritePrecision.MS);
//写入Point数据
writeApi.writePoint(point);
}
//关闭客户端
influxDBClient.close();
行协议写入
可以通过Line Protocol行协议的形式同步阻塞写入数据到特定的Bucket。
示例代码:
//配置客户端influxDBClient
//创建WriteApiBlocking对象
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
//写入数据
try{
//定义record
String record = "temperature,location=north value=60.0";
//写入行数据
writeApi.writeRecord(WritePrecision.NS,record);
}
//关闭客户端
influxDBClient.close();
默认标签值使用
有时候在每一个测量值中都需保存部分相同的信息,如hostname、location等,这时可以通过静态值、系统变量或环境变量的属性配置来设置默认的标签值,其配置的表达式形式如下:静态值:China;系统变量: {version};环境变量: {env.hostname}。
针对示例行协议内容:mine-sensor,id=132-987-655,customer="China",hostname=example.com,sensor-version=v1.00 altitude=10,其有两种实现方式,分别如下。
通过配置文件实现的示例代码:
influx2.tags.id = 132-987-655influx2.tags.customer = Chinainflux2.tags.hostname = ${env.hostname}influx2.tags.sensor-version = ${version}
通过API实现的示例代码:
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url(url).authenticateToken(token)
.addDefaultTag("id","132-987-655")
.addDefaultTag("customer","China")
.addDefaultTag("hostnamer","${env.hostname}")
.addDefaultTag("sensor-version","${version}").build();
GZip支持
InfluxDBClient默认不会为其底层调用的HTTP请求启用GZIP压缩,如果要启用GZIP以减少传输数据的大小,则可以通过如下示例代码进行配置。
示例代码:
influxDBClient.enableGzip();
同步查询
同步查询不适用于大型查询结果。
示例代码:
//配置客户端influxDBClient
//定义查询Flux,内容可以根据查询需要自行定义。
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据
List<FluxTable> tables = queryApi.query(flux);
for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) {
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
}
}
//关闭客户端
influxDBClient.close();
另外同步查询提供了FluxRecords到 POJO的可能性映射。
//定义POJO对象类
@Measurement(name = "temperature")
public static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
//配置客户端influxDBClient
//定义查询Flux,内容可以根据查询需要自行定义。
String flux = "from(bucket:\"my-bucket\")
|> range(start: 0)
|> filter(fn: (r) => r._measurement == \"temperature\")";
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据并映射成POJO对象
List<Temperature> temperatures = queryApi.query(flux, Temperature.class);
for (Temperature temperature : temperatures) {
System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
}
//关闭客户端
influxDBClient.close();
异步查询
异步查询提供了处理未绑定查询的可能性,并允许用户处理异常、停止接收更多结果以及成功查询的通知。
示例代码:
//配置客户端influxDBClient
//定义查询Flux,内容可以根据查询需要自行定义。
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据:query(String query, BiConsumer<Cancellable, FluxRecord> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
//异步查询提供多种query方法,如上方法的后两个参数可以根据实际需要灵活选用
queryApi.query(flux,(cancellable, fluxRecord) -> {
//回调消费FluxRecord结果,具有中断流查询的能力
System.out.println(fluxRecord.getTime() + ": "+fluxRecord.getValueByKey("_value"));
},throwable -> {
//回调消费所有的错误通知
System.out.println("Error occurred:"+ throwable.getMessage());
},() -> {
//回调消费查询成功的通知
System.out.println("Query completed");
});
//关闭客户端
influxDBClient.close();
另外异步查询提供了FluxRecords到 POJO的可能性映射。
示例代码:
//定义POJO对象类
@Measurement(name = "temperature")
public static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
//配置客户端influxDBClient
//定义查询Flux,内容可以根据查询需要自行定义。
String flux = "from(bucket:\"my-bucket\")
|> range(start: 0)
|> filter(fn: (r) => r._measurement == \"temperature\")";
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据并映射成POJO对象
queryApi.query(flux, Temperature.class, (cancellable, temperature) -> {
//回调消费FluxRecord结果并映射成POJO对象,具有中断异步查询的能力
System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
});
//关闭客户端
influxDBClient.close();
原生查询
原生查询允许直接处理成原始CSV响应。
示例代码:
//配置客户端influxDBClient
//定义查询Flux,内容可以根据查询需要自行定义。
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据
String csv = queryApi.queryRaw(flux);
System.out.println("CSV response: " + csv);
//关闭客户端
influxDBClient.close();
异步版本允许逐行处理。
示例代码:
//配置客户端influxDBClient
//定义查询Flux,内容可以根据查询需要自行定义。
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据
queryApi.queryRaw(flux, (cancellable, line) -> {
System.out.println("Response: " + line);
});
//关闭客户端
influxDBClient.close();
Flux-DSL构建查询
对于同步查询、异步查询或原生查询中的Flux查询语句,除了直接手动拼接成String外,还可以通过Flux-DSL来构建Flux查询。
示例代码:
//配置客户端influxDBClient
//构建Flux对象,内容可以根据查询需要自行定义。
Flux flux = Flux.from("my-bucket")
.range(-30L, ChronoUnit.MINUTES)
.filter(Restrictions.and(Restrictions.measurement().equal("cpu")))
.limit(10);
//创建QueryApi对象
QueryApi queryApi = influxDBClient.getQueryApi();
//查询数据
List<FluxTable> tables = queryApi.query(flux.toString());
for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) {
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
}}
//关闭客户端
influxDBClient.close();
删除数据
删除数据需要创建DeleteApi对象,下面展示从InfluxDB删除数据的示例代码:
//配置客户端influxDBClient
//创建DeleteApi对象。
DeleteApi deleteApi = influxDBClient.getDeleteApi();
//删除数据。其中,DeleteApi对象拥有包含不同参数的多个delete方法,起止时间以及bucket等信息参数可以根据实际需要灵活调整。
try{
OffsetDateTime start = OffsetDateTime .now().minus(1, ChronoUnit.HOURS);
OffsetDateTime stop = OffsetDateTime.now();
deleteApi.delete(start, stop, "", "my-bucket", "my-org");
} catch (InfluxException ie) {
System.out.println("InfluxException: " + ie);
}
//关闭客户端
influxDBClient.close();
日志等级配置
可以通过更改日志等级来记录请求和响应内容,其中,LogLevel值包括:NONE、BASIC、HEADER、BODY。
注意,当Logleve配置为BODY等级时,将在流式传输时禁用块并将整个响应加载到内存中。
示例代码:
influxDBClient.setLogLevel(LogLevel.HEADERS)
管理API
允许客户通过SDK中的API进行管理操作,其中包含Buckets接口、org接口、sources接口。
Buckets接口
管理buckets相关接口需要首先创建BucketsApi对象,进而调用其新建、查询、删除等函数方法实现对buckets的数据管理。
示例代码:
//配置客户端influxDBClient
//创建BucketsApi对象
BucketsApi bucketsApi = influxDBClient.getBucketsApi();
//说明:BucketsApi 对象中提供多种不同参数的新建、查询、删除等函数方法用于Bucket对象的管理,下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。
//创建Bucket(Bucket个数不超过100)
BucketRetentionRules retention = new BucketRetentionRules();retention.setEverySeconds(3600);
Bucket createBucket= bucketsApi.createBucket("iot-bucket", retention, "xxxxxxxxx");
//查询
BucketBucket foundBucket = bucketsApi.findBucketByID(createBucket.getId());
//更新
BucketcreateBucket.setName("Therm sensor 2000");
createBucket.getRetentionRules().add(new BucketRetentionRules().everySeconds(3600*2));
Bucket updatedBucket = bucketsApi.updateBucket(createBucket);
//删除
BucketbucketsApi.deleteBucket(updatedBucket);
//关闭客户端
influxDBClient.close();
org接口
管理organizations相关接口需要首先创建OrganizationsApi对象,进而调用其查询函数方法实现对organizations数据的查询。
示例代码:
//配置客户端influxDBClient
//创建OrganizationsApi对象
OrganizationsApi organizationsApi = influxDBClient.getOrganizationsApi();
//说明:OrganizationsApi 对象中提供多种不同参数的查询函数方法用于Organizations对象的管理,下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。
//查询
OrganizationOrganization foundOrganization = organizationsApi.findOrganizationByID(createOrganization.getId());
//关闭客户端
influxDBClient.close();
sources接口
管理sources相关接口需要首先创建SourcesApi对象,进而调用其新建、查询、删除等函数方法实现对sources的数据管理。
示例代码:
//配置客户端influxDBClient
//创建SourcesApi对象
SourcesApi sourcesApi = influxDBClient.getSourcesApi();
//说明:SourcesApi 对象中提供多种不同参数的查询等函数方法用于Sources对象的管理,下面只是择取部分方法对其进行代码示例,实际可根据需要灵活调整所调用方法。
//创建
SourceSource source = new Source();
source.setOrgID("02cebf26d7fc1000");
source.setDefault(false);
source.setName("my-source");
source.setType(Source.TypeEnum.V1);
source.setUrl("http://localhost:8383");
source.setInsecureSkipVerify(true);
source.setTelegraf("telegraf");
source.setToken(UUID.randomUUID().toString());
source.setUsername("admin");
source.setPassword("password");
source.setSharedSecret(UUID.randomUUID().toString());
source.setMetaUrl("/usr/local/var/influxdb/meta");
source.setDefaultRP("autogen");
Source createdSource = sourcesApi.createSource(source);
//查询
SourceSource foundSource = sourcesApi.findSourceByID(createdSource.getId());
//更新
SourcecreatedSource.setInsecureSkipVerify(false);
updateSource = sourcesApi.updateSource(createdSource);
//删除SourcesourcesApi.deleteSource(updateSource);
//关闭客户端
influxDBClient.close();
关闭客户端
InfluxDB Client提供了优雅的关闭功能。
示例代码:
//关闭客户端
influxDBClient.close();