GitHub – mbuhot/glyn: Type-safe PubSub and Registry for Gleam actors with distributed clustering support, built on Syn.

GitHub – mbuhot/glyn: Type-safe PubSub and Registry for Gleam actors with distributed clustering support, built on Syn.



Hex Docs

Type-safe PubSub and Registry for Gleam actors with distributed clustering support.

Built on the Erlang syn library.

Glyn provides two complementary systems for actor communication:

  • PubSub: Broadcast events to multiple subscribers
  • Registry: Direct command routing to named processes

Both systems integrate seamlessly with Gleam’s actor model using selector composition patterns.

Creating Message Types and Decoders

First, define your message types and corresponding decoder functions.
Explicit decoders are required to ensure messages sent between nodes in a cluster are handled with type safety.
Note the Glyn does not JSON encode messages, they are sent directly as erlang terms and should be decoded from tuples.

// my_app/orders.gleam
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/process.{type Subject}

// Define your message types
pub type Event {
  OrderCreated(id: String, amount: Int)
  OrderShipped(id: String, tracking: String)
  SystemAlert(message: String)
}

pub type Command {
  ProcessOrder(id: String, reply_with: Subject(Bool))
  GetStatus(reply_with: Subject(String))
  Shutdown
}

// Helper function to match specific atoms
fn expect_atom(expected: String) -> decode.Decoder(atom.Atom) {
  use value <- decode.then(atom.decoder())
  case atom.to_string(value) == expected {
    True -> decode.success(value)
    False -> decode.failure(value, "Expected atom: " <> expected)
  }
}

// Unsafe cast for Subject decoding - use with caution
@external(erlang, "gleam_stdlib", "identity")
fn unsafe_cast_subject(value: Dynamic) -> Subject(a)

// Create decoder functions
pub fn event_decoder() -> decode.Decoder(Event) {
  decode.one_of(
    {
      use _ <- decode.field(0, expect_atom("order_created"))
      use id <- decode.field(1, decode.string)
      use amount <- decode.field(2, decode.int)
      decode.success(OrderCreated(id, amount))
    },
    or: [
      {
        use _ <- decode.field(0, expect_atom("order_shipped"))
        use id <- decode.field(1, decode.string)
        use tracking <- decode.field(2, decode.string)
        decode.success(OrderShipped(id, tracking))
      },
      {
        use _ <- decode.field(0, expect_atom("system_alert"))
        use message <- decode.field(1, decode.string)
        decode.success(SystemAlert(message))
      },
    ]
  )
}

pub fn command_decoder() -> decode.Decoder(Command) {
  decode.one_of(
    // Shutdown is a simple variant - encoded as bare atom
    decode.map(expect_atom("shutdown"), fn(_) { Shutdown }),
    or: [
      // ProcessOrder has data - encoded as tuple
      {
        use _ <- decode.field(0, expect_atom("process_order"))
        use id <- decode.field(1, decode.string)
        use reply_with <- decode.field(2, decode.dynamic)
        decode.success(ProcessOrder(id, unsafe_cast_subject(reply_with)))
      },
      // GetStatus has data - encoded as tuple
      {
        use _ <- decode.field(0, expect_atom("get_status"))
        use reply_with <- decode.field(1, decode.dynamic)
        decode.success(GetStatus(unsafe_cast_subject(reply_with)))
      },
    ]
  )
}
import glyn/pubsub
import my_app/orders
import gleam/erlang/process

pub fn main() {
  // Create PubSub system with decoder
  let pubsub = pubsub.new(
    "orders",
    orders.event_decoder(),
  )

  // Subscribe returns a Selector that can be composed
  let selector =
    process.new_selector()
    |> process.merge_selector(
      pubsub.subscribe(pubsub, "processing")
      |> process.map_selector(OrderEvent)
    )

  // Publish events
  let assert Ok(Nil) = pubsub.publish(pubsub, "processing",
    orders.OrderCreated("ORDER123", 99))

  // Receive messages through selector
  let assert Ok(OrderEvent(orders.OrderCreated(id, amount))) =
    process.selector_receive(selector, 100)
}
import glyn/registry
import my_app/orders  // Using same orders module from above
import gleam/erlang/process.{type Subject}

pub fn main() {
  // Create Registry system with decoder and error default
  let registry = registry.new(
    "orders",
    orders.command_decoder(),
    orders.Shutdown
  )

  // Register returns a Selector for receiving commands
  let assert Ok(command_selector) = registry.register(registry, "order_processor", "v1.0")

  // Compose with other selectors
  let selector =
    process.new_selector()
    |> process.merge_selector(
      process.map_selector(command_selector, UserCommand)
    )

  // Send commands to registered processes
  let assert Ok(_) = registry.send(registry, "order_processor",
    orders.ProcessOrder("ORDER123", process.new_subject()))

  // Or use call for request/reply pattern
  let assert Ok(status) = registry.call(registry, "order_processor", waiting: 1000,
    sending: fn(reply) { orders.GetStatus(reply) })
}

Multi-Channel Actor Integration

The real power of Gleam’s typed actor system comes from composing multiple message channels in a single actor:

import glyn/pubsub
import glyn/registry
import my_app/orders  // Using the orders module we defined above
import gleam/erlang/process.{type Subject}
import gleam/otp/actor

// Actor message type that composes multiple channels
pub type ProcessorMessage {
  DirectCommand(DirectMessage)  // Direct actor commands
  OrderCommand(orders.Command)  // Registry commands
  OrderEvent(orders.Event)      // PubSub events
  Shutdown
}

pub type DirectMessage {
  GetStats(reply_with: Subject(String))
  Reset
}

pub fn start_order_processor() {
  actor.new_with_initialiser(5000, fn(subject) {
    // Create PubSub and Registry systems
    let event_pubsub = pubsub.new(
      "events",
      orders.event_decoder(),
    )

    let command_registry = registry.new(
      "commands",
      orders.command_decoder(),
      orders.Shutdown
    )

    // Get selectors from both systems
    let event_selector = pubsub.subscribe(event_pubsub, "orders")
    let assert Ok(command_selector) = registry.register(command_registry, "order_processor", "v1.0")

    // Create direct command channel
    let direct_subject = process.new_subject()

    // Compose all channels using selectors
    let selector =
      process.new_selector()
      |> process.select(subject)
      |> process.select_map(direct_subject, DirectCommand)
      |> process.merge_selector(process.map_selector(command_selector, OrderCommand))
      |> process.merge_selector(process.map_selector(event_selector, OrderEvent))

    // Return initialized actor
    actor.initialised(ProcessorState(status: "ready", processed: 0))
    |> actor.selecting(selector)
    |> actor.returning(direct_subject)  // Return direct command interface
    |> Ok
  })
  |> actor.on_message(handle_processor_message)
  |> actor.start()
}

fn handle_processor_message(state, message) {
  case message {
    OrderCommand(orders.ProcessOrder(id, reply_with)) -> {
      // Handle registry command
      process.send(reply_with, True)
      actor.continue(ProcessorState(..state, processed: state.processed + 1))
    }
    OrderCommand(orders.GetStatus(reply_with)) -> {
      // Return status
      process.send(reply_with, state.status)
      actor.continue(state)
    }
    OrderEvent(orders.OrderCreated(id, amount)) -> {
      // React to PubSub event
      actor.continue(ProcessorState(..state, status: "Processing order " <> id))
    }
    OrderEvent(orders.SystemAlert(message)) -> {
      // Handle system-wide alerts
      actor.continue(ProcessorState(..state, status: "Alert: " <> message))
    }
    DirectCommand(user_command) -> {
      // Handle direct commands
      actor.continue(state)
    }
    Shutdown -> {
      actor.stop()
    }
  }
}
gleam test           # Run tests
gleam docs build     # Build documentation

This project is licensed under the MIT License.



Source link

Leave a Reply

Your email address will not be published. Required fields are marked *