标签:
MySQL下载地址:http://www.mysql.com/downloads/
1)准备软件
MySQL版本:mysql-5.5.21-win32.msi
2)安装环境:
操作系统:Windows 7旗舰版
3)开始安装
第一步:双击"msi"安装文件,出现如图1.1-1界面——"MySQL安装向导",按"Next"继续。
图1.1-1 MySQL安装向导
第二步:在"I accept …."前面勾上,同意协议,按"Next"按钮继续。
图1.1-2 软件协议
第三步:选择安装类型,有"Typical(默认)"、"Custom(定制安装)"、"Complete(完全)"三个选项。
我们选择"Custom",有更多的选项,也方便熟悉安装过程。
图1.1-3 安装类型
第四步:选择组件及更改文件夹位置。
图1.1-4 自定义界面
所有可用组件列入定制安装对话框左侧的树状视图内。未安装的组件用红色X 图标表示;已经安装的组件有灰色图标。要想更改组件,点击该组件的图标并从下拉列表中选择新的选项。组件我选择了默认安装,位置我会更改一下,点击Browse。
图1.1-5 路径选择
按"OK"按钮返回,并按"Next"按钮继续。
备注:安装mysql的路径中,不能含有中文。
第五步:确认一下先前的设置,如果有误,按"Back"返回重做。按"Install"开始安装。
图1.1-6 准备安装
第六步:正在安装中,请稍候……
图1.1-7 正在安装
第七步:弹出一个页面来,是关于介绍MySQL企业版的信息,没有什么可操作的,按"Next"按钮继续。
图1.1-8 MySQL企业版介绍
然后弹出一个类似界面,接着按"Next"按钮继续。
第八步:那个带复选框的是"MySQL服务器实例配置向导",保持默认勾选,按"Finish"按钮。至此,安装过程已经结束了,但是还需要配置一下。
图1.1-9 安装结束
第九步:MySQL配置向导启动界面,按"Next"按钮继续。
图1.1-10 配置向导
MySQL Configuration Wizard(配置向导)可以帮助自动配置Windows中的服务器。MySQL Configuration Wizard(配置向导)问你一系列问题,然后将回答放到模板中生成一个my.ini文件,该文件与你的安装一致。目前只适用于Windows用户。
一般情况当MySQL安装帮助退出时,从MySQL安装帮助启动MySQL Configuration Wizard(配置向导)。还可以点击Windows启动菜单中MySQL服务器实例配置向导条目中的MySQL部分来启动MySQL Configuration Wizard(配置向导)。并且,还可以进入MySQL安装bin目录直接启动MySQLInstanceConfig.exe文件。
第十步:选择配置类型,可以 选择两种配置类型:Detailed Configuration(详细配置)和Standard Configuration(标准配置)。Standard Configuration(标准配置)选项适合想要快速启动MySQL而不必考虑服务器配置的新用户。详细配置选项适合想要更加细粒度控制服务器配置的 高级用户。我们选择"Detailed Configuration",方便熟悉配置过程。
图1.1-11 配置类型
备注:
如果你是MySQL的新手,需要配置为单用户开发机的服务器,Standard Configuration(标准配置)应当适合你的需求。选择Standard Configuration(标准配置)选项,则 MySQL Configuration Wizard(配置向导)自动设置所有配置选项,但不包括服务选项和安全选项。
Standard Configuration(标准配置)设置选项可能与安装MySQL的系统不兼容。如果系统上已经安装了MySQL和你想要配置的安装,建议选择详细配置。
第十一步:选择服务器类型,可以选择3种服务器类型,选择哪种服务器将影响到MySQL Configuration Wizard(配置向导)对内存、硬盘和过程或使用的决策。
大家根据自己的类型选择了,一般选"Server Machine",不会太少,也不会占满。我们选择"Server Machine",按"Next"按钮继续。
图1.1-12 服务器类型
第十二步:选择数据库用途, 通过Database Usage(数据库使用)对话框,你可以指出创建MySQL表时使用的表处理器。通过该选项,你可以选择是否使用InnoDB储存引擎,以及InnoDB 占用多大比例的服务器资源。"Multifunctional Database(通用多功能型,好)"、"Transactional Database Only(服务器类型,专注于事务处理,一般)"、"Non-Transactional Database Only(非事务处理型,较简单,主要做一些监控、记数用,对MyISAM 数据类型的支持仅限于non-transactional)",随自己的用途而选择了,一般选择第一种多功能的。我们选择"Multifunctional Database",按"Next"按钮继续。
图1.1-13 数据库用途
第十三步:对InnoDB Tablespace 进行配置,就是为InnoDB 数据库文件选择一个存储空间,如果修改了,要记住位置,重装的时候要选择一样的地方,否则可能会造成数据库损坏,当然,对数据库做个备份就没问题了,这里不详述。这里没有修改,使用用默认位置,按"Next"按钮继续。
图1.1-14 配置InnoDB表空间
第十四步:选择MySQL允 许的最大连接数,限制所创建的与MySQL服务器之间的并行连接数量很重要,以便防止服务器耗尽资源。在Concurrent Connections(并行连接)对话框中,可以选择服务器的使用方法,并根据情况限制并行连接的数量。还可以手动设置并行连接的限制。第一种是最大 20个连接并发数,第二种是最大500个并发连接数,最后一种是自定义。我们选择"Online Transaction PRocessing(OLTP)",按"Next"按钮继续。
图1.1-15 最大连接数
第十五步:进行网络配置,在 Networking Options(网络选项)对话框中可以启用或禁用TCP/IP网络,并配置用来连接MySQL服务器的端口号。默认情况启用TCP/IP网络。要想禁用 TCP/IP网络,取消选择Enable TCP/IP Networking选项旁边的检查框。默认使用3306端口。要想更访问MySQL使用的端口,从下拉框选择一个新端口号或直接向下拉框输入新的端口 号。如果你选择的端口号已经被占用,将提示确认选择的端口号。我们保持默认选项,按"Next"按钮继续。
图1.1-16 网络配置
第十六步:对数据库语言编码进行设置,非常重要,因为Hadoop里默认编码为UTF-8,所以为了避免出现乱码,我们这里选择"UTF-8"作为MySQL数据库的语言编码。
图1.1-17 数据库编码
第十七步:是否要把MySQL设置成Windows的服务,一般选择设成服务,这样以后就可以通过服务中启动和关闭mysql数据库了。推荐:下面的复选框也勾选上,这样,在cmd模式下,不必非到mysql的bin目录下执行命令。我们全部打上了勾,Service Name 不变。按"Next"按钮继续。
图1.1-18 服务选项
第十八步:设置MySQL的超级用户密码,这个超级用户非常重要,对MySQL拥有全部的权限,请设置好并牢记超级用户的密码,下面有个复选框是选择是否允许远程机器用root用户连接到你的MySQL服务器上面,如果有这个需求,也请勾选。我们这里的root用户密码设置为"hadoop",并勾上"允许远程连接"复选框,按"Next"按钮继续。
图1.1-19 安全选项
备注:
第十九步:确认设置无误,如果有误,按"Back"返回检查。如果没有,按"Execute"使设置生效。
图1.1-20 确认配置
第二十步:设置完毕,按"Finish"按钮结束MySQL的安装与配置。
图1.1-21 配置完成
备注:这里有一个比较常见的错误,就 是不能"Start service",一般出现在以前有安装MySQL的服务器上,解决的办法,先保证以前安装的MySQL 服务器彻底卸载掉了;不行的话,检查是否按上面一步所说,之前的密码是否有修改,照上面的操作;如果依然不行,将MySQL 安装目录下的data 文件夹备份,然后删除,在安装完成后,将安装生成的 data 文件夹删除,备份的data 文件夹移回来,再重启MySQL 服务就可以了,这种情况下,可能需要将数据库检查一下,然后修复一次,防止数据出错。
4)验证成功
第一种:打开任务管理器 看到MySQL服务是否已经启动。
图1.1-22 任务管理器
第二种:"开始à启动cmdà开打cmd模式",输入"mysql –u root –p"连接数据库。
图1.1-23 连接数据库
1)准备软件
MySQL数据库:MySQL-server-5.5.21-1.linux2.6.i386.rpm
MySQL客户端:MySQL-client-5.5.21-1.linux2.6.i386.rpm
2)安装环境:
操作系统:CentOS6.0 Linux
3)检查安装
在安装MySQL之前,先检查CentOS系统中是否已经安装了一个MySQL,如果已经安装先卸载,不然会导致安装新的MySQL失败。
用下面命令查看系统之前是否已安装MySQL。
rpm -qa | grep mysql
查看结果如下:
从上图得知,CentOS6.0系统自带了一个MySQL,我们需要删除这个老版本,用root用户执行下面语句。
rpm -e --nodeps mysql-libs-5.1.47-4.el6.i686
上图中,我们先切换到"root"用户下,然后执行删除语句,删除之后,我们再次查看,发现已经成功删除了CentOS6.0自带的旧MySQL版本。
在删除MySQL的rpm后,还要进行一些扫尾操作,网上有两种操作。(备注:我在这里两种都没有用到,发现系统中并没有其他残余的MySQL信息。)
第一种善后处理:使用下面命令进行处理。
rm -rf /var/lib/mysql*
rm -rf /usr/share/mysql*
另一种善后处理:卸载后/var/lib/mysql中的/etc/my.cnf会重命名为my.cnf.rpmsave,/var/log/mysqld.log 会重命名为/var/log/mysqld.log.rpmsave,如果确定没用后就手工删除。
4)开始安装
第一步:上传所需软件。通过"FlashFXP"软件使用"vsftpd"上传用到的两个软件到"/home/hadoop"目录下。
第二步:安装MySQL服务端。用"root"用户运行如下命令进行安装:(备注:以下步骤都是用"root"用户执行。)
rpm -ivh MySQL-server-5.5.21-1.linux2.6.i386.rpm
通过SecureCRT查看如下:
如出现如上信息,服务端安装完毕。
第三步:检测MySQL 3306端口是否安打开。测试是否成功可运行netstat看MySQL端口是否打开,如打开表示服务已经启动,安装成功。MySQL默认的端口是3306。
netstat -nat
从上图中发现并没有与"3306"有关的信息,说明"MySQL服务器"没有启动。通过下面命令启动MySQL。
service mysql start
从上图中已经发现我们的MySQL服务器已经起来了。
第四步:安装MySQL客户端。用下面命令进行安装:
rpm -ivh MySQL-client-5.5.21-1.linux2.6.i386.rpm
执行命令显示如下:
从上图中显示MySQL客户端已经安装完毕。
第五步:MySQL的几个重要目录。MySQL安装完成后不像SQL Server默认安装在一个目录,它的数据库文件、配置文件和命令文件分别在不同的目录,了解这些目录非常重要,尤其对于Linux的初学者,因为 Linux本身的目录结构就比较复杂,如果搞不清楚MySQL的安装目录那就无从谈起深入学习。
下面就介绍一下这几个目录。
a、数据库目录
/var/lib/mysql/
b、配置文件
/usr/share/mysql(mysql.server命令及配置文件)
c、相关命令
/usr/bin(mysqladmin mysqldump等命令)
d、启动脚本
/etc/rc.d/init.d/(启动脚本文件mysql的目录)
如:/etc/rc.d/init.d/mysql start/restart/stop/status
下面就分别展示上面的几个目录内容:
第六步:更改MySQL目录。由于MySQL数据库目录占用磁盘比较大,而MySQL默认的数据文件存储目录为/"var/lib/mysql",所以我们要把目录移到"/"根目录下的"mysql_data"目录中。
需要以下几个步骤:
cd /
mkdir mysql_data
可以用两种方法:
service mysql stop
或者
mysqladmin -u root -p shutdown
从上图中我们得知"MySQL服务进程"已经停掉。
备注:MySQL默认用户名为"root",此处的"root"与Linux的最高权限用户"root"不是一会儿,而且默认的用户"root"的密码为空,所以上图中让输入密码,直接点击回车即可。
mv /var/lib/mysql /mysql_data
这样就把MySQL的数据文件移动到了"/mysql_data/mysql"下。
如果"/etc/"目录下没有my.cnf配置文件,请到"/usr/share/mysql/"下找到*.cnf文件,拷贝其中一个合适的配置文件到"/etc/"并改名为"my.cnf"中。命令如下:
cp /usr/share/mysql/my-medium.cnf /etc/my.cnf
上图中,下查看"/etc/"下面是否有"my.cnf"文件,发现没有,然后通过上面的命令进行拷贝,拷贝完之后,进行查看,发现拷贝成功。
备注:"/usr/share/mysql/"下有好几个结尾为cnf的文件,它们的作用分别是。
a、my-small.cnf:是为了小型数据库而设计的。不应该把这个模型用于含有一些常用项目的数据库。
b、my-medium.cnf:是为中等规模的数据库而设计的。如果你正在企业中使用RHEL,可能会比这个操作系统的最小RAM需求(256MB)明显多得多的物理内存。由此可见,如果有那么多RAM内存可以使用,自然可以在同一台机器上运行其它服务。
c、my-large.cnf:是为专用于一个SQL数据库的计算机而设计的。由于它可以为该数据库使用多达512MB的内存,所以在这种类型的系统上将需要至少1GB的RAM,以便它能够同时处理操作系统与数据库应用程序。
d、my-huge.cnf:是为企业中的数据库而设计的。这样的数据库要求专用服务器和1GB或1GB以上的RAM。
这些选择高度依赖于内存的数量、计算机的运算速度、数据库的细节大小、访问数据库的用户数量以及在数据库中装入并访问数据的用户数量。随着数据库和用户的不断增加,数据库的性能可能会发生变化。
备注:这里我们根据实际情况,选择了"my-medium.cnf"进行配置。
为保证MySQL能够正常工作,需要指明"mysql.sock"文件的产生位置,以及默认编码修改为UTF-8。用下面命令:
vim /etc /my.cnf
需要修改和添加的内容如下:
【client】
socket = /mysql_data/mysql/mysql.sock
default-character-set=utf8
【mysqld】
socket = /mysql_data/mysql/mysql.sock
datadir =/mysql_data/mysql
character-set-server=utf8
lower_case_table_names=1(注意linux下mysql安装完后是默认:区分表名的大小写,不区分列名的大小写;lower_case_table_names = 0 0:区分大小写,1:不区分大小写)
备注:【client】和【mysqld】设置的编码时前地名称不一样。
最后,需要修改MySQL启动脚本/etc/rc.d/init.d/mysql,修改datadir=/mysql_data/mysql。
vim /etc/rc.d/init.d/mysql
service mysql start
正准备高兴时,发现MySQL启动不了了,网上搜了一下午,各种都没有解决。后来在一篇文章才得知又是"SELinux"惹得祸。解决办法如下:
打开/etc/selinux/config,把SELINUX=enforcing改为SELINUX=disabled后存盘退出重启机器试试,必须要重启,很关键。
机器重启之后,在把"mysql服务"启动。
第七步:修改登录密码。
MySQL默认没有密码,安装完毕增加密码的重要性是不言而喻的。
在没有添加密码前,直接输入"mysql"就能登录到MySQL数据库里。
用到的命令如下:
mysqladmin -u root password ‘new-password‘
格式:mysqladmin -u用户名 -p旧密码 password 新密码
我们这里设置MySQL数据库"root"用户的密码为"hadoop"。执行的命令如下:
mysqladmin –u root password hadoop
(1)不用密码登录
此时显示错误,说明密码已经修改。
(2)用修改后的密码登录
从上图中得知,我们已经成功修改了密码,并且用新的密码登录了MySQL服务器。
第八步:配置防火墙
第一种:修改防火墙配置文件"/etc/sysconfig/iptables",添加如下内容:
-A INPUT -m state --state NEW -m tcp -p tcp --sport 3306 -j ACCEPT
-A OUTPUT -m state --state NEW -m tcp -p tcp --dport 3306 -j ACCEPT
然后执行下面命令,使防火墙立即生效。
service iptables restart
第二种:关闭防火墙
通过下面两个命令使防火墙关闭,并且永远不起作用。
service iptables stop
chkconfig iptables off
我们在这里为了方便,采用第二种方法,执行效果如下。
第九步:验证MySQL数据库编码是否为UTF-8。
连接上数据库之后,输入命令:"SHOW VARIABLES LIKE ‘%char%‘;"即可查看到现在你的数据库所使用的字符集了。
第十步:删除空用户,增强安全。
目前为止我们都是以"root"的身份进行的,但是当我们切换至普通用户登录MySQL时,直接输入"mysql"就进去了,我们刚才不是设置密码了吗?怎么就失效了呢?说明有空用户存在。先用命令"exit"退出,在按照下面命令进行修正。
解决步骤如下:
mysql -u root -p
mysql>delete from mysql.user where user=‘‘;
mysql>flush privileges;
mysql>exit
mysql
发现以"mysql"登录已经失效,必须以"mysql –u root -p"才能登录。
下面是执行效果截图:
MapReduce技术推出后,曾遭到关系数据库研究者的挑剔和批评,认为MapReduce不具备有类似于关系数据库中的结构化数据存储和处理能力。为此,Google和MapReduce社区进行了很多努力。一方面,他们设计了类似于关系数据中结构化数据表的技术(Google的BigTable,Hadoop的HBase)提供一些粗粒度的结构化数据存储和处理能力;另一方面,为了增强与关系数据库的集成能力,Hadoop MapReduce提供了相应的访问关系数据库库的编程接口。
MapReduce与MySQL交互的整体架构如下图所示。
图2-1整个环境的架构
具体到MapReduce框架读/写数据库,有2个主要的程序分别是 DBInputFormat和DBOutputFormat,DBInputFormat 对应的是SQL语句select,而DBOutputFormat 对应的是 Inster/update, 使用DBInputFormat和DBOutputForma时候需要实现InputFormat这个抽象类,这个抽象类含有getSplits()和 createRecordReader()抽象方法,在DBInputFormat类中由 protected String getCountQuery() 方法传入结果集的个数,getSplits()方法再确定输入的切分原则,利用SQL中的 LIMIT 和 OFFSET 进行切分获得数据集的范围 ,请参考DBInputFormat源码中public InputSplit[] getSplits(JobConf job, int chunks) throws IOException的方法,在DBInputFormat源码中createRecordReader()则可以按一定格式读取相应数据。
1)建立关系数据库连接
DBConfiguration类中提供了一个静态方法创建数据库连接:
public static void configureDB(Job job,String driverClass,String dbUrl,String userName,String Password)
其中,job为当前准备执行的作业,driverClasss为数据库厂商提供的访问其数据库的驱动程序,dbUrl为运行数据库的主机的地址,userName和password分别为数据库提供访问地用户名和相应的访问密码。
2)相应的从关系数据库查询和读取数据的接口
3)相应的向关系数据库直接输出结果的编程接口
数据库连接完成后,即可完成从MapReduce程序向关系数据库写入数据的操作。为了告知数据库将写入哪个表中的哪些字段,DBOutputFormat中提供了一个静态方法来指定需要写入的数据表和字段:
public static void setOutput(Job job,String tableName,String ... fieldName)
其中,tableName指定即将写入的数据表,后续参数将指定哪些字段数据将写入该表。
虽然Hadoop允许从数据库中直接读取数据记录作为MapReduce的输入,但处理效率较低,而且大量频繁地从MapReduce程序中查询和读取关系数据库可能会大大增加数据库的访问负载,因此DBInputFormat仅适合读取小量数据记录的计算和应用,不适合数据仓库联机数据分析大量数据的读取处理。
读取大量数据记录一个更好的解决办法是:用数据库中的Dump工具将大量待分析数据输出为文本数据文件,并上载到HDFS中进行处理。
1)首先创建要读入的数据
首先创建数据库"school",使用下面命令进行:
create database school;
然后通过以下几句话,把我们事先准备好的sql语句(student.sql事先放到了D盘目录)导入到刚创建的"school"数据库中。用到的命令如下:
use school;
source d:\student.sql
"student.sql"中的内容如下所示:
DROP TABLE IF EXISTS `school`.`student`;
CREATE TABLE `school`.`student` (
`id` int(11) NOT NULL default ‘0‘,
`name` varchar(20) default NULL,
`sex` varchar(10) default NULL,
`age` int(10) default NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `student` VALUES (‘201201‘, ‘张三‘, ‘男‘, ‘21‘);
INSERT INTO `student` VALUES (‘201202‘, ‘李四‘, ‘男‘, ‘22‘);
INSERT INTO `student` VALUES (‘201203‘, ‘王五‘, ‘女‘, ‘20‘);
INSERT INTO `student` VALUES (‘201204‘, ‘赵六‘, ‘男‘, ‘21‘);
INSERT INTO `student` VALUES (‘201205‘, ‘小红‘, ‘女‘, ‘19‘);
INSERT INTO `student` VALUES (‘201206‘, ‘小明‘, ‘男‘, ‘22‘);
执行结果如下所示:
查询刚才创建的数据库表"student"的内容。
结果发现显示是乱码,记得我当时是设置的UTF-8,怎么就出现乱码了呢?其实我们使用的操作系统的系统为中文,且它的默认编码是gbk,而MySQL的编码有两种,它们分别是:
【client】:客户端的字符集。客户端默认字符集。当客户端向服务器发送请求时,请求以该字符集进行编码。
【mysqld】:服务器字符集,默认情况下所采用的。
找到安装MySQL目录,比如我们的安装目录为:
E:\HadoopWorkPlat\MySQL Server 5.5
从中找到"my.ini"配置文件,最终发现my.ini里的2个character_set把client改成gbk,把server改成utf8就可以了。
【client】端:
[client]
port=3306
[mysql]
default-character-set=gbk
【mysqld】端:
[mysqld]
# The default character set that will be used when a new schema or table is
# created and no character set is defined
character-set-server=utf8
按照上面修改完之后,重启MySQL服务。
此时在Windows下面的数据库表已经准备完成了。
首先通过"FlashFXP"把我们刚才的"student.sql"上传到"/home/hadoop"目录下面,然后按照上面的语句创建"school"数据库。
查看我们上传的"student.sql"内容:
创建"school"数据库,并导入"student.sql"语句。
显示数据库"school"中的表"student"信息。
显示表"student"中的内容。
到此为止在"Windows"和"Linux"两种环境下面都创建了表"student"表,并初始化了值。下面就开始通过MapReduce读取MySQL库中表"student"的信息。
2)使MySQL能远程连接
MySQL默认是允许别的机器进行远程访问地,为了使Hadoop集群能访问MySQL数据库,所以进行下面操作。
mysql -u root -p
GRANT ALL PRIVILEGES ON *.* TO ‘root‘@‘%‘ IDENTIFIED BY ‘hadoop‘ WITH GRANT OPTION;
FLUSH PRIVILEGES;
执行结果如下图。
Windows下面:
Linux下面:
到目前为止,如果连接Win7上面的MySQL数据库还不行,大家还应该记得前面在Linux下面关掉了防火墙,但是我们在Win7下对防火墙并没有做任何处理,如果不对防火墙做处理,即使执行了上面的远程授权,仍然不能连接。下面是设置Win7上面的防火墙,使远程机器能通过3306端口访问MySQL数据库。
解决方案:只要在‘入站规则‘上建立一个3306端口即可。
执行顺序:控制面板à管理工具à高级安全的Windows防火墙à入站规则
然后新建规则à选择‘端口‘à在‘特定本地端口‘上输入一个‘3306‘ à选择‘允许连接‘=>选择‘域‘、‘专用‘、‘公用‘=>给个名称,如:MySqlInput
3)对JDBC的Jar包处理
因为程序虽然用Eclipse编译运行但最终要提交到Hadoop集群上,所以JDBC的jar必须放到Hadoop集群中。有两种方式:
(1)在每个节点下的${HADOOP_HOME}/lib下添加该包,重启集群,一般是比较原始的方法。
我们的Hadoop安装包在"/usr/hadoop",所以把Jar放到"/usr/hadoop/lib"下面,然后重启,记得是Hadoop集群中所有的节点都要放,因为执行分布式是程序是在每个节点本地机器上进行。
(2)在Hadoop集群的分布式文件系统中创建"/lib"文件夹,并把我们的的JDBC的jar包上传上去,然后在主程序添加如下语句,就能保证 Hadoop集群中所有的节点都能使用这个jar包。因为这个jar包放在了HDFS上,而不是本地系统,这个要理解清楚。
DistributedCache.addFileToClassPath(new Path("/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
我们用的JDBC的jar如下所示:
mysql-connector-java-5.1.18-bin.jar
通过Eclipse下面的DFS Locations进行创建"/lib"文件夹,并上传JDBC的jar包。执行结果如下:
备注:我们这里采用了第二种方式。
4)源程序代码如下所示
package com.hebut.mr;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
public class ReadDB {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, StudentRecord, LongWritable, Text> {
// 实现map函数
public void map(LongWritable key, StudentRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
collector.collect(new LongWritable(value.id),
new Text(value.toString()));
}
}
public static class StudentRecord implements Writable, DBWritable {
public int id;
public String name;
public String sex;
public int age;
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
this.sex = Text.readString(in);
this.age = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
Text.writeString(out, this.sex);
out.writeInt(this.age);
}
@Override
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
this.sex = result.getString(3);
this.age = result.getInt(4);
}
@Override
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
stmt.setString(3, this.sex);
stmt.setInt(4, this.age);
}
@Override
public String toString() {
return new String("学号:" + this.id + "_姓名:" + this.name
+ "_性别:"+ this.sex + "_年龄:" + this.age);
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(ReadDB.class);
// 这句话很关键
conf.set("mapred.job.tracker", "192.168.1.2:9001");
// 非常重要,值得关注
DistributedCache.addFileToClassPath(new Path(
"/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
// 设置输入类型
conf.setInputFormat(DBInputFormat.class);
// 设置输出类型
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
// 设置Map和Reduce类
conf.setMapperClass(Map.class);
conf.setReducerClass(IdentityReducer.class);
// 设置输出目录
FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));
// 建立数据库连接
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.1.24:3306/school", "root", "hadoop");
// 读取"student"表中的数据
String[] fields = { "id", "name", "sex", "age" };
DBInputFormat.setInput(conf, StudentRecord.class, "student", null,"id", fields);
JobClient.runJob(conf);
}
}
备注:由于Hadoop1.0.0新的API对关系型数据库暂不支持,只能用旧的API进行,所以下面的"向数据库中输出数据"也是如此。
5)运行结果如下所示
经过上面的设置后,已经通过连接Win7和Linux上的MySQL数据库,执行结果都一样。唯独变得就是代码中"DBConfiguration.configureDB"中MySQL数据库所在机器的IP地址。
基于数据仓库的数据分析和挖掘输出结果的数据量一般不会太大,因而可能适合于直接向数据库写入。我们这里尝试与"WordCount"程序相结合,把单词统计的结果存入到关系型数据库中。
1)创建写入的数据库表
我们还使用刚才创建的数据库"school",只是在里添加一个新的表"wordcount",还是使用下面语句执行:
use school;
source sql脚本全路径
下面是要创建的"wordcount"表的sql脚本。
DROP TABLE IF EXISTS `school`.`wordcount`;
CREATE TABLE `school`.`wordcount` (
`id` int(11) NOT NULL auto_increment,
`word` varchar(20) default NULL,
`number` int(11) default NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
执行效果如下所示:
2)程序源代码如下所示
package com.hebut.mr;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
public class WriteDB {
// Map处理过程
public static class Map extends MapReduceBase implements
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Object key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
// Combine处理过程
public static class Combine extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
// Reduce处理过程
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, WordRecord, Text> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<WordRecord, Text> collector, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
WordRecord wordcount = new WordRecord();
wordcount.word = key.toString();
wordcount.number = sum;
collector.collect(wordcount, new Text());
}
}
public static class WordRecord implements Writable, DBWritable {
public String word;
public int number;
@Override
public void readFields(DataInput in) throws IOException {
this.word = Text.readString(in);
this.number = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, this.word);
out.writeInt(this.number);
}
@Override
public void readFields(ResultSet result) throws SQLException {
this.word = result.getString(1);
this.number = result.getInt(2);
}
@Override
public void write(PreparedStatement stmt) throws SQLException {
stmt.setString(1, this.word);
stmt.setInt(2, this.number);
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WriteDB.class);
// 这句话很关键
conf.set("mapred.job.tracker", "192.168.1.2:9001");
DistributedCache.addFileToClassPath(new Path(
"/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
// 设置输入输出类型
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(DBOutputFormat.class);
// 不加这两句,通不过,但是网上给的例子没有这两句。
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
// 设置Map和Reduce类
conf.setMapperClass(Map.class);
conf.setCombinerClass(Combine.class);
conf.setReducerClass(Reduce.class);
// 设置输如目录
FileInputFormat.setInputPaths(conf, new Path("wdb_in"));
// 建立数据库连接
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.1.24:3306/school", "root", "hadoop");
// 写入"wordcount"表中的数据
String[] fields = { "word", "number" };
DBOutputFormat.setOutput(conf, "wordcount", fields);
JobClient.runJob(conf);
}
}
3)运行结果如下所示
测试数据:
(1)file1.txt
hello word
hello hadoop
(2)file2.txt
虾皮 hadoop
虾皮 word
软件 软件
运行结果:
我们发现上图中出现了"?",后来查找原来是因为我的测试数据时在Windows用记事本写的然后保存为"UTF-8",在保存时为了区分编码,自动在前面加了一个"BOM",但是不会显示任何结果。然而我们的代码把它识别为"?"进行处理。这就出现了上面的结果,如果我们在每个要处理的文件前面的第一行加一个空格,结果就成如下显示:
接着又做了一个测试,在Linux上面用下面命令创建了一个文件,并写上中文内容。结果显示并没有出现"?",而且网上说不同的记事本软件(EmEditor、UE)保存为"UTF-8"就没有这个问题。经过修改之后的Map类,就能够正常识别了。
// Map处理过程
public static class Map extends MapReduceBase implements
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Object key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
//处理记事本UTF-8的BOM问题
if (line.getBytes().length > 0) {
if ((int) line.charAt(0) == 65279) {
line = line.substring(1);
}
}
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
处理之后的结果:
从上图中得知,我们的问题已经解决了,因此,在编辑、更改任何文本文件时,请务必使用不会乱加BOM的编辑器。Linux下的编辑器应该都没有这个问 题。Windows下,请勿使用记事本等编辑器。推荐的编辑器是: Editplus 2.12版本以上; EmEditor; UltraEdit(需要取消‘添加BOM‘的相关选项); Dreamweaver(需要取消‘添加BOM‘的相关选项) 等。
对于已经添加了BOM的文件,要取消的话,可以用以上编辑器另存一次。(Editplus需要先另存为gb,再另存为UTF-8。) DW解决办法如下: 用DW打开指定文件,按Ctrl+Jà标题/编码à编码选择"UTF-8",去掉"包括Unicode签名(BOM)"勾选à保存/另存为,即可。
国外有一个牛人已经把这个问题解决了,使用"UnicodeInputStream"、"UnicodeReader"。
地址:http://koti.mbnet.fi/akini/java/unicodereader/
示例:Java读带有BOM的UTF-8文件乱码原因及解决方法
代码:http://download.csdn.net/detail/xia520pi/4146123
测试数据:
(1)file1.txt
MapReduce is simple
(2)file2.txt
MapReduce is powerful is simple
(3)file2.txt
Hello MapReduce bye MapReduce
运行结果:
到目前为止,MapReduce与关系型数据库交互已经结束,从结果中得知,目前新版的API还不能很好的支持关系型数据库的操作,上面两个例子都是使用的旧版的API。关于更多的MySQL操作,具体参考"Hadoop集群_第10期副刊_常用MySQL数据库命令_V1.0"。
本期历时五天,终于完成,期间遇到的关键问题如下:
从这几天对MapReduce的了解,发现其实Hadoop对关系型数据库的处理还不是很强,主要是Hadoop和关系型数据做的事不是同一类型,各有所特长。下面几期我们将对Hadoop里的HBase和Hive进行全面了解。
标签:
原文地址:http://www.cnblogs.com/jianliang-Wu/p/5619237.html