Kettle本身提供了很多组件,多个组件一起构成一个transformation(转换),多个转换一起构成一个job(任务)。kettle的组件已经非常丰富,在组件不满足需求时可以在kettle上面开发自己的组件,kettle支持的组件开发如下:
Kettle中的插件包含两部分:
一是系统本身就已经实现的功能点,在源码目录src中说明,如kettle-steps.xml;
二是系统之外开发的插件,在plugins目录对应插件目录下的plugins.xml说明,如plugins/steps/S3CsvInput/plugins.xml。
1.1系统集成插件定义( step )
内容 |
位置 |
插件说明信息 |
engine/src/kettle-steps.xml,所有插件集中说明 |
插件源码 |
src-engine与src-ui下,org.pentaho.di.steps.插件名 |
插件图片 |
插件说明xml中说明 |
插件界面文字说明 |
org.pentaho.di.steps.插件名.messages |
插件说明信息中包括描述信息、类名(包括package,反射用)、父级目录(Spoon左侧栏目录)、提示信息和图片信息。Kettle使用国家化方式编程,所以软件中的所有文字描述均由messages_**.properties提供。
系统集成插件配置说明kettle-steps.xml结构:
1.2扩展插件定义(step)
所有新开发的扩展插件,均放在同一的目录下进行管理,插件管理模块会自动去该目录下进行搜索查找。插件目录结构如下所示(以S3CsvInput步骤为例):
Plugins/steps/S3CsvInput
扩展插件定义
内容 |
位置 |
插件说明信息 |
plugins/steps/插件名称/plugin.xml |
插件源码 |
plugins/steps/插件名称/*.jar |
插件图片 |
plugins/steps/插件名称/ |
插件依赖包 |
plugins/steps/插件名称/ |
扩展插件与系统集成插件的说明内容相似,扩展插件增加ID属性和依赖属性,同时他的目录结构、描述信息和提示信息均能进行国际化配置。
1.3扩展插件配置说明plugin.xml结构及参数说明:
ID:在kettle插件中必须全局唯一,因为被kettle序列化了,所以不要随便改变
Iconfile: kettle中插件显示的图片,必须是png图片
Description:插件描叙,显示在树形菜单里面。
Tooltip:树形菜单中,鼠标滑过的时候显示的提示信息
Category:插件显示的父目录
Classname:元数据类
Library:指明了插件需要加载所依赖的jar包
2. Kettle转换步骤扩展插件的开发
2.1 Kettle转换步骤插件至少需要实现四个接口
org.pentaho.di.trans.step.StepMetaInterface:元数据的处理,加载xml,校验,主要是对一个步骤的定义的基本数据。
org.pentaho.di.trans.step. StepDataInterface:数据处理涉及的具体数据,以及对数据的状态的设置和回收。
org.pentaho.di.trans.step. StepInterface:负责数据处理,转换和流转。这里面主要由processRow()方法来处理。
org.pentaho.di.trans.step. StepDialogInterface:提供GUI/dialog,编辑步骤的元数据。
对于以上四个接口的实现,都有相应的基类,具体的步骤只需要继承基类和实现相应的接口即可。
Step扩展接口:
Java 接口 |
基类 |
主要功能 |
StepMetaInterface |
BaseStepMeta |
存储step设置信息 验证step设置信息 序列化step设置信息 提供获取step类的方法 |
StepDialogInterface |
BaseStepDialog |
step属性信息配置窗口 |
StepInterface |
BaseStep |
处理rows |
StepDataInterface |
BaseStepData |
为数据处理提高数据存储 |
2.2 Kettle转换步骤插件各个类命名推荐规则
stepInterface的实现类以插件的功能相关命名:*.java
stepDataInterface的实现类:*Data.java
stepMetaInterface的实现类:*Meta.java
StepDialogInterface的实现类:*Dialog.java
2.3 TemplateStepPlugin插件模板各个类源码部分方法说明:
一、元数据类:
下面是元数据的几个关键的方法,注意元数据类里面用私有成员变量outputField存储了下一个步骤的输出字段。
// keeptrack of the step settings
public String getOutputField()
public void setOutputField(…)
public void setDefault()
// xml类型元数据的写入与加载
public String getXML()
public void loadXML(…)
// 资源库类型元数据的写入与加载
public void readRep(…)
public void saveRep(…)
//提供有关步骤如何影响处理行字段结构的信息
public void getFields(…)
//为步骤进行扩展有效性检查
public void check(…)
// 为step,data
和dialog
类提供实例
public StepInterface getStep(…)
public StepDataInterface getStepData()
public StepDialogInterface getDialog(…)
TemplateStepMeta元数据类其实还有很多方面,不过大多被他的父类BaseStepMeta给默认实现了,这些默认的实现足以使我们的元数据类工作良好。
二、对话框类:
TemeplateStepDialog为步骤实现了对话框的设置,kettle的用户界面部件是使用的eclipse的swt框架。在开发过程中,一个对话框对象拥有一个元数据对象,它记录了应该从哪里读取配置?应该把设置好的配置保存在哪里?
三、步骤类:
步骤类是实际的处理和转换工作的地方。因为大部分样本代码已经由父类BaseStep提供了,插件开发者只需要关注下面几个特定的方法就行了。
//初始化和关闭
public boolean init(…)
public void dispose(..)
// 处理行
public void run()
public boolean processRow(..) //步骤主要数据处理方法
Init()方法在转换执行前被kettle调用,转换必须在所有步骤初始化成功时才真正执行。dispose()方法是在步骤执行完之后执行(非转换执行完),它完成资源的关闭,像文件句柄、缓存等等。
run()方法在实际处理记录集的时候调用,其实就是个循环,由上游获取的每条记录交由processRow()方法处理,当此步骤没有数据处理或转换被停止时退出循环。
processRow()方法在处理单条记录的时候被调用。这个方法通常通过调用getRow()来获取需要处理的单条记录。这个方法如果有需要将会被阻塞,例如当此步骤希望放慢脚步处理数据时。processRow()随后的流程将执行转换工作并调用putRow()方法将处理过的记录放到它的下游步骤。
四、数据类:
大多数步骤都需要临时的缓冲或者临时的存储。数据类就是这些数据合适的存放位置。每一个执行线程拥有的一个数据类的实例,所以它们能在独立的空间里面运行。TemplateStepData继承自BaseStepData,作为一个经验法则,不要将non-constant字段放置BaseStepData类里面,如果一定要,请将它最好放置在派生类TemplateStepData里面.
2.4 插件配置
在plugins\steps下新建文件夹TemplateStepPlugin,将编写好的插件源码编译打包成一个jar包,连同插件显示图片和写好的plugin.xml一起放入TemplateStepPlugin。
3. 插件注册与查找
Spoon在启动的时候会对所有插件进行注册,并保存在PluginRegistry类里面。平台通过查找PluginRegistry注册表获取插件信息。Kettle安装插件需要进行重启,卸载插件也只需简单的删除plugins目录结构下对应的文件即可。
插件注册时序图:
PluginRegistry首选注册本系统的插件类型处理类,源码中注册了10种类型:PluginRegistry.addPluginType(RowDistributionPluginType.getInstance());
PluginRegistry.addPluginType(StepPluginType.getInstance());
PluginRegistry.addPluginType(PartitionerPluginType.getInstance());
PluginRegistry.addPluginType(JobEntryPluginType.getInstance());
PluginRegistry.addPluginType(LogTablePluginType.getInstance());
PluginRegistry.addPluginType(RepositoryPluginType.getInstance());
PluginRegistry.addPluginType(LifecyclePluginType.getInstance());
PluginRegistry.addPluginType(KettleLifecyclePluginType.getInstance());
PluginRegistry.addPluginType(ImportRulePluginType.getInstance());
PluginRegistry.addPluginType(CartePluginType.getInstance());
此处以StepPluginType为例。注册类型处理类后,PluginRegistry按照不同的类型进行插件搜索(模板模式),基类BasePluginType提供了本地搜索、jar搜索、xml信息搜索3种钩子。根据搜索结果,按照不同的插件类型存储在PluginRegistry中。
PluginRegistry提供了插件查找功能,准确的来说是插件信息的查找功能。以steps在左侧功能栏里面的显示为例,进行插件查找的说明。PluginRegistry提供了getPlugins获取指定插件类型列表、getPlugin获取指定成名插件、getCateories获取目录结构、getClass获取指定插件类等方法。
左侧显示由Spoon.refreshCoreObjects()函数实现,如果选择时trans相关的内容,将显示所有的step插件。流程图如下所示:
实现代码:
if (showTrans) {
selectionLabel.setText(BaseMessages.getString(PKG, "Spoon.Steps"));
PluginRegistry registry = PluginRegistry.getInstance();
final List<PluginInterface> basesteps = registry.getPlugins(StepPluginType.class); //获取插件信息
final List<String> basecat = registry.getCategories(StepPluginType.class); //获取目录信息
if( stepFilter == null )
{
stepFilter = new StepFilterConfigure();
}
//items filter...
for (int i = 0; i <basecat.size(); i++) { //依次添加获取到的目录
StringtmpCatName = basecat.get(i).toLowerCase();
if(stepFilter.getTransCategoriesFilteredList().contains(tmpCatName))
{
//filter categories
continue;
}
TreeItem item = new TreeItem(coreObjectsTree, SWT.NONE);
item.setText(basecat.get(i));
item.setImage(GUIResource.getInstance().getImageArrow());
//replace icon
Image newIcon = null;
String iconName = "cate_" + basecat.get(i)+ ".png";
String path = "ui/images/";
String iconPath = path + iconName;
newIcon = GUIResource.getInstance().getImageByName(iconPath);
if( newIcon != null )
{
item.setImage(newIcon);
}
for (int j = 0; j <basesteps.size(); j++) { //依次添加获取到的该目录下的插件
if(basesteps.get(j).getCategory().equalsIgnoreCase(basecat.get(i)) && !stepFilter.getTransStepFilteredList().contains(basesteps.get(j).getName())){
final Image stepimg = GUIResource.getInstance().getImagesStepsSmall()
.get(basesteps.get(j).getIds()[0]);
String pluginName =basesteps.get(j).getName();
String pluginDescription =basesteps.get(j).getDescription();
if (!filterMatch(pluginName) &&!filterMatch(pluginDescription))
continue;
TreeItem stepItem = new TreeItem(item, SWT.NONE);
stepItem.setImage(stepimg);
stepItem.setText(pluginName);
stepItem.addListener(SWT.Selection, new Listener() {
public void handleEvent(Eventarg0) {
//System.out.println("Tree item Listener fired");
}
});
coreStepToolTipMap.put(pluginName,pluginDescription);
}
}
}
}
原文地址:http://blog.csdn.net/bluebelfast/article/details/43192995