Skip to content

Commit ff2754b

Browse files
committed
Added another test - multidevice resnet + documentation of LLAPI
1 parent e56ede3 commit ff2754b

File tree

6 files changed

+235
-12
lines changed

6 files changed

+235
-12
lines changed

src/DAG/dag_builder.h

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,89 @@
22

33
#include "redisai.h"
44

5+
/**
6+
* @brief Create a new empty DAG runInfo object.
7+
*/
58
RAI_DAGRunCtx *RAI_DAGRunCtxCreate(void);
69

10+
/**
11+
* @brief Create a new MODELRUN op for a DAG.
12+
* @param model The model to run.
13+
*/
714
RAI_DAGRunOp *RAI_DAGCreateModelRunOp(RAI_Model *model);
815

16+
/**
17+
* @brief Create a new SCRIPTRUN op for a DAG.
18+
* @param script The script to run.
19+
* @param func_name The specific function to run in the given script.
20+
*/
921
RAI_DAGRunOp *RAI_DAGCreateScriptRunOp(RAI_Script *script, const char *func_name);
1022

23+
/**
24+
* @brief Add an input key to a DAG run op (before inserting it to the DAG).
25+
* @param DAGOp The DAG run op (MODELRUN / SCRIPTRUN).
26+
* @param input The tensor input name (this name should appear in a previous op of the DAG).
27+
*/
1128
int RAI_DAGRunOpAddInput(RAI_DAGRunOp *DAGOp, const char *input);
1229

30+
/**
31+
* @brief Add an output key to a DAG run op (before inserting it to the DAG).
32+
* @param DAGOp The DAG run op (MODELRUN / SCRIPTRUN).
33+
* @param output The tensor output name (this name may appear in one of the following ops of the
34+
* DAG).
35+
*/
1336
int RAI_DAGRunOpAddOutput(RAI_DAGRunOp *DAGOp, const char *output);
1437

38+
/**
39+
* @brief Add a run op (MODELRUN/SCRIPTRUN) to a DAG.
40+
* @param run_info The DAG to insert the op to.
41+
* @param DAGop The DAG run op (MODELRUN / SCRIPTRUN).
42+
* @param err Error is returned in case of a MODELRUN op if the number of inputs and outputs
43+
* given to the op does not match the number of inputs and outputs in the model definition.
44+
*/
1545
int RAI_DAGAddRunOp(RAI_DAGRunCtx *run_info, RAI_DAGRunOp *DAGop, RAI_Error *err);
1646

47+
/**
48+
* @brief Load a tensor from keyspace to the DAG local context.
49+
* @param run_info The DAG to load the tensor into.
50+
* @param t_name The tensor key.
51+
* @param err Error is returned in case the key does not exist, or does not hold a tensor.
52+
*/
1753
int RAI_DAGLoadTensor(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Error *err);
1854

55+
/**
56+
* @brief Load a tensor from keyspace to the DAG local context.
57+
* @param run_info The DAG to load the tensor into.
58+
* @param t_name The tensor key (can hold any binary string).
59+
* @param err Error is returned in case the key does not exist, or does not hold a tensor.
60+
*/
1961
int RAI_DAGLoadTensorRS(RAI_DAGRunCtx *run_info, RedisModuleString *t_name, RAI_Error *err);
2062

63+
/**
64+
* @brief Append a TENSORSET op to a DAG (can be used to load intermediate tensors).
65+
* @param run_info The DAG to append this op into.
66+
* @param tensor The tensor to set.
67+
*/
2168
int RAI_DAGAddTensorSet(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Tensor *tensor);
2269

70+
/**
71+
* @brief Append a TENSORGET op to a DAG (can be used to output intermediate and final tensors).
72+
* @param run_info The DAG to append this op into.
73+
* @param t_name The name of the tensor to get.
74+
*/
2375
int RAI_DAGAddTensorGet(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Error *err);
2476

77+
/**
78+
* @brief Returns the number of ops in a DAG.
79+
*/
2580
size_t RAI_DAGNumOps(RAI_DAGRunCtx *run_info);
2681

82+
/**
83+
* @brief Free DAG's runInfo and all its internal ops.
84+
*/
2785
void RAI_DAGFree(RAI_DAGRunCtx *run_info);
2886

29-
void RAI_DAGRunOpFree(RAI_DAGRunOp *dagOp);
87+
/**
88+
* @brief Free a specific DAG run op (MODELRUN/SCRIPTRUN).
89+
*/
90+
void RAI_DAGRunOpFree(RAI_DAGRunOp *dagOp);

src/DAG/dag_execute.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,44 @@
1616
demangle the keys in order to persist them.*/
1717
int MangleTensorsNames(RedisAI_RunInfo *rinfo);
1818

19+
/**
20+
* @brief Run a DAG asynchronously. This will validate that the sequence of DAG ops
21+
* is valid and generate a unique key for each tensor that flows in the DAG (MangleTensorsNames).
22+
* Then, the DAG is sent to the devices' run queues and will be executed by a working thread.
23+
* @param DAGAsyncFinish This is a callback that will be called after the whole DAG finishes its run.
24+
* @param private_data This is an input to the DAGAsyncFinish callback. Can be used to save the
25+
* results and errors
26+
* @param err Error is returned in case that the validation failed, and the DAG wasn't inserted to
27+
* the queues.
28+
*/
1929
int RAI_DAGRun(RAI_DAGRunCtx *run_info, RAI_OnFinishCB DAGAsyncFinish, void *private_data,
2030
RAI_Error *err);
2131

32+
/**
33+
* @brief This can be called in the finish CB, returns the number of outputs (TENSORGET ops).
34+
* @param finish_ctx This represents the DAG runInfo at the end of the run.
35+
*/
2236
size_t RAI_DAGNumOutputs(RAI_OnFinishCtx *finish_ctx);
2337

38+
/**
39+
* @brief This can be called in the finish CB, returns a specific output tensor (result of a
40+
* TENSORGET op).
41+
* @param finish_ctx This represents the DAG runInfo at the end of the run.
42+
* @param index The index of the TENSORGET op in the DAG.
43+
* @retval returns the tensor that the i'th TENSORGET op outputs.
44+
*/
2445
RAI_Tensor *RAI_DAGOutputTensor(RAI_OnFinishCtx *finish_ctx, size_t index);
2546

47+
/**
48+
* @brief Returns 1 if (at least) one of the DAG ops encountered an error.
49+
*/
2650
int RAI_DAGRunError(RAI_OnFinishCtx *finish_ctx);
2751

52+
/**
53+
* @brief This can be called in the finish CB, returns the status of a certain op in a DAG.
54+
* @param finish_ctx This represents the DAG runInfo at the end of the run.
55+
* @param index Index of a specific op in the DAG.
56+
* @retval returns an object that represents the i'th op status, from which a user can
57+
* obtain the error code (error code is "OK" if no error has occurred) and error details.
58+
*/
2859
RAI_Error *RAI_DAGCopyOpStatus(RAI_OnFinishCtx *finish_ctx, size_t index);

src/tensor.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -486,12 +486,12 @@ int RAI_TensorGetValueAsDouble(RAI_Tensor *t, long long i, double *val) {
486486
*val = ((double *)data)[i];
487487
break;
488488
default:
489-
return 1;
489+
return REDISMODULE_ERR;
490490
}
491491
} else {
492-
return 1;
492+
return REDISMODULE_ERR;
493493
}
494-
return 0;
494+
return REDISMODULE_OK;
495495
}
496496

497497
int RAI_TensorGetValueAsLongLong(RAI_Tensor *t, long long i, long long *val) {
@@ -515,7 +515,7 @@ int RAI_TensorGetValueAsLongLong(RAI_Tensor *t, long long i, long long *val) {
515515
*val = ((int64_t *)data)[i];
516516
break;
517517
default:
518-
return 0;
518+
return REDISMODULE_ERR;
519519
}
520520
} else if (dtype.code == kDLUInt) {
521521
switch (dtype.bits) {
@@ -532,12 +532,12 @@ int RAI_TensorGetValueAsLongLong(RAI_Tensor *t, long long i, long long *val) {
532532
*val = ((uint64_t *)data)[i];
533533
break;
534534
default:
535-
return 0;
535+
return REDISMODULE_ERR;
536536
}
537537
} else {
538-
return 0;
538+
return REDISMODULE_ERR;
539539
}
540-
return 1;
540+
return REDISMODULE_OK;
541541
}
542542

543543
RAI_Tensor *RAI_TensorGetShallowCopy(RAI_Tensor *t) {
@@ -819,7 +819,7 @@ int RAI_TensorReplyWithValues(RedisModuleCtx *ctx, RAI_Tensor *t) {
819819
double val;
820820
for (i = 0; i < len; i++) {
821821
int ret = RAI_TensorGetValueAsDouble(t, i, &val);
822-
if (ret == 1) {
822+
if (ret == REDISMODULE_ERR) {
823823
RedisModule_ReplyWithError(ctx, "ERR cannot get values for this datatype");
824824
return -1;
825825
}
@@ -829,7 +829,7 @@ int RAI_TensorReplyWithValues(RedisModuleCtx *ctx, RAI_Tensor *t) {
829829
long long val;
830830
for (i = 0; i < len; i++) {
831831
int ret = RAI_TensorGetValueAsLongLong(t, i, &val);
832-
if (!ret) {
832+
if (ret == REDISMODULE_ERR) {
833833
RedisModule_ReplyWithError(ctx, "ERR cannot get values for this datatype");
834834
return -1;
835835
}

tests/flow/tests_llapi.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,47 @@ def test_dag_build_and_run(env):
103103

104104
ret = con.execute_command("RAI_llapi.DAGrun")
105105
env.assertEqual(ret, b'DAG run success')
106+
107+
108+
@ensure_test_module_loaded
def test_llapi_dagrun_multidevice_resnet(env):
    """Run the multi-device resnet DAG through the low-level API test module.

    Stores the resnet model and the pre/post-processing script once per device
    ('CPU:1' and DEVICE), stores the input image tensor, then invokes the
    RAI_llapi.DAG_resnet test command and expects it to report success.
    """
    con = env.getConnection()

    input_var = 'images'
    output_var = 'output'
    image_key = 'image:{{1}}'

    # The labels returned by the loader are not needed by this test.
    model_pb, script, _labels, img = load_resnet_test_data()

    devices = ('CPU:1', DEVICE)
    model_keys = ('imagenet_model1:{{1}}', 'imagenet_model2:{{1}}')
    script_keys = ('imagenet_script1:{{1}}', 'imagenet_script2:{{1}}')

    # One copy of the model per device.
    for model_key, device in zip(model_keys, devices):
        reply = con.execute_command('AI.MODELSET', model_key, 'TF', device,
                                    'INPUTS', input_var,
                                    'OUTPUTS', output_var,
                                    'BLOB', model_pb)
        env.assertEqual(reply, b'OK')

    # One copy of the script per device.
    for script_key, device in zip(script_keys, devices):
        reply = con.execute_command('AI.SCRIPTSET', script_key, device, 'SOURCE', script)
        env.assertEqual(reply, b'OK')

    width, height = img.shape[1], img.shape[0]
    reply = con.execute_command('AI.TENSORSET', image_key, 'UINT8', width, height, 3,
                                'BLOB', img.tobytes())
    env.assertEqual(reply, b'OK')

    reply = con.execute_command("RAI_llapi.DAG_resnet")
    env.assertEqual(reply, b'DAG resnet success')

tests/module/LLAPI.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,5 +264,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
264264
return REDISMODULE_ERR;
265265
}
266266

267+
if(RedisModule_CreateCommand(ctx, "RAI_llapi.DAG_resnet", RAI_llapi_DAG_resnet, "",
268+
0, 0, 0) == REDISMODULE_ERR) {
269+
return REDISMODULE_ERR;
270+
}
271+
267272
return REDISMODULE_OK;
268273
}

tests/module/LLAPI_DAG.c

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,10 +286,9 @@ static int _testSimpleDAGRun2Error(RedisModuleCtx *ctx, RAI_DAGRunCtx *run_info)
286286
int res = LLAPIMODULE_ERR;
287287

288288
RedisModuleKey *key;
289-
RAI_Tensor *tensor = _getFromKeySpace(ctx, "a{1}", &key);
289+
RAI_Tensor *tensor = (RAI_Tensor*) _getFromKeySpace(ctx, "a{1}", &key);
290290
RedisModule_CloseKey(key);
291291
RedisAI_DAGAddTensorSet(run_info, "input1", tensor);
292-
//RedisAI_DAGLoadTensor(run_info, "a{1}", err);
293292

294293
// The script myscript{1} should exist in key space.
295294
RAI_Script *script = (RAI_Script*)_getFromKeySpace(ctx, "myscript{1}", &key);
@@ -383,3 +382,86 @@ int RAI_llapi_DAGRun(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
383382
}
384383
return RedisModule_ReplyWithSimpleString(ctx, "DAG run success");
385384
}
385+
386+
/**
 * @brief Test command: build and run a multi-device resnet DAG via the low-level API.
 *
 * The DAG is LOAD -> SCRIPTRUN -> MODELRUN -> MODELRUN -> SCRIPTRUN -> SCRIPTRUN -> TENSORGET.
 * The keys "image:{{1}}", "imagenet_script1:{{1}}", "imagenet_model1:{{1}}" and
 * "imagenet_model2:{{1}}" are read from the keyspace.
 *
 * @param ctx  Redis module context used for keyspace access and for the reply.
 * @param argv Unused.
 * @param argc Number of arguments; more than one is a wrong-arity error.
 * @return The result of replying with "DAG resnet success" when the first value of the
 * single output tensor is in [0, 1000], or with "DAG resnet failed" otherwise.
 * REDISMODULE_OK is returned after reporting a wrong arity.
 */
int RAI_llapi_DAG_resnet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);

    if (argc > 1) {
        RedisModule_WrongArity(ctx);
        return REDISMODULE_OK;
    }
    RAI_Error *err;
    RedisAI_InitError(&err);
    RAI_Tensor **outputs = array_new(RAI_Tensor *, 1);
    RAI_Error **opsStatus = array_new(RAI_Error *, 1);
    DAGRunResults results = {.outputs = outputs, .opsStatus = opsStatus};
    const char *test_res = "DAG resnet failed";

    // Build the DAG with LOAD->SCRIPTRUN->MODELRUN->MODELRUN->SCRIPTRUN->SCRIPTRUN->TENSORGET
    RAI_DAGRunCtx *run_info = RedisAI_DAGRunCtxCreate();
    int status = RedisAI_DAGLoadTensor(run_info, "image:{{1}}", err);
    if (status != REDISMODULE_OK) goto cleanup;

    // Pre-processing script op.
    RedisModuleKey *key;
    RAI_Script *script = (RAI_Script *)_getFromKeySpace(ctx, "imagenet_script1:{{1}}", &key);
    RedisModule_CloseKey(key);
    RAI_DAGRunOp *script_op = RedisAI_DAGCreateScriptRunOp(script, "pre_process_3ch");
    RedisAI_DAGRunOpAddInput(script_op, "image:{{1}}");
    RedisAI_DAGRunOpAddOutput(script_op, "tmp_key:{{1}}");
    status = RedisAI_DAGAddRunOp(run_info, script_op, err);
    if (status != REDISMODULE_OK) goto cleanup;

    // First model op.
    RAI_Model *model = (RAI_Model *)_getFromKeySpace(ctx, "imagenet_model1:{{1}}", &key);
    RedisModule_CloseKey(key);
    RAI_DAGRunOp *model_op = RedisAI_DAGCreateModelRunOp(model);
    RedisAI_DAGRunOpAddInput(model_op, "tmp_key:{{1}}");
    RedisAI_DAGRunOpAddOutput(model_op, "tmp_key2_0");
    status = RedisAI_DAGAddRunOp(run_info, model_op, err);
    if (status != REDISMODULE_OK) goto cleanup;

    // Second model op, consuming the same pre-processed tensor.
    model = (RAI_Model *)_getFromKeySpace(ctx, "imagenet_model2:{{1}}", &key);
    RedisModule_CloseKey(key);
    model_op = RedisAI_DAGCreateModelRunOp(model);
    RedisAI_DAGRunOpAddInput(model_op, "tmp_key:{{1}}");
    RedisAI_DAGRunOpAddOutput(model_op, "tmp_key2_1");
    status = RedisAI_DAGAddRunOp(run_info, model_op, err);
    if (status != REDISMODULE_OK) goto cleanup;

    // Combine the two model outputs.
    script_op = RedisAI_DAGCreateScriptRunOp(script, "ensemble");
    RedisAI_DAGRunOpAddInput(script_op, "tmp_key2_0");
    RedisAI_DAGRunOpAddInput(script_op, "tmp_key2_1");
    RedisAI_DAGRunOpAddOutput(script_op, "tmp_key_1:{{1}}");
    status = RedisAI_DAGAddRunOp(run_info, script_op, err);
    if (status != REDISMODULE_OK) goto cleanup;

    // Post-processing script op.
    script_op = RedisAI_DAGCreateScriptRunOp(script, "post_process");
    RedisAI_DAGRunOpAddInput(script_op, "tmp_key_1:{{1}}");
    RedisAI_DAGRunOpAddOutput(script_op, "output:{{1}}");
    status = RedisAI_DAGAddRunOp(run_info, script_op, err);
    if (status != REDISMODULE_OK) goto cleanup;

    RedisAI_DAGAddTensorGet(run_info, "output:{{1}}", err);

    pthread_mutex_lock(&global_lock);
    if (RedisAI_DAGRun(run_info, _DAGFinishFunc, &results, err) != REDISMODULE_OK) {
        pthread_mutex_unlock(&global_lock);
        goto cleanup;
    }
    // Wait until the onFinish callback returns.
    // NOTE(review): the wait is not guarded by a predicate, so a spurious wakeup would
    // resume before the callback ran - confirm how _DAGFinishFunc signals global_cond.
    pthread_cond_wait(&global_cond, &global_lock);
    pthread_mutex_unlock(&global_lock);

    // Verify that we received the expected output.
    RedisModule_Assert(array_len(results.outputs) == 1);
    // Read through results.outputs (the array handed to the callback and freed in
    // cleanup) rather than the local pointer captured before the run.
    RAI_Tensor *out_tensor = results.outputs[0];
    long long val;
    int get_status = RedisAI_TensorGetValueAsLongLong(out_tensor, 0, &val);
    // Release the tensor before branching so it is not leaked on the failure path.
    RedisAI_TensorFree(out_tensor);
    if (get_status != REDISMODULE_OK) goto cleanup;
    if (0 <= val && val <= 1000) {
        test_res = "DAG resnet success";
    }

cleanup:
    RedisAI_FreeError(err);
    array_free(results.outputs);
    array_free(results.opsStatus);
    RedisAI_DAGFree(run_info);
    return RedisModule_ReplyWithSimpleString(ctx, test_res);
}

0 commit comments

Comments
 (0)