[关闭]
@zhou-si 2016-09-11T10:14:58.000000Z 字数 6878 阅读 2411

solr按小时分片抽取一天数据

solr


简介

最近几天项目组有个紧急任务:从 Solr 中抽取数据导入 Hive 库。我也不清楚为什么会有这种业务需求——其实 Solr 本身可以配置数据存储路径,直接指定为 HDFS 上的某个目录也很简单。由于项目时间紧,我只是先把 Solr 数据抽取到 Windows 本地,再上传回 HDFS;其实也可以直接打成 jar 包并指定输出路径。下面只介绍代码实现和所需的 jar 包。

环境和所需jar包

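环境:JDK 8。所需 jar 包:apache-solr-solrj-3.4.0.jar、commons-httpclient-3.1.jar、commons-io-2.5.jar、commons-lang-2.6.jar、commons-logging-1.1.1.jar、httpclient-4.4.1.jar、httpcore-4.4.1.jar、httpmime-4.2.3.jar、noggit-0.5.jar、slf4j-api-1.7.7.jar、solr-solrj-6.0.0.jar、zookeeper-3.4.6.jar。(注意:代码中用到的 CloudSolrClient、HttpSolrClient 来自 solr-solrj-6.0.0;同时把 3.4.0 版本的 solrj 放在 classpath 上可能引起同名类冲突,如非必要可以去掉。)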

代码

  1. package org.apache.solr.test.base.service.impl;
  2. import java.io.File;
  3. import java.io.FileWriter;
  4. import java.util.Date;
  5. import java.util.HashSet;
  6. import java.util.List;
  7. import org.apache.commons.lang.StringUtils;
  8. import org.apache.commons.lang.time.DateFormatUtils;
  9. import org.apache.solr.client.solrj.SolrClient;
  10. import org.apache.solr.client.solrj.SolrQuery;
  11. import org.apache.solr.client.solrj.SolrRequest;
  12. import org.apache.solr.client.solrj.impl.CloudSolrClient;
  13. import org.apache.solr.client.solrj.impl.HttpSolrClient;
  14. import org.apache.solr.client.solrj.response.QueryResponse;
  15. import org.apache.solr.common.SolrDocument;
  16. import org.apache.solr.common.SolrDocumentList;
  17. import org.apache.solr.common.params.FacetParams;
  18. import org.apache.solr.common.params.GroupParams;
  19. import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;
  20. /**
  21. * 创建时间:20160728
  22. * 功能:抽取solr数据写到文件
  23. * @author 圣斗士宙斯
  24. *
  25. */
  26. public abstract class BaseSolrServiceImpl_ec {
  27. protected static SolrClient client;
  28. /**
  29. * 初始化solr连接
  30. */
  31. private static void init() {
  32. try {
  33. client = new CloudSolrClient(
  34. "zk_host1:port,zk_host2:port");//zookeeper的ip和端口
  35. ((CloudSolrClient) client).setDefaultCollection("xxx");
  36. ((CloudSolrClient) client).connect();
  37. } catch (Exception e) {
  38. client = new HttpSolrClient("http://solr_ip:port/solr/xxx");//solr库的链接
  39. }
  40. }
  41. /**
  42. *查询方法(可以不做排序过滤等)
  43. */
  44. public static QueryResponse query(String query, String fields,
  45. int pageNumber, int size, List<SolrQuery.SortClause> fieldSorts,
  46. String... conditions) throws Exception {
  47. SolrQuery solrQuery = new SolrQuery();
  48. //以下都是些控制条件,不用也可以删掉
  49. if (StringUtils.isBlank(query)) {
  50. solrQuery.setQuery("*:*");
  51. // solrQuery.addFacetQuery("*:*");无条件全表扫
  52. } else {
  53. solrQuery.setQuery(query);
  54. }
  55. size = size <= 0 ? 10 : size;
  56. pageNumber = pageNumber < 1 ? 0 : (pageNumber - 1) * size;
  57. solrQuery.setStart(pageNumber);
  58. solrQuery.setRows(size);
  59. if (StringUtils.isNotBlank(fields)) {
  60. solrQuery.setFields(fields);
  61. }
  62. if (fieldSorts != null) {
  63. for (SolrQuery.SortClause fieldSort : fieldSorts) {
  64. solrQuery.addSort(fieldSort);
  65. }
  66. }
  67. if (conditions != null) {
  68. if (conditions.length >= 1 && StringUtils.isNotBlank(conditions[0])) {
  69. //solrQuery.setFilterQueries(conditions[0].split(",|,"));
  70. solrQuery.add("shards", conditions[0]);
  71. }
  72. if (conditions.length >= 2 && StringUtils.isNotBlank(conditions[1])) {
  73. String[] split = conditions[1].split(",|,");
  74. solrQuery.setParam("group", true);
  75. solrQuery.setParam("group.ngroups", true);
  76. solrQuery.setParam("group.field", split);
  77. solrQuery.setParam(GroupParams.GROUP_SORT, "pub_time desc");
  78. }
  79. if (conditions.length >= 3 && StringUtils.isNotBlank(conditions[2])) {
  80. String[] split = conditions[2].split(";|;");
  81. solrQuery.set(FacetParams.FACET, true);
  82. if (split.length > 1) {
  83. solrQuery.set("facet.date", split[0]);
  84. solrQuery.set("facet.date.start", split[1]);
  85. solrQuery.set("facet.date.end", split[2]);
  86. solrQuery.set("facet.date.gap", split[3]);
  87. } else {
  88. solrQuery.set("facet.field", split[0]);
  89. }
  90. }
  91. if (conditions.length >= 5 && StringUtils.isNotBlank(conditions[4])) {
  92. solrQuery.setHighlight(true);
  93. solrQuery.setParam("hl.fl", conditions[4]);
  94. solrQuery.setParam("hl.fragsize", "160");
  95. solrQuery.setParam("hl.snippets", "2");
  96. solrQuery.setParam("hl.mergeContiguous", "true");
  97. solrQuery
  98. .setHighlightSimplePre("<font class='word-highlight'>");
  99. solrQuery.setHighlightSimplePost("</font>");
  100. }
  101. if (conditions.length >= 6 && StringUtils.isNotBlank(conditions[5])) {
  102. solrQuery.setParam("hl.q", conditions[5]);
  103. }
  104. }
  105. if (client instanceof CloudSolrClient) {
  106. if (conditions.length >= 4 && StringUtils.isNotBlank(conditions[3])) {
  107. solrQuery.add("shards", conditions[3]);
  108. }
  109. }
  110. QueryResponse queryResponse = client.query(solrQuery,
  111. SolrRequest.METHOD.POST);
  112. return queryResponse;
  113. }
  114. public static void main(String[] args) {
  115. //调用初始化连接方法
  116. init();
  117. //接收传入的参数,这里只是对时间做限制,一天内24小时
  118. String whereColumn = args[0];// crawltime
  119. String time1 = args[1];// 2016-01-17
  120. String time2 = time1;
  121. String partitions = args[2];
  122. //由于按天分页抽取solr库的数据会非常慢,分为24此抽取,分页数按照当前小时的总数量/一页显示的数量
  123. try {
  124. QueryResponse response = null;
  125. FileWriter fw = null;// BufferedWriter FileWriter
  126. StringBuilder sb = new StringBuilder();// StringBuffer
  127. for (int i = 0; i <= 23; i++) {
  128. // String stringHour ;
  129. if (i <= 9) {
  130. // stringHour = "0"+(i+=1);
  131. time1 = time1 + "T0" + i + ":00:00Z";
  132. if (i+1 == 10) {
  133. time2 = time2 + "T" + (i+1) + ":00:00Z";
  134. }else {
  135. time2 = time2 + "T0" + (i+1) + ":00:00Z";
  136. }
  137. } else {
  138. // stringHour = (i+=1)+"";
  139. time1 = time1 + "T" + i + ":00:00Z";
  140. time2 = time2 + "T" + (i+1) + ":00:00Z";
  141. }
  142. response = query(whereColumn + ":[" + time1 + " TO "
  143. + time2 + "] && title:*手机*"
  144. + " || ( weburl:*mobile.zol.com.cn* || weburl:*mo*)", null,
  145. 0, 5000, null,partitions);
  146. int numFound = (int) response.getResults().getNumFound();
  147. int fors = (int) Math.ceil((numFound / 5000));
  148. String string = time1.substring(0, 10).replace("-", "");
  149. System.out.println("第 "+time1 +"小时到第"+time2+"小时 统计条数: "+numFound);// 统计条数
  150. System.out.println("第 "+time1 +"小时到第"+time2+"小时 分页总数: "+(int) Math.ceil((numFound / 5000)));// 分页总数
  151. File file_Dir = new File("C:\\Users\\zhuqitian\\Desktop\\solrDate_ec\\"+string+"\\");
  152. if (!file_Dir.exists()) {
  153. file_Dir.mkdirs();
  154. }
  155. File file = new File("C:\\Users\\zhuqitian\\Desktop\\solrDate_ec\\"+string+"\\solrDate_ec_"
  156. + string + "_"+i+".txt");
  157. if (!file.exists()) {
  158. file.createNewFile();
  159. }
  160. fw = new FileWriter(file, true);// FileOutputStream
  161. for (int k = 0; k <= fors; k++) {
  162. SolrDocumentList docs1 = response.getResults();
  163. response = query(whereColumn + ":[" + time1 + " TO "
  164. + time2 + "]", null, k, 5000, null);
  165. for (SolrDocument sd : docs1) {
  166. sb.append(sd.getFieldValue("doc_id") + "!@#");
  167. sb.append(sd.getFieldValue("dedup_id") + "!@#");
  168. sb.append(sd.getFieldValue("url") + "!@#");
  169. sb.append(sd.getFieldValue("author") + "!@#");
  170. sb.append(sd.getFieldValue("weburl") + "!@#");
  171. sb.append(sd.getFieldValue("media") + "!@#");
  172. sb.append(sd.getFieldValue("region") + "!@#");
  173. sb.append(sd.getFieldValue("source") + "!@#");
  174. sb.append(sd.getFieldValue("create_time") + "!@#");
  175. sb.append(sd.getFieldValue("summary") + "!@#");
  176. sb.append(sd.getFieldValue("keywords") + "!@#");
  177. sb.append(sd.getFieldValue("simple_keywords") + "!@#");
  178. sb.append(sd.getFieldValue("sentiment") + "!@#");
  179. sb.append(sd.getFieldValue("sentiment_val") + "!@#");
  180. sb.append(sd.getFieldValue("comment_count") + "!@#");
  181. sb.append(sd.getFieldValue("click_count") + "!@#");
  182. sb.append(sd.getFieldValue("crawltime") + "!@#");
  183. sb.append(sd.getFieldValue("title") + "!@#");
  184. sb.append(sd.getFieldValue("pub_time") + "!@#");
  185. sb.append("\n");
  186. // bytes = sb.toString().getBytes();
  187. }
  188. fw.write(sb.toString());
  189. System.out.println("第 "+ i +" 小时到第 "+(i+1)+"小时: 第 " + k +" 页写入完毕!文件名为 " + file.getName());
  190. sb.delete(0, sb.length());
  191. }
  192. time1 = args[1];
  193. time2 = time1;
  194. }
  195. fw.close();
  196. } catch (Exception e) {
  197. e.printStackTrace();
  198. }
  199. }
  200. }

效果

startTime: 2016-05-15T00:00:00Z
endTime: 2016-05-15T01:00:00Z
统计条数: 62308
分页总数: 12
startTime: 2016-05-15T01:00:00Z
endTime: 2016-05-15T02:00:00Z
统计条数: 41098
分页总数: 8
…………
第 0 小时到第 1小时: 第 0 页写入完毕!文件名为solrDate_ec_20160515_0.txt
第 0 小时到第 1小时: 第 1 页写入完毕!文件名为solrDate_ec_20160515_0.txt
第 0 小时到第 1小时: 第 2 页写入完毕!文件名为solrDate_ec_20160515_0.txt
…………
运行后,输出目录下会按日期生成一个子目录,其中每个小时对应一个文件(如 solrDate_ec_20160515_0.txt)。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注