本系列教程基于Kettle 8.1(pdi-ce-8.1.0.0-365)。大部分内容同样适用于Kettle 7.x版本。
章节目录:
本章说明
Kettle是提供了一些api的,我们可以通过这些api去执行Kettle作业、转换。除了执行作业Kettle还有其他很多api可供使用,本章只介绍作业的执行,其他api有兴趣的可以去探索探索。
搭建Kettle运行环境
首先需要搭建一个Kettle运行环境,很简单,就是从data-integration\lib\目录下复制部分核心jar包出来,导入到java项目(jdk1.8)中,本章节所需jar包如下(不要忘了数据库连接驱动):
代码示例(作业、转换、资源库)
这里就直接放代码了,包含作业文件、转换文件、资源库作业的执行示例。
package com.staroon.kettle.exec;
import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.logging.LogLevel; import org.pentaho.di.core.util.EnvUtil; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.kdr.KettleDatabaseRepository; import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import java.util.HashMap; import java.util.Map;
/**
 * Demonstrates executing Kettle (PDI) jobs and transformations through the
 * Kettle Java API: from .kjb/.ktr files on disk and from a database repository.
 */
public class RunKettleJob {

    /**
     * Entry point: builds sample named parameters and runs the
     * repository-based job demo.
     */
    public static void main(String[] args) {
        // Named parameters handed to the job (consumed inside the .kjb).
        Map<String, String> params = new HashMap<>();
        params.put("filename", "runjob");
        params.put("extend", "txt");

        // File-based variants — pass a path to one of the other helpers:
        //   runJob(params, "D:/kettle/jobs/Kettledoc/常规作业示例.kjb");
        //   runTrans(params, "D:/kettle/jobs/Kettledoc/转换任务.ktr");
        runRepoJob(params);
    }
public static void runRepoJob(Map<String, String> params) { try { KettleEnvironment.init(); KettleDatabaseRepository repository = new KettleDatabaseRepository(); DatabaseMeta databaseMeta = new DatabaseMeta( "kettle", "mysql", "jdbc", "127.0.0.1", "kettle", "3308", "root", "lwsjfwq" );
databaseMeta.getAttributes().put("EXTRA_OPTION_MYSQL.characterEncoding", "utf8");
if (databaseMeta.testConnection().startsWith("正确")) { System.out.println("数据库连接成功"); } else { System.out.println("数据库连接失败"); return; }
KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta( "kettle", "kettle", "Kettle Repository", databaseMeta ); repository.init(repositoryMeta); repository.connect("admin", "admin");
RepositoryDirectoryInterface dir = repository.findDirectory("/批处理/");
JobMeta jobMeta = repository.loadJob("资源库作业示例", dir, null, null);
for (String param : params.keySet()) { jobMeta.setParameterValue(param, params.get(param)); }
Job job = new Job(repository, jobMeta); job.setLogLevel(LogLevel.DEBUG); job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { throw new Exception("作业执行出错"); } } catch (Exception e) { e.printStackTrace(); } }
public static void runTrans(Map<String, String> params, String ktrPath) { try { KettleEnvironment.init(); EnvUtil.environmentInit(); TransMeta transMeta = new TransMeta(ktrPath);
for (String param : params.keySet()) { transMeta.setParameterValue(param, params.get(param)); }
Trans trans = new Trans(transMeta);
trans.setLogLevel(LogLevel.DEBUG);
trans.execute(null); trans.waitUntilFinished(); if (trans.getErrors() > 0) { throw new Exception("转换执行出错"); } } catch (Exception e) { e.printStackTrace(); } }
public static void runJob(Map<String, String> params, String kjbPath) { try { KettleEnvironment.init(); JobMeta jobMeta = new JobMeta(kjbPath, null);
for (String param : params.keySet()) { jobMeta.setParameterValue(param, params.get(param)); }
Job job = new Job(null, jobMeta);
job.setLogLevel(LogLevel.DEBUG);
job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { throw new Exception("作业执行出错"); } } catch (Exception e) { e.printStackTrace(); } } }
|
本章完!