Examples
1. Authenticate Webhook Request
import base64
def validateBasicAuth(headers):
"""
Validate basic authentication from HTTP headers.
Args:
headers (dict): Dictionary containing HTTP headers.
Returns:
bool: True if authentication is successful, False otherwise.
"""
if 'Authorization' not in headers:
return False # Authorization header is missing
authHeader = headers['Authorization']
try:
authType, encodedCredentials = authHeader.split()
if authType.lower() != 'basic':
return False # Unsupported authentication type
decodedCredentials = base64.b64decode(encodedCredentials).decode('utf-8')
username, password = decodedCredentials.split(':', 1)
# Check username and password (replace with your authentication logic)
return username == 'user' and password == 'password'
except Exception as e:
print(f"Error decoding credentials: {e}")
return False
requestHeaders = {'Authorization': 'Basic dXNlcjpwYXNzd29yZA=='} # Replace with actual headers
authenticationResult = validateBasicAuth(requestHeaders)
if authenticationResult:
print("Authentication successful")
else:
print("Authentication failed")
2. Handle TASK_CREATED event
import json
def handleRocketlaneEvent(payloadJson):
payloadData = json.loads(payloadJson)
eventType = payloadData["eventType"]
if eventType == 'TASK_CEATED':
taskId = payloadData["data"]["task"]["taskId"]
taskName = payloadData["data"]["task"]["taskName"]
print(f'Task created with id {taskId} and name {taskName}')
3. Extract old and new values, when task field is updated
import json
def handleRocketlaneEvent(payloadJson):
payloadData = json.loads(payloadJson)
eventType = payloadData["eventType"]
if eventType == 'TASK_UPDATED':
changedFields = payloadData["changeLog"]["changedFields"]
# Consuming change log if taskName is present in changedFields
if 'taskName' in changedFields:
taskId = payloadData["data"]["task"]["taskId"];
fromTask = payloadData["changeLog"]["from"]
fromTaskName = fromTask["taskName"] if fromTask else ''
toTask = payloadData["changeLog"]["to"]
toTaskName = toTask["taskName"] if toTask else ''
print( f'TaskName updated from : {fromTaskName} to : {toTaskName} for taskId : {taskId}')
4. Check if a certain User was assigned to a Task.
import json
def handleRocketlaneEvent(payloadJson):
payloadData = json.loads(payloadJson)
eventType = payloadData["eventType"]
if eventType == 'TASK_UPDATED':
changedFields = payloadData["changeLog"]["changedFields"]
# Consuming change log if assignees is present in changedFields
if 'assignees' in changedFields:
taskName = payloadData["data"]["task"]["taskName"]
assigneesFrom = payloadData["changeLog"]["from"]["assignees"]
assigneeFromMembers = assigneesFrom["members"] if assigneesFrom else ''
isAssigneeFromMemsPresent= assigneeFromMembers and len(assigneeFromMembers) > 0
assigneesFromMemEmails= [member["emailId"] for member in assigneeFromMembers] if isAssigneeFromMemsPresent else ''
assigneesTo = payloadData["changeLog"]["to"]["assignees"]
assigneeToMembers = assigneesTo["members"] if assigneesTo else ''
isAssigneeToMemsPresent= assigneeToMembers and len(assigneeToMembers) > 0
assigneesToMemEmails= [member["emailId"] for member in assigneeToMembers] if isAssigneeToMemsPresent else ''
new_assignees = list(filter(lambda item: item not in assigneesFromMemEmails, assigneesToMemEmails))
if '[email protected]' in new_assignees:
print( f'Aaron Paul is assigned to the task : {taskName}')
Updated 11 months ago