SFTPサーバに接続
sftpで実行するためには以下のようにする。
$ sftp -P port user@domain ## historyの時間確認 $ export HISTTIMEFORMAT='%F %T ' $ history | less ## lsコマンドで時間確認 $ ls -alt
git flowでdevelopが更新されていた場合の処理
一旦featureをpushしてdevelopにマージさせる。
$ git checkout feature/[name] $ git push $ git checkout develop $ git diff $ git diff origin/develop $ git pull $ git merge origin/feature/[name] $ git checkout feature/[name] ## git pullする前に処理状況を調べる $ git status $ git diff $ git pull
踏み台サーバなどを用いて、多段階接続をするとき(ssh -W)
Proxycommandをssh configに登録して、 ssh -W
から対応することができる。
Python3とsystemd-tmpfiles.dの便利なサイト
Python3ではsendとthrowについてわからなかったので参考にしたサイト。 iteratorについてはこんな使い方があるのかと感動した。
あと、systemd-tmpfiles.dにて /tmp/
or /var/
の定期的削除に関して、
systemd-tmpfiles-clean.serviceで削除をしている。そのためのconfは
/usr/lib/tmpfiles.d/*.conf, /run/tmpfiles.d/*.conf, and /etc/tmpfiles.d/*.conf.
に記載できる。
$ systemctl status systemd-tmpfiles-setup.service systemd-tmpfiles-clean.timer $ less /usr/lib/systemd/system/systemd-tmpfiles-clean.service
ファイルが置かれた瞬間にimport開始する
課題
(1) ファイルが置かれた瞬間にimport開始する(今回は空処理)仕組みの作成
- /var/data/xxx.logみたいな場所に元ファイルが複数存在する。
- db_importerバッチが、複数同時動作しても大丈夫なように(同じ元ファイルを重複してimportしないように)する。
- PosixPath()を返すイテレータ。1ファイルでも、globでも、inotifyでも、同じインターフェイスで使う方は気にしない。差し替えるだけ。
- pathを返すイテレータは、2種類。コマンドで、ファイルパスのglobを受け取って、それに合致するファイルパスを返すものと、コマンドで、監視ディレクトリと、ファイル名部分のglobを受け取って、それに合致するファイルが監視ディレクトリにmvされたとき、またはclose_on_writeされたときに、ファイルパスを返すもの。
- マルチプロセスの場合には、同じファイルを同時に読むための、ファイルパスとオフセット範囲のイテレータ。次に、ファイルパス、もしくはファイルパスとオフセットのイテレータを受け取って、lineを返すイテレータ。必要に応じて、チャンクのitertools.isliceとか。
(2) import処理に関しての条件
- 同時に同じファイルを処理しないようにするのは、ファイルロックがまず思いつくけど、ロック無しで。(シンボリックリンクでなんとかする)
- 多数のファイルが一気に置かれた場合も、指定したプロセス数以下でマイペース処理すること
- dbにロードしていく。エラーが起こったら、エラー用のフォルダに移動。成功したら、成功用のフォルダに移動。
- 成功用フォルダは、8日後に自動削除されるような、systemd-tmpfiles.dの設定ファイルも書くこと。
- コマンドオプションで、対象のファイルパス(globで指定できるように。引数で貰った文字列から、pathlibのglob使えばいいと思う)、成功用フォルダ、失敗用フォルダを指定できるように。
- 上の方でエラーが起こったときに、一番下にエラーを伝えて、mvとかの処理させて、次のファイルに進むこと。
(3) ヒント
- 対象ファイルパスは、globで指定できるように。引数で貰った文字列から、pathlibのglob使えばいいと思う
- ファイルロックをせずに、シンボリックリンクでなんとかなりそうな気がするから考えて。force無しで、シンボリックリンク作ろうとして失敗したら、他のプロセスが処理してんじゃねーか、と。mtimeも見たほうがいいかもしれないけど。どっかにsymlink作って、それをmvするのが確実っぽいな。symlink作成時もしくはmv時に失敗したら、誰か処理中ってことで
- inotifyの場合、1プロセスで監視と処理をやると、詰まるので、多数のファイルが置かれた場合に詰まるというか、「なんか起こったけど覚えきれませんでしたー」みたいなフラグが来るので、そのフラグ処理をいれる(結局、norifyで教えてもらうファイル名を処理するのではなく、シグナルととらえてglobで走査する)のと、なるべくつまらないように、別スレッドで監視することが望ましい。
8日後に消えるsystemd-tmpfiles.dの設定
*/usr/lib/tmpfiles.d/var.conf
# This file is part of systemd. # # systemd is free software; you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or # (at your option) any later version. # See tmpfiles.d(5) for details d /var 0755 - - - L /var/run - - - - ../run d /var/log 0755 - - - f /var/log/wtmp 0664 root utmp - f /var/log/btmp 0600 root utmp - d /var/cache 0755 - - - d /var/lib 0755 - - - v /var/lib/machines 0700 - - - d /var/spool 0755 - - - # 8日後に自動削除 d /var/data/mag/complete/ 0755 - - 7d
pythonでinotifyのloopを回す。
*accesslog_dbimport.py
# -*- coding: utf-8 -*- # vim:tabstop=4:shiftwidth=4:expandtab import signal import sys import glob from pathlib import Path import logging import logging.handlers import datetime import inotify.adapters from inotify.constants import IN_CLOSE_WRITE, IN_MOVED_TO, IN_Q_OVERFLOW # ログハンドラーを設定する LOG = 'logging.out' my_logger = logging.getLogger('MyLogger') my_logger.setLevel(logging.DEBUG) handler = logging.handlers.RotatingFileHandler( LOG, maxBytes=10000, backupCount=5,) my_logger.addHandler(handler) # 作業ディレクトリ、成功or失敗ディレクトリ BASE_DIR = "/var/data/mag/" UPLOAD_DIR = "upload" WORK_DIR = "work" COMPLETE_DIR = "complete" FAIL_DIR = "fail" base_dir = Path(BASE_DIR) complete_files = [] class SignalException(Exception): def __init__(self, message): super(SignalException, self).__init__(message) def do_exit(sig, stack): raise SignalException("Exiting") class AccesslogDBimport(object): def __init__(self): self.error_flag = False self.num = 0 self.target_list = [] self.values_list = [] self.commit_count = 0 def normal(self, dir_path, filename_glob_pattern): for path in dir_path.glob(filename_glob_pattern): try: result = yield path except Exception as e: print(e) else: if result is not None: pass def ino(self, dir_path, filename_glob_pattern): i = inotify.adapters.Inotify() i.add_watch(bytes(dir_path), mask=IN_MOVED_TO | IN_Q_OVERFLOW | IN_CLOSE_WRITE) try: my_logger.info('Inotify start\t%s' % (datetime.datetime.now())) for event in i.event_gen(): if event is None: continue (header, type_names, dirname, filename) = event my_logger.info( "INFO: WD=(%d) MASK=(%d) COOKIE=(%d) LEN=(%d) MASK->NAMES=%s FILENAME=[%s]" % (header.wd, header.mask, header.cookie, header.len, type_names, str(dirname + b"/" + filename)) ) for path in dir_path.glob(filename_glob_pattern): yield path else: yield None except Exception as e: print(e) my_logger.warning('%s\t%s' % (e, datetime.datetime.now())) finally: i.remove_watch(bytes(dir_path)) my_logger.info('Inotify stop\t%s' % (datetime.datetime.now())) def success_handler(self, work_file): # work_fileは絶対パス base_dir = Path(BASE_DIR) path = base_dir / COMPLETE_DIR / '{0}_complete_{1}.log'.format( work_file.stem, datetime.datetime.now().strftime('%Y%m%d%H%M%S') ) if work_file.is_symlink() and work_file is not None: work_file.resolve().rename(path) work_file.unlink() my_logger.info('%s\t%s\t%s' % (str(path), "success_handler", datetime.datetime.now())) if self.commit_count: print("Commit!") self.commit_count = 0 def error_handler(self, work_file): # work_fileは絶対パス base_dir = Path(BASE_DIR) path = base_dir / FAIL_DIR / '{0}_fail_{1}.log'.format( work_file.stem, datetime.datetime.now().strftime('%Y%m%d%H%M%S') ) if work_file.is_symlink() and work_file is not None: work_file.resolve().rename(path) work_file.unlink() my_logger.info('%s\t%s\t%s' % (str(path), "error_handler", datetime.datetime.now())) if self.commit_count: print("Not Commit!") self.num = 0 self.target_list = [] self.values_list = [] self.commit_count = 0 def work_file_iter(self, upload_file_iter): # upload_fileは絶対パス for upload_file in upload_file_iter: my_logger.info('%s\t%s\t%s' % (str(upload_file), "work_file_iter", datetime.datetime.now())) try: if upload_file is None: result = yield None else: work_path = base_dir / WORK_DIR / '{}.lock'.format(upload_file.stem) work_path.symlink_to(upload_file) result = yield work_path except Exception as e: my_logger.warning('%s\t%s\t%s' % (e, datetime.datetime.now(), str(upload_file))) def iterget_line(self, file_iter): file_list = [] for path in file_iter: # my_logger.info('%s\t%s\t%s' % (str(path), "iterget_line", datetime.datetime.now())) if path is None: try: result = yield None except Exception as e: my_logger.warning('%s\t%s\t%s' % (e, datetime.datetime.now(), str(path))) for filename in file_list: self.error_handler(filename) file_list = [] yield else: if result is not None: for filename in file_list: self.success_handler(filename) file_list = [] yield else: with path.open(mode='r',) as f: for line in f: try: result = yield line except Exception as e: my_logger.warning('%s\t%s\t%s' % (e, datetime.datetime.now(), str(path))) file_list.append(path) for filename in file_list: self.error_handler(filename) file_list = [] yield break else: if result is not None: for filename in file_list: self.success_handler(filename) file_list = [] yield else: file_list.append(path) def iterparse_line(self, line_iter): # line = next(line_iter) for line in line_iter: # my_logger.info('%s\t%s\t%s' % (line, "iterparse_line", datetime.datetime.now())) try: if line is None: result = yield None else: a, b, c = line.strip().split("\t") result = yield [a, b, c] except Exception as e: result = line_iter.throw(e) yield else: if result is not None: result = line_iter.send(result) # my_logger.info('%s\t%s\t%s' % (result, "iterparse_linesuccess", datetime.datetime.now())) yield def db_import(self, table): target = ",\n".join(self.target_list) sql = ''' INSERT INTO `{table}` (`session_id`, `url`, `created_dt`) VALUES {target} ON DUPLICATE KEY UPDATE `session_id` = VALUES(`session_id`), `url` = VALUES(`url`), `created_dt` = VALUES(`created_dt`) '''.format( target=target, table=table ) return sql % tuple(self.values_list) def insertchunk_iterator(self, iterparse_line_iter, chunk_size, table): # flag = False self.num = 0 result = None self.target_list = [] self.values_list = [] for row in iterparse_line_iter: # my_logger.info('%s\t%s\t%s' % (row, "insertchunk_iterator", datetime.datetime.now())) if row is None: try: if len(self.target_list) > 0: sql = self.db_import(table) self.num = 0 self.target_list = [] self.values_list = [] result = yield sql else: result = yield None except Exception as e: result = iterparse_line_iter.throw(e) yield else: if result is not None: result = iterparse_line_iter.send(result) yield else: try: # flag = False self.num += 1 self.target_list.append("( %s, %s, %s )") self.values_list.extend(row) if self.num >= chunk_size: sql = self.db_import(table) self.num = 0 self.target_list = [] self.values_list = [] result = yield sql except Exception as e: result = iterparse_line_iter.throw(e) yield else: if result is not None: result = iterparse_line_iter.send(result) # my_logger.info('%s\t%s\t%s' % (result, "insertchunksuccess", datetime.datetime.now())) yield def chunkinsert_mysql(self, insertchunk_iter): for sql in insertchunk_iter: # my_logger.info('%s\t%s\t%s' % (sql, "chunkinsert_mysql", datetime.datetime.now())) try: if sql is not None: if not self.commit_count: print("SQL connect") print(sql) self.commit_count += 1 except Exception as e: result = insertchunk_iter.throw(e) else: result = insertchunk_iter.send("success") # my_logger.info('%s\t%s\t%s' % (result, "success", datetime.datetime.now())) def cmd(): s = datetime.datetime.now() print(s + datetime.timedelta(0, 0, 0, 0, 0, 9)) # シグナル signal.signal(signal.SIGINT, do_exit) signal.signal(signal.SIGHUP, do_exit) signal.signal(signal.SIGTERM, do_exit) try: update_dir = base_dir / UPLOAD_DIR filename_glob_pattern = "*.log" importer = AccesslogDBimport() upload_file_iter = importer.ino(update_dir, filename_glob_pattern) path_iter = importer.work_file_iter(upload_file_iter) line_iter = importer.iterget_line(path_iter) pased_item_iter = importer.iterparse_line(line_iter) insertchunk_iter = importer.insertchunk_iterator(pased_item_iter, chunk_size=13, table="t_access") importer.chunkinsert_mysql(insertchunk_iter) except SignalException as e1: my_logger.warning('%s: %s' % (e1, datetime.datetime.now())) logfiles = glob.glob('%s*' % LOG) print(logfiles) sys.exit(1) finally: e = datetime.datetime.now() print(str(e - s)) def main(): cmd() if __name__ == '__main__': main()
t_accessの元データファイルをランダムに作成
*webaccess_logmaker.py
# -*- coding: utf-8 -*- # vim:tabstop=4:shiftwidth=4:expandtab import datetime import random import signal import sys import os import glob import logging import logging.handlers import click import tempfile import shutil import time TOP_PAD = 1 << (59 + 4) TS_LOW_MASK = (1 << 12) - 1 TS_HIGH_MASK = (1 << 59) - 1 - TS_LOW_MASK VERSION = 4 << 12 LOCAL_EPOCH_TS = datetime.datetime(2017, 1, 1).timestamp() class SignalException(Exception): def __init__(self, message): super(SignalException, self).__init__(message) def do_exit(sig, stack): raise SignalException("Exiting") class WebAccessDataGenerator(object): def mmid_maker(self): # mmidを作成する関数 ts = int((datetime.datetime.now().timestamp() - LOCAL_EPOCH_TS) * 1e7) ts_low = ts & TS_LOW_MASK ts_high = ts & TS_HIGH_MASK # バージョンはuuid4と同じ表示にしてる ts_part = (TOP_PAD | (ts_high << 4) | VERSION | ts_low) << 64 # バリアント10 random_part = random.randrange((1 << 62) - 1) | (1 << (64 - 1)) mmid_hex = str(hex(ts_part | random_part)) return "-".join([ mmid_hex[2:10], mmid_hex[10:14], mmid_hex[14:18], mmid_hex[18:22], mmid_hex[22:34], ]) # return (ts_part | random_part).to_bytes(16, byteorder='big') def datamaker(self, num): session_id = self.mmid_maker() # import pdb; pdb.set_trace() url = "http://localhost/%d" % (random.randrange(1 << 10), ) created_dt = ( datetime.datetime.now() + datetime.timedelta(seconds=num) ).strftime('%Y-%m-%d %H:%M:%S') return "\t".join([session_id, url, created_dt]) + "\n" def dateiterator(self): for i in range(40): yield self.datamaker(i) # time.sleep(1) class IterToFile(object): def __init__(self, iterable, output_filename): self.iterable = iterable self.output_filename = output_filename def write_file(self): with tempfile.TemporaryDirectory( suffix='_tsv', prefix='tmp_', dir='/var/tmp') as temp_dir: fname = self.write_str_into_file(temp_dir) shutil.move(fname, self.output_filename) def write_str_into_file(self, dir_name): with tempfile.NamedTemporaryFile(delete=False, dir=dir_name,) as f: for row in self.iterable: f.write(row.encode('utf-8')) return f.name class WebAccessTsv(object): def __init__(self, outputf): self.outputf = os.path.abspath(os.path.expanduser(outputf)) def write_into_file(self): _data = WebAccessDataGenerator() data_iter = [] for term in _data.dateiterator(): data_iter.append(term) if len(data_iter) > 1000: IterToFile(data_iter, self.outputf).write_file() else: IterToFile(data_iter, self.outputf).write_file() @click.command() @click.option('-o', '--outputf', default='/var/data/mag/upload/t_access.log') def cmd(outputf): s = datetime.datetime.now() print(s + datetime.timedelta(0, 0, 0, 0, 0, 9)) # シグナル signal.signal(signal.SIGINT, do_exit) signal.signal(signal.SIGHUP, do_exit) signal.signal(signal.SIGTERM, do_exit) # ログハンドラーを設定する LOG = 'logging.out' my_logger = logging.getLogger('MyLogger') my_logger.setLevel(logging.WARNING) handler = logging.handlers.RotatingFileHandler( LOG, maxBytes=2000, backupCount=5,) my_logger.addHandler(handler) try: worker = WebAccessTsv(outputf) worker.write_into_file() except SignalException as e1: my_logger.warning('%s: %s' % (e1, datetime.datetime.now())) logfiles = glob.glob('%s*' % LOG) print(logfiles) sys.exit(1) finally: e = datetime.datetime.now() print(str(e - s)) def main(): cmd() if __name__ == '__main__': main()
サブバージョンのgit clone的なやつ
以下でなんのブランチ舞があるか確認できる。大概trunk、branches、tagだけ。
$ svn ls svn+ssh://svnserve@[ドメイン名など]/ $ svn checkout svn+ssh://svnserve@[ドメイン名など]/branches/ .
以下で変更点が確認できる。
$ svn log [file名] $ svn diff -c [r抜いた番号] | less