Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
3b8713d
POC of Dask replacing Pandas for CSV
tgaddair Oct 18, 2020
197314c
WIP performance improvements for categorical
tgaddair Oct 22, 2020
6bf7083
Removed debug code
tgaddair Oct 22, 2020
dd52d5c
Auto parallelize across CPU cores
tgaddair Oct 22, 2020
1f228b7
Added DataProcessingEngine
tgaddair Oct 23, 2020
12bfea7
Fixed split
tgaddair Oct 23, 2020
b39d372
Fixed API
tgaddair Oct 23, 2020
5b5fc60
Fixed data processing
tgaddair Oct 24, 2020
8b2b594
Drop index
tgaddair Oct 24, 2020
b7f9546
Added Petastorm dataset
tgaddair Oct 24, 2020
c952130
Cleaned up dataset creation
tgaddair Oct 24, 2020
2c93b60
Added Dataset
tgaddair Oct 24, 2020
ea6b4a7
Train from dataset
tgaddair Oct 24, 2020
b203fa6
Fixed bugs
tgaddair Oct 24, 2020
6b3bb08
Fixed string_utils
tgaddair Oct 25, 2020
ef2a314
Fixed tests
tgaddair Oct 25, 2020
9a743fe
Fixed temp dataset
tgaddair Oct 25, 2020
a630f14
Added Backend
tgaddair Oct 25, 2020
945d56e
Plumb through backend
tgaddair Oct 25, 2020
2aab9c5
Plumb backend through get_feature_meta
tgaddair Oct 25, 2020
0a0a7c4
Plumb through backend to add_feature_data
tgaddair Oct 25, 2020
3419178
Plumb in preprocess_for_prediction
tgaddair Oct 25, 2020
9d13c71
Fixed Pandas processing
tgaddair Oct 25, 2020
95a7952
Added cache management
tgaddair Oct 25, 2020
22b7538
Fixed unit tests
tgaddair Oct 25, 2020
fd7cbab
Removed context, engine to processor
tgaddair Oct 25, 2020
b63b316
Added numerical test
tgaddair Oct 25, 2020
0941ecd
RayBackend -> DaskBackend
tgaddair Oct 25, 2020
77a59f9
Fixed read_xsv
tgaddair Oct 25, 2020
cab90a1
Fixed set feature
tgaddair Oct 26, 2020
755c204
Untracked Netflix example
tgaddair Oct 26, 2020
a105e41
Added Dask requirements
tgaddair Oct 26, 2020
0cbb582
Fixed bag feature
tgaddair Oct 26, 2020
87f6e4b
Fixed vector feature
tgaddair Oct 26, 2020
5e089d4
Fixed h3
tgaddair Oct 26, 2020
4f6c0ba
Fixed date
tgaddair Oct 26, 2020
93cbccd
Fixed timeseries
tgaddair Oct 26, 2020
0e93043
Converted audio features processing
tgaddair Oct 27, 2020
bb33fc0
Fixed reshaping
tgaddair Oct 27, 2020
7e8d3c3
Fixed tests
tgaddair Oct 27, 2020
8924ef6
Removed debug print
tgaddair Oct 27, 2020
8e95be2
Fixed image processing
tgaddair Oct 27, 2020
b19ea58
Added tests for exceptions
tgaddair Oct 27, 2020
0afade5
meta_kwargs -> map_objects
tgaddair Oct 27, 2020
aabb582
Removed unused methods
tgaddair Oct 27, 2020
ac29b9d
Removed prints
tgaddair Oct 27, 2020
e3e7a14
Reduced runtime
tgaddair Oct 27, 2020
318da2f
Removed Dask dependency on critical code paths
tgaddair Oct 28, 2020
1a98f33
Added dask extra
tgaddair Oct 28, 2020
f625402
Fixed concatenation
tgaddair Oct 28, 2020
418600a
Fixed split empty dataset
tgaddair Oct 28, 2020
ed9451b
Fixed subselect
tgaddair Oct 28, 2020
a3de815
Restored tests, removed subselect
tgaddair Oct 28, 2020
eacfd63
Moved meta.json
tgaddair Oct 30, 2020
4d8e690
Fixed cache key
tgaddair Oct 30, 2020
cd70992
Updated Petastorm
tgaddair Oct 30, 2020
6c94f22
Spawn Dask tests
tgaddair Oct 30, 2020
cb8bb91
Merge branch 'master' into dask
tgaddair Oct 30, 2020
985b5bd
Fixed test_sequence_features.py
tgaddair Oct 30, 2020
60ad4f4
Added tables
tgaddair Oct 30, 2020
dff8461
Fixed image features
tgaddair Oct 31, 2020
0469097
Fixed string_utils.py
tgaddair Oct 31, 2020
1922f35
Fixed kfold
tgaddair Oct 31, 2020
8952e23
Fixed test splits
tgaddair Oct 31, 2020
5057be6
Fixed test_visualization_api.py
tgaddair Oct 31, 2020
20351e0
Fixed test_visualization.py
tgaddair Oct 31, 2020
92d64c1
Fixed Dask
tgaddair Oct 31, 2020
25ab59b
Fixed test_experiment.py
tgaddair Oct 31, 2020
9f92c38
Changed backend to processor in string_utils
tgaddair Nov 6, 2020
ccbaac9
Resolved conflicts
tgaddair Nov 18, 2020
60f1e59
Renamed Batcher to BatchProvider
tgaddair Nov 18, 2020
53a427a
Renamed get_proc_features_from_lists
tgaddair Nov 18, 2020
877b604
Fixed Dask tests
tgaddair Nov 18, 2020
8b819be
Refactored Batcher
tgaddair Nov 18, 2020
3268700
Renamed BatchProvider to Batcher
tgaddair Nov 18, 2020
093e3d2
Resolved conflicts
tgaddair Nov 26, 2020
d0e3aa1
Fixed API
tgaddair Nov 26, 2020
2d6a231
Fixed index column
tgaddair Nov 26, 2020
b7da27d
Fixed reshaping
tgaddair Nov 26, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ludwig/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from ludwig.backend import LOCAL_BACKEND, Backend, create_backend
from ludwig.constants import FULL, PREPROCESSING, TEST, TRAINING, VALIDATION
from ludwig.contrib import contrib_command
from ludwig.data.dataset import Dataset
from ludwig.data.dataset.base import Dataset
from ludwig.data.postprocessing import convert_predictions, postprocess
from ludwig.data.preprocessing import (load_metadata,
preprocess_for_prediction,
Expand Down
6 changes: 6 additions & 0 deletions ludwig/backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ def get_local_backend():
return LOCAL_BACKEND


def create_dask_backend():
    """Build and return a new ``DaskBackend`` instance.

    The import happens inside the function body, so ``ludwig.backend.dask``
    is only imported when this factory is actually called.
    """
    from ludwig.backend.dask import DaskBackend as _DaskBackend

    backend = _DaskBackend()
    return backend


# Maps a backend name to a zero-argument factory function that returns the
# backend. The `None` key points at the same factory as 'local'.
backend_registry = {
'dask': create_dask_backend,
'local': get_local_backend,
None: get_local_backend,
}
Expand Down
38 changes: 38 additions & 0 deletions ludwig/backend/dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from ludwig.backend.base import Backend
from ludwig.constants import NAME
from ludwig.data.dataframe.dask import DaskEngine


class DaskBackend(Backend):
    """Backend whose dataframe engine is a ``DaskEngine``."""

    def __init__(self):
        super().__init__()
        # One engine instance is created per backend and exposed read-only
        # through the `df_engine` property.
        self._df_engine = DaskEngine()

    @property
    def df_engine(self):
        """The ``DaskEngine`` created at construction time."""
        return self._df_engine

    @property
    def supports_multiprocessing(self):
        """Always ``False`` for this backend."""
        return False

    def check_lazy_load_supported(self, feature):
        """Reject lazy loading for ``feature``.

        This backend never supports lazy loading, so the call always raises.

        :param feature: mapping that contains the feature name under ``NAME``.
        :raises ValueError: unconditionally, naming the offending feature.
        """
        feature_name = feature[NAME]
        message = (
            'DaskBackend does not support lazy loading of data files at train time. '
            f'Set preprocessing config `in_memory: True` for feature {feature_name}'
        )
        raise ValueError(message)
1 change: 1 addition & 0 deletions ludwig/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@
PROC_COLUMN = 'proc_column'

CHECKSUM = 'checksum'
RESHAPE = 'reshape'
Empty file.
32 changes: 32 additions & 0 deletions ludwig/data/batcher/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from abc import ABC, abstractmethod


class Batcher(ABC):
    """Abstract interface shared by all batchers.

    Concrete subclasses must override every method below; the class cannot
    be instantiated until all three are implemented.
    """

    @abstractmethod
    def next_batch(self):
        """Produce the next batch. Must be overridden."""
        raise NotImplementedError

    @abstractmethod
    def last_batch(self):
        """Report whether the current epoch has no more batches. Must be overridden."""
        raise NotImplementedError

    @abstractmethod
    def set_epoch(self, epoch):
        """Prepare the batcher for the given epoch. Must be overridden.

        :param epoch: the epoch value handed to the batcher.
        """
        raise NotImplementedError
78 changes: 5 additions & 73 deletions ludwig/utils/batcher.py → ludwig/data/batcher/bucketed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,65 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import math

import numpy as np

from ludwig.data.sampler import DistributedSampler


class Batcher(object):
def __init__(self, dataset, sampler,
batch_size=128,
ignore_last=False):
# store our dataset as well
self.dataset = dataset
self.sampler = sampler
self.sample_it = iter(self.sampler)

self.ignore_last = ignore_last
self.batch_size = batch_size
self.total_size = len(sampler)
self.steps_per_epoch = int(
math.ceil(self.total_size / self.batch_size))
self.index = 0
self.step = 0

def next_batch(self):
if self.last_batch():
raise StopIteration()

indices = []
for _ in range(self.batch_size):
try:
indices.append(next(self.sample_it))
self.index += 1
except StopIteration:
break

sub_batch = {}
for proc_column in self.dataset.features:
sub_batch[proc_column] = self.dataset.get(
proc_column,
indices
)
from ludwig.data.batcher.base import Batcher

self.step += 1
return sub_batch

def last_batch(self):
return self.index >= self.total_size or (
self.ignore_last and
self.index + self.batch_size >= self.total_size)

def set_epoch(self, epoch):
self.index = 0
self.step = 0
self.sampler.set_epoch(epoch)
self.sample_it = iter(self.sampler)


class BucketedBatcher(object):
class BucketedBatcher(Batcher):
def __init__(self, dataset, bucketing_field, batch_size=128, buckets=10,
should_shuffle=True, ignore_last=False,
should_trim=False, trim_side='right'):
Expand Down Expand Up @@ -117,8 +64,7 @@ def next_batch(self):
if self.last_batch():
if self.should_shuffle:
self.shuffle(self.buckets_idcs)
self.reset()
self.epoch += 1
self.set_epoch(self.epoch + 1)

if self.ignore_last:
idcs_below_size = self.indices + self.batch_size < self.bucket_sizes
Expand Down Expand Up @@ -157,9 +103,10 @@ def last_batch(self):
self.indices + self.batch_size < self.bucket_sizes
))

def reset(self):
def set_epoch(self, epoch):
self.indices = np.array([0] * len(self.buckets_idcs))
self.step = 0
self.epoch = epoch


# todo future: reintroduce the bucketed batcher
Expand Down Expand Up @@ -213,18 +160,3 @@ def reset(self):
# ignore_last=ignore_last
# )
# return batcher

def initialize_batcher(dataset, batch_size=128,
                       should_shuffle=True,
                       seed=0,
                       ignore_last=False,
                       horovod=None):
    """Create a ``Batcher`` over ``dataset`` driven by a ``DistributedSampler``.

    :param dataset: dataset to batch; its ``len()`` sizes the sampler.
    :param batch_size: forwarded to the batcher.
    :param should_shuffle: forwarded to the sampler as ``shuffle``.
    :param seed: forwarded to the sampler.
    :param ignore_last: forwarded to the batcher.
    :param horovod: forwarded to the sampler.
    :return: the constructed batcher.
    """
    return Batcher(
        dataset,
        DistributedSampler(
            len(dataset),
            shuffle=should_shuffle,
            seed=seed,
            horovod=horovod,
        ),
        batch_size=batch_size,
        ignore_last=ignore_last,
    )
61 changes: 61 additions & 0 deletions ludwig/data/batcher/iterable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from ludwig.data.batcher.base import Batcher


class IterableBatcher(Batcher):
    """Batcher that pulls ready-made batches from an iterable dataset.

    The iterable is optionally shuffled, then batched, and a single iterator
    over the result is created at construction time.
    """

    def __init__(self,
                 dataset,
                 iterable_dataset,
                 batch_size,
                 steps_per_epoch,
                 shuffle_buffer_size,
                 ignore_last=False):
        """
        :param dataset: object exposing ``features`` and ``get(name, batch)``.
        :param iterable_dataset: iterable exposing ``shuffle(n)`` and
            ``batch(n)`` (presumably a tf.data-style dataset - confirm).
        :param batch_size: passed to ``iterable_dataset.batch``.
        :param steps_per_epoch: number of steps after which an epoch ends.
        :param shuffle_buffer_size: shuffling is applied only when this is > 0.
        :param ignore_last: when true, the epoch ends one step early.
        """
        pipeline = iterable_dataset
        if shuffle_buffer_size > 0:
            pipeline = pipeline.shuffle(shuffle_buffer_size)
        pipeline = pipeline.batch(batch_size)

        self.dataset = dataset
        self.data_it = iter(pipeline)

        self.step = 0
        self.steps_per_epoch = steps_per_epoch
        self.ignore_last = ignore_last

    def next_batch(self):
        """Return a dict mapping each dataset feature to its data for the next batch.

        :raises StopIteration: when ``last_batch()`` is true.
        """
        if self.last_batch():
            raise StopIteration()

        raw_batch = next(self.data_it)
        sub_batch = {
            name: self.dataset.get(name, raw_batch)
            for name in self.dataset.features
        }

        self.step += 1
        return sub_batch

    def last_batch(self):
        """Tell whether the epoch is over.

        True once ``step`` reaches ``steps_per_epoch``; with ``ignore_last``
        set, also true when only one step remains.
        """
        if self.step >= self.steps_per_epoch:
            return True
        return self.ignore_last and self.step + 1 >= self.steps_per_epoch

    def set_epoch(self, epoch):
        """Reset the step counter for a new epoch.

        NOTE(review): the data iterator is not recreated here, so this
        presumably relies on the iterable repeating across epochs - confirm.
        """
        self.step = 0
70 changes: 70 additions & 0 deletions ludwig/data/batcher/random_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import math

from ludwig.data.batcher.base import Batcher


class RandomAccessBatcher(Batcher):
    """Batcher that draws indices from a sampler and looks them up in a dataset."""

    def __init__(self, dataset, sampler,
                 batch_size=128,
                 ignore_last=False):
        """
        :param dataset: object exposing ``features`` and ``get(column, indices)``.
        :param sampler: iterable of indices supporting ``len()`` and
            ``set_epoch(epoch)``.
        :param batch_size: maximum number of indices per batch.
        :param ignore_last: when true, the epoch ends before the final batch.
        """
        self.dataset = dataset
        self.sampler = sampler
        self.sample_it = iter(self.sampler)

        self.ignore_last = ignore_last
        self.batch_size = batch_size
        self.total_size = len(sampler)
        # Computed with ceil regardless of `ignore_last`.
        self.steps_per_epoch = int(
            math.ceil(self.total_size / self.batch_size))

        self.index = 0
        self.step = 0

    def next_batch(self):
        """Return a dict mapping each dataset feature to the data at the next indices.

        Up to ``batch_size`` indices are taken from the sampler; fewer are
        used if the sampler runs out first.

        :raises StopIteration: when ``last_batch()`` is true.
        """
        if self.last_batch():
            raise StopIteration()

        exhausted = object()
        indices = []
        for _ in range(self.batch_size):
            sample = next(self.sample_it, exhausted)
            if sample is exhausted:
                break
            indices.append(sample)
            self.index += 1

        sub_batch = {
            proc_column: self.dataset.get(proc_column, indices)
            for proc_column in self.dataset.features
        }

        self.step += 1
        return sub_batch

    def last_batch(self):
        """Tell whether the epoch is over.

        True once ``index`` reaches ``total_size``. With ``ignore_last`` set,
        also true when one more full batch would reach or pass the end.

        NOTE(review): because the comparison is ``>=``, a final batch that is
        exactly full is dropped as well - confirm this is intended.
        """
        if self.index >= self.total_size:
            return True
        return (self.ignore_last and
                self.index + self.batch_size >= self.total_size)

    def set_epoch(self, epoch):
        """Reset counters, pass ``epoch`` to the sampler, and restart iteration."""
        self.index = 0
        self.step = 0
        self.sampler.set_epoch(epoch)
        self.sample_it = iter(self.sampler)
Loading