Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions mesures/p2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# -*- coding: utf-8 -*-
from mesures.headers import P2_HEADER as COLUMNS
from mesures.p1 import P1
from mesures.dates import *
from mesures.utils import check_line_terminator_param
from zipfile import ZipFile
import os
import pandas as pd


class P2(P1):
Expand All @@ -12,3 +17,81 @@ def __init__(self, data, distributor=None, compression='bz2', columns=COLUMNS, v
"""
super(P2, self).__init__(data, distributor, compression=compression, columns=columns, version=version)
self.prefix = 'P2'

def reader(self, filepath):
    """
    Load curve measures and normalise them to the P2 column layout.

    :param filepath: either a path (str) to a ';'-separated file without a
        header row, read using ``self.columns`` as column names, or a list of
        records accepted by ``pd.DataFrame``
    :return: DataFrame containing exactly ``self.columns``, in that order
    :raises TypeError: if ``filepath`` is neither a str nor a list
    """
    if isinstance(filepath, str):
        df = pd.read_csv(filepath, sep=';', names=self.columns)
    elif isinstance(filepath, list):
        df = pd.DataFrame(data=filepath)
    else:
        # TypeError subclasses Exception, so callers catching Exception still work
        raise TypeError("Filepath must be an str or a list")

    df['tipo_medida'] = 11
    # NOTE(review): rows sharing (cups, timestamp, season, method) are NOT
    # summed here. The groupby(...).aggregate(...) that used to sit here never
    # assigned its result, so it had no effect. Assigning it would also drop
    # the quality columns and reorder rows, so enabling it needs a decision.

    # Values built from a list are datetime-like and get formatted. Values read
    # from a file arrive as str and are kept unchanged; they are assumed to be
    # in DATETIME_MASK already (TODO confirm). Previously a str raised AttributeError.
    df['timestamp'] = df['timestamp'].apply(
        lambda x: x if isinstance(x, str) else x.strftime(DATETIME_MASK)
    )

    df['method'] = 1
    df['res'] = 0
    df['res2'] = 0

    # Energy magnitudes are emitted as integers (float values are truncated)
    energy_columns = ['ai', 'ae', 'r1', 'r2', 'r3', 'r4']
    df[energy_columns] = df[energy_columns].astype('int')

    # Quality flags are optional in the input; default any missing one to 0
    for key in self.columns:
        if 'quality' in key and key not in df:
            df[key] = 0
    return df[self.columns]

def writer(self):
    """
    Write the curve measures as one file per 24-hour window, bundled in a zip.

    Windows are consecutive 24-hour spans starting at the earliest timestamp
    in ``self.file``. A window with no rows produces no file. Each
    per-window file is written under /tmp and then added to the zip.

    :return: path of the generated zip file
    """
    # NOTE(review): timestamps are compared as strings, which assumes
    # DATETIME_MASK sorts lexicographically in chronological order - confirm.
    daymin = self.file['timestamp'].min()
    daymax = self.file['timestamp'].max()
    self.measures_date = daymin
    # Context manager guarantees the zip is closed even if a write fails
    with ZipFile(os.path.join('/tmp', self.zip_filename), 'w') as zipped_file:
        while daymin <= daymax:
            next_day = (datetime.strptime(daymin, DATETIME_MASK)
                        + timedelta(days=1)).strftime(DATETIME_MASK)
            # presumably read by the filename properties defined in P1 - confirm
            self.measures_date = daymin
            day_rows = self.file[(self.file['timestamp'] >= daymin)
                                 & (self.file['timestamp'] < next_day)]
            # Avoid generating a file for an empty window
            if len(day_rows):
                # Bump the version past any existing /tmp entry sharing the
                # zip stem whose second '.'-separated field is an integer
                stem = self.zip_filename.split('.')[0]
                versions = []
                for name in os.listdir('/tmp'):
                    if stem not in name:
                        continue
                    try:
                        versions.append(int(name.split('.')[1]))
                    except (IndexError, ValueError):
                        # same stem but no numeric version field: ignore it
                        continue
                if versions:
                    self.version = max(versions) + 1

                file_path = os.path.join('/tmp', self.filename)
                kwargs = {'sep': ';',
                          'header': False,
                          'columns': self.columns,
                          'index': False,
                          check_line_terminator_param(): ';\n'
                          }
                if self.default_compression:
                    kwargs['compression'] = self.default_compression
                day_rows.to_csv(file_path, **kwargs)
                zipped_file.write(file_path, arcname=os.path.basename(file_path))

            daymin = next_day
    return zipped_file.filename
93 changes: 18 additions & 75 deletions mesures/p2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
from mesures.headers import P2_HEADER as COLUMNS
from mesures.p2 import P2
from mesures.utils import check_line_terminator_param
from zipfile import ZipFile
import os
import pandas as pd


class P2D(P2):
Expand All @@ -19,7 +17,7 @@ def __init__(self, data, distributor=None, comer=None, compression='bz2', column
super(P2D, self).__init__(data, distributor, compression=compression, columns=columns, version=version)
self.prefix = 'P2D'
self.comer = comer
self.measures_date = None
self.measures_date = self.file['timestamp'].min()

@property
def filename(self):
Expand All @@ -42,80 +40,25 @@ def zip_filename(self):
version=self.version
)

def reader(self, filepath):
    """
    Load P2D curve measures.

    The parsing is exactly the parent's, so this delegates to ``P2.reader``
    instead of keeping a duplicated copy that could drift out of sync.

    :param filepath: path (str) to a ';'-separated file, or a list of records
    :return: DataFrame restricted to ``self.columns``
    :raises: whatever ``P2.reader`` raises for an unsupported ``filepath`` type
    """
    return super(P2D, self).reader(filepath)

def writer(self):
    """
    P2D contains all curve measures in one file.

    Before writing, /tmp is scanned for entries whose names contain the stem
    of ``self.filename``. If any of them has an integer as its second
    '.'-separated field, ``self.version`` is set to one more than the highest.

    :return: file path of the generated P2D file
    """
    stem = self.filename.split('.')[0]
    versions = []
    for name in os.listdir('/tmp'):
        if stem not in name:
            continue
        try:
            versions.append(int(name.split('.')[1]))
        except (IndexError, ValueError):
            # same stem but no numeric version field: ignore instead of crashing
            continue
    if versions:
        self.version = max(versions) + 1

    file_path = os.path.join('/tmp', self.filename)
    kwargs = {'sep': ';',
              'header': False,
              'columns': self.columns,
              'index': False,
              check_line_terminator_param(): ';\n'
              }
    if self.default_compression:
        kwargs['compression'] = self.default_compression

    self.file.to_csv(file_path, **kwargs)
    return file_path
2 changes: 1 addition & 1 deletion spec/generation_files_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import numpy as np
import zipfile

# Shell command that removes generated artifacts from the current directory:
# *.bz2 and *.zip files, files ending in a single-digit extension (e.g. ".0"),
# and medidas_*.txt files. Presumably run by the specs for cleanup - confirm.
DELETE_FILES = 'rm -rf *.bz2 *.zip *.[0123456789] medidas_*.txt'

class SampleData:
@staticmethod
Expand Down