Skip to content

Use case: user signup workflow

Quanzheng Long edited this page Sep 14, 2023 · 1 revision

A common use case that is almost everywhere -- new user sign-up/register a new account in a website/system. E.g. Amazon/Linkedin/Google/etc...

Use case requirements

  • User fills a form and submit to the system with email
  • System will send an email for verification
  • User will click the link in the email to verify the account
  • If not clicking, a reminder will be sent every X hours
user case requirements

Some old solution

With some other existing technologies, you solve it using message queue(like SQS which has timer) + Database like below:

old solution
  • Using visibility timeout for backoff retry
  • Need to re-enqueue the message for larger backoff
  • Using visibility timeout for durable timer
  • Need to re-enqueue the message for once to have 24 hours timer
  • Need to create one queue for every step
  • Need additional storage for waiting & processing ready signal
  • Also need DLQ and build tooling around

The business code will be scattered. It's complicated and hard to maintain and extend.

New solution with iWF

The solution with iWF: iwf solution

  • All in one single place without scattered business logic
  • Natural to represent business
  • Builtin & rich support for operation tooling

It's so simple & easy to do that the business logic code can be shown here!

Also see the implementation in Java here.

class SubmitState(WorkflowState[Form]):
    def execute(self, ctx: WorkflowContext, input: Form, command_results: CommandResults, persistence: Persistence,
                communication: Communication,
                ) -> StateDecision:
        persistence.set_data_attribute(data_attribute_form, input)
        persistence.set_data_attribute(data_attribute_status, "waiting")
        print(f"API to send verification email to {input.email}")
        return StateDecision.single_next_state(VerifyState)


class VerifyState(WorkflowState[None]):
    def wait_until(self, ctx: WorkflowContext, input: T, persistence: Persistence, communication: Communication,
                   ) -> CommandRequest:
        return CommandRequest.for_any_command_completed(
            TimerCommand.timer_command_by_duration(
                timedelta(seconds=10)
            ),  # use 10 seconds for demo
            InternalChannelCommand.by_name(verify_channel),
        )

    def execute(self, ctx: WorkflowContext, input: T, command_results: CommandResults, persistence: Persistence,
                communication: Communication,
                ) -> StateDecision:
        form = persistence.get_data_attribute(data_attribute_form)
        if (
                command_results.internal_channel_commands[0].status
                == ChannelRequestStatus.RECEIVED
        ):
            print(f"API to send welcome email to {form.email}")
            return StateDecision.graceful_complete_workflow("done")
        else:
            print(f"API to send the a reminder email to {form.email}")
            return StateDecision.single_next_state(VerifyState)


class UserSignupWorkflow(ObjectWorkflow):
    def get_workflow_states(self) -> StateSchema:
        return StateSchema.with_starting_state(SubmitState(), VerifyState())

    def get_persistence_schema(self) -> PersistenceSchema:
        return PersistenceSchema.create(
            PersistenceField.data_attribute_def(data_attribute_form, Form),
            PersistenceField.data_attribute_def(data_attribute_status, str),
            PersistenceField.data_attribute_def(data_attribute_verified_source, str),
        )

    def get_communication_schema(self) -> CommunicationSchema:
        return CommunicationSchema.create(
            CommunicationMethod.internal_channel_def(verify_channel, None)
        )

    @rpc()
    def verify(
            self, source: str, persistence: Persistence, communication: Communication
    ) -> str:
        status = persistence.get_data_attribute(data_attribute_status)
        if status == "verified":
            return "already verified"
        persistence.set_data_attribute(data_attribute_status, "verified")
        persistence.set_data_attribute(data_attribute_verified_source, source)
        communication.publish_to_internal_channel(verify_channel)
        return "done"

And the application code will be simply interacting with the workflow like below:

@flask_app.route("/signup/submit")
def signup_submit():
    username = request.args["username"]
    form = Form(
        ...
    )
    try:
        client.start_workflow(UserSignupWorkflow, username, 3600, form)
    except WorkflowAlreadyStartedError:
        return "username already started registry"
    return "workflow started"


@flask_app.route("/signup/verify")
def signup_verify():
    username = request.args["username"]
    source = request.args["source"]
    return client.invoke_rpc(username, UserSignupWorkflow.verify, source)