Created
November 7, 2019 21:40
-
-
Save mattparrilla/b3891d6dc0cc3b6554992acb843b50df to your computer and use it in GitHub Desktop.
Find LCM traffic on an unknown UDP port
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import lcm | |
import multiprocessing | |
import argparse | |
def try_port(port, channel, timeout): | |
"""Look for LCM channel on port""" | |
def handler(c, d): | |
print('Found {} on port {}'.format(channel if channel != '.+' else 'LCM', port)) | |
return False | |
lc = lcm.LCM('udpm://239.255.76.67:{}?ttl=1'.format(port)) | |
lc.subscribe(channel, handler) | |
return lc.handle_timeout(timeout * 1000) | |
def exit_pool(pool): | |
"""Return a function to exit on success, accepting pool as an argument""" | |
def exit_on_success(result): | |
"""Terminate the pool if we find our port""" | |
global lcm_found | |
if result: | |
pool.terminate() | |
pool.close() | |
lcm_found = True | |
return exit_on_success | |
if __name__ == '__main__': | |
lcm_found = False # Used to identify if we've found our LCM | |
# Define command line arguments | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--channel', help='LCM channel to search for', default='any') | |
parser.add_argument('--timeout', help='Seconds to spend on each port', type=float, | |
default=0.05) | |
parser.add_argument('--min_port', help='Port to begin search on', type=int, | |
default=1024) | |
parser.add_argument('--max_port', help='Port to end search on', type=int, | |
default=65535) | |
# Parse command line args | |
args = parser.parse_args() | |
channel = args.channel if args.channel != 'any' else '.+' | |
pool = multiprocessing.Pool() | |
exit_callback = exit_pool(pool) | |
for i in list(range(args.min_port, args.max_port)): | |
pool.apply_async(try_port, args=(i, channel, args.timeout), | |
callback=exit_callback) | |
# clean up after ourselves | |
pool.close() | |
pool.join() | |
if not lcm_found: | |
print('Did not find requested LCM traffic') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment