Replace Send/Recv funcs with In/Out channels.

Basically, make the channels public and just let the client handle a
stream/net error if it's interested.

This simplifies the API a little and also allows an application to
select {} from the XMPP channel and any other channels at the same time.
This commit is contained in:
Matt Goodall 2012-07-16 17:13:34 +01:00
parent e22b95e43e
commit cc012762e9
5 changed files with 50 additions and 61 deletions

View File

@ -19,15 +19,15 @@ func main() {
// Create stream and configure it as a client connection. // Create stream and configure it as a client connection.
jid := must(xmpp.ParseJID(*jid)).(xmpp.JID) jid := must(xmpp.ParseJID(*jid)).(xmpp.JID)
stream := must(xmpp.NewStream(jid.Domain + ":5222", &xmpp.StreamConfig{LogStanzas: true})).(*xmpp.Stream) stream := must(xmpp.NewStream(jid.Domain + ":5222", &xmpp.StreamConfig{LogStanzas: true})).(*xmpp.Stream)
x := must(xmpp.NewClientXMPP(stream, jid, *password, &xmpp.ClientConfig{InsecureSkipVerify: true})).(*xmpp.XMPP) client := must(xmpp.NewClientXMPP(stream, jid, *password, &xmpp.ClientConfig{InsecureSkipVerify: true})).(*xmpp.XMPP)
log.Printf("Connection established for %s\n", x.JID) log.Printf("Connection established for %s\n", client.JID)
// Announce presence. // Announce presence.
x.Send(xmpp.Presence{}) client.Out <- xmpp.Presence{}
// Filter messages into dedicated channel and start a goroutine to log them. // Filter messages into dedicated channel and start a goroutine to log them.
_, messages := x.AddFilter( _, messages := client.AddFilter(
xmpp.MatcherFunc( xmpp.MatcherFunc(
func(v interface{}) bool { func(v interface{}) bool {
_, ok := v.(*xmpp.Message) _, ok := v.(*xmpp.Message)
@ -43,20 +43,16 @@ func main() {
// Log any stanzas that are not handled elsewhere. // Log any stanzas that are not handled elsewhere.
go func() { go func() {
for { for x := range client.In {
stanza, err := x.Recv() log.Printf("* recv: %v\n", x)
if err != nil {
log.Fatal(err)
}
log.Printf("* recv: %v\n", stanza)
} }
}() }()
// Get disco#info for home server. // Get disco#info for home server.
info := &DiscoInfo{} info := &DiscoInfo{}
iq := xmpp.Iq{Id: xmpp.UUID4(), Type: "get", To: x.JID.Domain} iq := xmpp.Iq{Id: xmpp.UUID4(), Type: "get", To: client.JID.Domain}
iq.PayloadEncode(info) iq.PayloadEncode(info)
reply, _ := x.SendRecv(&iq) reply, _ := client.SendRecv(&iq)
reply.PayloadDecode(info) reply.PayloadDecode(info)
log.Printf("* info: %v\n", info) log.Printf("* info: %v\n", info)

View File

@ -19,14 +19,10 @@ func main() {
// Create stream and configure it as a component connection. // Create stream and configure it as a component connection.
jid := must(xmpp.ParseJID(*jid)).(xmpp.JID) jid := must(xmpp.ParseJID(*jid)).(xmpp.JID)
stream := must(xmpp.NewStream(*addr, &xmpp.StreamConfig{LogStanzas: true})).(*xmpp.Stream) stream := must(xmpp.NewStream(*addr, &xmpp.StreamConfig{LogStanzas: true})).(*xmpp.Stream)
x := must(xmpp.NewComponentXMPP(stream, jid, *secret)).(*xmpp.XMPP) comp := must(xmpp.NewComponentXMPP(stream, jid, *secret)).(*xmpp.XMPP)
for { for x := range comp.In {
v, err := x.Recv() log.Printf("recv: %v", x)
if err != nil {
log.Fatal(err)
}
log.Printf("recv: %v", v)
} }
} }

View File

@ -21,22 +21,30 @@
stream, err := xmpp.NewStream("localhost:5347", nil) stream, err := xmpp.NewStream("localhost:5347", nil)
X, err := xmpp.NewComponentXMPP(stream, jid, "secret") X, err := xmpp.NewComponentXMPP(stream, jid, "secret")
Messages are sent using the XMPP.Send method, e.g. a client typically Outgoing XMPP stanzas are sent to the XMPP instance's Out channel, e.g. a
announces its presence on the XMPP network as soon as it's connected: client typically announces its presence on the XMPP network as soon as it's
connected:
X.Send(xmpp.Presence{}) X.Out <- xmpp.Presence{}
Incoming messages can be received in a simple loop, ended by an os.EOF for Incoming messages are handled by consuming the XMPP instance's In channel.
clean shutdown or any other error for something unexpected. XMPP defines The channel is sent all XMPP stanzas as well as terminating error (io.EOF
four types of stanza: <error/>, <iq/>, <message/> and <presence/> for clean shutdown or any other error for something unexpected). The
represented by Error, Iq, Message and Presence structs respectively. channel is also closed after an error.
XMPP defines four types of stanza: <error/>, <iq/>, <message/> and
<presence/> represented by Error, Iq, Message (shown below) and Presence
structs respectively.
for { for i := range X.In {
stanza, err := X.Recv() switch v := i.(type) {
if err == io.EOF { case error:
break log.Printf("error : %v\n", v)
case *xmpp.Message:
log.Printf("msg : %s says %s\n", v.From, v.Body)
default:
log.Printf("%T : %v\n", v, v)
} }
log.Printf("%T : %v\n", stanza, stanza)
} }
Note: A "bound" JID is negotatiated during XMPP setup and may be different Note: A "bound" JID is negotatiated during XMPP setup and may be different

View File

@ -9,14 +9,21 @@ import (
// Handles XMPP conversations over a Stream. Use NewClientXMPP or // Handles XMPP conversations over a Stream. Use NewClientXMPP or
// NewComponentXMPP to create and configure a XMPP instance. // NewComponentXMPP to create and configure a XMPP instance.
type XMPP struct { type XMPP struct {
// JID associated with the stream. Note: this may be negotiated with the // JID associated with the stream. Note: this may be negotiated with the
// server during setup and so must be used for all messages. // server during setup and so must be used for all messages.
JID JID JID JID
stream *Stream stream *Stream
// Stanza channels. // Channel of incoming messages. Values will be one of Iq, Message,
in chan interface{} // Presence, Error or error. Will be closed at the end when the stream is
out chan interface{} // closed or the stream's net connection dies.
In chan interface{}
// Channel of outgoing messages. Messages must be able to be marshaled by
// the standard xml package, however you should try to send one of Iq,
// Message or Presence.
Out chan interface{}
// Incoming stanza filters. // Incoming stanza filters.
filterLock sync.Mutex filterLock sync.Mutex
@ -28,34 +35,20 @@ func newXMPP(jid JID, stream *Stream) *XMPP {
x := &XMPP{ x := &XMPP{
JID: jid, JID: jid,
stream: stream, stream: stream,
in: make(chan interface{}), In: make(chan interface{}),
out: make(chan interface{}), Out: make(chan interface{}),
} }
go x.sender() go x.sender()
go x.receiver() go x.receiver()
return x return x
} }
// Send a stanza.
func (x *XMPP) Send(v interface{}) {
x.out <- v
}
// Return the next stanza.
func (x *XMPP) Recv() (interface{}, error) {
v := <-x.in
if err, ok := v.(error); ok {
return nil, err
}
return v, nil
}
func (x *XMPP) SendRecv(iq *Iq) (*Iq, error) { func (x *XMPP) SendRecv(iq *Iq) (*Iq, error) {
fid, ch := x.AddFilter(IqResult(iq.Id)) fid, ch := x.AddFilter(IqResult(iq.Id))
defer x.RemoveFilter(fid) defer x.RemoveFilter(fid)
x.Send(iq) x.Out <- iq
stanza := <-ch stanza := <-ch
reply, ok := stanza.(*Iq) reply, ok := stanza.(*Iq)
@ -156,19 +149,19 @@ type filter struct {
} }
func (x *XMPP) sender() { func (x *XMPP) sender() {
for v := range x.out { for v := range x.Out {
x.stream.Send(v) x.stream.Send(v)
} }
} }
func (x *XMPP) receiver() { func (x *XMPP) receiver() {
defer close(x.in) defer close(x.In)
for { for {
start, err := x.stream.Next() start, err := x.stream.Next()
if err != nil { if err != nil {
x.in <- err x.In <- err
return return
} }
@ -200,7 +193,7 @@ func (x *XMPP) receiver() {
} }
if !filtered { if !filtered {
x.in <- v x.In <- v
} }
} }
} }

View File

@ -51,14 +51,10 @@ func main() {
} }
// Signal presence. // Signal presence.
x.Send(xmpp.Presence{}) x.Out <- xmpp.Presence{}
// Log anything that arrives. // Log anything that arrives.
for { for stanza := range x.In {
stanza, err := x.Recv()
if err != nil {
log.Fatal(err)
}
log.Printf("recv: %T %v", stanza, stanza) log.Printf("recv: %T %v", stanza, stanza)
} }
} }