Spring Boot 项目操作 InfluxDB 可以通过官方 Java 客户端实现。由于 InfluxDB 1.x 和 2.x 的客户端 API 差异较大,以下分别提供两个版本的实现方案,以网约车场景中的车辆位置数据为例:
在 pom.xml 中引入 InfluxDB 1.x 客户端依赖:
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.23</version>
</dependency>
在 application.yml 中添加配置:
influxdb:
  url: http://localhost:8086
  username: admin
  password: admin123
  database: vehicle_db # 数据库名
package com.bwie.config;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
 * Spring configuration that exposes the InfluxDB 1.x client as a bean.
 *
 * <p>Connection settings are read from the {@code influxdb.url},
 * {@code influxdb.username}, {@code influxdb.password} and
 * {@code influxdb.database} properties.
 */
@Configuration
public class InfluxDB1Config {

    /** Batch size passed to {@code enableBatch}. */
    private static final int BATCH_ACTIONS = 1000;

    /** Flush interval passed to {@code enableBatch}, in milliseconds. */
    private static final int FLUSH_INTERVAL_MS = 100;

    @Value("${influxdb.url}")
    private String url;

    @Value("${influxdb.username}")
    private String username;

    @Value("${influxdb.password}")
    private String password;

    @Value("${influxdb.database}")
    private String database;

    /**
     * Connects to the configured server, selects the default database and
     * turns on batched writes.
     *
     * @return the configured client
     */
    @Bean
    public InfluxDB influxDB() {
        // Each call in the chain returns the same client, so the steps are
        // connect -> select default database -> enable batching.
        return InfluxDBFactory.connect(url, username, password)
                .setDatabase(database)
                .enableBatch(BATCH_ACTIONS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}
package com.bwie.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
/**
 * Value object for one vehicle position sample.
 *
 * <p>Field order is significant: the Lombok all-args constructor takes its
 * parameters in declaration order.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class VehiclePositionVo {

    /** Vehicle ID (stored as a tag). */
    private String carId;

    /** Latitude (stored as a field). */
    private Double latitude;

    /** Longitude (stored as a field). */
    private Double longitude;

    /** Timestamp of the sample. */
    private Instant timestamp;
}
package com.bwie;
import com.bwie.vo.VehiclePositionVo;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@SpringBootTest
public class TestInfluxApp {
@Autowired
private InfluxDB influxDB;
private static final String MEASUREMENT = "vehicle_pos";
@Test
public void testInfluxDB() {
//--1 写入数据
VehiclePositionVo position = new VehiclePositionVo("car1", 39.9, 116.3, Instant.now());
Point point = Point.measurement(MEASUREMENT)
.tag("car_id", position.getCarId()) // 标签(带索引,适合查询)
.addField("lat", position.getLatitude()) // 字段
.addField("lon", position.getLongitude())
.time(position.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS) // 时间戳
.build();
influxDB.write(point);
//--2 查询数据
// InfluxQL查询语句
String query = String.format(
"SELECT * FROM %s WHERE carId = '%s' AND time >= %dms AND time <= %dms",
MEASUREMENT, "car1",
Instant.now().toEpochMilli()-1000000000L,
Instant.now().toEpochMilli()
);
QueryResult result = influxDB.query(new Query(query));
List<VehiclePositionVo> list = result.getResults().get(0).getSeries().get(0).getValues().stream()
.map(values -> {
VehiclePositionVo pos = new VehiclePositionVo();
pos.setCarId("car1");
pos.setLatitude((Double) values.get(1)); // 纬度在第2列
pos.setLongitude((Double) values.get(2)); // 经度在第3列
pos.setTimestamp(Instant.ofEpochMilli(((Number) values.get(0)).longValue())); // 时间戳在第1列
return pos;
})
.collect(Collectors.toList());
for (VehiclePositionVo positionVo : list) {
System.out.println(positionVo);
}
}
}
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Service that writes, queries and deletes vehicle position points in an
 * InfluxDB 2.x bucket.
 *
 * <p>The bucket is taken from {@code influxdb2.bucket} and the organization
 * from {@code influxdb2.org}. The organization is field-injected so the
 * constructor signature stays unchanged; when the class is instantiated
 * outside Spring the field is left unset.
 *
 * <p>NOTE(review): {@code VehiclePosition} is not visible here. This class
 * assumes its latitude/longitude setters accept {@code String} and its
 * timestamp is an {@code Instant}, as the original code did - confirm.
 */
@Service
public class InfluxDB2Service {

    /** Measurement name; plays the role of a table name. */
    private static final String MEASUREMENT = "vehicle_pos";

    private final InfluxDBClient influxDBClient;
    private final String bucket;

    /** Organization passed explicitly to the write, query and delete calls. */
    @Value("${influxdb2.org}")
    private String org;

    public InfluxDB2Service(InfluxDBClient influxDBClient, @Value("${influxdb2.bucket}") String bucket) {
        this.influxDBClient = influxDBClient;
        this.bucket = bucket;
    }

    /**
     * Writes a single position synchronously.
     *
     * @param position the position to store
     */
    public void insert(VehiclePosition position) {
        // The blocking API reports write failures to the caller; the
        // asynchronous WriteApi only emits them as events.
        influxDBClient.getWriteApiBlocking().writePoint(bucket, org, toPoint(position));
    }

    /**
     * Writes several positions in one synchronous request.
     *
     * @param positions the positions to store; {@code null} or empty is a no-op
     */
    public void batchInsert(List<VehiclePosition> positions) {
        if (positions == null || positions.isEmpty()) {
            return;
        }
        List<Point> points = positions.stream()
                .map(InfluxDB2Service::toPoint)
                .collect(Collectors.toList());
        influxDBClient.getWriteApiBlocking().writePoints(bucket, org, points);
    }

    /**
     * Queries the positions of one vehicle within a time range using Flux.
     *
     * <p>{@code carId} is inserted into the Flux source verbatim, so callers
     * must not pass untrusted text.
     *
     * @param carId value of the {@code carId} tag to filter on
     * @param start range start
     * @param end   range stop
     * @return one entry per pivoted row, possibly empty
     */
    public List<VehiclePosition> queryByCarId(String carId, Instant start, Instant end) {
        // pivot() turns the per-field rows into one row per timestamp with
        // "latitude" and "longitude" columns.
        String flux = String.format("""
                from(bucket: "%s")
                |> range(start: %s, stop: %s)
                |> filter(fn: (r) => r._measurement == "%s" and r.carId == "%s")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                """, bucket, start, end, MEASUREMENT, carId);
        List<FluxTable> tables = influxDBClient.getQueryApi().query(flux, org);
        return tables.stream()
                .flatMap(table -> table.getRecords().stream())
                .map(record -> toPosition(record, carId))
                .collect(Collectors.toList());
    }

    /**
     * Deletes every point tagged with the given vehicle ID within a time range.
     *
     * <p>The predicate filters on the {@code carId} tag only; it does not
     * restrict the measurement.
     *
     * @param carId value of the {@code carId} tag
     * @param start range start
     * @param end   range stop
     */
    public void deleteByCarId(String carId, Instant start, Instant end) {
        // The delete API takes OffsetDateTime, so convert the instants at UTC.
        influxDBClient.getDeleteApi().delete(
                start.atOffset(ZoneOffset.UTC),
                end.atOffset(ZoneOffset.UTC),
                String.format("carId=\"%s\"", carId),
                bucket,
                org
        );
    }

    /** Converts a position into a point for the vehicle position measurement. */
    private static Point toPoint(VehiclePosition position) {
        // The 2.x Point is fluent and has no build() step.
        return Point.measurement(MEASUREMENT)
                .addTag("carId", position.getCarId())              // tag
                .addField("latitude", position.getLatitude())      // field
                .addField("longitude", position.getLongitude())
                .time(position.getTimestamp(), WritePrecision.NS); // nanosecond precision
    }

    /** Converts one pivoted Flux record into a position for the given vehicle. */
    private static VehiclePosition toPosition(FluxRecord record, String carId) {
        VehiclePosition pos = new VehiclePosition();
        pos.setCarId(carId);
        pos.setLatitude(record.getValueByKey("latitude").toString());
        pos.setLongitude(record.getValueByKey("longitude").toString());
        pos.setTimestamp(record.getTime());
        return pos;
    }
}
- InfluxDB 1.x:通过 database + measurement 组织数据
- InfluxDB 2.x:通过 bucket(桶)+ measurement 组织数据,且必须通过 token 认证
- 批量写入(如 1.x 的 enableBatch)可大幅提升写入性能

根据项目中使用的 InfluxDB 版本选择对应实现,上述代码可直接集成到 Spring Boot 项目中,用于网约车车辆轨迹等时序数据的存储和查询。