Files
ac/src/internal/mqtt/server.go
2025-06-26 10:41:21 +08:00

223 lines
5.6 KiB
Go

package mqtt
import (
"bytes"
"fmt"
"log/slog"
"github.com/golang/protobuf/proto"
mqtt_server "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
"ac/internal/context"
"ac/internal/logger"
"ac/internal/mqtt/message"
"ac/pkg/app"
)
// Server bundles the application handle with the embedded MQTT broker so the
// broker can be started and stopped through Run and Stop.
type Server struct {
// Embedded application handle passed to NewServer.
app.App
// Underlying mochi-mqtt broker instance created by NewServer.
mqttServer *mqtt_server.Server
}
// mqttServer is the package-level broker handle. It is assigned by NewServer
// and used by the SendCmd* helpers to publish; it is nil until NewServer has
// run.
var mqttServer *mqtt_server.Server
// NewServer builds the embedded MQTT broker: it creates a mochi-mqtt server
// with the inline client enabled, installs auth.AllowHook, binds a TCP
// listener on ":5247" and registers AcHook.
//
// On success the broker is also stored in the package-level mqttServer
// variable so the SendCmd* helpers can publish through it.
//
// Any failure while adding a hook or the listener is returned to the caller
// (wrapped with context) instead of being ignored or terminating the process.
func NewServer(ac app.App) (*Server, error) {
	server := mqtt_server.New(&mqtt_server.Options{
		Logger: slog.New(slog.NewTextHandler(logger.Log.Out, &slog.HandlerOptions{
			Level: slog.LevelDebug,
		})),
		// The inline client must be enabled to use direct publishing and
		// subscribing on the server.
		InlineClient: true,
	})

	if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
		return nil, fmt.Errorf("adding auth hook: %w", err)
	}

	tcp := listeners.NewTCP(listeners.Config{
		ID:      "t1",
		Address: ":5247",
	})
	if err := server.AddListener(tcp); err != nil {
		return nil, fmt.Errorf("adding tcp listener: %w", err)
	}

	// Add ac hook (AcHook) to the server.
	if err := server.AddHook(new(AcHook), &AcHookOptions{Server: server}); err != nil {
		return nil, fmt.Errorf("adding ac hook: %w", err)
	}

	mqttServer = server
	return &Server{App: ac, mqttServer: server}, nil
}
// AcHookOptions contains configuration settings for AcHook.
type AcHookOptions struct {
// Server is the broker the hook subscribes and publishes through; Init
// rejects a configuration where it is nil.
Server *mqtt_server.Server
}
// AcHook is the broker hook that reacts to client connect, disconnect,
// subscribe, unsubscribe and publish events.
type AcHook struct {
// Embedded base hook from mochi-mqtt; supplies the Log field used by the
// handlers.
mqtt_server.HookBase
// Options stored by Init.
config *AcHookOptions
}
// ID reports the identifier under which this hook is registered.
func (h *AcHook) ID() string {
	const hookID = "events-ac"
	return hookID
}
// Provides reports whether the hook handles the event identified by b.
// It answers true for connect, disconnect, subscribed, unsubscribed,
// published and publish events, and false for everything else.
func (h *AcHook) Provides(b byte) bool {
	switch b {
	case mqtt_server.OnConnect,
		mqtt_server.OnDisconnect,
		mqtt_server.OnSubscribed,
		mqtt_server.OnUnsubscribed,
		mqtt_server.OnPublished,
		mqtt_server.OnPublish:
		return true
	default:
		return false
	}
}
// Init validates and stores the hook configuration.
//
// config must be a non-nil *AcHookOptions whose Server field is set;
// anything else (including a nil config or a nil *AcHookOptions) yields
// mqtt_server.ErrInvalidConfigType instead of panicking.
func (h *AcHook) Init(config any) error {
	// The comma-ok assertion is false for a nil interface, so a missing
	// config is rejected here rather than panicking on a bare assertion.
	opts, ok := config.(*AcHookOptions)
	if !ok || opts == nil || opts.Server == nil {
		return mqtt_server.ErrInvalidConfigType
	}

	h.config = opts
	// Log only once the configuration has actually been accepted.
	h.Log.Info("initialised")
	return nil
}
// subscribeCallback is the handler for the hook's own inline subscription;
// it only logs the client ID and topic of each delivered message.
func (h *AcHook) subscribeCallback(cl *mqtt_server.Client, sub packets.Subscription, pk packets.Packet) {
	clientID, topic := cl.ID, pk.TopicName
	h.Log.Info("hook subscribed message", "client", clientID, "topic", topic)
}
// OnConnect runs when a client connects. It performs the example inline
// subscribe and publish on "hook/direct/publish", then replaces any AP record
// already registered under the same client ID with a fresh one for cl.
//
// Subscribe and publish failures are logged and do not reject the
// connection; the method always returns nil.
func (h *AcHook) OnConnect(cl *mqtt_server.Client, pk packets.Packet) error {
	h.Log.Info("client connected", "client", cl.ID)

	// Example demonstrating how to subscribe to a topic within the hook.
	if err := h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback); err != nil {
		h.Log.Error("hook.subscribe", "error", err)
	}

	// Example demonstrating how to publish a message within the hook.
	if err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0); err != nil {
		h.Log.Error("hook.publish", "error", err)
	}

	acSelf := context.GetSelf()
	// Drop the record left over from a previous connection with this ID.
	if ap, ok := acSelf.ApFindByClientId(cl.ID); ok {
		acSelf.DeleteAp(ap.Client)
	}

	logger.MqttLog.Infof("Create a new Client for: %s", cl.ID)
	acSelf.NewAp(cl)
	return nil
}
// OnDisconnect logs the disconnection (including the error, when there is
// one) and removes the AP record associated with cl.
func (h *AcHook) OnDisconnect(cl *mqtt_server.Client, err error, expire bool) {
	attrs := []any{"client", cl.ID, "expire", expire}
	if err != nil {
		attrs = append(attrs, "error", err)
	}
	h.Log.Info("client disconnected", attrs...)

	context.GetSelf().DeleteAp(cl)
}
// OnSubscribed logs the reason codes and filters of a completed subscription.
func (h *AcHook) OnSubscribed(cl *mqtt_server.Client, pk packets.Packet, reasonCodes []byte) {
	msg := fmt.Sprintf("subscribed qos=%v", reasonCodes)
	h.Log.Info(msg, "client", cl.ID, "filters", pk.Filters)
}
// OnUnsubscribed logs the client ID and the filters that were unsubscribed.
func (h *AcHook) OnUnsubscribed(cl *mqtt_server.Client, pk packets.Packet) {
	clientID, filters := cl.ID, pk.Filters
	h.Log.Info("unsubscribed", "client", clientID, "filters", filters)
}
// OnPublish inspects a packet published by a client and always hands the
// packet back unchanged with a nil error.
//
// Packets from the client whose ID is "inline" are passed through untouched.
// For any other client the matching AP record is looked up (and created when
// missing); if the topic is "AP/echo" the payload is decoded as a
// message.Echo and stored in the record's LastEcho field. Decode failures are
// only logged.
func (h *AcHook) OnPublish(cl *mqtt_server.Client, pk packets.Packet) (packets.Packet, error) {
	h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))

	if cl.ID == "inline" {
		return pk, nil
	}

	acCtx := context.GetSelf()
	ap, found := acCtx.ApFindByClient(cl)
	if !found {
		logger.MqttLog.Infof("Create a new Client for: %s", cl.ID)
		ap = acCtx.NewAp(cl)
	}

	if pk.TopicName != "AP/echo" {
		return pk, nil
	}

	echo := new(message.Echo)
	if err := proto.Unmarshal(pk.Payload, echo); err != nil {
		logger.MqttLog.Errorf("Unmarshal to struct error: %v", err)
		return pk, nil
	}

	logger.MqttLog.Infof("received echo: %+v", echo)
	ap.LastEcho = echo
	return pk, nil
}
// OnPublished logs the client ID and the payload (as a string) of a packet
// passed to the published event.
func (h *AcHook) OnPublished(cl *mqtt_server.Client, pk packets.Packet) {
	payload := string(pk.Payload)
	h.Log.Info("published to client", "client", cl.ID, "payload", payload)
}
func SendCmdReboot(apSn string) {
var cmd message.CMD
cmd.Type = new(message.CMDType)
*cmd.Type = message.CMDType_REBOOT
encoded, err := proto.Marshal(&cmd)
if err != nil {
logger.MqttLog.Errorf("Encode to protobuf data error: %v", err)
} else {
topic := "AC/" + apSn + "/M_cmd"
err := mqttServer.Publish(topic, encoded, false, 2)
if err != nil {
logger.MqttLog.Error("publish", "error", err)
}
}
}
func SendCmdKickUser(apSn, staMac string) {
var cmd message.CMD
cmd.Type = new(message.CMDType)
*cmd.Type = message.CMDType_KICK_USER
cmd.Args = new(string)
*cmd.Args = staMac
encoded, err := proto.Marshal(&cmd)
if err != nil {
logger.MqttLog.Errorf("Encode to protobuf data error: %v", err)
} else {
topic := "AC/" + apSn + "/M_cmd"
err := mqttServer.Publish(topic, encoded, false, 2)
if err != nil {
logger.MqttLog.Error("publish", "error", err)
}
}
}
// Run logs the start-up and calls Serve on the underlying broker, returning
// whatever error Serve reports.
//
// It returns an error instead of panicking when the Server holds no broker,
// mirroring the nil check performed by Stop.
func (s *Server) Run() error {
	if s.mqttServer == nil {
		return fmt.Errorf("mqtt server is not initialised")
	}
	logger.MqttLog.Infof("Start MQTT server")
	return s.mqttServer.Serve()
}
// Stop closes the underlying broker when one is present and logs any error
// returned by Close. It does nothing when the Server holds no broker.
func (s *Server) Stop() {
	if s.mqttServer == nil {
		return
	}

	logger.MqttLog.Infof("Stop MQTT server")
	err := s.mqttServer.Close()
	if err == nil {
		return
	}
	logger.MqttLog.Errorf("Could not close MQTT server: %#v", err)
}