Tags: codeblock gobject deb obj source http dbn plugin abd
Java class name: com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp
Python class name: CatalogSourceBatchOp
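A Catalog describes a database's properties and its location. Supported databases are MySQL, Derby, SQLite, and Hive.
Before use, you must download the corresponding plugin. See https://www.yuque.com/pinshu/alink_guide/czg4cx for details.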
Setup takes three steps:
Step 1: define the Catalog
| Database | Java API |
| --- | --- |
| Derby | DerbyCatalog(String catalogName, String defaultDatabase, String derbyVersion, String derbyPath) |
| MySQL | MySqlCatalog(String catalogName, String defaultDatabase, String mysqlVersion, String mysqlUrl, String port, String userName, String password) |
| SQLite | SqliteCatalog(String catalogName, String defaultDatabase, String sqliteVersion, String dbUrl) |
| Hive | HiveCatalog(String catalogName, String defaultDatabase, String hiveVersion, String hiveConfDir) |
Example: derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder + '/' + DERBY_DB)
Plugin versions provided: Hive 2.3.4, MySQL 5.1.27, Derby 10.6.1.0, SQLite 3.19.3, odps 0.36.4-public.
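The catalog constructors take their arguments by position, so argument order matters. The plain-Python sketch below does not call Alink. Its hypothetical `CATALOG_SIGNATURES` dict copies the parameter order from the table, and `PLUGIN_VERSIONS` copies the plugin versions listed above. The hypothetical helper `catalog_args` turns named values into a tuple in constructor order, and `version_matches` checks a version string against the listed plugin version. It is only an illustration of the table, not part of Alink's API.

```python
# Plain-Python illustration, not Alink: parameter order copied from the table above.
CATALOG_SIGNATURES = {
    "Derby": ("catalogName", "defaultDatabase", "derbyVersion", "derbyPath"),
    "MySql": ("catalogName", "defaultDatabase", "mysqlVersion", "mysqlUrl",
              "port", "userName", "password"),
    "Sqlite": ("catalogName", "defaultDatabase", "sqliteVersion", "dbUrl"),
    "Hive": ("catalogName", "defaultDatabase", "hiveVersion", "hiveConfDir"),
}

# Plugin versions as listed above.
PLUGIN_VERSIONS = {
    "Hive": "2.3.4",
    "MySql": "5.1.27",
    "Derby": "10.6.1.0",
    "Sqlite": "3.19.3",
    "odps": "0.36.4-public",
}


def catalog_args(db_type, **kwargs):
    """Return kwargs as a tuple in the constructor's positional order.

    Raises KeyError for an unknown db_type and ValueError when an
    argument is missing or not part of the signature.
    """
    names = CATALOG_SIGNATURES[db_type]
    missing = [n for n in names if n not in kwargs]
    unexpected = [n for n in kwargs if n not in names]
    if missing or unexpected:
        raise ValueError(f"{db_type}: missing={missing}, unexpected={unexpected}")
    return tuple(kwargs[n] for n in names)


def version_matches(db_type, version):
    """True if version equals the plugin version listed for db_type."""
    return PLUGIN_VERSIONS.get(db_type) == version


if __name__ == "__main__":
    args = catalog_args(
        "Derby",
        catalogName="derby_test_catalog",
        defaultDatabase="derby_schema",
        derbyVersion="10.6.1.0",
        derbyPath="/tmp/alink" + "/" + "derby_db",  # hypothetical folder
    )
    print(args)  # ('derby_test_catalog', 'derby_schema', '10.6.1.0', '/tmp/alink/derby_db')
    print(version_matches("Derby", args[2]))  # True
```

The resulting tuple has the same shape as the `DerbyCatalog(...)` call in the example, with the path built from a folder and a database name.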
Step 2: define the CatalogObject
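```python
dbName = "sqlite_db"
tableName = "table"
# First argument: the Catalog (here the Derby catalog from Step 1);
# second: an ObjectPath of (database or project name, table name)
catalogObject = CatalogObject(derby, ObjectPath(dbName, tableName))
```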
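An `ObjectPath` names a table by a database (or project) and a table name. Flink's `ObjectPath.getFullName()` joins the two as `database.table`. The stdlib sketch below models a catalog as nested dictionaries to show how a (catalog, path) pair resolves to exactly one table. `SimpleObjectPath`, `SimpleCatalog`, and the sample rows are hypothetical stand-ins, not Alink or Flink classes.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class SimpleObjectPath:
    """Hypothetical stand-in for Flink's ObjectPath: (database, table)."""
    database_name: str
    object_name: str

    def full_name(self) -> str:
        # Mirrors the "database.table" form returned by ObjectPath.getFullName().
        return f"{self.database_name}.{self.object_name}"


@dataclass
class SimpleCatalog:
    """Hypothetical in-memory catalog: database -> table -> list of rows."""
    name: str
    default_database: str
    databases: dict = field(default_factory=dict)

    def write(self, path: SimpleObjectPath, rows):
        # Create the database and table entries on first write, then append.
        tables = self.databases.setdefault(path.database_name, {})
        tables.setdefault(path.object_name, []).extend(rows)

    def read(self, path: SimpleObjectPath):
        try:
            return list(self.databases[path.database_name][path.object_name])
        except KeyError:
            raise KeyError(f"table {path.full_name()} not found in catalog {self.name}") from None


if __name__ == "__main__":
    catalog = SimpleCatalog("derby_test_catalog", "derby_schema")
    path = SimpleObjectPath("sqlite_db", "table")  # names from the step above
    catalog.write(path, [("a", 1), ("b", 2)])
    print(path.full_name())  # sqlite_db.table
    print(catalog.read(path))  # [('a', 1), ('b', 2)]
```

Writing and reading through the same (catalog, path) pair is the pattern the sink and source operators in Step 3 follow.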
Step 3: define the Source and Sink
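| Name | Display name | Description | Type | Required? | Default |
| --- | --- | --- | --- | --- | --- |
| catalogObject | catalog object | The catalog object: a Catalog plus the ObjectPath of the table to read | String | Yes | |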
The code below is for illustration only. You may need to modify some of it or configure your environment before it runs correctly.
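```python
from pyalink.alink import *

useLocalEnv(1)

# Replace "*" with the folder that holds the Derby database
derbyFolder = "*"
DERBY_SCHEMA = "derby_schema"
DERBY_DB = "derby_db"

derby = DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder + '/' + DERBY_DB)
catalogObject = CatalogObject(derby, ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"))

# `source` is an upstream BatchOperator holding the data to write (not defined in the original)
catalogSinkBatchOp = CatalogSinkBatchOp().setCatalogObject(catalogObject)
source.link(catalogSinkBatchOp)
BatchOperator.execute()

# Read the table back through the same catalog object
catalogSourceBatchOp = CatalogSourceBatchOp().setCatalogObject(catalogObject)
catalogSourceBatchOp.print()
```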
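```java
// Package paths as in Alink 1.x (assumed); adjust to your version
import com.alibaba.alink.common.io.catalog.DerbyCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import org.apache.flink.table.catalog.ObjectPath;

// Inside a method declared `throws Exception`; `source` is an upstream BatchOperator (not defined in the original)
String derbyFolder = "*";
String DERBY_SCHEMA = "derby_schema";
String DERBY_DB = "derby_db";
DerbyCatalog derby = new DerbyCatalog("derby_test_catalog", DERBY_SCHEMA, "10.6.1.0", derbyFolder + "/" + DERBY_DB);
CatalogObject catalogObject = new CatalogObject(derby, new ObjectPath("test_catalog_source_sink", "test_catalog_source_sink3"));

CatalogSinkBatchOp catalogSinkBatchOp = new CatalogSinkBatchOp().setCatalogObject(catalogObject);
source.link(catalogSinkBatchOp);
BatchOperator.execute();

CatalogSourceBatchOp catalogSourceBatchOp = new CatalogSourceBatchOp().setCatalogObject(catalogObject);
// print() on a batch operator runs the job itself
catalogSourceBatchOp.print();
```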
```python
from pyalink.alink import *

useLocalEnv(1)

# Replace "*" with the folder that holds the SQLite database file
sqliteFolder = "*"
SQLITE_SCHEMA = "sqlite_schema"
SQLITE_DB = "sqlite_db"

sqlite = SqliteCatalog("sqlite_test_catalog", None, "3.19.3", [sqliteFolder + '/' + SQLITE_DB])
catalogObject = CatalogObject(sqlite, ObjectPath(SQLITE_DB, "test_catalog_source_sink3"))

# `source` is an upstream BatchOperator holding the data to write (not defined in the original)
catalogSinkBatchOp = CatalogSinkBatchOp().setCatalogObject(catalogObject)
source.link(catalogSinkBatchOp)
BatchOperator.execute()

catalogSourceBatchOp = CatalogSourceBatchOp().setCatalogObject(catalogObject)
catalogSourceBatchOp.print()
```
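```java
// Package paths as in Alink 1.x (assumed); adjust to your version
import com.alibaba.alink.common.io.catalog.SqliteCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import org.apache.flink.table.catalog.ObjectPath;

// Inside a method declared `throws Exception`; `source` is an upstream BatchOperator (not defined in the original)
String sqliteFolder = "*";
String SQLITE_SCHEMA = "sqlite_schema";
String SQLITE_DB = "sqlite_db";
SqliteCatalog sqlite = new SqliteCatalog("sqlite_test_catalog", null, "3.19.3", sqliteFolder + "/" + SQLITE_DB);
CatalogObject catalogObject = new CatalogObject(sqlite, new ObjectPath(SQLITE_DB, "test_catalog_source_sink3"));

CatalogSinkBatchOp catalogSinkBatchOp = new CatalogSinkBatchOp().setCatalogObject(catalogObject);
source.link(catalogSinkBatchOp);
BatchOperator.execute();

CatalogSourceBatchOp catalogSourceBatchOp = new CatalogSourceBatchOp().setCatalogObject(catalogObject);
// print() on a batch operator runs the job itself
catalogSourceBatchOp.print();
```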
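Conceptually, the SQLite example writes `source` into the table `test_catalog_source_sink3` of the database file and then reads that table back. The sketch below performs the same round trip with Python's standard `sqlite3` module instead of Alink, so it runs without the plugin. The column names `f0`/`f1`, the helpers `write_rows`/`read_rows`, and the sample rows are hypothetical. Alink's own table creation and type mapping may differ.

```python
import os
import sqlite3
import tempfile

TABLE = "test_catalog_source_sink3"  # table name taken from the example above


def write_rows(db_path, rows):
    """Sink side: create the table if needed and append rows."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(f"CREATE TABLE IF NOT EXISTS {TABLE} (f0 TEXT, f1 INTEGER)")
            conn.executemany(f"INSERT INTO {TABLE} VALUES (?, ?)", rows)
    finally:
        conn.close()  # the context manager commits but does not close


def read_rows(db_path):
    """Source side: read every row back, ordered by f1."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(f"SELECT f0, f1 FROM {TABLE} ORDER BY f1").fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    folder = tempfile.mkdtemp()  # stands in for sqliteFolder
    db_path = os.path.join(folder, "sqlite_db")  # mirrors sqliteFolder + '/' + SQLITE_DB
    write_rows(db_path, [("a", 1), ("b", 2)])
    print(read_rows(db_path))  # [('a', 1), ('b', 2)]
```

Splitting the work into a write function and a separate read function mirrors the separate sink and source operators, which share only the location of the table.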
ALINK (10): Loading Datasets (3): Reading through a Catalog (CatalogSourceBatchOp)
Original post: https://www.cnblogs.com/qiu-hua/p/14887400.html