Experimental XMPP filter-based API, including <iq/> send/recv helper.

The filter mechanism allows you to route any matching stanza to a custom
channel. The SendRecv(iq) method uses a once-only filter to watch for
a reply.
This commit is contained in:
Matt Goodall 2012-07-09 02:08:01 +01:00
parent ec1eaf94b9
commit 2ab32b0959
2 changed files with 135 additions and 3 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"encoding/xml"
"flag"
"log"
"xmpp"
@ -29,9 +30,56 @@ func main() {
if err != nil {
log.Fatal(err)
}
log.Printf("Connection established for %s\n", x.JID)
// Announce presence.
x.Send(xmpp.Presence{})
x.Send(xmpp.Message{To: "carol@localhost", Body: "Hello!"})
// Filter messages into dedicated channel and start a thread to log them.
_, messages := x.AddFilter(
func(v interface{}) bool {
_, ok := v.(*xmpp.Message)
return ok
},
)
go func() {
for message := range messages {
log.Printf("* message: %v\n", message)
}
}()
// Log any stanzas that are not handled elsewhere.
go func() {
for {
stanza := x.Recv()
log.Printf("* recv: %v\n", stanza)
}
}()
// Get disco#info for home server.
info := &DiscoInfo{}
iq := xmpp.Iq{Id: "abc", Type: "get", To: x.JID.Domain}
iq.PayloadEncode(info)
reply, _ := x.SendRecv(&iq)
reply.PayloadDecode(info)
log.Printf("* info: %v\n", info)
select {}
}
type DiscoInfo struct {
XMLName xml.Name `xml:"http://jabber.org/protocol/disco#info query"`
Identity []DiscoIdentity `xml:"identity"`
Feature []DiscoFeature `xml:"feature"`
}
type DiscoIdentity struct {
Type string `xml:"type,attr"`
Name string `xml:"name,attr"`
Category string `xml:"category,attr"`
}
type DiscoFeature struct {
Var string `xml:"var,attr"`
}

View File

@ -1,6 +1,9 @@
package xmpp
import "log"
import (
"fmt"
"log"
)
// Handles XMPP conversations over a Stream. Use NewClientXMPP and/or
// NewComponentXMPP to create and configuring a XMPP instance.
@ -11,6 +14,8 @@ type XMPP struct {
stream *Stream
in chan interface{}
out chan interface{}
nextFilterId FilterId
filters map[FilterId]filter
}
func newXMPP(jid JID, stream *Stream) *XMPP {
@ -19,6 +24,8 @@ func newXMPP(jid JID, stream *Stream) *XMPP {
stream,
make(chan interface{}),
make(chan interface{}),
0,
make(map[FilterId]filter),
}
go x.sender()
go x.receiver()
@ -30,6 +37,69 @@ func (x *XMPP) Send(v interface{}) {
x.out <- v
}
func (x *XMPP) Recv() interface{} {
return <-x.in
}
func (x *XMPP) SendRecv(iq *Iq) (*Iq, error) {
fid, ch := x.AddFilter(IqResult(iq.Id))
defer x.RemoveFilter(fid)
x.Send(iq)
stanza := <-ch
reply, ok := stanza.(*Iq)
if !ok {
return nil, fmt.Errorf("Expected Iq, for %T", stanza)
}
return reply, nil
}
type FilterId int64
func (fid FilterId) Error() string {
return fmt.Sprintf("Invalid filter id: %d", fid)
}
func (x *XMPP) AddFilter(fn FilterFn) (FilterId, chan interface{}) {
ch := make(chan interface{})
filterId := x.nextFilterId
x.nextFilterId ++
x.filters[filterId] = filter{fn, ch}
return filterId, ch
}
func (x *XMPP) RemoveFilter(id FilterId) error {
filter, ok := x.filters[id]
if !ok {
return id
}
close(filter.ch)
delete(x.filters, id)
return nil
}
func IqResult(id string) FilterFn {
return func(v interface{}) bool {
iq, ok := v.(*Iq)
if !ok {
return false
}
if iq.Id != id {
return false
}
return true
}
}
type FilterFn func(v interface{}) bool
type filter struct {
fn FilterFn
ch chan interface{}
}
func (x *XMPP) sender() {
for v := range x.out {
x.stream.Send(v)
@ -60,6 +130,20 @@ func (x *XMPP) receiver() {
log.Fatal(err)
}
x.in <- v
filtered := false
for _, filter := range x.filters {
if filter.fn(v) {
filter.ch <- v
filtered = true
}
}
if !filtered {
x.in <- v
}
}
}
// BUG(matt): filter id generation is not re-entrant.
// BUG(matt): filters map is not re-entrant.