1、 采用推技术
1) 设计
a)服务器端设计
建立服务器数据读取线程
FeedThread,功能定时从文件或数据库中读取股票行情数据。
采用flash技术,如下:
AsyncMessage msg = new AsyncMessage();
msg.setDestination("market-da
market-da
<destination id="market-da
<properties>
<server>
<allow-subtopics>true</allow-subtopics>
<subtopic-separator>.</subtopic-separator>s
</server>
</properties>
<channels>
<channel ref="my-polling-amf"/>
<channel ref="my-streaming-amf"/>
<channel ref="per-client-qos-polling-amf"/>
</channels>
</destination>
b)客户端设计
1)客户端建立连接(traderdesktop.mxml)
功能:初始化股票列表(可通过建立数据库服务,获取股票列表)。
initializeWatchList(["IBM", "JBLU", "ADBE", "GE", "C"]);
2)每个股票建立订阅机制。
privatefunctionsubscribe(symbol:String):void
{
varconsumer:Consumer = newConsumer();
consumer.destination = "market-da
consumer.subtopic = symbol;
consumer.channelSet = newChannelSet([channels.selectedItem]);
consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);
consumer.subscribe();
consumers[symbol] = consumer;
}
3)客户端接收服务CustomDelayQueueProcessor extends FlexClientOutboundQueueProcessor
功能:接收从consumer.destination过来的数据,延迟处理。
publicFlushResult flush(ListoutboundQueue)
{
intdelay = delayTimeBetweenFlushes;
// Read custom delay from client's FlexClient instance
FlexClient flexClient = getFlexClient();
if(flexClient != null)
{
Object obj = flexClient.getAttribute("market-da
if(obj != null)
{
try{
delay = Integer.parseInt((String) obj);
} catch(NumberFormatException ignore) {
}
}
}
longcurrentTime = System.currentTimeMillis();
if((currentTime - lastFlushTime) <delay)
{
// Delaying flush. No messages will be returned at this point
FlushResult flushResult = newFlushResult();
// Don't return any messages to flush.
// And request that the next flush doesn't occur until 3 seconds since the previous.
flushResult.setNextFlushWaitTimeMillis((int)(delay - (currentTime - lastFlushTime)));
returnflushResult;
}
else// OK to flush.
{
// Flushing. All queued messages will now be returned
lastFlushTime= currentTime;
FlushResult flushResult = newFlushResult();
flushResult.setNextFlushWaitTimeMillis(delay);
flushResult.setMessages(newArrayList(outboundQueue));
outboundQueue.clear();
returnflushResult;
}
}