python decrorator

用了python好久, 惭愧的是一些高级的语法糖就没怎么用过。花了点时间好好研究一下python 装饰器。
所谓装饰, 就是想要对一些已经有的模块做一些修饰工作, 但又不让这个功能侵入到原有代码中去,常用的使用场景例如, 插入日志, 性能测试, 事务处理等等, 有了装饰器, 就可以提取大量函数中与本身功能无关的代码,从而重用代码。

1
2
3
4
5
6
7
8
9
10
11
import datetime

def log(func):
def wrapper(*args, **kw):
print('call %s():' % func.__name__)
return func(*args, **kw)
return wrapper

@log
def now():
print(datetime.datetime.today())

对上面代码的注解:

  1. 函数log中需要一个func的参数, 也就是用来回调的函数。
  2. 在now函数前有一个@log, 即之前我们定义的函数

当我们调用now函数时, now()返回了wrapper()函数, 所以now其实变成了wrapper的一个变量, 而后面的log执行其实变成了wrapper()

1
2
3
now()
call now():
2015-06-18 11:00:52.339373

这样, 装饰器没有影响到原来的函数,也没影响函数调用的模块。

而多个装饰器,

1
2
3
4
@decorator1
@decorator2
def now():
print(datetime.datetime.today())

其实等价于 now = decorator1(decorator2(now))
(其实就是说装饰器的调用顺序与其声明的顺序相反)

而如果装饰器自己带参数, 其实就是说装饰器这个函数需要返回一个真正的decorator
比如:(本例子引用自廖雪峰python3装饰器)

1
2
3
4
5
6
7
def log(text):
def decorator(func):
def wrapper(*args, **kw):
print('%s %s():' % (text, func.__name__))
return func(*args, **kw)
return wrapper
return decorator

若调用

1
2
3
@log
def now():
print(datetime.datetime.today())

实际上就是
now = decorator(text)(now)

而如果被装饰的那个函数带参数, 则装饰器的函数也需要加入对应参数

1
2
3
4
5
6
7
8
9
10
def log(func):
def wrapper(text2):
print('%s' % (func.__name__))
return func(text2)
return wrapper

@log
def now(text2):
print(datetime.datetime.today())
print(text2)

比如调用

1
now("haha")

输出如下:

1
2
3
now
2015-06-18 11:20:11.246544
haha

简单的总结到这里, 比较好的例子可以参看
Python修饰器的函数式编程

selenium, Phantomjs use to crawl paper

本意是用scrapy去抓取文件的, 不过pdf下载部分一直搞不定, 只好先找到页面连接, 然后用selenium和phantomjs来模拟浏览器的行为, 采用点击,另存为的方法
不得不说, 标准的论文数据库还是对下载做了许多限制的, 比如我点击论文的pdf地址, 后退回原网页,再重新点击pdf地址,也会给出警告,然后转移到请求登录的界面。
这里给出一个简单的单页抓取模板, 但是这种方法效率并不高,SSRN数据库加载的很慢。

1. 基本下载模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/env python
# encoding: utf-8
__author__ = 'dm'

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
import requests
from bs4 import BeautifulSoup
import time
import urllib.request

driver = webdriver.PhantomJS()
driver.set_window_size(1120,550)
paper_url = "http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2702516"
driver.get(paper_url)
# driver.get("http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2703212")
content = driver.page_source
flag = 1
try:
download_button = driver.find_element_by_id("openDownloadLink1")
time.sleep(2)
download_button.click()
if "Data_Integrity_Notice" in driver.current_url:
driver.find_element_by_css_selector("#AnonymousTab > a.deselected > nobr").click()
driver.find_element_by_name("ProcessAnym").click()
else:
pdf_name = paper_url[51:] +".pdf"
urllib.request.urlretrieve(driver.current_url, pdf_name)
except NoSuchElementException as e:
print("need buy it")
flag = 0
driver.quit()

2. 测试脚本

用selenium直接导出了一个测试脚本, 非常方便, 推荐大家使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import NoAlertPresentException
import unittest, time, re

class Test1(unittest.TestCase):
def setUp(self):
self.driver = webdriver.Chrome()
self.driver.implicitly_wait(30)
self.base_url = "http://papers.ssrn.com/"
self.verificationErrors = []
self.accept_next_alert = True

def test_1(self):
driver = self.driver
driver.get(self.base_url + "/sol3/papers.cfm?abstract_id=2697209&download=yes")
driver.find_element_by_id("openDownloadLink1").click()
driver.find_element_by_css_selector("#AnonymousTab > a.deselected > nobr").click()
driver.find_element_by_name("ProcessAnym").click()

def is_element_present(self, how, what):
try: self.driver.find_element(by=how, value=what)
except NoSuchElementException as e: return False
return True

def is_alert_present(self):
try: self.driver.switch_to.alert()
except NoAlertPresentException as e: return False
return True

def close_alert_and_get_its_text(self):
try:
alert = self.driver.switch_to.alert()
alert_text = alert.text
if self.accept_next_alert:
alert.accept()
else:
alert.dismiss()
return alert_text
finally: self.accept_next_alert = True

def tearDown(self):
self.driver.quit()
self.assertEqual([], self.verificationErrors)

if __name__ == "__main__":
unittest.main()

3. PhantomJS以及firefox设置代理

需要在配置文件中加入以下内容

3.1 PhantomJS

1
2
3
4
5
service_args = [
'--proxy=127.0.0.1:1080' ,
'--proxy-type=socks5' ,
]
driver = webdriver.PhantomJS(service_args= service_args)

3.2 Firefox

1
2
3
4
5
6
7
8
profile = webdriver.FirefoxProfile()
profile.set_preference('network.proxy.type', 1) #默认值0,就是直接连接;1就是手工配置代理。
profile.set_preference('network.proxy.socks', proxyip) #proxyip需自定义, 我用的是本地的代理"127.0.0.1"
profile.set_preference('network.proxy.socks_port', port) #port自定义
profile.set_preference('network.proxy.ssl',ip)
profile.set_preference('network.proxy.ssl_port', port)
profile.update_preferences()
browser = webdriver.Firefox(profile)

使用Spark分析author—keyword数据集

数据集来自唐杰Aminer dataset中自2006-2011中抽取的DM顶级会议的paper, 其中user为paper author, 而item 为出现在paper中的keyword

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
dataset = sc.textFile("hdfs:///dm/author_keyword/author_keyword.txt")
user_fields = dataset.map(lambda line: line.split(" "))
# dataset
num_users = user_fields.map(lambda fields: fields[0]).distinct().count()
# num_users: 27130
num_keywords = user_fields.map(lambda field: field[1]).distinct().count()
# num_keywords: 13543
num_ratings = user_fields.map(lambda field: field[2]).distinct().count()
#num_ratings: 30
ratings = user_fields.map(lambda field: int(field[2]))
max_rating = ratings.reduce(lambda x,y : max(x, y))
min_rating = ratings.reduce(lambda x,y : min(x, y))
#max : 52 min:1
num_ratings = user_fields.count()
# num_ratings: 703320 总记录条数
mean_rating = ratings.reduce(lambda x,y: x+y) *1.0 / num_ratings
#mean_rating: 1.113
ratings_per_user = num_ratings / num_users
# ratings_per_user: 25 平均每个用户和25条记录有关
median_rating = np.median(ratings.collect())
#median_rating : 1.0 rating的中位数为1
ratings_per_keyword = num_ratings / num_keywords
#ratings_per_keyword: 51
ratings.stats()
# (count: 703320, mean: 1.11235852812, stdev: 0.529708031287, max: 52.0, min: 1.0)
count_by_rating = ratings.countByValue()
# {1: 650410, 2: 39472, 3: 7984, 4: 2731, 5: 1178, 6: 629, 7: 363, 8: 193, 9: 103, 10: 80, 11: 48, 12: 27, 13: 21, 14: 18, 15: 12, 16: 12, 17: 10, 18: 3, 19: 6, 20: 4, 21: 3, 22: 2, 24: 1, 27: 1, 29: 3, 30: 1, 33: 2, 34: 1, 44: 1, 52: 1}
#绘图
x_axis = np.array(count_by_rating.keys())
y_axis = np.array([float(c) for c in count_by_rating.values()])
y_axis_normed = y_axis / y_axis.sum()
pos = np.arange(len(x_axis))
width =1.0
ax = plt.axes()
ax.set_xticks(pos +(width /2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis_normed, width, color='lightblue')


user_rating_grouped = user_fields.map(lambda fields:(int(fields[0]), int (fields[2]))).groupByKey()
#user_rating_grouped: PythonRDD[58] at RDD at PythonRDD.scala:43
user_ratings_byuser = user_ratings_grouped.map(lambda (k, v): (k, len(v)))
user_ratings_byuser_local = user_ratings_byuser.map(lambda (k, v): v).collect()
#各author rating次数直方图
hist(user_ratings_byuser_local, bins=200, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)

rating分布
author rating次数分布直方图

聚类概述以及在Spark中运行Kmeans聚类

最近在做model的前期工作,聚类, 因为聚类结果要用于下一步的推荐模型当中,所以学习了一下几种常见的聚类算法。这里只写下三种,一种是最简单的Kmeans聚类算法, 另一种是层次聚类算法, 最后一种则算是两种算法的结合,二分K均值聚类。

  1. Kmeans聚类
    machine learning的主要问题可以分为分类以及聚类问题。 分类是有监督的学习, 而聚类是没有监督的学习, 换而言之, 分类问题是训练一个有类标的训练集,进而对测试集进行预测, 而聚类问题没有类标,我们只需要知道如何计算相似度就可以了,所以通常情况下clustering是不需要使用训练数据进行学习的。组内的相似性越好, 组间差别越大, 那么聚类就越好。
    Kmeans是一个非常经典的简单算法,也算得上是最简单的一种。以下为一个简单的例子。来源于维基百科:K-means_clustering
    K initial means

    1. k initial “means” (in this case k=3) are randomly generated within the data domain (shown in color).
      k clusters created
    2. k clusters are created by associating every observation with the nearest mean. The partitions here represent the Voronoi diagram generated by the means.
      new mean
    3. The centroid of each of the k clusters becomes the new mean.
      clusters finished
    4. Steps 2 and 3 are repeated until convergence has been reached.

      算法

    5. 选择K个点作为初始质心
    6. repeat
    7. 将每个点指派到最近的质心
    8. 重新计算每个簇的质心
    9. until 质心不发生变化

      关于求簇中心的方法,常用的有欧拉距离和cosine:

    10. 欧拉距离
      $$d{ij}=\sqrt{\sum{k=1}^n(x{ik} - x{jk})^2}$$
    11. cosine(用于document中用于计算相似度)
      目标函数
      通常情况采用的都是SSE(summary of the Squared Error)作为度量聚类质量的目标函数
      也就是满足下面这个式子
      $$arg minS \sum{i=1}^k \sum_{x\in S_i}|| x- \it\mu_i||^2$$
      其中$\mu_i$是每个簇所有点的均值, 也就是质心。

      关于选定K个中心点的初值,通常是针对具体问题有一些启发式的选取方法,或者随机选取然后跑多次取最好的一个结果。

  2. run kmeans on spark
    spark社区越来越活跃, 最近也一直在学习spark, 感觉看别人写的example都非常一目了然, 而自己写的时候就是一头包。
    其实spark中给出了一些已有的机器学习算法的实现方式,简单的贴一个用spark跑Kmeans的实例代码.

    1. 引入SparkConf以及Kmeans方法

      1
      2
      3
      import org.apache.spark.mllib.clustering.KMeans
      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.{SparkConf, SparkContext}
    2. 设置具体参数以及解析数据并缓存(因为数据挺大的, 有170W用户,为了便于使用包括数据源以及聚类数目,迭代次数全部采用参数形式从命令行中读取)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      val sparkConf = new SparkConf().setAppName("KmeansAuthors").setMaster("local[4]")
      val sc = new SparkContext(sparkConf)
      // 从hdfs上读取数据
      // val data = sc.textFile("hdfs://localhost:9000/input/feature_1109_2.csv")
      val data = sc.textFile(args(0))
      val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()
      // Cluster the data into two classes using KMeans
      val numClusters = args(1).toInt
      val numIterations = args(2).toInt
      val clusters = KMeans.train(parsedData, numClusters, numIterations)
    3. 聚类并评估结果

      1
      2
      3
      4
      5
      val clusterCenters = clusters.clusterCenters
      val labels = clusters.predict(parsedData)
      labels.saveAsTextFile(args(3))
      val WSSSE = clusters.computeCost(parsedData)
      println("Within Set Sum of Squared Errors = " + WSSSE)
    4. 完整代码见我的github中:KmeansExample
      之后打包成jar包提交到spark集群即可。
      贴一下我的提交命令:

      1
      ./spark-submit --master spark://sparkmaster:7077 --class KmeansExample --driver-memory 4g --executor-memory 8g /home/open/KmeansExample/out/artifacts/KmeansExample_jar/KmeansExample.jar hdfs://localhost:9000/input/feature_1109_2.csv 10 2000 hdfs://localhost:9000/input/kmeansResult2
  3. 层次聚类
    与Kmeans不同,层次聚类得到的结果是一棵树, 可以在任意的地方切一刀进而得到想要的结果, 用在模型中也是因为不想和评审们解释太多。有两种产生层次聚类的基本方法:

    1. 自顶向下
      从包含所有点的某个簇开始, 每一步分裂一个簇, 直至剩下单点簇, 在这种情况下, 我们需要决定每一步分裂哪一个簇, 以及如何分裂。
    2. 自底向上,从点作为个体簇开始,每一步合并最近的簇。因此需要定义簇的邻近性概念。
      对于自底向上方法来说,那么其实最主要的是一个问题, 如何计算两个簇之间的距离?
      通常有三个做法:
    3. MAX

      不同簇最远的两个点的距离定义为这两个簇的距离

    4. MIN

      不同簇最近的两个点的距离定义为这两个簇的距离

    5. Average

      所有点对的平均距离

  4. 二分k均值(bisecting k-means)算法
    二分k均值(bisecting k-means)算法的主要思想是:首先将所有点作为一个簇,然后将该簇一分为二。之后选择能最大程度降低聚类代价函数(也就是误差平方和)的簇划分为两个簇。以此进行下去,直到簇的数目等于用户给定的数目k为止。
    原理很简单。这里也不再赘述了。考虑再三,准备把这个算法用到自己的model里来进行聚类。
    这里有一个spark库, bisecting Kmeans
    我forked到了本地准备进一步尝试, 如无意外,稍后会给出应用的方法。

  1. 参考文章:
    1. 漫谈 Clustering (1): k-means
    2. K-means聚类算法

socket编程

socket是什么

  1. 首先,我们要知道网络中进程是如何通信的,在TCP/IP协议簇中,网络层的IP地址可以标识一台计算机在网络中的地址,而传输层的”协议+端口”则能够唯一的标识主机中的应用程序(进程)。这样(ip address, protocol, port)就可以标识出网络的进程了。
  2. 现行TCP/IP体系结构比较简单,只有四层,即 应用层,传输层,网际层,网络接口层。
    |应用层|HTTP, …,SMTP,DNS…RTP|
    |:—-|:————:|
    |运输层|TCP, UDP|
    |网际层| IP协议|
    |网络接口层|网络接口1…n|
    即下图所示
    TCP/IP协议簇之间的关系
    但是在这当中,我们并没有看到socket在哪里,事实上,socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。
    Socket抽象层

客户端

  • 创建socket连接
    1
    2
    3
    4
    5
    import socket
    s = socker.socket(socket.AF_INET, socket.SOCK_STREAM)
    #socket(family,type[,protocal]) 使用给定的地址族、套接字类型、协议编号(默认为0)来创建套接字。
    #family:AF_INET(用于 Internet 进程间通信) 或者 AF_UNIX(用于同一台机器进程间通信)
    #type: SOCKET_STREAM(流式套接字,主要用于 TCP 协议)或者SOCKET_DGRAM(数据报套接字,主要用于 UDP 协议)

如果创建socker失败,会抛出一个socket.error的异常,必须进行处理

1
2
3
4
5
6
7
8
9
10
11
import socket   #for sockets
import sys #for exit

try:
#create an AF_INET, STREAM socket (TCP)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error, msg:
print ' 创建socket错误,错误码:' + str(msg[0]) + ' , 错误原因: ' + msg[1]
sys.exit();

print 'socket建立成功 ^_^'

  • 连接服务器
    连接服务器时需要知道其IP地址以及端口号
    这里以github为例

    1
    2
    3
    4
    5
    6
    host = 'www.github.com'
    port = 80
    sip = socket.gethostbyname(host)
    s.connect((sip, port))
    print 'socket 连接到:,' + host +'的IP地址为:' + sip
    #Socket Connected to www.github.com on ip 192.30.252.130
  • 发送消息

    1
    2
    3
    4
    5
    6
    #请求网页内容
    message = "GET / HTTP/1.1\r\n\r\n"
    s.sendall(message)

    #sendall(data[, flags]) -- send all data
    #send(data[, flags]) -- send data, may not send all of it
  • 接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    '''
    recv(buflen[, flags]) -- receive data
    | recv_into(buffer[, nbytes[, flags]]) -- receive data (into a buffer)
    | recvfrom(buflen[, flags]) -- receive data and sender's address
    | recvfrom_into(buffer[, nbytes, [, flags])
    | -- receive data and sender's address (into a buffer)
    '''

    reply = s.recv(4096)
    • 关闭socket连接
      1
      s.close()

服务器端

  • 打开socket

  • 绑定socket

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import socket
    import sys
    HOST = '' # Symbolic name meaning all available interfaces
    PORT = 8888 # Arbitrary non-privileged port
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print 'Socket created'
    s.bind((HOST, PORT))
    print 'Socket bind complete'
    ```
    - 监听连接
    ```python
    # listen(n) -- start listening for incoming connections
    s.listen(5)
  • 接收连接

    1
    2
    3
    #accept() -- accept a connection, returning new socket and client address
    conn, addr = s.accept()
    print 'Connected with ' + addr[0] + ':' + str(addr[1])

参考文献:

1. Linux socket 编程,第一部分
2.Linux Socket编程(不限Linux)
3.python socket编程详细介绍
4.socket library

perceptron

相关原理就不再介绍了, 代码贴一下:
这里解决的是李航统计学习方法书上2.1的习题

1
# -*- coding:utf-8 -*-
import numpy as np
 
training_set = np.array([[(3, 3), 1], [(4, 3), 1], [(1, 1), -1]])
w = np.zeros(2, dtype=int)
b = 0

def update(item):
    global w, b
    w[0] += 1 * item[1] * item[0][0]
    w[1] += 1 * item[1] * item[0][1]
    b += 1 * item[1]
    print w, b

def cal(item):
    res = 0
    for i in range(len(item[0])):
        res += item[0][i] * w[i]
    res += b
    res *= item[1]
    return res

def check():
    flag = 0
    for item in training_set:
        if cal(item) <= 0:
            flag = 1
            update(item)
    if not flag:
        print "RESULT: w: " + str(w) + " b: " + str(b)
    return flag

if __name__ == "__main__":
    for i in range(1000):
        if not check(): break