• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *
8  *   - Redistributions of source code must retain the above copyright
9  *     notice, this list of conditions and the following disclaimer.
10  *
11  *   - Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *
15  *   - Neither the name of Oracle nor the names of its
16  *     contributors may be used to endorse or promote products derived
17  *     from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 /*
33  * This source code is provided to illustrate the usage of a given feature
34  * or technique and has been deliberately simplified. Additional steps
35  * required for a production-quality application, such as security checks,
36  * input validation and proper error handling, might not be present in
37  * this sample code.
38  */
39 
40 
41 import java.io.IOException;
42 import java.nio.ByteBuffer;
43 import java.nio.channels.AsynchronousSocketChannel;
44 import java.nio.channels.CompletionHandler;
45 import java.util.LinkedList;
46 import java.util.Queue;
47 import java.util.concurrent.atomic.AtomicReference;
48 
49 /**
50  * Client represents a remote connection to the chat server.
51  * It contains methods for reading and writing messages from the
52  * channel.
53  * Messages are considered to be separated by newline, so incomplete
54  * messages are buffered in the {@code Client}.
55  *
56  * All reads and writes are asynchronous and uses the nio2 asynchronous
57  * elements.
58  */
59 class Client {
60     private final AsynchronousSocketChannel channel;
61     private AtomicReference<ClientReader> reader;
62     private String userName;
63     private final StringBuilder messageBuffer = new StringBuilder();
64 
65     private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
66     private boolean writing = false;
67 
Client(AsynchronousSocketChannel channel, ClientReader reader)68     public Client(AsynchronousSocketChannel channel, ClientReader reader) {
69         this.channel = channel;
70         this.reader = new AtomicReference<ClientReader>(reader);
71     }
72 
73     /**
74      * Enqueues a write of the buffer to the channel.
75      * The call is asynchronous so the buffer is not safe to modify after
76      * passing the buffer here.
77      *
78      * @param buffer the buffer to send to the channel
79      */
writeMessage(final ByteBuffer buffer)80     private void writeMessage(final ByteBuffer buffer) {
81         boolean threadShouldWrite = false;
82 
83         synchronized(queue) {
84             queue.add(buffer);
85             // Currently no thread writing, make this thread dispatch a write
86             if (!writing) {
87                 writing = true;
88                 threadShouldWrite = true;
89             }
90         }
91 
92         if (threadShouldWrite) {
93             writeFromQueue();
94         }
95     }
96 
writeFromQueue()97     private void writeFromQueue() {
98         ByteBuffer buffer;
99 
100         synchronized (queue) {
101             buffer = queue.poll();
102             if (buffer == null) {
103                 writing = false;
104             }
105         }
106 
107         // No new data in buffer to write
108         if (writing) {
109             writeBuffer(buffer);
110         }
111     }
112 
writeBuffer(ByteBuffer buffer)113     private void writeBuffer(ByteBuffer buffer) {
114         channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
115             @Override
116             public void completed(Integer result, ByteBuffer buffer) {
117                 if (buffer.hasRemaining()) {
118                     channel.write(buffer, buffer, this);
119                 } else {
120                     // Go back and check if there is new data to write
121                     writeFromQueue();
122                 }
123             }
124 
125             @Override
126             public void failed(Throwable exc, ByteBuffer attachment) {
127             }
128         });
129     }
130 
131     /**
132      * Sends a message
133      * @param string the message
134      */
writeStringMessage(String string)135     public void writeStringMessage(String string) {
136         writeMessage(ByteBuffer.wrap(string.getBytes()));
137     }
138 
139     /**
140      * Send a message from a specific client
141      * @param client the message is sent from
142      * @param message to send
143      */
writeMessageFrom(Client client, String message)144     public void writeMessageFrom(Client client, String message) {
145         if (reader.get().acceptsMessages()) {
146             writeStringMessage(client.getUserName() + ": " + message);
147         }
148     }
149 
150     /**
151      * Enqueue a read
152      * @param completionHandler callback on completed read
153      */
read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler)154     public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
155         ByteBuffer input = ByteBuffer.allocate(256);
156         if (!channel.isOpen()) {
157             return;
158         }
159         channel.read(input, input, completionHandler);
160     }
161 
162     /**
163      * Closes the channel
164      */
close()165     public void close() {
166         try {
167             channel.close();
168         } catch (IOException e) {
169             e.printStackTrace();
170         }
171     }
172 
173     /**
174      * Run the current states actions.
175      */
run()176     public void run() {
177         reader.get().run(this);
178     }
179 
setUserName(String userName)180     public void setUserName(String userName) {
181         this.userName = userName;
182     }
183 
setReader(ClientReader reader)184     public void setReader(ClientReader reader) {
185         this.reader.set(reader);
186     }
187 
getUserName()188     public String getUserName() {
189         return userName;
190     }
191 
appendMessage(String message)192     public void appendMessage(String message) {
193         synchronized (messageBuffer) {
194             messageBuffer.append(message);
195         }
196     }
197 
198     /**
199      * @return the next newline separated message in the buffer. null is returned if the buffer
200      * doesn't contain any newline.
201      */
nextMessage()202     public String nextMessage() {
203         synchronized(messageBuffer) {
204             int nextNewline = messageBuffer.indexOf("\n");
205             if (nextNewline == -1) {
206                 return null;
207             }
208             String message = messageBuffer.substring(0, nextNewline + 1);
209             messageBuffer.delete(0, nextNewline + 1);
210             return message;
211         }
212     }
213 }
214