55import os
66import subprocess
77import uuid
8- import pathlib
98import json
109import datetime
1110import time
1211import requests
12+ import logging
13+
14+ from datetime import datetime
1315from dateutil import parser as DateParser
1416from subprocess import PIPE
15- from raft_sdk .raft_common import RaftDefinitions , RaftJsonDict , get_version
16- from raft_sdk .raft_service import RaftJobConfig , RaftJobError , print_status
17+ from raft_sdk .raft_common import RaftJsonDict , get_version
18+ from raft_sdk .raft_service import RaftJobConfig , print_status
19+
20+ from opencensus .ext .azure .log_exporter import AzureEventHandler
1721
1822script_dir = os .path .dirname (os .path .abspath (__file__ ))
1923json_hook = RaftJsonDict .raft_json_object_hook
@@ -132,9 +136,12 @@ def trigger_webhook(url, data, metadata=None):
132136
133137
134138class RaftLocalCLI ():
135- def __init__ (self , network = 'host' ):
139+ def __init__ (self , network = 'host' , telemetry = True ):
140+ # This will hole a cumulative count of the bugs found over the course of the job.
136141 self .bugs = []
137142 self .status = []
143+ self .appinsights_instrumentation_key = '9d67f59d-4f44-475c-9363-d0ae7ea61e95'
144+ self .telemetry = telemetry
138145
139146 self .network = network
140147 self .work_directory = work_directory
@@ -149,6 +156,55 @@ def __init__(self, network='host'):
149156 self .storage , self .secrets_path , self .events_sink = \
150157 init_local ()
151158
159+ self .source = "local"
160+ self .logger = logging .getLogger (__name__ )
161+ logging .basicConfig (level = logging .INFO )
162+ if (telemetry ):
163+ ai_key = 'InstrumentationKey=' + self .appinsights_instrumentation_key
164+ handler = AzureEventHandler (connection_string = ai_key )
165+ handler .add_telemetry_processor (self .telemetry_processor )
166+ self .logger .addHandler (handler )
167+
168+ # Remove identifying information
169+ def telemetry_processor (self , envelope ):
170+ envelope .tags ['ai.cloud.roleInstance' ] = ''
171+ return True
172+
173+ def common_custom_dimensions (self , units , count ):
174+ return {
175+ 'SiteHash' : str (uuid .getnode ()),
176+ 'Source' : self .source ,
177+ 'TimeStamp' : datetime .utcnow ().strftime ("%d/%m/%Y %H:%M:%S" ),
178+ 'Units' : units ,
179+ 'Version' : get_version (),
180+ 'Count' : count
181+ }
182+
183+ # These properties are used for the Created and Completed events
184+ def log_telemetry (self , name , units , count ):
185+ common = self .common_custom_dimensions (units , count )
186+ common .update ({'Name' : name })
187+ return {'custom_dimensions' : common }
188+
189+ # These properties are used for the BugsFound event
190+ def log_bugs_found_telemetry (self , toolname , units , count ):
191+ common = self .common_custom_dimensions (units , count )
192+ common .update ({'ToolName' : toolname })
193+ return {'custom_dimensions' : common }
194+
195+ # Record how many bugs were found by each tool
196+ def log_bugs_per_tool (self ):
197+ tools = {}
198+ for bug in self .bugs :
199+ toolname = bug ['Message' ]['Tool' ]
200+ if toolname in tools :
201+ tools [toolname ] += 1
202+ else :
203+ tools [toolname ] = 1
204+
205+ for toolname in tools :
206+ self .logger .info ("BugsFound" , extra = self .log_bugs_found_telemetry ('Task: ' + toolname , 'Bugs' , tools [toolname ]))
207+
152208 def mount_read_write (self , source , target ):
153209 return f'--mount type=bind,source="{ source } ",target="{ target } " '
154210
@@ -165,6 +221,8 @@ def common_environment_variables(self, job_id, work_dir):
165221 env += self .env_variable ('RAFT_CONTAINER_GROUP_NAME' , job_id )
166222 env += self .env_variable ('RAFT_WORK_DIRECTORY' , work_dir )
167223 env += self .env_variable ('RAFT_SITE_HASH' , '0' )
224+ if (self .telemetry ):
225+ env += self .env_variable ('RAFT_APP_INSIGHTS_KEY' , self .appinsights_instrumentation_key )
168226
169227 # If we are running in a github action (or some other unique environment)
170228 # we will set this value before running
@@ -174,6 +232,7 @@ def common_environment_variables(self, job_id, work_dir):
174232 env += self .env_variable ('RAFT_LOCAL' , 'Developer' )
175233 else :
176234 env += self .env_variable ('RAFT_LOCAL' , customLocal )
235+ self .source = customLocal
177236 return env
178237
179238 def process_job_events_sink (self , job_events_path ):
@@ -208,7 +267,7 @@ def process_job_events_sink(self, job_events_path):
208267 status .append (job_status [s ]['Message' ])
209268 self .status = status
210269 if len (bugs ) > 0 :
211- self .bugs = bugs
270+ self .bugs = self . bugs + bugs
212271
213272 def docker_create_bridge (self , network , job_id ):
214273 if network == 'host' :
@@ -399,21 +458,22 @@ def secrets_to_import(self, job_config):
399458 secrets = []
400459 testTasks = job_config .config .get ('testTasks' )
401460 if testTasks .get ('tasks' ):
402- for tt in testTasks ['tasks' ]:
403- if tt .get ('keyVaultSecrets' ):
404- for s in tt ['keyVaultSecrets' ]:
461+ for testTask in testTasks ['tasks' ]:
462+ if testTask .get ('keyVaultSecrets' ):
463+ for s in testTask ['keyVaultSecrets' ]:
405464 secrets .append (s )
406465 return secrets
407-
408-
466+
409467 def start_test_tasks (self , job_config , task_index ,\
410468 test_services_startup_delay , job_id , work_dir ,\
411469 job_dir , job_events , bridge_name , agent_utilities_url ):
412470
413471 testTasks = job_config .config .get ('testTasks' )
414472 if testTasks .get ('tasks' ):
415- for tt in testTasks ['tasks' ]:
416- config = self .tools [tt ['toolName' ]]
473+ for testTask in testTasks ['tasks' ]:
474+ # Record in telemetry that we are using a particular tool
475+ self .logger .info ("Created" , extra = self .log_telemetry ("Task: " + testTask ['toolName' ], "task" , 1 ))
476+ config = self .tools [testTask ['toolName' ]]
417477 std_out = docker ('pull ' + config ['container' ])
418478 print (std_out )
419479
@@ -424,8 +484,8 @@ def start_test_tasks(self, job_config, task_index,\
424484 if target_config .get ('localRun' ):
425485 testTasks ['targetConfiguration' ] = target_config ['localRun' ]
426486
427- for tt in testTasks ['tasks' ]:
428- config = self .tools [tt ['toolName' ]]
487+ for testTask in testTasks ['tasks' ]:
488+ config = self .tools [testTask ['toolName' ]]
429489 env = self .common_environment_variables (job_id , work_dir )
430490
431491 if (config .get ('environmentVariables' )):
@@ -441,17 +501,17 @@ def start_test_tasks(self, job_config, task_index,\
441501 args = map (lambda a : f'"{ a } "' , config ['run' ]['shellArguments' ])
442502 cmd = f"{ shell } { ' ' .join (args )} "
443503
444- if tt .get ('isIdling' ):
504+ if testTask .get ('isIdling' ):
445505 args = map (lambda a : f'"{ a } "' , config ['idle' ]['shellArguments' ])
446506 run_cmd = f"{ shell } { ' ' .join (args )} "
447507 startup_delay = 0
448508 else :
449509 run_cmd = cmd
450510 startup_delay = test_services_startup_delay
451511
452- task_dir = os .path .join (job_dir , tt ['outputFolder' ])
512+ task_dir = os .path .join (job_dir , testTask ['outputFolder' ])
453513 os .mkdir (task_dir )
454- task_events = os .path .join (job_events , tt ['outputFolder' ])
514+ task_events = os .path .join (job_events , testTask ['outputFolder' ])
455515 os .mkdir (task_events )
456516
457517 with open (os .path .join (task_dir , 'task-run.sh' ), 'w' ) as tc :
@@ -460,26 +520,26 @@ def start_test_tasks(self, job_config, task_index,\
460520
461521 env += self .env_variable ('RAFT_STARTUP_DELAY' , startup_delay )
462522 env += self .env_variable ('RAFT_RUN_CMD' , run_cmd )
463- env += self .env_variable ('RAFT_TOOL_RUN_DIRECTORY' , self .tool_paths [tt ['toolName' ]])
523+ env += self .env_variable ('RAFT_TOOL_RUN_DIRECTORY' , self .tool_paths [testTask ['toolName' ]])
464524 env += self .env_variable ('RAFT_POST_RUN_COMMAND' , '' )
465525 env += self .env_variable ('RAFT_CONTAINER_SHELL' , shell )
466526
467- if tt .get ('keyVaultSecrets' ):
468- for s in tt ['keyVaultSecrets' ]:
527+ if testTask .get ('keyVaultSecrets' ):
528+ for s in testTask ['keyVaultSecrets' ]:
469529 with open (os .path .join (self .secrets_path , s ), 'r' ) as secret_file :
470530 secret = secret_file .read ()
471531 env += self .env_variable (f'RAFT_{ s } ' , secret .strip ())
472532 # create work folder and mount it
473533
474534 # create task_config json, and save it to task_dir
475535 with open (os .path .join (task_dir , 'task-config.json' ), 'w' ) as tc :
476- if not (tt .get ('targetConfiguration' )):
477- tt ['targetConfiguration' ] = testTasks ['targetConfiguration' ]
536+ if not (testTask .get ('targetConfiguration' )):
537+ testTask ['targetConfiguration' ] = testTasks ['targetConfiguration' ]
478538
479- if not (tt .get ('Duration' )) and testTasks .get ('Duration' ):
480- tt ['Duration' ] = testTasks ['Duration' ]
539+ if not (testTask .get ('Duration' )) and testTasks .get ('Duration' ):
540+ testTask ['Duration' ] = testTasks ['Duration' ]
481541
482- json .dump (tt , tc , indent = 4 )
542+ json .dump (testTask , tc , indent = 4 )
483543
484544 mounts = self .mount_read_write (task_dir , work_dir )
485545 mounts += self .mount_read_write (task_events , '/raft-events-sink' )
@@ -493,7 +553,7 @@ def start_test_tasks(self, job_config, task_index,\
493553 for v in job_config .config .get ("readWriteFileShareMounts" ):
494554 mounts += self .mount_read_write (os .path .join (self .storage , v ['FileShareName' ]), v ['MountPath' ])
495555
496- container_name = f'raft-{ tt ["toolName" ]} -{ job_id } -{ task_index } '
556+ container_name = f'raft-{ testTask ["toolName" ]} -{ job_id } -{ task_index } '
497557
498558 # add command to execute
499559 cmd = self .docker_run_cmd (
@@ -564,6 +624,19 @@ def wait_for_container_termination(self, containers, service_containers,\
564624
565625 all_exited , _ , infos = self .check_containers_exited (containers )
566626 if all_exited :
627+ # Some status and bugs are not processed once the tasks finish
628+ # so process them now
629+ self .process_job_events_sink (job_events_path )
630+ print_status (self .status )
631+
632+ # Trigger bug found webhook for all the bugs we found.
633+ # Since self.bugs is a cumulative list of bugs found, just trigger
634+ # the webhooks once at the end of the run so there aren't multiple triggers
635+ # happening.
636+ if bug_found_webhook_url :
637+ for bug in self .bugs :
638+ trigger_webhook (bug_found_webhook_url , [bug ], metadata )
639+
567640 exit_infos = []
568641 for j in infos :
569642 exit_infos .append (
@@ -576,13 +649,9 @@ def wait_for_container_termination(self, containers, service_containers,\
576649 return exit_infos
577650 else :
578651 self .process_job_events_sink (job_events_path )
579-
580- if bug_found_webhook_url :
581- for bug in self .bugs :
582- trigger_webhook (bug_found_webhook_url , [bug ], metadata )
583-
584652 print_status (self .status )
585653
654+ # Trigger job status webhook
586655 if job_status_webhook_url :
587656 for k in self .status :
588657 trigger_webhook (job_status_webhook_url , [{'Message' : k }], metadata )
@@ -662,12 +731,25 @@ def new_job(self, job_config, job_status_webhook_url=None, bug_found_webhook_url
662731 if 'metadata' in job_config .config ['webhook' ]:
663732 metadata = job_config .config ['webhook' ]['metadata' ]
664733
734+ # Record in telemetry we've created a job
735+ self .logger .info ("Created" , extra = self .log_telemetry ("Job" , "job" , 1 ))
736+
665737 stats = self .wait_for_container_termination (test_task_container_names ,\
666738 test_target_container_names , [agent_utils ],\
667739 job_events , duration , metadata ,\
668740 job_status_webhook_url , bug_found_webhook_url )
669741 if stats :
670742 print (stats )
743+
744+ # Log the completion telemetry here so if there is a failure it's not logged.
745+ self .logger .info ("Completed" , extra = self .log_telemetry ("Job" , "job" , 1 ))
746+ # iterate through the tools and mark them as completed.
747+ testTasks = job_config .config .get ('testTasks' )
748+ if testTasks .get ('tasks' ):
749+ for testTask in testTasks ['tasks' ]:
750+ # Record in telemetry that we are using a particular tool
751+ self .logger .info ("Completed" , extra = self .log_telemetry ("Task: " + testTask ['toolName' ], "task" , 1 ))
752+
671753 finally :
672754 if len (test_task_container_names ) > 0 :
673755 self .docker_stop_containers (test_task_container_names )
@@ -679,6 +761,7 @@ def new_job(self, job_config, job_status_webhook_url=None, bug_found_webhook_url
679761
680762 try :
681763 self .docker_stop_containers (test_target_container_names )
764+
682765 except Exception as ex :
683766 print (f'Failed to stop test target containers due to { ex } ' )
684767
@@ -688,8 +771,7 @@ def new_job(self, job_config, job_status_webhook_url=None, bug_found_webhook_url
688771 except Exception as ex :
689772 print (f'Failed to stop agent utilities due to { ex } ' )
690773
691- #self.print_logs([agent_utils])
692- #self.print_logs(test_task_container_names)
774+ self .log_bugs_per_tool ()
693775
694776 print ("Job finished, cleaning up job containers" )
695777 print (f"------------------------ Job results: { job_dir } " )
@@ -707,6 +789,7 @@ def new_job(self, job_config, job_status_webhook_url=None, bug_found_webhook_url
707789 self .docker_remove_bridge (bridge_name )
708790 except Exception as ex :
709791 print (f'Failed to remove bridge { bridge_name } due to { ex } ' )
792+
710793 return {'jobId' : job_id }
711794
712795 def poll (self , job_id ):
@@ -732,7 +815,7 @@ def ArgumentRequired(name):
732815 print (f'Created events_sink folder: { event_sink } ' )
733816
734817 if job_action == 'create' :
735- cli = RaftLocalCLI (network = args .get ('network' ))
818+ cli = RaftLocalCLI (network = args .get ('network' ), telemetry = args . get ( 'no_telemetry' ) )
736819 json_config_path = args .get ('file' )
737820 if json_config_path is None :
738821 ArgumentRequired ('--file' )
@@ -753,7 +836,7 @@ def ArgumentRequired(name):
753836 job_config .config ['duration' ] = duration
754837
755838 cli .new_job (job_config , args .get ('jobStatusWebhookUrl' ), args .get ('bugFoundWebhookUrl' ))
756-
839+
757840
758841if __name__ == "__main__" :
759842 parser = argparse .ArgumentParser (
@@ -826,5 +909,12 @@ def ArgumentRequired(name):
826909This allows running of multiple jobs in parallel on the same device.
827910 ''' ))
828911
912+ job_parser .add_argument (
913+ '--no-telemetry' ,
914+ action = 'store_false' ,
915+ help = textwrap .dedent ('''\
916+ Use this flag to turn off anonymous telemetry
917+ ''' ))
918+
829919 args = parser .parse_args ()
830920 run (vars (args ))
0 commit comments