Skip to content

Instantly share code, notes, and snippets.

@mattparrilla
Created November 7, 2019 21:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattparrilla/b3891d6dc0cc3b6554992acb843b50df to your computer and use it in GitHub Desktop.
Save mattparrilla/b3891d6dc0cc3b6554992acb843b50df to your computer and use it in GitHub Desktop.
Find LCM traffic on an unknown UDP port
#!/usr/bin/env python
import lcm
import multiprocessing
import argparse
def try_port(port, channel, timeout):
"""Look for LCM channel on port"""
def handler(c, d):
print('Found {} on port {}'.format(channel if channel != '.+' else 'LCM', port))
return False
lc = lcm.LCM('udpm://239.255.76.67:{}?ttl=1'.format(port))
lc.subscribe(channel, handler)
return lc.handle_timeout(timeout * 1000)
def exit_pool(pool):
"""Return a function to exit on success, accepting pool as an argument"""
def exit_on_success(result):
"""Terminate the pool if we find our port"""
global lcm_found
if result:
pool.terminate()
pool.close()
lcm_found = True
return exit_on_success
if __name__ == '__main__':
lcm_found = False # Used to identify if we've found our LCM
# Define command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('--channel', help='LCM channel to search for', default='any')
parser.add_argument('--timeout', help='Seconds to spend on each port', type=float,
default=0.05)
parser.add_argument('--min_port', help='Port to begin search on', type=int,
default=1024)
parser.add_argument('--max_port', help='Port to end search on', type=int,
default=65535)
# Parse command line args
args = parser.parse_args()
channel = args.channel if args.channel != 'any' else '.+'
pool = multiprocessing.Pool()
exit_callback = exit_pool(pool)
for i in list(range(args.min_port, args.max_port)):
pool.apply_async(try_port, args=(i, channel, args.timeout),
callback=exit_callback)
# clean up after ourselves
pool.close()
pool.join()
if not lcm_found:
print('Did not find requested LCM traffic')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment