Wenzhihuai
  • README
  • 目录
  • 一、Java
    • 1.1 一次jvm调优过程.md
    • 1.2 JVM调优参数.md
    • 1.3 serverlog.md
    • 1.4 【elasticsearch】搜索过程详解
    • 1.5 基于kubernetes的分布式限流
  • 二、大数据
    • 3.1 elastic-spark.md
  • 三、个人网站
    • 3.1 历史与架构.md
    • 3.2 Lucene的使用.md
    • 3.3 定时任务.md
    • 3.4 日志系统.md
    • 3.5 小集群部署.md
    • 3.6 数据库备份.md
    • 3.7 那些牛逼的插件.md
    • 3.8 基于贝叶斯的情感分析.md
    • 3.9 网站性能优化.md
  • 四、DevOps
    • 4.1 DevOps平台.md
    • 4.2 jenkins-x
    • 4.3 tekton
    • 4.4 Git
  • 五、Kubernetes
  • 六、云原生
  • 七、redis
    • 7.1 redis缓存.md
  • 八、mysql
    • 8.1 mysql缓存.md
  • 面试
    • 算法
    • jvm
    • kafka
    • mysql
    • kubernetes
  • 生活
    • 3017.md
    • 3018.md
    • 3019.md
由 GitBook 提供支持
在本页
  • elastic spark
  • 一、原生RDD支持
  • 二、Spark Streaming
  • 三、Spark SQL
  • 四、Spark Structure Streaming
  • 五、Spark on kubernetes Operator
  • 参考:

这有帮助吗?

  1. 二、大数据

3.1 elastic-spark.md

上一页二、大数据下一页三、个人网站

最后更新于1年前

这有帮助吗?

elastic spark

Hadoop允许Elasticsearch在Spark中以两种方式使用:通过自2.1以来的原生RDD支持,或者通过自2.0以来的Map/Reduce桥接器。从5.0版本开始,elasticsearch-hadoop就支持Spark 2.0。目前spark支持的数据源有: (1)文件系统:LocalFS、HDFS、Hive、text、parquet、orc、json、csv (2)数据RDBMS:mysql、oracle、mssql (3)NOSQL数据库:HBase、ES、Redis (4)消息对象:Redis

elasticsearch相对hdfs来说,容易搭建、并且有可视化kibana支持,非常方便spark的初学入门,本文主要讲解用elasticsearch-spark的入门。

Spark - Apache Spark

一、原生RDD支持

1.1 基础配置

相关库引入:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.13</artifactId>
            <version>8.1.3</version>
        </dependency>
public static SparkConf getSparkConf() {
    SparkConf sparkConf = new SparkConf().setAppName("elasticsearch-spark-demo");
    sparkConf.set("es.nodes", "host")
            .set("es.port", "xxxxxx")
            .set("es.nodes.wan.only", "true")
            .set("es.net.http.auth.user", "elxxxxastic")
            .set("es.net.http.auth.pass", "xxxx")
            .setMaster("local[*]");
    return sparkConf;
}

1.2 读取es数据

这里用的是kibana提供的sample data里面的索引kibana_sample_data_ecommerce,也可以替换成自己的索引。

public static void main(String[] args) {
    SparkConf conf = getSparkConf();
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {

        JavaPairRDD<String, Map<String, Object>> esRDD =
                JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce");
        esRDD.collect().forEach(System.out::println);
    }
}

esRDD同时也支持query语句esRDD(final JavaSparkContext jsc, final String resource, final String query),一般对es的查询都需要根据时间筛选一下,不过相对于es的官方sdk,并没有那么友好的api,只能直接使用原生的dsl语句。

1.3 写数据

支持序列化对象、json,并且能够使用占位符动态索引写入数据(使用较少),不过多介绍了。

public static void jsonWrite(){
    String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
    String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
    JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
    JavaEsSpark.saveJsonToEs(stringRDD, "spark-json");
}

比较常用的读写也就这些,更多可以看下官网相关介绍。

二、Spark Streaming

spark的实时处理,es5.0的时候开始支持,目前

三、Spark SQL

四、Spark Structure Streaming

五、Spark on kubernetes Operator

参考:

SparkConf配置,更多详细的请点击或者源码。

1.

2.

3.

这里
ConfigurationOptions
Apache Spark support
elasticsearch-hadoop
使用SparkSQL操作Elasticsearch - Spark入门教程