Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pure-python implementation #55

Merged
merged 17 commits into from
May 8, 2024
Merged

pure-python implementation #55

merged 17 commits into from
May 8, 2024

Conversation

blublinsky
Copy link
Collaborator

This is initial crack at pure python runtime. The only tested implementation is noop. Once we agree on the approach. Will do other transforms, that can do local run

@blublinsky blublinsky requested review from roytman and daw3rd May 2, 2024 12:37
@blublinsky blublinsky marked this pull request as draft May 2, 2024 12:47
Copy link
Member

@daw3rd daw3rd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks nice. At some point we need to separate ray and its dependencies into a separate wheel? Do you want to do that now?

self.params = self.base.params
return is_valid


if __name__ == "__main__":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking this main may need to move into python_launcher.py and ray_launcher.py.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for the others as well of course.

Copy link
Collaborator Author

@blublinsky blublinsky May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is

Copy link
Member

@roytman roytman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

several comments

:return: 0 - success or 1 - failure
"""
start_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"orchestrator started at {start_ts}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding a name to TransformConfiguration or Transformer (it can be automatically discovered from the class name) and print it here.
f"orchestrator {name} started at {start_ts}"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

logger.error("No DataAccess instance provided - exiting")
return 1
# Get files to process
files, profile = data_access.get_files_to_process()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have to provide an option to pass data directly from one transformer to another.
plus data_access should support a specific file access.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, no. The results are saved to S3

data_access_factory=data_access_factory, statistics=statistics, params=transform_config
)
# process data
logger.debug("Begin processing files")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, the transformer name here will be useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

logger.info(f"Completed {completed} files in {(time.time() - t_start)/60} min")
logger.debug("Done processing files, waiting for flush() completion.")
# invoke flush to ensure that all results are returned
start = time.time()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are several start* variables, I'd rename it here to something like start_flushing

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nah

"job_output_stats": stats,
}
logger.debug(f"Saved job metadata: {metadata}.")
data_access.save_job_metadata(metadata)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we always assume that there is the output folder? what about processing data from one transformer to another.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. we assume that there is input/output


def _submit_for_execution(self) -> int:
"""
Submit for Ray execution
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really? submit to Ray execution?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

transform_config=self.transform_runtime_config,
)
logger.debug("Completed orchestrator")
time.sleep(10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this sleep?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

logger.debug("Completed orchestrator")
time.sleep(10)
except Exception as e:
logger.info(f"Exception running ray remote orchestration\n{e}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the message is wrong

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

# execute local processing
logger.debug(f"Begin transforming table from {f_name}")
out_tables, stats = self.transform.transform(table=table)
logger.debug(f"Done transforming table from {f_name}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add here the length of the out_tables

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nah

try:
# get flush results
logger.debug(f"Begin flushing transform")
out_tables, stats = self.transform.flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd put t_start = time.time() before this line

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nah, its just debug

@blublinsky blublinsky marked this pull request as ready for review May 3, 2024 15:36

class TransformConfiguration(CLIArgumentProvider):
"""
Provides support the configuration of a transformer running in the ray environment.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to remove 'ray'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everywhere :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. Its fortunately in 1 place

cli_prefix = "runtime_"


class TransformExecutionConfiguration(CLIArgumentProvider):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch a new class. I guess this needs some doc in the doc directory.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but without it it will be a lot of duplication of code

@@ -44,7 +44,7 @@
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}


class TestLauncher(TransformLauncher):
class TestLauncher(TransformLauncherRay):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a general common about all ray launcher esting. our classes need generalization to not only use/create ray-based launchers and then add cases for the pure python launcher.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

@daw3rd daw3rd merged commit 1db0cd9 into dev May 8, 2024
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants