XRootD
Loading...
Searching...
No Matches
XrdPfcIOFileBlock.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <math.h>
20#include <sstream>
21#include <cstdio>
22#include <iostream>
23#include <assert.h>
24#include <fcntl.h>
25
26#include "XrdPfcIOFileBlock.hh"
27#include "XrdPfc.hh"
28#include "XrdPfcStats.hh"
29#include "XrdPfcTrace.hh"
30
31#include "XrdSys/XrdSysError.hh"
33
34#include "XrdOuc/XrdOucEnv.hh"
35
36using namespace XrdPfc;
37
38//______________________________________________________________________________
// NOTE(review): listing lines 39 and 42 are absent from this extract (the
// signature line and one body statement). The member index at the end of the
// listing shows IOFileBlock(XrdOucCacheIO *io, Cache &cache) -- confirm both
// against the original file before editing this definition.
// Member initialisers: base IO, no cached stat yet, no open info file.
40 IO(io, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_info_file(0)
41{
// A "hdfsbsize=" tag in the input path may set m_blocksize.
43 GetBlockSizeFromPath();
// Reads or creates the top-level info file and caches the stat result in
// m_localStat; the return value is not checked here.
44 initLocalStat();
45}
46
47//______________________________________________________________________________
// NOTE(review): the signature line (listing line 48) is absent from this
// extract; presumably this is IOFileBlock::~IOFileBlock() -- confirm.
// The visible body only emits a debug trace.
49{
50 // called from Detach() if no sync is needed or
51 // from Cache's sync thread
52
53 TRACEIO(Debug, "deleting IOFileBlock");
54}
55
56// Check if m_mutex is needed at all, it is only used in ioActive and DetachFinalize
57// and in Read for block selection -- see if Prefetch Read requires mutex
58// to be held.
59// I think I need it in ioActive and Read.
60
61//______________________________________________________________________________
// NOTE(review): the signature line (listing line 62) is absent from this
// extract; the member index at the end of the listing shows
// void Update(XrdOucCacheIO &iocp) override -- confirm.
// Forwards the new input object to the base class, then notifies every block
// File that has been created so far.
63{
64 IO::Update(iocp);
65 {
// m_blocks is walked with m_mutex taken through the helper object.
66 XrdSysMutexHelper lock(&m_mutex);
67
68 for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
69 {
70 // Need to update all File / block objects.
// Entries may be null (Read() stores 0 when a block File could not be obtained).
71 if (it->second) it->second->ioUpdated(this);
72 }
73 }
74}
75
76//______________________________________________________________________________
// NOTE(review): the signature line (listing line 77) and one body statement
// (listing line 81) are absent from this extract; the member index at the end
// of the listing shows bool ioActive() override -- confirm against the
// original file.
// Returns true when at least one block File reports ioActive(this).
78{
79 // Called from XrdPosixFile when local connection is closed.
80
82
83 bool active = false;
84 {
85 XrdSysMutexHelper lock(&m_mutex);
86
// Every entry is queried; the loop does not stop at the first active File.
87 for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
88 {
89 // Need to initiate stop on all File / block objects.
90 if (it->second && it->second->ioActive(this))
91 {
92 active = true;
93 }
94 }
95 }
96
97 return active;
98}
99
100//______________________________________________________________________________
// NOTE(review): the signature line (listing line 101) is absent from this
// extract; the member index at the end of the listing shows
// void DetachFinalize() override -- confirm.
// Closes the top-level info file, hands every block File back to the cache
// and finally deletes this object.
102{
103 // Effectively a destructor.
104
105 TRACEIO(Info, "DetachFinalize() " << this);
106
// Writes the detach record and releases m_info_file.
107 CloseInfoFile();
108 {
109 XrdSysMutexHelper lock(&m_mutex);
110 for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
111 {
// Null entries (blocks whose File could not be obtained) are skipped.
112 if (it->second)
113 {
114 it->second->RequestSyncOfDetachStats();
115 m_cache.ReleaseFile(it->second, this);
116 }
117 }
118 }
119
// The mutex helper above has gone out of scope before the object is destroyed.
120 delete this;
121}
122
123//______________________________________________________________________________
124void IOFileBlock::CloseInfoFile()
125{
126 // write access statistics to info file and close it
127 // detach time is needed for file purge
128 if (m_info_file)
129 {
130 if (m_info.GetFileSize() > 0)
131 {
132 // We do not maintain access statistics for individual blocks.
133 Stats as;
134 m_info.WriteIOStatDetach(as);
135 }
136 m_info.Write(m_info_file, GetFilename().c_str());
137 m_info_file->Fsync();
138 m_info_file->Close();
139
140 delete m_info_file;
141 m_info_file = 0;
142 }
143}
144
145//______________________________________________________________________________
146void IOFileBlock::GetBlockSizeFromPath()
147{
148 const static std::string tag = "hdfsbsize=";
149
150 std::string path = GetInput()->Path();
151 size_t pos1 = path.find(tag);
152 size_t t = tag.length();
153
154 if (pos1 != path.npos)
155 {
156 pos1 += t;
157 size_t pos2 = path.find("&", pos1);
158 if (pos2 != path.npos )
159 {
160 std::string bs = path.substr(pos1, pos2 - pos1);
161 m_blocksize = atoi(bs.c_str());
162 }
163 else
164 {
165 m_blocksize = atoi(path.substr(pos1).c_str());
166 }
167
168 TRACEIO(Debug, "GetBlockSizeFromPath(), blocksize = " << m_blocksize );
169 }
170}
171
172//______________________________________________________________________________
173File* IOFileBlock::newBlockFile(long long off, int blocksize)
174{
175 // NOTE: Can return 0 if opening of a local file fails!
176
177 std::string fname = GetFilename();
178
179 std::stringstream ss;
180 ss << fname;
181 char offExt[64];
182 // filename like <origpath>___<size>_<offset>
183 sprintf(&offExt[0], "___%lld_%lld", m_blocksize, off);
184 ss << &offExt[0];
185 fname = ss.str();
186
187 TRACEIO(Debug, "FileBlock(), create XrdPfcFile ");
188
189 File *file = Cache::GetInstance().GetFile(fname, this, off, blocksize);
190 return file;
191}
192
193//______________________________________________________________________________
194int IOFileBlock::Fstat(struct stat &sbuff)
195{
196 // local stat is create in constructor. if file was on disk before
197 // attach that the only way stat was not successful is becuse there
198 // were info file read errors
199 if ( ! m_localStat) return -ENOENT;
200
201 memcpy(&sbuff, m_localStat, sizeof(struct stat));
202 return 0;
203}
204
205//______________________________________________________________________________
// NOTE(review): the signature line (listing line 206) is absent from this
// extract; the member index at the end of the listing shows
// long long FSize() override -- confirm.
// Returns the file size taken from the cached stat, or -ENOENT when no stat
// has been cached.
207{
208 if ( ! m_localStat) return -ENOENT;
209
210 return m_localStat->st_size;
211}
212
213//______________________________________________________________________________
214int IOFileBlock::initLocalStat()
215{
216 std::string path = GetFilename() + Info::s_infoExtension;
217
218 int res = -1;
219 struct stat tmpStat;
220 XrdOucEnv myEnv;
221
222 // try to read from existing file
223 if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == XrdOssOK)
224 {
225 m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
226 if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
227 {
228 if (m_info.Read(m_info_file, path.c_str()))
229 {
230 tmpStat.st_size = m_info.GetFileSize();
231 TRACEIO(Info, "initCachedStat successfully read size from existing info file = " << tmpStat.st_size);
232 res = 0;
233 }
234 else
235 {
236 // file exist but can't read it
237 TRACEIO(Debug, "initCachedStat info file is not complete");
238 }
239 }
240 }
241
242 // if there is no local info file, try to read from client and then save stat into a new *cinfo file
243 if (res)
244 {
245 if (m_info_file) { delete m_info_file; m_info_file = 0; }
246
247 res = GetInput()->Fstat(tmpStat);
248 TRACEIO(Debug, "initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
249 if (res == 0)
250 {
251 if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK)
252 {
253 m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
254 if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
255 {
256 // This is writing the top-level cinfo
257 // The info file is used to get file size on defer open
258 // don't initalize buffer, it does not hold useful information in this case
260 // m_info.DisableDownloadStatus(); -- this stopped working a while back.
261 m_info.Write(m_info_file, path.c_str());
262 m_info_file->Fsync();
263 }
264 else
265 {
266 TRACEIO(Error, "initCachedStat can't open info file path");
267 }
268 }
269 else
270 {
271 TRACEIO(Error, "initCachedStat can't create info file path");
272 }
273 }
274 }
275
276 if (res == 0)
277 {
278 m_localStat = new struct stat;
279 memcpy(m_localStat, &tmpStat, sizeof(struct stat));
280 }
281
282 return res;
283}
284
285//______________________________________________________________________________
286int IOFileBlock::Read(char *buff, long long off, int size)
287{
288 // protect from reads over the file size
289
290 long long fileSize = FSize();
291
292 if (off >= fileSize)
293 return 0;
294 if (off < 0)
295 {
296 return -EINVAL;
297 }
298 if (off + size > fileSize)
299 size = fileSize - off;
300
301 long long off0 = off;
302 int idx_first = off0 / m_blocksize;
303 int idx_last = (off0 + size - 1) / m_blocksize;
304 int bytes_read = 0;
305 TRACEIO(Dump, "Read() "<< off << "@" << size << " block range ["<< idx_first << ", " << idx_last << "]");
306
307 for (int blockIdx = idx_first; blockIdx <= idx_last; ++blockIdx)
308 {
309 // locate block
310 File *fb;
311 m_mutex.Lock();
312 std::map<int, File*>::iterator it = m_blocks.find(blockIdx);
313 if (it != m_blocks.end())
314 {
315 fb = it->second;
316 }
317 else
318 {
319 size_t pbs = m_blocksize;
320 // check if this is last block
321 int lastIOFileBlock = (fileSize-1)/m_blocksize;
322 if (blockIdx == lastIOFileBlock )
323 {
324 pbs = fileSize - blockIdx*m_blocksize;
325 // TRACEIO(Dump, "Read() last block, change output file size to " << pbs);
326 }
327
328 // Note: File* can be 0 and stored as 0 if local open fails!
329 fb = newBlockFile(blockIdx*m_blocksize, pbs);
330 m_blocks.insert(std::make_pair(blockIdx, fb));
331 }
332 m_mutex.UnLock();
333
334 // edit size if read request is reaching more than a block
335 int readBlockSize = size;
336 if (idx_first != idx_last)
337 {
338 if (blockIdx == idx_first)
339 {
340 readBlockSize = (blockIdx + 1) * m_blocksize - off0;
341 TRACEIO(Dump, "Read partially till the end of the block");
342 }
343 else if (blockIdx == idx_last)
344 {
345 readBlockSize = (off0 + size) - blockIdx * m_blocksize;
346 TRACEIO(Dump, "Read partially till the end of the block");
347 }
348 else
349 {
350 readBlockSize = m_blocksize;
351 }
352 }
353
354 TRACEIO(Dump, "Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off );
355
356 int retvalBlock;
357 if (fb != 0)
358 {
359 struct ZHandler : public ReadReqRH
360 { using ReadReqRH::ReadReqRH;
361 XrdSysCondVar m_cond {0};
362 int m_retval {0};
363
364 void Done(int result) override
365 { m_cond.Lock(); m_retval = result; m_cond.Signal(); m_cond.UnLock(); }
366 };
367
368 ReadReqRHCond rh(ObtainReadSid(), nullptr);
369
370 rh.m_cond.Lock();
371 retvalBlock = fb->Read(this, buff, off, readBlockSize, &rh);
372 if (retvalBlock == -EWOULDBLOCK)
373 {
374 rh.m_cond.Wait();
375 retvalBlock = rh.m_retval;
376 }
377 rh.m_cond.UnLock();
378 }
379 else
380 {
381 retvalBlock = GetInput()->Read(buff, off, readBlockSize);
382 }
383
384 TRACEIO(Dump, "Read() Block read returned " << retvalBlock);
385 if (retvalBlock == readBlockSize)
386 {
387 bytes_read += retvalBlock;
388 buff += retvalBlock;
389 off += retvalBlock;
390 }
391 else if (retvalBlock >= 0)
392 {
393 TRACEIO(Warning, "Read() incomplete read, missing bytes " << readBlockSize-retvalBlock);
394 return -EIO;
395 }
396 else
397 {
398 TRACEIO(Error, "Read() read error, retval" << retvalBlock);
399 return retvalBlock;
400 }
401 }
402
403 return bytes_read;
404}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACEIO(act, x)
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
virtual int Fsync()
Definition XrdOss.hh:144
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Read(char *buff, long long offs, int rlen)=0
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:411
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:315
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:159
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:492
XrdOss * GetOss() const
Definition XrdPfc.hh:385
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void Update(XrdOucCacheIO &iocp) override
long long FSize() override
int Fstat(struct stat &sbuff) override
int Read(char *Buffer, long long Offset, int Length) override
IOFileBlock(XrdOucCacheIO *io, Cache &cache)
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
std::string GetFilename()
Definition XrdPfcIO.hh:56
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:30
Cache & m_cache
reference to Cache object
Definition XrdPfcIO.hh:52
const char * RefreshLocation()
Definition XrdPfcIO.hh:57
void Update(XrdOucCacheIO &iocp) override
Definition XrdPfcIO.cc:16
unsigned short ObtainReadSid()
Definition XrdPfcIO.hh:59
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Statistics of cache utilisation by a File object.
XrdSysTrace * GetTrace()
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:108
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:67