You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
89 lines
2.8 KiB
Python
89 lines
2.8 KiB
Python
#!/usr/bin/env python
|
|
|
|
import boto3
|
|
import json
|
|
|
|
|
|
class Certificate:
    """Handle for a single AWS IoT certificate.

    An instance stores the certificate id and ARN next to a boto3 ``iot``
    client and offers helpers to create or delete the certificate and to
    attach or detach policies and things.
    """

    def __init__(self, certId=''):
        """Create the handle and, if an id is given, look up its ARN.

        Args:
            certId: Id of an existing certificate. When non-empty,
                ``describe_certificate`` is called to resolve the ARN.
                When empty, the handle stays unbound until ``create()``.
        """
        self.id = certId
        self.arn = ''
        self.client = boto3.client('iot')
        if self.id:
            result = self.client.describe_certificate(certificateId=self.id)
            self.arn = result['certificateDescription']['certificateArn']

    def create(self):
        """Provision a new active certificate and bind this handle to it.

        Returns:
            The ``create_keys_and_certificate`` response.

        Raises:
            AssertionError: If this handle already refers to a certificate.
        """
        if self.exists():
            # Raised explicitly instead of via ``assert`` so the guard is not
            # stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError("Cert already exists")
        cert = self.create_keys_and_certificate()
        self.id = cert["certificateId"]
        self.arn = cert["certificateArn"]
        return cert

    def create_keys_and_certificate(self):
        """Call ``create_keys_and_certificate(setAsActive=True)``.

        Returns:
            The raw client response.
        """
        return self.client.create_keys_and_certificate(setAsActive=True)

    def delete(self):
        """Detach everything from the certificate, deactivate it, delete it.

        Returns:
            bool: ``True`` when the certificate was not found (the handle is
            unbound, or the service raised ``ResourceNotFoundException``
            while deactivating or deleting); ``False`` when both the
            deactivation and the deletion calls succeeded.
        """
        if not self.exists():
            # Nothing to look up: avoid calling the API with empty ids.
            return True

        # Policies and things are detached before the delete call.
        for policy in self.list_policies():
            self.detach_policy(policy['policyName'])
        for thing in self.list_things():
            self.detach_thing(thing)

        try:
            # The certificate is set to INACTIVE first, then deleted.
            self.client.update_certificate(certificateId=self.id,
                                           newStatus='INACTIVE')
            self.client.delete_certificate(certificateId=self.id)
        except self.client.exceptions.ResourceNotFoundException:
            return True
        return False

    def exists(self):
        """Return ``True`` when this handle holds a certificate id."""
        return bool(self.id)

    def get_arn(self):
        """Return the certificate ARN (empty string when unbound)."""
        return self.arn

    def _collect_pages(self, operation, items_key, request_token,
                       response_token):
        """Call a paginated list operation until exhausted; merge the pages.

        Args:
            operation: Bound client method, called with ``principal``.
            items_key: Response key that holds the page's items.
            request_token: Request parameter name for the continuation token.
            response_token: Response key carrying the continuation token.

        Returns:
            list: Items of all pages, in service order.
        """
        items = []
        kwargs = {'principal': self.arn}
        while True:
            page = operation(**kwargs)
            items.extend(page.get(items_key, []))
            token = page.get(response_token)
            if not token:
                return items
            # The token is only sent on follow-up calls; the first request
            # must not carry an empty token parameter.
            kwargs[request_token] = token

    def list_policies(self):
        """Return every policy attached to the certificate.

        Returns:
            list: The ``policies`` entries of all result pages.
        """
        # NOTE(review): AWS documents ListPrincipalPolicies as deprecated in
        # favour of ListAttachedPolicies; kept here so the required IAM
        # permissions do not change - confirm before switching.
        return self._collect_pages(self.client.list_principal_policies,
                                   'policies', 'marker', 'nextMarker')

    def attach_policy(self, policy_name):
        """Attach the policy named ``policy_name`` to the certificate."""
        self.client.attach_policy(policyName=policy_name, target=self.arn)

    def detach_policy(self, policy_name):
        """Detach the policy named ``policy_name`` from the certificate."""
        self.client.detach_policy(policyName=policy_name, target=self.arn)

    def list_things(self):
        """Return every thing the certificate is attached to.

        Returns:
            list: The ``things`` entries of all result pages.
        """
        return self._collect_pages(self.client.list_principal_things,
                                   'things', 'nextToken', 'nextToken')

    def attach_thing(self, thing_name):
        """Attach the certificate to the thing named ``thing_name``."""
        self.client.attach_thing_principal(thingName=thing_name,
                                           principal=self.arn)

    def detach_thing(self, thing_name):
        """Detach the certificate from the thing named ``thing_name``."""
        self.client.detach_thing_principal(thingName=thing_name,
                                           principal=self.arn)
|