spark の 文字取り込み

RDD でちょっとしたお試し。flatMap は、関数が返した list を展開して、その各要素を RDD の要素にするらしい。

import re
from collections import Counter

def parse_string(line, first_word="python"):
    """Count the word pairs in one input line that start with *first_word*.

    The line is expected to look like ``<article_id>\\t<text>``.  The text is
    stripped of leading/trailing non-word characters, split into lowercased
    words, and every adjacent pair whose first word equals *first_word* is
    counted.

    Args:
        line: One input record, as text or as UTF-8 encoded bytes.
        first_word: Word a pair must start with (compared case-insensitively).

    Returns:
        A list of ``("<first>_<second>", count)`` tuples for this line.  The
        list is empty when the line has no tab-separated text part, when
        bytes cannot be decoded as UTF-8, or when no pair matches.
    """
    try:
        # Decode bytes explicitly: an implicit ASCII decode would fail on any
        # non-ASCII byte and the line would be dropped by the handler below.
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        _article_id, text = line.rstrip().split("\t", 1)
    except ValueError:
        # Either there is no tab to split on, or the bytes are not valid
        # UTF-8 (UnicodeDecodeError is a ValueError subclass): skip the line.
        return []

    # Trim punctuation/whitespace at both ends so the split below does not
    # produce empty leading/trailing words.
    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    # Split on whitespace, swallowing punctuation that touches the gap.
    words = [
        word.lower()
        for word in re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    ]

    target = first_word.lower()
    pair_counts = Counter(
        "_".join((head, tail))
        for head, tail in zip(words, words[1:])
        if head == target
    )
    return list(pair_counts.items())

# Build the pipeline: read "/data/test" (16 is passed as textFile's second
# argument), expand every line into its (pair, count) tuples with
# parse_string, then sum the counts of identical pairs.
wiki = (
    sc.textFile("/data/test", 16)
    .flatMap(parse_string)
    .reduceByKey(lambda left, right: left + right)
)
# Bring the aggregated (pair, count) tuples back as a local list.
result = wiki.collect()

# Emit one tab-separated line per collected record.  The parenthesized
# print and the u'{}' formatting behave the same on Python 2.7 and also run
# on Python 3, unlike the print statement and the `unicode` builtin.
for record in result:
    print(u'\t'.join(u'{}'.format(field) for field in record))