• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package okio.samples;
17 
18 import java.io.IOException;
19 import okio.Buffer;
20 import okio.BufferedSource;
21 import okio.ForwardingSource;
22 import okio.Okio;
23 import okio.Source;
24 
25 /**
26  * Builds a buffered source that can rewind to a marked position earlier in the stream.
27  *
28  * <p>Mark potential positions to rewind back to with {@link #mark}; rewind back to these positions
29  * with {@link #reset}. Both operations apply to the position in the {@linkplain #source() buffered
30  * source}; resetting will impact the buffer.
31  *
32  * <p>When marking it is necessary to specify how much data to retain. Once you advance above this
33  * limit, the mark is discarded and resetting is not permitted. This may be used to lookahead a
34  * fixed number of bytes without loading an entire stream into memory. To reset an arbitrary
35  * number of bytes use {@code mark(Long#MAX_VALUE)}.
36  */
37 public final class SourceMarker {
38 
39   /*
40    * This class wraps the underlying source in a MarkSource to support mark and reset. It creates a
41    * BufferedSource for the caller so that it can track its offsets and manipulate its buffer.
42    */
43 
44   /**
45    * The offset into the underlying source. To compute the user's offset start with this and
46    * subtract userBuffer.size().
47    */
48   long offset;
49 
50   /** The offset of the earliest mark, or -1 for no mark. */
51   long mark = -1L;
52 
53   /** The offset of the latest readLimit, or -1 for no mark. */
54   long limit = -1L;
55 
56   boolean closed;
57 
58   final MarkSource markSource;
59   final BufferedSource userSource;
60 
61   /** A copy of the underlying source's data beginning at {@code mark}. */
62   final Buffer markBuffer;
63 
64   /** Just the userSource's buffer. */
65   final Buffer userBuffer;
66 
SourceMarker(Source source)67   public SourceMarker(Source source) {
68     this.markSource = new MarkSource(source);
69     this.markBuffer = new Buffer();
70     this.userSource = Okio.buffer(markSource);
71     this.userBuffer = userSource.getBuffer();
72   }
73 
source()74   public BufferedSource source() {
75     return userSource;
76   }
77 
78   /**
79    * Marks the current position in the stream as one to potentially return back to. Returns the
80    * offset of this position. Call {@link #reset(long)} with this position to return to it later. It
81    * is an error to call {@link #reset(long)} after consuming more than {@code readLimit} bytes from
82    * {@linkplain #source() the source}.
83    */
mark(long readLimit)84   public long mark(long readLimit) throws IOException {
85     if (readLimit < 0L) {
86       throw new IllegalArgumentException("readLimit < 0: " + readLimit);
87     }
88 
89     if (closed) {
90       throw new IllegalStateException("closed");
91     }
92 
93     // Mark the current position in the buffered source.
94     long userOffset = offset - userBuffer.size();
95 
96     // If this is a new mark promote userBuffer data into the markBuffer.
97     if (mark == -1L) {
98       markBuffer.writeAll(userBuffer);
99       mark = userOffset;
100       offset = userOffset;
101     }
102 
103     // Grow the limit if necessary.
104     long newMarkBufferLimit = userOffset + readLimit;
105     if (newMarkBufferLimit < 0) newMarkBufferLimit = Long.MAX_VALUE; // Long overflow!
106     limit = Math.max(limit, newMarkBufferLimit);
107 
108     return userOffset;
109   }
110 
111   /** Resets {@linkplain #source() the source} to {@code userOffset}. */
reset(long userOffset)112   public void reset(long userOffset) throws IOException {
113     if (closed) {
114       throw new IllegalStateException("closed");
115     }
116 
117     if (userOffset < mark // userOffset is before mark.
118         || userOffset > limit // userOffset is beyond limit.
119         || userOffset > mark + markBuffer.size() // userOffset is in the future.
120         || offset - userBuffer.size() > limit) { // Stream advanced beyond limit.
121       throw new IOException("cannot reset to " + userOffset + ": out of range");
122     }
123 
124     // Clear userBuffer to cause data at 'offset' to be returned by the next read.
125     offset = userOffset;
126     userBuffer.clear();
127   }
128 
129   final class MarkSource extends ForwardingSource {
MarkSource(Source source)130     MarkSource(Source source) {
131       super(source);
132     }
133 
read(Buffer sink, long byteCount)134     @Override public long read(Buffer sink, long byteCount) throws IOException {
135       if (closed) {
136         throw new IllegalStateException("closed");
137       }
138 
139       // If there's no mark, go to the underlying source.
140       if (mark == -1L) {
141         long result = super.read(sink, byteCount);
142         if (result == -1L) return -1L;
143         offset += result;
144         return result;
145       }
146 
147       // If we can read from markBuffer, do that.
148       if (offset < mark + markBuffer.size()) {
149         long posInBuffer = offset - mark;
150         long result = Math.min(byteCount, markBuffer.size() - posInBuffer);
151         markBuffer.copyTo(sink, posInBuffer, result);
152         offset += result;
153         return result;
154       }
155 
156       // If we can write to markBuffer, do that.
157       if (offset < limit) {
158         long byteCountBeforeLimit = limit - (mark + markBuffer.size());
159         long result = super.read(markBuffer, Math.min(byteCount, byteCountBeforeLimit));
160         if (result == -1L) return -1L;
161         markBuffer.copyTo(sink, markBuffer.size() - result, result);
162         offset += result;
163         return result;
164       }
165 
166       // Attempt to read past the limit. Data will not be saved.
167       long result = super.read(sink, byteCount);
168       if (result == -1L) return -1L;
169 
170       // We read past the limit. Discard marked data.
171       markBuffer.clear();
172       mark = -1L;
173       limit = -1L;
174       return result;
175     }
176 
close()177     @Override public void close() throws IOException {
178       if (closed) return;
179 
180       closed = true;
181       markBuffer.clear();
182       super.close();
183     }
184   }
185 }
186