1. Authenticate Webhook Request

import base64

def validateBasicAuth(headers):
    """
    Validate basic authentication from HTTP headers.
    Args:
        headers (dict): Dictionary containing HTTP headers.
    Returns:
        bool: True if authentication is successful, False otherwise.
    """
    
    if 'Authorization' not in headers:
        return False  # Authorization header is missing

    authHeader = headers['Authorization']
    try:
        authType, encodedCredentials = authHeader.split()
        if authType.lower() != 'basic':
            return False  # Unsupported authentication type

        decodedCredentials = base64.b64decode(encodedCredentials).decode('utf-8')
        username, password = decodedCredentials.split(':', 1)

        # Check username and password (replace with your authentication logic)
        return username == 'user' and password == 'password'
    except Exception as e:
        print(f"Error decoding credentials: {e}")
        return False

requestHeaders = {'Authorization': 'Basic dXNlcjpwYXNzd29yZA=='}  # Replace with actual headers
authenticationResult = validateBasicAuth(requestHeaders)

if authenticationResult:
    print("Authentication successful")
else:
    print("Authentication failed")

2. Handle TASK_CREATED event

Sample payload

import json

def handleRocketlaneEvent(payloadJson):
    payloadData = json.loads(payloadJson)
    eventType = payloadData["eventType"]

    if eventType == 'TASK_CEATED':
        taskId = payloadData["data"]["task"]["taskId"]
        taskName = payloadData["data"]["task"]["taskName"]
        print(f'Task created with id {taskId} and name {taskName}')
      

3. Extract old and new values, when task field is updated

Sample payload

import json

def handleRocketlaneEvent(payloadJson):
    payloadData = json.loads(payloadJson)
    eventType = payloadData["eventType"]

    if eventType == 'TASK_UPDATED':
        changedFields =  payloadData["changeLog"]["changedFields"]
        # Consuming change log if taskName is present in changedFields
        if 'taskName' in changedFields:
          	taskId = payloadData["data"]["task"]["taskId"];
            
            fromTask = payloadData["changeLog"]["from"]
            fromTaskName = fromTask["taskName"] if fromTask else ''
            
            toTask = payloadData["changeLog"]["to"]
            toTaskName = toTask["taskName"] if toTask else ''
            print( f'TaskName updated from : {fromTaskName} to : {toTaskName} for taskId : {taskId}')

4. Check if a certain User was assigned to a Task.

Sample payload

import json

def handleRocketlaneEvent(payloadJson):
    payloadData = json.loads(payloadJson)
    eventType = payloadData["eventType"]

    if eventType == 'TASK_UPDATED':
        changedFields = payloadData["changeLog"]["changedFields"]
        # Consuming change log if assignees is present in changedFields
        if 'assignees' in changedFields:
          	taskName = payloadData["data"]["task"]["taskName"]
            
            assigneesFrom = payloadData["changeLog"]["from"]["assignees"]
            assigneeFromMembers =  assigneesFrom["members"] if assigneesFrom else ''
            isAssigneeFromMemsPresent=  assigneeFromMembers and len(assigneeFromMembers) > 0
            assigneesFromMemEmails= [member["emailId"] for member in assigneeFromMembers] if isAssigneeFromMemsPresent else ''

            assigneesTo = payloadData["changeLog"]["to"]["assignees"]
            assigneeToMembers = assigneesTo["members"] if assigneesTo else ''
            isAssigneeToMemsPresent= assigneeToMembers and len(assigneeToMembers) > 0
            assigneesToMemEmails= [member["emailId"] for member in assigneeToMembers] if isAssigneeToMemsPresent else ''

            new_assignees = list(filter(lambda item: item not in assigneesFromMemEmails, assigneesToMemEmails))
            if '[email protected]' in new_assignees:
                print( f'Aaron Paul is assigned to the task : {taskName}')