[关闭]
@xiaoxiaowang 2017-08-20T16:48:57.000000Z 字数 4954 阅读 753

  最近一直在优化海量数据(几千万)处理这一块。我使用的是java提供的ExecuterPool线程池来实现的,这几天在研究如何使用生产者和消费者模式去解决类似处理数据的问题,下面是思考与实现的过程~

思考

  简单的介绍下生产者与消费者模式,详细的可以去google。
吃过快餐肯定会遇到这样的场景:

你去打土豆丝,拿着大勺的大妈就会往你的盘子里放上一勺土豆丝,后厨的师傅会时不时的把做好的土豆丝端上来,有时候你去晚了,然而土豆丝师傅还在做,你又很想吃,那就只能稍等一会了,有时候人很多,那么可能就会有两三个大妈负责盛菜。

好了,来分析下上面的场景,一些名词在下面的程序中有出现

好的,然后回到处理数据的问题上,我简单画了一下过程:

think

如果你有处理过数据,这个过程你肯定会遇到

生产者消费者的实现

下面是看了http://blog.chinaunix.net/uid-20680669-id-3602844.html博客 之后结合上图写出的代码。

Storage

  1. public class Storage {
  2. private List<String> cacheList; //工单数据列表
  3. public boolean readOK;
  4. /**
  5. * 默认构造函数
  6. */
  7. public Storage() {
  8. cacheList = new ArrayList<>();
  9. readOK = false;
  10. }
  11. /**
  12. * 进行资源生产
  13. */
  14. public synchronized void produce(List<String> listProducer) {
  15. while (cacheList.size() != 0) {
  16. try {
  17. wait();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. System.out.println("increace error: " + e.getMessage());
  21. }
  22. }
  23. if (listProducer.size() > 0) {
  24. this.cacheList.addAll(listProducer);
  25. } else {
  26. readOK = true; //没有往缓冲区中放数据,说明读取操作完成
  27. }
  28. System.out.println(readOK);
  29. this.notifyAll();
  30. }
  31. /**
  32. * 消费者进行资源消费
  33. */
  34. public synchronized String consume() {
  35. String result = null;
  36. while (cacheList.size() == 0) {
  37. if (!readOK) {
  38. try {
  39. wait();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. System.out.println("decreace error: " + e.getMessage());
  43. }
  44. } else {
  45. break;
  46. }
  47. }
  48. if (cacheList.size() > 0) {
  49. result = cacheList.remove(0);
  50. }
  51. this.notifyAll();
  52. return result;
  53. }
  54. }

Producer

  1. public class Producer implements Runnable {
  2. private Storage manage;
  3. private int readSize;
  4. private int totalReadSize = 0;
  5. /**
  6. * 默认构造函数
  7. */
  8. public Producer(Storage trade) {
  9. manage = trade;
  10. readSize = 0;
  11. totalReadSize = 0;
  12. }
  13. public void run() {
  14. do {
  15. manage.produce(this.readData(50000));
  16. } while (readSize > 0);
  17. }
  18. public static String getRandomString(int length) { //length表示生成字符串的长度
  19. String base = "abcdefghijklmnopqrstuvwxyz0123456789";
  20. Random random = new Random();
  21. StringBuffer sb = new StringBuffer();
  22. for (int i = 0; i < length; i++) {
  23. int number = random.nextInt(base.length());
  24. sb.append(base.charAt(number));
  25. }
  26. return sb.toString();
  27. }
  28. /**
  29. * 进行数据读取
  30. *
  31. * @param readCount 读取的数据量
  32. * @return 返回读取的数据数
  33. */
  34. public List<String> readData(int readCount) {
  35. List<String> result = new ArrayList<>();
  36. Random random = new Random();
  37. int size = random.nextInt(100);
  38. System.out.println("数据size: " + size);
  39. for (int i = 0; i < size; i++) {
  40. result.add(getRandomString(10));
  41. }
  42. readSize = result.size(); //获取读取的数量
  43. if (readSize > 0) {
  44. totalReadSize += readSize;
  45. System.out.println("read size: " + readSize + " total read size: " + totalReadSize);
  46. System.out.println("read ok.");
  47. }
  48. return result;
  49. }
  50. }

Consumer

  1. public class Consumer implements Runnable {
  2. private Storage storage;
  3. public Consumer(Storage storage){
  4. this.storage = storage;
  5. }
  6. public void run() {
  7. this.beginDealData();
  8. }
  9. private void beginDealData(){
  10. String str = null;
  11. do{
  12. str = storage.consume();
  13. if(str != null){ //当没有数据时,跳出循环
  14. this.process(str);
  15. }
  16. }while(str != null); //当消费的资源为NULL时,则说明工作已经完成,可以跳出循环,结束线程
  17. }
  18. // 这里就是实际处理数据的方法
  19. private void process(String str){
  20. System.out.println("处理数据:" + str);
  21. }
  22. }

启动项

  1. public static void main(String[] args) {
  2. dealData();
  3. }
  4. private static void dealData() {
  5. Storage trade = new Storage();
  6. // 生产者 一个线程去读取
  7. Producer producer = new Producer(trade);
  8. Thread myReadThread = new Thread(producer);
  9. myReadThread.start();
  10. // 消费者 开了是个线程去处理数据
  11. Consumer consumer = new Consumer(trade);
  12. List<Thread> listThread = new ArrayList<>();
  13. for (int i = 0; i < 10; i++) {
  14. Thread myThread = new Thread(consumer);
  15. myThread.start(); //启动线程
  16. listThread.add(myThread);
  17. }
  18. // 当所有线程任务完成就清除
  19. while (listThread.size() > 0) {
  20. Thread mythread = listThread.get(0);
  21. if ("TERMINATED".equals(mythread.getState().toString())) {
  22. listThread.remove(mythread);
  23. }
  24. }
  25. }
  26. }

以上代码你可以直接复制到自己的ide中直接启动~,之后我会再github上创建个repo管理这些代码
下面是结果

  1. 数据size 61
  2. read size: 61 total read size: 61
  3. read ok.
  4. false
  5. false
  6. 数据size 0
  7. 处理数据:7xjprcutl9
  8. 处理数据:zl791k64b5
  9. 处理数据:l77wrc054f
  10. 处理数据:p5gpffv6x7
  11. 处理数据:d18q2o4e64
  12. 处理数据:1qxz2vnpxx
  13. 处理数据:9mwuuxarsa
  14. 处理数据:bfr5tqu79y
  15. 处理数据:45x29eb23g
  16. 处理数据:jyh3wdggra
  17. 处理数据:5hbnauixxu
  18. 处理数据:sqxx7e0iuw
  19. 。。。
  20. true
  21. 处理数据:99l17jyjai
  22. 处理数据:gunef9ngre
  23. 处理数据:pc69si84lp
  24. 处理数据:e9kxwunva4
  25. 处理数据:j5z7isuulq
  26. 处理数据:4i709oaupn
  27. 处理数据:9pdjg0h7ha
  28. 处理数据:a3cicjoxt2
  29. Process finished with exit code 0

代码解读

  这里创建了一个生产者线程,十个消费者线程,生产者每次随机生成数据模拟从数据库读数据并存入cacheList中,直到产生的为0的时候,意味着数据库中没有需要处理的数据了。十个消费者分别取处理这些数据。

关于生产者与消费者的实现的方式现在有
(1)wait() / notify()方法
(2)await() / signal()方法
(3)BlockingQueue阻塞队列方法
(4)PipedInputStream / PipedOutputStream
但是上面的思路不变。

上面可以理解为一个处理数据的框架,以后处理数据直接填充就ok了~

值得优化的地方

  其实对于处理海量数据这块,可能优化工作占得比重比较大。就个人经历讲一下方法。
  首先你需要把这三个过程各自消耗的时间统计出来,比如表中有1000w数据,那就先统计下处理10w或者20w所需要的时间,这里强调一下总数据量是1000w和10w分别去处理10w条数据消耗的时间是两码事!两码事!两码事!特别是有要关联其他表的时候,不信?自己去测试下!

这里强调一下,数据的写一定要使用批量写的方式!!!

与硬件有关的优化
  根据机器的cpu核数来确定代码中开的线程数,如果线程开多了,各线程之间的切换也需要消耗时间,具体的可参看博客http://ifeve.com/how-to-calculate-threadpool-size/,我是按照下面规则去设置线程池的大小
- 如果是CPU密集型应用,则线程池大小设置为N+1
- 如果是IO密集型应用,则线程池大小设置为2N+1

然后内存也需要考虑,因为你有把数据存入缓存的,数据量要控制好,不能把内存撑爆了。

如果配置低了,那就申请升级配置,如8G内存,四核处理器~

上面如果你都尝试了,但是任然需要很久的时间,这个时候那就需要加机器,比如开四台机器来处理1000w数据,这个就要使用分页的思想把数据分成四块去处理~

总结

  处理海量数据的过程还是能学到很多知识,从软件到硬件,从算法到jvm等等。

以上写的就是自己的一点点经验,能帮到你的话点个赞~

欢迎到我个人博客去转转

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注