Source code for workbench.workers.view_deep
''' view_deep worker '''
import zerorpc
class ViewDeep(object):
[docs] ''' ViewDeep: Generates a view_deep for any file type '''
dependencies = ['meta']
def __init__(self):
self.workbench = zerorpc.Client(timeout=300, heartbeat=60)
self.workbench.connect("tcp://127.0.0.1:4242")
def execute(self, input_data):
[docs]
# Grab the tag from the input and switch view types
md5 = input_data['meta']['md5']
tag = input_data['meta']['type_tag']
if tag == 'exe':
result = self.workbench.work_request('view_pe_deep', md5)['view_pe_deep']
elif tag == 'pdf':
result = self.workbench.work_request('view_pdf_deep', md5)['view_pdf_deep']
elif tag == 'zip':
result = self.workbench.work_request('view_zip_deep', md5)['view_zip_deep']
elif tag == 'pcap':
result = self.workbench.work_request('view_pcap_deep', md5)['view_pcap_deep']
elif tag == 'swf':
result = self.workbench.work_request('view_swf_deep', md5)['view_swf_deep']
elif tag == 'mem':
result = self.workbench.work_request('view_memory_deep', md5)['view_memory_deep']
else:
# In the case of an unsupported MIME type just return the meta data
result = input_data
return result
def __del__(self):
[docs] ''' Class Cleanup '''
# Close zeroRPC client
self.workbench.close()
# Unit test: Create the class, the proper input and run the execute() method for a test
def test():
[docs] ''' view_deep.py: Unit test'''
import pprint
# This worker test requires a local server running
workbench = zerorpc.Client(timeout=300, heartbeat=60)
workbench.connect("tcp://127.0.0.1:4242")
# Generate the input data for this worker
import os
data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'../data/pdf/bad/067b3929f096768e864f6a04f04d4e54')
md5 = workbench.store_sample(open(data_path, 'rb').read(), 'bad_pdf', 'pdf')
input_data = workbench.work_request('meta', md5)
# Execute the worker
worker = ViewDeep()
output = worker.execute(input_data)
print '\nViewDeep: '
pprint.pprint(output)
# Generate the input data for this worker
data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'../data/pe/bad/033d91aae8ad29ed9fbb858179271232')
md5 = workbench.store_sample(open(data_path, 'rb').read(), 'bad_pe', 'exe')
input_data = workbench.work_request('meta', md5)
# Execute the worker
output = worker.execute(input_data)
print '\nViewDeep: '
pprint.pprint(output)
# Generate the input data for this worker
data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'../data/zip/good.zip')
md5 = workbench.store_sample(open(data_path, 'rb').read(), 'good.zip', 'zip')
input_data = workbench.work_request('meta', md5)
# Execute the worker
output = worker.execute(input_data)
print '\nViewDeep: '
pprint.pprint(output)
if __name__ == "__main__":
test()