XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfc.hh"
23#include "XrdPfcIO.hh"
24#include "XrdPfcTrace.hh"
25
27#include "XrdSys/XrdSysTimer.hh"
28#include "XrdOss/XrdOss.hh"
29#include "XrdOuc/XrdOucEnv.hh"
31
32#include "XrdCl/XrdClURL.hh"
33
34#include <cassert>
35#include <cstdio>
36#include <sstream>
37#include <unordered_map>
38
39#include <fcntl.h>
40
using namespace XrdPfc;

namespace
{

// Upper bound on attempts to write a block out to disk -- presumably consumed
// by the write task further down this file (usage not visible in this chunk).
const int BLOCK_WRITE_MAX_ATTEMPTS = 4;

// Shorthand accessor for the process-wide Cache singleton.
Cache* cache() { return &Cache::GetInstance(); }

}
51
// Identifier emitted by the TRACE/TRACEF macros for this class.
const char *File::m_traceID = "File";
53
54//------------------------------------------------------------------------------
55
// Constructor. Only initializes members to their idle/empty state -- no I/O
// happens here; the data and cinfo files are opened later, in Open().
//   path      -- cache-local file name
//   iOffset   -- offset of this file within a containing entity, as passed in
//   iFileSize -- full size of the (remote) file in bytes
File::File(const std::string& path, long long iOffset, long long iFileSize) :
   m_ref_cnt(0),
   m_data_file(0),
   m_info_file(0),
   m_cfi(Cache::TheOne().GetTrace(), Cache::TheOne().is_prefetch_enabled()),
   m_filename(path),
   m_offset(iOffset),
   m_file_size(iFileSize),
   m_current_io(m_io_set.end()),
   m_ios_in_detach(0),
   m_non_flushed_cnt(0),
   m_in_sync(false),
   m_detach_time_logged(false),
   m_in_shutdown(false),
   m_state_cond(0),
   m_block_size(0),
   m_num_blocks(0),
   m_resmon_token(-1),   // -1 ==> not yet registered with the ResourceMonitor
   m_prefetch_state(kOff),
   m_prefetch_bytes(0),
   m_prefetch_read_cnt(0),
   m_prefetch_hit_cnt(0),
   m_prefetch_score(0)
{}
80
File::~File()
{
   // Destructor only traces; actual resource tear-down is done in Close().
   TRACEF(Debug, "~File() for ");
}
85
void File::Close()
{
   // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
   // A stat is called after close to re-check that m_stat_blocks have been reported correctly
   // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
   // in report_and_merge_delta_stats() below.
   //
   // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
   // get reported in as stat.st_blocks.
   // The reported number is correct in a stat immediately following a close.
   // If one starts off by writing the last byte of the file, this pre-allocation does not get
   // triggered up to that point. But comes back with a vengeance right after.
   //
   // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.

   // Close and release the cinfo (metadata) file first.
   if (m_info_file)
   {
      TRACEF(Debug, "Close() closing info-file ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = nullptr;
   }

   // Then the data file itself.
   if (m_data_file)
   {
      TRACEF(Debug, "Close() closing data-file ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = nullptr;
   }

   // m_resmon_token >= 0 ==> this file was registered with the ResourceMonitor.
   if (m_resmon_token >= 0)
   {
      // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
      // but in this case the file will get unlinked by the cache and reported as purge event.
      // We check if the reported st_blocks so far is correct.
      if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
         struct stat s;
         int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
         if (sr == 0 && s.st_blocks != m_st_blocks) {
            // Send a correction for the difference in 512-byte blocks.
            Stats stats;
            stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
            m_st_blocks = s.st_blocks;
            Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
         }
      }

      Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
   }

   TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
}
138
139//------------------------------------------------------------------------------
140
141File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO)
142{
143 File *file = new File(path, offset, fileSize);
144 if ( ! file->Open(inputIO))
145 {
146 delete file;
147 file = 0;
148 }
149 return file;
150}
151
152//------------------------------------------------------------------------------
153
{
   // Called from Cache::Unlink() when the file is currently open.
   // Cache::Unlink is also called on FSync error and when wrong number of bytes
   // is received from a remote read.
   //
   // From this point onward the file will not be written to, cinfo file will
   // not be updated, and all new read requests will return -ENOENT.
   //
   // File's entry in the Cache's active map is set to nullptr and will be
   // removed from there shortly, in any case, well before this File object
   // shuts down. Cache::Unlink() also reports the appropriate purge event.

   XrdSysCondVarHelper _lck(m_state_cond);

   // Flag checked by Read()/ReadV()/check_delta_stats() and others.
   m_in_shutdown = true;

   // Stop prefetching if it is still running (or on hold).
   if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
   {
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   // Push the final delta-stats out before the file disappears.
   report_and_merge_delta_stats();

   // Return the current 512-byte block usage to the caller.
   return m_st_blocks;
}
181
182//------------------------------------------------------------------------------
183
184void File::check_delta_stats()
185{
186 // Called under m_state_cond lock.
187 // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
188 if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
189 report_and_merge_delta_stats();
190}
191
void File::report_and_merge_delta_stats()
{
   // Called under m_state_cond lock.
   // Sends the delta-statistics to the ResourceMonitor, folds them into the
   // lifetime stats and resets the delta accumulator.
   struct stat s;
   m_data_file->Fstat(&s);
   // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
   // aggressive pre-allocation in this field (XFS, 4GB).
   // st_blocks is in 512-byte units: if m_file_size is not 4kB-aligned, round up
   // to the next 4kB page and multiply by 8 blocks/page; otherwise size / 512.
   long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
                                                             : m_file_size >> 9;
   long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
   // Only the change since the last report is sent; m_st_blocks tracks what
   // has been reported so far.
   m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
   m_st_blocks = st_blocks_to_report;
   Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
   m_stats.AddUp(m_delta_stats);
   m_delta_stats.Reset();
}
208
209//------------------------------------------------------------------------------
210
{
   // Called by the write task once block 'b' has been written out and removed
   // from the write queue; drop the queue's reference under the state lock.
   TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);

   XrdSysCondVarHelper _lck(m_state_cond);
   dec_ref_count(b);
}
218
219void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
220{
221 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222
223 XrdSysCondVarHelper _lck(m_state_cond);
224
225 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
226 {
227 dec_ref_count(*i);
228 }
229}
230
231//------------------------------------------------------------------------------
232
{
   // Record the remote location served by this IO into the file's set of
   // known remote locations (stored for access statistics in the cinfo file).
   std::string loc(io->GetLocation());
   XrdSysCondVarHelper _lck(m_state_cond);
   insert_remote_location(loc);
}
239
240//------------------------------------------------------------------------------
241
{
   // Returns true if delay is needed.
   // Called when an IO wants to detach: marks the IO as detaching and decides
   // whether the caller must wait for outstanding reads / prefetches / queued
   // block writes to drain before the detach can complete.

   TRACEF(Debug, "ioActive start for io " << io);

   std::string loc(io->GetLocation());

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      IoSet_i mi = m_io_set.find(io);

      if (mi != m_io_set.end())
      {
         // Snapshot the count of in-flight user reads on this IO.
         unsigned int n_active_reads = io->m_active_read_reqs;

         TRACE(Info, "ioActive for io " << io <<
               ", active_reads " << n_active_reads <<
               ", active_prefetches " << io->m_active_prefetches <<
               ", allow_prefetching " << io->m_allow_prefetching <<
               ", ios_in_detach " << m_ios_in_detach);
         TRACEF(Info,
                "\tio_map.size() " << m_io_set.size() <<
                ", block_map.size() " << m_block_map.size() << ", file");

         insert_remote_location(loc);

         // From here on this IO neither prefetches nor accepts new requests.
         io->m_allow_prefetching = false;
         io->m_in_detach = true;

         // Check if any IO is still available for prefetching. If not, stop it.
         if (m_prefetch_state == kOn || m_prefetch_state == kHold)
         {
            if ( ! select_current_io_or_disable_prefetching(false) )
            {
               TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
            }
         }

         // On last IO, consider write queue blocks. Note, this also contains
         // blocks being prefetched.

         bool io_active_result;

         if (n_active_reads > 0)
         {
            // User reads still in flight -- definitely active.
            io_active_result = true;
         }
         else if (m_io_set.size() - m_ios_in_detach == 1)
         {
            // This is the last non-detaching IO -- active while any blocks remain.
            io_active_result = ! m_block_map.empty();
         }
         else
         {
            // Other IOs remain -- only this IO's own prefetches keep it active.
            io_active_result = io->m_active_prefetches > 0;
         }

         if ( ! io_active_result)
         {
            ++m_ios_in_detach;
         }

         TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");

         return io_active_result;
      }
      else
      {
         TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
         return false;
      }
   }
}
316
317//------------------------------------------------------------------------------
318
{
   // Clear the detach-time-logged flag so that the next call to
   // FinalizeSyncBeforeExit() will write detach statistics out again.
   XrdSysCondVarHelper _lck(m_state_cond);
   m_detach_time_logged = false;
}
324
{
   // Returns true if sync is required.
   // This method is called after corresponding IO is detached from PosixCache.

   XrdSysCondVarHelper _lck(m_state_cond);
   if ( ! m_in_shutdown)
   {
      // Sync is needed when writes are still pending or when the detach
      // statistics have not yet been recorded in the cinfo file.
      if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
      {
         report_and_merge_delta_stats();
         m_cfi.WriteIOStatDetach(m_stats);
         m_detach_time_logged = true;
         m_in_sync = true;
         TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
         return true;
      }
   }
   TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
   return false;
}
346
347//------------------------------------------------------------------------------
348
{
   // Called from Cache::GetFile() when a new IO asks for the file.
   // Registers the IO in m_io_set and (re)starts prefetching if needed.

   TRACEF(Debug, "AddIO() io = " << (void*)io);

   time_t now = time(0);
   std::string loc(io->GetLocation());

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi == m_io_set.end())
   {
      m_io_set.insert(io);
      io->m_attach_time = now;
      m_delta_stats.IoAttach();

      insert_remote_location(loc);

      // Re-engage prefetching that was parked when the last IO went away
      // (or left in kStopped by Open() for an incomplete file).
      if (m_prefetch_state == kStopped)
      {
         m_prefetch_state = kOn;
         cache()->RegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
   }

   m_state_cond.UnLock();
}
383
384//------------------------------------------------------------------------------
385
{
   // Called from Cache::ReleaseFile.
   // Unregisters the IO, accounts its attach duration, and sanity-checks that
   // prefetching has already been stopped when the last IO leaves.

   TRACEF(Debug, "RemoveIO() io = " << (void*)io);

   time_t now = time(0);

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi != m_io_set.end())
   {
      // If the prefetch round-robin iterator points at this IO, advance it
      // before erasing so it stays valid.
      if (mi == m_current_io)
      {
         ++m_current_io;
      }

      m_delta_stats.IoDetach(now - io->m_attach_time);
      m_io_set.erase(mi);
      --m_ios_in_detach;

      // The departing IO should have stopped prefetching via ioActive();
      // complain and stop it here if that did not happen.
      if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
   }

   m_state_cond.UnLock();
}
423
424//------------------------------------------------------------------------------
425
bool File::Open(XrdOucCacheIO* inputIO)
{
   // Opens (creating as needed) the data file and its companion cinfo file,
   // validates or (re)initializes the cinfo state, and registers this file
   // with the ResourceMonitor. Returns false on failure.
   // Sets errno accordingly.

   static const char *tpfx = "Open() ";

   TRACEF(Dump, tpfx << "entered");

   // Before touching anything, check with ResourceMonitor if a scan is in progress.
   // This function will wait internally if needed until it is safe to proceed.
   Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);

   XrdOss &myOss = * Cache::GetInstance().GetOss();
   const char *myUser = conf.m_username.c_str();
   XrdOucEnv myEnv;
   struct stat data_stat, info_stat;

   std::string ifn = m_filename + Info::s_infoExtension;

   bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
   bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);

   // Create the data file itself.
   char size_str[32]; sprintf(size_str, "%lld", m_file_size);
   myEnv.Put("oss.asize", size_str);
   myEnv.Put("oss.cgroup", conf.m_data_space.c_str());

   int res;

   if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      return false;
   }

   m_data_file = myOss.newFile(myUser);
   if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_data_file; m_data_file = 0;
      return false;
   }

   // Create / open the cinfo (metadata) file in the meta space.
   myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
   myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
   if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   m_info_file = myOss.newFile(myUser);
   if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_info_file; m_info_file = 0;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   bool initialize_info_file = true;

   if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
   {
      TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
             ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
             ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
             ", block_size=" << (m_cfi.GetBufferSize() >> 10) << "k)");

      // Check if data file exists and is of reasonable size.
      if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
      {
         initialize_info_file = false;
      } else {
         TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         // data-file might not have existed at entry -- data_stat is then undefined
         if (data_existed)
            Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      }
   }

   // Reconcile the checksum state recorded in cinfo with the configuration.
   if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
   {
      if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
          conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
      {
         TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
         initialize_info_file = true;
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         // data-file is known to exist due to checks in the previous if block
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      } else {
         // TODO: If the file is complete, we don't need to reset net cksums.
         m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
      }
   }

   // Check if we have pfc url arguments.
   long long pfc_blocksize = conf.m_bufferSize;
   int pfc_prefetch = conf.m_prefetch_max_blocks;
   {
      parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
   }

   if (initialize_info_file)
   {
      m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
      m_cfi.SetCkSumState(conf.get_cs_Chk());
      m_cfi.ResetNoCkSumTime();
      m_cfi.Write(m_info_file, ifn.c_str());
      m_info_file->Fsync();
      cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
      TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()
             << " block size = " << pfc_blocksize);
   }
   else
   {
      // Touch the cinfo file so purge ordering reflects this access.
      if (futimens(m_info_file->getFD(), NULL)) {
         TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
      }
      if (pfc_blocksize != conf.m_bufferSize) {
         TRACEF(Info, tpfx << "URL CGI pfc.blocksize ignored for an already existing file");
      }
   }

   m_cfi.WriteIOStatAttach();
   m_state_cond.Lock();
   m_block_size = m_cfi.GetBufferSize();
   m_num_blocks = m_cfi.GetNBlocks();
   m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
   m_prefetch_max_blocks_in_flight = pfc_prefetch;
   if (pfc_prefetch != conf.m_prefetch_max_blocks)
      TRACEF(Debug, tpfx << "pfc.prefetch set to " << pfc_prefetch << " via CGI parameter");

   m_data_file->Fstat(&data_stat);
   m_st_blocks = data_stat.st_blocks;

   // Register with the ResourceMonitor; the token identifies this open file in
   // subsequent stat-update / close calls.
   m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
   constexpr long long MB = 1024 * 1024;
   m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
   // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
   // actual threshold based on return values from register_file_update_stats().

   m_state_cond.UnLock();

   return true;
}
584
void File::parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const
{
   // Parses optional pfc.* CGI parameters from the input IO's URL and, where
   // permitted by configuration, overrides the passed-in defaults in place.
   const Configuration &conf = Cache::TheOne().RefConfiguration();

   XrdCl::URL url(inputIO->Path());
   auto const & urlp = url.GetParams();

   // Helper: fetch CGI parameter 'key' into 'value'; returns true if present.
   auto extract = [&](const std::string &key, std::string &value) -> bool {
      auto it = urlp.find(key);
      if (it != urlp.end()) {
         value = it->second;
         return true;
      } else {
         value.clear();
         return false;
      }
   };

   std::string val;
   if (conf.m_cgi_blocksize_allowed && extract("pfc.blocksize", val))
   {
      const char *tpfx = "File::Open::urlcgi pfc.blocksize ";
      long long bsize;
      if (Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
      {
         pfc_blocksize = bsize;
      } else {
         TRACEF(Error, tpfx << "Error processing the parameter.");
      }
   }
   if (conf.m_cgi_prefetch_allowed && extract("pfc.prefetch", val))
   {
      const char *tpfx = "File::Open::urlcgi pfc.prefetch ";
      int pref;
      if (Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
      {
         pfc_prefetch = pref;
      } else {
         TRACEF(Error, tpfx << "Error processing the parameter.");
      }
   }
}
629
630//------------------------------------------------------------------------------
631
632int File::Fstat(struct stat &sbuff)
633{
634 // Stat on an open file.
635 // Corrects size to actual full size of the file.
636 // Sets atime to 0 if the file is only partially downloaded, in accordance
637 // with pfc.onlyifcached settings.
638 // Called from IO::Fstat() and Cache::Stat() when the file is active.
639 // Returns 0 on success, -errno on error.
640
641 int res;
642
643 if ((res = m_data_file->Fstat(&sbuff))) return res;
644
645 sbuff.st_size = m_file_size;
646
647 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
648 if ( ! is_cached)
649 sbuff.st_atime = 0;
650
651 return 0;
652}
653
654//==============================================================================
655// Read and helpers
656//==============================================================================
657
658bool File::overlap(int blk, // block to query
659 long long blk_size, //
660 long long req_off, // offset of user request
661 int req_size, // size of user request
662 // output:
663 long long &off, // offset in user buffer
664 long long &blk_off, // offset in block
665 int &size) // size to copy
666{
667 const long long beg = blk * blk_size;
668 const long long end = beg + blk_size;
669 const long long req_end = req_off + req_size;
670
671 if (req_off < end && req_end > beg)
672 {
673 const long long ovlp_beg = std::max(beg, req_off);
674 const long long ovlp_end = std::min(end, req_end);
675
676 off = ovlp_beg - req_off;
677 blk_off = ovlp_beg - beg;
678 size = (int) (ovlp_end - ovlp_beg);
679
680 assert(size <= blk_size);
681 return true;
682 }
683 else
684 {
685 return false;
686 }
687}
688
689//------------------------------------------------------------------------------
690
Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
{
   // Must be called w/ state_cond locked.
   // Checks on size etc should be done before.
   //
   // Reference count is 0 so increase it in calling function if you want to
   // catch the block while still in memory.
   //
   // Returns nullptr when RAM could not be obtained or Block allocation failed.

   const long long off = i * m_block_size;
   const int last_block = m_num_blocks - 1;
   const bool cs_net = cache()->RefConfiguration().is_cschk_net();

   int blk_size, req_size;
   if (i == last_block) {
      // Last block may be shorter than m_block_size; for network checksums
      // the request is padded up to the next 4kB boundary.
      blk_size = req_size = m_file_size - off;
      if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
   } else {
      blk_size = req_size = m_block_size;
   }

   Block *b = 0;
   char *buf = cache()->RequestRAM(req_size);

   if (buf)
   {
      b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);

      if (b)
      {
         m_block_map[i] = b;

         // Actual Read request is issued in ProcessBlockRequests().

         // Too many blocks in flight -- put prefetching on hold until some drain.
         if (m_prefetch_state == kOn && (int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
         {
            m_prefetch_state = kHold;
            cache()->DeRegisterPrefetchFile(this);
         }
      }
      else
      {
         TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
      }
   }

   return b;
}
738
void File::ProcessBlockRequest(Block *b)
{
   // This *must not* be called with block_map locked.
   // Issues the actual remote read for a prepared block; 'brh' is the async
   // response handler that receives the completion.

   if (XRD_TRACE What >= TRACE_Dump) {
      char buf[256];
      snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
               b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
      TRACEF(Dump, "ProcessBlockRequest() " << buf);
   }

   // pgRead (with per-page checksums) when network checksumming is requested
   // for this block, plain Read otherwise.
   if (b->req_cksum_net())
   {
      b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
                                      b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
   } else {
      b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
   }
}
760
761void File::ProcessBlockRequests(BlockList_t& blks)
762{
763 // This *must not* be called with block_map locked.
764
765 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
766 {
767 ProcessBlockRequest(*bi);
768 }
769}
770
771//------------------------------------------------------------------------------
772
773void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
774{
775 int n_chunks = ioVec.size();
776 int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
777
778 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
779 ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
780
781 DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
782
783 int pos = 0;
784 while (n_chunks > XrdProto::maxRvecsz) {
785 io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
786 pos += XrdProto::maxRvecsz;
787 n_chunks -= XrdProto::maxRvecsz;
788 }
789 io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
790}
791
792//------------------------------------------------------------------------------
793
794int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
795{
796 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
797
798 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
799
800 if (rs < 0)
801 {
802 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
803 return rs;
804 }
805
806 if (rs != expected_size)
807 {
808 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
809 return -EIO;
810 }
811
812 return (int) rs;
813}
814
815//------------------------------------------------------------------------------
816
int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // rrc_func is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
   // This streamlines implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   // Refuse reads on a file in shutdown or on an IO that is detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      // Serve directly from the local data file; re-take the lock only to
      // account the bytes as cache hits.
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   // General case: wrap the single request into an IOVec and hand it to the
   // common Read/ReadV processing (which unlocks m_state_cond).
   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}
851
852//------------------------------------------------------------------------------
853
int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   // Vector-read entry point; same contract as Read() regarding when the
   // caller must invoke the response handler.

   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   // Refuse reads on a file in shutdown or on an IO that is detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      // Serve directly from the local data file; re-take the lock only to
      // account the bytes as cache hits.
      m_state_cond.UnLock();
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}
882
883//------------------------------------------------------------------------------
884
885int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
886 ReadReqRH *rh, const char *tpfx)
887{
888 // Non-trivial processing for Read and ReadV.
889 // Entered under lock.
890 //
891 // loop over reqired blocks:
892 // - if on disk, ok;
893 // - if in ram or incoming, inc ref-count
894 // - otherwise request and inc ref count (unless RAM full => request direct)
895 // unlock
896
897 int prefetch_cnt = 0;
898
899 ReadRequest *read_req = nullptr;
900 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
901
902 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
903
904 std::vector<XrdOucIOVec> iovec_disk;
905 std::vector<XrdOucIOVec> iovec_direct;
906 int iovec_disk_total = 0;
907 int iovec_direct_total = 0;
908
909 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
910 {
911 const XrdOucIOVec &iov = readV[iov_idx];
912 long long iUserOff = iov.offset;
913 int iUserSize = iov.size;
914 char *iUserBuff = iov.data;
915
916 const int idx_first = iUserOff / m_block_size;
917 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
918
919 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
920
921 enum LastBlock_e { LB_other, LB_disk, LB_direct };
922
923 LastBlock_e lbe = LB_other;
924
925 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
926 {
927 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
928 BlockMap_i bi = m_block_map.find(block_idx);
929
930 // overlap and read
931 long long off; // offset in user buffer
932 long long blk_off; // offset in block
933 int size; // size to copy
934
935 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
936
937 // In RAM or incoming?
938 if (bi != m_block_map.end())
939 {
940 inc_ref_count(bi->second);
941 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
942
943 if (bi->second->is_finished())
944 {
945 // note, blocks with error should not be here !!!
946 // they should be either removed or reissued in ProcessBlockResponse()
947 assert(bi->second->is_ok());
948
949 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
950
951 if (bi->second->m_prefetch)
952 ++prefetch_cnt;
953 }
954 else
955 {
956 if ( ! read_req)
957 read_req = new ReadRequest(io, rh);
958
959 // We have a lock on state_cond --> as we register the request before releasing the lock,
960 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
961
962 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
963 ++read_req->m_n_chunk_reqs;
964 }
965
966 lbe = LB_other;
967 }
968 // On disk?
969 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
970 {
971 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
972
973 if (lbe == LB_disk)
974 iovec_disk.back().size += size;
975 else
976 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
977 iovec_disk_total += size;
978
979 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
980 ++prefetch_cnt;
981
982 lbe = LB_disk;
983 }
984 // Neither ... then we have to go get it ...
985 else
986 {
987 if ( ! read_req)
988 read_req = new ReadRequest(io, rh);
989
990 // Is there room for one more RAM Block?
991 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
992 if (b)
993 {
994 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
995 inc_ref_count(b);
996 blks_to_request.push_back(b);
997
998 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
999 ++read_req->m_n_chunk_reqs;
1000
1001 lbe = LB_other;
1002 }
1003 else // Nope ... read this directly without caching.
1004 {
1005 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
1006
1007 iovec_direct_total += size;
1008 read_req->m_direct_done = false;
1009
1010 // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
1011 // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
1012 // is determined in the RequestBlocksDirect().
1013 if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
1014 iovec_direct.back().size += size;
1015 } else {
1016 long long in_offset = block_idx * m_block_size + blk_off;
1017 char *out_pos = iUserBuff + off;
1018 while (size > XrdProto::maxRVdsz) {
1019 iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
1020 in_offset += XrdProto::maxRVdsz;
1021 out_pos += XrdProto::maxRVdsz;
1022 size -= XrdProto::maxRVdsz;
1023 }
1024 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1025 }
1026
1027 lbe = LB_direct;
1028 }
1029 }
1030 } // end for over blocks in an IOVec
1031 } // end for over readV IOVec
1032
1033 inc_prefetch_hit_cnt(prefetch_cnt);
1034
1035 m_state_cond.UnLock();
1036
1037 // First, send out remote requests for new blocks.
1038 if ( ! blks_to_request.empty())
1039 {
1040 ProcessBlockRequests(blks_to_request);
1041 blks_to_request.clear();
1042 }
1043
1044 // Second, send out remote direct read requests.
1045 if ( ! iovec_direct.empty())
1046 {
1047 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1048
1049 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
1050 }
1051
1052 // Begin synchronous part where we process data that is already in RAM or on disk.
1053
1054 long long bytes_read = 0;
1055 int error_cond = 0; // to be set to -errno
1056
1057 // Third, process blocks that are available in RAM.
1058 if ( ! blks_ready.empty())
1059 {
1060 for (auto &bvi : blks_ready)
1061 {
1062 for (auto &cr : bvi.second)
1063 {
1064 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
1065 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1066 bytes_read += cr.m_size;
1067 }
1068 }
1069 }
1070
1071 // Fourth, read blocks from disk.
1072 if ( ! iovec_disk.empty())
1073 {
1074 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1075 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1076 if (rc >= 0)
1077 {
1078 bytes_read += rc;
1079 }
1080 else
1081 {
1082 error_cond = rc;
1083 TRACEF(Error, tpfx << "failed read from disk");
1084 }
1085 }
1086
1087 // End synchronous part -- update with sync stats and determine actual state of this read.
1088 // Note: remote reads might have already finished during disk-read!
1089
1090 m_state_cond.Lock();
1091
1092 for (auto &bvi : blks_ready)
1093 dec_ref_count(bvi.first, (int) bvi.second.size());
1094
1095 if (read_req)
1096 {
1097 read_req->m_bytes_read += bytes_read;
1098 if (error_cond)
1099 read_req->update_error_cond(error_cond);
1100 read_req->m_stats.m_BytesHit += bytes_read;
1101 read_req->m_sync_done = true;
1102
1103 if (read_req->is_complete())
1104 {
1105 // Almost like FinalizeReadRequest(read_req) -- but no callout!
1106 m_delta_stats.AddReadStats(read_req->m_stats);
1107 check_delta_stats();
1108 m_state_cond.UnLock();
1109
1110 int ret = read_req->return_value();
1111 delete read_req;
1112 return ret;
1113 }
1114 else
1115 {
1116 m_state_cond.UnLock();
1117 return -EWOULDBLOCK;
1118 }
1119 }
1120 else
1121 {
1122 m_delta_stats.m_BytesHit += bytes_read;
1123 check_delta_stats();
1124 m_state_cond.UnLock();
1125
1126 // !!! No callout.
1127
1128 return error_cond ? error_cond : bytes_read;
1129 }
1130}
1131
1132
1133//==============================================================================
1134// WriteBlock and Sync
1135//==============================================================================
1136
1138{
1139 // write block buffer into disk file
1140 long long offset = b->m_offset - m_offset;
1141 long long size = b->get_size();
1142 ssize_t retval;
1143
1144 if (m_cfi.IsCkSumCache())
1145 if (b->has_cksums())
1146 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1147 else
1148 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1149 else
1150 retval = m_data_file->Write(b->get_buff(), offset, size);
1151
1152 if (retval < size)
1153 {
1154 if (retval < 0) {
1155 TRACEF(Error, "WriteToDisk() write error " << retval);
1156 } else {
1157 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1158 }
1159
1160 XrdSysCondVarHelper _lck(m_state_cond);
1161
1162 dec_ref_count(b);
1163
1164 return;
1165 }
1166
1167 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1168
1169 // Set written bit.
1170 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1171
1172 bool schedule_sync = false;
1173 {
1174 XrdSysCondVarHelper _lck(m_state_cond);
1175
1176 m_cfi.SetBitWritten(blk_idx);
1177
1178 if (b->m_prefetch)
1179 {
1180 m_cfi.SetBitPrefetch(blk_idx);
1181 }
1182 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1183 {
1184 m_cfi.ResetCkSumNet();
1185 }
1186
1187 // Set synced bit or stash block index if in actual sync.
1188 // Synced state is only written out to cinfo file when data file is synced.
1189 if (m_in_sync)
1190 {
1191 m_writes_during_sync.push_back(blk_idx);
1192 }
1193 else
1194 {
1195 m_cfi.SetBitSynced(blk_idx);
1196 ++m_non_flushed_cnt;
1197 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1198 ! m_in_shutdown)
1199 {
1200 schedule_sync = true;
1201 m_in_sync = true;
1202 m_non_flushed_cnt = 0;
1203 }
1204 }
1205 // As soon as the reference count is decreased on the block, the
1206 // file object may be deleted. Thus, to avoid holding both locks at a time,
1207 // we defer the ref count decrease until later if a sync is needed
1208 if (!schedule_sync) {
1209 dec_ref_count(b);
1210 }
1211 }
1212
1213 if (schedule_sync)
1214 {
1215 cache()->ScheduleFileSync(this);
1216 XrdSysCondVarHelper _lck(m_state_cond);
1217 dec_ref_count(b);
1218 }
1219}
1220
1221//------------------------------------------------------------------------------
1222
1224{
1225 TRACEF(Dump, "Sync()");
1226
1227 int ret = m_data_file->Fsync();
1228 bool errorp = false;
1229 if (ret == XrdOssOK)
1230 {
1231 Stats loc_stats;
1232 {
1233 XrdSysCondVarHelper _lck(&m_state_cond);
1234 report_and_merge_delta_stats();
1235 loc_stats = m_stats;
1236 }
1237 m_cfi.WriteIOStat(loc_stats);
1238 m_cfi.Write(m_info_file, m_filename.c_str());
1239 int cret = m_info_file->Fsync();
1240 if (cret != XrdOssOK)
1241 {
1242 TRACEF(Error, "Sync cinfo file sync error " << cret);
1243 errorp = true;
1244 }
1245 }
1246 else
1247 {
1248 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1249 errorp = true;
1250 }
1251
1252 if (errorp)
1253 {
1254 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1255
1256 // Unlink will also call this->initiate_emergency_shutdown()
1257 Cache::GetInstance().UnlinkFile(m_filename, false);
1258
1259 XrdSysCondVarHelper _lck(&m_state_cond);
1260
1261 m_writes_during_sync.clear();
1262 m_in_sync = false;
1263
1264 return;
1265 }
1266
1267 int written_while_in_sync;
1268 bool resync = false;
1269 {
1270 XrdSysCondVarHelper _lck(&m_state_cond);
1271 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1272 {
1273 m_cfi.SetBitSynced(*i);
1274 }
1275 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1276 m_writes_during_sync.clear();
1277
1278 // If there were writes during sync and the file is now complete,
1279 // let us call Sync again without resetting the m_in_sync flag.
1280 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1281 resync = true;
1282 else
1283 m_in_sync = false;
1284 }
1285 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1286
1287 if (resync)
1288 Sync();
1289}
1290
1291
1292//==============================================================================
1293// Block processing
1294//==============================================================================
1295
1296void File::free_block(Block* b)
1297{
1298 // Method always called under lock.
1299 int i = b->m_offset / m_block_size;
1300 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1301 size_t ret = m_block_map.erase(i);
1302 if (ret != 1)
1303 {
1304 // assert might be a better option than a warning
1305 TRACEF(Error, "free_block did not erase " << i << " from map");
1306 }
1307 else
1308 {
1309 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1310 delete b;
1311 }
1312
1313 if (m_prefetch_state == kHold && (int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1314 {
1315 m_prefetch_state = kOn;
1316 cache()->RegisterPrefetchFile(this);
1317 }
1318}
1319
1320//------------------------------------------------------------------------------
1321
1322bool File::select_current_io_or_disable_prefetching(bool skip_current)
1323{
1324 // Method always called under lock. It also expects prefetch to be active.
1325
1326 int io_size = (int) m_io_set.size();
1327 bool io_ok = false;
1328
1329 if (io_size == 1)
1330 {
1331 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1332 if (io_ok)
1333 {
1334 m_current_io = m_io_set.begin();
1335 }
1336 }
1337 else if (io_size > 1)
1338 {
1339 IoSet_i mi = m_current_io;
1340 if (skip_current && mi != m_io_set.end()) ++mi;
1341
1342 for (int i = 0; i < io_size; ++i)
1343 {
1344 if (mi == m_io_set.end()) mi = m_io_set.begin();
1345
1346 if ((*mi)->m_allow_prefetching)
1347 {
1348 m_current_io = mi;
1349 io_ok = true;
1350 break;
1351 }
1352 ++mi;
1353 }
1354 }
1355
1356 if ( ! io_ok)
1357 {
1358 m_current_io = m_io_set.end();
1359 m_prefetch_state = kStopped;
1360 cache()->DeRegisterPrefetchFile(this);
1361 }
1362
1363 return io_ok;
1364}
1365
1366//------------------------------------------------------------------------------
1367
1368void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1369{
1370 // Called from DirectResponseHandler.
1371 // NOT under lock.
1372
1373 if (error_cond)
1374 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1375
1376 m_state_cond.Lock();
1377
1378 if (error_cond)
1379 rreq->update_error_cond(error_cond);
1380 else {
1381 rreq->m_stats.m_BytesBypassed += bytes_read;
1382 rreq->m_bytes_read += bytes_read;
1383 }
1384
1385 rreq->m_direct_done = true;
1386
1387 bool rreq_complete = rreq->is_complete();
1388
1389 m_state_cond.UnLock();
1390
1391 if (rreq_complete)
1392 FinalizeReadRequest(rreq);
1393}
1394
void File::ProcessBlockError(Block *b, ReadRequest *rreq)
{
   // Called from ProcessBlockResponse().
   // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
   // Does not manage m_read_req.
   // Will not complete the request.

   TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
          " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));

   // Propagate the block's error into the request and drop this chunk from the
   // request's outstanding-chunk count; the block's reference is released here.
   rreq->update_error_cond(b->get_error());
   --rreq->m_n_chunk_reqs;

   dec_ref_count(b);
}
1410
void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
{
   // Called from ProcessBlockResponse().
   // NOT under lock as it does a memcpy of block data into the user buffer.
   // Acquires lock for block, m_read_req and rreq state update.

   ReadRequest *rreq = creq.m_read_req;

   TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
   memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);

   m_state_cond.Lock();

   rreq->m_bytes_read += creq.m_size;

   // The bytes count as "missed" only for the request that initiated the
   // block's fetch; other requests served from the same block count as hits.
   if (b->get_req_id() == (void*) rreq)
      rreq->m_stats.m_BytesMissed += creq.m_size;
   else
      rreq->m_stats.m_BytesHit += creq.m_size;

   --rreq->m_n_chunk_reqs;

   if (b->m_prefetch)
      inc_prefetch_hit_cnt(1);

   dec_ref_count(b);

   bool rreq_complete = rreq->is_complete();

   m_state_cond.UnLock();

   // Callout to the client happens outside of the lock.
   if (rreq_complete)
      FinalizeReadRequest(rreq);
}
1445
void File::FinalizeReadRequest(ReadRequest *rreq)
{
   // called from ProcessBlockResponse()
   // NOT under lock -- does callout
   {
      // Fold the request's statistics into the file's delta stats before the callout.
      XrdSysCondVarHelper _lck(m_state_cond);
      m_delta_stats.AddReadStats(rreq->m_stats);
      check_delta_stats();
   }

   // Notify the caller's response handler and dispose of the request object.
   rreq->m_rh->Done(rreq->return_value());
   delete rreq;
}
1459
void File::ProcessBlockResponse(Block *b, int res)
{
   // Central completion handler for a remote block read.
   // res >= 0 is the number of bytes received; res < 0 is -errno.
   // Handles: prefetch bookkeeping, success distribution to chunk requests,
   // and error recovery (reissue of the block on another IO when possible).
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received, apparently size of the file on the remote
      // is different than what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      // Success path: mark downloaded, queue the block for writing to disk
      // (unless shutting down), then serve all subscribed chunk requests.
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         // No check for writes, report-and-merge forced during Sync().
         cache()->AddWriteTask(b, true);
      }

      // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      // Failure (or short read) path.
      if (res < 0) {
         // Log at Error level only the first time this IO reports an error.
         bool new_error = b->get_io()->register_block_error(res);
         int tlvl = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int tlvl = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
// EREMOTEIO is not available on all platforms; fall back to EIO there.
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over Block's chunk-reqs vector, error out ones with the same IO.
      // Collect others with a different IO, the first of them will be used to reissue the request.
      // This is then done outside of lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for(ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      // Callouts and the network reissue happen outside of the lock.
      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}
1606
1607//------------------------------------------------------------------------------
1608
const char* File::lPath() const
{
   // Local (cache) path of the data file; used for logging/tracing.
   return m_filename.c_str();
}
1613
1614//------------------------------------------------------------------------------
1615
1616int File::offsetIdx(int iIdx) const
1617{
1618 return iIdx - m_offset/m_block_size;
1619}
1620
1621
1622//------------------------------------------------------------------------------
1623
1625{
1626 // Check that block is not on disk and not in RAM.
1627 // TODO: Could prefetch several blocks at once!
1628 // blks_max could be an argument
1629
1630 BlockList_t blks;
1631
1632 TRACEF(DumpXL, "Prefetch() entering.");
1633 {
1634 XrdSysCondVarHelper _lck(m_state_cond);
1635
1636 if (m_prefetch_state != kOn)
1637 {
1638 return;
1639 }
1640
1641 if ( ! select_current_io_or_disable_prefetching(true) )
1642 {
1643 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1644 return;
1645 }
1646
1647 // Select block(s) to fetch.
1648 for (int f = 0; f < m_num_blocks; ++f)
1649 {
1650 if ( ! m_cfi.TestBitWritten(f))
1651 {
1652 int f_act = f + m_offset / m_block_size;
1653
1654 BlockMap_i bi = m_block_map.find(f_act);
1655 if (bi == m_block_map.end())
1656 {
1657 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1658 if (b)
1659 {
1660 TRACEF(Dump, "Prefetch take block " << f_act);
1661 blks.push_back(b);
1662 // Note: block ref_cnt not increased, it will be when placed into write queue.
1663
1664 inc_prefetch_read_cnt(1);
1665 }
1666 else
1667 {
1668 // This shouldn't happen as prefetching stops when RAM is 70% full.
1669 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1670 }
1671 break;
1672 }
1673 }
1674 }
1675
1676 if (blks.empty())
1677 {
1678 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1679 m_prefetch_state = kComplete;
1680 cache()->DeRegisterPrefetchFile(this);
1681 }
1682 else
1683 {
1684 (*m_current_io)->m_active_prefetches += (int) blks.size();
1685 }
1686 }
1687
1688 if ( ! blks.empty())
1689 {
1690 ProcessBlockRequests(blks);
1691 }
1692}
1693
1694
1695//------------------------------------------------------------------------------
1696
1698{
1699 return m_prefetch_score;
1700}
1701
1703{
1704 return Cache::TheOne().GetLog();
1705}
1706
1708{
1709 return Cache::TheOne().GetTrace();
1710}
1711
1712void File::insert_remote_location(const std::string &loc)
1713{
1714 if ( ! loc.empty())
1715 {
1716 size_t p = loc.find_first_of('@');
1717 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1718 }
1719}
1720
1721std::string File::GetRemoteLocations() const
1722{
1723 std::string s;
1724 if ( ! m_remote_locations.empty())
1725 {
1726 size_t sl = 0;
1727 int nl = 0;
1728 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1729 {
1730 sl += i->size();
1731 }
1732 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1733 s = '[';
1734 int j = 1;
1735 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1736 {
1737 s += '"'; s += *i; s += '"';
1738 if (j < nl) s += ',';
1739 }
1740 s += ']';
1741 }
1742 else
1743 {
1744 s = "[]";
1745 }
1746 return s;
1747}
1748
1749//==============================================================================
1750//======================= RESPONSE HANDLERS ==============================
1751//==============================================================================
1752
1754{
1755 m_block->m_file->ProcessBlockResponse(m_block, res);
1756 delete this;
1757}
1758
1759//------------------------------------------------------------------------------
1760
1762{
1763 m_mutex.Lock();
1764
1765 int n_left = --m_to_wait;
1766
1767 if (res < 0) {
1768 if (m_errno == 0) m_errno = res; // store first reported error
1769 } else {
1770 m_bytes_read += res;
1771 }
1772
1773 m_mutex.UnLock();
1774
1775 if (n_left == 0)
1776 {
1777 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1778 delete this;
1779 }
1780}
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
#define stat(a, b)
Definition XrdPosix.hh:101
#define XRD_TRACE
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fstat(struct stat *buf)
Definition XrdOss.hh:136
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:163
XrdSysError * GetLog() const
Definition XrdPfc.hh:294
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:215
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:135
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:295
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1188
static const Cache & TheOne()
Definition XrdPfc.cc:133
XrdOss * GetOss() const
Definition XrdPfc.hh:280
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
bool register_incomplete_read()
Definition XrdPfcIO.hh:90
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
bool register_block_error(int res)
Definition XrdPfcIO.hh:93
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70
const char * GetLocation()
Definition XrdPfcIO.hh:44
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
XrdPosixStats Stats
static const int maxRVdsz
Definition XProtocol.hh:688
static const int maxRvecsz
Definition XProtocol.hh:686
long long offset
ReadRequest * m_read_req
Definition XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:64
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:115
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:116
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:80
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:119
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:73
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:112
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:82
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:88
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:107
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:114
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:89
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:117
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:87
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:118
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition XrdPfcFile.hh:81
bool is_complete() const
Definition XrdPfcFile.hh:83
int return_value() const
Definition XrdPfcFile.hh:84
long long m_bytes_read
Definition XrdPfcFile.hh:68