Spark支持Scala连接数据库

数据库设计:

CREATE DATABASE bigdata;
USE bigdata;
CREATE TABLE `t_student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

pom.xml:

<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
      <!--  <testSourceDirectory>src/test/scala</testSourceDirectory>-->
        <plugins>
            <!-- 指定编译java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- 指定编译scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

scala代码:

package cn.itcast.jdbc

import java.sql
import java.sql.{DriverManager, PreparedStatement}

import com.mysql.jdbc.Connection
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]): Unit = {
    //1.create
    val conf: SparkConf = new SparkConf().setAppName("chenyanlong").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    sc.setLogLevel("WARN")

    //2.prepare of data
    val data: RDD[(String, Int)] = sc.parallelize(List(("chen", 18), ("yan", 19), ("long", 20)))

    //foreachPartition针对每一个分区进行操作
    //data.foreachPartition(savaToMySQL)

    //3.connect
    def getConn(): sql.Connection = {
      DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456")
    }

    //eg:insert
    data.foreachPartition(saveToMySQL)
    //4.insert function
    def saveToMySQL(partitionData:Iterator[(String,Int)])={
      //save to mysql
      val conn: sql.Connection = getConn()

      //
      partitionData.foreach(data=>{
        //save each mysql
        val sql="insert into `t_student` (`id`,`name`,`age`)  values(null,?,?);"
        val stmt: PreparedStatement = conn.prepareStatement(sql)
        stmt.setString(1,data._1)
        stmt.setInt(2,data._2)
        stmt.execute()
      })
      conn.close()
    }
    //5.select param
    val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(
      sc,
      getConn,
      "select  * from t_student where id >=? and id <=?",
      4,
      6,
      2,
      rs => {
        val id: Int = rs.getInt("id")
        val name: String = rs.getString("name")
        val age: Int = rs.getInt("age")
        (id, name, age)
      }
    )
    //6.print data
    println(studentRDD.collect().toBuffer)
  }
}

运行效果:

 

 

chenyanlong_v
关注 关注
  • 0
    点赞
  • 1
    收藏
    觉得还不错? 一键收藏
  • 0
    评论
使用Scala 读写MySQL 数据给Spark任务执行
Kaybee - 练级之路
12-17 1794
初学Spark,需要从数据库读取数据给Spark执行,然后将执行结果返回给数据库。由于Spark是基于 Scala 开发的,刚开始完全摸不到头脑,本来是用java将数据库数据写到一个文件,然后spark去读这个文件然后执行,又突然想到,既然scala写的spark程序,何不用scala来直接读取数据库给spark任务执行,然后返回给数据库就行了啊,还绕那么多弯干嘛。。虽然不会写Scala,但是会写
Spark(6)——Sparkscala交互
weixin_48445640的博客
10-15 319
在前面我们使用的是python与spark交互,其实spark也提供了与scala交互的页面,在spark/bin目录下,实行如下语句: spark-shell --master local[2] 可见是实现了scala交互,可以使用:
ScalaSpark:大数据处理的完美组合
最新发布
2401_85639015的博客
08-07 1228
函数式编程:支持高阶函数、不可变数据结构等。面向对象编程:支持类和对象的定义,并具备继承、多态等特性。与Java兼容:可以与Java代码互操作,方便使用现有的Java库。表达能力强:代码简洁,能够用更少的代码实现更多功能。Apache Spark是一个开源的分布式计算框架,用于处理大规模数据集。内存计算:通过将数据存储在内存中,显著提升计算速度。RDD(弹性分布式数据集):提供了一个可以并行处理的数据结构。支持多种编程语言:包括Java、Python、Scala和R。丰富的库支持
scala+spark
weixin_63168851的博客
04-11 233
spark-env.template复制一份spark-env.sh。1、将Scala安装包上传到syh1虚拟机/etc/packages。13、将syh1节点Spark复制到syh2,syh3节点。10、修改spark-env.sh,文件末尾添加以下内容。4、修改文件/etc/profile (3个节点)7、将spark包上传到/opt/programs。6、将scala安装复制到syh1和syh2节点。8、切换目录,解压到/opt/programs/3、解压到/opt/programs。
Spark学习链接(Scala)
code_caq的博客
08-11 381
1.scala 官方:http://www.scala-lang.org/api/2.10.6/#package http://www.runoob.com/scala/scala-tutorial.html2.Spark官方指南 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package3
scala 操作数据库的方法
08-25
Scala 中,定义数据库连接是操作数据库的第一步。我们可以使用 DruidDataSource 来定义数据库连接。DruidDataSource 是一个开源的数据库连接池,提供了高性能和稳定的数据库连接。 在上面的代码中,我们使用 ...
Greenplum Spark Connector 2.2.0 for Scala 2.11
10-27
在技术细节上,`greenplum-connector-apache-spark-scala_2.11-2.2.0.jar`是连接器的核心库文件,它包含了所有必要的类和方法,使得Spark应用程序能够识别和连接Greenplum。开发者在构建Spark应用时,需要将这个JAR...
scala读取mysql数据库_IDEA 环境中使用Scala连接mysql数据库并读取数据。
weixin_30446393的博客
01-19 375
package cn.brent.sparkstreammingimport java.sql.{Connection, DriverManager}import scala.collection.mutable.ArrayBufferobject DBUntils {val mysqlConf = Map("driver" -> "com.mysql.jdbc.Driver","url" ...
spark连接数据库操作(scala实现)
幽林孤狼
08-20 6389
最近用到spark streaming程序处理数据时,想要将结果保存到关系数据库中,于是先考虑到怎么样连接数据库,以及如何操作数据库 连接操作方法如下: 一、基本查询 import java.sql.{Connection, DriverManager, ResultSet} // Change to Your Database Config val conn_str = "jdbc:my
Scala连接Mysql数据库和Sqlserver数据库,增量抽取数据存储到Hive数据库
zhengzaifeidelushang的博客
12-17 1794
Scala连接Mysql数据库和Sqlserver数据库 Mysql和Sqlserver源数据库单表数据量超过200G,现在需要把数据搬运到HDFS上存储,释放源数据库存储空间。这里采用Scala开发Spark程序,按照索引ID增量抽取数据插入到hive数据库中,每次增量抽取300万条数据。 如下图所示: 每次抽取300万条数据,并且每次存储最大ID到一张记录表中,在最大ID基础上实现每次增量抽取300万条数据到Hive数据库表中。 下面详细记录了Scala连接Mysql数据库和Sqlserver数据库,
ScalaSpark的环境搭建版本匹配问题(学习笔记)
04-18
最新版本的scala-2.11.8与Spark2.1.0环境搭建
ScalaSpark安装及部署
Jenny_yz的博客
04-26 872
1.进入spark/conf目录修改spark的配置文件spark-env.sh,将spark-env.sh.template配置模板文件复制一份并重命名spark-env.sh,具体命令如下。命令,修改spark-env.sh文件,在该文件中添加以下内容:(注:将SPARK_MASTER_HOST配置参数前用#注释掉)链接:https://pan.baidu.com/s/1iWD6wVCHhDyohoqlP-ellA?命令将其文件解压到/export/servers/目录下。
Spark2.0 读写mysql数据(scala)
@羲凡—只为更好的活着
04-01 872
@羲凡——只为了更好的活着 Spark2.0 读写mysql数据(scala) 特别强调楼主使用spark2.3.2版本 1.准备工作 在pom.xml文件中要添加 <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> ...
idea关联scalaspark开发(全)
qq_49824182的博客
10-12 2861
idea关联scalaspark开发(全)
SparkScala配置
qq_40891050的博客
06-14 685
sparkscala
sparkscala快速入门
五哥的酒馆
07-20 372
&#13; scala和java都是在jvm之上的语言,相对来讲,scala热度比较低,其实并不是一个特别好的语言选择。&#13; 原因倒不是因为scala本身的缺点,而是使用人群不够多,论坛和社区不够活跃。这就跟社交软件一样,大家都用微信,短信就没人用了。&#13; 但是scala是写分布式程序的一门非常方便的语言,因为scala几乎每个对象都有map,reduce,fil...
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数
PushyTao的博客
04-17 2997
本文目录写在前面step1 下载Scala IDEstep2step3 Scala 下载step4 Scala 配置step5 创建scala项目step6 创建scala objectstep7 修改pom文件配置项目设置输入路径 写在前面 本系列文章索引以及一些默认好的条件在 传送门 要想完成Spark的配置,首先需要完成Hadoop&&Spark的配置 Hadoop配置教程:链接 若未进行明确说明,均按照root用户操作 step1 下载Scala IDE 本来在Eclipse的
spark集群与scala程序开发实用经验
1066196847的博客
04-04 1840
spark在很多公司中都有线上应用,多是用在处理数据上面,语法相较于hadoop更加简单,而且更易理解,集群也更易管理,但是还是有很多技巧可寻,掌握这些技巧对提升工作效率来说非常重要
Scala连接MySQL数据库读写操作示例
总结,这段代码演示了如何在Scala中使用Spark SQL连接MySQL数据库读取数据,以及如何处理和转换数据,但缺少了完整的数据处理流程和数据写回数据库的部分。在实际应用中,你需要根据具体需求扩展这段代码,比如添加...
写文章

热门文章

  • 鸿蒙OS下载地址 142649
  • 机器学习实战教程(13篇) 40831
  • SQL中的外键 34723
  • PEST 分析法 19693
  • 最新AI学习路线附带资料与链接 18941

分类专栏

  • 编程语言 1篇
  • 大数据_安全 1篇
  • 大数据_架构
  • 大数据_获取 1篇
  • 大数据_存储 3篇
  • 大数据_计算 1篇
  • 大数据_分析 1篇
  • 大数据_可视化
  • 机器学习 7篇

最新评论

  • 解决Maven 编译出的jar中没有主清单属性

    软剑: 可用 解决问题了

  • SQL中的外键

    ChanRa1n: 这个出错和你前面插入的数据没有对应上啊表情包

  • 如何打开.lxe文件

    行者sl: 大哥文件没了

  • WARN hdfs.DataStreamer: Caught exception java.lang.InterruptedException

    浪迹天涯笑红尘: 程序没执行 你再说什么飞机》

  • Spark为什么要设计宽窄依赖?

    整多两瓶咖啡不就行: 你回答了个什么 表情包

大家在看

  • 如何发送事件到原生层
  • STM32CubeMX生成main.c、main.h文件解读 169
  • ros中的cpp和py运行 132
  • 大语言模型学习第八讲之大语言模型评估(8.4&8.5)
  • 每日OJ题_牛客_JZ79判断是不是平衡二叉树_C++_Java

最新文章

  • bigdata_flink错误本-错误记录
  • Clickhouse_表引擎
  • 设计模式参考
2023年1篇
2022年9篇
2021年2篇
2020年72篇
2019年231篇
2018年63篇
2017年159篇

目录

目录

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

玻璃钢生产厂家宝山区专业玻璃钢雕塑性价比出众吉林抽象玻璃钢雕塑海拉尔玻璃钢雕塑价格商场美陈布置木门大连售卖沈阳玻璃钢花盆工艺品玻璃钢卡通雕塑定做经玻璃钢雕塑福建周年庆典商场美陈报价商场水系美陈玻璃钢梅花鹿雕塑摆件玻璃钢雕塑制品厂电话蚌埠校园玻璃钢雕塑绍兴玻璃钢雕塑销售厂家广东玻璃钢雕塑多少钱小区玻璃钢雕塑厂家供应玻璃钢实物雕塑邯郸玻璃钢雕塑厂家哪家好玻璃钢雕塑一般哪里用的多精美的玻璃钢动物雕塑台州户外玻璃钢雕塑供应商商场森系美陈布置玻璃钢户外雕塑厂家玻璃钢雕塑设计平台哪个好玻璃钢卡通雕塑设计厂家惠州玻璃钢花盆制品厂四川自流井玻璃钢雕塑工业区武汉水果玻璃钢雕塑优势玻璃钢雕塑打磨工具佛山玻璃钢动物雕塑销售厂家沈阳玻璃钢雕塑企业香港通过《维护国家安全条例》两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警汪小菲曝离婚始末遭遇山火的松茸之乡雅江山火三名扑火人员牺牲系谣言何赛飞追着代拍打萧美琴窜访捷克 外交部回应卫健委通报少年有偿捐血浆16次猝死手机成瘾是影响睡眠质量重要因素高校汽车撞人致3死16伤 司机系学生315晚会后胖东来又人满为患了小米汽车超级工厂正式揭幕中国拥有亿元资产的家庭达13.3万户周杰伦一审败诉网易男孩8年未见母亲被告知被遗忘许家印被限制高消费饲养员用铁锨驱打大熊猫被辞退男子被猫抓伤后确诊“猫抓病”特朗普无法缴纳4.54亿美元罚金倪萍分享减重40斤方法联合利华开始重组张家界的山上“长”满了韩国人?张立群任西安交通大学校长杨倩无缘巴黎奥运“重生之我在北大当嫡校长”黑马情侣提车了专访95后高颜值猪保姆考生莫言也上北大硕士复试名单了网友洛杉矶偶遇贾玲专家建议不必谈骨泥色变沉迷短剧的人就像掉进了杀猪盘奥巴马现身唐宁街 黑色着装引猜测七年后宇文玥被薅头发捞上岸事业单位女子向同事水杯投不明物质凯特王妃现身!外出购物视频曝光河南驻马店通报西平中学跳楼事件王树国卸任西安交大校长 师生送别恒大被罚41.75亿到底怎么缴男子被流浪猫绊倒 投喂者赔24万房客欠租失踪 房东直发愁西双版纳热带植物园回应蜉蝣大爆发钱人豪晒法院裁定实锤抄袭外国人感慨凌晨的中国很安全胖东来员工每周单休无小长假白宫:哈马斯三号人物被杀测试车高速逃费 小米:已补缴老人退休金被冒领16年 金额超20万

玻璃钢生产厂家 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化