Skip to content

使用ES Java SDK操作

pom依赖

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>estest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.10.1</version>
        </dependency>
    </dependencies>
</project>

日志写入

java
package org.example;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class EsWriteExample {
    public static void main(String[] args) throws IOException {

        String slsProject = "etl-dev";
        String slsLogstore = "accesslog";
        String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";

        String schema = "https";
        String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
        int port = 443;
        String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
        String esPrefix = "/es/";
        String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(accessKeyId, accessKeySecret));

        RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

        // Set /es/ prefix
        builder.setPathPrefix(esPrefix);
        RestHighLevelClient client = new RestHighLevelClient(builder);

        // Create bulk request
        BulkRequest bulkRequest = new BulkRequest();

        // Add sample documents to bulk request
        for (int i = 1; i <= 10; i++) {
            Map<String, Object> document = new HashMap<>();
            document.put("@timestamp", System.currentTimeMillis());
            document.put("request_method", i % 2 == 0 ? "GET" : "POST");
            document.put("request_uri", "/api/test/" + i);
            document.put("status", i % 3 == 0 ? 404 : 200);
            document.put("response_time", 100 + (i * 10));
            document.put("user_agent", "Mozilla/5.0 (Test Agent " + i + ")");
            document.put("remote_addr", "192.168.1." + (100 + i));
            document.put("body_bytes_sent", 1024 + (i * 100));
            document.put("message", "Sample log entry " + i);

            IndexRequest indexRequest = new IndexRequest(esIndex)
                    .id("doc_" + i)  // Optional: set document ID
                    .source(document, XContentType.JSON);

            bulkRequest.add(indexRequest);
        }

        try {
            // Execute bulk request
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Check for failures
            if (bulkResponse.hasFailures()) {
                System.err.println("Bulk request has failures:");
                System.err.println(bulkResponse.buildFailureMessage());
            } else {
                System.out.println("Bulk request completed successfully!");
                System.out.println("Total items processed: " + bulkResponse.getItems().length);
                System.out.println("Time taken: " + bulkResponse.getTook().getMillis() + " ms");
            }

            // Print individual item results
            // System.out.println("\nIndividual item results:");
            // for (int i = 0; i < bulkResponse.getItems().length; i++) {
            //     org.elasticsearch.action.bulk.BulkItemResponse item = bulkResponse.getItems()[i];
            //     if (item.isFailed()) {
            //         System.out.println("Item " + (i + 1) + " failed: " + item.getFailureMessage());
            //     } else {
            //         System.out.println("Item " + (i + 1) + " success: " + item.getResponse().getResult());
            //     }
            // }

        } catch (IOException e) {
            System.err.println("Error executing bulk request: " + e.getMessage());
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

日志查询

java
package org.example;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

public class EsQueryExample {
    public static void main(String[] args) throws IOException {

        String slsProject = "etl-dev";
        String slsLogstore = "accesslog";
        String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";

        String schema = "https";
        String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
        int port = 443;
        String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
        String esPrefix = "/es/";
        String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(accessKeyId, accessKeySecret));

        RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

        // Set /es/ prefix
        builder.setPathPrefix(esPrefix);
        RestHighLevelClient client = new RestHighLevelClient(builder);

        // Query
        BoolQueryBuilder boolExpr= new BoolQueryBuilder();

        long endTime = System.currentTimeMillis();
        long startTime = endTime - 3600 * 1000;
        boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET"));
        boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis"));

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        searchSourceBuilder.query(boolExpr);
        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(searchResponse.toString());

        client.close();
    }
}