ファイルが置かれた瞬間にimport開始する

課題

(1) ファイルが置かれた瞬間にimport開始する(今回は空処理)仕組みの作成

  • /var/data/xxx.logみたいな場所に元ファイルが複数存在する。
  • db_importerバッチが、複数同時動作しても大丈夫なように(同じ元ファイルを重複してimportしないように)する。
  • PosixPath()を返すイテレータ。1ファイルでも、globでも、inotifyでも、同じインターフェイスで使う方は気にしない。差し替えるだけ。
  • pathを返すイテレータは、2種類。コマンドで、ファイルパスのglobを受け取って、それに合致するファイルパスを返すものと、コマンドで、監視ディレクトリと、ファイル名部分のglobを受け取って、それに合致するファイルが監視ディレクトリにmvされたとき、またはclose_on_writeされたときに、ファイルパスを返すもの。
  • マルチプロセスの場合には、同じファイルを同時に読むための、ファイルパスとオフセット範囲のイテレータ。次に、ファイルパス、もしくはファイルパスとオフセットのイテレータを受け取って、lineを返すイテレータ。必要に応じて、チャンクのitertools.isliceとか。

(2) import処理に関しての条件

  • 同時に同じファイルを処理しないようにするのは、ファイルロックがまず思いつくけど、ロック無しで。(シンボリックリンクでなんとかする)
  • 多数のファイルが一気に置かれた場合も、指定したプロセス数以下でマイペース処理すること
  • dbにロードしていく。エラーが起こったら、エラー用のフォルダに移動。成功したら、成功用のフォルダに移動。
  • 成功用フォルダは、8日後に自動削除されるような、systemd-tmpfiles.dの設定ファイルも書くこと。
  • コマンドオプションで、対象のファイルパス(globで指定できるように。引数で貰った文字列から、pathlibのglob使えばいいと思う)、成功用フォルダ、失敗用フォルダを指定できるように。
  • 上の方でエラーが起こったときに、一番下にエラーを伝えて、mvとかの処理させて、次のファイルに進むこと。

(3) ヒント

  • 対象ファイルパスは、globで指定できるように。引数で貰った文字列から、pathlibのglob使えばいいと思う
  • ファイルロックをせずに、シンボリックリンクでなんとかなりそうな気がするから考えて。force無しで、シンボリックリンク作ろうとして失敗したら、他のプロセスが処理してんじゃねーか、と。mtimeも見たほうがいいかもしれないけど。どっかにsymlink作って、それをmvするのが確実っぽいな。symlink作成時もしくはmv時に失敗したら、誰か処理中ってことで
  • inotifyの場合、1プロセスで監視と処理をやると、詰まるので、多数のファイルが置かれた場合に詰まるというか、「なんか起こったけど覚えきれませんでしたー」みたいなフラグが来るので、そのフラグ処理をいれる(結局、norifyで教えてもらうファイル名を処理するのではなく、シグナルととらえてglobで走査する)のと、なるべくつまらないように、別スレッドで監視することが望ましい。

8日後に消えるsystemd-tmpfiles.dの設定

*/usr/lib/tmpfiles.d/var.conf

#  This file is part of systemd.
#
#  systemd is free software; you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation; either version 2.1 of the License, or
#  (at your option) any later version.

# See tmpfiles.d(5) for details

d /var 0755 - - -

L /var/run - - - - ../run

d /var/log 0755 - - -
f /var/log/wtmp 0664 root utmp -
f /var/log/btmp 0600 root utmp -

d /var/cache 0755 - - -

d /var/lib 0755 - - -
v /var/lib/machines 0700 - - -

d /var/spool 0755 - - -

# 8日後に自動削除
d /var/data/mag/complete/ 0755 - - 7d

pythonでinotifyのloopを回す。

*accesslog_dbimport.py

# -*- coding: utf-8 -*-
# vim:tabstop=4:shiftwidth=4:expandtab

import signal
import sys
import glob
from pathlib import Path
import logging
import logging.handlers
import datetime
import inotify.adapters
from inotify.constants import IN_CLOSE_WRITE, IN_MOVED_TO, IN_Q_OVERFLOW

# ログハンドラーを設定する
LOG = 'logging.out'
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
handler = logging.handlers.RotatingFileHandler(
    LOG, maxBytes=10000, backupCount=5,)
my_logger.addHandler(handler)

# 作業ディレクトリ、成功or失敗ディレクトリ
BASE_DIR = "/var/data/mag/"
UPLOAD_DIR = "upload"
WORK_DIR = "work"
COMPLETE_DIR = "complete"
FAIL_DIR = "fail"
base_dir = Path(BASE_DIR)
complete_files = []


class SignalException(Exception):
    def __init__(self, message):
        super(SignalException, self).__init__(message)


def do_exit(sig, stack):
    raise SignalException("Exiting")


class AccesslogDBimport(object):
    def __init__(self):
        self.error_flag = False
        self.num = 0
        self.target_list = []
        self.values_list = []
        self.commit_count = 0

    def normal(self, dir_path, filename_glob_pattern):
        for path in dir_path.glob(filename_glob_pattern):
            try:
                result = yield path
            except Exception as e:
                print(e)
            else:
                if result is not None:
                    pass

    def ino(self, dir_path, filename_glob_pattern):
        i = inotify.adapters.Inotify()
        i.add_watch(bytes(dir_path), mask=IN_MOVED_TO | IN_Q_OVERFLOW | IN_CLOSE_WRITE)
        try:
            my_logger.info('Inotify start\t%s' % (datetime.datetime.now()))
            for event in i.event_gen():
                if event is None:
                    continue
                (header, type_names, dirname, filename) = event
                my_logger.info(
                    "INFO: WD=(%d) MASK=(%d) COOKIE=(%d) LEN=(%d) MASK->NAMES=%s FILENAME=[%s]"
                    % (header.wd, header.mask, header.cookie, header.len, type_names,
                        str(dirname + b"/" + filename))
                )
                for path in dir_path.glob(filename_glob_pattern):
                    yield path
                else:
                    yield None
        except Exception as e:
            print(e)
            my_logger.warning('%s\t%s' % (e, datetime.datetime.now()))
        finally:
            i.remove_watch(bytes(dir_path))
            my_logger.info('Inotify stop\t%s' % (datetime.datetime.now()))

    def success_handler(self, work_file):
        # work_fileは絶対パス
        base_dir = Path(BASE_DIR)
        path = base_dir / COMPLETE_DIR / '{0}_complete_{1}.log'.format(
            work_file.stem,
            datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        )
        if work_file.is_symlink() and work_file is not None:
            work_file.resolve().rename(path)
            work_file.unlink()
            my_logger.info('%s\t%s\t%s' % (str(path), "success_handler", datetime.datetime.now()))
            if self.commit_count:
                print("Commit!")
            self.commit_count = 0

    def error_handler(self, work_file):
        # work_fileは絶対パス
        base_dir = Path(BASE_DIR)
        path = base_dir / FAIL_DIR / '{0}_fail_{1}.log'.format(
            work_file.stem,
            datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        )
        if work_file.is_symlink() and work_file is not None:
            work_file.resolve().rename(path)
            work_file.unlink()
            my_logger.info('%s\t%s\t%s' % (str(path), "error_handler", datetime.datetime.now()))
            if self.commit_count:
                print("Not Commit!")
            self.num = 0
            self.target_list = []
            self.values_list = []
            self.commit_count = 0

    def work_file_iter(self, upload_file_iter):
        # upload_fileは絶対パス
        for upload_file in upload_file_iter:
            my_logger.info('%s\t%s\t%s' % (str(upload_file), "work_file_iter", datetime.datetime.now()))
            try:
                if upload_file is None:
                    result = yield None
                else:
                    work_path = base_dir / WORK_DIR / '{}.lock'.format(upload_file.stem)
                    work_path.symlink_to(upload_file)
                    result = yield work_path
            except Exception as e:
                my_logger.warning('%s\t%s\t%s' % (e, datetime.datetime.now(), str(upload_file)))

    def iterget_line(self, file_iter):
        file_list = []
        for path in file_iter:
            # my_logger.info('%s\t%s\t%s' % (str(path), "iterget_line", datetime.datetime.now()))
            if path is None:
                try:
                    result = yield None
                except Exception as e:
                    my_logger.warning('%s\t%s\t%s' % (e, datetime.datetime.now(), str(path)))
                    for filename in file_list:
                        self.error_handler(filename)
                    file_list = []
                    yield
                else:
                    if result is not None:
                        for filename in file_list:
                            self.success_handler(filename)
                        file_list = []
                        yield
            else:
                with path.open(mode='r',) as f:
                    for line in f:
                        try:
                            result = yield line
                        except Exception as e:
                            my_logger.warning('%s\t%s\t%s' % (e, datetime.datetime.now(), str(path)))
                            file_list.append(path)
                            for filename in file_list:
                                self.error_handler(filename)
                            file_list = []
                            yield
                            break
                        else:
                            if result is not None:
                                for filename in file_list:
                                    self.success_handler(filename)
                                file_list = []
                                yield
                    else:
                        file_list.append(path)

    def iterparse_line(self, line_iter):
        # line = next(line_iter)
        for line in line_iter:
            # my_logger.info('%s\t%s\t%s' % (line, "iterparse_line", datetime.datetime.now()))
            try:
                if line is None:
                    result = yield None
                else:
                    a, b, c = line.strip().split("\t")
                    result = yield [a, b, c]
            except Exception as e:
                result = line_iter.throw(e)
                yield
            else:
                if result is not None:
                    result = line_iter.send(result)
                    # my_logger.info('%s\t%s\t%s' % (result, "iterparse_linesuccess", datetime.datetime.now()))
                    yield

    def db_import(self, table):
        target = ",\n".join(self.target_list)
        sql = '''
            INSERT INTO `{table}`
            (`session_id`, `url`, `created_dt`) VALUES
            {target}
            ON DUPLICATE KEY UPDATE
            `session_id` = VALUES(`session_id`),
            `url` = VALUES(`url`),
            `created_dt` = VALUES(`created_dt`)
        '''.format(
            target=target,
            table=table
        )
        return sql % tuple(self.values_list)

    def insertchunk_iterator(self, iterparse_line_iter, chunk_size, table):
        # flag = False
        self.num = 0
        result = None
        self.target_list = []
        self.values_list = []
        for row in iterparse_line_iter:
            # my_logger.info('%s\t%s\t%s' % (row, "insertchunk_iterator", datetime.datetime.now()))
            if row is None:
                try:
                    if len(self.target_list) > 0:
                        sql = self.db_import(table)
                        self.num = 0
                        self.target_list = []
                        self.values_list = []
                        result = yield sql
                    else:
                        result = yield None
                except Exception as e:
                    result = iterparse_line_iter.throw(e)
                    yield
                else:
                    if result is not None:
                        result = iterparse_line_iter.send(result)
                        yield
            else:
                try:
                    # flag = False
                    self.num += 1
                    self.target_list.append("( %s, %s, %s )")
                    self.values_list.extend(row)
                    if self.num >= chunk_size:
                        sql = self.db_import(table)
                        self.num = 0
                        self.target_list = []
                        self.values_list = []
                        result = yield sql
                except Exception as e:
                    result = iterparse_line_iter.throw(e)
                    yield
                else:
                    if result is not None:
                        result = iterparse_line_iter.send(result)
                        # my_logger.info('%s\t%s\t%s' % (result, "insertchunksuccess", datetime.datetime.now()))
                        yield

    def chunkinsert_mysql(self, insertchunk_iter):
        for sql in insertchunk_iter:
            # my_logger.info('%s\t%s\t%s' % (sql, "chunkinsert_mysql", datetime.datetime.now()))
            try:
                if sql is not None:
                    if not self.commit_count:
                        print("SQL connect")
                    print(sql)
                    self.commit_count += 1
            except Exception as e:
                result = insertchunk_iter.throw(e)
            else:
                result = insertchunk_iter.send("success")
                # my_logger.info('%s\t%s\t%s' % (result, "success", datetime.datetime.now()))


def cmd():
    s = datetime.datetime.now()
    print(s + datetime.timedelta(0, 0, 0, 0, 0, 9))
    # シグナル
    signal.signal(signal.SIGINT, do_exit)
    signal.signal(signal.SIGHUP, do_exit)
    signal.signal(signal.SIGTERM, do_exit)

    try:
        update_dir = base_dir / UPLOAD_DIR
        filename_glob_pattern = "*.log"
        importer = AccesslogDBimport()
        upload_file_iter = importer.ino(update_dir, filename_glob_pattern)
        path_iter = importer.work_file_iter(upload_file_iter)
        line_iter = importer.iterget_line(path_iter)
        pased_item_iter = importer.iterparse_line(line_iter)
        insertchunk_iter = importer.insertchunk_iterator(pased_item_iter, chunk_size=13, table="t_access")
        importer.chunkinsert_mysql(insertchunk_iter)

    except SignalException as e1:
        my_logger.warning('%s: %s' % (e1, datetime.datetime.now()))
        logfiles = glob.glob('%s*' % LOG)
        print(logfiles)
        sys.exit(1)

    finally:
        e = datetime.datetime.now()
        print(str(e - s))


def main():
    cmd()


if __name__ == '__main__':
    main()

t_accessの元データファイルをランダムに作成

*webaccess_logmaker.py

# -*- coding: utf-8 -*-
# vim:tabstop=4:shiftwidth=4:expandtab

import datetime
import random
import signal
import sys
import os
import glob
import logging
import logging.handlers
import click
import tempfile
import shutil
import time

TOP_PAD = 1 << (59 + 4)
TS_LOW_MASK = (1 << 12) - 1
TS_HIGH_MASK = (1 << 59) - 1 - TS_LOW_MASK
VERSION = 4 << 12
LOCAL_EPOCH_TS = datetime.datetime(2017, 1, 1).timestamp()


class SignalException(Exception):
    def __init__(self, message):
        super(SignalException, self).__init__(message)


def do_exit(sig, stack):
    raise SignalException("Exiting")


class WebAccessDataGenerator(object):

    def mmid_maker(self):
        # mmidを作成する関数
        ts = int((datetime.datetime.now().timestamp() - LOCAL_EPOCH_TS) * 1e7)
        ts_low = ts & TS_LOW_MASK
        ts_high = ts & TS_HIGH_MASK
        # バージョンはuuid4と同じ表示にしてる
        ts_part = (TOP_PAD | (ts_high << 4) | VERSION | ts_low) << 64
        # バリアント10
        random_part = random.randrange((1 << 62) - 1) | (1 << (64 - 1))
        mmid_hex = str(hex(ts_part | random_part))
        return "-".join([
            mmid_hex[2:10],
            mmid_hex[10:14],
            mmid_hex[14:18],
            mmid_hex[18:22],
            mmid_hex[22:34],
        ])
        # return (ts_part | random_part).to_bytes(16, byteorder='big')

    def datamaker(self, num):
        session_id = self.mmid_maker()
        # import pdb; pdb.set_trace()
        url = "http://localhost/%d" % (random.randrange(1 << 10), )
        created_dt = (
            datetime.datetime.now() + datetime.timedelta(seconds=num)
        ).strftime('%Y-%m-%d %H:%M:%S')
        return "\t".join([session_id, url, created_dt]) + "\n"

    def dateiterator(self):
        for i in range(40):
            yield self.datamaker(i)
            # time.sleep(1)


class IterToFile(object):

    def __init__(self, iterable, output_filename):
        self.iterable = iterable
        self.output_filename = output_filename

    def write_file(self):
        with tempfile.TemporaryDirectory(
                suffix='_tsv', prefix='tmp_', dir='/var/tmp') as temp_dir:
            fname = self.write_str_into_file(temp_dir)
            shutil.move(fname, self.output_filename)

    def write_str_into_file(self, dir_name):
        with tempfile.NamedTemporaryFile(delete=False, dir=dir_name,) as f:
            for row in self.iterable:
                f.write(row.encode('utf-8'))
            return f.name


class WebAccessTsv(object):

    def __init__(self, outputf):
        self.outputf = os.path.abspath(os.path.expanduser(outputf))

    def write_into_file(self):
        _data = WebAccessDataGenerator()
        data_iter = []
        for term in _data.dateiterator():
            data_iter.append(term)
            if len(data_iter) > 1000:
                IterToFile(data_iter, self.outputf).write_file()
        else:
            IterToFile(data_iter, self.outputf).write_file()


@click.command()
@click.option('-o', '--outputf', default='/var/data/mag/upload/t_access.log')
def cmd(outputf):
    s = datetime.datetime.now()
    print(s + datetime.timedelta(0, 0, 0, 0, 0, 9))
    # シグナル
    signal.signal(signal.SIGINT, do_exit)
    signal.signal(signal.SIGHUP, do_exit)
    signal.signal(signal.SIGTERM, do_exit)
    # ログハンドラーを設定する
    LOG = 'logging.out'
    my_logger = logging.getLogger('MyLogger')
    my_logger.setLevel(logging.WARNING)
    handler = logging.handlers.RotatingFileHandler(
        LOG, maxBytes=2000, backupCount=5,)
    my_logger.addHandler(handler)

    try:
        worker = WebAccessTsv(outputf)
        worker.write_into_file()

    except SignalException as e1:
        my_logger.warning('%s: %s' % (e1, datetime.datetime.now()))
        logfiles = glob.glob('%s*' % LOG)
        print(logfiles)
        sys.exit(1)
    finally:
        e = datetime.datetime.now()
        print(str(e - s))


def main():
    cmd()


if __name__ == '__main__':
    main()