Training courses

Kernel and Embedded Linux

Bootlin training courses

Embedded Linux, kernel,
Yocto Project, Buildroot, real-time,
graphics, boot time, debugging...

Bootlin logo

Elixir Cross Referencer

#!/usr/bin/python
# Tests p2p_connect
# Will try to connect to another peer
# and form a group
######### MAY NEED TO RUN AS SUDO #############

import dbus
import sys, os
import time
import gobject
import getopt
from dbus.mainloop.glib import DBusGMainLoop


def usage():
	print "Usage:"
	print "  %s -i <interface_name> -m <wps_method> \ " \
		% sys.argv[0]
	print "		-a <addr> [-p <pin>] [-g <go_intent>] \ "
	print "  		[-w <wpas_dbus_interface>]"
	print "Options:"
	print "  -i = interface name"
	print "  -m = wps method"
	print "  -a = peer address"
	print "  -p = pin number (8 digits)"
	print "  -g = group owner intent"
	print "  -w = wpas dbus interface = fi.w1.wpa_supplicant1"
	print "Example:"
	print "  %s -i wlan0 -a 0015008352c0 -m display -p 12345670" % sys.argv[0]


# Required Signals
def GONegotiationSuccess(status):
	print "Go Negotiation Success"

def GONegotiationFailure(status):
	print 'Go Negotiation Failed. Status:'
	print format(status)
	os._exit(0)

def GroupStarted(properties):
	if properties.has_key("group_object"):
		print 'Group Formation Complete %s' \
			% properties["group_object"]
	os._exit(0)

def WpsFailure(status, etc):
	print "WPS Authentication Failure".format(status)
	print etc
	os._exit(0)

class P2P_Connect():
	# Needed Variables
	global bus
	global wpas_object
	global interface_object
	global p2p_interface
	global ifname
	global wpas
	global wpas_dbus_interface
	global timeout
	global path
	global wps_method
	global go_intent
	global addr
	global pin

	# Dbus Paths
	global wpas_dbus_opath
	global wpas_dbus_interfaces_opath
	global wpas_dbus_interfaces_interface
	global wpas_dbus_interfaces_p2pdevice

	# Dictionary of Arguements
	global p2p_connect_arguements

	# Constructor
	def __init__(self,ifname,wpas_dbus_interface,addr,
					pin,wps_method,go_intent):
		# Initializes variables and threads
		self.ifname = ifname
		self.wpas_dbus_interface = wpas_dbus_interface
		self.wps_method = wps_method
		self.go_intent = go_intent
		self.addr = addr
		self.pin = pin

		# Generating interface/object paths
		self.wpas_dbus_opath = \
			"/" + self.wpas_dbus_interface.replace(".","/")
		self.wpas_wpas_dbus_interfaces_opath = \
			self.wpas_dbus_opath + "/Interfaces"
		self.wpas_dbus_interfaces_interface = \
			self.wpas_dbus_interface + ".Interface"
		self.wpas_dbus_interfaces_p2pdevice = \
			self.wpas_dbus_interfaces_interface + ".P2PDevice"

		# Getting interfaces and objects
		DBusGMainLoop(set_as_default=True)
		self.bus = dbus.SystemBus()
		self.wpas_object = self.bus.get_object(
				self.wpas_dbus_interface,
				self.wpas_dbus_opath)
		self.wpas = dbus.Interface(
				self.wpas_object, self.wpas_dbus_interface)

		# See if wpa_supplicant already knows about this interface
		self.path = None
		try:
			self.path = self.wpas.GetInterface(ifname)
		except:
			if not str(exc).startswith(
				self.wpas_dbus_interface + \
				".InterfaceUnknown:"):
				raise exc
			try:
				path = self.wpas.CreateInterface(
					{'Ifname': ifname, 'Driver': 'test'})
				time.sleep(1)

			except dbus.DBusException, exc:
				if not str(exc).startswith(
					self.wpas_dbus_interface + \
					".InterfaceExists:"):
					raise exc

		# Get Interface and objects
		self.interface_object = self.bus.get_object(
				self.wpas_dbus_interface,self.path)
		self.p2p_interface = dbus.Interface(
				self.interface_object,
				self.wpas_dbus_interfaces_p2pdevice)

		# Add signals
		self.bus.add_signal_receiver(GONegotiationSuccess,
			dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
			signal_name="GONegotiationSuccess")
		self.bus.add_signal_receiver(GONegotiationFailure,
			dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
			signal_name="GONegotiationFailure")
		self.bus.add_signal_receiver(GroupStarted,
			dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
			signal_name="GroupStarted")
		self.bus.add_signal_receiver(WpsFailure,
			dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
			signal_name="WpsFailed")


	#Constructing all the arguements needed to connect
	def constructArguements(self):
		# Adding required arguements
		self.p2p_connect_arguements = {'wps_method':self.wps_method,
			'peer':dbus.ObjectPath(self.path+'/Peers/'+self.addr)}

		# Display requires a pin, and a go intent of 15
		if (self.wps_method == 'display'):
			if (self.pin != None):
				self.p2p_connect_arguements.update({'pin':self.pin})
			else:
				print "Error:\n  Pin required for wps_method=display"
				usage()
				quit()

			if (self.go_intent != None and int(self.go_intent) != 15):
				print "go_intent overwritten to 15"

			self.go_intent = '15'

		# Keypad requires a pin, and a go intent of less than 15
		elif (self.wps_method == 'keypad'):
			if (self.pin != None):
				self.p2p_connect_arguements.update({'pin':self.pin})
			else:
				print "Error:\n  Pin required for wps_method=keypad"
				usage()
				quit()

			if (self.go_intent != None and int(self.go_intent) == 15):
				error = "Error :\n Group Owner intent cannot be" + \
					" 15 for wps_method=keypad"
				print error
				usage()
				quit()

		# Doesn't require pin
		# for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad
		elif (self.wps_method == 'pin'):
			if (self.pin != None):
				print "pin ignored"

		# No pin is required for pbc so it is ignored
		elif (self.wps_method == 'pbc'):
			if (self.pin != None):
				print "pin ignored"

		else:
			print "Error:\n  wps_method not supported or does not exist"
			usage()
			quit()

		# Go_intent is optional for all arguements
		if (self.go_intent != None):
			self.p2p_connect_arguements.update(
				{'go_intent':dbus.Int32(self.go_intent)})

	# Running p2p_connect
	def run(self):
		try:
			result_pin = self.p2p_interface.Connect(
				self.p2p_connect_arguements)

		except dbus.DBusException, exc:
				raise exc

		if (self.wps_method == 'pin' and \
		not self.p2p_connect_arguements.has_key('pin') ):
			print "Connect return with pin value of %d " % int(result_pin)
		gobject.MainLoop().run()

if __name__ == "__main__":

	# Required
	interface_name = None
	wps_method = None
	addr = None

	# Conditionally optional
	pin = None

	# Optional
	wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
	go_intent = None

	# Using getopts to handle options
	try:
		options, args = getopt.getopt(sys.argv[1:],"hi:m:a:p:g:w:")

	except getopt.GetoptError:
		usage()
		quit()

	# If theres a switch, override default option
	for key, value in options:
		# Help
		if (key == "-h"):
			usage()
			quit()
		# Interface Name
		elif (key == "-i"):
			interface_name = value
		# WPS Method
		elif (key == "-m"):
			wps_method = value
		# Address
		elif (key == "-a"):
			addr = value
		# Pin
		elif (key == "-p"):
			pin = value
		# Group Owner Intent
		elif (key == "-g"):
			go_intent = value
		# Dbus interface
		elif (key == "-w"):
			wpas_dbus_interface = value
		else:
			assert False, "unhandled option"

	# Required Arguements check
	if (interface_name == None or wps_method == None or addr == None):
		print "Error:\n  Required arguements not specified"
		usage()
		quit()

	# Group Owner Intent Check
	if (go_intent != None and (int(go_intent) > 15 or int(go_intent) < 0) ):
		print "Error:\n  Group Owner Intent must be between 0 and 15 inclusive"
		usage()
		quit()

	# Pin Check
	if (pin != None and len(pin) != 8):
		print "Error:\n  Pin is not 8 digits"
		usage()
		quit()

	try:
		p2p_connect_test = P2P_Connect(interface_name,wpas_dbus_interface,
			addr,pin,wps_method,go_intent)

	except:
		print "Error:\n  Invalid Arguements"
		usage()
		quit()

	p2p_connect_test.constructArguements()
	p2p_connect_test.run()

	os._exit(0)