ファイルを書き出す
import signal
import sys
import os
import glob
import logging
import logging.handlers
import shutil
import tempfile
import random
import datetime
import string
import click
import itertools
def write_str_into_file(
    iterable,
    output_filename,
    tmp_dir='/var/tmp',
):
    """Write every item of ``iterable`` to ``output_filename`` via a temp file.

    Items are written unchanged to a binary-mode temporary file created in
    ``tmp_dir``, which is then moved onto ``output_filename``; the target is
    therefore only replaced after every item has been written.  If iterating,
    writing or moving fails (for example because a signal handler raised an
    exception mid-write), the temporary file is deleted and the exception
    propagates unchanged.

    :param iterable: iterable of byte strings (``str`` on Python 2).
    :param output_filename: destination path.
    :param tmp_dir: directory for the temporary file (default ``/var/tmp``).
    """
    f = tempfile.NamedTemporaryFile(delete=False, dir=tmp_dir)
    try:
        with f:
            for row in iterable:
                f.write(row)
        shutil.move(f.name, output_filename)
    finally:
        # After a successful move the temp path is gone; on any failure this
        # removes the partial file, which delete=False would otherwise leak.
        if os.path.exists(f.name):
            os.remove(f.name)
class SignalException(Exception):
    """Raised by the ``do_exit`` signal handler to abort the running job.

    :param message: human-readable reason, stored as the only exception arg.
    """

    def __init__(self, message):
        Exception.__init__(self, message)
def do_exit(sig, stack):
    """Signal handler that converts the delivered signal into an exception.

    :param sig: signal number (ignored).
    :param stack: interrupted stack frame (ignored).
    :raises SignalException: always, with the message ``"Exiting"``.
    """
    error = SignalException("Exiting")
    raise error
class TsvRowGenerator(object):
    """Produce an endless stream of random TSV lines, preceded by a header.

    Each data line is derived from one random 32-bit integer ``rdp`` plus a
    few extra draws from :mod:`random`.  With ``min < max`` the timestamp
    column lies in ``[dt_iso_min, dt_iso_max)`` and the date column in
    ``[date_iso_min, date_iso_max)`` (upper bounds exclusive).  When a pair
    of bounds is equal, its lower bound is emitted on every row instead of
    failing with ZeroDivisionError.
    """

    _DT_FORMAT = '%Y/%m/%d %H:%M:%S'
    _DATE_FORMAT = '%Y/%m/%d'

    _HEADER = "\t".join(
        ["int", "short", "long", "double", "bool",
         "char", "utf8", "dt_iso8601", "date_iso8601"]
    ) + "\n"

    # Constant "utf8" column: UTF-8 bytes on Python 2 (where str is bytes and
    # the rest of the line is bytes too) and text on Python 3, so that
    # "\t".join never mixes bytes with str.
    _UTF8_SAMPLE = u"ごんた" if str is not bytes else u"ごんた".encode('utf-8')

    def __init__(
        self, dt_iso_max, dt_iso_min, date_iso_max, date_iso_min,
    ):
        """Parse the bounds and precompute their spans in whole seconds.

        :param dt_iso_max: timestamp upper bound, ``'%Y/%m/%d %H:%M:%S'``.
        :param dt_iso_min: timestamp lower bound, same format.
        :param date_iso_max: date upper bound, ``'%Y/%m/%d'``.
        :param date_iso_min: date lower bound, same format.
        :raises ValueError: if a bound does not match its format.
        """
        self.dt_iso_max = datetime.datetime.strptime(
            dt_iso_max, self._DT_FORMAT)
        self.dt_iso_min = datetime.datetime.strptime(
            dt_iso_min, self._DT_FORMAT)
        self.date_iso_max = datetime.datetime.strptime(
            date_iso_max, self._DATE_FORMAT)
        self.date_iso_min = datetime.datetime.strptime(
            date_iso_min, self._DATE_FORMAT)
        self.int_delta = self._whole_seconds(
            self.dt_iso_max - self.dt_iso_min)
        self.int_date_delta = self._whole_seconds(
            self.date_iso_max - self.date_iso_min)

    @staticmethod
    def _whole_seconds(delta):
        """Return ``delta`` as an int number of seconds (microseconds dropped)."""
        return delta.days * 24 * 60 * 60 + delta.seconds

    def iterows(self):
        """Yield the header line, then random data lines forever.

        Every yielded line ends with ``"\\n"``.  Column meanings:
        int/long are the same signed 32-bit value, short is the top 16 bits
        as a signed value, bool is ``rdp % 2``, char is 4 random ASCII
        letters.
        """
        yield self._HEADER
        letters = string.ascii_letters
        while True:
            rdp = random.randint(0, (1 << 32) - 1)
            # A zero span (equal bounds) would make the modulo divide by 0.
            random_second = rdp % self.int_delta if self.int_delta else 0
            randomtime = self.dt_iso_min + datetime.timedelta(
                seconds=random_second)
            random_date_second = (
                rdp % self.int_date_delta if self.int_date_delta else 0)
            randomdatetime = self.date_iso_min + datetime.timedelta(
                seconds=random_date_second)
            signed32 = str(rdp - (1 << 31))
            yield "\t".join([
                signed32,
                str((rdp >> 16) - (1 << 15)),
                signed32,
                str(random.uniform(0.1, 2.7)),
                str(rdp % 2),
                "".join(random.choice(letters) for _ in range(4)),
                self._UTF8_SAMPLE,
                randomtime.strftime("%Y-%m-%d %H:%M:%S"),
                randomdatetime.strftime("%Y-%m-%d"),
            ]) + "\n"
@click.command()
@click.argument('rows', type=int, default=1000000)
@click.option(
    '-f', '--filename',
    default="~/kadai_1.tsv",
)
@click.option('-D', '--dt-iso-max', default="2016/12/31 00:00:00")
@click.option('-d', '--dt-iso-min', default="2016/12/1 00:00:00")
@click.option('-T', '--date-iso-max', default="2016/12/31")
@click.option('-t', '--date-iso-min', default="2016/12/1")
def cmd(rows, filename, dt_iso_max, dt_iso_min,
        date_iso_max, date_iso_min):
    # Writes a header plus ROWS random lines to FILENAME.  Documented with
    # comments on purpose: click would turn a docstring into --help text.
    log_path = 'logging_warning.out'
    logger = logging.getLogger('MyLogger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(logging.handlers.RotatingFileHandler(
        log_path, maxBytes=2000, backupCount=5,))

    started = datetime.datetime.now()
    # NOTE(review): the +9 hour shift looks like a UTC -> JST display
    # conversion that assumes the host clock is UTC; confirm.
    print(started + datetime.timedelta(hours=9))

    for signum in (signal.SIGINT, signal.SIGHUP, signal.SIGTERM):
        signal.signal(signum, do_exit)

    try:
        generator = TsvRowGenerator(
            dt_iso_max, dt_iso_min, date_iso_max, date_iso_min,
        )
        # rows + 1 so the header line is not counted as a data row.
        lines = itertools.islice(generator.iterows(), rows + 1)
        output_path = os.path.abspath(os.path.expanduser(filename))
        write_str_into_file(iterable=lines, output_filename=output_path)
        print(output_path)
    except SignalException as exc:
        logger.warning('%s: %s' % (exc, datetime.datetime.now()))
        print(glob.glob('%s*' % log_path))
        sys.exit(1)
    finally:
        # Elapsed wall-clock time, reported on success and on failure.
        print(str(datetime.datetime.now() - started))
def main():
    """Run the ``cmd`` command-line interface."""
    cmd()


if __name__ == '__main__':
    # Only launch the CLI when executed as a script, not when imported.
    main()
import signal
import sys
import os
import glob
import logging
import logging.handlers
import csv
import datetime
import click
import pickle
import struct
from manyrowstsv import write_str_into_file
class ParseRowsTsv(object):
    """Convert the generated TSV file into pickle or struct records.

    Output is the plain concatenation of the serialized rows, header first.
    Struct records carry no length prefix; each record's string widths are
    taken from its own contents.
    """

    def __init__(
        self, file, inputf, outputf
    ):
        """
        :param file: output format, ``'pickle'`` or ``'struct'``.
        :param inputf: TSV input path; ``~`` is expanded.
        :param outputf: output path; ``~`` is expanded.
        """
        self.inputf = os.path.abspath(os.path.expanduser(inputf))
        self.outputf = os.path.abspath(os.path.expanduser(outputf))
        self.file = file

    def write_into_file(self):
        """Serialize the input and write it to ``self.outputf``.

        :raises ValueError: if ``self.file`` is not a supported format.
        """
        if self.file == 'pickle':
            records = self.pickle_tsv()
        elif self.file == 'struct':
            records = self.struct_tsv()
        else:
            # Fail loudly rather than finishing "successfully" with no output.
            raise ValueError('unsupported output format: %r' % (self.file,))
        write_str_into_file(records, self.outputf)

    def read_tsv(self):
        """Yield the header row (a list), then each data row as a tuple.

        Data columns become int, int, int, float, int followed by four
        strings as read.  An empty input file yields nothing.

        NOTE(review): the file is opened in binary mode, which suits the
        Python 2 ``csv`` module; Python 3's ``csv.reader`` needs text mode.
        """
        with open(self.inputf, "rb") as f:
            reader = csv.reader(f, delimiter="\t", lineterminator='\n')
            header = next(reader, None)
            if header is None:
                return
            yield header
            for row in reader:
                yield (
                    int(row[0]),
                    int(row[1]),
                    int(row[2]),
                    float(row[3]),
                    int(row[4]),
                    row[5],
                    row[6],
                    row[7],
                    row[8],
                )

    def pickle_tsv(self):
        """Yield one ``pickle.dumps`` payload per row, header included."""
        for record in self.read_tsv():
            yield pickle.dumps(record)

    def struct_tsv(self):
        """Yield struct-packed rows: the header names, then each record.

        The header is packed as back-to-back fixed-width strings.  Records
        use native ``'i h l d ?'`` followed by four strings sized to their
        own byte length.
        """
        lines = self.read_tsv()
        header = next(lines, None)
        if header is None:
            return
        header_fmt = ''.join('%ds' % len(name) for name in header)
        yield struct.pack(header_fmt, *header)
        for record in lines:
            fmt = 'i h l d ? %ds %ds %ds %ds' % (
                len(record[5]), len(record[6]),
                len(record[7]), len(record[8]),
            )
            yield struct.pack(fmt, *record)
class SignalException(Exception):
    """Raised by the ``do_exit`` signal handler to abort the conversion.

    :param message: human-readable reason, stored as the only exception arg.
    """

    def __init__(self, message):
        Exception.__init__(self, message)
def do_exit(sig, stack):
    """Signal handler that converts the delivered signal into an exception.

    :param sig: signal number (ignored).
    :param stack: interrupted stack frame (ignored).
    :raises SignalException: always, with the message ``"Exiting"``.
    """
    error = SignalException("Exiting")
    raise error
@click.command()
@click.option(
    '--file', type=click.Choice(['pickle', 'struct']),
    default='pickle')
@click.option('-i', '--inputf', default='~/kadai_1.tsv')
@click.option('-o', '--outputf', default='~/kadai_2.p')
def cmd(file, inputf, outputf):
    # Converts INPUTF to pickle/struct records in OUTPUTF.  Documented with
    # comments on purpose: click would turn a docstring into --help text.
    started = datetime.datetime.now()
    # NOTE(review): the +9 hour shift looks like a UTC -> JST display
    # conversion that assumes the host clock is UTC; confirm.
    print(started + datetime.timedelta(hours=9))

    for signum in (signal.SIGINT, signal.SIGHUP, signal.SIGTERM):
        signal.signal(signum, do_exit)

    log_path = 'logging_warning.out'
    logger = logging.getLogger('MyLogger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(logging.handlers.RotatingFileHandler(
        log_path, maxBytes=2000, backupCount=5,))

    converter = ParseRowsTsv(file, inputf, outputf)
    try:
        converter.write_into_file()
    except SignalException as exc:
        logger.warning('%s: %s' % (exc, datetime.datetime.now()))
        print(glob.glob('%s*' % log_path))
        sys.exit(1)
    finally:
        # Elapsed wall-clock time, reported on success and on failure.
        print(str(datetime.datetime.now() - started))
def main():
    """Run the ``cmd`` command-line interface."""
    cmd()


if __name__ == '__main__':
    # Only launch the CLI when executed as a script, not when imported.
    main()
ファイルを読み込む
import signal
import sys
import os
import glob
import logging
import logging.handlers
import datetime
import click
import pickle
import struct
import tempfile
import shutil
import math
import concurrent.futures
import errno
class SignalException(Exception):
    """Raised by the ``do_exit`` signal handler to abort the conversion.

    :param message: human-readable reason, stored as the only exception arg.
    """

    def __init__(self, message):
        super().__init__(message)
def do_exit(sig, stack):
    """Signal handler that converts the delivered signal into an exception.

    :param sig: signal number (ignored).
    :param stack: interrupted stack frame (ignored).
    :raises SignalException: always, with the message ``"Exiting"``.
    """
    error = SignalException("Exiting")
    raise error
def tsv_separate_generator(inputf, chunk_size=1024 * 1024 * 100):
    """Split a TSV file into line-aligned byte ranges for parallel parsing.

    The first line (header) is excluded.  A boundary is placed roughly every
    ``chunk_size`` bytes and then pushed forward to the end of the line it
    falls in, so every range contains whole lines only.

    :param inputf: path of the TSV file.
    :param chunk_size: target size of each range in bytes (default 100 MiB).
    :yields: ``(split_idx, start_offset, end_offset)`` triples describing
        contiguous half-open ranges ``[start, end)``; the last one ends at
        the file size.  An empty file yields nothing.
    :raises ValueError: if ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError('chunk_size must be positive, got %r' % (chunk_size,))
    with open(inputf, 'rb') as f:
        f_size = os.stat(f.fileno()).st_size
        # Integer ceiling division; exact for any size, unlike float math.
        split_count = -(-f_size // chunk_size)
        start_offset = len(f.readline())  # skip the header line
        for split_idx in range(split_count):
            offset = chunk_size * (split_idx + 1) - 1
            f.seek(offset)
            # Rest of the line the boundary landed in (b'' past EOF).
            last_line_len = len(f.readline())
            if offset < f_size:
                end_offset = offset + last_line_len
            else:
                end_offset = f_size
            yield (
                split_idx,
                start_offset,
                end_offset,
            )
            if end_offset >= f_size or last_line_len == 0:
                break
            start_offset = end_offset
def sum_file(self, files, tmp_dir='/var/tmp/'):
    """Concatenate ``files`` into a new temporary file and return its path.

    ``self`` is not otherwise used; when ``files`` is None the paths are
    taken from ``self.files`` instead.

    :param files: iterable of paths, concatenated in iteration order.
    :param tmp_dir: directory for the output file (default ``/var/tmp/``).
    :returns: path of the new file (created with delete=False; the caller
        owns it).
    :raises OSError: if a source cannot be read or copying fails; the
        partial output file is removed before the error propagates.
    """
    if files is None:
        files = self.files
    out = tempfile.NamedTemporaryFile(delete=False, dir=tmp_dir)
    try:
        with out:
            for path in files:
                with open(path, 'rb') as src:
                    # os.sendfile's offset is a position in the *source*
                    # file, and a call may copy fewer bytes than requested,
                    # so loop until the whole source has been sent.
                    offset = 0
                    remaining = os.fstat(src.fileno()).st_size
                    while remaining > 0:
                        sent = os.sendfile(
                            out.fileno(), src.fileno(), offset, remaining)
                        if sent == 0:
                            break  # source shrank while copying
                        offset += sent
                        remaining -= sent
    except BaseException:
        os.remove(out.name)
        raise
    return out.name
class ReadTsvGenerator(object):
    """Parse one byte range of a TSV file into typed row tuples."""

    def __init__(self, inputf, iterable):
        """
        :param inputf: path of the TSV file.
        :param iterable: ``(split_idx, start_offset, end_offset)`` triple as
            produced by tsv_separate_generator; only the offsets are used.
        """
        self.inputf = inputf
        self.iterable = iterable

    def read_tsv(self):
        """Yield each line starting in ``[start_offset, end_offset)`` as a tuple.

        Lines are decoded as UTF-8 and split on tabs; the first five fields
        become int, int, int, float, int and the last four stay ``str``.
        Reading stops early if end of file comes before ``end_offset``.
        """
        start_offset = self.iterable[1]
        end_offset = self.iterable[2]
        with open(self.inputf, "rb") as f:
            f.seek(start_offset)
            position = start_offset
            while position < end_offset:
                line = f.readline()
                if not line:
                    break  # file shorter than the requested range
                position += len(line)
                row = line.strip(b'\n').decode('utf-8').split('\t')
                yield (
                    int(row[0]),
                    int(row[1]),
                    int(row[2]),
                    float(row[3]),
                    int(row[4]),
                    row[5],
                    row[6],
                    row[7],
                    row[8],
                )
class ParseTsvGenerator(object):
    """Serialize typed 9-field TSV records to pickle or struct bytes."""

    def __init__(self, iterable, skip_header=False):
        """
        :param iterable: iterable of records such as those yielded by
            ReadTsvGenerator.read_tsv, which yields data rows only.
        :param skip_header: drop the first item before serializing; set it
            only when the input really starts with a header row.
        """
        self.iterable = iterable
        self.skip_header = skip_header

    def _records(self):
        """Return an iterator over the records, minus the header if asked."""
        lines = iter(self.iterable)
        if self.skip_header:
            next(lines, None)  # default avoids StopIteration on empty input
        return lines

    def pickle_tsv(self):
        """Yield one ``pickle.dumps`` payload per record."""
        for record in self._records():
            yield pickle.dumps(record)

    def struct_tsv(self):
        """Yield one native ``'i h l d ?'`` + strings struct per record.

        Text fields are encoded to UTF-8 first (``struct`` packs only bytes
        for ``s``), and each ``%ds`` width is the encoded byte length.
        """
        for record in self._records():
            texts = tuple(
                field.encode('utf-8') if isinstance(field, str) else field
                for field in record[5:]
            )
            fmt = 'i h l d ? ' + ' '.join('%ds' % len(t) for t in texts)
            yield struct.pack(fmt, *(tuple(record[:5]) + texts))
class ParseRowsTsv(object):
    """Convert a TSV file to pickle or struct records using worker processes.

    The input is split into line-aligned byte ranges by
    tsv_separate_generator.  Each range is converted in a worker process by
    ``dotask``, and the per-range temp files are concatenated, in input
    order, into ``outputf``.
    """

    def __init__(self, file, inputf, outputf):
        """
        :param file: output format, ``'pickle'`` or ``'struct'``.
        :param inputf: TSV input path; ``~`` is expanded.
        :param outputf: output path; ``~`` is expanded.
        """
        self.file = file
        self.inputf = os.path.abspath(os.path.expanduser(inputf))
        self.outputf = os.path.abspath(os.path.expanduser(outputf))

    def dotask(self, rule):
        """Convert one byte range of the input into a temp file.

        :param rule: ``(split_idx, start_offset, end_offset)`` triple.
        :returns: path of a temp file under ``/var/tmp``; the caller owns it.
        :raises ValueError: if ``self.file`` is not a supported format.
        """
        # Validate up front: an unknown format would otherwise leave the
        # serializer unbound and fail with UnboundLocalError.
        if self.file not in ('pickle', 'struct'):
            raise ValueError('unsupported output format: %r' % (self.file,))
        parsetsv = ParseTsvGenerator(
            ReadTsvGenerator(self.inputf, rule).read_tsv())
        if self.file == 'pickle':
            w = parsetsv.pickle_tsv()
        else:
            w = parsetsv.struct_tsv()
        f = tempfile.NamedTemporaryFile(
            delete=False, dir='/var/tmp', suffix='_dotask', prefix='tmp_',
        )
        try:
            with f:
                for row in w:
                    f.write(row)
        except BaseException:
            # delete=False: nobody else would remove a partial file.
            self._remove_quietly(f.name)
            raise
        return f.name

    def multi_do_task(self):
        """Convert the whole input in parallel and move the result to outputf.

        Parts are merged in the order tsv_separate_generator produced the
        ranges (not completion order), so row order is preserved.
        """
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [
                executor.submit(self.dotask, rule)
                for rule in tsv_separate_generator(self.inputf)]
            try:
                self._merge_parts(futures)
            except BaseException:
                # Don't leak the temp files of parts that were not merged.
                for future in futures:
                    self._discard_result(future)
                raise

    def _merge_parts(self, futures):
        """Concatenate the files produced by ``futures`` (list order) into outputf."""
        with tempfile.TemporaryDirectory(
                suffix='_tsv', prefix='tmp_', dir='/var/tmp') as temp_dir:
            with tempfile.NamedTemporaryFile(
                    suffix='_tsv', prefix='tmp_',
                    delete=False, dir=temp_dir,) as f:
                for future in futures:
                    part = future.result()
                    try:
                        self._append_file(f.fileno(), part)
                    finally:
                        self._remove_quietly(part)
            shutil.move(f.name, self.outputf)

    @staticmethod
    def _append_file(out_fd, path):
        """Append the entire content of ``path`` to ``out_fd`` via os.sendfile."""
        with open(path, 'rb') as src:
            # sendfile's offset is a position in the *source* file and a call
            # may copy fewer bytes than requested, so loop until done.
            offset = 0
            remaining = os.fstat(src.fileno()).st_size
            while remaining > 0:
                sent = os.sendfile(out_fd, src.fileno(), offset, remaining)
                if sent == 0:
                    break  # source shrank while copying
                offset += sent
                remaining -= sent

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path``; a file that is already gone is not an error."""
        try:
            os.remove(path)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    @classmethod
    def _discard_result(cls, future):
        """Best effort: cancel ``future`` or delete the file it produced."""
        if future.cancel():
            return
        try:
            path = future.result()
        except Exception:
            return
        try:
            cls._remove_quietly(path)
        except OSError:
            pass
@click.command()
@click.option(
    '--file', type=click.Choice(['pickle', 'struct']),
    default='pickle')
@click.option('-i', '--inputf', default='~/kadai_1.tsv')
@click.option('-o', '--outputf', default='~/zone/kadai_2v3.p')
def cmd(file, inputf, outputf):
    # Converts INPUTF to pickle/struct records in OUTPUTF using worker
    # processes.  Documented with comments on purpose: click would turn a
    # docstring into --help text.
    started = datetime.datetime.now()
    # NOTE(review): the +9 hour shift looks like a UTC -> JST display
    # conversion that assumes the host clock is UTC; confirm.
    print(started + datetime.timedelta(hours=9))

    for signum in (signal.SIGINT, signal.SIGHUP, signal.SIGTERM):
        signal.signal(signum, do_exit)

    log_path = 'logging_warning.out'
    logger = logging.getLogger('MyLogger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(logging.handlers.RotatingFileHandler(
        log_path, maxBytes=2000, backupCount=5,))

    converter = ParseRowsTsv(file, inputf, outputf)
    try:
        converter.multi_do_task()
    except SignalException as exc:
        logger.warning('%s: %s' % (exc, datetime.datetime.now()))
        print(glob.glob('%s*' % log_path))
        sys.exit(1)
    finally:
        # Elapsed wall-clock time, reported on success and on failure.
        print(str(datetime.datetime.now() - started))
def main():
    """Run the ``cmd`` command-line interface."""
    cmd()


if __name__ == '__main__':
    # Only launch the CLI when executed as a script, not when imported.
    main()