第四节 SpringBoot项目集成InfluxDB,并实现增删改查

亮子 2025-09-02 15:23:35 130 0 0 0

Spring Boot 项目操作 InfluxDB 可以通过官方 Java 客户端实现。由于 InfluxDB 1.x 和 2.x 的客户端 API 差异较大,以下分别提供两个版本的实现方案,以网约车场景中的车辆位置数据为例:

一、InfluxDB 1.x 版本实现

1. 添加依赖

pom.xml 中引入 1.x 客户端:

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.23</version>
</dependency>

2. 配置 InfluxDB 连接

application.yml 中添加配置:

influxdb:
  url: http://localhost:8086
  username: admin
  password: admin123
  database: vehicle_db  # 数据库名

3. 配置类(创建连接实例)

package com.bwie.config;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Spring configuration that exposes an InfluxDB 1.x client as a bean.
 *
 * <p>Connection settings are injected from the {@code influxdb.*} properties.
 */
@Configuration
public class InfluxDB1Config {

    /** Server address, from {@code influxdb.url}. */
    @Value("${influxdb.url}")
    private String url;

    /** Login name, from {@code influxdb.username}. */
    @Value("${influxdb.username}")
    private String username;

    /** Login password, from {@code influxdb.password}. */
    @Value("${influxdb.password}")
    private String password;

    /** Database selected as the client's default, from {@code influxdb.database}. */
    @Value("${influxdb.database}")
    private String database;

    /**
     * Connects to the configured server, selects the default database and
     * enables batched writes.
     *
     * @return the configured client
     */
    @Bean
    public InfluxDB influxDB() {
        InfluxDB client = InfluxDBFactory.connect(url, username, password);
        client.setDatabase(database);
        // Batch writes for throughput: batch size 1000, flush interval 100 ms.
        client.enableBatch(1000, 100, TimeUnit.MILLISECONDS);
        return client;
    }
}

4. 实体类(车辆位置)

package com.bwie.vo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;

/**
 * Value object for a single vehicle position sample.
 *
 * <p>The trailing comments mark how each property is stored in InfluxDB
 * (as a tag or as a field).
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class VehiclePositionVo {
    private String carId;       // vehicle ID (tag)
    private Double latitude;    // latitude (field)
    private Double longitude;   // longitude (field)
    private Instant timestamp;  // timestamp of the sample
}

5. 增删改查实现类(本例以测试类形式演示写入与查询)

package com.bwie;

import com.bwie.vo.VehiclePositionVo;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@SpringBootTest
public class TestInfluxApp {

    @Autowired
    private InfluxDB influxDB;

    private static final String MEASUREMENT = "vehicle_pos";

    @Test
    public void testInfluxDB() {

        //--1 写入数据
        VehiclePositionVo position = new VehiclePositionVo("car1", 39.9, 116.3, Instant.now());

        Point point = Point.measurement(MEASUREMENT)
                .tag("car_id", position.getCarId())  // 标签(带索引,适合查询)
                .addField("lat", position.getLatitude())  // 字段
                .addField("lon", position.getLongitude())
                .time(position.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)  // 时间戳
                .build();
        influxDB.write(point);

        //--2 查询数据
        // InfluxQL查询语句
        String query = String.format(
                "SELECT * FROM %s WHERE carId = '%s' AND time >= %dms AND time <= %dms",
                MEASUREMENT, "car1",
                Instant.now().toEpochMilli()-1000000000L,
                Instant.now().toEpochMilli()
        );

        QueryResult result = influxDB.query(new Query(query));
        List<VehiclePositionVo> list = result.getResults().get(0).getSeries().get(0).getValues().stream()
                .map(values -> {
                    VehiclePositionVo pos = new VehiclePositionVo();
                    pos.setCarId("car1");
                    pos.setLatitude((Double) values.get(1));  // 纬度在第2列
                    pos.setLongitude((Double) values.get(2)); // 经度在第3列
                    pos.setTimestamp(Instant.ofEpochMilli(((Number) values.get(0)).longValue())); // 时间戳在第1列
                    return pos;
                })
                .collect(Collectors.toList());

        for (VehiclePositionVo positionVo : list) {
            System.out.println(positionVo);
        }
    }

}

6. 其他参考代码(InfluxDB 2.x 版本,基于 influxdb-client-java 客户端)

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Write, query and delete operations for vehicle positions stored in InfluxDB 2.x.
 *
 * <p>The target bucket comes from {@code influxdb2.bucket} and the organization
 * from {@code influxdb2.org}; both properties must be defined.
 */
@Service
public class InfluxDB2Service {

    private static final String MEASUREMENT = "vehicle_pos";  // plays the role of a table name

    private final InfluxDBClient influxDBClient;
    private final String bucket;

    /**
     * Organization that owns the bucket. Field-injected so the constructor
     * signature stays unchanged; the write and delete calls below require it.
     */
    @Value("${influxdb2.org}")
    private String org;

    /**
     * @param influxDBClient the InfluxDB 2.x client
     * @param bucket         bucket name, injected from {@code influxdb2.bucket}
     */
    public InfluxDB2Service(InfluxDBClient influxDBClient, @Value("${influxdb2.bucket}") String bucket) {
        this.influxDBClient = influxDBClient;
        this.bucket = bucket;
    }

    /**
     * Writes a single position.
     *
     * <p>Uses the blocking write API, so a failed write surfaces as an exception
     * to the caller instead of being handled on a background batching thread.
     *
     * @param position the position to store
     */
    public void insert(VehiclePosition position) {
        influxDBClient.getWriteApiBlocking().writePoint(bucket, org, toPoint(position));
    }

    /**
     * Writes several positions in one request.
     *
     * @param positions positions to store; {@code null} or an empty list is a no-op
     */
    public void batchInsert(List<VehiclePosition> positions) {
        if (positions == null || positions.isEmpty()) {
            return;
        }
        List<Point> points = positions.stream()
                .map(InfluxDB2Service::toPoint)
                .collect(Collectors.toList());
        influxDBClient.getWriteApiBlocking().writePoints(bucket, org, points);
    }

    /**
     * Queries the positions of one vehicle within a time range, using Flux.
     *
     * <p>{@code carId} is interpolated into the Flux text without escaping, so it
     * must come from a trusted source.
     *
     * @param carId vehicle ID to filter on
     * @param start start of the range
     * @param end   end of the range
     * @return one entry per pivoted row returned by the query
     */
    public List<VehiclePosition> queryByCarId(String carId, Instant start, Instant end) {
        // pivot() turns the per-field rows into one row per timestamp with
        // "latitude" and "longitude" columns.
        String flux = String.format("""
                from(bucket: "%s")
                  |> range(start: %s, stop: %s)
                  |> filter(fn: (r) => r._measurement == "%s" and r.carId == "%s")
                  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                """, bucket, start, end, MEASUREMENT, carId);

        List<FluxTable> tables = influxDBClient.getQueryApi().query(flux, org);
        return tables.stream()
                .flatMap(table -> table.getRecords().stream())
                .map(record -> toPosition(carId, record))
                .collect(Collectors.toList());
    }

    /**
     * Deletes the positions of one vehicle within a time range.
     *
     * @param carId vehicle ID whose points are deleted
     * @param start start of the range
     * @param end   end of the range
     */
    public void deleteByCarId(String carId, Instant start, Instant end) {
        influxDBClient.getDeleteApi().delete(
                // The delete API takes OffsetDateTime rather than Instant.
                start.atOffset(ZoneOffset.UTC),
                end.atOffset(ZoneOffset.UTC),
                // Scope the predicate to this measurement so points of other
                // measurements carrying the same tag are left alone.
                String.format("_measurement=\"%s\" AND carId=\"%s\"", MEASUREMENT, carId),
                bucket,
                org
        );
    }

    /** Converts a position into a point of the {@code vehicle_pos} measurement. */
    private static Point toPoint(VehiclePosition position) {
        // The 2.x Point is built fluently and has no build() step.
        return Point.measurement(MEASUREMENT)
                .addTag("carId", position.getCarId())             // tag
                .addField("latitude", position.getLatitude())     // field
                .addField("longitude", position.getLongitude())
                .time(position.getTimestamp(), WritePrecision.NS);
    }

    /** Maps one pivoted Flux record back to a position. */
    private static VehiclePosition toPosition(String carId, FluxRecord record) {
        VehiclePosition pos = new VehiclePosition();
        pos.setCarId(carId);
        // NOTE(review): VehiclePosition is not defined in this document; the String
        // conversion is kept from the original and assumes String-typed setters - confirm.
        pos.setLatitude(record.getValueByKey("latitude").toString());
        pos.setLongitude(record.getValueByKey("longitude").toString());
        pos.setTimestamp(record.getTime());
        return pos;
    }
}

核心说明

  1. 数据模型差异
  • 1.x 用 database + measurement 组织数据
  • 2.x 用 organization(组织)+ bucket(桶)+ measurement 组织数据,且必须通过 token 认证
  2. 查询语言差异
  • 1.x 使用类 SQL 的 InfluxQL
  • 2.x 使用管道式的 Flux 语言,功能更强大但学习成本较高
  3. 更新操作
  • InfluxDB 没有 UPDATE 语句:写入 measurement、标签集和时间戳完全相同的点会覆盖原有字段值;若要修改标签或时间戳,则需通过“删除旧数据 + 插入新数据”实现
  • 时序数据特性决定了更适合“写后读”,而非频繁修改
  4. 性能优化
  • 批量写入(enableBatch)可大幅提升写入性能
  • 合理设置标签(Tag)和字段(Field):查询频繁的字段用 Tag(带索引)

根据项目中使用的 InfluxDB 版本选择对应实现,上述代码可直接集成到 Spring Boot 项目中,用于网约车车辆轨迹等时序数据的存储和查询。