数据集成 代码示例 使用Java调用CDM服务的REST API创建、启动、查询、删除CDM作业的代码示例如下: package cdmclient; import java.io.IOException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; public class CdmClient { private final static String DOMAINNAME"云账号名"; private final static String USERNAME"云用户名"; private final static String USERPASSWORD"云用户密码"; private final static String PROJECTID"项目ID"; private final static String CLUSTERID"CDM集群ID"; private final static String JOBNAME"作业名称"; private final static String FROMLINKNAME"源连接名称"; private final static String TOLINKNAME"目的连接名称"; private final static String IAMENDPOINT"IAM的Endpoint"; private final static String CDMENDPOINT"CDM的Endpoint"; private CloseableHttpClient httpclient; private String token; public CdmClient() { this.httpclient createHttpClient(); this.token login(); } private CloseableHttpClient createHttpClient() { CloseableHttpClient httpclient HttpClients.createDefault(); return httpclient; } private String login(){ HttpPost httpPost new HttpPost(" String json "{rn"+ ""auth": {rn"+ ""identity": {rn"+ ""methods": ["password"],rn"+ ""password": {rn"+ ""user": {rn"+ ""name": ""+USERNAME+"",rn"+ ""password": ""+USERPASSWORD+"",rn"+ ""domain": {rn"+ ""name": ""+DOMAINNAME+""rn"+ "}rn"+ "}rn"+ "}rn"+ "},rn"+ ""scope": {rn"+ ""project": {rn"+ ""name": "PROJECTNAME"rn"+ "}rn"+ "}rn"+ "}rn"+ "}rn"; try { StringEntity s new StringEntity(json); s.setContentEncoding("UTF8"); s.setContentType("application/json"); httpPost.setEntity(s); CloseableHttpResponse response httpclient.execute(httpPost); Header tokenHeader response.getFirstHeader("XSubjectToken"); String token tokenHeader.getValue(); System.out.println("Login successful"); return token; } catch (Exception e) { throw new RuntimeException("login failed.", e); } } /创建作业/ public void createJob(){ HttpPost httpPost new HttpPost(" /此处JSON信息比较复杂,可以先在作业管理界面上创建一个作业,然后单击作业后的“作业JSON定义”,复制其中的JSON内容,格式化为Java字符串语法,然后粘贴到此处。 JSON消息体中一般只需要替换连接名、导入和导出的表名、导入导出表的字段列表、源表中用于分区的字段。/ String json "{rn"+ ""jobs": [rn"+ "{rn"+ ""fromconnectorname": "genericjdbcconnector",rn"+ ""name": ""+JOBNAME+"",rn"+ ""toconnectorname": "genericjdbcconnector",rn"+ ""driverconfigvalues": {rn"+ ""configs": [rn"+ "{rn"+ ""inputs": [rn"+ "{rn"+ ""name": "throttlingConfig.numExtractors",rn"+ ""value": "1"rn"+ "}rn"+ "],rn"+ ""validators": [],rn"+ ""type": "JOB",rn"+ ""id": 30,rn"+ ""name": "throttlingConfig"rn"+ "}rn"+ "]rn"+ "},rn"+ ""fromlinkname": ""+FROMLINKNAME+"",rn"+ ""fromconfigvalues": {rn"+ ""configs": [rn"+ "{rn"+ ""inputs": [rn"+ "{rn"+ ""name": "fromJobConfig.schemaName",rn"+ ""value": "sqoop"rn"+ "},rn"+ "{rn"+ ""name": "fromJobConfig.tableName",rn"+ ""value": "city1"rn"+ "},rn"+ "{rn"+ ""name": "fromJobConfig.columnList",rn"+ ""value": "code&name"rn"+ "},rn"+ "{rn"+ ""name": "fromJobConfig.partitionColumn",rn"+ ""value": "code"rn"+ "}rn"+ "],rn"+ ""validators": [],rn"+ ""type": "JOB",rn"+ ""id": 7,rn"+ ""name": "fromJobConfig"rn"+ "}rn"+ "]rn"+ "},rn"+ ""tolinkname": ""+TOLINKNAME+"",rn"+ ""toconfigvalues": {rn"+ ""configs": [rn"+ "{rn"+ ""inputs": [rn"+ "{rn"+ ""name": "toJobConfig.schemaName",rn"+ ""value": "sqoop"rn"+ "},rn"+ "{rn"+ ""name": "toJobConfig.tableName",rn"+ ""value": "city2"rn"+ "},rn"+ "{rn"+ ""name": "toJobConfig.columnList",rn"+ ""value": "code&name"rn"+ "}, rn"+ "{rn"+ ""name": "toJobConfig.shouldClearTable",rn"+ ""value": "true"rn"+ "}rn"+ "],rn"+ ""validators": [],rn"+ ""type": "JOB",rn"+ ""id": 9,rn"+ ""name": "toJobConfig"rn"+ "}rn"+ "]rn"+ "}rn"+ "}rn"+ "]rn"+ "}rn"; try { StringEntity s new StringEntity(json); s.setContentEncoding("UTF8"); s.setContentType("application/json"); httpPost.setEntity(s); httpPost.addHeader("XAuthToken", this.token); httpPost.addHeader("XLanguage", "enus"); CloseableHttpResponse response httpclient.execute(httpPost); int status response.getStatusLine().getStatusCode(); if(status 200){ System.out.println("Create job successful."); }else{ System.out.println("Create job failed."); HttpEntity entity response.getEntity(); System.out.println(EntityUtils.toString(entity)); } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Create job failed.", e); } } /启动作业/ public void startJob(){ HttpPut httpPut new HttpPut(" String json ""; try { StringEntity s new StringEntity(json); s.setContentEncoding("UTF8"); s.setContentType("application/json"); httpPut.setEntity(s); httpPut.addHeader("XAuthToken", this.token); httpPut.addHeader("XLanguage", "enus"); CloseableHttpResponse response httpclient.execute(httpPut); int status response.getStatusLine().getStatusCode(); if(status 200){ System.out.println("Start job successful."); }else{ System.out.println("Start job failed."); HttpEntity entity response.getEntity(); System.out.println(EntityUtils.toString(entity)); } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Start job failed.", e); } } /循环查询作业运行状态,直到作业运行结束。/ public void getJobStatus(){ HttpGet httpGet new HttpGet(" try { httpGet.addHeader("XAuthToken", this.token); httpGet.addHeader("XLanguage", "enus"); boolean flag true; while(flag){ CloseableHttpResponse response httpclient.execute(httpGet); int status response.getStatusLine().getStatusCode(); if(status 200){ HttpEntity entity response.getEntity(); String msg EntityUtils.toString(entity); if(msg.contains(""status":"SUCCEEDED"")){ System.out.println("Job succeeded"); break; }else if (msg.contains(""status":"FAILED"")){ System.out.println("Job failed."); break; }else{ Thread.sleep(1000); } }else{ System.out.println("Get job status failed."); HttpEntity entity response.getEntity(); System.out.println(EntityUtils.toString(entity)); break; } } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Get job status failed.", e); } } /删除作业/ public void deleteJob(){ HttpDelete httpDelte new HttpDelete(" try { httpDelte.addHeader("XAuthToken", this.token); httpDelte.addHeader("XLanguage", "enus"); CloseableHttpResponse response httpclient.execute(httpDelte); int status response.getStatusLine().getStatusCode(); if(status 200){ System.out.println("Delete job successful."); }else{ System.out.println("Delete job failed."); HttpEntity entity response.getEntity(); System.out.println(EntityUtils.toString(entity)); } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Delete job failed.", e); } } /关闭/ public void close(){ try { httpclient.close(); } catch (IOException e) { throw new RuntimeException("Close failed.", e); } } public static void main(String[] args){ CdmClient cdmClient new CdmClient(); cdmClient.createJob(); cdmClient.startJob(); cdmClient.getJobStatus(); cdmClient.deleteJob(); cdmClient.close(); } }
来自: