"""行動履歴をDjangoのcommandsで処理

行動履歴から、相関とクラスタリングを行う
"""

# -*- coding: utf-8 -*-
# vim:tabstop=4:shiftwidth=4:expandtab

import os
import sys
from datetime import (datetime, date,)
import logging
import re
import shutil
import tempfile

import pandas as pd
import numpy as np
from scipy.sparse.csc import csc_matrix
from scipy.sparse import csr_matrix

from sklearn.metrics.pairwise import (
    cosine_similarity,
    euclidean_distances,
)
from sklearn import preprocessing
from sklearn.decomposition import NMF
from sklearn.cluster import KMeans

from django.conf import settings
# from django.utils import timezone
from .base import (
    BaseCommand,
)

# Timestamp format used by the action-log TSV ('request_at' column).
f2 = '%Y-%m-%d %H:%M:%S'


def my_parser(value):
    """Parse an action-log timestamp string into a datetime.

    Kept as a module-level callable so it can be handed to
    ``pd.read_csv(date_parser=...)``. The previous lambda used
    ``pd.datetime``, which was removed in pandas 2.0, and shadowed the
    imported ``datetime`` name.
    """
    return datetime.strptime(value, f2)


logger = logging.getLogger(__name__)


class Command(BaseCommand):
    """Management command: run correlation / clustering over an action-log TSV."""

    def handle(self, *args, **kwargs):
        """Build the calculator from the CLI arguments and run it."""
        tsv_path = kwargs['data_file']

        column_names = [
            'request_at',
            'user_id',
            'a',
            'item_cd',
            'x',
            'y',
        ]
        column_dtypes = {
            'request_at': 'object',
            'user_id': 'object',
            'a': 'object',
            'item_cd': 'object',
            'x': 'object',
            'y': 'int64'
        }

        calculator = PyCalActionLog(
            data_file=tsv_path,
            columns=column_names,
            dtypes=column_dtypes
        )
        try:
            calculator.run()
        except Exception as e:
            # Log the full traceback; the command itself does not crash.
            logger.exception(e)

    def add_arguments(self, parser):
        """Register the positional input-file argument."""
        parser.add_argument(
            'data_file',
            action='store',
            help=u'読み込むTSVファイルのパス',
        )


def df_to_tsv(filename, df):
    """Write *df* to *filename* as a headerless TSV via an atomic rename.

    The frame is written to a temporary file in /tmp first and then moved
    into place, so readers never observe a partially written file. Floats
    are formatted with 8 decimal places.

    The original version did the existence check/cleanup outside any
    try/finally, so a failing ``to_csv`` leaked the temp file; it also kept
    its own handle open while pandas wrote to the same path.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, dir='/tmp')
    try:
        # pandas opens the path itself; release our handle first.
        tmp.close()
        df.to_csv(tmp.name, sep='\t', index=False, header=False, float_format='%.8f')
        shutil.move(tmp.name, filename)
    finally:
        # Remove the temp file if the write or the move failed.
        if os.path.exists(tmp.name):
            os.remove(tmp.name)

def sparse_argsort(arr):
    """Return the indices of arr's nonzero entries, sorted by ascending value."""
    nonzero_idx = np.nonzero(arr)[0]
    order = np.argsort(arr[nonzero_idx])
    return nonzero_idx[order]


class PyCalActionLog(object):
    """Compute item correlations and user clusters from an action-log TSV.

    Pipeline (see ``run``):
      1. count events per (user, item), per user, and per item
      2. turn counts into TF-IDF-style scores
      3. build a sparse (item x user) score matrix
      4. write the top-100 cosine-similar items for each item
      5. NMF-reduce users, cluster them with KMeans, and write
         per-cluster popular-item rankings

    All results are written as TSV files under ``settings.RESULT_DIR``.
    """

    def __init__(self, data_file, columns, dtypes):
        # data_file: path of the input TSV
        # columns / dtypes: column names and pandas dtypes for read_csv
        self.data_file = data_file
        self.data_columns = columns
        self.dtypes = dtypes

    def run(self):
        """
        Run the full calculation over the action history.
        """
        # df: raw action-log rows as a pandas DataFrame
        logger.info('read_actionlog start')
        df = self.read_actionlog()
        logger.info('read_actionlog finish')

        logger.info('cal_item_correlation start')

        # Event counts per (user, item) pair.
        df_count_per_user_item = df[['user_id', 'item_cd', ]].groupby(
            ['user_id', 'item_cd', ]
        ).size().reset_index(name='count')

        # Event counts per user.
        df_count_per_user = df[['user_id', ]].groupby(
            ['user_id', ]
        ).size().reset_index(name='count_per_user')

        # Event counts per item.
        df_count_per_item = df[['item_cd', ]].groupby(
            ['item_cd', ]
        ).size().reset_index(name='count_per_item')

        # df_count_params / df_count: numpy column names + values of the
        # merged count table.
        df_count_params, df_count = self.cal_count(df_count_per_user_item, df_count_per_user, df_count_per_item)
        df_tfidf = self.cal_tfidf(df_count_params, df_count)

        # df_count_mat: sparse score matrix, shape => (item_cd, user_id)
        le_user, le_item, df_count_mat = self.trans_score2int(df_count_per_user, df_count_per_item, df_tfidf)

        # Write item-to-item correlations.
        self.cal_item_correlation(le_item, df_count_mat)
        logger.info('cal_item_correlation finish')

        # Dimensionality reduction via NMF, then user clustering.
        logger.info('cal_usercluster, cal_usercluster_itemrank start')
        U, M = self.nmf_decomposition(df_count_mat, 15)
        user_km = self.user_kmeans(U, 20)

        self.cal_usercluster(le_user, user_km)
        self.cal_usercluster_itemrank(le_item, df_count_mat, user_km)
        logger.info('cal_usercluster, cal_usercluster_itemrank finish')

    def read_actionlog(self):
        """Read the headerless action-log TSV into a DataFrame."""
        df = pd.read_csv(
            self.data_file, header=None, sep='\t',
            names=self.data_columns, dtype=self.dtypes,  # parse_dates=['request_at', ],
        )
        df.columns = self.data_columns
        return df

    def cal_count(self, df_count_per_user_item, df_count_per_user, df_count_per_item):
        """Merge per-(user,item), per-user and per-item counts into one table.

        Returns ``(column_names, values)`` as numpy arrays; columns are
        ``user_id, item_cd, count, all_size, count_per_user, count_per_item``
        (the layout ``cal_tfidf`` indexes by position).
        """
        # Number of distinct (user, item) rows observed.
        all_size = df_count_per_user_item['count'].count()

        df_count_per_user_item['all_size'] = all_size

        df_count_per_user_item = df_count_per_user_item.merge(
            df_count_per_user,
            left_on='user_id',
            right_on='user_id',
            how='outer'
        )

        df_count_per_user_item = df_count_per_user_item.merge(
            df_count_per_item,
            left_on='item_cd',
            right_on='item_cd',
            how='outer'
        )

        return np.array(df_count_per_user_item.columns), df_count_per_user_item.values

    def cal_tfidf(self, df_count_params, df_count):
        """Compute a TF-IDF-style score for each (user, item) row.

        score = (count / count_per_user) * log1p((all_size - count_per_item)
        / count_per_item). Returns an array of columns
        ``[user_id, item_cd, score]``.
        """
        # Positional column indices matching the layout from ``cal_count``.
        count, all_size_col, count_per_user, count_per_item = 2, 3, 4, 5
        user_id, item_cd = 0, 1

        df_tfidf = (df_count[:, count] / df_count[:, count_per_user]) * np.log1p(
            ((df_count[:, all_size_col] - df_count[:, count_per_item]) / df_count[:, count_per_item]).astype(np.float64)
        )

        return np.hstack((df_count[:, [user_id, item_cd]], df_tfidf[:, np.newaxis]))

    def trans_score2int(self, df_count_per_user, df_count_per_item, df_tfidf):
        """Encode user/item ids as integers and build the sparse score matrix.

        Returns ``(le_user, le_item, df_count_mat)`` where ``df_count_mat``
        is a CSR matrix of shape (n_items, n_users).
        """
        le_user = preprocessing.LabelEncoder()
        le_item = preprocessing.LabelEncoder()

        le_user.fit(df_count_per_user.values[:, 0])
        le_item.fit(df_count_per_item.values[:, 0])

        df_count_per_item_int = le_item.transform(df_tfidf[:, 1])
        df_count_per_user_int = le_user.transform(df_tfidf[:, 0])

        df_count_mat = csr_matrix(
            (
                df_tfidf[:, 2].astype(np.float64),
                (df_count_per_item_int, df_count_per_user_int)),
            shape=(df_count_per_item.shape[0], df_count_per_user.shape[0])
        )

        return le_user, le_item, df_count_mat

    def cal_item_correlation(self, le_item, df_count_mat):
        """Write the top-100 cosine-similar items per item to a TSV."""
        similarity_mat = cosine_similarity(df_count_mat)
        rank = 100
        # Per row: indices of the ``rank`` most similar items, excluding
        # the item itself (column 0 after the descending sort).
        rank_mat = np.argsort(similarity_mat)[:, ::-1][:, 1:rank + 1]

        res = np.array([]).reshape(0, 3)

        for i in range(len(rank_mat)):
            mat = np.concatenate(
                (
                    np.full((rank_mat[i].shape[0], 1), i).astype('O'),
                    rank_mat[i, :][:, np.newaxis].astype('O'),
                    similarity_mat[i, rank_mat[i, :]][:, np.newaxis].astype(np.float64)
                ), axis=1
            )
            res = np.append(res, mat, axis=0)

        # Drop zero-similarity pairs.
        res = res[np.where(res[:, 2] != 0.0)]
        # LabelEncoder.inverse_transform requires a 1-d array in modern
        # scikit-learn, so flatten the two index columns and reshape back
        # (passing the (n, 2) slice directly raises a ValueError).
        res[:, [0, 1]] = le_item.inverse_transform(
            res[:, [0, 1]].astype(int).ravel()
        ).reshape(-1, 2)

        df_res = pd.DataFrame(
            data=res,
            columns=['item_cd', 'correlation_item_cd', 'value'],
        )

        df_res['value'] = pd.to_numeric(df_res['value'])
        df_res = df_res.round({'value': 8})

        df_to_tsv(os.path.join(settings.RESULT_DIR, 'pyitem_correlation.tsv'), df_res)

    def nmf_decomposition(self, df_count_mat, rank):
        """Factorize the transposed score matrix into (U, M) with NMF.

        U has shape (n_users, rank); M has shape (rank, n_items).
        """
        nmf_dec = NMF(
            n_components=rank,
            init='random',
            random_state=1
        )
        U = nmf_dec.fit_transform(df_count_mat.T)
        M = nmf_dec.components_

        return U, M

    def user_kmeans(self, U, k):
        """Cluster users (rows of U) into k groups with KMeans.

        Returns one cluster id per user, renumbered so that cluster 0 is
        the largest cluster.
        """
        km = KMeans(
            n_clusters=k,
            init='k-means++',
            n_init=10,
            max_iter=300,
            tol=1e-04,
            random_state=0
        )

        y_km = km.fit_predict(U)
        cluster_labels = np.unique(y_km)
        n_clusters = cluster_labels.shape[0]

        # Renumber cluster ids in descending order of cluster size.
        return np.vectorize(
            dict(zip(np.argsort(np.bincount(y_km))[::-1], np.arange(n_clusters))).get
        )(y_km)

    def cal_usercluster(self, le_user, user_km):
        """Write the (user_id, cluster_id) assignment table to a TSV."""
        usercluster_mat = np.concatenate(
            (
                le_user.inverse_transform(np.arange(user_km.shape[0]))[:, np.newaxis].astype('object'),
                user_km[:, np.newaxis]
            ), axis=1
        )

        df_usercluster = pd.DataFrame(
            data=usercluster_mat,
            columns=['user_id', 'cluster_id'],
        )
        df_usercluster['cluster_id'] = df_usercluster['cluster_id'].astype('int64')

        df_to_tsv(os.path.join(settings.RESULT_DIR, 'user_cluster.tsv'), df_usercluster)

    def cal_usercluster_itemrank(self, le_item, df_count_mat, user_km):
        """Write the top-``rank`` items per user cluster (by summed score)."""

        rank = 100

        item_list = le_item.inverse_transform(np.arange(df_count_mat.shape[0]))

        # One row per user: [cluster_id, score_item_0, score_item_1, ...]
        cluster2item_mat = np.concatenate(
            (
                user_km[:, np.newaxis],
                df_count_mat.T.astype(np.float64).toarray()
            ), axis=1
        )

        df_cluster2item = pd.DataFrame(
            data=cluster2item_mat,
        )
        df_cluster2item[0] = df_cluster2item[0].astype('int64')

        # Sum item scores per cluster.
        item_groupby_cluster = df_cluster2item.groupby(0).sum().reset_index().values

        mat_cluster = item_groupby_cluster[:, 0].astype('int64')
        mat_item_sumscore = item_groupby_cluster[:, 1:]

        res = np.array([]).reshape(0, 3)

        for i in range(len(mat_cluster)):
            cluster_id = mat_cluster[i]
            # Item indices with nonzero score, best first.
            item_ranking_list = sparse_argsort(mat_item_sumscore[i])[::-1]
            length_item_ranking_list = len(item_ranking_list)
            if length_item_ranking_list > rank:
                item_ranking_list = item_ranking_list[:rank]
                # Keep the length consistent with the truncated list.
                # (Was hard-coded to 100, which would break the np.full
                # shape below if ``rank`` were ever changed.)
                length_item_ranking_list = rank
            rank_list = np.arange(1, len(item_ranking_list) + 1)

            mat = np.concatenate(
                (
                    item_list[item_ranking_list][:, np.newaxis],
                    np.full((length_item_ranking_list, 1), cluster_id),
                    rank_list[:, np.newaxis],
                ), axis=1
            )
            res = np.append(res, mat, axis=0)

        dtypes = {
            'item_cd': 'object',
            'cluster_id': 'int64',
            'rank': 'int64'
        }

        df_popularitemsincluster = pd.DataFrame(
            data=res,
            columns=['item_cd', 'cluster_id', 'rank'],
        )
        for k, v in dtypes.items():
            df_popularitemsincluster[k] = df_popularitemsincluster[k].astype(v)

        df_to_tsv(
            os.path.join(settings.RESULT_DIR, 'pypopular_items_in_user_cluster.tsv'),
            df_popularitemsincluster
        )