11using System ;
22using System . Collections . Generic ;
33using System . IO ;
4- using System . Threading ;
5- using System . Threading . Tasks ;
64using Azure . Storage . Blobs ;
75using Azure . Storage . Blobs . Specialized ;
86
@@ -14,7 +12,28 @@ public class BlobStream : Stream
1412 private const int PageSizeInBytes = 512 ;
1513 public const int DefaultBufferSize = 1024 * 1024 * 4 ;
1614 private readonly PageBlobClient _pageBlob ;
15+ private object _syncRoot = new ( ) ;
1716
17+ public static BlobStream OpenStream ( PageBlobClient pageBlob )
18+ {
19+ if ( ! pageBlob . Exists ( ) )
20+ {
21+ pageBlob . Create ( 0 ) ;
22+ }
23+
24+ return new BlobStream ( pageBlob ) ;
25+ }
26+
27+ public static BufferedStream OpenBufferedStream ( PageBlobClient pageBlob )
28+ {
29+ if ( ! pageBlob . Exists ( ) )
30+ {
31+ pageBlob . Create ( 0 ) ;
32+ }
33+
34+ return new BufferedStream ( new BlobStream ( pageBlob ) , DefaultBufferSize ) ;
35+ }
36+
1837 public BlobStream ( string connectionString , string container , string fileName )
1938 : this ( GetClient ( connectionString , container , fileName ) )
2039 {
@@ -26,7 +45,7 @@ public BlobStream(PageBlobClient pageBlob)
2645 _pageBlob . CreateIfNotExists ( 0 ) ;
2746 }
2847
29- private long BlobLength => _pageBlob . GetProperties ( ) . Value . ContentLength ;
48+ private long BlobLength => _pageBlob . GetProperties ( ) . Value . ContentLength ;
3049
3150 public override bool CanRead => true ;
3251
@@ -80,52 +99,45 @@ public override long Seek(long offset, SeekOrigin origin)
8099 Position += offset ;
81100 break ;
82101 case SeekOrigin . End :
83- Position = Length - offset ;
102+ Position = Length + offset ;
84103 break ;
85104 }
86-
105+
87106 return Position ;
88107 }
89108
90109 public override void SetLength ( long value )
91110 {
92- if ( value > Length )
111+ var length = Length ;
112+ if ( value != length )
93113 {
94114 var newSize = NextPageAddress ( value ) ;
95115 _pageBlob . Resize ( newSize ) ;
96116 }
117+
118+ SetLengthInternal ( value ) ;
97119 }
98120
99121 public override int Read ( byte [ ] buffer , int offset , int count )
100122 {
101- var bytesRead = 0 ;
102- using ( var stream = _pageBlob . OpenRead ( false , Position ) )
123+ lock ( _syncRoot )
103124 {
104- bytesRead = stream . Read ( buffer , offset , count ) ;
105- if ( Position + count > Length )
125+ int bytesRead = 0 ;
126+ var length = Length ;
127+
128+ if ( Position + count > length )
106129 {
107- bytesRead = ( int ) ( Length - Position ) ;
130+ count = ( int ) ( length - Position ) ;
108131 }
109- }
110-
111- Position += bytesRead ;
112- return bytesRead ;
113- }
114-
115- public override async Task < int > ReadAsync ( byte [ ] buffer , int offset , int count , CancellationToken cancellationToken )
116- {
117- var bytesRead = 0 ;
118- using ( var stream = await _pageBlob . OpenReadAsync ( false , Position , cancellationToken : cancellationToken ) )
119- {
120- bytesRead = await stream . ReadAsync ( buffer , offset , count , cancellationToken ) ;
121- if ( Position + count > Length )
132+
133+ using ( var stream = _pageBlob . OpenRead ( false , Position ) )
122134 {
123- bytesRead = ( int ) ( Length - Position ) ;
135+ bytesRead = stream . Read ( buffer , offset , count ) ;
124136 }
125- }
126137
127- Position += bytesRead ;
128- return bytesRead ;
138+ Position += bytesRead ;
139+ return bytesRead ;
140+ }
129141 }
130142
131143 private void EnsureCapacity ( long position )
@@ -137,84 +149,40 @@ private void EnsureCapacity(long position)
137149 }
138150 }
139151
140- private async Task EnsureCapacityAsync ( long position )
141- {
142- if ( BlobLength < position )
143- {
144- var newSize = NextPageAddress ( position ) ;
145- await _pageBlob . ResizeAsync ( newSize ) ;
146- }
147- }
148-
149152 public override void Write ( byte [ ] buffer , int offset , int count )
150153 {
151- EnsureCapacity ( Position + count ) ;
152-
153- var pageStartAddress = PreviousPageAddress ( Position ) ;
154- var pageBytes = NextPageAddress ( Position + count ) - pageStartAddress ;
155- var offsetInFirstPage = ( int ) ( Position % PageSizeInBytes ) ;
156- var offsetInLastPage = ( offsetInFirstPage + count ) % PageSizeInBytes ;
157-
158- var bufferToMerge = new byte [ pageBytes ] ;
159- if ( offsetInFirstPage > 0 || ( pageBytes > PageSizeInBytes && offsetInLastPage > 0 ) )
154+ lock ( _syncRoot )
160155 {
161- var localCount = ( int ) ( pageBytes - PageSizeInBytes ) ;
162- using ( var stream = _pageBlob . OpenRead ( false , pageStartAddress ) )
156+ var pageStartAddress = PreviousPageAddress ( Position ) ;
157+ var pageBytes = NextPageAddress ( Position + count ) - pageStartAddress ;
158+ var offsetInFirstPage = ( int ) ( Position % PageSizeInBytes ) ;
159+ var offsetInLastPage = ( offsetInFirstPage + count ) % PageSizeInBytes ;
160+
161+ var bufferToMerge = new byte [ pageBytes ] ;
162+ if ( offsetInFirstPage > 0 || ( pageBytes > PageSizeInBytes && offsetInLastPage > 0 ) )
163163 {
164- _ = stream . Read ( bufferToMerge , 0 , localCount ) ;
164+ //var localCount = (int)(pageBytes - PageSizeInBytes);
165+ using ( var stream = _pageBlob . OpenRead ( false , pageStartAddress ) )
166+ {
167+ _ = stream . Read ( bufferToMerge , 0 , ( int ) pageBytes ) ;
168+ }
165169 }
166- }
167170
168- Buffer . BlockCopy ( buffer , offset , bufferToMerge , offsetInFirstPage , count ) ;
169-
170- EnsureCapacity ( pageStartAddress + bufferToMerge . Length ) ;
171-
172- using ( var stream = _pageBlob . OpenWrite ( false , pageStartAddress ) )
173- {
174- stream . Write ( bufferToMerge , 0 , bufferToMerge . Length ) ;
175- stream . Flush ( ) ;
176- }
177-
178- Position += count ;
179- if ( Position > Length )
180- {
181- SetLengthInternal ( Position ) ;
182- }
183- }
171+ Buffer . BlockCopy ( buffer , offset , bufferToMerge , offsetInFirstPage , count ) ;
172+
173+ EnsureCapacity ( pageStartAddress + bufferToMerge . Length ) ;
184174
185- public override async Task WriteAsync ( byte [ ] buffer , int offset , int count , CancellationToken cancellationToken )
186- {
187- //await EnsureCapacityAsync(Position + count);
188-
189- var pageStartAddress = PreviousPageAddress ( Position ) ;
190- var pageBytes = NextPageAddress ( Position + count ) - pageStartAddress ;
191- var offsetInFirstPage = ( int ) ( Position % PageSizeInBytes ) ;
192- var offsetInLastPage = ( offsetInFirstPage + count ) % PageSizeInBytes ;
193-
194- var bufferToMerge = new byte [ pageBytes ] ;
195- if ( offsetInFirstPage > 0 || ( pageBytes > PageSizeInBytes && offsetInLastPage > 0 ) )
196- {
197- var localCount = ( int ) ( pageBytes - PageSizeInBytes ) ;
198- using ( var stream = await _pageBlob . OpenReadAsync ( false , pageStartAddress , cancellationToken : cancellationToken ) )
175+ using ( var stream = _pageBlob . OpenWrite ( false , pageStartAddress ) )
199176 {
200- _ = await stream . ReadAsync ( bufferToMerge , 0 , localCount , cancellationToken ) ;
177+ stream . Write ( bufferToMerge , 0 , bufferToMerge . Length ) ;
178+ stream . Flush ( ) ;
201179 }
202- }
203-
204- Buffer . BlockCopy ( buffer , offset , bufferToMerge , offsetInFirstPage , count ) ;
205180
206- await EnsureCapacityAsync ( pageStartAddress + bufferToMerge . Length ) ;
207-
208- using ( var stream = await _pageBlob . OpenWriteAsync ( false , pageStartAddress , cancellationToken : cancellationToken ) )
209- {
210- await stream . WriteAsync ( bufferToMerge , 0 , bufferToMerge . Length , cancellationToken ) ;
211- await stream . FlushAsync ( cancellationToken ) ;
212- }
213-
214- Position += count ;
215- if ( Position > Length )
216- {
217- await SetLengthInternalAsync ( Position , cancellationToken ) ;
181+ Position += count ;
182+ if ( Position > Length )
183+ {
184+ SetLengthInternal ( Position ) ;
185+ }
218186 }
219187 }
220188
@@ -229,30 +197,12 @@ private long PreviousPageAddress(long position)
229197 var previousPageAddress = position - position % PageSizeInBytes ;
230198 return previousPageAddress ;
231199 }
232-
233- public static BlobStream Open ( PageBlobClient pageBlob )
234- {
235- if ( ! pageBlob . Exists ( ) )
236- {
237- pageBlob . Create ( 0 ) ;
238- }
239-
240- return new BlobStream ( pageBlob ) ;
241- }
242-
200+
243201 private void SetLengthInternal ( long newLength )
244202 {
245203 _pageBlob . SetMetadata ( new Dictionary < string , string >
246204 {
247205 [ MetadataLengthKey ] = newLength . ToString ( )
248206 } ) ;
249207 }
250-
251- private Task SetLengthInternalAsync ( long newLength , CancellationToken cancellationToken = default )
252- {
253- return _pageBlob . SetMetadataAsync ( new Dictionary < string , string >
254- {
255- [ MetadataLengthKey ] = newLength . ToString ( )
256- } , cancellationToken : cancellationToken ) ;
257- }
258208}
0 commit comments