1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
7 * Author: Andreas Dilger <adilger@clusterfs.com>
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
34 #include "llite_internal.h"
36 /* also used by llite/special.c:ll_special_open() */
/* Allocate a struct ll_file_data from the ll_file_data_slab cache.
 * NOTE(review): this excerpt is missing lines (embedded numbering jumps
 * 37->39->41); the opening brace and the return of 'fd' are not visible
 * here — presumably the full function returns fd (NULL on alloc failure). */
37 struct ll_file_data *ll_file_data_get(void)
39 struct ll_file_data *fd;
41 OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
/* Return a struct ll_file_data to the slab cache.
 * NOTE(review): lines 46-47 of the original are missing from this excerpt;
 * presumably they contain a NULL check on 'fd' — confirm against the full
 * source before relying on NULL-safety here. */
45 static void ll_file_data_put(struct ll_file_data *fd)
48 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
/* Copy the inode's current attributes (mode, times, size, blocks, flags),
 * its FID, the current I/O epoch and the given open file handle 'fh' into
 * 'op_data' so they can be sent to the MDS (used on close / SOM update).
 * Also takes a reference on the MDS capability via ll_mdscapa_get() —
 * presumably the consumer of op_data releases op_capa1; verify in callers. */
51 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
52 struct lustre_handle *fh)
54 op_data->op_fid1 = ll_i2info(inode)->lli_fid;
55 op_data->op_attr.ia_mode = inode->i_mode;
56 op_data->op_attr.ia_atime = inode->i_atime;
57 op_data->op_attr.ia_mtime = inode->i_mtime;
58 op_data->op_attr.ia_ctime = inode->i_ctime;
59 op_data->op_attr.ia_size = i_size_read(inode);
60 op_data->op_attr_blocks = inode->i_blocks;
/* Cast to ll_iattr to reach the Lustre-specific ia_attr_flags field that
 * the stock struct iattr does not carry. */
61 ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
62 op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
63 memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
64 op_data->op_capa1 = ll_mdscapa_get(inode);
/* Prepare 'op_data' for an MDS close RPC on 'och'.
 * Always marks mode/atime/mtime/ctime valid; size/blocks are only sent
 * when Size-on-MDS (SOM) is not in effect (no OBD_CONNECT_SOM on the MDC
 * export, or not a regular file) — with SOM the MDS gets size separately.
 * NOTE(review): several lines are missing from this excerpt (68->72,
 * 75->78, 80->82); in particular the body of the !FMODE_WRITE branch at
 * original line 75 is not visible — confirm control flow in full source. */
67 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
68 struct obd_client_handle *och)
72 op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
73 ATTR_MTIME_SET | ATTR_CTIME_SET;
75 if (!(och->och_flags & FMODE_WRITE))
78 if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
79 !S_ISREG(inode->i_mode))
80 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
/* Close the I/O epoch (may set MF_EPOCH_CLOSE in op_data->op_flags). */
82 ll_epoch_close(inode, op_data, &och, 0);
84 ll_pack_inode2opdata(inode, op_data, &och->och_fh);
/* Send the close RPC for an MDS open handle 'och' and handle the
 * Size-on-MDS (SOM) epilogue: on -EAGAIN-style SOM instruction from the
 * MDS, fetch size from the OSTs and push a setattr back via
 * ll_sizeonmds_update().  Finally clears replay data and poisons the
 * handle cookie with DEAD_HANDLE_MAGIC.
 * NOTE(review): this excerpt is heavily decimated (numbering jumps
 * 96->103, 107->112, 121->123, etc.); error branches, ENTRY/RETURN and
 * several condition lines are not visible — do not infer exact control
 * flow from what is shown here. */
89 static int ll_close_inode_openhandle(struct obd_export *md_exp,
91 struct obd_client_handle *och)
93 struct obd_export *exp = ll_i2mdexp(inode);
94 struct md_op_data *op_data;
95 struct ptlrpc_request *req = NULL;
96 struct obd_device *obd = class_exp2obd(exp);
103 * XXX: in case of LMV, is this correct to access
106 CERROR("Invalid MDC connection handle "LPX64"\n",
107 ll_i2mdexp(inode)->exp_handle.h_cookie);
112 * here we check if this is forced umount. If so this is called on
113 * canceling "open lock" and we do not call md_close() in this case, as
114 * it will not be successful, as import is already deactivated.
119 OBD_ALLOC_PTR(op_data);
121 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
123 ll_prepare_close(inode, op_data, och);
/* Remember whether ll_prepare_close() closed the epoch, before md_close
 * consumes op_data. */
124 epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
125 rc = md_close(md_exp, op_data, och, &req);
128 /* This close must have the epoch closed. */
129 LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
130 LASSERT(epoch_close);
131 /* MDS has instructed us to obtain Size-on-MDS attribute from
132 * OSTs and send setattr to back to MDS. */
133 rc = ll_sizeonmds_update(inode, &och->och_fh,
134 op_data->op_ioepoch);
136 CERROR("inode %lu mdc Size-on-MDS update failed: "
137 "rc = %d\n", inode->i_ino, rc);
141 CERROR("inode %lu mdc close failed: rc = %d\n",
144 ll_finish_md_op_data(op_data);
/* Destroy OST objects if the close reply says the file is unlinked. */
147 rc = ll_objects_destroy(req, inode);
149 CERROR("inode %lu ll_objects destroy: rc = %d\n",
153 ptlrpc_req_finished(req); /* This is close request */
/* With SOM and an open epoch on a writable regular-file handle, defer
 * the DONE_WRITING to the dedicated close thread. */
157 if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
158 S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
159 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
161 md_clear_open_replay_data(md_exp, och);
162 /* Free @och if it is not waiting for DONE_WRITING. */
163 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
/* Drop the per-inode cached MDS open handle for the given open mode
 * (write / exec / read).  If other file descriptors still use the handle
 * (*och_usecount != 0) it is kept; otherwise the handle pointer is taken
 * under lli_och_sem and closed via ll_close_inode_openhandle().
 * NOTE(review): lines clearing *och_p and the final RETURN are missing
 * from this excerpt (numbering jumps 194->199, 203->210). */
170 int ll_md_real_close(struct inode *inode, int flags)
172 struct ll_inode_info *lli = ll_i2info(inode);
173 struct obd_client_handle **och_p;
174 struct obd_client_handle *och;
/* Select which of the three cached handles (write/exec/read) to close. */
179 if (flags & FMODE_WRITE) {
180 och_p = &lli->lli_mds_write_och;
181 och_usecount = &lli->lli_open_fd_write_count;
182 } else if (flags & FMODE_EXEC) {
183 och_p = &lli->lli_mds_exec_och;
184 och_usecount = &lli->lli_open_fd_exec_count;
186 LASSERT(flags & FMODE_READ);
187 och_p = &lli->lli_mds_read_och;
188 och_usecount = &lli->lli_open_fd_read_count;
191 down(&lli->lli_och_sem);
192 if (*och_usecount) { /* There are still users of this handle, so
194 up(&lli->lli_och_sem);
199 up(&lli->lli_och_sem);
201 if (och) { /* There might be a race and somebody have freed this och
203 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
/* Per-file-descriptor close: release a group lock if held, decrement the
 * per-mode open count under lli_och_sem, and — unless a cached OPEN DLM
 * lock still covers this inode (md_lock_match) — do the real MDS close
 * via ll_md_real_close().  Frees the ll_file_data and drops the OSS
 * capability.
 * NOTE(review): excerpt is decimated (e.g. lockmode assignment lines and
 * the md_lock_match tail arguments around original 229/253 are missing);
 * hedge any assumption about the exact lock mode used. */
210 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
213 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
214 struct ll_inode_info *lli = ll_i2info(inode);
218 /* clear group lock, if present */
219 if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
220 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
221 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
222 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
226 /* Let's see if we have good enough OPEN lock on the file and if
227 we can skip talking to MDS */
228 if (file->f_dentry->d_inode) { /* Can this ever be false? */
230 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
231 struct lustre_handle lockh;
232 struct inode *inode = file->f_dentry->d_inode;
233 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235 down(&lli->lli_och_sem);
236 if (fd->fd_omode & FMODE_WRITE) {
238 LASSERT(lli->lli_open_fd_write_count);
239 lli->lli_open_fd_write_count--;
240 } else if (fd->fd_omode & FMODE_EXEC) {
242 LASSERT(lli->lli_open_fd_exec_count);
243 lli->lli_open_fd_exec_count--;
246 LASSERT(lli->lli_open_fd_read_count);
247 lli->lli_open_fd_read_count--;
249 up(&lli->lli_och_sem);
/* TEST_LOCK match: if no cached OPEN ibits lock exists, close on MDS. */
251 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
252 LDLM_IBITS, &policy, lockmode,
254 rc = ll_md_real_close(file->f_dentry->d_inode,
258 CERROR("Releasing a file %p with negative dentry %p. Name %s",
259 file, file->f_dentry, file->f_dentry->d_name.name);
262 LUSTRE_FPRIVATE(file) = NULL;
263 ll_file_data_put(fd);
264 ll_capa_close(inode);
269 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271 /* While this returns an error code, fput() the caller does not, so we need
272 * to make every effort to clean up all of our state here. Also, applications
273 * rarely check close errors and even if an error is returned they will not
274 * re-try the close call.
/* VFS ->release() entry point.  Skips the root dentry, tallies the
 * release in /proc stats, folds any pending async write error from the
 * LOV into this close's return code, then performs the MDS close via
 * ll_md_close().
 * NOTE(review): excerpt is decimated — the RETURN paths and the checks
 * around fd/lsm validity (original 287, 294-295, 300-303) are missing. */
276 int ll_file_release(struct inode *inode, struct file *file)
278 struct ll_file_data *fd;
279 struct ll_sb_info *sbi = ll_i2sbi(inode);
280 struct ll_inode_info *lli = ll_i2info(inode);
281 struct lov_stripe_md *lsm = lli->lli_smd;
285 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
286 inode->i_generation, inode);
288 /* don't do anything for / */
289 if (inode->i_sb->s_root == file->f_dentry)
292 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
293 fd = LUSTRE_FPRIVATE(file);
296 /* don't do anything for / */
297 if (inode->i_sb->s_root == file->f_dentry) {
298 LUSTRE_FPRIVATE(file) = NULL;
299 ll_file_data_put(fd);
/* Pick up any asynchronous write error recorded against the stripes so
 * close() reports it to the application. */
304 lov_test_and_clear_async_rc(lsm);
305 lli->lli_async_rc = 0;
307 rc = ll_md_close(sbi->ll_md_exp, inode, file);
/* Perform an intent-based open RPC against the MDS for 'file'.
 * Requests an OPEN lock (MDS_OPEN_LOCK) unless lmm/lmmsize indicate this
 * call is only setting stripe parameters.  On success, binds the granted
 * DLM lock to the inode and refreshes the inode from the reply.
 * NOTE(review): excerpt is decimated — the IS_ERR(op_data) check line,
 * several GOTO/out labels and the final RETURN are not visible. */
311 static int ll_intent_file_open(struct file *file, void *lmm,
312 int lmmsize, struct lookup_intent *itp)
314 struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
315 struct dentry *parent = file->f_dentry->d_parent;
316 const char *name = file->f_dentry->d_name.name;
317 const int len = file->f_dentry->d_name.len;
318 struct md_op_data *op_data;
319 struct ptlrpc_request *req;
325 /* Usually we come here only for NFSD, and we want open lock.
326 But we can also get here with pre 2.6.15 patchless kernels, and in
327 that case that lock is also ok */
328 /* We can also get here if there was cached open handle in revalidate_it
329 * but it disappeared while we were getting from there to ll_file_open.
330 * But this means this file was closed and immediatelly opened which
331 * makes a good candidate for using OPEN lock */
332 /* If lmmsize & lmm are not 0, we are just setting stripe info
333 * parameters. No need for the open lock */
334 if (!lmm && !lmmsize)
335 itp->it_flags |= MDS_OPEN_LOCK;
337 op_data = ll_prep_md_op_data(NULL, parent->d_inode,
338 file->f_dentry->d_inode, name, len,
339 O_RDWR, LUSTRE_OPC_ANY, NULL);
341 RETURN(PTR_ERR(op_data));
343 rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
344 0 /*unused */, &req, ll_md_blocking_ast, 0);
345 ll_finish_md_op_data(op_data);
347 /* reason for keep own exit path - don`t flood log
348 * with messages with -ESTALE errors.
/* An open that was granted but then errored still holds an MDS open
 * handle — release it so it is not leaked. */
350 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
351 it_open_error(DISP_OPEN_OPEN, itp))
353 ll_release_openhandle(file->f_dentry, itp);
357 if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
358 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
359 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
/* Attach the granted lock (if any) to the inode for later matching. */
363 if (itp->d.lustre.it_lock_mode)
364 md_set_lock_data(sbi->ll_md_exp,
365 &itp->d.lustre.it_lock_handle,
366 file->f_dentry->d_inode);
368 rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
371 ptlrpc_req_finished(itp->d.lustre.it_data);
374 it_clear_disposition(itp, DISP_ENQ_COMPLETE);
375 ll_intent_drop_lock(itp);
/* Populate an obd_client_handle from the open reply carried in the
 * intent: copies the MDS file handle and I/O epoch, stamps the magic and
 * FID, then registers the open for replay with md_set_open_replay_data().
 * Returns the md_set_open_replay_data() result. */
380 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
381 struct lookup_intent *it, struct obd_client_handle *och)
383 struct ptlrpc_request *req = it->d.lustre.it_data;
384 struct mdt_body *body;
388 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
389 LASSERT(body != NULL); /* reply already checked out */
390 LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
392 memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
393 och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
394 och->och_fid = lli->lli_fid;
395 och->och_flags = it->it_flags;
396 lli->lli_ioepoch = body->ioepoch;
398 return md_set_open_replay_data(md_exp, och, req);
/* Complete the client-local part of an open: optionally fill 'och' from
 * the intent reply (when 'och' is non-NULL), then attach the ll_file_data
 * to the struct file and initialise readahead state.
 * NOTE(review): excerpt is decimated — the condition guarding the
 * och-fill branch (original ~412) and the error/RETURN paths are missing;
 * the body->valid OBD_MD_FLSIZE branch body at original 426 is also not
 * visible. */
401 int ll_local_open(struct file *file, struct lookup_intent *it,
402 struct ll_file_data *fd, struct obd_client_handle *och)
404 struct inode *inode = file->f_dentry->d_inode;
405 struct ll_inode_info *lli = ll_i2info(inode);
408 LASSERT(!LUSTRE_FPRIVATE(file));
413 struct ptlrpc_request *req = it->d.lustre.it_data;
414 struct mdt_body *body;
417 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
421 body = lustre_msg_buf(req->rq_repmsg,
422 DLM_REPLY_REC_OFF, sizeof(*body));
424 if ((it->it_flags & FMODE_WRITE) &&
425 (body->valid & OBD_MD_FLSIZE))
427 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
428 lli->lli_ioepoch, PFID(&lli->lli_fid));
432 LUSTRE_FPRIVATE(file) = fd;
433 ll_readahead_init(inode, &fd->fd_ras);
/* Remember the open mode so close can decrement the right counter. */
434 fd->fd_omode = it->it_flags;
438 /* Open a file, and (for the very first open) create objects on the OSTs at
439 * this time. If opened with O_LOV_DELAY_CREATE, then we don't do the object
440 * creation or open until ll_lov_setstripe() ioctl is called. We grab
441 * lli_open_sem to ensure no other process will create objects, send the
442 * stripe MD to the MDS, or try to destroy the objects if that fails.
444 * If we already have the stripe MD locally then we don't request it in
445 * md_open(), by passing a lmm_size = 0.
447 * It is up to the application to ensure no other processes open this file
448 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
449 * used. We might be able to avoid races of that sort by getting lli_open_sem
450 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
451 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
/* VFS ->open() entry point.  Either reuses the intent stashed in
 * file->private_data by lookup/revalidate (LUSTRE_KERNEL_VERSION path) or
 * builds a fresh IT_OPEN intent from f_flags; reuses a cached per-mode
 * MDS open handle when one exists, otherwise allocates one and performs
 * the open RPC via ll_intent_file_open().  Finishes with ll_local_open()
 * and the O_LOV_DELAY_CREATE handling for delayed object creation.
 * NOTE(review): this excerpt is heavily decimated — error labels
 * (out_och_free etc.), several condition lines and RETURNs are missing;
 * treat the visible control flow as a sketch, not the full logic. */
453 int ll_file_open(struct inode *inode, struct file *file)
455 struct ll_inode_info *lli = ll_i2info(inode);
456 struct lookup_intent *it, oit = { .it_op = IT_OPEN,
457 .it_flags = file->f_flags };
458 struct lov_stripe_md *lsm;
459 struct ptlrpc_request *req = NULL;
460 struct obd_client_handle **och_p;
462 struct ll_file_data *fd;
466 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
467 inode->i_generation, inode, file->f_flags);
469 /* don't do anything for / */
470 if (inode->i_sb->s_root == file->f_dentry)
473 #ifdef LUSTRE_KERNEL_VERSION
476 it = file->private_data; /* XXX: compat macro */
477 file->private_data = NULL; /* prevent ll_local_open assertion */
480 fd = ll_file_data_get();
484 /* don't do anything for / */
485 if (inode->i_sb->s_root == file->f_dentry) {
486 LUSTRE_FPRIVATE(file) = fd;
/* No usable intent from lookup: build our own access-mode flags. */
490 if (!it || !it->d.lustre.it_disposition) {
491 /* Convert f_flags into access mode. We cannot use file->f_mode,
492 * because everything but O_ACCMODE mask was stripped from
/* O_ACCMODE trick: O_RDONLY/O_WRONLY/O_RDWR + 1 maps onto FMODE bits. */
494 if ((oit.it_flags + 1) & O_ACCMODE)
496 if (file->f_flags & O_TRUNC)
497 oit.it_flags |= FMODE_WRITE;
499 /* kernel only call f_op->open in dentry_open. filp_open calls
500 * dentry_open after call to open_namei that checks permissions.
501 * Only nfsd_open call dentry_open directly without checking
502 * permissions and because of that this code below is safe. */
503 if (oit.it_flags & FMODE_WRITE)
504 oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
506 /* We do not want O_EXCL here, presumably we opened the file
507 * already? XXX - NFS implications? */
508 oit.it_flags &= ~O_EXCL;
513 /* Let's see if we have file open on MDS already. */
514 if (it->it_flags & FMODE_WRITE) {
515 och_p = &lli->lli_mds_write_och;
516 och_usecount = &lli->lli_open_fd_write_count;
517 } else if (it->it_flags & FMODE_EXEC) {
518 och_p = &lli->lli_mds_exec_och;
519 och_usecount = &lli->lli_open_fd_exec_count;
521 och_p = &lli->lli_mds_read_och;
522 och_usecount = &lli->lli_open_fd_read_count;
525 down(&lli->lli_och_sem);
526 if (*och_p) { /* Open handle is present */
527 if (it_disposition(it, DISP_OPEN_OPEN)) {
528 /* Well, there's extra open request that we do not need,
529 let's close it somehow. This will decref request. */
530 rc = it_open_error(DISP_OPEN_OPEN, it);
532 ll_file_data_put(fd);
533 GOTO(out_och_free, rc);
535 ll_release_openhandle(file->f_dentry, it);
536 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
/* Reuse the cached handle: och == NULL tells ll_local_open to skip fill. */
541 rc = ll_local_open(file, it, fd, NULL);
543 up(&lli->lli_och_sem);
544 ll_file_data_put(fd);
548 LASSERT(*och_usecount == 0);
549 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
551 ll_file_data_put(fd);
552 GOTO(out_och_free, rc = -ENOMEM);
555 if (!it->d.lustre.it_disposition) {
/* O_CHECK_STALE makes the RPC path retry/handle -ESTALE quietly. */
556 it->it_flags |= O_CHECK_STALE;
557 rc = ll_intent_file_open(file, NULL, 0, it);
558 it->it_flags &= ~O_CHECK_STALE;
560 ll_file_data_put(fd);
561 GOTO(out_och_free, rc);
564 /* Got some error? Release the request */
565 if (it->d.lustre.it_status < 0) {
566 req = it->d.lustre.it_data;
567 ptlrpc_req_finished(req);
569 md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
570 &it->d.lustre.it_lock_handle,
571 file->f_dentry->d_inode);
573 req = it->d.lustre.it_data;
575 /* md_intent_lock() didn't get a request ref if there was an
576 * open error, so don't do cleanup on the request here
578 /* XXX (green): Should not we bail out on any error here, not
579 * just open error? */
580 rc = it_open_error(DISP_OPEN_OPEN, it);
582 ll_file_data_put(fd);
583 GOTO(out_och_free, rc);
586 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
587 rc = ll_local_open(file, it, fd, *och_p);
589 up(&lli->lli_och_sem);
590 ll_file_data_put(fd);
591 GOTO(out_och_free, rc);
594 up(&lli->lli_och_sem);
596 /* Must do this outside lli_och_sem lock to prevent deadlock where
597 different kind of OPEN lock for this same inode gets cancelled
598 by ldlm_cancel_lru */
599 if (!S_ISREG(inode->i_mode))
/* O_LOV_DELAY_CREATE: postpone OST object creation until a
 * ll_lov_setstripe() ioctl supplies the striping. */
606 if (file->f_flags & O_LOV_DELAY_CREATE ||
607 !(file->f_mode & FMODE_WRITE)) {
608 CDEBUG(D_INODE, "object creation was delayed\n");
612 file->f_flags &= ~O_LOV_DELAY_CREATE;
615 ptlrpc_req_finished(req);
617 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
621 OBD_FREE(*och_p, sizeof (struct obd_client_handle));
622 *och_p = NULL; /* OBD_FREE writes some magic there */
625 up(&lli->lli_och_sem);
631 /* Fills the obdo with the attributes for the inode defined by lsm */
632 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
634 struct ptlrpc_request_set *set;
635 struct ll_inode_info *lli = ll_i2info(inode);
636 struct lov_stripe_md *lsm = lli->lli_smd;
638 struct obd_info oinfo = { { { 0 } } };
642 LASSERT(lsm != NULL);
646 oinfo.oi_oa->o_id = lsm->lsm_object_id;
647 oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
648 oinfo.oi_oa->o_mode = S_IFREG;
649 oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
650 OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
651 OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
652 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
654 oinfo.oi_capa = ll_mdscapa_get(inode);
656 set = ptlrpc_prep_set();
658 CERROR("can't allocate ptlrpc set\n");
661 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
663 rc = ptlrpc_set_wait(set);
664 ptlrpc_set_destroy(set);
666 capa_put(oinfo.oi_capa);
670 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
671 OBD_MD_FLATIME | OBD_MD_FLMTIME |
672 OBD_MD_FLCTIME | OBD_MD_FLSIZE);
674 obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
675 CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
676 lli->lli_smd->lsm_object_id, i_size_read(inode),
677 inode->i_blocks, inode->i_blksize);
/* Strip the setuid bit (and setgid when group-execute is set) from the
 * inode mode on write, unless the caller has CAP_FSETID — the standard
 * Unix anti-privilege-escalation behaviour. */
681 static inline void ll_remove_suid(struct inode *inode)
685 /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
686 mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
688 /* was any of the uid bits set? */
689 mode &= inode->i_mode;
690 if (mode && !capable(CAP_FSETID)) {
691 inode->i_mode &= ~mode;
692 // XXX careful here - we cannot change the size
/* Map an extent DLM lock back to the stripe index it covers within the
 * inode's LOV stripe MD, via obd_get_info("lock_to_stripe").  Sanity-
 * checks the result against the lock's resource name (object id/group).
 * Returns the stripe index, or -ELDLM_NO_LOCK_DATA on mismatch.
 * NOTE(review): excerpt is decimated — the 'check:' label body and the
 * final RETURN(stripe) are not visible here. */
696 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
698 struct ll_inode_info *lli = ll_i2info(inode);
699 struct lov_stripe_md *lsm = lli->lli_smd;
700 struct obd_export *exp = ll_i2dtexp(inode);
703 struct ldlm_lock *lock;
704 struct lov_stripe_md *lsm;
705 } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
706 __u32 stripe, vallen = sizeof(stripe);
/* Single-stripe files trivially map to stripe 0. */
710 if (lsm->lsm_stripe_count == 1)
711 GOTO(check, stripe = 0);
713 /* get our offset in the lov */
714 rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
716 CERROR("obd_get_info: rc = %d\n", rc);
719 LASSERT(stripe < lsm->lsm_stripe_count);
/* Resource name layout: name[0] = object id, name[2] = object group. */
722 if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
723 lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
724 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
725 lsm->lsm_oinfo[stripe]->loi_id,
726 lsm->lsm_oinfo[stripe]->loi_gr);
727 RETURN(-ELDLM_NO_LOCK_DATA);
733 /* Flush the page cache for an extent as its canceled. When we're on an LOV,
734 * we get a lock cancellation for each stripe, so we have to map the obd's
735 * region back onto the stripes in the file that it held.
737 * No one can dirty the extent until we've finished our work and they can
738 * enqueue another lock. The DLM protects us from ll_file_read/write here,
739 * but other kernel actors could have pages locked.
741 * Called with the DLM lock held. */
/* On cancellation of an extent lock, map the lock's [start,end] byte
 * range back onto this file's page indices for the given stripe, tear
 * down any mmap()ed pages in the range, then write back (or discard,
 * for LDLM_FL_DISCARD_DATA) and truncate each covered page unless a
 * different granted DLM lock still covers it (ldlm_lock_match, b=2765).
 * NOTE(review): this excerpt is heavily decimated — loop guards, the
 * 'struct page *page' declaration, several conditionals and the EXIT
 * path are missing; the stripe-index arithmetic shown should be checked
 * against the complete source before reasoning about boundary cases. */
742 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
743 struct ldlm_lock *lock, __u32 stripe)
745 ldlm_policy_data_t tmpex;
746 unsigned long start, end, count, skip, i, j;
748 int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
749 struct lustre_handle lockh;
752 memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
753 CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
754 inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
757 /* our locks are page granular thanks to osc_enqueue, we invalidate the
759 if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
760 ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
761 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
763 LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
764 LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
/* Convert the per-stripe byte extent into file-wide page indices,
 * accounting for the RAID0-style stripe interleaving. */
768 start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
769 end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
770 if (lsm->lsm_stripe_count > 1) {
771 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
772 skip = (lsm->lsm_stripe_count - 1) * count;
773 start += start/count * skip + stripe * count;
775 end += end/count * skip + stripe * count;
777 if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
780 i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
785 CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
786 "count: %lu skip: %lu end: %lu%s\n", start, start % count,
787 count, skip, end, discard ? " (DISCARDING)" : "");
789 /* walk through the vmas on the inode and tear down mmaped pages that
790 * intersect with the lock. this stops immediately if there are no
791 * mmap()ed regions of the file. This is not efficient at all and
792 * should be short lived. We'll associate mmap()ed pages with the lock
793 * and will be able to find them directly */
794 for (i = start; i <= end; i += (j + skip)) {
795 j = min(count - (i % count), end - i + 1);
797 LASSERT(inode->i_mapping);
798 if (ll_teardown_mmaps(inode->i_mapping,
799 (__u64)i << CFS_PAGE_SHIFT,
800 ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
804 /* this is the simplistic implementation of page eviction at
805 * cancelation. It is careful to get races with other page
806 * lockers handled correctly. fixes from bug 20 will make it
807 * more efficient by associating locks with pages and with
808 * batching writeback under the lock explicitly. */
809 for (i = start, j = start % count; i <= end;
810 j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
812 CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
818 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
819 LPU64" >= "LPU64" start %lu i %lu end %lu\n",
820 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
823 if (!mapping_has_pages(inode->i_mapping)) {
824 CDEBUG(D_INODE|D_PAGE, "nothing left\n");
830 page = find_get_page(inode->i_mapping, i);
833 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
834 i, tmpex.l_extent.start);
837 /* page->mapping to check with racing against teardown */
838 if (!discard && clear_page_dirty_for_io(page)) {
839 rc = ll_call_writepage(inode, page);
841 CERROR("writepage inode %lu(%p) of page %p "
842 "failed: %d\n", inode->i_ino, inode,
844 /* either waiting for io to complete or reacquiring
845 * the lock that the failed writepage released */
849 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
850 /* check to see if another DLM lock covers this page b=2765 */
851 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
852 LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
854 &lock->l_resource->lr_name, LDLM_EXTENT,
855 &tmpex, LCK_PR | LCK_PW, &lockh);
/* No other lock covers the page and it is still attached: drop it. */
857 if (rc2 <= 0 && page->mapping != NULL) {
858 struct ll_async_page *llap = llap_cast_private(page);
859 /* checking again to account for writeback's
861 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
863 ll_ra_accounting(llap, inode->i_mapping);
864 ll_truncate_complete_page(page);
867 page_cache_release(page);
869 LASSERTF(tmpex.l_extent.start <=
870 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
871 lock->l_policy_data.l_extent.end + 1),
872 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
873 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
/* DLM AST for client extent locks.  On LDLM_CB_BLOCKING, cancels the
 * lock; on LDLM_CB_CANCELING, evicts covered page-cache pages for the
 * affected stripe and shrinks that stripe's known-minimum-size (kms)
 * under both the LOV stripe lock and the resource lock.
 * NOTE(review): excerpt is decimated — the switch 'flag' variable, the
 * lsm assignment from lli, iput and RETURN paths are not visible. */
878 static int ll_extent_lock_callback(struct ldlm_lock *lock,
879 struct ldlm_lock_desc *new, void *data,
882 struct lustre_handle lockh = { 0 };
/* Guard against a bogus small-integer cookie being passed as data. */
886 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
887 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
892 case LDLM_CB_BLOCKING:
893 ldlm_lock2handle(lock, &lockh);
894 rc = ldlm_cli_cancel(&lockh);
896 CERROR("ldlm_cli_cancel failed: %d\n", rc);
898 case LDLM_CB_CANCELING: {
900 struct ll_inode_info *lli;
901 struct lov_stripe_md *lsm;
905 /* This lock wasn't granted, don't try to evict pages */
906 if (lock->l_req_mode != lock->l_granted_mode)
909 inode = ll_inode_from_lock(lock);
912 lli = ll_i2info(inode);
915 if (lli->lli_smd == NULL)
919 stripe = ll_lock_to_stripe_offset(inode, lock);
923 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
/* Lock ordering: lov_stripe_lock then the DLM resource lock. */
925 lov_stripe_lock(lsm);
926 lock_res_and_lock(lock);
927 kms = ldlm_extent_shift_kms(lock,
928 lsm->lsm_oinfo[stripe]->loi_kms);
930 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
931 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
932 lsm->lsm_oinfo[stripe]->loi_kms, kms);
933 lsm->lsm_oinfo[stripe]->loi_kms = kms;
934 unlock_res_and_lock(lock);
935 lov_stripe_unlock(lsm);
/* Completion AST for client-side async extent enqueues (glimpse path):
 * when the lock is granted, updates the stripe's rss/kms from the LVB
 * carried on the lock, wakes waiters, and drops the PR reference.
 * NOTE(review): excerpt is decimated — several declarations (lvb, kms,
 * stripe) and RETURN lines are missing.  Note also that this older code
 * accesses lsm_oinfo[stripe] with '.' (struct member) while other
 * functions in this excerpt use '->' — the lsm_oinfo representation
 * differs between Lustre versions; confirm against the full tree. */
948 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
950 /* XXX ALLOCATE - 160 bytes */
951 struct inode *inode = ll_inode_from_lock(lock);
952 struct ll_inode_info *lli = ll_i2info(inode);
953 struct lustre_handle lockh = { 0 };
958 if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
959 LDLM_FL_BLOCK_CONV)) {
/* Async glimpse enqueues are expected to be granted immediately. */
960 LBUG(); /* not expecting any blocked async locks yet */
961 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
963 ldlm_lock_dump(D_OTHER, lock, 0);
964 ldlm_reprocess_all(lock->l_resource);
968 LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
970 stripe = ll_lock_to_stripe_offset(inode, lock);
974 if (lock->l_lvb_len) {
975 struct lov_stripe_md *lsm = lli->lli_smd;
977 lvb = lock->l_lvb_data;
978 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
980 lock_res_and_lock(lock);
981 ll_inode_size_lock(inode, 1);
982 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
983 kms = ldlm_extent_shift_kms(NULL, kms);
984 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
985 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
986 lsm->lsm_oinfo[stripe].loi_kms, kms);
987 lsm->lsm_oinfo[stripe].loi_kms = kms;
988 ll_inode_size_unlock(inode, 1);
989 unlock_res_and_lock(lock);
994 wake_up(&lock->l_waitq);
996 ldlm_lock2handle(lock, &lockh);
997 ldlm_lock_decref(&lockh, LCK_PR);
/* Glimpse AST: another client wants this file's size.  Replies with an
 * LVB holding this client's kms for the lock's stripe plus the inode's
 * a/m/ctime.  -ELDLM_NO_LOCK_DATA replies are packed without an LVB to
 * avoid console noise for these normal races.
 * NOTE(review): excerpt is decimated — the NULL-inode/lsm checks guarded
 * by the GOTOs at original 1014/1017/1020, the iput label and the final
 * RETURN are not visible. */
1002 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
1004 struct ptlrpc_request *req = reqp;
1005 struct inode *inode = ll_inode_from_lock(lock);
1006 struct ll_inode_info *lli;
1007 struct lov_stripe_md *lsm;
1008 struct ost_lvb *lvb;
1010 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
1014 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
1015 lli = ll_i2info(inode);
1017 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1020 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1022 /* First, find out which stripe index this lock corresponds to. */
1023 stripe = ll_lock_to_stripe_offset(inode, lock);
1025 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
1027 rc = lustre_pack_reply(req, 2, size, NULL);
1029 CERROR("lustre_pack_reply: %d\n", rc);
1033 lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
1034 lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
1035 lvb->lvb_mtime = LTIME_S(inode->i_mtime);
1036 lvb->lvb_atime = LTIME_S(inode->i_atime);
1037 lvb->lvb_ctime = LTIME_S(inode->i_ctime);
1039 LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1040 " atime "LPU64", mtime "LPU64", ctime "LPU64,
1041 i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
1042 lvb->lvb_atime, lvb->lvb_ctime);
1047 /* These errors are normal races, so we don't want to fill the console
1048 * with messages by calling ptlrpc_error() */
1049 if (rc == -ELDLM_NO_LOCK_DATA)
1050 lustre_pack_reply(req, 1, NULL, NULL);
1052 req->rq_status = rc;
/* Merge the per-stripe lock value blocks into a single LVB via
 * obd_merge_lvb() and install the result (size, blocks, a/m/ctime) into
 * the inode, under the ll_inode_size_lock. */
1056 static void ll_merge_lvb(struct inode *inode)
1058 struct ll_inode_info *lli = ll_i2info(inode);
1059 struct ll_sb_info *sbi = ll_i2sbi(inode);
1063 ll_inode_size_lock(inode, 1);
1064 inode_init_lvb(inode, &lvb);
1065 obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1066 i_size_write(inode, lvb.lvb_size);
1067 inode->i_blocks = lvb.lvb_blocks;
1068 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1069 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1070 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1071 ll_inode_size_unlock(inode, 1);
/* Try to determine the file size purely from locally cached DLM locks:
 * match a PR|PW extent lock over [0, EOF]; if found, merge the LVB into
 * the inode and drop the matched reference via obd_cancel().
 * NOTE(review): excerpt is decimated — the 'flags' initialisation, the
 * rc < 0 / rc == 0 handling between match and merge, and the RETURN are
 * not visible. */
1075 int ll_local_size(struct inode *inode)
1077 ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1078 struct ll_inode_info *lli = ll_i2info(inode);
1079 struct ll_sb_info *sbi = ll_i2sbi(inode);
1080 struct lustre_handle lockh = { 0 };
1085 if (lli->lli_smd->lsm_stripe_count == 0)
1088 rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1089 &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1095 ll_merge_lvb(inode);
1096 obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
/* Glimpse variant used by ioctl paths: enqueue an intent-only (HAS_INTENT)
 * PR extent lock over the whole object to collect per-stripe LVBs, then
 * merge them and fill the caller's stat-like structure 'st' under the
 * LOV stripe lock.
 * NOTE(review): excerpt is decimated — oinfo.oi_md assignment, the
 * enqueue-error condition line before the CERROR, and the RETURN are not
 * visible. */
1100 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1103 struct lustre_handle lockh = { 0 };
1104 struct obd_enqueue_info einfo = { 0 };
1105 struct obd_info oinfo = { { { 0 } } };
1111 einfo.ei_type = LDLM_EXTENT;
1112 einfo.ei_mode = LCK_PR;
1113 einfo.ei_flags = LDLM_FL_HAS_INTENT;
1114 einfo.ei_cb_bl = ll_extent_lock_callback;
1115 einfo.ei_cb_cp = ldlm_completion_ast;
1116 einfo.ei_cb_gl = ll_glimpse_callback;
1117 einfo.ei_cbdata = NULL;
1119 oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1120 oinfo.oi_lockh = &lockh;
1123 rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1127 CERROR("obd_enqueue returned rc %d, "
1128 "returning -EIO\n", rc);
1129 RETURN(rc > 0 ? -EIO : rc);
1132 lov_stripe_lock(lsm);
1133 memset(&lvb, 0, sizeof(lvb));
1134 obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1135 st->st_size = lvb.lvb_size;
1136 st->st_blocks = lvb.lvb_blocks;
1137 st->st_mtime = lvb.lvb_mtime;
1138 st->st_atime = lvb.lvb_atime;
1139 st->st_ctime = lvb.lvb_ctime;
1140 lov_stripe_unlock(lsm);
1145 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1146 * file (because it prefers KMS over RSS when larger) */
/* Obtain the current file size: short-circuits if the MDS size lock is
 * held (LLIF_MDS_SIZE_LOCK) or the file has no objects; otherwise issues
 * an intent-only glimpse enqueue (ll_glimpse_callback runs on each lock
 * holder) and merges the resulting LVBs into the inode.
 * NOTE(review): excerpt is decimated — the RETURN(0) bodies for the
 * early-out branches and the final RETURN are not visible. */
1147 int ll_glimpse_size(struct inode *inode, int ast_flags)
1149 struct ll_inode_info *lli = ll_i2info(inode);
1150 struct ll_sb_info *sbi = ll_i2sbi(inode);
1151 struct lustre_handle lockh = { 0 };
1152 struct obd_enqueue_info einfo = { 0 };
1153 struct obd_info oinfo = { { { 0 } } };
1157 if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1160 CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1162 if (!lli->lli_smd) {
1163 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1167 /* NOTE: this looks like DLM lock request, but it may not be one. Due
1168 * to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1169 * won't revoke any conflicting DLM locks held. Instead,
1170 * ll_glimpse_callback() will be called on each client
1171 * holding a DLM lock against this file, and resulting size
1172 * will be returned for each stripe. DLM lock on [0, EOF] is
1173 * acquired only if there were no conflicting locks. */
1174 einfo.ei_type = LDLM_EXTENT;
1175 einfo.ei_mode = LCK_PR;
1176 einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1177 einfo.ei_cb_bl = ll_extent_lock_callback;
1178 einfo.ei_cb_cp = ldlm_completion_ast;
1179 einfo.ei_cb_gl = ll_glimpse_callback;
1180 einfo.ei_cbdata = inode;
1182 oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1183 oinfo.oi_lockh = &lockh;
1184 oinfo.oi_md = lli->lli_smd;
1186 rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1190 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1191 RETURN(rc > 0 ? -EIO : rc);
1194 ll_merge_lvb(inode);
1196 CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1197 i_size_read(inode), inode->i_blocks);
/*
 * ll_extent_lock(): take a client extent DLM lock of @mode over @policy
 * on the file's OST objects, returning the handle in @lockh, then merge
 * the lock value block (size/blocks/times) into the inode under
 * ll_inode_size_lock().
 *
 * NOTE(review): gapped listing — the early return for the nolock cases,
 * the error-return after obd_enqueue, and declarations of `rc`/`lvb`/
 * `ast_flags` are not visible here.
 */
1202 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1203 struct lov_stripe_md *lsm, int mode,
1204 ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1207 struct ll_sb_info *sbi = ll_i2sbi(inode);
1209 struct obd_enqueue_info einfo = { 0 };
1210 struct obd_info oinfo = { { { 0 } } };
1214 LASSERT(!lustre_handle_is_used(lockh));
1215 LASSERT(lsm != NULL);
1217 /* don't drop the mmapped file to LRU */
1218 if (mapping_mapped(inode->i_mapping))
1219 ast_flags |= LDLM_FL_NO_LRU;
/* Locking is bypassed entirely when the fd or superblock opts out
 * (body of this early-exit not visible in the listing). */
1221 /* XXX phil: can we do this? won't it screw the file size up? */
1222 if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1223 (sbi->ll_flags & LL_SBI_NOLCK))
1226 CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1227 inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1229 einfo.ei_type = LDLM_EXTENT;
1230 einfo.ei_mode = mode;
1231 einfo.ei_flags = ast_flags;
1232 einfo.ei_cb_bl = ll_extent_lock_callback;
1233 einfo.ei_cb_cp = ldlm_completion_ast;
1234 einfo.ei_cb_gl = ll_glimpse_callback;
1235 einfo.ei_cbdata = inode;
1237 oinfo.oi_policy = *policy;
1238 oinfo.oi_lockh = lockh;
1241 rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
/* The server may have granted a larger extent than requested. */
1242 *policy = oinfo.oi_policy;
1246 ll_inode_size_lock(inode, 1);
1247 inode_init_lvb(inode, &lvb);
1248 obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
/* Only a full-file [0, EOF] lock makes the merged size trustworthy
 * enough to write back into i_size. */
1250 if (policy->l_extent.start == 0 &&
1251 policy->l_extent.end == OBD_OBJECT_EOF) {
1252 /* vmtruncate()->ll_truncate() first sets the i_size and then
1253 * the kms under both a DLM lock and the
1254 * ll_inode_size_lock(). If we don't get the
1255 * ll_inode_size_lock() here we can match the DLM lock and
1256 * reset i_size from the kms before the truncating path has
1257 * updated the kms. generic_file_write can then trust the
1258 * stale i_size when doing appending writes and effectively
1259 * cancel the result of the truncate. Getting the
1260 * ll_inode_size_lock() after the enqueue maintains the DLM
1261 * -> ll_inode_size_lock() acquiring order. */
1262 i_size_write(inode, lvb.lvb_size);
1263 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1264 inode->i_ino, i_size_read(inode));
1268 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1269 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1270 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1272 ll_inode_size_unlock(inode, 1);
/*
 * ll_extent_unlock(): release an extent DLM lock previously taken by
 * ll_extent_lock() by cancelling @lockh via obd_cancel().  Mirrors the
 * nolock bypass in ll_extent_lock().
 *
 * NOTE(review): gapped listing — the early-exit body and the final
 * RETURN are not visible here.
 */
1277 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1278 struct lov_stripe_md *lsm, int mode,
1279 struct lustre_handle *lockh)
1281 struct ll_sb_info *sbi = ll_i2sbi(inode);
/* Same bypass condition as in ll_extent_lock(): nothing was locked,
 * so there is nothing to cancel. */
1285 /* XXX phil: can we do this? won't it screw the file size up? */
1286 if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1287 (sbi->ll_flags & LL_SBI_NOLCK))
1290 rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
/*
 * ll_file_read(): the llite ->read file operation.
 *
 * Strategy: take a PR extent lock covering the requested region (chunked
 * by ll_max_rw_chunk when set), establish a locally consistent i_size
 * under ll_inode_size_lock(), then let generic_file_read() serve the
 * data from the page cache.  Files with no OST objects are served as
 * zero-filled reads up to i_size.  On a short region classification the
 * read may loop over chunks (loop structure partly hidden by the gaps).
 *
 * NOTE(review): gapped listing — declarations (`end`, `kms`, `lvb`,
 * `rc`, `notzeroed`), loop braces and several returns are not visible.
 */
1295 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1298 struct inode *inode = file->f_dentry->d_inode;
1299 struct ll_inode_info *lli = ll_i2info(inode);
1300 struct lov_stripe_md *lsm = lli->lli_smd;
1301 struct ll_sb_info *sbi = ll_i2sbi(inode);
1302 struct ll_lock_tree tree;
1303 struct ll_lock_tree_node *node;
1305 struct ll_ra_read bead;
1308 ssize_t retval, chunk, sum = 0;
1312 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1313 inode->i_ino, inode->i_generation, inode, count, *ppos);
1314 /* "If nbyte is 0, read() will return 0 and have no other results."
1315 * -- Single Unix Spec */
1319 ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1322 /* Read on file with no objects should return zero-filled
1323 * buffers up to file size (we can get non-zero sizes with
1324 * mknod + truncate, then opening file for read. This is a
1325 * common pattern in NFS case, it seems). Bug 6243 */
1327 /* Since there are no objects on OSTs, we have nothing to get
1328 * lock on and so we are forced to access inode->i_size
1331 /* Read beyond end of file */
1332 if (*ppos >= i_size_read(inode))
/* Clamp the request to the remaining bytes before EOF. */
1335 if (count > i_size_read(inode) - *ppos)
1336 count = i_size_read(inode) - *ppos;
1337 /* Make sure to correctly adjust the file pos pointer for
/* Zero-fill the user buffer for the objectless-file case. */
1339 notzeroed = clear_user(buf, count);
/* Chunking: bound each locked region by the current stripe end and by
 * the administrative ll_max_rw_chunk limit. */
1348 if (sbi->ll_max_rw_chunk != 0) {
1349 /* first, let's know the end of the current stripe */
1351 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1354 /* correct, the end is beyond the request */
1355 if (end > *ppos + count - 1)
1356 end = *ppos + count - 1;
1358 /* and chunk shouldn't be too large even if striping is wide */
1359 if (end - *ppos > sbi->ll_max_rw_chunk)
1360 end = *ppos + sbi->ll_max_rw_chunk - 1;
1362 end = *ppos + count - 1;
1365 node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1367 GOTO(out, retval = PTR_ERR(node));
1370 tree.lt_fd = LUSTRE_FPRIVATE(file);
/* O_NONBLOCK readers must not wait for a contended DLM lock. */
1371 rc = ll_tree_lock(&tree, node, buf, count,
1372 file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1374 GOTO(out, retval = rc);
1376 ll_inode_size_lock(inode, 1);
1378 * Consistency guarantees: following possibilities exist for the
1379 * relation between region being read and real file size at this
1382 * (A): the region is completely inside of the file;
1384 * (B-x): x bytes of region are inside of the file, the rest is
1387 * (C): the region is completely outside of the file.
1389 * This classification is stable under DLM lock acquired by
1390 * ll_tree_lock() above, because to change class, other client has to
1391 * take DLM lock conflicting with our lock. Also, any updates to
1392 * ->i_size by other threads on this client are serialized by
1393 * ll_inode_size_lock(). This guarantees that short reads are handled
1394 * correctly in the face of concurrent writes and truncates.
1396 inode_init_lvb(inode, &lvb);
1397 obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1399 if (*ppos + count - 1 > kms) {
1400 /* A glimpse is necessary to determine whether we return a
1401 * short read (B) or some zeroes at the end of the buffer (C) */
1402 ll_inode_size_unlock(inode, 1);
1403 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1405 ll_tree_unlock(&tree);
1409 /* region is within kms and, hence, within real file size (A).
1410 * We need to increase i_size to cover the read region so that
1411 * generic_file_read() will do its job, but that doesn't mean
1412 * the kms size is _correct_, it is only the _minimum_ size.
1413 * If someone does a stat they will get the correct size which
1414 * will always be >= the kms value here. b=11081 */
1415 if (i_size_read(inode) < kms)
1416 i_size_write(inode, kms);
1417 ll_inode_size_unlock(inode, 1);
1420 chunk = end - *ppos + 1;
1421 CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1422 inode->i_ino, chunk, *ppos, i_size_read(inode));
1424 /* turn off the kernel's read-ahead */
1425 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1428 file->f_ra.ra_pages = 0;
1430 /* initialize read-ahead window once per syscall */
1433 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1434 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1435 ll_ra_read_in(file, &bead);
1439 file_accessed(file);
1440 retval = generic_file_read(file, buf, chunk, ppos);
1441 ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1443 ll_tree_unlock(&tree);
/* A full chunk with bytes remaining means another iteration follows
 * (loop back-edge hidden by the listing gap). */
1449 if (retval == chunk && count > 0)
1455 ll_ra_read_ex(file, &bead);
/* Report accumulated bytes if any chunk succeeded, else the error. */
1456 retval = (sum > 0) ? sum : retval;
/*
 * ll_file_write(): the llite ->write file operation.
 *
 * Serializes writers with lli_write_sem, takes a PW extent lock over the
 * target region ([pos, EOF] for O_APPEND, otherwise chunked like
 * ll_file_read()), enforces the per-file maxbytes limit (SIGXFSZ +
 * -EFBIG past it), then delegates to generic_file_write().
 *
 * NOTE(review): gapped listing — the zero-count early return, the chunk
 * loop back-edge and some declarations (`rc`) are not visible here.
 */
1461 * Write to a file (through the page cache).
1463 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1466 struct inode *inode = file->f_dentry->d_inode;
1467 struct ll_sb_info *sbi = ll_i2sbi(inode);
1468 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1469 struct ll_lock_tree tree;
1470 struct ll_lock_tree_node *node;
1471 loff_t maxbytes = ll_file_maxbytes(inode);
1472 loff_t lock_start, lock_end, end;
1473 ssize_t retval, chunk, sum = 0;
1477 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1478 inode->i_ino, inode->i_generation, inode, count, *ppos);
1480 SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1482 /* POSIX, but surprised the VFS doesn't check this already */
1486 /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1487 * called on the file, don't fail the below assertion (bug 2388). */
1488 if (file->f_flags & O_LOV_DELAY_CREATE &&
1489 ll_i2info(inode)->lli_smd == NULL)
1492 LASSERT(ll_i2info(inode)->lli_smd != NULL);
/* One writer at a time per inode on this client. */
1494 down(&ll_i2info(inode)->lli_write_sem);
1497 chunk = 0; /* just to fix gcc's warning */
1498 end = *ppos + count - 1;
/* O_APPEND must lock to EOF: the true write offset is only known once
 * i_size is stabilized under the lock below. */
1500 if (file->f_flags & O_APPEND) {
1502 lock_end = OBD_OBJECT_EOF;
1503 } else if (sbi->ll_max_rw_chunk != 0) {
1504 /* first, let's know the end of the current stripe */
1506 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
1509 /* correct, the end is beyond the request */
1510 if (end > *ppos + count - 1)
1511 end = *ppos + count - 1;
1513 /* and chunk shouldn't be too large even if striping is wide */
1514 if (end - *ppos > sbi->ll_max_rw_chunk)
1515 end = *ppos + sbi->ll_max_rw_chunk - 1;
1520 lock_end = *ppos + count - 1;
1522 node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1525 GOTO(out, retval = PTR_ERR(node));
1527 tree.lt_fd = LUSTRE_FPRIVATE(file);
1528 rc = ll_tree_lock(&tree, node, buf, count,
1529 file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1531 GOTO(out, retval = rc);
1533 /* This is ok, g_f_w will overwrite this under i_sem if it races
1534 * with a local truncate, it just makes our maxbyte checking easier.
1535 * The i_size value gets updated in ll_extent_lock() as a consequence
1536 * of the [0,EOF] extent lock we requested above. */
1537 if (file->f_flags & O_APPEND) {
1538 *ppos = i_size_read(inode);
1539 end = *ppos + count - 1;
/* POSIX: writing at/after the rlimit/maxbytes boundary raises SIGXFSZ
 * and fails with EFBIG. */
1542 if (*ppos >= maxbytes) {
1543 send_sig(SIGXFSZ, current, 0);
1544 GOTO(out_unlock, retval = -EFBIG);
1546 if (*ppos + count > maxbytes)
1547 count = maxbytes - *ppos;
1549 /* generic_file_write handles O_APPEND after getting i_mutex */
1550 chunk = end - *ppos + 1;
1551 CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1552 inode->i_ino, chunk, *ppos);
1553 retval = generic_file_write(file, buf, chunk, ppos);
1554 ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1557 ll_tree_unlock(&tree);
/* Full chunk written and bytes remain => continue with next chunk
 * (loop back-edge hidden by the listing gap). */
1564 if (retval == chunk && count > 0)
1568 up(&ll_i2info(inode)->lli_write_sem);
1570 retval = (sum > 0) ? sum : retval;
1571 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1572 retval > 0 ? retval : 0);
/*
 * ll_file_sendfile(): the llite ->sendfile operation (2.6-only).
 *
 * Same consistency protocol as ll_file_read() — PR extent lock via
 * ll_tree_lock(), i_size stabilized against kms under
 * ll_inode_size_lock(), glimpse when the region may extend past kms —
 * then generic_file_sendfile() pushes pages to @actor/@target.
 * Objectless files bypass locking entirely.
 *
 * NOTE(review): gapped listing — declarations (`lvb`, `kms`, `rc`,
 * `retval`), the zero-count early return and some braces are not
 * visible here.
 */
1577 * Send file content (through pagecache) somewhere with helper
1579 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1580 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1581 read_actor_t actor, void *target)
1583 struct inode *inode = in_file->f_dentry->d_inode;
1584 struct ll_inode_info *lli = ll_i2info(inode);
1585 struct lov_stripe_md *lsm = lli->lli_smd;
1586 struct ll_lock_tree tree;
1587 struct ll_lock_tree_node *node;
1589 struct ll_ra_read bead;
1594 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1595 inode->i_ino, inode->i_generation, inode, count, *ppos);
1597 /* "If nbyte is 0, read() will return 0 and have no other results."
1598 * -- Single Unix Spec */
1602 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1603 /* turn off the kernel's read-ahead */
1604 in_file->f_ra.ra_pages = 0;
1606 /* File with no objects, nothing to lock */
1608 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1610 node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1612 RETURN(PTR_ERR(node));
1614 tree.lt_fd = LUSTRE_FPRIVATE(in_file);
/* NULL user buffer: sendfile never touches userspace memory here. */
1615 rc = ll_tree_lock(&tree, node, NULL, count,
1616 in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1620 ll_inode_size_lock(inode, 1);
1622 * Consistency guarantees: following possibilities exist for the
1623 * relation between region being read and real file size at this
1626 * (A): the region is completely inside of the file;
1628 * (B-x): x bytes of region are inside of the file, the rest is
1631 * (C): the region is completely outside of the file.
1633 * This classification is stable under DLM lock acquired by
1634 * ll_tree_lock() above, because to change class, other client has to
1635 * take DLM lock conflicting with our lock. Also, any updates to
1636 * ->i_size by other threads on this client are serialized by
1637 * ll_inode_size_lock(). This guarantees that short reads are handled
1638 * correctly in the face of concurrent writes and truncates.
1640 inode_init_lvb(inode, &lvb);
1641 obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1643 if (*ppos + count - 1 > kms) {
1644 /* A glimpse is necessary to determine whether we return a
1645 * short read (B) or some zeroes at the end of the buffer (C) */
1646 ll_inode_size_unlock(inode, 1);
1647 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1651 /* region is within kms and, hence, within real file size (A) */
1652 i_size_write(inode, kms);
1653 ll_inode_size_unlock(inode, 1);
1656 CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1657 inode->i_ino, count, *ppos, i_size_read(inode));
/* Manual read-ahead window for the whole request. */
1659 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1660 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1661 ll_ra_read_in(in_file, &bead);
1663 file_accessed(in_file);
1664 retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1665 ll_ra_read_ex(in_file, &bead);
1668 ll_tree_unlock(&tree);
/*
 * ll_lov_recreate_obj(): LL_IOC_RECREATE_OBJ handler — ask the OST to
 * recreate a lost stripe object for this file (CAP_SYS_ADMIN only).
 * Copies a struct ll_recreate_obj from userspace, clones the file's
 * stripe metadata, fills an obdo with the target object id/group/index
 * plus OBD_FL_RECREATE_OBJS, and calls obd_create() under lli_size_sem.
 *
 * NOTE(review): gapped listing — obdo allocation, copy_from_user error
 * handling and several cleanup lines are not visible here.
 */
1673 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1676 struct ll_inode_info *lli = ll_i2info(inode);
1677 struct obd_export *exp = ll_i2dtexp(inode);
1678 struct ll_recreate_obj ucreatp;
1679 struct obd_trans_info oti = { 0 };
1680 struct obdo *oa = NULL;
1683 struct lov_stripe_md *lsm, *lsm2;
1686 if (!capable (CAP_SYS_ADMIN))
1689 rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1690 sizeof(struct ll_recreate_obj));
/* Hold lli_size_sem so the stripe md cannot change underneath us. */
1698 down(&lli->lli_size_sem);
1701 GOTO(out, rc = -ENOENT);
1702 lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1703 (lsm->lsm_stripe_count));
1705 OBD_ALLOC(lsm2, lsm_size);
1707 GOTO(out, rc = -ENOMEM);
/* Target object identity comes from the userspace request. */
1709 oa->o_id = ucreatp.lrc_id;
1710 oa->o_gr = ucreatp.lrc_group;
1711 oa->o_nlink = ucreatp.lrc_ost_idx;
1712 oa->o_flags |= OBD_FL_RECREATE_OBJS;
1713 oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1714 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1715 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1717 oti.oti_objid = NULL;
/* Work on a copy of the stripe md; obd_create may modify it. */
1718 memcpy(lsm2, lsm, lsm_size);
1719 rc = obd_create(exp, oa, &lsm2, &oti);
1721 OBD_FREE(lsm2, lsm_size);
1724 up(&lli->lli_size_sem);
/*
 * ll_lov_setstripe_ea_info(): set striping for a file by replaying an
 * intent-open carrying the lov_user_md EA.  Fails if striping already
 * exists (checked under lli_size_sem).  On success the transient open
 * handle from the intent is closed via ll_release_openhandle().
 *
 * NOTE(review): gapped listing — the lli_smd existence test and the
 * out/out_req_free label placement are not visible here.
 */
1729 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1730 int flags, struct lov_user_md *lum, int lum_size)
1732 struct ll_inode_info *lli = ll_i2info(inode);
1733 struct lov_stripe_md *lsm;
1734 struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1738 down(&lli->lli_size_sem);
/* Striping is write-once: refuse if the file already has objects. */
1741 up(&lli->lli_size_sem);
1742 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
/* Re-drive the open with the supplied LOV EA attached to the intent. */
1747 rc = ll_intent_file_open(file, lum, lum_size, &oit);
1750 if (it_disposition(&oit, DISP_LOOKUP_NEG))
1751 GOTO(out_req_free, rc = -ENOENT);
1752 rc = oit.d.lustre.it_status;
1754 GOTO(out_req_free, rc);
/* Close the open handle created purely for the setstripe intent. */
1756 ll_release_openhandle(file->f_dentry, &oit);
1759 up(&lli->lli_size_sem);
1760 ll_intent_release(&oit);
1763 ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
/*
 * ll_lov_getstripe_ea_info(): fetch the LOV EA (striping descriptor) of
 * @filename from the MDS via md_getattr_name(), byte-swap it to host
 * endianness if needed, and — for LOV_MAGIC_JOIN files — expand it into
 * a lov_user_md_join with per-stripe extent information.
 *
 * Returns the (possibly converted) EA in *lmmp, its size in *lmm_size,
 * and the underlying ptlrpc request in *request (caller must finish it
 * so the lmm buffer it points into stays valid).
 *
 * NOTE(review): gapped listing — error-path returns, the lmmsize/rc
 * declarations and some loop braces are not visible here.
 */
1767 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1768 struct lov_mds_md **lmmp, int *lmm_size,
1769 struct ptlrpc_request **request)
1771 struct ll_sb_info *sbi = ll_i2sbi(inode);
1772 struct mdt_body *body;
1773 struct lov_mds_md *lmm = NULL;
1774 struct ptlrpc_request *req = NULL;
1775 struct obd_capa *oc;
1778 rc = ll_get_max_mdsize(sbi, &lmmsize);
1782 oc = ll_mdscapa_get(inode);
1783 rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
1784 oc, filename, strlen(filename) + 1,
1785 OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
1788 CDEBUG(D_INFO, "md_getattr_name failed "
1789 "on %s: rc %d\n", filename, rc);
1793 body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
1794 LASSERT(body != NULL); /* checked by mdc_getattr_name */
1795 /* swabbed by mdc_getattr_name */
1796 LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1798 lmmsize = body->eadatasize;
/* No EA present (or zero-sized): nothing to return. */
1800 if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1802 GOTO(out, rc = -ENODATA);
1805 lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
1806 LASSERT(lmm != NULL);
1807 LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1810 * This is coming from the MDS, so is probably in
1811 * little endian. We convert it to host endian before
1812 * passing it to userspace.
/* Swabbed magic means the on-wire byte order differs from the host. */
1814 if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1815 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1816 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1817 } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1818 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
/* JOIN file: unpack the md and rebuild a user-visible join EA with one
 * entry per stripe, including the extent each stripe covers. */
1821 if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1822 struct lov_stripe_md *lsm;
1823 struct lov_user_md_join *lmj;
1824 int lmj_size, i, aindex = 0;
1826 rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
1828 GOTO(out, rc = -ENOMEM);
1829 rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
1831 GOTO(out_free_memmd, rc);
1833 lmj_size = sizeof(struct lov_user_md_join) +
1834 lsm->lsm_stripe_count *
1835 sizeof(struct lov_user_ost_data_join);
1836 OBD_ALLOC(lmj, lmj_size);
1838 GOTO(out_free_memmd, rc = -ENOMEM);
1840 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1841 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1842 struct lov_extent *lex =
1843 &lsm->lsm_array->lai_ext_array[aindex];
/* Advance to the extent array entry covering stripe i. */
1845 if (lex->le_loi_idx + lex->le_stripe_count <= i)
1847 CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1848 LPU64" len %d\n", aindex, i,
1849 lex->le_start, (int)lex->le_len);
1850 lmj->lmm_objects[i].l_extent_start =
/* le_len == -1 marks an extent running to EOF. */
1853 if ((int)lex->le_len == -1)
1854 lmj->lmm_objects[i].l_extent_end = -1;
1856 lmj->lmm_objects[i].l_extent_end =
1857 lex->le_start + lex->le_len;
1858 lmj->lmm_objects[i].l_object_id =
1859 lsm->lsm_oinfo[i]->loi_id;
1860 lmj->lmm_objects[i].l_object_gr =
1861 lsm->lsm_oinfo[i]->loi_gr;
1862 lmj->lmm_objects[i].l_ost_gen =
1863 lsm->lsm_oinfo[i]->loi_ost_gen;
1864 lmj->lmm_objects[i].l_ost_idx =
1865 lsm->lsm_oinfo[i]->loi_ost_idx;
/* Hand back the expanded join EA instead of the raw one. */
1867 lmm = (struct lov_mds_md *)lmj;
1870 obd_free_memmd(sbi->ll_dt_exp, &lsm);
1874 *lmm_size = lmmsize;
/*
 * ll_lov_setea(): LL_IOC_LOV_SETEA handler — set a raw striping EA that
 * references pre-existing objects (MDS_OPEN_HAS_OBJS).  Admin-only.
 * Copies a single-object lov_user_md from userspace and delegates to
 * ll_lov_setstripe_ea_info().
 *
 * NOTE(review): gapped listing — allocation-failure and copy-failure
 * returns are not fully visible here.
 */
1879 static int ll_lov_setea(struct inode *inode, struct file *file,
1882 int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1883 struct lov_user_md *lump;
1884 int lum_size = sizeof(struct lov_user_md) +
1885 sizeof(struct lov_user_ost_data);
1889 if (!capable (CAP_SYS_ADMIN))
1892 OBD_ALLOC(lump, lum_size);
1896 rc = copy_from_user(lump, (struct lov_user_md *)arg, lum_size);
1898 OBD_FREE(lump, lum_size);
1902 rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1904 OBD_FREE(lump, lum_size);
/*
 * ll_lov_setstripe(): LL_IOC_LOV_SETSTRIPE handler — copy a
 * lov_user_md from userspace, apply it via ll_lov_setstripe_ea_info(),
 * then write the resulting striping back to the user's buffer through
 * the LL_IOC_LOV_GETSTRIPE obd_iocontrol path.
 */
1908 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1911 struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1913 int flags = FMODE_WRITE;
1916 /* Bug 1152: copy properly when this is no longer true */
1917 LASSERT(sizeof(lum) == sizeof(*lump));
1918 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1919 rc = copy_from_user(&lum, lump, sizeof(lum));
/* Echo the created striping back so the caller sees actual values. */
1923 rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1925 put_user(0, &lump->lmm_stripe_count);
1926 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1927 0, ll_i2info(inode)->lli_smd, lump);
/*
 * ll_lov_getstripe(): LL_IOC_LOV_GETSTRIPE handler — copy the file's
 * striping information to userspace via the LOV obd_iocontrol path.
 * NOTE(review): the no-striping (-ENODATA) check between these lines is
 * hidden by the listing gap.
 */
1932 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1934 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1939 return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
/*
 * ll_get_grouplock(): LL_IOC_GROUP_LOCK handler — take a whole-file
 * LCK_GROUP extent lock with group id @arg, remember the handle in the
 * fd, and mark the fd GROUP_LOCKED|IGNORE_LOCK so subsequent I/O skips
 * per-request extent locking.  Fails if the fd already holds one.
 */
1943 static int ll_get_grouplock(struct inode *inode, struct file *file,
1946 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1947 ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1948 .end = OBD_OBJECT_EOF}};
1949 struct lustre_handle lockh = { 0 };
1950 struct ll_inode_info *lli = ll_i2info(inode);
1951 struct lov_stripe_md *lsm = lli->lli_smd;
/* Only one group lock per file descriptor. */
1955 if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1959 policy.l_extent.gid = arg;
1960 if (file->f_flags & O_NONBLOCK)
1961 flags = LDLM_FL_BLOCK_NOWAIT;
1963 rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1967 fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
/* Keep the handle so ll_put_grouplock() can cancel this exact lock. */
1969 memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
/*
 * ll_put_grouplock(): LL_IOC_GROUP_UNLOCK handler — release the group
 * lock stored in the fd (validating that @arg matches the gid it was
 * taken with), clear the fd flags and zero the saved handle.
 */
1974 static int ll_put_grouplock(struct inode *inode, struct file *file,
1977 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1978 struct ll_inode_info *lli = ll_i2info(inode);
1979 struct lov_stripe_md *lsm = lli->lli_smd;
1983 if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1984 /* Ugh, it's already unlocked. */
1988 if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
/* Drop both flags set by ll_get_grouplock() before cancelling. */
1991 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1993 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1998 memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
/*
 * join_sanity_check(): validate a file-join request: the server must
 * advertise LL_SBI_JOIN, both inodes must be regular files, head and
 * tail must differ, and the head's size must be JOIN_FILE_ALIGN (64K)
 * aligned.  Error-return bodies are hidden by the listing gaps.
 */
2003 static int join_sanity_check(struct inode *head, struct inode *tail)
2006 if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2007 CERROR("server do not support join \n");
2010 if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2011 CERROR("tail ino %lu and ino head %lu must be regular\n",
2012 head->i_ino, tail->i_ino);
2015 if (head->i_ino == tail->i_ino) {
2016 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2019 if (i_size_read(head) % JOIN_FILE_ALIGN) {
2020 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
/*
 * join_file(): perform the MDS side of joining @tail_filp onto
 * @head_inode — an intent open with O_JOIN_FILE, passing the head's
 * current size as the join point via the op_data.  Any lock granted by
 * the enqueue is dropped immediately, and the transient open handle is
 * closed with ll_release_openhandle().
 *
 * NOTE(review): gapped listing — `data`/`rc` declarations and some
 * error-path lines are not visible here.
 */
2026 static int join_file(struct inode *head_inode, struct file *head_filp,
2027 struct file *tail_filp)
2029 struct dentry *tail_dentry = tail_filp->f_dentry;
2030 struct lookup_intent oit = {.it_op = IT_OPEN,
2031 .it_flags = head_filp->f_flags|O_JOIN_FILE};
2032 struct lustre_handle lockh;
2033 struct md_op_data *op_data;
2038 tail_dentry = tail_filp->f_dentry;
/* The head's size marks where the tail's data will begin. */
2040 data = i_size_read(head_inode);
2041 op_data = ll_prep_md_op_data(NULL, head_inode,
2042 tail_dentry->d_parent->d_inode,
2043 tail_dentry->d_name.name,
2044 tail_dentry->d_name.len, 0,
2045 LUSTRE_OPC_ANY, &data);
2046 if (IS_ERR(op_data))
2047 RETURN(PTR_ERR(op_data));
2049 rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
2050 op_data, &lockh, NULL, 0, ldlm_completion_ast,
2051 ll_md_blocking_ast, NULL, 0);
2053 ll_finish_md_op_data(op_data);
2057 rc = oit.d.lustre.it_status;
2059 if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2060 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2061 ptlrpc_req_finished((struct ptlrpc_request *)
2062 oit.d.lustre.it_data);
2066 if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2068 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2069 oit.d.lustre.it_lock_mode = 0;
/* Close the open handle that existed only to carry the join intent. */
2071 ll_release_openhandle(head_filp->f_dentry, &oit);
2073 ll_intent_release(&oit);
/*
 * ll_file_join(): LL_IOC_JOIN handler — append file @filename_tail to
 * @head.  Opens the tail, takes whole-file LCK_EX tree locks on both
 * inodes in ascending-ino order (deadlock avoidance), sanity-checks the
 * pair, performs the MDS join via join_file(), and unwinds through a
 * cleanup_phase switch.  On success the head's cached stripe md is
 * discarded so it will be refetched with the joined layout.
 *
 * NOTE(review): gapped listing — cleanup_phase increments, case labels
 * and several intermediate returns are not visible here.
 */
2077 static int ll_file_join(struct inode *head, struct file *filp,
2078 char *filename_tail)
2080 struct inode *tail = NULL, *first = NULL, *second = NULL;
2081 struct dentry *tail_dentry;
2082 struct file *tail_filp, *first_filp, *second_filp;
2083 struct ll_lock_tree first_tree, second_tree;
2084 struct ll_lock_tree_node *first_node, *second_node;
2085 struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2086 int rc = 0, cleanup_phase = 0;
2089 CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2090 head->i_ino, head->i_generation, head, filename_tail);
2092 tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2093 if (IS_ERR(tail_filp)) {
2094 CERROR("Can not open tail file %s", filename_tail);
2095 rc = PTR_ERR(tail_filp);
2098 tail = igrab(tail_filp->f_dentry->d_inode);
2100 tlli = ll_i2info(tail);
2101 tail_dentry = tail_filp->f_dentry;
2102 LASSERT(tail_dentry);
/* Always lock the higher-ino inode first to impose a global order. */
2105 /*reorder the inode for lock sequence*/
2106 first = head->i_ino > tail->i_ino ? head : tail;
2107 second = head->i_ino > tail->i_ino ? tail : head;
2108 first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2109 second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2111 CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2112 head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2113 first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2114 if (IS_ERR(first_node)){
2115 rc = PTR_ERR(first_node);
2118 first_tree.lt_fd = first_filp->private_data;
2119 rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2124 second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2125 if (IS_ERR(second_node)){
2126 rc = PTR_ERR(second_node);
2129 second_tree.lt_fd = second_filp->private_data;
2130 rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2135 rc = join_sanity_check(head, tail);
2139 rc = join_file(head, filp, tail_filp);
/* Unwind in reverse acquisition order; fall-through by phase. */
2143 switch (cleanup_phase) {
2145 ll_tree_unlock(&second_tree);
2146 obd_cancel_unused(ll_i2dtexp(second),
2147 ll_i2info(second)->lli_smd, 0, NULL);
2149 ll_tree_unlock(&first_tree);
2150 obd_cancel_unused(ll_i2dtexp(first),
2151 ll_i2info(first)->lli_smd, 0, NULL);
2153 filp_close(tail_filp, 0);
/* Successful join invalidates the head's cached layout. */
2156 if (head && rc == 0) {
2157 obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2159 hlli->lli_smd = NULL;
2164 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
/*
 * ll_release_openhandle(): close the MDS open handle carried by intent
 * @it when the open was driven purely for a side effect (setstripe,
 * join) rather than a real file open.  No-op for the root dentry or
 * when the intent holds no DISP_OPEN_OPEN disposition.
 *
 * NOTE(review): gapped listing — `rc` declaration, OBD_FREE of och and
 * the final RETURN are not visible here.
 */
2170 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2172 struct inode *inode = dentry->d_inode;
2173 struct obd_client_handle *och;
2179 /* Root ? Do nothing. */
2180 if (dentry->d_inode->i_sb->s_root == dentry)
2183 /* No open handle to close? Move away */
2184 if (!it_disposition(it, DISP_OPEN_OPEN))
2187 LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2189 OBD_ALLOC(och, sizeof(*och));
2191 GOTO(out, rc = -ENOMEM);
/* Populate the client handle from the intent's open reply... */
2193 ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2194 ll_i2info(inode), it, och);
/* ...and send the matching close to the MDS. */
2196 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2199 /* this one is in place of ll_file_open */
2200 ptlrpc_req_finished(it->d.lustre.it_data);
2201 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
/*
 * ll_file_ioctl(): dispatcher for llite file ioctls.  Handles the
 * LL_IOC_* family locally (flags, striping, group locks, join, ACLs),
 * forwards tty ioctls as unsupported, reflects a few EXT3_IOC_* codes,
 * and falls through to obd_iocontrol() on the data export for anything
 * unrecognized.
 *
 * NOTE(review): gapped listing — the `flags`/`rc` declarations, the
 * switch head, several RETURN lines and some case bodies are not
 * visible here.
 */
2205 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2208 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2212 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2213 inode->i_generation, inode, cmd);
2214 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2216 /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2217 if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2221 case LL_IOC_GETFLAGS:
2222 /* Get the current value of the file flags */
2223 return put_user(fd->fd_flags, (int *)arg);
2224 case LL_IOC_SETFLAGS:
2225 case LL_IOC_CLRFLAGS:
2226 /* Set or clear specific file flags */
2227 /* XXX This probably needs checks to ensure the flags are
2228 * not abused, and to handle any flag side effects.
2230 if (get_user(flags, (int *) arg))
/* IGNORE_LOCK is only safe when the page cache is bypassed. */
2233 if (cmd == LL_IOC_SETFLAGS) {
2234 if ((flags & LL_FILE_IGNORE_LOCK) &&
2235 !(file->f_flags & O_DIRECT)) {
2236 CERROR("%s: unable to disable locking on "
2237 "non-O_DIRECT file\n", current->comm);
2241 fd->fd_flags |= flags;
2243 fd->fd_flags &= ~flags;
2246 case LL_IOC_LOV_SETSTRIPE:
2247 RETURN(ll_lov_setstripe(inode, file, arg));
2248 case LL_IOC_LOV_SETEA:
2249 RETURN(ll_lov_setea(inode, file, arg));
2250 case LL_IOC_LOV_GETSTRIPE:
2251 RETURN(ll_lov_getstripe(inode, arg));
2252 case LL_IOC_RECREATE_OBJ:
2253 RETURN(ll_lov_recreate_obj(inode, file, arg));
2254 case EXT3_IOC_GETFLAGS:
2255 case EXT3_IOC_SETFLAGS:
2256 RETURN(ll_iocontrol(inode, file, cmd, arg));
2257 case EXT3_IOC_GETVERSION_OLD:
2258 case EXT3_IOC_GETVERSION:
2259 RETURN(put_user(inode->i_generation, (int *)arg));
/* LL_IOC_JOIN (case label hidden by gap): join a tail file by name. */
2264 ftail = getname((const char *)arg);
2266 RETURN(PTR_ERR(ftail));
2267 rc = ll_file_join(inode, file, ftail);
2271 case LL_IOC_GROUP_LOCK:
2272 RETURN(ll_get_grouplock(inode, file, arg));
2273 case LL_IOC_GROUP_UNLOCK:
2274 RETURN(ll_put_grouplock(inode, file, arg));
2275 case IOC_OBD_STATFS:
2276 RETURN(ll_obd_statfs(inode, (void *)arg));
2278 /* We need to special case any other ioctls we want to handle,
2279 * to send them to the MDS/OST as appropriate and to properly
2280 * network encode the arg field.
2281 case EXT3_IOC_SETVERSION_OLD:
2282 case EXT3_IOC_SETVERSION:
2284 case LL_IOC_FLUSHCTX:
2285 RETURN(ll_flush_ctx(inode));
2286 case LL_IOC_GETFACL: {
2287 struct rmtacl_ioctl_data ioc;
2289 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2292 RETURN(ll_ioctl_getfacl(inode, &ioc));
2294 case LL_IOC_SETFACL: {
2295 struct rmtacl_ioctl_data ioc;
2297 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2300 RETURN(ll_ioctl_setfacl(inode, &ioc));
/* default: forward to the data-device obd_iocontrol path. */
2303 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
/*
 * ll_file_seek(): the llite ->llseek operation.  For SEEK_END the
 * current cluster-wide size must first be refreshed with a glimpse
 * (non-blocking when O_NONBLOCK) before i_size is read under the size
 * lock.  The final offset is validated against ll_file_maxbytes().
 *
 * NOTE(review): gapped listing — the final RETURN and the invalid-
 * offset error path are not visible here.
 */
2308 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2310 struct inode *inode = file->f_dentry->d_inode;
2311 struct ll_inode_info *lli = ll_i2info(inode);
2312 struct lov_stripe_md *lsm = lli->lli_smd;
/* Tentative target computed only for the trace message below. */
2315 retval = offset + ((origin == 2) ? i_size_read(inode) :
2316 (origin == 1) ? file->f_pos : 0);
2317 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2318 inode->i_ino, inode->i_generation, inode, retval, retval,
2319 origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2320 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2322 if (origin == 2) { /* SEEK_END */
2323 int nonblock = 0, rc;
2325 if (file->f_flags & O_NONBLOCK)
2326 nonblock = LDLM_FL_BLOCK_NOWAIT;
/* Must glimpse: another client may have grown/truncated the file. */
2329 rc = ll_glimpse_size(inode, nonblock);
2334 ll_inode_size_lock(inode, 0);
2335 offset += i_size_read(inode);
2336 ll_inode_size_unlock(inode, 0);
2337 } else if (origin == 1) { /* SEEK_CUR */
2338 offset += file->f_pos;
2342 if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2343 if (offset != file->f_pos) {
2344 file->f_pos = offset;
2345 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2347 file->f_version = ++event;
/*
 * ll_fsync(): the llite ->fsync operation.  Waits for in-flight page
 * writeback, surfaces any recorded async write errors (both per-inode
 * and per-stripe), syncs metadata to the MDS via md_sync(), and — when
 * the file has objects — syncs data to the OSTs via obd_sync() over
 * [0, EOF].
 *
 * NOTE(review): gapped listing — `rc`/`err`/`oa` declarations, the
 * obdo allocation, capa releases and the final RETURN are not visible
 * here.
 */
2356 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2358 struct inode *inode = dentry->d_inode;
2359 struct ll_inode_info *lli = ll_i2info(inode);
2360 struct lov_stripe_md *lsm = lli->lli_smd;
2361 struct ptlrpc_request *req;
2362 struct obd_capa *oc;
2365 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2366 inode->i_generation, inode);
2367 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2369 /* fsync's caller has already called _fdata{sync,write}, we want
2370 * that IO to finish before calling the osc and mdc sync methods */
2371 rc = filemap_fdatawait(inode->i_mapping);
2373 /* catch async errors that were recorded back when async writeback
2374 * failed for pages in this mapping. */
2375 err = lli->lli_async_rc;
/* Consume the recorded error so it is reported exactly once. */
2376 lli->lli_async_rc = 0;
2380 err = lov_test_and_clear_async_rc(lsm);
2385 oc = ll_mdscapa_get(inode);
2386 err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2392 ptlrpc_req_finished(req);
2399 RETURN(rc ? rc : -ENOMEM);
2401 oa->o_id = lsm->lsm_object_id;
2402 oa->o_gr = lsm->lsm_object_gr;
2403 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2404 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2405 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2408 oc = ll_osscapa_get(inode, 0, CAPA_OPC_OSS_WRITE);
/* Sync the entire object range on every stripe. */
2409 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2410 0, OBD_OBJECT_EOF, oc);
/*
 * ll_file_flock(): the llite ->lock/->flock operation — translate a VFS
 * posix/BSD lock request into an LDLM_FLOCK enqueue against the MDS.
 * fl_type selects the DLM mode (unlock/read/write mapping bodies hidden
 * by gaps), cmd selects blocking vs. LDLM_FL_BLOCK_NOWAIT vs.
 * LDLM_FL_TEST_LOCK, and on success the local lock state is updated via
 * ll_flock_lock_file_wait()/posix_lock_file_wait().
 *
 * NOTE(review): gapped listing — `flags`/`rc` declarations, the mode
 * assignments per fl_type, and the final RETURN are not visible here.
 */
2420 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2422 struct inode *inode = file->f_dentry->d_inode;
2423 struct ll_sb_info *sbi = ll_i2sbi(inode);
/* Flock resources live on the MDS, named by the file's FID. */
2424 struct ldlm_res_id res_id =
2425 { .name = { fid_seq(ll_inode2fid(inode)),
2426 fid_oid(ll_inode2fid(inode)),
2427 fid_ver(ll_inode2fid(inode)),
2429 struct lustre_handle lockh = {0};
2430 ldlm_policy_data_t flock;
2431 ldlm_mode_t mode = 0;
2436 CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2437 inode->i_ino, file_lock);
2439 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2441 if (file_lock->fl_flags & FL_FLOCK) {
2442 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2443 /* set missing params for flock() calls */
2444 file_lock->fl_end = OFFSET_MAX;
2445 file_lock->fl_pid = current->tgid;
2447 flock.l_flock.pid = file_lock->fl_pid;
2448 flock.l_flock.start = file_lock->fl_start;
2449 flock.l_flock.end = file_lock->fl_end;
2451 switch (file_lock->fl_type) {
2456 /* An unlock request may or may not have any relation to
2457 * existing locks so we may not be able to pass a lock handle
2458 * via a normal ldlm_lock_cancel() request. The request may even
2459 * unlock a byte range in the middle of an existing lock. In
2460 * order to process an unlock request we need all of the same
2461 * information that is given with a normal read or write record
2462 * lock request. To avoid creating another ldlm unlock (cancel)
2463 * message we'll treat a LCK_NL flock request as an unlock. */
2470 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
/* Non-waiting set: fail instead of blocking on conflicts. */
2485 flags = LDLM_FL_BLOCK_NOWAIT;
/* F_GETLK family: probe only, take no lock. */
2491 flags = LDLM_FL_TEST_LOCK;
2492 /* Save the old mode so that if the mode in the lock changes we
2493 * can decrement the appropriate reader or writer refcount. */
2494 file_lock->fl_type = mode;
2497 CERROR("unknown fcntl lock command: %d\n", cmd);
2501 CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2502 "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2503 flags, mode, flock.l_flock.start, flock.l_flock.end);
2505 rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
2506 LDLM_FLOCK, &flock, mode, &flags, NULL,
2507 ldlm_flock_completion_ast, NULL, file_lock,
2508 NULL, 0, NULL, &lockh, 0);
2509 if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2510 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2511 #ifdef HAVE_F_OP_FLOCK
2512 if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2513 !(flags & LDLM_FL_TEST_LOCK))
2514 posix_lock_file_wait(file, file_lock);
/*
 * "-o noflock" stub installed as .flock/.lock in ll_file_operations_noflock
 * below. Body not visible in this chunk — presumably returns -ENOSYS, per
 * the comment at the _noflock operations table; confirm in the full file.
 */
2520 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
/*
 * Test whether this client already holds a granted MDS inodebits lock
 * covering 'bits' on 'inode', in any of CR/CW/PR mode.  LDLM_FL_TEST_LOCK
 * makes md_lock_match() a pure query (no reference is taken on a match).
 * Used by ll_inode_revalidate_it() below to skip a getattr RPC when the
 * cached attributes are still protected by a lock.
 */
2527 int ll_have_md_lock(struct inode *inode, __u64 bits)
2529         struct lustre_handle lockh;
2530         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2538         fid = &ll_i2info(inode)->lli_fid;
2539         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
/* Only already-granted locks count, even ones with a pending cancel. */
2541         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2542         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2543                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
/*
 * Common post-processing for the revalidate getattr RPC status.
 * -ENOENT means the object was already unlinked on the MDS: that is
 * treated as success for non-regular/non-directory inodes (nlink is
 * updated instead of failing the revalidate).  Other errors are logged
 * and — presumably — propagated to the caller (tail not visible here).
 */
2550 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2551         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2552                               * and return success */
2554                 /* This path cannot be hit for regular files unless in
2555                  * case of obscure races, so no need to to validate
2557                 if (!S_ISREG(inode->i_mode) &&
2558                     !S_ISDIR(inode->i_mode))
2563                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
/*
 * Revalidate the cached attributes of dentry->d_inode against the MDS.
 *
 * Two paths, chosen by server capability:
 *  - OBD_CONNECT_ATTRFID: issue an IT_GETATTR intent lock by FID (no name),
 *    which both refreshes attributes and takes/refreshes the DLM lock;
 *  - otherwise: if no MDS_INODELOCK_UPDATE lock is cached, do a plain
 *    md_getattr() RPC (requesting EA size for regular files).
 * Finally, glimpse the file size from the OSTs when a stripe object exists.
 *
 * NOTE(review): many interior lines (declarations of rc/req handling,
 * GOTO/RETURN paths, labels) are missing from this chunk; comments cover
 * only the visible statements.
 */
2571 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2573         struct inode *inode = dentry->d_inode;
2574         struct ptlrpc_request *req = NULL;
2575         struct ll_sb_info *sbi;
2576         struct obd_export *exp;
/* Negative dentry here is unexpected — loudly ask for a bug report. */
2581                 CERROR("REPORT THIS LINE TO PETER\n");
2584         sbi = ll_i2sbi(inode);
2586         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2587                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2588 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2589         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2592         exp = ll_i2mdexp(inode);
/* Path 1: server supports getattr-by-FID intents. */
2594         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2595                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2596                 struct md_op_data *op_data;
2598                 /* Call getattr by fid, so do not provide name at all. */
2599                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2600                                              dentry->d_inode, NULL, 0, 0,
2601                                              LUSTRE_OPC_ANY, NULL);
2602                 if (IS_ERR(op_data))
2603                         RETURN(PTR_ERR(op_data));
/* O_CHECK_STALE asks the MDS to verify the FID still maps to this object. */
2605                 oit.it_flags |= O_CHECK_STALE;
2606                 rc = md_intent_lock(exp, op_data, NULL, 0,
2607                                     /* we are not interested in name
2610                                     ll_md_blocking_ast, 0);
2611                 ll_finish_md_op_data(op_data);
2612                 oit.it_flags &= ~O_CHECK_STALE;
2614                         rc = ll_inode_revalidate_fini(inode, rc);
2618                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2620                         ll_intent_release(&oit);
2624                 /* Unlinked? Unhash dentry, so it is not picked up later by
2625                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2626                    here to preserve get_cwd functionality on 2.6.
2628                 if (!dentry->d_inode->i_nlink) {
2629                         spin_lock(&dcache_lock);
2630                         ll_drop_dentry(dentry);
2631                         spin_unlock(&dcache_lock);
2634                 ll_lookup_finish_locks(&oit, dentry);
/* Path 2: no by-FID intents — fall back to md_getattr(), but only when
 * no UPDATE lock already guarantees the cached attributes are current. */
2635         } else if (!ll_have_md_lock(dentry->d_inode,
2636                                     MDS_INODELOCK_UPDATE)) {
2637                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2638                 obd_valid valid = OBD_MD_FLGETATTR;
2639                 struct obd_capa *oc;
/* Regular files also need the striping EA, sized to the MDS maximum. */
2642                 if (S_ISREG(inode->i_mode)) {
2643                         rc = ll_get_max_mdsize(sbi, &ealen);
2646                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2648                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2649                  * capa for this inode. Because we only keep capas of dirs
2651                 oc = ll_mdscapa_get(inode);
2652                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2656                         rc = ll_inode_revalidate_fini(inode, rc);
2660                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2666         /* if object not yet allocated, don't validate size */
2667         if (ll_i2info(inode)->lli_smd == NULL)
2670         /* ll_glimpse_size will prefer locally cached writes if they extend
2672         rc = ll_glimpse_size(inode, 0);
2675         ptlrpc_req_finished(req);
2679 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
/*
 * ->getattr worker: revalidate the inode via the given intent, then copy
 * the (now fresh) inode attributes into *stat.  Size and block count are
 * read under ll_inode_size_lock() so they are mutually consistent.
 */
2680 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2681                   struct lookup_intent *it, struct kstat *stat)
2683         struct inode *inode = de->d_inode;
/* Refresh attributes from the MDS/OSTs before reporting them. */
2686         res = ll_inode_revalidate_it(de, it);
2687         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2692         stat->dev = inode->i_sb->s_dev;
2693         stat->ino = inode->i_ino;
2694         stat->mode = inode->i_mode;
2695         stat->nlink = inode->i_nlink;
2696         stat->uid = inode->i_uid;
2697         stat->gid = inode->i_gid;
2698         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2699         stat->atime = inode->i_atime;
2700         stat->mtime = inode->i_mtime;
2701         stat->ctime = inode->i_ctime;
/* Kernels differ on whether the inode caches a blocksize field. */
2702 #ifdef HAVE_INODE_BLKSIZE
2703         stat->blksize = inode->i_blksize;
2705         stat->blksize = 1 << inode->i_blkbits;
/* Take the size lock so size and blocks form a consistent snapshot. */
2708         ll_inode_size_lock(inode, 0);
2709         stat->size = i_size_read(inode);
2710         stat->blocks = inode->i_blocks;
2711         ll_inode_size_unlock(inode, 0);
/*
 * ->getattr entry point: wrap ll_getattr_it() with a plain IT_GETATTR
 * intent (no open/lookup semantics).
 */
2715 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2717         struct lookup_intent it = { .it_op = IT_GETATTR };
2719         return ll_getattr_it(mnt, de, &it, stat);
/*
 * POSIX ACL permission callback (passed to generic_permission() below).
 * Takes a reference on the ACL cached in the inode's ll_inode_info under
 * lli_lock, checks 'mask' against it, and drops the reference.  Without
 * CONFIG_FS_POSIX_ACL the body compiles out (return path not visible here).
 */
2724 int lustre_check_acl(struct inode *inode, int mask)
2726 #ifdef CONFIG_FS_POSIX_ACL
2727         struct ll_inode_info *lli = ll_i2info(inode);
2728         struct posix_acl *acl;
/* Duplicate under the spinlock so the ACL cannot be swapped mid-check. */
2732         spin_lock(&lli->lli_lock);
2733         acl = posix_acl_dup(lli->lli_posix_acl);
2734         spin_unlock(&lli->lli_lock);
2739         rc = posix_acl_permission(inode, acl, mask);
2740         posix_acl_release(acl);
2748 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
/*
 * ->permission for kernels >= 2.6.10: remote-client mounts check against
 * MDS-provided permissions; everyone else defers to generic_permission()
 * with lustre_check_acl() as the ACL callback.
 */
2749 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2751         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2752                inode->i_ino, inode->i_generation, inode, mask);
/* Remote clients cannot trust local uid/gid — ask the MDS instead. */
2753         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2754                 return lustre_check_remote_perm(inode, mask);
2756         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2757         return generic_permission(inode, mask, lustre_check_acl);
2760 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2761 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
/*
 * Older-kernel variant of ->permission: open-coded equivalent of the
 * classic VFS permission check (owner/group/other mode bits, then ACLs,
 * then capability overrides).  Several branch bodies and 'goto' targets
 * fall in extraction gaps here; comments describe only visible lines.
 */
2763 int ll_inode_permission(struct inode *inode, int mask)
2766         int mode = inode->i_mode;
2769         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2770                inode->i_ino, inode->i_generation, inode, mask);
2772         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2773                 return lustre_check_remote_perm(inode, mask);
2775         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
/* No writing to read-only filesystems or immutable inodes. */
2777         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2778             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2780         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
/* Owner check uses the 'user' bits of the mode. */
2782         if (current->fsuid == inode->i_uid) {
/* Group bits (mode >> 3) are checked before consulting the ACL. */
2785                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2787                         rc = lustre_check_acl(inode, mask);
2791                         goto check_capabilities;
2795         if (in_group_p(inode->i_gid))
/* "Other" bits: plain mode check for everyone else. */
2798         if ((mode & mask & S_IRWXO) == mask)
/* CAP_DAC_OVERRIDE bypasses everything except exec on non-exec files. */
2802         if (!(mask & MAY_EXEC) ||
2803             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2804                 if (capable(CAP_DAC_OVERRIDE))
/* CAP_DAC_READ_SEARCH allows reads and directory searches only. */
2807         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2808             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2815 /* -o localflock - only provides locally consistent flock locks */
/* Default file operations table: no .flock/.lock methods, so the kernel's
 * local (single-node) lock handling applies. */
2816 struct file_operations ll_file_operations = {
2817         .read           = ll_file_read,
2818         .write          = ll_file_write,
2819         .ioctl          = ll_file_ioctl,
2820         .open           = ll_file_open,
2821         .release        = ll_file_release,
2822         .mmap           = ll_file_mmap,
2823         .llseek         = ll_file_seek,
/* sendfile only exists as a file op on 2.5+ kernels. */
2824 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2825         .sendfile       = ll_file_sendfile,
/* "-o flock" operations table: identical to ll_file_operations above but
 * routes flock()/fcntl() locking through ll_file_flock() for cluster-wide
 * coherent locks. */
2830 struct file_operations ll_file_operations_flock = {
2831         .read           = ll_file_read,
2832         .write          = ll_file_write,
2833         .ioctl          = ll_file_ioctl,
2834         .open           = ll_file_open,
2835         .release        = ll_file_release,
2836         .mmap           = ll_file_mmap,
2837         .llseek         = ll_file_seek,
2838 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2839         .sendfile       = ll_file_sendfile,
/* Kernels with a separate ->flock op get it; older ones only have ->lock. */
2842 #ifdef HAVE_F_OP_FLOCK
2843         .flock          = ll_file_flock,
2845         .lock           = ll_file_flock
2848 /* These are for -o noflock - to return ENOSYS on flock calls */
2849 struct file_operations ll_file_operations_noflock = {
2850         .read           = ll_file_read,
2851         .write          = ll_file_write,
2852         .ioctl          = ll_file_ioctl,
2853         .open           = ll_file_open,
2854         .release        = ll_file_release,
2855         .mmap           = ll_file_mmap,
2856         .llseek         = ll_file_seek,
2857 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2858         .sendfile       = ll_file_sendfile,
/* Locking entry points wired to the ENOSYS stub ll_file_noflock(). */
2861 #ifdef HAVE_F_OP_FLOCK
2862         .flock          = ll_file_noflock,
2864         .lock           = ll_file_noflock
2867 struct inode_operations ll_file_inode_operations = {
2868 #ifdef LUSTRE_KERNEL_VERSION
2869 .setattr_raw = ll_setattr_raw,
2871 .setattr = ll_setattr,
2872 .truncate = ll_truncate,
2873 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2874 .getattr = ll_getattr,
2876 .revalidate_it = ll_inode_revalidate_it,
2878 .permission = ll_inode_permission,
2879 .setxattr = ll_setxattr,
2880 .getxattr = ll_getxattr,
2881 .listxattr = ll_listxattr,
2882 .removexattr = ll_removexattr,