@bergus
2015-12-02T06:50:59.000000Z
字数 3335
阅读 1617
python pyshell
写python程序的时候需要用到调用外部命令的模块,看了一下,还真不少,头疼,用着不顺手。根据官网推荐,我根据官网的subprocess模块定制了一个自己的shell,同时借鉴了github上面的shellpy模块,而且我觉得go语言的go-sh确实令人喜欢,所以我觉得基于流操作将会改变我们的很多工作。
#!/usr/bin/env python# -*- coding:utf-8 -*-import shlexfrom subprocess import Popenfrom subprocess import PIPEdef py_ver():'''判断python的版本'''import sysreturn sys.version_info[0]if py_ver() == 2:builtin_str = strbytes = strstr = unicodebasestring = basestringnumeric_types = (int, long, float)elif py_ver() == 3:builtin_str = strstr = strbytes = bytesbasestring = (str, bytes)numeric_types = (int, float)else:raise ValueError(u'python 版本不正确')def parse_shell_token(t):import os# handle '~'t = os.path.expanduser(t)# handle env vart = os.path.expandvars(t)return tdef pipe_to_tmp(data):'''把管道或者内存中的数据缓存到临时文件'''if isinstance(data, (unicode, str)):data = data.encode('utf-8')import tempfilestdin_tmp = tempfile.SpooledTemporaryFile()stdin_tmp.write(data)stdin_tmp.seek(0)return stdin_tmpclass Shell(object):def __init__(self, cmd_str, input_pipe=None):self.cmd_str = cmd_strself.popen = Noneself.input_pipe = input_pipeself.std = {'out': None, 'err': None}def __getPopen(self):if self.popen is None:self.popen = Popen(map(parse_shell_token, shlex.split(self.cmd_str, posix=False)),stdin=self.input_pipe, stdout=PIPE, stderr=PIPE)return self.popendef pipe(self, cmd_str):input_pipe = Nonepp = self.__getPopen()if pp.stdout.closed:# 如果命令已经执行,那么就把标准输出的结果保存到临时文件input_pipe = pipe_to_tmp(self.std['out'])else:input_pipe = pp.stdout# print input_pipe.read()# pp.stdout.close() # allow pp to receive SIGPIPE?return Shell(cmd_str, input_pipe=input_pipe)def __communicate(self):pp = self.__getPopen()if pp.returncode is None:self.std['out'], self.std['err'] = pp.communicate()def run(self):if self.std['out'] is None:self.__communicate()print self.std['out']def stdout(self):if self.std['out'] is None:self.__communicate()return self.std['out']def stderr(self):if self.std['err'] is None:self.__communicate()return self.std['err']cmd = Shellif __name__ == '__main__':# cmd('ls -l').run()# cmd('ls -l').pipe('grep Shell.py').run()# cmd('cat').pipe('> hello;cat hello').run()# cmd('ls ~').run()cmd('echo dddd').run()#下面这个是改良版本,参考了python的bash类库的实现,仅用于学习#!/usr/bin/env python# -*- coding:utf-8 -*-from subprocess import Popenfrom subprocess import PIPEimport shlexdef py_ver():'''得到python的版本'''import sysreturn sys.version_info[0]_ver = py_ver()if _ver == 2:builtin_str = strbytes = strstr = unicodebasestring = basestringnumeric_types = (int, long, float)elif _ver == 3:builtin_str = strstr = strbytes = bytesbasestring = (str, bytes)numeric_types = (int, float)else:raise ValueError(u'python 版本不正确')del _ver#解析字符串中的环境变量def parse_shell_token(t):import os#将~等用用户的家目录进行替换t = os.path.expanduser(t)#path中可以使用环境变量,'$PATH'...t = os.path.expandvars(t)return tclass cmd(object):def __init__(self, *args, **kwargs):self.stdout = Noneself.cmd(*args, **kwargs)def cmd(self, cmd, env=None, stdout=PIPE):p = Popen(parse_shell_token(cmd), shell=True,stdout=stdout, stdin=PIPE, stderr=PIPE, env=env)self.stdout, self.stderr = p.communicate(input=self.stdout)self.code = p.returncodereturn selfdef __repr__(self):return self.value()def __unicode__(self):return self.value()def __str__(self):return self.value()def __nonzero__(self):return self.__bool__()def __bool__(self):return bool(self.value())def value(self):if not self.stdout:return ''return self.stdout.strip()if __name__ == '__main__':#print cmd('ls -l')print cmd("ls . | grep 'pyc'")#print cmd("konsole --hold -e 'konsole --help'")#print cmd('scrapy list')print cmd('ls $HOME')#print cmd('ls ~')
