JanusGraph + Cassandra でグラフデータベースを構築

Cassandra の yum install

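  • /etc/yum.repos.d/cassandra.repo
[cassandra]
name=Apache Cassandra
baseurl=https://redhat.cassandra.apache.org/311x/
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://downloads.apache.org/cassandra/KEYS
※ 後述の start_rpc (Thrift) は Cassandra 4.0 で廃止されたため、3.11 系を使う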
yum -y install cassandra

vim /etc/cassandra/conf/cassandra.yaml

### 変更箇所: /etc/cassandra/conf/cassandra.yaml
### JanusGraph の cassandrathrift バックエンドは Thrift で接続するため、Thrift サーバ (RPC) を有効にする
< start_rpc: false
> start_rpc: true

service cassandra start
chkconfig cassandra on


wget https://github.com/JanusGraph/janusgraph/releases/download/v0.2.0/janusgraph-0.2.0-hadoop2.zip

unzip janusgraph-0.2.0-hadoop2.zip

cd janusgraph-0.2.0-hadoop2

bin/gremlin.sh

gremlin> graph = JanusGraphFactory.open('conf/janusgraph-cassandra.properties')
gremlin> GraphOfTheGodsFactory.loadWithoutMixedIndex(graph, true)
gremlin> g = graph.traversal()
==>graphtraversalsource[standardjanusgraph[cassandrathrift:[]], standard]

Python での S3 からファイル取得(boto3)

boto3 というモジュールを使うと、S3 上のファイルを取得できる。


In [1]: import boto3

In [7]: import botocore

In [21]: s3 = boto3.resource('s3', aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY,)

In [22]: try:
    ...:     s3.meta.client.head_bucket(Bucket=S3_BUCKET)
    ...: except Exception as e:
    ...:     print(e)

In [23]: for bucket in s3.buckets.all():
    ...:     for key in bucket.objects.all():
    ...:          print(key.key)


In [26]: try:
    ...:     s3.Bucket(S3_BUCKET).download_file(KEY, FILE)
    ...: except botocore.exceptions.ClientError as e:
    ...:     if e.response['Error']['Code'] == "404":
    ...:         raise S3FileNotFound
    ...:     else:
    ...:         raise

private static DataType parseDataType(Config fieldsConfig) {
  String type = fieldsConfig.getString(FIELD_TYPE_CONFIG);
  switch (type) {
    case "string":
      return DataTypes.StringType;
    case "byte":
      return DataTypes.ByteType;
    case "short":
      return DataTypes.ShortType;
    case "int":
      return DataTypes.IntegerType;
    case "long":
      return DataTypes.LongType;
    case "float":
      return DataTypes.FloatType;
    case "double":
      return DataTypes.DoubleType;
    case "decimal":
      ConfigUtils.assertConfig(fieldsConfig, DECIMAL_SCALE_CONFIG);
      ConfigUtils.assertConfig(fieldsConfig, DECIMAL_PRECISION_CONFIG);
      return DataTypes.createDecimalType(
    case "boolean":
      return DataTypes.BooleanType;
    case "binary":
      return DataTypes.BinaryType;
    case "date":
      return DataTypes.DateType;
    case "timestamp":
      return DataTypes.TimestampType;
    case "array":
    case "map":
    case "struct":
      throw new RuntimeException("Schema check does not currently support complex types");
      throw new RuntimeException("Unknown type: " + type);

LDA(Latent Dirichlet Allocation) でのトピック抽出

以下の形式の sample.csv からデータを取得し、scikit-learn の NMF と LDA でトピック抽出する。

id text
1 今日は晴れ。明日は雨
2 今日はカープが優勝した。
... ...
  • text2topic.py
#!/usr/bin/env python
# coding:utf-8

from __future__ import print_function
from time import time

from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import NMF, LatentDirichletAllocation

import pickle
import os
import pandas as pd

import MeCab

# --- Corpus / model sizing ---------------------------------------------------
n_samples = 2000     # only the first n_samples CSV rows are used
n_features = 1000    # vocabulary cap passed to the vectorizers (max_features)
n_components = 10    # number of topics for the NMF / LDA models
n_top_words = 20     # number of words printed per topic

# --- File names, query words and thresholds ----------------------------------
FETCHED_PAGES_DIR_NAME = 'fetched_pages'
QUERIES = ['胃もたれ', '虫歯', '花粉症対策', '鬱', '機械', '骨折', '肩こり', '書類']
NB_PKL_FILENAME = 'naive_bayes_classifier.pkl'
MIN_TFIDF = 0.001    # scores must be strictly above this in is_bigger_than_min_tfidf
TFIDF_RESULT_PKL_FILENAME = 'tfidf_result.pkl'
TFIDF_VECTORIZER_PKL_FILENAME = 'tfidf_vectorizer.pkl'

def print_top_words(model, feature_names, n_top_words):
    """Print the highest-weighted features of every topic of a fitted model.

    Args:
        model: fitted model exposing ``components_``, one weight row per
            topic (as NMF / LatentDirichletAllocation do).
        feature_names: sequence mapping a column index to its feature name.
        n_top_words: how many features to print per topic.

    Prints one ``"Topic #<i>: w1 w2 ..."`` line per topic, followed by a
    blank line. Returns None.
    """
    for topic_idx, topic in enumerate(model.components_):
        # argsort is ascending; step backwards to list the largest weights first.
        top_indices = topic.argsort()[:-n_top_words - 1:-1]
        message = "Topic #%d: " % topic_idx
        message += " ".join([feature_names[i] for i in top_indices])
        print(message)
    print()

def is_bigger_than_min_tfidf(term, terms, tfidfs, min_tfidf=None):
    """Return True if the tf-idf score of ``term`` is strictly above a threshold.

    Meant to be used as a filter, e.g.
    ``[term for term in terms if is_bigger_than_min_tfidf(term, terms, tfidfs)]``.

    Args:
        term: the term to check; must be an element of ``terms``.
        terms: list of terms, parallel to ``tfidfs``.
        tfidfs: scores, where ``tfidfs[i]`` belongs to ``terms[i]``.
        min_tfidf: threshold to compare against. If omitted (None), the
            module-level ``MIN_TFIDF`` is used.

    Raises:
        ValueError: if ``term`` is not in ``terms`` (raised by ``terms.index``).
    """
    threshold = MIN_TFIDF if min_tfidf is None else min_tfidf
    # bool() so callers get a plain True/False even for numpy scalar scores.
    return bool(tfidfs[terms.index(term)] > threshold)

def tfidf(values):
    """Fit a tf-idf vectorizer on ``values`` and return the resulting matrix.

    Each element of ``values`` is one document. It is tokenized by
    ``stems``, an analyzer that takes a string and returns a list of strings.
    At most ``n_features`` terms are kept. ``max_df=50`` is an absolute
    document count, so terms occurring in more than 50 documents are dropped.

    Returns:
        (matrix, vectorizer): the tf-idf matrix, which the ``__main__``
        section stores as ``tfidf_result``, and the fitted
        ``TfidfVectorizer``.
    """
    documents = list(values)
    vectorizer = TfidfVectorizer(
        analyzer=stems,
        min_df=1,
        max_df=50,
        max_features=n_features,
    )
    matrix = vectorizer.fit_transform(documents)
    return matrix, vectorizer

def countvec(values):
    """Fit a raw term-count vectorizer on ``values`` and return the counts.

    Each element of ``values`` is one document, tokenized by the ``stems``
    analyzer (string in, list of strings out). The vectorizer settings match
    ``tfidf``: ``min_df=1``, ``max_df=50`` (an absolute document count) and
    ``max_features=n_features``.

    Returns:
        (matrix, vectorizer): the term-count matrix, which the ``__main__``
        section stores as ``tf_result`` and feeds to LDA, and the fitted
        ``CountVectorizer``.
    """
    documents = list(values)
    vectorizer = CountVectorizer(
        analyzer=stems,
        min_df=1,
        max_df=50,
        max_features=n_features,
    )
    matrix = vectorizer.fit_transform(documents)
    return matrix, vectorizer

def _split_to_words(text, to_stem=False):
    """Tokenize ``text`` with MeCab and return the list of words.

    Example:
        input:  'すべて自分のほうへ'
        output: ['すべて', '自分', 'の', 'ほう', 'へ']

    Args:
        text: the string to tokenize.
        to_stem: if True, emit the uninflected (base) form MeCab reports in
            feature field 6 whenever it has one. Otherwise emit each word as
            it appears in ``text``.

    Returns:
        list of str.
    """
    tagger = MeCab.Tagger('mecabrc')  # any other Tagger configuration also works
    mecab_result = tagger.parse(text)
    words = []
    for info in mecab_result.split('\n'):
        # MeCab output ends with an 'EOS' line followed by an empty string.
        if info == 'EOS' or info == '':
            break
        # info => 'な\t助詞,終助詞,*,*,*,*,な,ナ,ナ'
        info_elems = info.split(',')
        # info_elems[0] => 'ヴァンロッサム\t名詞'; the word is the part before
        # the tab. Splitting on the tab, rather than chopping a fixed number
        # of characters, also works for part-of-speech names that are not
        # two characters long (e.g. '助動詞').
        surface = info_elems[0].split('\t')[0]
        # Field 6 holds the uninflected form; it is '*' (or absent) when MeCab
        # has none, in which case fall back to the word as written.
        if to_stem and len(info_elems) > 6 and info_elems[6] != '*':
            words.append(info_elems[6])
        else:
            words.append(surface)
    return words

def words(text):
    """Tokenize ``text`` with MeCab without converting words to their stems.

    Thin wrapper around ``_split_to_words`` with ``to_stem=False``.
    """
    return _split_to_words(text=text, to_stem=False)

def stems(text):
    """Tokenize ``text`` with MeCab, converting words to their stems.

    Thin wrapper around ``_split_to_words`` with ``to_stem=True``. The
    vectorizers in this script use it as their ``analyzer``.
    """
    return _split_to_words(text=text, to_stem=True)

if __name__ == '__main__':
    # Script entry point:
    #   1. Load /tmp/sample.csv.
    #   2. Build tf-idf and raw-count features (the tf-idf result and its
    #      vectorizer are pickled into the current directory).
    #   3. Fit two NMF models and one LDA model, printing the top words per topic.
    #
    # NOTE(review): header=None treats every line as data. If sample.csv
    # starts with an "id text" header row (as in the format shown above), that
    # row becomes a document. Confirm this, and pass header=0 if so.
    # NOTE: written against the scikit-learn API of the time. In
    # scikit-learn >= 1.2, NMF's ``alpha`` was replaced by ``alpha_W``/``alpha_H``
    # and ``get_feature_names()`` by ``get_feature_names_out()``.
    print("Loading dataset...")
    t0 = time()
    df = pd.read_csv('/tmp/sample.csv', header=None)
    y = df.values[:n_samples, 0]  # column 0 (ids); not used further below
    print("done in %0.3fs." % (time() - t0))

    # tf-idf features, used by both NMF models.
    print("Extracting tf features for tfidf...")
    t0 = time()
    tfidf_result, tfidf_vectorizer = tfidf(df.values[:n_samples, 1])

    pkl_tfidf_result_path = os.path.join('.', TFIDF_RESULT_PKL_FILENAME)
    pkl_tfidf_vectorizer_path = os.path.join('.', TFIDF_VECTORIZER_PKL_FILENAME)

    with open(pkl_tfidf_result_path, 'wb') as f:
        pickle.dump(tfidf_result, f)
    with open(pkl_tfidf_vectorizer_path, 'wb') as f:
        pickle.dump(tfidf_vectorizer, f)
    print("done in %0.3fs." % (time() - t0))

    # Use tf (raw term count) features for LDA.
    print("Extracting tf features for countvec...")
    t0 = time()
    tf_result, tf_vectorizer = countvec(df.values[:n_samples, 1])
    # t0 is intentionally not reset here, so the vectorization time is reported.
    print("done in %0.3fs." % (time() - t0))

    # Fit the NMF model (Frobenius norm).
    print("Fitting the NMF model (Frobenius norm) with tf-idf features, "
          "n_samples=%d and n_features=%d..."
          % (n_samples, n_features))
    t0 = time()
    nmf = NMF(n_components=n_components, random_state=1,
              alpha=.1, l1_ratio=.5).fit(tfidf_result)
    print("done in %0.3fs." % (time() - t0))

    print("\nTopics in NMF model (Frobenius norm):")
    tfidf_feature_names = tfidf_vectorizer.get_feature_names()
    print_top_words(nmf, tfidf_feature_names, n_top_words)

    # Fit the NMF model (generalized Kullback-Leibler divergence).
    print("Fitting the NMF model (generalized Kullback-Leibler divergence) with "
          "tf-idf features, n_samples=%d and n_features=%d..."
          % (n_samples, n_features))
    t0 = time()
    nmf = NMF(n_components=n_components, random_state=1,
              beta_loss='kullback-leibler', solver='mu', max_iter=1000, alpha=.1,
              l1_ratio=.5).fit(tfidf_result)
    print("done in %0.3fs." % (time() - t0))

    print("\nTopics in NMF model (generalized Kullback-Leibler divergence):")
    tfidf_feature_names = tfidf_vectorizer.get_feature_names()
    print_top_words(nmf, tfidf_feature_names, n_top_words)

    # Fit the LDA model on the raw term counts.
    print("Fitting LDA models with tf features, "
          "n_samples=%d and n_features=%d..."
          % (n_samples, n_features))
    lda = LatentDirichletAllocation(n_components=n_components, max_iter=5,
                                    learning_method='online',
                                    learning_offset=50.,
                                    random_state=0)
    t0 = time()
    lda.fit(tf_result)
    print("done in %0.3fs." % (time() - t0))

    print("\nTopics in LDA model:")
    tf_feature_names = tf_vectorizer.get_feature_names()
    print_top_words(lda, tf_feature_names, n_top_words)

#!/usr/bin/env python
# coding:utf-8

import requests
import json
from heapq import heappush, heappop

# Default Solr endpoint for term-vector requests: the "tvrh" handler under
# /solr/project on a local Solr instance.
URL = "http://localhost:8983/solr/project/tvrh"

def get_tvlist(url, _id, start, rows):
    """Fetch term vectors, with tf-idf scores, for documents matching ``id:<_id>``.

    Args:
        url: Solr term-vector request handler endpoint to query (e.g. ``URL``).
        _id: value matched against the ``id`` field.
        start: offset of the first result (Solr ``start`` parameter).
        rows: maximum number of results (Solr ``rows`` parameter).

    Returns:
        The ``termVectors`` value of the JSON response, or ``[]`` if the
        response has no such key.

    Raises:
        requests.HTTPError: if Solr answers with an HTTP error status.
    """
    params = {
        'q': 'id:{}'.format(_id),
        'rows': rows,
        'start': start,
        'indent': 'true',
        'tv.tf_idf': 'true',
        'tv.fl': 'includes',
        'fl': 'id',
    }
    # Query the endpoint the caller passed in.
    r = requests.get(url, params=params)
    # Surface HTTP failures instead of quietly returning an empty list.
    r.raise_for_status()
    dic = json.loads(r.text)
    tv_list = dic.get('termVectors', [])
    return tv_list

def analisys_dic(tv_list, top_n=10, max_tfidf=0.1):
    """Pick the top-scoring terms of each document from a flat term-vector list.

    ``tv_list`` is read as alternating items: a document id, then that
    document's entry. Index 3 of each entry is itself a flat list that
    alternates a term and a ``[label, score]`` pair, where index 1 holds the
    tf-idf score. This presumably matches Solr's flat ``termVectors`` JSON for
    the ``includes`` field; verify against the Solr version in use.

    Args:
        tv_list: flat list as returned by ``get_tvlist``.
        top_n: maximum number of terms to keep per document.
        max_tfidf: only terms scoring strictly below this value are considered.
            NOTE(review): this discards the highest-scoring terms; confirm
            that is intended.

    Returns:
        dict mapping each document id to a list of ``(term, tfidf)`` tuples,
        sorted by descending score (ties broken by term, ascending).
    """
    res = {}
    for i in range(len(tv_list) // 2):
        _id = tv_list[2 * i]
        term_list = tv_list[2 * i + 1][3]
        # Negate the score so an ascending sort yields the highest scores first.
        candidates = []
        for h in range(len(term_list) // 2):
            term = term_list[2 * h]
            score = term_list[2 * h + 1][1]
            if score < max_tfidf:
                candidates.append((-score, term))
        candidates.sort()
        res[_id] = [(term, -neg_score) for neg_score, term in candidates[:top_n]]
    return res



AWS 上で、docker-compose による立ち上げがうまくいかなかった



$ git clone https://github.com/efkbook/blog-sample

$ cd blog-sample/

$ docker-compose up -d
Building go
Step 1/5 : FROM golang:1.8
1.8: Pulling from library/golang
4176fe04cefe: Pull complete
851356ecf618: Pull complete
6115379c7b49: Pull complete
69914558965c: Pull complete
b108f9aa98db: Pull complete
df7abcd2981e: Pull complete
3e60cb3f592b: Pull complete
Digest: sha256:f0b5dab7581eddb49dabd1d1b9aa505ca3edcdf79a66395b5bfa4f3c036b49ef
Status: Downloaded newer image for golang:1.8
 ---> 0d283eb41a92
Step 2/5 : RUN mkdir -p /go/src/github.com/efkbook/blog-sample
 ---> Running in 59dba2dba710
 ---> 2fcb72cc73c5
Removing intermediate container 59dba2dba710
Step 3/5 : WORKDIR /go/src/github.com/efkbook/blog-sample
 ---> 9719d83418fe
Removing intermediate container cdab235d0dd4
Step 4/5 : CMD make app/run
 ---> Running in ae90cd6f44cd
 ---> e6a2ebc9eaa0
Removing intermediate container ae90cd6f44cd
Step 5/5 : EXPOSE 8080
 ---> Running in a3e82ed29f86
 ---> c50bc29f92a4
Removing intermediate container a3e82ed29f86
Successfully built c50bc29f92a4
Successfully tagged blogsample_go:latest
WARNING: Image for service go was built because it did not already exist. To rebuild this image you must use `docker-compose build` or `docker-compose up --build`.
Building fluentd
Step 1/7 : FROM debian:jessie
jessie: Pulling from library/debian
f2b6b4884fc8: Pull complete
Digest: sha256:a69efd2cf1f83d7fe6a374aaf27974ed031d72b2886acb4b730c6a2017db0c3d
Status: Downloaded newer image for debian:jessie
 ---> 5dd74d62fab8
Step 2/7 : ENV DEBIAN_FRONTEND noninteractive
 ---> Running in 6bd988af2ec6
 ---> 3cb72631b1ab
Removing intermediate container 6bd988af2ec6
Installation completed. Happy Logging!

 ---> 6bb3f47cc669
Removing intermediate container e83c29c33e61
Step 5/7 : RUN /usr/sbin/td-agent-gem install fluent-plugin-elasticsearch fluent-plugin-record-reformer
 ---> Running in 4358c3476ca1
Successfully installed serverengine-2.0.6
Building native extensions.  This could take a while...
invalid options: -SHN
(invalid options are ignored)
Successfully installed strptime-0.2.3
Successfully installed dig_rb-1.0.1
Successfully installed fluentd-1.1.3
Successfully installed excon-0.62.0
Successfully installed multipart-post-2.0.0
Successfully installed faraday-0.14.0
Successfully installed elasticsearch-transport-6.0.2
Successfully installed elasticsearch-api-6.0.2
Successfully installed elasticsearch-6.0.2
Successfully installed fluent-plugin-elasticsearch-2.8.6
Parsing documentation for serverengine-2.0.6
Installing ri documentation for serverengine-2.0.6
Parsing documentation for strptime-0.2.3
Installing ri documentation for strptime-0.2.3
Parsing documentation for dig_rb-1.0.1
Installing ri documentation for dig_rb-1.0.1
Parsing documentation for fluentd-1.1.3
Installing ri documentation for fluentd-1.1.3
Parsing documentation for excon-0.62.0
Installing ri documentation for excon-0.62.0
Parsing documentation for multipart-post-2.0.0
Installing ri documentation for multipart-post-2.0.0
Parsing documentation for faraday-0.14.0
Installing ri documentation for faraday-0.14.0
Parsing documentation for elasticsearch-transport-6.0.2
Installing ri documentation for elasticsearch-transport-6.0.2
Parsing documentation for elasticsearch-api-6.0.2
Installing ri documentation for elasticsearch-api-6.0.2
Parsing documentation for elasticsearch-6.0.2
Installing ri documentation for elasticsearch-6.0.2
Parsing documentation for fluent-plugin-elasticsearch-2.8.6
Installing ri documentation for fluent-plugin-elasticsearch-2.8.6
Done installing documentation for serverengine, strptime, dig_rb, fluentd, excon, multipart-post, faraday, elasticsearch-transport, elasticsearch-api, elasticsearch, fluent-plugin-elasticsearch after 11 seconds
Successfully installed fluent-plugin-record-reformer-0.9.1
Parsing documentation for fluent-plugin-record-reformer-0.9.1
Installing ri documentation for fluent-plugin-record-reformer-0.9.1
Done installing documentation for fluent-plugin-record-reformer after 0 seconds
12 gems installed
 ---> 4db79640c552
Removing intermediate container 4358c3476ca1
Step 6/7 : EXPOSE 24224
 ---> Running in e09f02db180e
 ---> 75708fd35464
Removing intermediate container e09f02db180e
Step 7/7 : CMD exec td-agent -c /fluentd/etc/$FLUENTD_CONF -p /fluentd/plugins $FLUENTD_OPT
 ---> Running in 927bdb731fc6
 ---> 9ff77d0acf21
Removing intermediate container 927bdb731fc6
Successfully built 9ff77d0acf21
Successfully tagged blogsample_fluentd:latest
WARNING: Image for service fluentd was built because it did not already exist. To rebuild this image you must use `docker-compose build` or `docker-compose up --build`.
Building elasticsearch
Step 1/2 : FROM docker.elastic.co/elasticsearch/elasticsearch:5.4.3
5.4.3: Pulling from elasticsearch/elasticsearch
d3aeceeb0289: Pull complete
8f71a54ee36c: Pull complete
71f1935b230b: Pull complete
16c3cc6647f0: Pull complete
c23ca9956682: Pull complete
fa98162d672f: Pull complete
34efa21beb0e: Pull complete
139c723cd9ea: Pull complete
03a95a5ee04e: Pull complete
7f59c15ccc56: Pull complete
Digest: sha256:b14e0b6d7819d9a755d380cf32641befadac1c3a96c48a69831005250cfe54fd
Status: Downloaded newer image for docker.elastic.co/elasticsearch/elasticsearch:5.4.3
 ---> 2ae8547160a7
Step 2/2 : RUN cd /usr/share/elasticsearch &&   bin/elasticsearch-plugin install analysis-kuromoji &&   bin/elasticsearch-plugin install analysis-icu
 ---> Running in 79819937416c
-> Downloading analysis-kuromoji from elastic
[=================================================] 100%??
-> Installed analysis-kuromoji
-> Downloading analysis-icu from elastic
[=================================================] 100%??
-> Installed analysis-icu
 ---> 9f70a0c52685
Removing intermediate container 79819937416c
Successfully built 9f70a0c52685
Successfully tagged blogsample_elasticsearch:latest
WARNING: Image for service elasticsearch was built because it did not already exist. To rebuild this image you must use `docker-compose build` or `docker-compose up --build`.
Pulling kibana (docker.elastic.co/kibana/kibana:5.4.3)...
5.4.3: Pulling from kibana/kibana
d3aeceeb0289: Already exists
8579720ccec7: Pull complete
047c80c6b8c9: Pull complete
dd3d39cdec23: Pull complete
360978fe41a9: Pull complete
35aa697b4267: Pull complete
166e29386d64: Pull complete
d7dd1d81bf77: Pull complete
ed50f0e6ccee: Pull complete
Digest: sha256:a14294558c0d4a03d0abf5264b8147fa2a00f9b090e48448a1586b075d14e4b1
Status: Downloaded newer image for docker.elastic.co/kibana/kibana:5.4.3
Pulling nginx (nginx:1.11)...
1.11: Pulling from library/nginx
6d827a3ef358: Already exists
f8f2e0556751: Pull complete
5c9972dca3fd: Pull complete
451b9524cb06: Pull complete
Digest: sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582
Status: Downloaded newer image for nginx:1.11
Creating blogsample_nginx_1         ...
Creating blogsample_fluentd_1       ... error
Creating blogsample_elasticsearch_1 ...
Creating blogsample_fluentd_1       ...
Creating blogsample_go_1            ...
Creating blogsample_go_1            ... done
ERROR: for blogsample_fluentd_1  Cannot start service fluentd: driver failed programming eCreating blogsample_elasticsearch_1 ... done
Creating blogsample_kibana_1        ... done
Creating blogsample_nginx_1         ... done

ERROR: for fluentd  Cannot start service fluentd: driver failed programming external connectivity on endpoint blogsample_fluentd_1 (59456b19d7383bd270ed7424a55702be74a9b924c77c2db0275d6758d718bf32): Error starting userland proxy: listen tcp bind: address already in use
ERROR: Encountered errors while bringing up the project.


$ free -m
             total       used       free     shared    buffers     cached
Mem:          2001       1938         62          0          0         27
-/+ buffers/cache:       1910         90
Swap:            0          0          0

$ free -m
             total       used       free     shared    buffers     cached
Mem:          2001       1084        917          0         12         81
-/+ buffers/cache:        990       1010
Swap:            0          0          0

## CPU
$ sar -u
12:00:01 AM     CPU     %user     %nice   %system   %iowait    %steal     %idle
05:20:01 PM     all      0.04      0.00      0.03      0.01      0.02     99.90
05:30:01 PM     all      0.04      0.00      0.03      0.01      0.02     99.91
05:40:01 PM     all      0.04      0.00      0.03      0.01      0.02     99.90
05:50:01 PM     all     13.82      0.00      3.01      1.03      0.08     82.06
06:00:36 PM     all     14.26      0.00      5.28     78.84      1.12      0.50
06:10:01 PM     all     34.88      0.00      6.14     56.52      0.89      1.58

## ロードアベレージ
$ sar -q
12:00:01 AM   runq-sz  plist-sz   ldavg-1   ldavg-5  ldavg-15
05:20:01 PM         0       147      0.00      0.00      0.00
05:30:01 PM         1       147      0.00      0.00      0.00
05:40:01 PM         0       147      0.00      0.00      0.00
05:50:01 PM         1       157      0.94      0.39      0.15
06:00:36 PM         0       277      9.69      7.82      4.20
06:10:01 PM         0       201      1.51      6.54      5.89

## メモリ
$ sar -r
12:00:01 AM kbmemfree kbmemused  %memused kbbuffers  kbcached  kbcommit   %commit

05:20:01 PM    909852   1139400     55.60    166756    410508    971816     47.42
05:30:01 PM    909884   1139368     55.60    166756    410508    971816     47.42
05:40:01 PM    909764   1139488     55.61    166756    410524    971816     47.42
05:50:01 PM     74796   1974456     96.35    234604   1085612   1069228     52.18
06:00:36 PM     65172   1984080     96.82       116     25968   3084236    150.51
06:10:01 PM    336440   1712812     83.58      4668     46448   2591224    126.45

## Disk I/O
$ sar -b

12:00:01 AM       tps      rtps      wtps   bread/s   bwrtn/s
05:20:01 PM      0.16      0.00      0.16      0.00      1.90
05:30:01 PM      0.15      0.00      0.15      0.00      1.70
05:40:01 PM      0.16      0.00      0.16      0.00      1.82
05:50:01 PM     89.59      6.19     83.40    330.35  12059.86
06:00:36 PM   1179.01   1157.77     21.24 110175.33   3395.56
06:10:01 PM    791.31    786.20      5.11  81238.48    647.16