1億行のファイルを読み込む方法

ファイルを書き出す

  • manyrowstsv.py
#!/usr/bin/env python
# coding:utf-8

import signal
import sys
import os
import glob
import logging
import logging.handlers
import shutil
import tempfile
import random
import datetime
import string
import click
import itertools


def write_str_into_file(
    iterable,
    output_filename,
):
    """Write each item of *iterable* to a temp file in /var/tmp, then move
    the finished file to *output_filename*.

    The move is performed only after the ``with`` block has closed (and
    flushed) the temp file.  The original moved the file while it was
    still open, which could lose buffered data when /var/tmp and the
    destination are on different filesystems (shutil.move then copies).
    The ``finally`` also cleans up the temp file when the iterable raises
    mid-write, which previously leaked it.
    """
    with tempfile.NamedTemporaryFile(delete=False, dir='/var/tmp',) as f:
        try:
            for row in iterable:
                f.write(row)
        except BaseException:
            # Producer failed: drop the partial temp file, then re-raise.
            os.remove(f.name)
            raise
    try:
        # Temp file is closed and fully flushed here; safe to move.
        shutil.move(f.name, output_filename)
    finally:
        # If the move failed (or copied across filesystems and died),
        # remove the leftover temp file.
        if os.path.exists(f.name):
            os.remove(f.name)


class SignalException(Exception):
    """Raised by the installed signal handlers to abort row generation."""

    def __init__(self, message):
        # Delegate to Exception so args/str() behave normally.
        super(SignalException, self).__init__(message)


def do_exit(sig, stack):
    """Signal handler: convert the received signal into SignalException.

    Raising here unwinds into cmd()'s try/except so the interruption is
    logged before the process exits.
    """
    raise SignalException("Exiting")


class TsvRowGenerator(object):
    """Generate pseudo-random TSV rows as newline-terminated strings.

    Python 2 code: rows are byte strings; the utf8 column is a UTF-8
    encoded str that "\t".join mixes in directly.
    """

    def __init__(
        self, dt_iso_max, dt_iso_min, date_iso_max, date_iso_min,
            ):
        # Parse the inclusive '%Y/%m/%d[ %H:%M:%S]' bounds into datetimes.
        self.dt_iso_max = datetime.datetime.strptime(
            dt_iso_max, '%Y/%m/%d %H:%M:%S')
        self.dt_iso_min = datetime.datetime.strptime(
            dt_iso_min, '%Y/%m/%d %H:%M:%S')
        self.date_iso_max = datetime.datetime.strptime(
            date_iso_max, '%Y/%m/%d')
        self.date_iso_min = datetime.datetime.strptime(
            date_iso_min, '%Y/%m/%d')
        delta = self.dt_iso_max - self.dt_iso_min
        date_delta = self.date_iso_max - self.date_iso_min
        # Span of each range in whole seconds (sub-day part via .seconds).
        self.int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
        self.int_date_delta = (date_delta.days * 24 * 60 * 60) + \
            date_delta.seconds

    def iterows(self):
        """Yield the header line, then an endless stream of random rows.

        The stream never terminates on its own; the caller bounds it
        (e.g. with itertools.islice as cmd() does).
        """
        yield (
            "\t".join(
                ["int", "short", "long", "double", "bool",
                    "char", "utf8", "dt_iso8601", "date_iso8601"]
                ) + "\n")
        while 1:
            # One uniform 32-bit draw drives most of the columns below.
            rdp = random.randint(0, (1 << 32) - 1)
            random_second = rdp % self.int_delta
            randomtime = self.dt_iso_min + datetime.timedelta(
                seconds=random_second)
            random_date_second = rdp % self.int_date_delta
            randomdatetime = self.date_iso_min + datetime.timedelta(
                seconds=random_date_second)
            yield ("\t".join(
                [
                    str(rdp - (1 << 31)),  # int: shifted to signed 32-bit
                    str((rdp >> 16) - (1 << 15)),  # short: top 16 bits, signed
                    str(rdp - (1 << 31)),  # long: same value as the int column
                    str(random.uniform(0.1, 2.7)),  # double
                    str(rdp % 2),  # bool emitted as 0/1
                    random.choice(
                        string.ascii_letters) + random.choice(
                        string.ascii_letters) + random.choice(
                        string.ascii_letters) + random.choice(
                        string.ascii_letters),  # char: 4 random ASCII letters
                    u"ごんた".encode('utf-8'),  # utf8: fixed UTF-8 byte string
                    randomtime.strftime("%Y-%m-%d %H:%M:%S"),
                    randomdatetime.strftime("%Y-%m-%d"),
                ]) + "\n")


@click.command()
@click.argument('rows', type=int, default=1000000)
@click.option(
    '-f', '--filename',
    default="~/kadai_1.tsv",
    )
@click.option('-D', '--dt-iso-max', default="2016/12/31 00:00:00")
@click.option('-d', '--dt-iso-min', default="2016/12/1 00:00:00")
@click.option('-T', '--date-iso-max', default="2016/12/31")
@click.option('-t', '--date-iso-min', default="2016/12/1")
def cmd(rows, filename, dt_iso_max, dt_iso_min,
        date_iso_max, date_iso_min):
    """Generate ROWS random TSV rows and write them to --filename.

    Python 2 script (uses print statements).
    """
    # Rotating warning log: 2000 bytes per file, 5 backups.
    LOG_MANYROWSTSV = 'logging_warning.out'
    my_logger = logging.getLogger('MyLogger')
    my_logger.setLevel(logging.WARNING)
    handler = logging.handlers.RotatingFileHandler(
        LOG_MANYROWSTSV, maxBytes=2000, backupCount=5,)
    my_logger.addHandler(handler)
    s = datetime.datetime.now()
    # Print the start time shifted by +9 hours (presumably JST -- confirm).
    print s + datetime.timedelta(0, 0, 0, 0, 0, 9)
    # Convert termination signals into SignalException so the except
    # branch below can log the interruption before exiting.
    signal.signal(signal.SIGINT, do_exit)
    signal.signal(signal.SIGHUP, do_exit)
    signal.signal(signal.SIGTERM, do_exit)
    try:
        # rows + 1 because the first line iterows() yields is the header.
        write_str_into_file(
            iterable=itertools.islice(
                TsvRowGenerator(
                    dt_iso_max, dt_iso_min, date_iso_max, date_iso_min,
                ).iterows(), rows + 1),
            output_filename=os.path.abspath(os.path.expanduser(filename)),)
        print os.path.abspath(os.path.expanduser(filename))
    except SignalException as e1:
        my_logger.warning('%s: %s' % (e1, datetime.datetime.now()))
        logfiles = glob.glob('%s*' % LOG_MANYROWSTSV)
        print logfiles
        sys.exit(1)
    finally:
        # Always report the elapsed wall-clock time.
        e = datetime.datetime.now()
        print str(e-s)


def main():
    """Script entry point: dispatch to the click command."""
    cmd()


if __name__ == '__main__':
    main()
  • parselargetsv.py
#!/usr/bin/env python
# coding:utf-8

import signal
import sys
import os
import glob
import logging
import logging.handlers
import csv
import datetime
import click
import pickle
import struct
from manyrowstsv import write_str_into_file


class ParseRowsTsv(object):
    """Read a TSV file and serialize every row to pickle or struct format.

    Python 2 code (uses ``reader.next()`` / ``lines.next()``).
    """

    def __init__(
        self, file, inputf, outputf
            ):
        # Expand '~' and normalize both paths up front.
        self.inputf = os.path.abspath(os.path.expanduser(inputf))
        self.outputf = os.path.abspath(os.path.expanduser(outputf))
        self.file = file  # output format: 'pickle' or 'struct'

    def write_into_file(self):
        """Serialize self.inputf and write the result to self.outputf."""
        if self.file == 'pickle':
            write_str_into_file(self.pickle_tsv(), self.outputf)
        elif self.file == 'struct':
            write_str_into_file(self.struct_tsv(), self.outputf)

    def read_tsv(self):
        """Yield the raw header row (a list of strings), then one typed
        tuple per data row: (int, int, int, float, int, str x4)."""
        with open(self.inputf, "rb") as f:
            reader = csv.reader(f, delimiter="\t", lineterminator='\n')
            # First yield is the untyped header row.
            yield reader.next()
            for row in reader:
                row = (
                    int(row[0]),
                    int(row[1]),
                    int(row[2]),
                    float(row[3]),
                    int(row[4]),
                    row[5],
                    row[6],
                    row[7],
                    row[8],
                )
                yield row

    def pickle_tsv(self):
        """Yield one pickle per row (the header row is pickled too)."""
        for record in self.read_tsv():
            yield pickle.dumps(record)

    def struct_tsv(self):
        """Yield one packed struct per row.

        The header row is packed as nine variable-length strings.  Data
        rows use format 'i h l d ?' (native byte order and alignment)
        plus four strings sized per record.
        """
        lines = self.read_tsv()
        line = lines.next()
        # Build e.g. '3s 5s ... 12s' from the header field lengths.
        inits = struct.Struct(
            's '.join(
                [str(len(line[i])) for i in range(9)]) + 's')
        yield inits.pack(*line)
        for record in lines:
            s = struct.Struct(
                'i h l d ? %ds %ds %ds %ds' % (
                    len(record[5]), len(record[6]),
                    len(record[7]), len(record[8]),
                    )
                )
            yield s.pack(*record)


class SignalException(Exception):
    """Raised by the installed signal handlers to abort TSV parsing."""

    def __init__(self, message):
        # Delegate to Exception so args/str() behave normally.
        super(SignalException, self).__init__(message)


def do_exit(sig, stack):
    """Signal handler: convert the received signal into SignalException.

    Raising here unwinds into cmd()'s try/except so the interruption is
    logged before the process exits.
    """
    raise SignalException("Exiting")


@click.command()
@click.option(
    '--file', type=click.Choice(['pickle', 'struct']),
    default='pickle')
@click.option('-i', '--inputf', default='~/kadai_1.tsv')
@click.option('-o', '--outputf', default='~/kadai_2.p')
def cmd(file, inputf, outputf):
    """Serialize the TSV at --inputf to --outputf in the chosen format.

    Python 2 script (uses print statements).
    """
    s = datetime.datetime.now()
    # Print the start time shifted by +9 hours (presumably JST -- confirm).
    print s + datetime.timedelta(0, 0, 0, 0, 0, 9)
    # Signal handlers: convert termination signals into SignalException.
    signal.signal(signal.SIGINT, do_exit)
    signal.signal(signal.SIGHUP, do_exit)
    signal.signal(signal.SIGTERM, do_exit)
    # Configure a rotating warning log (2000 bytes per file, 5 backups).
    LOG_MANYROWSTSV = 'logging_warning.out'
    my_logger = logging.getLogger('MyLogger')
    my_logger.setLevel(logging.WARNING)
    handler = logging.handlers.RotatingFileHandler(
        LOG_MANYROWSTSV, maxBytes=2000, backupCount=5,)
    my_logger.addHandler(handler)

    parser = ParseRowsTsv(file, inputf, outputf)

    try:
        parser.write_into_file()

    except SignalException as e1:
        my_logger.warning('%s: %s' % (e1, datetime.datetime.now()))
        logfiles = glob.glob('%s*' % LOG_MANYROWSTSV)
        print logfiles
        sys.exit(1)
    finally:
        # Always report the elapsed wall-clock time.
        e = datetime.datetime.now()
        print str(e-s)


def main():
    """Script entry point: dispatch to the click command."""
    cmd()


if __name__ == '__main__':
    main()

ファイルを読み込む

  • parsetsv_multitask_p3.py
#!/usr/bin/env python3
# coding:utf-8

import signal
import sys
import os
import glob
import logging
import logging.handlers
import datetime
import click
import pickle
import struct
import tempfile
import shutil
import math
import concurrent.futures
import errno


class SignalException(Exception):
    """Raised by the installed signal handlers to abort processing."""

    def __init__(self, message):
        # Python 3 file: zero-argument super() is equivalent here.
        super().__init__(message)


def do_exit(sig, stack):
    """Signal handler: convert the received signal into SignalException.

    Raising here unwinds into cmd()'s try/except so the interruption is
    logged before the process exits.
    """
    raise SignalException("Exiting")


# Split the file into ~100 MB chunks aligned on line boundaries and
# yield (index, start offset, end offset) for each chunk.
def tsv_separate_generator(inputf):
    """Yield (chunk_index, start_offset, end_offset) tuples covering the
    data lines of *inputf*; the header line is excluded from chunk 0."""
    CHUNK_SIZE = 1024 * 1024 * 100
    with open(inputf, 'rb') as f:
        total = os.stat(f.fileno()).st_size
        # Number of fixed-size windows needed to cover the whole file.
        n_chunks = math.ceil(total / CHUNK_SIZE)
        start = len(f.readline())  # skip past the header row
        for idx in range(n_chunks):
            boundary = CHUNK_SIZE * (idx + 1) - 1
            f.seek(boundary)
            # Extend the chunk to the end of the line the boundary cuts.
            tail_len = len(f.readline())
            if boundary < total:
                end = boundary + tail_len
            else:
                end = total
            yield (
                idx,
                start,
                end,
            )
            if end >= total or tail_len == 0:
                break
            start = end


def sum_file(self, files):
    """Concatenate *files* (in order) into a new temp file in /var/tmp
    and return the temp file's path.

    ``self`` is unused -- this looks like a method detached from its
    class; the legacy signature is kept for compatibility.

    Fixes vs. the original: it iterated ``self.files`` (AttributeError
    for any plain call), and called ``os.sendfile`` without the required
    ``count`` argument while passing a cumulative *output* size as the
    offset into the *input* file.  shutil.copyfileobj does the copy
    correctly and portably.
    """
    with tempfile.NamedTemporaryFile(delete=False, dir='/var/tmp/',) as f:
        for file in files:
            with open(file, 'rb') as f1:
                shutil.copyfileobj(f1, f)
        return f.name


class ReadTsvGenerator(object):
    """Read one byte-range chunk of a TSV file and yield typed rows."""

    def __init__(self, inputf, iterable):
        self.inputf = inputf
        # iterable is a (chunk_index, start_offset, end_offset) tuple.
        self.iterable = iterable

    def read_tsv(self):
        """Yield one (int, int, int, float, int, str x4) tuple per line
        in the half-open byte range [start_offset, end_offset)."""
        start = self.iterable[1]
        end = self.iterable[2]
        with open(self.inputf, "rb") as f:
            f.seek(start)
            pos = start
            while pos < end:
                line = f.readline()
                pos += len(line)
                fields = [
                    part.decode('utf-8')
                    for part in line.strip(b'\n').split(b'\t')
                ]
                yield (
                    int(fields[0]),
                    int(fields[1]),
                    int(fields[2]),
                    float(fields[3]),
                    int(fields[4]),
                    fields[5],
                    fields[6],
                    fields[7],
                    fields[8],
                )


class ParseTsvGenerator(object):
    """Serialize parsed TSV records to pickle or struct byte strings.

    Fixes vs. the original:

    * It called ``next(lines)`` before serializing, discarding the first
      *data* record of every chunk: the header line is already excluded
      upstream (tsv_separate_generator starts chunk 0 after the header,
      and ReadTsvGenerator never yields it).
    * ``struct_tsv`` packed ``str`` fields with '%ds' formats, which
      raises TypeError on Python 3 and would mis-size multi-byte text;
      string fields are now UTF-8 encoded before packing, matching the
      byte output of the Python 2 single-process script.
    """

    def __init__(self, iterable):
        # iterable yields (int, int, int, float, int, str x4) records.
        self.iterable = iterable

    def pickle_tsv(self):
        """Yield one pickle per record."""
        for record in self.iterable:
            yield pickle.dumps(record)

    def struct_tsv(self):
        """Yield one packed struct per record (native alignment,
        'i h l d ?' plus four variable-length byte strings)."""
        for record in self.iterable:
            # Encode text fields so '%ds' sizes reflect byte lengths.
            encoded = [
                field.encode('utf-8') if isinstance(field, str) else field
                for field in record[5:9]
            ]
            packer = struct.Struct(
                'i h l d ? %ds %ds %ds %ds' % (
                    len(encoded[0]), len(encoded[1]),
                    len(encoded[2]), len(encoded[3]),
                    )
                )
            yield packer.pack(*(list(record[:5]) + encoded))


class ParseRowsTsv(object):
    """Parse a large TSV in parallel chunks and serialize it to outputf.

    ``file`` is the output format ('pickle' or 'struct'); inputf/outputf
    are user paths, expanded and normalized on construction.
    """

    def __init__(self, file, inputf, outputf):
        self.file = file
        self.inputf = os.path.abspath(os.path.expanduser(inputf))
        self.outputf = os.path.abspath(os.path.expanduser(outputf))

    # Single task: serialize one chunk into its own temp file.
    def dotask(self, rule):
        """Parse the byte range described by *rule* (index, start, end),
        serialize it, and return the path of the temp file holding the
        result."""
        parsetsv = ParseTsvGenerator(
            ReadTsvGenerator(self.inputf, rule).read_tsv())
        if self.file == 'pickle':
            w = parsetsv.pickle_tsv()
        elif self.file == 'struct':
            w = parsetsv.struct_tsv()
        else:
            # The original fell through to an UnboundLocalError here;
            # fail with a clear message instead.
            raise ValueError('unknown output format: %r' % (self.file,))
        with tempfile.NamedTemporaryFile(
            delete=False, dir='/var/tmp', suffix='_dotask', prefix='tmp_',
                ) as f:
            for row in w:
                f.write(row)
            return f.name

    # Multi-process: fan chunks out to worker processes, then merge.
    def multi_do_task(self):
        """Serialize every chunk in a process pool and concatenate the
        per-chunk result files into outputf.

        Fixes vs. the original: parts are merged in chunk-index order
        (as_completed yields in *completion* order, which scrambled the
        output), and each part file is copied whole -- the original gave
        os.sendfile the input-TSV byte span as the count and a cumulative
        output size as the offset into the part file, corrupting the
        merged file.
        """
        with concurrent.futures.ProcessPoolExecutor() as executor:
            future_to_rule = {
                executor.submit(
                    self.dotask, rule
                ): rule for rule in tsv_separate_generator(self.inputf)}
            # Collect finished part files keyed by their chunk index so
            # the merge below preserves the input row order.
            parts = {}
            for future in concurrent.futures.as_completed(future_to_rule):
                parts[future_to_rule[future][0]] = future.result()
            with tempfile.TemporaryDirectory(
                    suffix='_tsv', prefix='tmp_', dir='/var/tmp') as temp_dir:
                with tempfile.NamedTemporaryFile(
                        suffix='_tsv', prefix='tmp_',
                        delete=False, dir=temp_dir,) as f:
                    for idx in sorted(parts):
                        with open(parts[idx], 'rb') as partfile:
                            shutil.copyfileobj(partfile, f)
                        try:
                            os.remove(parts[idx])
                        except OSError as exc:
                            # Already gone is fine; anything else is not.
                            if exc.errno != errno.ENOENT:
                                raise
                    f.flush()
                    shutil.move(f.name, self.outputf)


@click.command()
@click.option(
    '--file', type=click.Choice(['pickle', 'struct']),
    default='pickle')
@click.option('-i', '--inputf', default='~/kadai_1.tsv')
@click.option('-o', '--outputf', default='~/zone/kadai_2v3.p')
def cmd(file, inputf, outputf):
    """Parse a huge TSV in parallel and serialize it to --outputf."""
    started = datetime.datetime.now()
    # Print the start time shifted +9 hours (presumably JST -- confirm).
    print(started + datetime.timedelta(hours=9))
    # Signal handlers: convert termination signals into SignalException.
    signal.signal(signal.SIGINT, do_exit)
    signal.signal(signal.SIGHUP, do_exit)
    signal.signal(signal.SIGTERM, do_exit)
    # Configure a rotating warning log (2000 bytes per file, 5 backups).
    LOG_MANYROWSTSV = 'logging_warning.out'
    my_logger = logging.getLogger('MyLogger')
    my_logger.setLevel(logging.WARNING)
    my_logger.addHandler(
        logging.handlers.RotatingFileHandler(
            LOG_MANYROWSTSV, maxBytes=2000, backupCount=5))

    try:
        ParseRowsTsv(file, inputf, outputf).multi_do_task()

    except SignalException as err:
        my_logger.warning('%s: %s' % (err, datetime.datetime.now()))
        print(glob.glob('%s*' % LOG_MANYROWSTSV))
        sys.exit(1)
    finally:
        # Always report the elapsed wall-clock time.
        print(str(datetime.datetime.now() - started))


def main():
    """Script entry point: dispatch to the click command."""
    cmd()


if __name__ == '__main__':
    main()