Skip to content

Commit cfcd45d

Browse files
committed
log jobs and job defs in mlflow
1 parent 0f85ba9 commit cfcd45d

File tree

10 files changed

+147
-15
lines changed

10 files changed

+147
-15
lines changed

jupyter_scheduler/executors.py

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,13 +7,15 @@
77
from typing import Dict
88

99
import fsspec
10+
import mlflow
1011
import nbconvert
1112
import nbformat
1213
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1314

1415
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1516
from jupyter_scheduler.orm import Job, create_session
1617
from jupyter_scheduler.parameterize import add_parameters
18+
from jupyter_scheduler.scheduler import MLFLOW_SERVER_URI
1719
from jupyter_scheduler.utils import get_utc_timestamp
1820

1921

@@ -172,6 +174,22 @@ def create_output_files(self, job: DescribeJob, notebook_node):
172174
output, _ = cls().from_notebook_node(notebook_node)
173175
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
174176
f.write(output)
177+
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
178+
with mlflow.start_run(run_id=job.mlflow_run_id):
179+
try:
180+
ep.preprocess(nb)
181+
if job.parameters:
182+
mlflow.log_params(job.parameters)
183+
except CellExecutionError as e:
184+
raise e
185+
finally:
186+
for output_format in job.output_formats:
187+
cls = nbconvert.get_exporter(output_format)
188+
output, resources = cls().from_notebook_node(nb)
189+
output_path = self.staging_paths[output_format]
190+
with fsspec.open(output_path, "w", encoding="utf-8") as f:
191+
f.write(output)
192+
mlflow.log_artifact(output_path)
175193

176194
def supported_features(cls) -> Dict[JobFeature, bool]:
177195
return {

jupyter_scheduler/models.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,9 @@ class CreateJob(BaseModel):
8686
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
8787
compute_type: Optional[str] = None
8888
package_input_folder: Optional[bool] = None
89+
mlflow_logging: Optional[bool] = None
90+
mlflow_experiment_id: Optional[str] = None
91+
mlflow_run_id: Optional[str] = None
8992

9093
@root_validator
9194
def compute_input_filename(cls, values) -> Dict:
@@ -148,6 +151,9 @@ class DescribeJob(BaseModel):
148151
downloaded: bool = False
149152
package_input_folder: Optional[bool] = None
150153
packaged_files: Optional[List[str]] = []
154+
mlflow_logging: Optional[bool] = None
155+
mlflow_experiment_id: Optional[str] = None
156+
mlflow_run_id: Optional[str] = None
151157

152158
class Config:
153159
orm_mode = True
@@ -213,6 +219,8 @@ class CreateJobDefinition(BaseModel):
213219
schedule: Optional[str] = None
214220
timezone: Optional[str] = None
215221
package_input_folder: Optional[bool] = None
222+
mlflow_logging: Optional[bool] = None
223+
mlflow_experiment_id: Optional[str] = None
216224

217225
@root_validator
218226
def compute_input_filename(cls, values) -> Dict:
@@ -240,6 +248,8 @@ class DescribeJobDefinition(BaseModel):
240248
active: bool
241249
package_input_folder: Optional[bool] = None
242250
packaged_files: Optional[List[str]] = []
251+
mlflow_logging: Optional[bool] = None
252+
mlflow_experiment_id: Optional[str] = None
243253

244254
class Config:
245255
orm_mode = True

jupyter_scheduler/orm.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,8 @@ class CommonColumns:
8787
create_time = Column(Integer, default=get_utc_timestamp)
8888
package_input_folder = Column(Boolean)
8989
packaged_files = Column(JsonType, default=[])
90+
mlflow_logging = Column(Boolean)
91+
mlflow_experiment_id = Column(String(256), nullable=True)
9092

9193

9294
class Job(CommonColumns, Base):
@@ -100,6 +102,7 @@ class Job(CommonColumns, Base):
100102
url = Column(String(256), default=generate_jobs_url)
101103
pid = Column(Integer)
102104
idempotency_token = Column(String(256))
105+
mlflow_run_id = Column(String(256), nullable=True)
103106

104107

105108
class JobDefinition(CommonColumns, Base):

jupyter_scheduler/scheduler.py

Lines changed: 28 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,10 @@
44
import shutil
55
import subprocess
66
from typing import Dict, List, Optional, Type, Union
7+
from uuid import uuid4
78

89
import fsspec
10+
import mlflow
911
import psutil
1012
from jupyter_core.paths import jupyter_data_dir
1113
from jupyter_server.transutils import _i18n
@@ -46,6 +48,10 @@
4648
create_output_filename,
4749
)
4850

51+
MLFLOW_SERVER_HOST = "127.0.0.1"
52+
MLFLOW_SERVER_PORT = "5000"
53+
MLFLOW_SERVER_URI = f"http://{MLFLOW_SERVER_HOST}:{MLFLOW_SERVER_PORT}"
54+
4955

5056
class BaseScheduler(LoggingConfigurable):
5157
"""Base class for schedulers. A default implementation
@@ -405,16 +411,13 @@ def start_mlflow_server(self):
405411
[
406412
"mlflow",
407413
"server",
408-
"--backend-store-uri",
409-
"./mlruns",
410-
"--default-artifact-root",
411-
"./mlartifacts",
412414
"--host",
413-
"0.0.0.0",
415+
MLFLOW_SERVER_HOST,
414416
"--port",
415-
"5000",
417+
MLFLOW_SERVER_PORT,
416418
]
417419
)
420+
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
418421

419422
def __init__(
420423
self,
@@ -481,6 +484,19 @@ def create_job(self, model: CreateJob) -> str:
481484
if not model.output_formats:
482485
model.output_formats = []
483486

487+
mlflow_client = mlflow.MlflowClient()
488+
489+
if model.job_definition_id and model.mlflow_experiment_id:
490+
experiment_id = model.mlflow_experiment_id
491+
else:
492+
experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}")
493+
model.mlflow_experiment_id = experiment_id
494+
input_file_path = os.path.join(self.root_dir, model.input_uri)
495+
mlflow.log_artifact(input_file_path, "input")
496+
497+
mlflow_run = mlflow_client.create_run(experiment_id=experiment_id, run_name=model.name)
498+
model.mlflow_run_id = mlflow_run.info.run_id
499+
484500
job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))
485501

486502
session.add(job)
@@ -628,6 +644,12 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
628644
if not self.file_exists(model.input_uri):
629645
raise InputUriError(model.input_uri)
630646

647+
mlflow_client = mlflow.MlflowClient()
648+
experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}")
649+
model.mlflow_experiment_id = experiment_id
650+
input_file_path = os.path.join(self.root_dir, model.input_uri)
651+
mlflow.log_artifact(input_file_path, "input")
652+
631653
job_definition = JobDefinition(**model.dict(exclude_none=True, exclude={"input_uri"}))
632654
session.add(job_definition)
633655
session.commit()

src/components/mlflow-checkbox.tsx

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2,15 +2,13 @@ import React, { ChangeEvent } from 'react';
22

33
import { Checkbox, FormControlLabel, FormGroup } from '@mui/material';
44

5-
export type MLFlowCheckboxProps = {
5+
export function MLFlowLoggingControl(props: {
66
onChange: (event: ChangeEvent<HTMLInputElement>) => void;
7-
};
8-
9-
export function MLFlowCheckbox(props: MLFlowCheckboxProps): JSX.Element {
7+
}): JSX.Element {
108
return (
119
<FormGroup>
1210
<FormControlLabel
13-
control={<Checkbox onChange={props.onChange} value={'mlflowLogging'} />}
11+
control={<Checkbox onChange={props.onChange} name={'mlflowLogging'} />}
1412
label="Log with MLFlow"
1513
/>
1614
</FormGroup>

src/handler.ts

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -364,6 +364,8 @@ export namespace Scheduler {
364364
schedule?: string;
365365
timezone?: string;
366366
package_input_folder?: boolean;
367+
mlflow_logging?: boolean;
368+
mlflow_experiment_id?: string;
367369
}
368370

369371
export interface IUpdateJobDefinition {
@@ -391,6 +393,8 @@ export namespace Scheduler {
391393
update_time: number;
392394
active: boolean;
393395
package_input_folder?: boolean;
396+
mlflow_logging: boolean;
397+
mlflow_experiment_id?: string;
394398
}
395399

396400
export interface IEmailNotifications {
@@ -418,6 +422,9 @@ export namespace Scheduler {
418422
output_formats?: string[];
419423
compute_type?: string;
420424
package_input_folder?: boolean;
425+
mlflow_logging?: boolean;
426+
mlflow_experiment_id?: string;
427+
mlflow_run_id?: string;
421428
}
422429

423430
export interface ICreateJobFromDefinition {
@@ -467,6 +474,9 @@ export namespace Scheduler {
467474
end_time?: number;
468475
downloaded: boolean;
469476
package_input_folder?: boolean;
477+
mlflow_logging?: boolean;
478+
mlflow_experiment_id?: string;
479+
mlflow_run_id?: string;
470480
}
471481

472482
export interface ICreateJobResponse {

src/mainviews/create-job.tsx

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ import {
4343
import { Box, Stack } from '@mui/system';
4444
import { getErrorMessage } from '../util/errors';
4545
import { PackageInputFolderControl } from '../components/input-folder-checkbox';
46-
import { MLFlowCheckbox } from '../components/mlflow-checkbox';
46+
import { MLFlowLoggingControl } from '../components/mlflow-checkbox';
4747

4848
export interface ICreateJobProps {
4949
model: ICreateJobModel;
@@ -512,7 +512,7 @@ export function CreateJob(props: ICreateJobProps): JSX.Element {
512512
onChange={handleInputChange}
513513
inputFile={props.model.inputFile}
514514
/>
515-
<MLFlowCheckbox onChange={handleInputChange} />
515+
<MLFlowLoggingControl onChange={handleInputChange} />
516516
<OutputFormatPicker
517517
label={trans.__('Output formats')}
518518
name="outputFormat"

src/mainviews/detail-view/job-definition.tsx

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ import { Scheduler as SchedulerTokens } from '../../tokens';
3232

3333
import { timestampLocalize } from './job-detail';
3434
import { getErrorMessage } from '../../util/errors';
35+
import { OpenInNew } from '@mui/icons-material';
3536

3637
export interface IJobDefinitionProps {
3738
app: JupyterFrontEnd;
@@ -175,6 +176,18 @@ export function JobDefinition(props: IJobDefinitionProps): JSX.Element {
175176
>
176177
{trans.__('Edit Job Definition')}
177178
</Button>
179+
{model.mlflowLogging === true && (
180+
<Button
181+
variant="outlined"
182+
onClick={() => {
183+
const mlFlowUrl = `http://127.0.0.1:5000/#/experiments/${props.model?.mlflowExperimentId}`;
184+
window.open(mlFlowUrl);
185+
}}
186+
endIcon={<OpenInNew />}
187+
>
188+
{trans.__('Open in MLFlow')}
189+
</Button>
190+
)}
178191
<ConfirmDialogDeleteButton
179192
handleDelete={async () => {
180193
log('job-definition-detail.delete');
@@ -231,6 +244,16 @@ export function JobDefinition(props: IJobDefinitionProps): JSX.Element {
231244
label: trans.__('Time zone')
232245
}
233246
],
247+
[
248+
{
249+
value: model.mlflowLogging ? trans.__('Yes') : trans.__('No'),
250+
label: trans.__('MLFlow Logging')
251+
},
252+
{
253+
value: props.model.mlflowExperimentId,
254+
label: trans.__('MLFLow Experiment Id')
255+
}
256+
],
234257
[
235258
{
236259
value: model.packageInputFolder ? trans.__('Yes') : trans.__('No'),

src/mainviews/detail-view/job-detail.tsx

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,11 @@ import {
3939
LabeledValue
4040
} from '../../components/labeled-value';
4141
import { getErrorMessage } from '../../util/errors';
42+
import { OpenInNew } from '@mui/icons-material';
43+
44+
const MLFLOW_SERVER_HOST = '127.0.0.1';
45+
const MLFLOW_SERVER_PORT = '5000';
46+
const MLFLOW_SERVER_URI = `http://${MLFLOW_SERVER_HOST}:${MLFLOW_SERVER_PORT}`;
4247

4348
export interface IJobDetailProps {
4449
app: JupyterFrontEnd;
@@ -167,6 +172,18 @@ export function JobDetail(props: IJobDetailProps): JSX.Element {
167172
{trans.__('Download Job Files')}
168173
</Button>
169174
)}
175+
{props.model?.mlflowLogging === true && (
176+
<Button
177+
variant="outlined"
178+
onClick={() => {
179+
const mlFlowUrl = `${MLFLOW_SERVER_URI}/#/experiments/${props.model?.mlflowExperimentId}/runs/${props.model?.mlflowRunId}`;
180+
window.open(mlFlowUrl);
181+
}}
182+
endIcon={<OpenInNew />}
183+
>
184+
{trans.__('Open in MLFlow')}
185+
</Button>
186+
)}
170187
{props.model !== null && props.model.status === 'IN_PROGRESS' && (
171188
<ConfirmDialogStopButton
172189
handleStop={handleStopJob}
@@ -250,6 +267,24 @@ export function JobDetail(props: IJobDetailProps): JSX.Element {
250267
value: timestampLocalize(props.model.endTime ?? ''),
251268
label: trans.__('End time')
252269
},
270+
{
271+
value: props.model.mlflowLogging ? trans.__('Yes') : trans.__('No'),
272+
273+
label: trans.__('MLFlow Logging')
274+
}
275+
],
276+
[
277+
{
278+
value: props.model.mlflowExperimentId,
279+
label: trans.__('MLFLow Experiment Id')
280+
},
281+
{
282+
value: props.model.mlflowRunId,
283+
284+
label: trans.__('MLFlow Run Id')
285+
}
286+
],
287+
[
253288
{
254289
value: props.model.packageInputFolder
255290
? trans.__('Yes')

src/model.ts

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,6 +100,9 @@ export interface ICreateJobModel
100100
// Is the create button disabled due to a submission in progress?
101101
createInProgress?: boolean;
102102
packageInputFolder?: boolean;
103+
mlflowLogging?: boolean;
104+
mlflowExperimentId?: string;
105+
mlflowRunId?: string;
103106
}
104107

105108
export const defaultScheduleFields: ModelWithScheduleFields = {
@@ -312,6 +315,9 @@ export interface IJobDetailModel {
312315
job_files: Scheduler.IJobFile[];
313316
downloaded: boolean;
314317
packageInputFolder?: boolean;
318+
mlflowLogging?: boolean;
319+
mlflowExperimentId?: string;
320+
mlflowRunId?: string;
315321
}
316322

317323
export interface IJobDefinitionModel {
@@ -339,6 +345,8 @@ export interface IJobDefinitionModel {
339345
endTime?: number;
340346
outputPrefix?: string;
341347
packageInputFolder?: boolean;
348+
mlflowLogging?: boolean;
349+
mlflowExperimentId?: string;
342350
}
343351

344352
const convertParameters = (parameters: {
@@ -388,7 +396,10 @@ export function convertDescribeJobtoJobDetail(
388396
startTime: describeJob.start_time,
389397
endTime: describeJob.end_time,
390398
downloaded: describeJob.downloaded,
391-
packageInputFolder: describeJob.package_input_folder
399+
packageInputFolder: describeJob.package_input_folder,
400+
mlflowLogging: describeJob.mlflow_logging,
401+
mlflowExperimentId: describeJob.mlflow_experiment_id,
402+
mlflowRunId: describeJob.mlflow_run_id
392403
};
393404
}
394405

@@ -417,7 +428,9 @@ export function convertDescribeDefinitiontoDefinition(
417428
updateTime: describeDefinition.update_time,
418429
schedule: describeDefinition.schedule,
419430
timezone: describeDefinition.timezone,
420-
packageInputFolder: describeDefinition.package_input_folder
431+
packageInputFolder: describeDefinition.package_input_folder,
432+
mlflowLogging: describeDefinition.mlflow_logging,
433+
mlflowExperimentId: describeDefinition.mlflow_experiment_id
421434
};
422435
}
423436

0 commit comments

Comments
 (0)