@bergus · 2015-10-23

A multi-threaded, multi-connection segmented downloader in Python

Tags: python, downloader, multithreading
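In short: the script splits the target file into fixed-size blocks, downloads the blocks concurrently with a thread pool (`multiprocessing.dummy.Pool`, which is threads despite the `multiprocessing` name), and checkpoints its progress to a `.inf` file every couple of seconds so an interrupted download can resume where it left off. Each block is a `[start, offset, end]` triple; a worker issues an HTTP `Range` request for the `offset..end` span and advances `offset` as chunks are written. The code targets Python 2 (`urllib2`, `cPickle`, `print` statements).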


```python
#!/usr/bin/env python
# encoding=utf-8
from __future__ import unicode_literals
from multiprocessing.dummy import Pool as ThreadPool  # thread pool, despite the name
import threading
import os
import sys
import cPickle
from collections import namedtuple
import urllib2
from urlparse import urlsplit
import time

# global lock guarding the shared file object and the shared block list
lock = threading.Lock()

# default parameters
defaults = dict(
    thread_count=10,
    buffer_size=500 * 1024,
    block_size=1000 * 1024)


def progress(percent, width=50):
    # render a fixed-width progress bar in place, overwriting the same line
    print "%s %d%%\r" % (('%%-%ds' % width) % (width * percent / 100 * '='), percent),
    if percent >= 100:
        print
    sys.stdout.flush()
```
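For a quick sanity check, the bar arithmetic can be exercised directly. With the default width of 50, a 45% bar is 22 `'='` characters (integer division) padded to 50 columns:

```python
progress(45)    # prints 22 '=' chars padded to width 50, then ' 45%'
progress(100)   # prints the full bar and moves to a new line
```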
```python
def write_data(filepath, data):
    # checkpoint resume state to disk as a pickle
    with open(filepath, 'wb') as output:
        cPickle.dump(data, output)


def read_data(filepath):
    # load resume state written by write_data
    with open(filepath, 'rb') as infile:
        return cPickle.load(infile)
```
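These two helpers are the whole persistence layer: the resume file is just a `cPickle` dump. A quick round-trip, with a hypothetical filename and payload, shows the idea:

```python
write_data('demo.inf', ('some-state', [[0, 0, 999]]))
print read_data('demo.inf')   # -> (u'some-state', [[0, 0, 999]])
```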
```python
FileInfo = namedtuple('FileInfo', 'url name size lastmodified')


def get_file_info(url):
    # issue a HEAD request to learn the file's size, name and last-modified time
    class HeadRequest(urllib2.Request):
        def get_method(self):
            return "HEAD"
    res = urllib2.urlopen(HeadRequest(url))
    res.read()
    headers = dict(res.headers)
    size = int(headers.get('content-length', 0))
    lastmodified = headers.get('last-modified', '')
    name = None
    if 'content-disposition' in headers:
        name = headers['content-disposition'].split('filename=')[1]
        if name[0] == '"' or name[0] == "'":
            name = name[1:-1]
    else:
        name = os.path.basename(urlsplit(url)[2])
    return FileInfo(url, name, size, lastmodified)
```
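A caveat worth noting: segmented downloading only works when the server honors byte-range requests, which the script simply assumes. A minimal heuristic check in the same style (and with the same imports) as `get_file_info` might look like the sketch below; `supports_ranges` is a hypothetical helper, and servers are not required to send `Accept-Ranges`, so its absence does not prove lack of support:

```python
def supports_ranges(url):
    # hypothetical heuristic: look for 'Accept-Ranges: bytes' in the HEAD response
    class HeadRequest(urllib2.Request):
        def get_method(self):
            return "HEAD"
    res = urllib2.urlopen(HeadRequest(url))
    res.read()
    return dict(res.headers).get('accept-ranges', '').lower() == 'bytes'
```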
```python
def download(url, output,
             thread_count=defaults['thread_count'],
             buffer_size=defaults['buffer_size'],
             block_size=defaults['block_size']):
    # get latest file info
    file_info = get_file_info(url)
    # init paths
    if output is None:
        output = file_info.name
    workpath = '%s.ing' % output
    infopath = '%s.inf' % output
    # split the file into blocks. every block is a list [start, offset, end];
    # each worker thread downloads the file part described by one block and
    # advances that block's offset as it writes.
    blocks = []
    if os.path.exists(infopath):
        # resume: load the blocks saved by a previous, interrupted run
        _x, blocks = read_data(infopath)
        if (_x.url != url or
                _x.name != file_info.name or
                _x.lastmodified != file_info.lastmodified):
            # the remote file changed; start over
            blocks = []
    if len(blocks) == 0:
        # set blocks
        if block_size > file_info.size:
            blocks = [[0, 0, file_info.size - 1]]  # end offset is inclusive
        else:
            block_count, remain = divmod(file_info.size, block_size)
            blocks = [[i * block_size, i * block_size,
                       (i + 1) * block_size - 1] for i in range(block_count)]
            blocks[-1][-1] += remain
        # create a new blank workpath
        with open(workpath, 'wb') as fobj:
            fobj.write('')
    print 'Downloading %s' % url
    # start the progress monitor
    threading.Thread(target=_monitor, args=(
        infopath, file_info, blocks)).start()
    # start downloading
    with open(workpath, 'rb+') as fobj:
        args = [(url, blocks[i], fobj, buffer_size)
                for i in range(len(blocks)) if blocks[i][1] < blocks[i][2]]
        if thread_count > len(args):
            thread_count = len(args)
        pool = ThreadPool(thread_count)
        pool.map(_worker, args)
        pool.close()
        pool.join()
    # rename workpath to output
    if os.path.exists(output):
        os.remove(output)
    os.rename(workpath, output)
    # delete infopath
    if os.path.exists(infopath):
        os.remove(infopath)
    assert all(block[1] >= block[2] for block in blocks)
```
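To make the block layout concrete, here is the same `divmod` arithmetic with hypothetical numbers: a 2,500,000-byte file at the default `block_size` of 1,024,000 bytes yields two blocks, with the remainder folded into the last one:

```python
size, block_size = 2500000, 1000 * 1024
block_count, remain = divmod(size, block_size)        # (2, 452000)
blocks = [[i * block_size, i * block_size, (i + 1) * block_size - 1]
          for i in range(block_count)]
blocks[-1][-1] += remain
print blocks   # [[0, 0, 1023999], [1024000, 1024000, 2499999]]
```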
```python
def _worker((url, block, fobj, buffer_size)):
    # fetch one block via an HTTP Range request, resuming at block[1]
    req = urllib2.Request(url)
    req.headers['Range'] = 'bytes=%s-%s' % (block[1], block[2])
    res = urllib2.urlopen(req)
    while 1:
        chunk = res.read(buffer_size)
        if not chunk:
            break
        with lock:
            fobj.seek(block[1])
            fobj.write(chunk)
            block[1] += len(chunk)
```
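Because the `Range` header is built from `block[1]` rather than `block[0]`, a restarted worker automatically skips the bytes a previous run already wrote. With the hypothetical second block from above, 300,000 bytes into its range:

```python
block = [1024000, 1324000, 2499999]            # [start, offset, end]
print 'bytes=%s-%s' % (block[1], block[2])     # -> bytes=1324000-2499999
```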
```python
def _monitor(infopath, file_info, blocks):
    # every 2 seconds: print overall progress and checkpoint the block
    # list to the .inf file so the download can be resumed later
    while 1:
        with lock:
            percent = sum([block[1] - block[0]
                           for block in blocks]) * 100 / file_info.size
            progress(percent)
            if percent >= 100:
                break
            write_data(infopath, (file_info, blocks))
        time.sleep(2)
```
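The monitor's percentage is simply bytes written over total size, computed from each block's `offset - start`. Continuing the hypothetical example (first block finished, second block 300,000 bytes in):

```python
blocks = [[0, 1024000, 1023999], [1024000, 1324000, 2499999]]
print sum([b[1] - b[0] for b in blocks]) * 100 / 2500000   # -> 52
```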
```python
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Multi-threaded file downloader.')
    parser.add_argument('url', type=str, help='download URL')
    parser.add_argument('-o', type=str, default=None,
                        dest="output", help='output file')
    parser.add_argument(
        '-t', type=int, default=defaults['thread_count'],
        dest="thread_count", help='number of download threads')
    parser.add_argument(
        '-b', type=int, default=defaults['buffer_size'],
        dest="buffer_size", help='read buffer size in bytes')
    parser.add_argument(
        '-s', type=int, default=defaults['block_size'],
        dest="block_size", help='block (segment) size in bytes')
    argv = sys.argv[1:]
    if len(argv) == 0:
        argv = ['https://eyes.nasa.gov/eyesproduct/EYES/os/win']
    args = parser.parse_args(argv)
    start_time = time.time()
    download(args.url, args.output, args.thread_count,
             args.buffer_size, args.block_size)
    print 'Download time: %ds' % int(time.time() - start_time)
```
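Typical invocation from the shell (hypothetical URL): `python downloader.py http://example.com/big.zip -o big.zip -t 10`. The module can also be used as a library, since `download()` takes the same parameters as the CLI; a sketch, assuming the file is saved as `downloader.py`:

```python
# hypothetical library use, mirroring the CLI defaults
from downloader import download
download('http://example.com/big.zip', 'big.zip',
         thread_count=10, buffer_size=500 * 1024, block_size=1000 * 1024)
```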