EventSource Wrapper

Monkey Programming Forums/User Modules/EventSource Wrapper

k.o.g.(Posted 2016) [#1]
** 18.05.2016 **
[CHANGED] Name of Subscribe / Unsubscribe to On / Off
[ADDED] Custom Events

** 18.05.2016 **
Initial Release




Hello

I'm presenting a small module for using EventSource on every target that supports brl.socket, as well as on the HTML5 target.

Here is some background information about EventSource:
http://www.html5rocks.com/en/tutorials/eventsource/basics/

Custom Events are supported through On / Off (see the changelog above).

Currently nothing is documented, and the only tested platforms are:
HTML5, iOS, Stdcpp, GLFW(3)

For Android I still need a Milliseconds function; once I have one I will test it. Have fun using it; I may update the code at some point.

interfaces.monkey
format_codebox('Strict

Private

Public

' Public contract of an event-source connection.
' EventSourceBase (eventsource.monkey) provides the shared implementation.
Interface IEventSource
' Write-only properties: each one installs the handler that is notified when
' the connection opens, an unnamed message arrives, or an error occurs.
Method OnOpen:Void(callback:IOnEventSourceOpen) Property
Method OnMessage:Void(callback:IOnEventSourceMessage) Property
Method OnError:Void(callback:IOnEventSourceError) Property

' Url: address of the event stream.
' ReadyState: the socket implementation reports the NONE / CONNECTING / OPEN /
' CLOSED constants of EventSourceBase; the HTML5 one forwards the readyState
' of the browser object.
Method Url:String() Property
Method ReadyState:Int() Property
' Closes the connection and drops every subscription added through On().
Method Disconnect:Void()

' On: subscribes callback to the named event; returns False when that
' callback is already subscribed to it.
' Off: removes such a subscription; returns False when there was none.
Method On:Bool(event:String, callback:IOnEventSourceEventMessage)
Method Off:Bool(event:String, callback:IOnEventSourceEventMessage)
End
' Handler installed through IEventSource.OnOpen.
Interface IOnEventSourceOpen
' source: the connection reporting that it has opened.
Method OnEventSourceOpen:Void(source:IEventSource)
End
' Handler installed through IEventSource.OnMessage; receives messages that
' carry no event name.
Interface IOnEventSourceMessage
' source: the connection that received the message.
' message: the data of the message.
' id: the id sent along with the message, "" by default.
Method OnEventSourceMessage:Void(source:IEventSource, message:String, id:String = "")
End
' Handler registered with IEventSource.On for one named (custom) event.
Interface IOnEventSourceEventMessage
' source: the connection that received the message.
' event: the name of the event that fired.
' message: the data of the message.
' id: the id sent along with the message, "" by default.
Method OnEventSourceEventMessage:Void(source:IEventSource, event:String, message:String, id:String = "")
End
' Handler installed through IEventSource.OnError.
Interface IOnEventSourceError
' source: the connection reporting the error.
Method OnEventSourceError:Void(source:IEventSource)
End')

eventsource.monkey
format_codebox('' 2016 Benjamin 'k.o.g.' Aregger

Strict

Public
Import interfaces

Private

#If LANG = "js" Then
Import dom.dom

Extern Private
' Bound to the JavaScript expression "new EventSource", so calling it
' constructs an EventSource object for the given url.
Function NewEventSource:_EventSource(url:String) = "new EventSource"
' Extern view of the JavaScript "EventSource" object; only the members the
' wrapper uses are declared.
Class _EventSource Extends EventTarget = "EventSource"
' Connection state as reported by the JavaScript object.
Field readyState:Int
' Address of the event stream as reported by the JavaScript object.
Field url:String

' Closes the connection.
Method close:Void()
End

Private

' Bridges DOM events raised by the JavaScript EventSource object to the
' EventSource wrapper that owns this listener.
Class EventSourceListener Extends EventListener Final
Private
	' Wrapper that receives the translated events; Null once Discard() ran.
	Field _source:EventSource

	' source: the wrapper to forward events to.
	Method New(source:EventSource)
		_source = source
	End

	' Dispatches one DOM event to the owning wrapper. Always returns 1.
	'   "open"    -> OnOpen()
	'   "error"   -> OnError()
	'   "message" -> Fire("", data, id)    (unnamed message)
	'   other     -> Fire(type, data, id)  (custom event)
	Method handleEvent:Int(event:Event)
		' An event may still be delivered after Discard() cleared the owner;
		' ignore it instead of dereferencing Null.
		If _source = Null Return 1

		Select event.type
			Case "open"
				_source.OnOpen()

			Case "error"
				_source.OnError()

			Default
				Local msg:MessageEvent = MessageEvent(event)
				If msg = Null Return 1

				' Fire() treats the empty name as "plain message".
				Local name:String = event.type
				If name = "message" name = ""

				_source.Fire(name, msg.data, msg.lastEventId)
		End

		Return 1
	End

	' Detaches the listener from its wrapper.
	Method Discard:Void()
		_source = Null
	End
End

Public

#Else
Import brl.socket
Import brl.databuffer
Import brl.datastream
Import brl.asyncevent

Import "time.${LANG}"

Extern Private
' Native function; presumably implemented by the file pulled in through
' Import "time.${LANG}" above - confirm against that file.
Function NativeMillisecs:Int()
Private
' Initialised with one NativeMillisecs() call; EventSource.Discard() later
' overwrites it with 0. NOTE(review): nothing in this file reads it.
Global _dummy:Int = NativeMillisecs()

#End

Public

Private
' Target-independent part of the wrapper: stores the three standard handlers
' and the per-event subscriber lists, and dispatches notifications to them.
Class EventSourceBase Implements IEventSource Abstract
Private
	Field _onOpen:IOnEventSourceOpen
	Field _onMessage:IOnEventSourceMessage
	Field _onError:IOnEventSourceError

	' Event name -> callbacks subscribed through On().
	Field _subscriptions:StringMap<List<IOnEventSourceEventMessage>>


	Method New()
		_subscriptions = New StringMap<List<IOnEventSourceEventMessage>>()
	End

	' Returns the subscriber list of an event, or Null when nobody subscribed.
	Method SubscriptionListByName:List<IOnEventSourceEventMessage>(event:String)
		If _subscriptions = Null Return Null
		Return _subscriptions.ValueForKey(event)
	End

	' Delivers one message: an empty event name goes to the OnMessage handler,
	' any other name goes to every callback subscribed to that event.
	Method Fire:Void(event:String, message:String, id:String = "")
		If event = "" Then
			OnMessage(message, id)
			Return
		End

		Local subscribers:List<IOnEventSourceEventMessage> = SubscriptionListByName(event)
		If subscribers = Null Return

		For Local callback:IOnEventSourceEventMessage = Eachin subscribers
			callback.OnEventSourceEventMessage(Self, event, message, id)
		Next
	End

	' Notifies the open handler, if one is installed.
	Method OnOpen:Void()
		If _onOpen _onOpen.OnEventSourceOpen(Self)
	End
	' Notifies the message handler, if one is installed.
	Method OnMessage:Void(message:String, id:String = "")
		If _onMessage _onMessage.OnEventSourceMessage(Self, message, id)
	End
	' Notifies the error handler, if one is installed.
	Method OnError:Void()
		If _onError _onError.OnEventSourceError(Self)
	End

	' Hook for subclasses that own resources; nothing to release here.
	Method Discard:Void()
	End

Public
	' Values reported through ReadyState.
	Const NONE:Int = -1
	Const CONNECTING:Int = 0
	Const OPEN:Int = 1
	Const CLOSED:Int = 2

	Method OnOpen:Void(callback:IOnEventSourceOpen) Property
		_onOpen = callback
	End
	Method OnMessage:Void(callback:IOnEventSourceMessage) Property
		_onMessage = callback
	End
	Method OnError:Void(callback:IOnEventSourceError) Property
		_onError = callback
	End

	Method Url:String() Property Abstract
	Method ReadyState:Int() Property Abstract

	' Drops every subscription. The (now empty) map is kept so that On(),
	' Off() and Fire() remain safe to call after a disconnect.
	Method Disconnect:Void()
		If _subscriptions = Null Return

		For Local subscribers:List<IOnEventSourceEventMessage> = Eachin _subscriptions.Values()
			subscribers.Clear()
		Next
		_subscriptions.Clear()
	End

	' Subscribes callback to the named event.
	' Returns False when callback is Null or already subscribed to that event.
	Method On:Bool(event:String, callback:IOnEventSourceEventMessage)
		If callback = Null Return False
		If _subscriptions = Null _subscriptions = New StringMap<List<IOnEventSourceEventMessage>>()

		Local subscribers:List<IOnEventSourceEventMessage> = SubscriptionListByName(event)
		If subscribers = Null Then
			subscribers = New List<IOnEventSourceEventMessage>()
			_subscriptions.Add(event, subscribers)
		Else If subscribers.Contains(callback) Then
			Return False
		End

		Return subscribers.AddLast(callback) <> Null
	End

	' Removes callback from the named event.
	' Returns False when it was not subscribed. The list of an event is removed
	' from the map together with its last subscriber.
	Method Off:Bool(event:String, callback:IOnEventSourceEventMessage)
		Local subscribers:List<IOnEventSourceEventMessage> = SubscriptionListByName(event)
		If subscribers = Null Return False
		If subscribers.Contains(callback) = False Return False

		subscribers.Remove(callback)
		If subscribers.IsEmpty() Then
			_subscriptions.Remove(event)
		End
		Return True
	End
End

Public

#If LANG = "js" Then

' HTML5 implementation: a thin wrapper around the browser's EventSource.
Class EventSource Extends EventSourceBase Final
	' Not used on this target; kept so both implementations expose the same members.
	Global BUFFERSIZE:Int = 2048

Private
	Field _source:_EventSource
	Field _listener:EventSourceListener
	' Last url reported by the browser object, kept for use after Disconnect().
	Field _url:String = ""

	Method New()
	End

	' True for the event types the constructor always listens to.
	Method IsBuiltIn:Bool(event:String)
		Return event = "open" Or event = "message" Or event = "error"
	End

	' Detaches the listener, closes the browser object and releases both.
	Method Discard:Void()
		If _source Then
			_url = _source.url

			' removeEventListener only matches a registration made with the
			' same capture flag, so pass True exactly like addEventListener.
			_source.removeEventListener("open", _listener, True)
			_source.removeEventListener("message", _listener, True)
			_source.removeEventListener("error", _listener, True)

			_source.close()
		End

		If _listener _listener.Discard()

		_listener = Null
		_source = Null

		Super.Discard()
	End

Public
	' Opens the event stream at url and starts listening for the open,
	' message and error events.
	Method New(url:String)
		_url = url
		_source = NewEventSource(url)
		_listener = New EventSourceListener(Self)

		_source.addEventListener("open", _listener, True)
		_source.addEventListener("message", _listener, True)
		_source.addEventListener("error", _listener, True)
	End

	' Url reported by the browser object; after Disconnect() the last known value.
	Method Url:String() Property
		If _source Return _source.url
		Return _url
	End

	' readyState of the browser object; CLOSED once Disconnect() released it.
	Method ReadyState:Int() Property
		If _source Return _source.readyState
		Return EventSource.CLOSED
	End

	' Closes the connection and drops every subscription.
	Method Disconnect:Void()
		Discard()

		Super.Disconnect()
	End

	' Subscribes callback to a custom event. Returns False when the source is
	' disconnected or the callback is already subscribed.
	Method On:Bool(event:String, callback:IOnEventSourceEventMessage)
		If _source = Null Return False

		_source.addEventListener(event, _listener, True)
		Return Super.On(event, callback)
	End

	' Unsubscribes callback from a custom event. The DOM listener for the event
	' is detached only when its last subscriber is gone, so other subscribers
	' of the same event keep receiving it.
	Method Off:Bool(event:String, callback:IOnEventSourceEventMessage)
		Local removed:Bool = Super.Off(event, callback)
		If removed = False Return False

		If _source <> Null Then
			If IsBuiltIn(event) = False Then
				If SubscriptionListByName(event) = Null Then
					_source.removeEventListener(event, _listener, True)
				End
			End
		End
		Return True
	End
End

#Else

' Socket implementation: speaks the text/event-stream protocol over a plain
' HTTP connection made with brl.socket. Requires UpdateAsyncEvents() to be
' called regularly by the application.
Class EventSource Extends EventSourceBase Implements IOnConnectComplete, IOnSendComplete, IOnReceiveComplete, IAsyncEventSource Final
	' Size in bytes of the receive buffer allocated for each connection.
	Global BUFFERSIZE:Int = 2048

Private
	Const NEWLINE:String = "~r~n"

	Field _socket:Socket
	Field _buffer:DataBuffer
	Field _connected:Bool = False


	Field _url:String = ""
	Field _host:String = ""
	Field _port:Int = 80
	Field _path:String = "/"
	' Response headers; Null until the first block of a connection was parsed.
	Field _headers:StringMap<String>

	' Delay in milliseconds before a lost connection is re-established.
	Field _retry:Int = 3000
	Field _readyState:Int = EventSource.NONE

	' NativeMillisecs() value at which the connection was lost, -1 when no
	' reconnect is pending.
	Field _connectionLost:Int = -1
	' True once any connection attempt of this object succeeded.
	Field _wasConnected:Bool = False

	Field _asyncAdded:Bool = False

	' Fields of the event currently being received. They are members rather
	' than locals so that an event whose lines arrive in several receive
	' completions is still assembled and delivered.
	' NOTE(review): a single line cut in half by a buffer boundary is still
	' parsed as two lines.
	Field _event:String = ""
	Field _id:String = ""
	Field _message:String = ""

	Method New()
	End

	' Marks the source as (re)connecting, then notifies the error handler.
	Method OnError:Void()
		_readyState = EventSource.CONNECTING
		Super.OnError()
	End

	' Registers this object for UpdateAsyncEvents() callbacks, once.
	Method EnsureAsyncSource:Void()
		If _asyncAdded = False AddAsyncEventSource(Self)
		_asyncAdded = True
	End

	' Arms the reconnect timer that UpdateAsyncEvents() polls.
	Method ScheduleReconnect:Void()
		EnsureAsyncSource()
		_connectionLost = NativeMillisecs()
	End

	' Connect completion: sends the HTTP request, or handles the failure.
	Method OnConnectComplete:Void( connected:Bool, source:Socket )
		' Completion of a socket that Discard() already closed or replaced.
		If source <> _socket Return

		If connected = False Then
			Local reconnect:Bool = _wasConnected

			OnError()
			' The error handler may have called Disconnect().
			If source <> _socket Return

			Discard()
			If reconnect Then
				' A reconnect attempt failed: keep trying every _retry ms.
				ScheduleReconnect()
			Else
				' Never connected and no retry will happen.
				_readyState = EventSource.CLOSED
			End
			Return
		End

		_connected = True
		_wasConnected = True

		Local text:String = "GET " + _path + " HTTP/1.1" + NEWLINE
		text += "Host: " + _host + ":" + _port + NEWLINE
		text += "Accept: text/event-stream" + NEWLINE
		text += "Connection: keep-alive" + NEWLINE
		text += "Cache-Control: no-cache" + NEWLINE
		text += NEWLINE

		' "ascii" writes one byte per character, so the buffer is sized from
		' the request itself instead of a hand-counted constant.
		Local request:DataBuffer = New DataBuffer(text.Length())
		Local stream:DataStream = New DataStream(request)
		stream.WriteString(text, "ascii")

		Local length:Int = stream.Position()

		stream.Close()
		stream = Null

		source.SendAsync(request, 0, length, Self)
	End

	' Receive completion: parses response headers (first block only) and then
	' the event stream, firing one callback per completed event.
	Method OnReceiveComplete:Void( data:DataBuffer, offset:Int, count:Int, source:Socket )
		' Completion of a socket that Discard() already closed or replaced.
		If source <> _socket Return

		' Nothing received (or an error count): the connection is gone.
		If count <= 0 Then
			OnError()
			If source <> _socket Return

			Discard()
			ScheduleReconnect()
			Return
		End

		Local stream:DataStream = New DataStream(data, offset, count)

		If _headers = Null Then
			_headers = New StringMap<String>()

			While stream.Eof() = False
				Local header:String = stream.ReadLine()
				If header.Length() <= 0 Exit

				' Split on the first ":" only, header values may contain more.
				' Lines without one (the status line) are skipped.
				Local sep:Int = header.Find(":")
				If sep = -1 Continue

				_headers.Add(header[0..sep].Trim(), header[sep + 1..].Trim())
			Wend

			_readyState = EventSource.OPEN
			OnOpen()
			' The open handler may have called Disconnect(), which discards
			' the buffer this stream reads from.
			If source <> _socket Return
		End

		While stream.Eof() = False
			Local line:String = stream.ReadLine().Trim()

			' A blank line ends the current event.
			If line.Length() <= 0 Then
				Local event:String = _event
				Local id:String = _id
				Local message:String = _message

				_event = ""
				_id = ""
				_message = ""

				If message.Length() > 0 Then
					' Drop the separator appended after the last data line.
					message = message[0 .. message.Length() - NEWLINE.Length()]

					Fire(event, message, id)
					If source <> _socket Return
				End
				Continue
			End

			Local pos:Int = line.Find(":")
			If pos = -1 Continue

			Local name:String = line[0..pos]
			Local value:String = line[pos + 1..].Trim()

			Select name
				Case "id"
					_id = value

				Case "event"
					_event = value

				Case "retry"
					_retry = Int(value)
					If _retry <= 0 _retry = 3000

				Case "data"
					' NOTE(review): data lines are joined with CR LF here;
					' confirm this matches what the HTML5 target delivers.
					_message += value + NEWLINE

				Default
					'Print "Unknown: "+name
			End
		Wend

		stream.Close()
		stream = Null

		source.ReceiveAsync(data, 0, data.Length(), Self)
	End

	' Send completion: releases the request buffer and starts receiving.
	Method OnSendComplete:Void( data:DataBuffer, offset:Int, count:Int, source:Socket )
		data.Discard()

		' Completion of a socket that Discard() already closed or replaced.
		If source <> _socket Return

		EnsureAsyncSource()

		source.ReceiveAsync(_buffer, 0, _buffer.Length(), Self)
	End

	' Polled through UpdateAsyncEvents(): reconnects once _retry milliseconds
	' have passed since the connection was lost.
	Method UpdateAsyncEvents:Void ()
		If _connectionLost > -1 Then
			If NativeMillisecs() - _connectionLost > _retry Then
				Connect()
			End
		End
	End

	' Releases any previous connection and starts a new connect attempt.
	Method Connect:Void()
		Discard()

		_readyState = EventSource.CONNECTING

		_buffer = New DataBuffer(BUFFERSIZE)
		_socket = New Socket("stream")
		_socket.ConnectAsync(_host, _port, Self)
	End

	' Closes the socket and resets all per-connection state.
	Method Discard:Void()
		If _socket _socket.Close()
		If _buffer _buffer.Discard()
		If _headers _headers.Clear()

		_socket = Null
		_buffer = Null
		_headers = Null

		_connected = False

		_event = ""
		_id = ""
		_message = ""

		_dummy = 0
		_connectionLost = -1

		Super.Discard()
	End

Public
	' Parses url ("http://host[:port][/path][#fragment]") and starts connecting.
	' Raises an error for any protocol other than http.
	Method New(url:String)
		_url = url

		' Protocol: must be the prefix, not merely appear somewhere in the url.
		If url.StartsWith("http://") = False Then
			Error("Protocol not supported!")
		End
		url = url[7..]

		' Host & Port: only a ":" in front of the path separates the port, so
		' a colon inside the path or query is left alone.
		Local pathStart:Int = url.Find("/")
		If pathStart = -1 pathStart = url.Length()

		Local authority:String = url[0..pathStart]
		Local colon:Int = authority.Find(":")
		If colon > -1 Then
			_host = authority[0..colon]
			_port = Int(authority[colon + 1..])
			If _port <= 0 _port = 80
		Else
			_host = authority
			_port = 80
		End
		url = url[pathStart..]

		' Path: everything up to an optional "#fragment"; "/" when absent.
		If url.Length() > 1 Then
			Local fragment:Int = url.Find("#")
			If fragment = -1 fragment = url.Length()
			_path = url[0 .. fragment]
		End

		Connect()
	End

	' Redeclared because this class also declares the private OnError:Void().
	Method OnError:Void(callback:IOnEventSourceError) Property
		Super.OnError(callback)
	End

	' The url exactly as passed to the constructor.
	Method Url:String() Property
		Return _url
	End
	' One of NONE, CONNECTING, OPEN or CLOSED.
	Method ReadyState:Int() Property
		Return _readyState
	End

	' Closes the connection for good (no reconnect) and drops all subscriptions.
	Method Disconnect:Void()
		_wasConnected = False
		RemoveAsyncEventSource(Self)
		_asyncAdded = False
		Discard()
		_readyState = EventSource.CLOSED

		Super.Disconnect()
	End
End

#End')

demo.monkey
format_code('' 2016 Benjamin 'k.o.g.' Aregger
Strict

#If TARGET <> "stdcpp" Then
Import mojo
#End
Import brl.asyncevent
Import eventsource

' Demo handler: logs every notification of an event source through Print.
Class Listener Implements IOnEventSourceMessage, IOnEventSourceEventMessage, IOnEventSourceOpen, IOnEventSourceError
Private
	' Prints a caption followed by the current ready state of the source.
	Method Report:Void(caption:String, source:IEventSource)
		Print caption
		Print source.ReadyState
	End

Public
	' Connection opened.
	Method OnEventSourceOpen:Void(source:IEventSource)
		Report("Open", source)
	End

	' Connection failed or was lost.
	Method OnEventSourceError:Void(source:IEventSource)
		Report("Error", source)
	End

	' Unnamed message received.
	Method OnEventSourceMessage:Void(source:IEventSource, message:String , id:String = "")
		Report("Message", source)
		Print message
	End

	' Named (custom) event received.
	Method OnEventSourceEventMessage:Void(source:IEventSource, event:String, message:String, id:String = "")
		Report("Custom Event: "+event, source)
		Print message
	End
End

' Event stream both demo variants connect to.
' NOTE(review): the host looks like a placeholder; point this at your own server.
Const URL:String = "http://reallocalhost:8888/tests/eventsource/test.php?blabla#asd"

#If TARGET <> "stdcpp" Then

' Mojo demo: connects to URL, logs all notifications and renders the
' running millisecond counter in the middle of the screen.
Class TestApp Extends App
	Field _source:EventSource

	' Creates the event source and wires one Listener to every notification.
	Method OnCreate:Int()
		SetUpdateRate(60)

		Local handler:Listener = New Listener()
		_source = New EventSource(URL)

		_source.OnOpen = handler
		_source.OnMessage = handler
		_source.OnError = handler
		_source.On("test", handler)

		Print "Connecting"
		Print _source.ReadyState

		Return 0
	End

	' Pumps the async event queue that drives the event source.
	Method OnUpdate:Int()
		UpdateAsyncEvents()
		Return 0
	End

	' Clears to black and draws the millisecond counter centred on screen.
	Method OnRender:Int()
		Local centreX:Int = DeviceWidth() / 2
		Local centreY:Int = DeviceHeight() / 2

		Cls(0, 0, 0)
		DrawText(Millisecs(), centreX, centreY, 0.5, 0.5)
		Return 0
	End
End

' Entry point for every target except stdcpp: creates the mojo demo app.
Function Main:Int()
New TestApp()
Return 0
End

#Else

Extern Private
' Native sleep primitive used by Sleep() below.
#If HOST = "winnt" Then
' On a Windows host the name is bound to the native "::Sleep".
Function usleep:Void(ms:Int) = "::Sleep"
#Else
' On any other host the native function named usleep is used.
' NOTE(review): POSIX usleep takes microseconds, not milliseconds - confirm
' that callers scale the value.
Function usleep:Void(ms:Int)
#End

Private

' Suspends the calling thread for ms milliseconds.
' The extern usleep is bound to ::Sleep (milliseconds) on a Windows host and
' to the native usleep (microseconds) everywhere else, so the value is scaled
' on non-Windows hosts; without this Sleep(1000) lasted only about 1 ms there.
Function Sleep:Void(ms:Int)
#If HOST = "winnt" Then
	usleep(ms)
#Else
	usleep(ms * 1000)
#End
End

Public

' Entry point for the stdcpp target: connects to URL, logs all notifications
' and pumps the async event queue once per Sleep(1000) until the process is killed.
Function Main:Int()
	Local handler:Listener = New Listener()
	Local stream:EventSource = New EventSource(URL)

	Print "Connecting"
	Print stream.ReadyState

	stream.OnOpen = handler
	stream.OnMessage = handler
	stream.OnError = handler

	stream.On("test", handler)

	' Console targets have no app loop, so poll by hand.
	Repeat
		UpdateAsyncEvents()

		Sleep(1000)
	Forever
	Return 0
End

#End')