@bergus
2015-12-02T14:50:59.000000Z
字数 3335
阅读 1448
python
pyshell
写 Python 程序的时候需要用到调用外部命令的模块，看了一下，这类模块还真不少，让人头疼，而且用着都不顺手。根据官网的推荐，我基于标准库的 subprocess 模块定制了一个自己的 shell，同时借鉴了 GitHub 上的 shellpy 模块。另外，Go 语言的 go-sh 确实令人喜欢，所以我觉得基于流的操作将会改变我们的很多工作方式。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import shlex
from subprocess import Popen
from subprocess import PIPE
def py_ver():
    """Return the major version number of the running Python interpreter."""
    from sys import version_info
    major = version_info[0]
    return major
# Give the text / bytes / number type names one meaning on Python 2 and 3:
#   builtin_str   - the interpreter's native ``str`` type
#   str           - the text (unicode) type
#   bytes         - the byte-string type
#   basestring    - what to pass to isinstance() to match any string
#   numeric_types - tuple of the built-in numeric types
_major = py_ver()
if _major not in (2, 3):
    raise ValueError(u'python 版本不正确')

# Capture the native ``str`` before the name is rebound below.
builtin_str = str
if _major == 2:
    bytes = str
    str = unicode
    basestring = basestring
    numeric_types = (int, long, float)
else:
    str = str
    bytes = bytes
    basestring = (str, bytes)
    numeric_types = (int, float)
del _major
def parse_shell_token(t):
    """Expand a leading ``~`` and then environment variables in token *t*."""
    from os.path import expanduser, expandvars
    # Home-directory expansion runs first, then $VAR / ${VAR} substitution.
    return expandvars(expanduser(t))
def pipe_to_tmp(data):
    """Buffer in-memory data in a temporary file usable as a process stdin.

    Args:
        data: Text, bytes, or None. Text is encoded as UTF-8 before being
            written; None is treated as empty input.

    Returns:
        A ``tempfile.SpooledTemporaryFile`` containing *data*, rewound to
        offset 0 so it can be read from the beginning. The caller owns the
        file and is responsible for closing it.
    """
    import tempfile

    if data is None:
        data = b''
    elif isinstance(data, type(u'')):
        # type(u'') is the text type on both Python 2 and 3; the bare name
        # ``unicode`` does not exist on Python 3 and raised NameError there.
        data = data.encode('utf-8')
    stdin_tmp = tempfile.SpooledTemporaryFile()
    stdin_tmp.write(data)
    stdin_tmp.seek(0)
    return stdin_tmp
class Shell(object):
    """An external command that is started lazily and can be chained.

    The command line is split with ``shlex`` and run without a shell.
    The process is only spawned when its output is requested or when
    another command is attached with :meth:`pipe`.

    Attributes:
        cmd_str: The command line as given by the caller.
        popen: The ``subprocess.Popen`` object, or None until started.
        input_pipe: File object used as the process's stdin, or None.
        std: Dict with keys ``'out'`` and ``'err'`` holding the captured
            streams; both are None until the process has been drained.
    """

    def __init__(self, cmd_str, input_pipe=None):
        """Remember the command; nothing is executed yet."""
        self.cmd_str = cmd_str
        self.popen = None
        self.input_pipe = input_pipe
        self.std = {'out': None, 'err': None}

    def __getPopen(self):
        """Return the process for this command, spawning it on first use."""
        if self.popen is None:
            # NOTE: posix=False leaves quote characters inside the tokens,
            # so a quoted argument reaches the program with its quotes.
            argv = [parse_shell_token(token)
                    for token in shlex.split(self.cmd_str, posix=False)]
            self.popen = Popen(argv, stdin=self.input_pipe,
                               stdout=PIPE, stderr=PIPE)
        return self.popen

    def pipe(self, cmd_str):
        """Return a new Shell for *cmd_str* fed by this command's stdout.

        This command is started if it has not been already. If its stdout
        has already been closed, the captured output is replayed from a
        temporary file instead of the live pipe.
        """
        pp = self.__getPopen()
        if pp.stdout.closed:
            input_pipe = pipe_to_tmp(self.std['out'])
        else:
            input_pipe = pp.stdout
        return Shell(cmd_str, input_pipe=input_pipe)

    def __communicate(self):
        """Wait for the process and store its stdout/stderr in ``self.std``."""
        pp = self.__getPopen()
        if pp.returncode is None:
            self.std['out'], self.std['err'] = pp.communicate()

    def run(self):
        """Run the command if needed and print its captured stdout."""
        if self.std['out'] is None:
            self.__communicate()
        out = self.std['out']
        # type('') is the native str type: bytes on Python 2 (printed
        # as-is), text on Python 3 (bytes must be decoded, otherwise print
        # would show the b'...' repr).
        if not isinstance(out, type('')):
            out = out.decode('utf-8', 'replace')
        # Call-form print with one argument behaves the same on Python 2
        # and 3; the former ``print x`` statement is a syntax error on 3.
        print(out)

    def stdout(self):
        """Return the captured standard output, running the command if needed."""
        if self.std['out'] is None:
            self.__communicate()
        return self.std['out']

    def stderr(self):
        """Return the captured standard error, running the command if needed."""
        if self.std['err'] is None:
            self.__communicate()
        return self.std['err']
# Short alias so that call sites read like a small shell DSL.
cmd = Shell

if __name__ == '__main__':
    # Other invocations to try:
    #   cmd('ls -l').run()
    #   cmd('ls -l').pipe('grep Shell.py').run()
    #   cmd('cat').pipe('> hello;cat hello').run()
    #   cmd('ls ~').run()
    cmd('echo dddd').run()
# 下面这个是改良版本，参考了 Python 的 bash 类库的实现，仅用于学习。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from subprocess import Popen
from subprocess import PIPE
import shlex
def py_ver():
    """Return the major version of the running Python interpreter."""
    import sys
    # Unpack just the first component of the version tuple.
    major, = sys.version_info[:1]
    return major
# Bind one set of type names that mean the same thing on Python 2 and 3:
# builtin_str (native str), str (text), bytes (byte string),
# basestring (isinstance target for any string), numeric_types.
_ver = py_ver()
if _ver == 3:
    builtin_str = str
    str = str
    bytes = bytes
    basestring = (str, bytes)
    numeric_types = (int, float)
elif _ver == 2:
    # Order matters: capture the native str before rebinding the name.
    builtin_str = str
    bytes = str
    str = unicode
    basestring = basestring
    numeric_types = (int, long, float)
else:
    raise ValueError(u'python 版本不正确')
# The version number is only needed for the branch above.
del _ver
# Expand shell-style placeholders (home directory, environment variables)
# in a string.
def parse_shell_token(t):
    import os
    # Apply the expansions in order: first "~" (the user's home directory),
    # then environment variables such as '$PATH'.
    for expand in (os.path.expanduser, os.path.expandvars):
        t = expand(t)
    return t
class cmd(object):
    """Run a shell command immediately and capture its output.

    The command runs as soon as the object is constructed. Calling
    :meth:`cmd` again on the same object runs another command and feeds
    it the previous command's captured stdout as standard input, so calls
    can be chained: ``cmd('ls').cmd('grep py')``.

    Attributes:
        stdout: Captured standard output of the last command (None before
            the first command has run, or when stdout was not a pipe).
        stderr: Captured standard error of the last command.
        code: Exit status of the last command.
    """

    def __init__(self, *args, **kwargs):
        """Run the command described by the arguments (see :meth:`cmd`)."""
        self.stdout = None
        self.cmd(*args, **kwargs)

    def cmd(self, cmd, env=None, stdout=PIPE):
        """Run *cmd* through the shell and record its output and exit code.

        Args:
            cmd: Command line string; ``~`` and environment variables are
                expanded before it is handed to the shell.
            env: Optional environment mapping passed to ``Popen``.
            stdout: Where the command's stdout goes; defaults to a pipe so
                the output is captured.

        Returns:
            This object, so that calls can be chained.
        """
        # SECURITY: shell=True runs the string through the system shell.
        # Never pass untrusted input here.
        p = Popen(parse_shell_token(cmd), shell=True,
                  stdout=stdout, stdin=PIPE, stderr=PIPE, env=env)
        # The previous command's output (if any) becomes this one's stdin.
        self.stdout, self.stderr = p.communicate(input=self.stdout)
        self.code = p.returncode
        return self

    def __repr__(self):
        return self.__str__()

    def __unicode__(self):
        """Return the stripped output as text (used by Python 2 ``unicode()``)."""
        out = self.value()
        if isinstance(out, type(u'')):
            return out
        return out.decode('utf-8', 'replace')

    def __str__(self):
        """Return the stripped output as the interpreter's native ``str``.

        On Python 3 the captured output is bytes, and returning bytes from
        ``__str__`` raises TypeError, so it is decoded here. On Python 2
        the byte string already is the native ``str`` and is returned as-is.
        """
        out = self.value()
        if isinstance(out, type('')):
            return out
        return out.decode('utf-8', 'replace')

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return self.__bool__()

    def __bool__(self):
        """True when the command produced non-blank output."""
        return bool(self.value())

    def value(self):
        """Return the captured stdout with surrounding whitespace stripped.

        Returns an empty string when nothing was captured.
        """
        if not self.stdout:
            return ''
        return self.stdout.strip()
if __name__ == '__main__':
    # Call-form print with a single argument behaves identically on
    # Python 2 and is valid syntax on Python 3, unlike ``print x``.
    # More commands to try:
    #   print(cmd('ls -l'))
    #   print(cmd("konsole --hold -e 'konsole --help'"))
    #   print(cmd('scrapy list'))
    #   print(cmd('ls ~'))
    print(cmd("ls . | grep 'pyc'"))
    print(cmd('ls $HOME'))