/*
 * Whamcloud gitweb export: lustre/llite/file.c
 * commit 32360dc1e655973f14d46b7a28cfbb018d67cfcd
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #include "llite_internal.h"
32
33 /* also used by llite/special.c:ll_special_open() */
34 struct ll_file_data *ll_file_data_get(void)
35 {
36         struct ll_file_data *fd;
37
38         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
39         return fd;
40 }
41
42 static void ll_file_data_put(struct ll_file_data *fd)
43 {
44         if (fd != NULL)
45                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
46 }
47
48 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
49                           struct lustre_handle *fh)
50 {
51         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
52         op_data->op_attr.ia_mode = inode->i_mode;
53         op_data->op_attr.ia_atime = inode->i_atime;
54         op_data->op_attr.ia_mtime = inode->i_mtime;
55         op_data->op_attr.ia_ctime = inode->i_ctime;
56         op_data->op_attr.ia_size = i_size_read(inode);
57         op_data->op_attr_blocks = inode->i_blocks;
58         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
59         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
60         memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
61         op_data->op_capa1 = ll_mdscapa_get(inode);
62 }
63
/* Fill @op_data with the attribute set to send with the MDS close of
 * handle @och on @inode.  Write handles on a SOM-capable connection get
 * their IO epoch closed instead of carrying size/blocks directly. */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                             struct obd_client_handle *och)
{
        ENTRY;

        /* Mode and timestamps always accompany a close. */
        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
                                 ATTR_MTIME_SET | ATTR_CTIME_SET;

        /* Read/exec handles never update size or blocks. */
        if (!(och->och_flags & FMODE_WRITE))
                goto out;

        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
            !S_ISREG(inode->i_mode))
                /* No Size-on-MDS (or not a regular file): send size and
                 * blocks with the close itself. */
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
        else
                /* SOM case: close the IO epoch instead.  NOTE(review):
                 * och is passed by reference — presumably ll_epoch_close()
                 * may modify it; confirm it cannot leave och NULL before
                 * the dereference below. */
                ll_epoch_close(inode, op_data, &och, 0);

out:
        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
        EXIT;
}
85
/* Send an MDS close for open handle @och on @inode.
 *
 * On -EAGAIN from md_close() the MDS has requested Size-on-MDS update:
 * the OST attributes are gathered and a setattr is sent back.  Unless
 * this inode still owes a DONE_WRITING (SOM epoch left open), @och is
 * invalidated and freed before returning.
 *
 * Returns 0 on success or when the close is skipped (missing obd, or
 * forced umount); negative errno otherwise. */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                     struct inode *inode,
                                     struct obd_client_handle *och)
{
        struct obd_export *exp = ll_i2mdexp(inode);
        struct md_op_data *op_data;
        struct ptlrpc_request *req = NULL;
        struct obd_device *obd = class_exp2obd(exp);
        int epoch_close = 1;
        int seq_end = 0, rc;
        ENTRY;

        if (obd == NULL) {
                /*
                 * XXX: in case of LMV, is this correct to access
                 * ->exp_handle?
                 */
                CERROR("Invalid MDC connection handle "LPX64"\n",
                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                GOTO(out, rc = 0);
        }

        /*
         * here we check if this is forced umount. If so this is called on
         * canceling "open lock" and we do not call md_close() in this case, as
         * it will not be successful, as import is already deactivated.
         */
        if (obd->obd_force)
                GOTO(out, rc = 0);

        OBD_ALLOC_PTR(op_data);
        if (op_data == NULL)
                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

        ll_prepare_close(inode, op_data, och);
        /* Remember whether this close terminates the IO epoch; consulted
         * below to decide whether och must survive for DONE_WRITING. */
        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
        rc = md_close(md_exp, op_data, och->och_mod, &req);
        if (rc != -EAGAIN)
                seq_end = 1;

        if (rc == -EAGAIN) {
                /* This close must have the epoch closed. */
                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                LASSERT(epoch_close);
                /* MDS has instructed us to obtain Size-on-MDS attribute from
                 * OSTs and send setattr to back to MDS. */
                rc = ll_sizeonmds_update(inode, och->och_mod,
                                         &och->och_fh, op_data->op_ioepoch);
                if (rc) {
                        /* Size-on-MDS update failure is logged but not
                         * propagated — the close itself is still done. */
                        CERROR("inode %lu mdc Size-on-MDS update failed: "
                               "rc = %d\n", inode->i_ino, rc);
                        rc = 0;
                }
        } else if (rc) {
                CERROR("inode %lu mdc close failed: rc = %d\n",
                       inode->i_ino, rc);
        }
        ll_finish_md_op_data(op_data);

        if (rc == 0) {
                /* Destroy OST objects listed in the close reply, if any. */
                rc = ll_objects_destroy(req, inode);
                if (rc)
                        CERROR("inode %lu ll_objects destroy: rc = %d\n",
                               inode->i_ino, rc);
        }

        EXIT;
out:

        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                /* Epoch still open: keep och alive and let the
                 * DONE_WRITING path finish it later. */
                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
        } else {
                if (seq_end)
                        ptlrpc_close_replay_seq(req);
                md_clear_open_replay_data(md_exp, och);
                /* Free @och if it is not waiting for DONE_WRITING. */
                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                OBD_FREE_PTR(och);
        }
        if (req) /* This is close request */
                ptlrpc_req_finished(req);
        return rc;
}
170
171 int ll_md_real_close(struct inode *inode, int flags)
172 {
173         struct ll_inode_info *lli = ll_i2info(inode);
174         struct obd_client_handle **och_p;
175         struct obd_client_handle *och;
176         __u64 *och_usecount;
177         int rc = 0;
178         ENTRY;
179
180         if (flags & FMODE_WRITE) {
181                 och_p = &lli->lli_mds_write_och;
182                 och_usecount = &lli->lli_open_fd_write_count;
183         } else if (flags & FMODE_EXEC) {
184                 och_p = &lli->lli_mds_exec_och;
185                 och_usecount = &lli->lli_open_fd_exec_count;
186         } else {
187                 LASSERT(flags & FMODE_READ);
188                 och_p = &lli->lli_mds_read_och;
189                 och_usecount = &lli->lli_open_fd_read_count;
190         }
191
192         down(&lli->lli_och_sem);
193         if (*och_usecount) { /* There are still users of this handle, so
194                                 skip freeing it. */
195                 up(&lli->lli_och_sem);
196                 RETURN(0);
197         }
198         och=*och_p;
199         *och_p = NULL;
200         up(&lli->lli_och_sem);
201
202         if (och) { /* There might be a race and somebody have freed this och
203                       already */
204                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
205                                                inode, och);
206         }
207
208         RETURN(rc);
209 }
210
211 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
212                 struct file *file)
213 {
214         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
215         struct ll_inode_info *lli = ll_i2info(inode);
216         int rc = 0;
217         ENTRY;
218
219         /* clear group lock, if present */
220         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
221                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
222                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
223                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
224                                       &fd->fd_cwlockh);
225         }
226
227         /* Let's see if we have good enough OPEN lock on the file and if
228            we can skip talking to MDS */
229         if (file->f_dentry->d_inode) { /* Can this ever be false? */
230                 int lockmode;
231                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
232                 struct lustre_handle lockh;
233                 struct inode *inode = file->f_dentry->d_inode;
234                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
235
236                 down(&lli->lli_och_sem);
237                 if (fd->fd_omode & FMODE_WRITE) {
238                         lockmode = LCK_CW;
239                         LASSERT(lli->lli_open_fd_write_count);
240                         lli->lli_open_fd_write_count--;
241                 } else if (fd->fd_omode & FMODE_EXEC) {
242                         lockmode = LCK_PR;
243                         LASSERT(lli->lli_open_fd_exec_count);
244                         lli->lli_open_fd_exec_count--;
245                 } else {
246                         lockmode = LCK_CR;
247                         LASSERT(lli->lli_open_fd_read_count);
248                         lli->lli_open_fd_read_count--;
249                 }
250                 up(&lli->lli_och_sem);
251
252                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
253                                    LDLM_IBITS, &policy, lockmode,
254                                    &lockh)) {
255                         rc = ll_md_real_close(file->f_dentry->d_inode,
256                                               fd->fd_omode);
257                 }
258         } else {
259                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
260                        file, file->f_dentry, file->f_dentry->d_name.name);
261         }
262
263         LUSTRE_FPRIVATE(file) = NULL;
264         ll_file_data_put(fd);
265         ll_capa_close(inode);
266
267         RETURN(rc);
268 }
269
270 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
271
/* While this returns an error code, fput() the caller does not, so we need
 * to make every effort to clean up all of our state here.  Also, applications
 * rarely check close errors and even if an error is returned they will not
 * re-try the close call.
 */
int ll_file_release(struct inode *inode, struct file *file)
{
        struct ll_file_data *fd;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        int rc;

        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
        /* Remote-client ACL bookkeeping is torn down only when the root
         * inode is released. */
        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
            inode == inode->i_sb->s_root->d_inode) {
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

                LASSERT(fd != NULL);
                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
                        fd->fd_flags &= ~LL_FILE_RMTACL;
                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
                }
        }
#endif

        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
        fd = LUSTRE_FPRIVATE(file);
        LASSERT(fd != NULL);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = NULL;
                ll_file_data_put(fd);
                RETURN(0);
        }

        /* Clear any async write error recorded against the stripes so it
         * is not reported again on a later close. */
        if (lsm)
                lov_test_and_clear_async_rc(lsm);
        lli->lli_async_rc = 0;

        rc = ll_md_close(sbi->ll_md_exp, inode, file);
        RETURN(rc);
}
321
/* Send an open intent to the MDS for @file, filling @itp with the result.
 * @lmm/@lmmsize, when non-zero, carry striping info being set; in that
 * case the OPEN lock is not requested.  Returns 0 or negative errno. */
static int ll_intent_file_open(struct file *file, void *lmm,
                               int lmmsize, struct lookup_intent *itp)
{
        struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
        struct dentry *parent = file->f_dentry->d_parent;
        const char *name = file->f_dentry->d_name.name;
        const int len = file->f_dentry->d_name.len;
        struct md_op_data *op_data;
        struct ptlrpc_request *req;
        int rc;

        if (!parent)
                RETURN(-ENOENT);

        /* Usually we come here only for NFSD, and we want open lock.
           But we can also get here with pre 2.6.15 patchless kernels, and in
           that case that lock is also ok */
        /* We can also get here if there was cached open handle in revalidate_it
         * but it disappeared while we were getting from there to ll_file_open.
         * But this means this file was closed and immediately opened which
         * makes a good candidate for using OPEN lock */
        /* If lmmsize & lmm are not 0, we are just setting stripe info
         * parameters. No need for the open lock */
        if (!lmm && !lmmsize)
                itp->it_flags |= MDS_OPEN_LOCK;

        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
                                      file->f_dentry->d_inode, name, len,
                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
                            0 /*unused */, &req, ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc == -ESTALE) {
                /* reason for keep own exit path - don't flood log
                * with messages with -ESTALE errors.
                */
                if (!it_disposition(itp, DISP_OPEN_OPEN) || 
                     it_open_error(DISP_OPEN_OPEN, itp))
                        GOTO(out, rc);
                /* Open succeeded but the inode is stale: drop the handle
                 * and leave via the path that skips the request release. */
                ll_release_openhandle(file->f_dentry, itp);
                GOTO(out_stale, rc);
        }

        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                GOTO(out, rc);
        }

        /* Attach the granted lock to the inode so blocking callbacks can
         * find it. */
        if (itp->d.lustre.it_lock_mode)
                md_set_lock_data(sbi->ll_md_exp,
                                 &itp->d.lustre.it_lock_handle, 
                                 file->f_dentry->d_inode);

        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
out:
        ptlrpc_req_finished(itp->d.lustre.it_data);

out_stale:
        it_clear_disposition(itp, DISP_ENQ_COMPLETE);
        ll_intent_drop_lock(itp);

        RETURN(rc);
}
389
390 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
391                        struct lookup_intent *it, struct obd_client_handle *och)
392 {
393         struct ptlrpc_request *req = it->d.lustre.it_data;
394         struct mdt_body *body;
395
396         LASSERT(och);
397
398         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
399         LASSERT(body != NULL);                      /* reply already checked out */
400
401         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
402         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
403         och->och_fid = lli->lli_fid;
404         och->och_flags = it->it_flags;
405         lli->lli_ioepoch = body->ioepoch;
406
407         return md_set_open_replay_data(md_exp, och, req);
408 }
409
410 int ll_local_open(struct file *file, struct lookup_intent *it,
411                   struct ll_file_data *fd, struct obd_client_handle *och)
412 {
413         struct inode *inode = file->f_dentry->d_inode;
414         struct ll_inode_info *lli = ll_i2info(inode);
415         ENTRY;
416
417         LASSERT(!LUSTRE_FPRIVATE(file));
418
419         LASSERT(fd != NULL);
420
421         if (och) {
422                 struct ptlrpc_request *req = it->d.lustre.it_data;
423                 struct mdt_body *body;
424                 int rc;
425
426                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
427                 if (rc)
428                         RETURN(rc);
429
430                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
431                 if ((it->it_flags & FMODE_WRITE) &&
432                     (body->valid & OBD_MD_FLSIZE))
433                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
434                                lli->lli_ioepoch, PFID(&lli->lli_fid));
435         }
436
437         LUSTRE_FPRIVATE(file) = fd;
438         ll_readahead_init(inode, &fd->fd_ras);
439         fd->fd_omode = it->it_flags;
440         RETURN(0);
441 }
442
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.  We grab
 * lli_open_sem to ensure no other process will create objects, send the
 * stripe MD to the MDS, or try to destroy the objects if that fails.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lookup_intent *it, oit = { .it_op = IT_OPEN,
                                          .it_flags = file->f_flags };
        struct lov_stripe_md *lsm;
        struct ptlrpc_request *req = NULL;
        struct obd_client_handle **och_p;
        __u64 *och_usecount;
        struct ll_file_data *fd;
        int rc = 0;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
               inode->i_generation, inode, file->f_flags);

#ifdef HAVE_VFS_INTENT_PATCHES
        it = file->f_it;
#else
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
#endif

        fd = ll_file_data_get();
        if (fd == NULL)
                RETURN(-ENOMEM);

        /* don't do anything for / */
        if (inode->i_sb->s_root == file->f_dentry) {
                LUSTRE_FPRIVATE(file) = fd;
                RETURN(0);
        }

        /* No intent from the VFS (or no disposition yet): build our own
         * open intent from f_flags. */
        if (!it || !it->d.lustre.it_disposition) {
                /* Convert f_flags into access mode. We cannot use file->f_mode,
                 * because everything but O_ACCMODE mask was stripped from
                 * there */
                if ((oit.it_flags + 1) & O_ACCMODE)
                        oit.it_flags++;
                if (file->f_flags & O_TRUNC)
                        oit.it_flags |= FMODE_WRITE;

                /* kernel only call f_op->open in dentry_open.  filp_open calls
                 * dentry_open after call to open_namei that checks permissions.
                 * Only nfsd_open call dentry_open directly without checking
                 * permissions and because of that this code below is safe. */
                if (oit.it_flags & FMODE_WRITE)
                        oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

                /* We do not want O_EXCL here, presumably we opened the file
                 * already? XXX - NFS implications? */
                oit.it_flags &= ~O_EXCL;

                it = &oit;
        }

restart:
        /* Let's see if we have file open on MDS already. */
        if (it->it_flags & FMODE_WRITE) {
                och_p = &lli->lli_mds_write_och;
                och_usecount = &lli->lli_open_fd_write_count;
        } else if (it->it_flags & FMODE_EXEC) {
                och_p = &lli->lli_mds_exec_och;
                och_usecount = &lli->lli_open_fd_exec_count;
         } else {
                och_p = &lli->lli_mds_read_och;
                och_usecount = &lli->lli_open_fd_read_count;
        }

        down(&lli->lli_och_sem);
        if (*och_p) { /* Open handle is present */
                if (it_disposition(it, DISP_OPEN_OPEN)) {
                        /* Well, there's extra open request that we do not need,
                           let's close it somehow. This will decref request. */
                        rc = it_open_error(DISP_OPEN_OPEN, it);
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_och_free, rc);
                        }
                        ll_release_openhandle(file->f_dentry, it);
                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
                                             LPROC_LL_OPEN);
                }
                (*och_usecount)++;

                rc = ll_local_open(file, it, fd, NULL);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        RETURN(rc);
                }
        } else {
                LASSERT(*och_usecount == 0);
                if (!it->d.lustre.it_disposition) {
                        /* We cannot just request lock handle now, new ELC code
                           means that one of other OPEN locks for this file
                           could be cancelled, and since blocking ast handler
                           would attempt to grab och_sem as well, that would
                           result in a deadlock */
                        up(&lli->lli_och_sem);
                        it->it_flags |= O_CHECK_STALE;
                        rc = ll_intent_file_open(file, NULL, 0, it);
                        it->it_flags &= ~O_CHECK_STALE;
                        if (rc) {
                                ll_file_data_put(fd);
                                GOTO(out_openerr, rc);
                        }

                        /* Got some error? Release the request */
                        if (it->d.lustre.it_status < 0) {
                                req = it->d.lustre.it_data;
                                ptlrpc_req_finished(req);
                        }
                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                         &it->d.lustre.it_lock_handle,
                                         file->f_dentry->d_inode);
                        /* Re-run the slot selection now that the MDS open
                         * has produced a disposition. */
                        goto restart;
                }
                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
                if (!*och_p) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc = -ENOMEM);
                }
                (*och_usecount)++;
                req = it->d.lustre.it_data;

                /* md_intent_lock() didn't get a request ref if there was an
                 * open error, so don't do cleanup on the request here
                 * (bug 3430) */
                /* XXX (green): Should not we bail out on any error here, not
                 * just open error? */
                rc = it_open_error(DISP_OPEN_OPEN, it);
                if (rc) {
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }

                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                rc = ll_local_open(file, it, fd, *och_p);
                if (rc) {
                        up(&lli->lli_och_sem);
                        ll_file_data_put(fd);
                        GOTO(out_och_free, rc);
                }
        }
        up(&lli->lli_och_sem);

        /* Must do this outside lli_och_sem lock to prevent deadlock where
           different kind of OPEN lock for this same inode gets cancelled
           by ldlm_cancel_lru */
        if (!S_ISREG(inode->i_mode))
                GOTO(out, rc);

        ll_capa_open(inode);

        lsm = lli->lli_smd;
        if (lsm == NULL) {
                /* No striping yet: delay object creation when requested or
                 * when the file is not opened for write. */
                if (file->f_flags & O_LOV_DELAY_CREATE ||
                    !(file->f_mode & FMODE_WRITE)) {
                        CDEBUG(D_INODE, "object creation was delayed\n");
                        GOTO(out, rc);
                }
        }
        file->f_flags &= ~O_LOV_DELAY_CREATE;
        GOTO(out, rc);
out:
        ptlrpc_req_finished(req);
        if (req)
                it_clear_disposition(it, DISP_ENQ_OPEN_REF);
out_och_free:
        /* NOTE: out_och_free is reached both with and without
         * lli_och_sem held depending on the GOTO site; the up() below
         * runs only on the rc != 0 paths, which all hold it. */
        if (rc) {
                if (*och_p) {
                        OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                        *och_p = NULL; /* OBD_FREE writes some magic there */
                        (*och_usecount)--;
                }
                up(&lli->lli_och_sem);
out_openerr: ;/* Looks weird, eh? Just wait for statahead code to insert
                a statement here <-- remove this comment after statahead
                landing */
        }

        return rc;
}
642
643 /* Fills the obdo with the attributes for the inode defined by lsm */
644 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
645 {
646         struct ptlrpc_request_set *set;
647         struct ll_inode_info *lli = ll_i2info(inode);
648         struct lov_stripe_md *lsm = lli->lli_smd;
649
650         struct obd_info oinfo = { { { 0 } } };
651         int rc;
652         ENTRY;
653
654         LASSERT(lsm != NULL);
655
656         oinfo.oi_md = lsm;
657         oinfo.oi_oa = obdo;
658         oinfo.oi_oa->o_id = lsm->lsm_object_id;
659         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
660         oinfo.oi_oa->o_mode = S_IFREG;
661         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
662                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
663                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
664                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
665                                OBD_MD_FLGROUP;
666         oinfo.oi_capa = ll_mdscapa_get(inode);
667
668         set = ptlrpc_prep_set();
669         if (set == NULL) {
670                 CERROR("can't allocate ptlrpc set\n");
671                 rc = -ENOMEM;
672         } else {
673                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
674                 if (rc == 0)
675                         rc = ptlrpc_set_wait(set);
676                 ptlrpc_set_destroy(set);
677         }
678         capa_put(oinfo.oi_capa);
679         if (rc)
680                 RETURN(rc);
681
682         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
683                                  OBD_MD_FLATIME | OBD_MD_FLMTIME |
684                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
685
686         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
687         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
688                lli->lli_smd->lsm_object_id, i_size_read(inode),
689                (unsigned long long)inode->i_blocks, ll_inode_blksize(inode));
690         RETURN(0);
691 }
692
693 static inline void ll_remove_suid(struct inode *inode)
694 {
695         unsigned int mode;
696
697         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
698         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
699
700         /* was any of the uid bits set? */
701         mode &= inode->i_mode;
702         if (mode && !capable(CAP_FSETID)) {
703                 inode->i_mode &= ~mode;
704                 // XXX careful here - we cannot change the size
705         }
706 }
707
/* Map DLM extent lock @lock back to the index of the stripe it covers
 * within @inode's LOV stripe set.
 *
 * Returns the stripe index (>= 0), -ELDLM_NO_LOCK_DATA when the lock's
 * resource does not match the stripe object, or a negative errno from
 * obd_get_info(). */
static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = ll_i2dtexp(inode);
        /* Key understood by the LOV layer's get_info handler. */
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        /* Single-stripe file: the answer is trivially stripe 0. */
        if (lsm->lsm_stripe_count == 1)
                GOTO(check, stripe = 0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                RETURN(rc);
        }
        LASSERT(stripe < lsm->lsm_stripe_count);

check:
        /* Sanity-check that the lock's resource name (object id/group)
         * really belongs to the stripe we computed. */
        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
                LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                           lsm->lsm_oinfo[stripe]->loi_id,
                           lsm->lsm_oinfo[stripe]->loi_gr);
                RETURN(-ELDLM_NO_LOCK_DATA);
        }

        RETURN(stripe);
}
744
745 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
746  * we get a lock cancellation for each stripe, so we have to map the obd's
747  * region back onto the stripes in the file that it held.
748  *
749  * No one can dirty the extent until we've finished our work and they can
750  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
751  * but other kernel actors could have pages locked.
752  *
753  * Called with the DLM lock held. */
void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                              struct ldlm_lock *lock, __u32 stripe)
{
        ldlm_policy_data_t tmpex;
        unsigned long start, end, count, skip, i, j;
        struct page *page;
        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
        struct lustre_handle lockh;
        struct address_space *mapping = inode->i_mapping;

        ENTRY;
        tmpex = lock->l_policy_data;
        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
               i_size_read(inode));

        /* our locks are page granular thanks to osc_enqueue, we invalidate the
         * whole page. */
        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
                           CFS_PAGE_SIZE);
        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);

        /* Map the lock's object extent (in pages) back onto file page
         * indices.  With more than one stripe, this stripe's pages occur in
         * runs of "count" pages, separated by "skip" pages that belong to
         * the other stripes. */
        count = ~0;
        skip = 0;
        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
        if (lsm->lsm_stripe_count > 1) {
                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
                skip = (lsm->lsm_stripe_count - 1) * count;
                start += start/count * skip + stripe * count;
                if (end != ~0)
                        end += end/count * skip + stripe * count;
        }
        /* if the mapping arithmetic wrapped, treat it as "to EOF" */
        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
                end = ~0;

        /* never walk past the last page backing i_size */
        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
            CFS_PAGE_SHIFT : 0;
        if (i < end)
                end = i;

        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
               count, skip, end, discard ? " (DISCARDING)" : "");

        /* walk through the vmas on the inode and tear down mmaped pages that
         * intersect with the lock.  this stops immediately if there are no
         * mmap()ed regions of the file.  This is not efficient at all and
         * should be short lived. We'll associate mmap()ed pages with the lock
         * and will be able to find them directly */
        for (i = start; i <= end; i += (j + skip)) {
                j = min(count - (i % count), end - i + 1);
                LASSERT(j > 0);
                LASSERT(mapping);
                if (ll_teardown_mmaps(mapping,
                                      (__u64)i << CFS_PAGE_SHIFT,
                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                        break;
        }

        /* this is the simplistic implementation of page eviction at
         * cancelation.  It is careful to get races with other page
         * lockers handled correctly.  fixes from bug 20 will make it
         * more efficient by associating locks with pages and with
         * batching writeback under the lock explicitly. */
        for (i = start, j = start % count; i <= end;
             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
                /* "j" counts our position within the current stripe run;
                 * when it reaches "count" we jump over the other stripes. */
                if (j == count) {
                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
                        i += skip;
                        j = 0;
                        if (i > end)
                                break;
                }
                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                         start, i, end);

                if (!mapping_has_pages(mapping)) {
                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                        break;
                }

                cond_resched();

                page = find_lock_page(mapping, i);
                if (page == NULL)
                        continue;
                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                               i, tmpex.l_extent.start);
                if (!discard && PageWriteback(page))
                        wait_on_page_writeback(page);

                /* page->mapping to check with racing against teardown */
                if (!discard && clear_page_dirty_for_io(page)) {
                        rc = ll_call_writepage(inode, page);
                        /* either waiting for io to complete or reacquiring
                         * the lock that the failed writepage released */
                        lock_page(page);
                        wait_on_page_writeback(page);
                        if (rc < 0) {
                                CERROR("writepage inode %lu(%p) of page %p "
                                       "failed: %d\n", inode->i_ino, inode,
                                       page, rc);
                                /* record the failure on the mapping so a
                                 * later fsync/close can report it */
                                if (rc == -ENOSPC)
                                        set_bit(AS_ENOSPC, &mapping->flags);
                                else
                                        set_bit(AS_EIO, &mapping->flags);
                        }
                }

                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                /* check to see if another DLM lock covers this page b=2765 */
                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
                                      LDLM_FL_TEST_LOCK,
                                      &lock->l_resource->lr_name, LDLM_EXTENT,
                                      &tmpex, LCK_PR | LCK_PW, &lockh);

                /* only drop the page if no other lock protects it and it is
                 * still attached to this mapping */
                if (rc2 <= 0 && page->mapping != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        /* checking again to account for writeback's
                         * lock_page() */
                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                        if (llap)
                                ll_ra_accounting(llap, mapping);
                        ll_truncate_complete_page(page);
                }
                unlock_page(page);
                page_cache_release(page);
        }
        LASSERTF(tmpex.l_extent.start <=
                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
                  lock->l_policy_data.l_extent.end + 1),
                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                 start, i, end);
        EXIT;
}
897
/* DLM blocking/cancelation AST for llite extent locks.
 *
 * On LDLM_CB_BLOCKING: another client wants a conflicting lock, so cancel
 * ours.  On LDLM_CB_CANCELING: the lock is going away; flush/discard the
 * covered page cache and shrink the known minimum size (kms) for the
 * affected stripe.  Always returns 0. */
static int ll_extent_lock_callback(struct ldlm_lock *lock,
                                   struct ldlm_lock_desc *new, void *data,
                                   int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* catch obviously-corrupt opaque data pointers: small non-NULL
         * values cannot be valid kernel addresses */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct ll_inode_info *lli;
                struct lov_stripe_md *lsm;
                int stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                inode = ll_inode_from_lock(lock);
                if (inode == NULL)
                        RETURN(0);
                lli = ll_i2info(inode);
                if (lli == NULL)
                        goto iput;
                if (lli->lli_smd == NULL)
                        goto iput;
                lsm = lli->lli_smd;

                /* which stripe of the file does this lock cover? */
                stripe = ll_lock_to_stripe_offset(inode, lock);
                if (stripe < 0)
                        goto iput;

                ll_pgcache_remove_extent(inode, lsm, lock, stripe);

                /* recompute the stripe's known-minimum-size now that this
                 * lock no longer guarantees any extent; lock order is
                 * lov_stripe_lock -> lock_res_and_lock */
                lov_stripe_lock(lsm);
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);

                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
                unlock_res_and_lock(lock);
                lov_stripe_unlock(lsm);
        iput:
                iput(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
966
#if 0
/* NOTE(review): dead code, compiled out with "#if 0" — kept for reference
 * only.  It predates the loi_* pointer-array change (it still uses
 * lsm->lsm_oinfo[stripe].loi_* member syntax) and would not compile as-is.
 * Also note the LBUG() below makes the rest of that branch unreachable. */
int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lustre_handle lockh = { 0 };
        struct ost_lvb *lvb;
        int stripe;
        ENTRY;

        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                     LDLM_FL_BLOCK_CONV)) {
                LBUG(); /* not expecting any blocked async locks yet */
                LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
                           "lock, returning");
                ldlm_lock_dump(D_OTHER, lock, 0);
                ldlm_reprocess_all(lock->l_resource);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");

        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                goto iput;

        if (lock->l_lvb_len) {
                struct lov_stripe_md *lsm = lli->lli_smd;
                __u64 kms;
                lvb = lock->l_lvb_data;
                lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;

                lock_res_and_lock(lock);
                ll_inode_size_lock(inode, 1);
                kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                kms = ldlm_extent_shift_kms(NULL, kms);
                if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe].loi_kms, kms);
                lsm->lsm_oinfo[stripe].loi_kms = kms;
                ll_inode_size_unlock(inode, 1);
                unlock_res_and_lock(lock);
        }

iput:
        iput(inode);
        wake_up(&lock->l_waitq);

        ldlm_lock2handle(lock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        RETURN(0);
}
#endif
1021
/* Glimpse AST: a server (on behalf of another client's glimpse) asks us for
 * the size attributes we guarantee under our DLM lock.  Pack an ost_lvb with
 * our known-minimum-size for the lock's stripe plus the inode times into the
 * reply.  -ELDLM_NO_LOCK_DATA answers are normal races and are replied to
 * quietly. */
static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = ll_inode_from_lock(lock);
        struct ll_inode_info *lli;
        struct lov_stripe_md *lsm;
        struct ost_lvb *lvb;
        int rc, stripe;
        ENTRY;

        /* lock may no longer be attached to an inode (race with cleanup) */
        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = ll_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        stripe = ll_lock_to_stripe_offset(inode, lock);
        if (stripe < 0)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(*lvb));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc) {
                CERROR("lustre_pack_reply: %d\n", rc);
                GOTO(iput, rc);
        }

        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
        lvb->lvb_mtime = LTIME_S(inode->i_mtime);
        lvb->lvb_atime = LTIME_S(inode->i_atime);
        lvb->lvb_ctime = LTIME_S(inode->i_ctime);

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                   " atime "LPU64", mtime "LPU64", ctime "LPU64,
                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                   lvb->lvb_atime, lvb->lvb_ctime);
 iput:
        iput(inode);

 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}
1077
1078 static int ll_merge_lvb(struct inode *inode)
1079 {
1080         struct ll_inode_info *lli = ll_i2info(inode);
1081         struct ll_sb_info *sbi = ll_i2sbi(inode);
1082         struct ost_lvb lvb;
1083         int rc;
1084
1085         ENTRY;
1086
1087         ll_inode_size_lock(inode, 1);
1088         inode_init_lvb(inode, &lvb);
1089         rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1090         i_size_write(inode, lvb.lvb_size);
1091         inode->i_blocks = lvb.lvb_blocks;
1092
1093         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1094         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1095         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1096         ll_inode_size_unlock(inode, 1);
1097
1098         RETURN(rc);
1099 }
1100
1101 int ll_local_size(struct inode *inode)
1102 {
1103         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1104         struct ll_inode_info *lli = ll_i2info(inode);
1105         struct ll_sb_info *sbi = ll_i2sbi(inode);
1106         struct lustre_handle lockh = { 0 };
1107         int flags = 0;
1108         int rc;
1109         ENTRY;
1110
1111         if (lli->lli_smd->lsm_stripe_count == 0)
1112                 RETURN(0);
1113
1114         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1115                        &policy, LCK_PR, &flags, inode, &lockh);
1116         if (rc < 0)
1117                 RETURN(rc);
1118         else if (rc == 0)
1119                 RETURN(-ENODATA);
1120
1121         rc = ll_merge_lvb(inode);
1122         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
1123         RETURN(rc);
1124 }
1125
1126 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1127                      lstat_t *st)
1128 {
1129         struct lustre_handle lockh = { 0 };
1130         struct ldlm_enqueue_info einfo = { 0 };
1131         struct obd_info oinfo = { { { 0 } } };
1132         struct ost_lvb lvb;
1133         int rc;
1134
1135         ENTRY;
1136
1137         einfo.ei_type = LDLM_EXTENT;
1138         einfo.ei_mode = LCK_PR;
1139         einfo.ei_cb_bl = ll_extent_lock_callback;
1140         einfo.ei_cb_cp = ldlm_completion_ast;
1141         einfo.ei_cb_gl = ll_glimpse_callback;
1142         einfo.ei_cbdata = NULL;
1143
1144         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1145         oinfo.oi_lockh = &lockh;
1146         oinfo.oi_md = lsm;
1147         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
1148
1149         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1150         if (rc == -ENOENT)
1151                 RETURN(rc);
1152         if (rc != 0) {
1153                 CERROR("obd_enqueue returned rc %d, "
1154                        "returning -EIO\n", rc);
1155                 RETURN(rc > 0 ? -EIO : rc);
1156         }
1157
1158         lov_stripe_lock(lsm);
1159         memset(&lvb, 0, sizeof(lvb));
1160         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1161         st->st_size = lvb.lvb_size;
1162         st->st_blocks = lvb.lvb_blocks;
1163         st->st_mtime = lvb.lvb_mtime;
1164         st->st_atime = lvb.lvb_atime;
1165         st->st_ctime = lvb.lvb_ctime;
1166         lov_stripe_unlock(lsm);
1167
1168         RETURN(rc);
1169 }
1170
1171 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1172  * file (because it prefers KMS over RSS when larger) */
1173 int ll_glimpse_size(struct inode *inode, int ast_flags)
1174 {
1175         struct ll_inode_info *lli = ll_i2info(inode);
1176         struct ll_sb_info *sbi = ll_i2sbi(inode);
1177         struct lustre_handle lockh = { 0 };
1178         struct ldlm_enqueue_info einfo = { 0 };
1179         struct obd_info oinfo = { { { 0 } } };
1180         int rc;
1181         ENTRY;
1182
1183         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1184                 RETURN(0);
1185
1186         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1187
1188         if (!lli->lli_smd) {
1189                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1190                 RETURN(0);
1191         }
1192
1193         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1194          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1195          *       won't revoke any conflicting DLM locks held. Instead,
1196          *       ll_glimpse_callback() will be called on each client
1197          *       holding a DLM lock against this file, and resulting size
1198          *       will be returned for each stripe. DLM lock on [0, EOF] is
1199          *       acquired only if there were no conflicting locks. */
1200         einfo.ei_type = LDLM_EXTENT;
1201         einfo.ei_mode = LCK_PR;
1202         einfo.ei_cb_bl = ll_extent_lock_callback;
1203         einfo.ei_cb_cp = ldlm_completion_ast;
1204         einfo.ei_cb_gl = ll_glimpse_callback;
1205         einfo.ei_cbdata = inode;
1206
1207         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1208         oinfo.oi_lockh = &lockh;
1209         oinfo.oi_md = lli->lli_smd;
1210         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1211
1212         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1213         if (rc == -ENOENT)
1214                 RETURN(rc);
1215         if (rc != 0) {
1216                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1217                 RETURN(rc > 0 ? -EIO : rc);
1218         }
1219
1220         rc = ll_merge_lvb(inode);
1221
1222         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1223                i_size_read(inode), (unsigned long long)inode->i_blocks);
1224
1225         RETURN(rc);
1226 }
1227
/* Acquire a client extent lock on @lsm covering @policy in @mode, and
 * refresh the inode attributes from the merged lock value blocks while the
 * size lock is held.  Returns 0 on success (lock handle in @lockh, actual
 * granted extent written back into @policy) or a negative errno.  Returns
 * 0 without locking when this fd/superblock is configured lockless. */
int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                   struct lov_stripe_md *lsm, int mode,
                   ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                   int ast_flags)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ost_lvb lvb;
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        LASSERT(lsm != NULL);

        /* don't drop the mmapped file to LRU */
        if (mapping_mapped(inode->i_mapping))
                ast_flags |= LDLM_FL_NO_LRU;

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK))
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
               inode->i_ino, policy->l_extent.start, policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = ll_extent_lock_callback;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = ll_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
        /* the server may have granted a different (larger) extent */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        /* even on enqueue failure, refresh the cached attributes */
        ll_inode_size_lock(inode, 1);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);

        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF) {
                /* vmtruncate()->ll_truncate() first sets the i_size and then
                 * the kms under both a DLM lock and the
                 * ll_inode_size_lock().  If we don't get the
                 * ll_inode_size_lock() here we can match the DLM lock and
                 * reset i_size from the kms before the truncating path has
                 * updated the kms.  generic_file_write can then trust the
                 * stale i_size when doing appending writes and effectively
                 * cancel the result of the truncate.  Getting the
                 * ll_inode_size_lock() after the enqueue maintains the DLM
                 * -> ll_inode_size_lock() acquiring order. */
                i_size_write(inode, lvb.lvb_size);
                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
                       inode->i_ino, i_size_read(inode));
        }

        /* only trust the merged timestamps when the enqueue succeeded */
        if (rc == 0) {
                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
                LTIME_S(inode->i_atime) = lvb.lvb_atime;
                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
        }
        ll_inode_size_unlock(inode, 1);

        RETURN(rc);
}
1302
1303 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1304                      struct lov_stripe_md *lsm, int mode,
1305                      struct lustre_handle *lockh)
1306 {
1307         struct ll_sb_info *sbi = ll_i2sbi(inode);
1308         int rc;
1309         ENTRY;
1310
1311         /* XXX phil: can we do this?  won't it screw the file size up? */
1312         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1313             (sbi->ll_flags & LL_SBI_NOLCK))
1314                 RETURN(0);
1315
1316         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1317
1318         RETURN(rc);
1319 }
1320
1321 static void ll_set_file_contended(struct inode *inode)
1322 {
1323         struct ll_inode_info *lli = ll_i2info(inode);
1324         cfs_time_t now = cfs_time_current();
1325
1326         spin_lock(&lli->lli_lock);
1327         lli->lli_contention_time = now;
1328         lli->lli_flags |= LLIF_CONTENDED;
1329         spin_unlock(&lli->lli_lock);
1330 }
1331
1332 void ll_clear_file_contended(struct inode *inode)
1333 {
1334         struct ll_inode_info *lli = ll_i2info(inode);
1335
1336         spin_lock(&lli->lli_lock);
1337         lli->lli_flags &= ~LLIF_CONTENDED;
1338         spin_unlock(&lli->lli_lock);
1339 }
1340
1341 static int ll_is_file_contended(struct file *file)
1342 {
1343         struct inode *inode = file->f_dentry->d_inode;
1344         struct ll_inode_info *lli = ll_i2info(inode);
1345         struct ll_sb_info *sbi = ll_i2sbi(inode);
1346         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1347         ENTRY;
1348
1349         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1350                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1351                        " osc connect flags = 0x"LPX64"\n",
1352                        sbi->ll_lco.lco_flags);
1353                 RETURN(0);
1354         }
1355         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1356                 RETURN(1);
1357         if (lli->lli_flags & LLIF_CONTENDED) {
1358                 cfs_time_t cur_time = cfs_time_current();
1359                 cfs_time_t retry_time;
1360
1361                 retry_time = cfs_time_add(
1362                         lli->lli_contention_time,
1363                         cfs_time_seconds(sbi->ll_contention_time));
1364                 if (cfs_time_after(cur_time, retry_time)) {
1365                         ll_clear_file_contended(inode);
1366                         RETURN(0);
1367                 }
1368                 RETURN(1);
1369         }
1370         RETURN(0);
1371 }
1372
1373 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1374                                  const char *buf, size_t count,
1375                                  loff_t start, loff_t end, int rw)
1376 {
1377         int append;
1378         int tree_locked = 0;
1379         int rc;
1380         struct inode * inode = file->f_dentry->d_inode;
1381         ENTRY;
1382
1383         append = (rw == WRITE) && (file->f_flags & O_APPEND);
1384
1385         if (append || !ll_is_file_contended(file)) {
1386                 struct ll_lock_tree_node *node;
1387                 int ast_flags;
1388
1389                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1390                 if (file->f_flags & O_NONBLOCK)
1391                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1392                 node = ll_node_from_inode(inode, start, end,
1393                                           (rw == WRITE) ? LCK_PW : LCK_PR);
1394                 if (IS_ERR(node)) {
1395                         rc = PTR_ERR(node);
1396                         GOTO(out, rc);
1397                 }
1398                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1399                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1400                 if (rc == 0)
1401                         tree_locked = 1;
1402                 else if (rc == -EUSERS)
1403                         ll_set_file_contended(inode);
1404                 else
1405                         GOTO(out, rc);
1406         }
1407         RETURN(tree_locked);
1408 out:
1409         return rc;
1410 }
1411
1412 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1413                             loff_t *ppos)
1414 {
1415         struct inode *inode = file->f_dentry->d_inode;
1416         struct ll_inode_info *lli = ll_i2info(inode);
1417         struct lov_stripe_md *lsm = lli->lli_smd;
1418         struct ll_sb_info *sbi = ll_i2sbi(inode);
1419         struct ll_lock_tree tree;
1420         struct ost_lvb lvb;
1421         struct ll_ra_read bead;
1422         int ra = 0;
1423         loff_t end;
1424         ssize_t retval, chunk, sum = 0;
1425         int tree_locked;
1426
1427         __u64 kms;
1428         ENTRY;
1429         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1430                inode->i_ino, inode->i_generation, inode, count, *ppos);
1431         /* "If nbyte is 0, read() will return 0 and have no other results."
1432          *                      -- Single Unix Spec */
1433         if (count == 0)
1434                 RETURN(0);
1435
1436         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1437
1438         if (!lsm) {
1439                 /* Read on file with no objects should return zero-filled
1440                  * buffers up to file size (we can get non-zero sizes with
1441                  * mknod + truncate, then opening file for read. This is a
1442                  * common pattern in NFS case, it seems). Bug 6243 */
1443                 int notzeroed;
1444                 /* Since there are no objects on OSTs, we have nothing to get
1445                  * lock on and so we are forced to access inode->i_size
1446                  * unguarded */
1447
1448                 /* Read beyond end of file */
1449                 if (*ppos >= i_size_read(inode))
1450                         RETURN(0);
1451
1452                 if (count > i_size_read(inode) - *ppos)
1453                         count = i_size_read(inode) - *ppos;
1454                 /* Make sure to correctly adjust the file pos pointer for
1455                  * EFAULT case */
1456                 notzeroed = clear_user(buf, count);
1457                 count -= notzeroed;
1458                 *ppos += count;
1459                 if (!count)
1460                         RETURN(-EFAULT);
1461                 RETURN(count);
1462         }
1463 repeat:
1464         if (sbi->ll_max_rw_chunk != 0) {
1465                 /* first, let's know the end of the current stripe */
1466                 end = *ppos;
1467                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
1468                                 (obd_off *)&end);
1469
1470                 /* correct, the end is beyond the request */
1471                 if (end > *ppos + count - 1)
1472                         end = *ppos + count - 1;
1473
1474                 /* and chunk shouldn't be too large even if striping is wide */
1475                 if (end - *ppos > sbi->ll_max_rw_chunk)
1476                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1477         } else {
1478                 end = *ppos + count - 1;
1479         }
1480
1481         tree_locked = ll_file_get_tree_lock(&tree, file, buf,
1482                                             count, *ppos, end, READ);
1483         if (tree_locked < 0)
1484                 GOTO(out, retval = tree_locked);
1485
1486         ll_inode_size_lock(inode, 1);
1487         /*
1488          * Consistency guarantees: following possibilities exist for the
1489          * relation between region being read and real file size at this
1490          * moment:
1491          *
1492          *  (A): the region is completely inside of the file;
1493          *
1494          *  (B-x): x bytes of region are inside of the file, the rest is
1495          *  outside;
1496          *
1497          *  (C): the region is completely outside of the file.
1498          *
1499          * This classification is stable under DLM lock acquired by
1500          * ll_tree_lock() above, because to change class, other client has to
1501          * take DLM lock conflicting with our lock. Also, any updates to
1502          * ->i_size by other threads on this client are serialized by
1503          * ll_inode_size_lock(). This guarantees that short reads are handled
1504          * correctly in the face of concurrent writes and truncates.
1505          */
1506         inode_init_lvb(inode, &lvb);
1507         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
1508         kms = lvb.lvb_size;
1509         if (*ppos + count - 1 > kms) {
1510                 /* A glimpse is necessary to determine whether we return a
1511                  * short read (B) or some zeroes at the end of the buffer (C) */
1512                 ll_inode_size_unlock(inode, 1);
1513                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1514                 if (retval) {
1515                         if (tree_locked)
1516                                 ll_tree_unlock(&tree);
1517                         goto out;
1518                 }
1519         } else {
1520                 /* region is within kms and, hence, within real file size (A).
1521                  * We need to increase i_size to cover the read region so that
1522                  * generic_file_read() will do its job, but that doesn't mean
1523                  * the kms size is _correct_, it is only the _minimum_ size.
1524                  * If someone does a stat they will get the correct size which
1525                  * will always be >= the kms value here.  b=11081 */
1526                 if (i_size_read(inode) < kms)
1527                         i_size_write(inode, kms);
1528                 ll_inode_size_unlock(inode, 1);
1529         }
1530
1531         chunk = end - *ppos + 1;
1532         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1533                inode->i_ino, chunk, *ppos, i_size_read(inode));
1534
1535         if (tree_locked) {
1536                 /* turn off the kernel's read-ahead */
1537                 file->f_ra.ra_pages = 0;
1538
1539                 /* initialize read-ahead window once per syscall */
1540                 if (ra == 0) {
1541                         ra = 1;
1542                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1543                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1544                         ll_ra_read_in(file, &bead);
1545                 }
1546
1547                 /* BUG: 5972 */
1548                 file_accessed(file);
1549                 retval = generic_file_read(file, buf, chunk, ppos);
1550                 ll_tree_unlock(&tree);
1551         } else {
1552                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1553         }
1554
1555         ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
1556
1557         if (retval > 0) {
1558                 buf += retval;
1559                 count -= retval;
1560                 sum += retval;
1561                 if (retval == chunk && count > 0)
1562                         goto repeat;
1563         }
1564
1565  out:
1566         if (ra != 0)
1567                 ll_ra_read_ex(file, &bead);
1568         retval = (sum > 0) ? sum : retval;
1569         RETURN(retval);
1570 }
1571
/*
 * Write to a file (through the page cache).
 *
 * Serializes all writes to this inode with lli_write_sem, then writes the
 * requested range in chunks: each pass of the "repeat" loop takes a DLM
 * extent lock covering one chunk (bounded by the current stripe and by
 * ll_max_rw_chunk when that is set), writes it via generic_file_write()
 * (or lockless I/O when no tree lock was taken), and loops while full
 * chunks keep completing.  Returns the total bytes written, or a negative
 * errno if nothing was written.
 */
static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
                             loff_t *ppos)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct ll_lock_tree tree;
        loff_t maxbytes = ll_file_maxbytes(inode);
        loff_t lock_start, lock_end, end;
        ssize_t retval, chunk, sum = 0;
        int tree_locked;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        /* POSIX, but surprised the VFS doesn't check this already */
        if (count == 0)
                RETURN(0);

        /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
         * called on the file, don't fail the below assertion (bug 2388). */
        if (file->f_flags & O_LOV_DELAY_CREATE &&
            ll_i2info(inode)->lli_smd == NULL)
                RETURN(-EBADF);

        LASSERT(ll_i2info(inode)->lli_smd != NULL);

        /* One writer at a time per inode on this client. */
        down(&ll_i2info(inode)->lli_write_sem);

repeat:
        chunk = 0; /* just to fix gcc's warning */
        end = *ppos + count - 1;

        if (file->f_flags & O_APPEND) {
                /* Appends lock [0, EOF] so the i_size sampled below stays
                 * stable for the duration of the write. */
                lock_start = 0;
                lock_end = OBD_OBJECT_EOF;
        } else if (sbi->ll_max_rw_chunk != 0) {
                /* first, let's know the end of the current stripe */
                end = *ppos;
                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                (obd_off *)&end);

                /* correct, the end is beyond the request */
                if (end > *ppos + count - 1)
                        end = *ppos + count - 1;

                /* and chunk shouldn't be too large even if striping is wide */
                if (end - *ppos > sbi->ll_max_rw_chunk)
                        end = *ppos + sbi->ll_max_rw_chunk - 1;
                lock_start = *ppos;
                lock_end = end;
        } else {
                lock_start = *ppos;
                lock_end = *ppos + count - 1;
        }

        /* > 0: DLM extent lock taken (tree must be unlocked later),
         * 0: proceed locklessly, < 0: error. */
        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
                                            lock_start, lock_end, WRITE);
        if (tree_locked < 0)
                GOTO(out, retval = tree_locked);

        /* This is ok, g_f_w will overwrite this under i_sem if it races
         * with a local truncate, it just makes our maxbyte checking easier.
         * The i_size value gets updated in ll_extent_lock() as a consequence
         * of the [0,EOF] extent lock we requested above. */
        if (file->f_flags & O_APPEND) {
                *ppos = i_size_read(inode);
                end = *ppos + count - 1;
        }

        if (*ppos >= maxbytes) {
                send_sig(SIGXFSZ, current, 0);
                GOTO(out_unlock, retval = -EFBIG);
        }
        /* Clamp the chunk so we never write past the filesystem limit. */
        if (end > maxbytes - 1)
                end = maxbytes - 1;

        /* generic_file_write handles O_APPEND after getting i_mutex */
        chunk = end - *ppos + 1;
        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
               inode->i_ino, chunk, *ppos);
        if (tree_locked)
                retval = generic_file_write(file, buf, chunk, ppos);
        else
                retval = ll_file_lockless_io(file, (char*)buf, chunk,
                                             ppos, WRITE);
        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);

out_unlock:
        if (tree_locked)
                ll_tree_unlock(&tree);

out:
        /* Full chunk written and more remains: advance and take the lock
         * for the next chunk. */
        if (retval > 0) {
                buf += retval;
                count -= retval;
                sum += retval;
                if (retval == chunk && count > 0)
                        goto repeat;
        }

        up(&ll_i2info(inode)->lli_write_sem);

        /* Report total bytes written if any chunk succeeded; otherwise the
         * error from the last attempt. */
        retval = (sum > 0) ? sum : retval;
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
                           retval > 0 ? retval : 0);
        RETURN(retval);
}
1686
/*
 * Send file content (through pagecache) somewhere with helper
 *
 * Takes a PR extent lock over the requested region, brings the locally
 * known size (KMS) up to date -- glimpsing the OSTs when the region may
 * extend past it -- then delegates the page-by-page work to
 * generic_file_sendfile() through 'actor'.
 */
static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                read_actor_t actor, void *target)
{
        struct inode *inode = in_file->f_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ll_lock_tree tree;
        struct ll_lock_tree_node *node;
        struct ost_lvb lvb;
        struct ll_ra_read bead;
        int rc;
        ssize_t retval;
        __u64 kms;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
               inode->i_ino, inode->i_generation, inode, count, *ppos);

        /* "If nbyte is 0, read() will return 0 and have no other results."
         *                      -- Single Unix Spec */
        if (count == 0)
                RETURN(0);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
        /* turn off the kernel's read-ahead */
        in_file->f_ra.ra_pages = 0;

        /* File with no objects, nothing to lock */
        if (!lsm)
                RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));

        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
        if (IS_ERR(node))
                RETURN(PTR_ERR(node));

        tree.lt_fd = LUSTRE_FPRIVATE(in_file);
        /* O_NONBLOCK callers must not sleep waiting for the DLM lock. */
        rc = ll_tree_lock(&tree, node, NULL, count,
                          in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
        if (rc != 0)
                RETURN(rc);

        ll_clear_file_contended(inode);
        ll_inode_size_lock(inode, 1);
        /*
         * Consistency guarantees: following possibilities exist for the
         * relation between region being read and real file size at this
         * moment:
         *
         *  (A): the region is completely inside of the file;
         *
         *  (B-x): x bytes of region are inside of the file, the rest is
         *  outside;
         *
         *  (C): the region is completely outside of the file.
         *
         * This classification is stable under DLM lock acquired by
         * ll_tree_lock() above, because to change class, other client has to
         * take DLM lock conflicting with our lock. Also, any updates to
         * ->i_size by other threads on this client are serialized by
         * ll_inode_size_lock(). This guarantees that short reads are handled
         * correctly in the face of concurrent writes and truncates.
         */
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
        kms = lvb.lvb_size;
        if (*ppos + count - 1 > kms) {
                /* A glimpse is necessary to determine whether we return a
                 * short read (B) or some zeroes at the end of the buffer (C) */
                ll_inode_size_unlock(inode, 1);
                retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                if (retval)
                        goto out;
        } else {
                /* region is within kms and, hence, within real file size (A) */
                i_size_write(inode, kms);
                ll_inode_size_unlock(inode, 1);
        }

        CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               inode->i_ino, count, *ppos, i_size_read(inode));

        /* Install our own read-ahead window for the duration of the send. */
        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
        ll_ra_read_in(in_file, &bead);
        /* BUG: 5972 */
        file_accessed(in_file);
        retval = generic_file_sendfile(in_file, ppos, count, actor, target);
        ll_ra_read_ex(in_file, &bead);

 out:
        ll_tree_unlock(&tree);
        RETURN(retval);
}
1782
/*
 * LL_IOC_RECREATE_OBJ handler (admin-only recovery aid): re-issue an
 * object create for @inode using a scratch copy of its current stripe MD.
 * The object id, group and OST index come from the userspace
 * ll_recreate_obj argument.
 */
static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                               unsigned long arg)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_export *exp = ll_i2dtexp(inode);
        struct ll_recreate_obj ucreatp;
        struct obd_trans_info oti = { 0 };
        struct obdo *oa = NULL;
        int lsm_size;
        int rc = 0;
        struct lov_stripe_md *lsm, *lsm2;
        ENTRY;

        if (!capable (CAP_SYS_ADMIN))
                RETURN(-EPERM);

        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                            sizeof(struct ll_recreate_obj));
        if (rc) {
                RETURN(-EFAULT);
        }
        OBDO_ALLOC(oa);
        if (oa == NULL)
                RETURN(-ENOMEM);

        /* lli_size_sem pins lli_smd while we copy and use it. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm == NULL)
                GOTO(out, rc = -ENOENT);
        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
                   (lsm->lsm_stripe_count));

        OBD_ALLOC(lsm2, lsm_size);
        if (lsm2 == NULL)
                GOTO(out, rc = -ENOMEM);

        /* Identify the object to recreate; o_nlink carries the OST index
         * for this request type. */
        oa->o_id = ucreatp.lrc_id;
        oa->o_gr = ucreatp.lrc_group;
        oa->o_nlink = ucreatp.lrc_ost_idx;
        oa->o_flags |= OBD_FL_RECREATE_OBJS;
        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        /* Give obd_create() a scratch copy so the live lsm is untouched. */
        memcpy(lsm2, lsm, lsm_size);
        rc = obd_create(exp, oa, &lsm2, &oti);

        OBD_FREE(lsm2, lsm_size);
        GOTO(out, rc);
out:
        up(&lli->lli_size_sem);
        OBDO_FREE(oa);
        return rc;
}
1837
/*
 * Create the striping EA for @inode by replaying an open that carries the
 * given LOV user MD.  Fails with -EEXIST if striping already exists.  The
 * MDS open handle obtained as a side effect of a successful open is closed
 * again immediately via ll_release_openhandle().
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                             int flags, struct lov_user_md *lum, int lum_size)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm;
        struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
        int rc = 0;
        ENTRY;

        /* lli_size_sem guards lli_smd: refuse if striping is present. */
        down(&lli->lli_size_sem);
        lsm = lli->lli_smd;
        if (lsm) {
                up(&lli->lli_size_sem);
                CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                       inode->i_ino);
                RETURN(-EEXIST);
        }

        rc = ll_intent_file_open(file, lum, lum_size, &oit);
        if (rc)
                GOTO(out, rc);
        if (it_disposition(&oit, DISP_LOOKUP_NEG))
                GOTO(out_req_free, rc = -ENOENT);
        rc = oit.d.lustre.it_status;
        if (rc < 0)
                GOTO(out_req_free, rc);

        ll_release_openhandle(file->f_dentry, &oit);

 out:
        up(&lli->lli_size_sem);
        ll_intent_release(&oit);
        RETURN(rc);
out_req_free:
        /* The open RPC completed but the result is an error: drop the
         * request reference before releasing the intent. */
        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
        goto out;
}
1875
/*
 * Fetch the LOV EA of @filename (looked up relative to directory @inode)
 * from the MDS and return it in host endianness.
 *
 * Returns 0 on success, -ENODATA if the file has no striping EA.  *request
 * is set to the getattr reply whenever one was received and is not freed
 * here -- the caller is expected to release it with ptlrpc_req_finished().
 * *lmmp normally points into that reply buffer; for LOV_MAGIC_JOIN files it
 * points at a separately OBD_ALLOC'd lov_user_md_join of *lmm_size bytes
 * (presumably freed by the caller -- verify against callers).
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
                             struct lov_mds_md **lmmp, int *lmm_size, 
                             struct ptlrpc_request **request)
{
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct mdt_body  *body;
        struct lov_mds_md *lmm = NULL;
        struct ptlrpc_request *req = NULL;
        struct obd_capa *oc;
        int rc, lmmsize;

        /* Size the getattr buffer for the largest EA this MDS can return. */
        rc = ll_get_max_mdsize(sbi, &lmmsize);
        if (rc)
                RETURN(rc);

        oc = ll_mdscapa_get(inode);
        rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                             oc, filename, strlen(filename) + 1,
                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
                             ll_i2suppgid(inode), &req);
        capa_put(oc);
        if (rc < 0) {
                CDEBUG(D_INFO, "md_getattr_name failed "
                       "on %s: rc %d\n", filename, rc);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        LASSERT(body != NULL); /* checked by mdc_getattr_name */

        lmmsize = body->eadatasize;

        /* No EA bits set, or an empty EA: the file is not striped. */
        if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
                        lmmsize == 0) {
                GOTO(out, rc = -ENODATA);
        }

        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
        LASSERT(lmm != NULL);

        /*
         * This is coming from the MDS, so is probably in
         * little endian.  We convert it to host endian before
         * passing it to userspace.
         */
        if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
                lustre_swab_lov_user_md((struct lov_user_md *)lmm);
                lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
        } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
                lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
        }

        if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
                struct lov_stripe_md *lsm;
                struct lov_user_md_join *lmj;
                int lmj_size, i, aindex = 0;

                /* Unpack the wire MD so the extent array can be walked and
                 * flattened into a lov_user_md_join for userspace. */
                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
                if (rc < 0)
                        /* NOTE(review): the unpack error code is replaced
                         * with -ENOMEM here; propagating rc looks more
                         * correct -- confirm before changing. */
                        GOTO(out, rc = -ENOMEM);
                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
                if (rc)
                        GOTO(out_free_memmd, rc);

                lmj_size = sizeof(struct lov_user_md_join) +
                           lsm->lsm_stripe_count *
                           sizeof(struct lov_user_ost_data_join);
                OBD_ALLOC(lmj, lmj_size);
                if (!lmj)
                        GOTO(out_free_memmd, rc = -ENOMEM);

                memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
                for (i = 0; i < lsm->lsm_stripe_count; i++) {
                        struct lov_extent *lex =
                                &lsm->lsm_array->lai_ext_array[aindex];

                        /* advance once the current extent's stripes are
                         * exhausted */
                        if (lex->le_loi_idx + lex->le_stripe_count <= i)
                                aindex ++;
                        CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
                                        LPU64" len %d\n", aindex, i,
                                        lex->le_start, (int)lex->le_len);
                        lmj->lmm_objects[i].l_extent_start =
                                lex->le_start;

                        /* le_len == -1 marks an extent running to EOF */
                        if ((int)lex->le_len == -1)
                                lmj->lmm_objects[i].l_extent_end = -1;
                        else
                                lmj->lmm_objects[i].l_extent_end =
                                        lex->le_start + lex->le_len;
                        lmj->lmm_objects[i].l_object_id =
                                lsm->lsm_oinfo[i]->loi_id;
                        lmj->lmm_objects[i].l_object_gr =
                                lsm->lsm_oinfo[i]->loi_gr;
                        lmj->lmm_objects[i].l_ost_gen =
                                lsm->lsm_oinfo[i]->loi_ost_gen;
                        lmj->lmm_objects[i].l_ost_idx =
                                lsm->lsm_oinfo[i]->loi_ost_idx;
                }
                lmm = (struct lov_mds_md *)lmj;
                lmmsize = lmj_size;
out_free_memmd:
                obd_free_memmd(sbi->ll_dt_exp, &lsm);
        }
out:
        *lmmp = lmm;
        *lmm_size = lmmsize;
        *request = req;
        return rc;
}
1985
1986 static int ll_lov_setea(struct inode *inode, struct file *file,
1987                             unsigned long arg)
1988 {
1989         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1990         struct lov_user_md  *lump;
1991         int lum_size = sizeof(struct lov_user_md) +
1992                        sizeof(struct lov_user_ost_data);
1993         int rc;
1994         ENTRY;
1995
1996         if (!capable (CAP_SYS_ADMIN))
1997                 RETURN(-EPERM);
1998
1999         OBD_ALLOC(lump, lum_size);
2000         if (lump == NULL) {
2001                 RETURN(-ENOMEM);
2002         }
2003         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
2004         if (rc) {
2005                 OBD_FREE(lump, lum_size);
2006                 RETURN(-EFAULT);
2007         }
2008
2009         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
2010
2011         OBD_FREE(lump, lum_size);
2012         RETURN(rc);
2013 }
2014
2015 static int ll_lov_setstripe(struct inode *inode, struct file *file,
2016                             unsigned long arg)
2017 {
2018         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
2019         int rc;
2020         int flags = FMODE_WRITE;
2021         ENTRY;
2022
2023         /* Bug 1152: copy properly when this is no longer true */
2024         LASSERT(sizeof(lum) == sizeof(*lump));
2025         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
2026         rc = copy_from_user(&lum, lump, sizeof(lum));
2027         if (rc)
2028                 RETURN(-EFAULT);
2029
2030         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
2031         if (rc == 0) {
2032                  put_user(0, &lump->lmm_stripe_count);
2033                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
2034                                     0, ll_i2info(inode)->lli_smd, lump);
2035         }
2036         RETURN(rc);
2037 }
2038
2039 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
2040 {
2041         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
2042
2043         if (!lsm)
2044                 RETURN(-ENODATA);
2045
2046         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
2047                             (void *)arg);
2048 }
2049
2050 static int ll_get_grouplock(struct inode *inode, struct file *file,
2051                             unsigned long arg)
2052 {
2053         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2054         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
2055                                                     .end = OBD_OBJECT_EOF}};
2056         struct lustre_handle lockh = { 0 };
2057         struct ll_inode_info *lli = ll_i2info(inode);
2058         struct lov_stripe_md *lsm = lli->lli_smd;
2059         int flags = 0, rc;
2060         ENTRY;
2061
2062         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
2063                 RETURN(-EINVAL);
2064         }
2065
2066         policy.l_extent.gid = arg;
2067         if (file->f_flags & O_NONBLOCK)
2068                 flags = LDLM_FL_BLOCK_NOWAIT;
2069
2070         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
2071         if (rc)
2072                 RETURN(rc);
2073
2074         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
2075         fd->fd_gid = arg;
2076         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
2077
2078         RETURN(0);
2079 }
2080
2081 static int ll_put_grouplock(struct inode *inode, struct file *file,
2082                             unsigned long arg)
2083 {
2084         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2085         struct ll_inode_info *lli = ll_i2info(inode);
2086         struct lov_stripe_md *lsm = lli->lli_smd;
2087         int rc;
2088         ENTRY;
2089
2090         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
2091                 /* Ugh, it's already unlocked. */
2092                 RETURN(-EINVAL);
2093         }
2094
2095         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
2096                 RETURN(-EINVAL);
2097
2098         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
2099
2100         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
2101         if (rc)
2102                 RETURN(rc);
2103
2104         fd->fd_gid = 0;
2105         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
2106
2107         RETURN(0);
2108 }
2109
2110 static int join_sanity_check(struct inode *head, struct inode *tail)
2111 {
2112         ENTRY;
2113         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
2114                 CERROR("server do not support join \n");
2115                 RETURN(-EINVAL);
2116         }
2117         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
2118                 CERROR("tail ino %lu and ino head %lu must be regular\n",
2119                        head->i_ino, tail->i_ino);
2120                 RETURN(-EINVAL);
2121         }
2122         if (head->i_ino == tail->i_ino) {
2123                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
2124                 RETURN(-EINVAL);
2125         }
2126         if (i_size_read(head) % JOIN_FILE_ALIGN) {
2127                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
2128                 RETURN(-EINVAL);
2129         }
2130         RETURN(0);
2131 }
2132
/*
 * Ask the MDS to join @tail_filp's file onto @head_inode by replaying an
 * open with O_JOIN_FILE.  The head's current size is passed as the intent
 * data, which tells the server the offset at which the tail is appended.
 */
static int join_file(struct inode *head_inode, struct file *head_filp,
                     struct file *tail_filp)
{
        struct dentry *tail_dentry = tail_filp->f_dentry;
        struct lookup_intent oit = {.it_op = IT_OPEN,
                                   .it_flags = head_filp->f_flags|O_JOIN_FILE};
        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };

        struct lustre_handle lockh;
        struct md_op_data *op_data;
        int    rc;
        loff_t data;
        ENTRY;

        tail_dentry = tail_filp->f_dentry;

        /* Head size = join offset, carried as opaque op data. */
        data = i_size_read(head_inode);
        op_data = ll_prep_md_op_data(NULL, head_inode,
                                     tail_dentry->d_parent->d_inode,
                                     tail_dentry->d_name.name,
                                     tail_dentry->d_name.len, 0,
                                     LUSTRE_OPC_ANY, &data);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));

        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
                         op_data, &lockh, NULL, 0, 0);

        ll_finish_md_op_data(op_data);
        if (rc < 0)
                GOTO(out, rc);

        rc = oit.d.lustre.it_status;

        /* Enqueue succeeded at the RPC level but the open itself failed:
         * drop the request reference before bailing out. */
        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
                ptlrpc_req_finished((struct ptlrpc_request *)
                                    oit.d.lustre.it_data);
                GOTO(out, rc);
        }

        if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
                                           * away */
                ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
                oit.d.lustre.it_lock_mode = 0;
        }
        /* Close the open handle the enqueue created for the head. */
        ll_release_openhandle(head_filp->f_dentry, &oit);
out:
        ll_intent_release(&oit);
        RETURN(rc);
}
2185
2186 static int ll_file_join(struct inode *head, struct file *filp,
2187                         char *filename_tail)
2188 {
2189         struct inode *tail = NULL, *first = NULL, *second = NULL;
2190         struct dentry *tail_dentry;
2191         struct file *tail_filp, *first_filp, *second_filp;
2192         struct ll_lock_tree first_tree, second_tree;
2193         struct ll_lock_tree_node *first_node, *second_node;
2194         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2195         int rc = 0, cleanup_phase = 0;
2196         ENTRY;
2197
2198         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2199                head->i_ino, head->i_generation, head, filename_tail);
2200
2201         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2202         if (IS_ERR(tail_filp)) {
2203                 CERROR("Can not open tail file %s", filename_tail);
2204                 rc = PTR_ERR(tail_filp);
2205                 GOTO(cleanup, rc);
2206         }
2207         tail = igrab(tail_filp->f_dentry->d_inode);
2208
2209         tlli = ll_i2info(tail);
2210         tail_dentry = tail_filp->f_dentry;
2211         LASSERT(tail_dentry);
2212         cleanup_phase = 1;
2213
2214         /*reorder the inode for lock sequence*/
2215         first = head->i_ino > tail->i_ino ? head : tail;
2216         second = head->i_ino > tail->i_ino ? tail : head;
2217         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2218         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2219
2220         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2221                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2222         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2223         if (IS_ERR(first_node)){
2224                 rc = PTR_ERR(first_node);
2225                 GOTO(cleanup, rc);
2226         }
2227         first_tree.lt_fd = first_filp->private_data;
2228         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2229         if (rc != 0)
2230                 GOTO(cleanup, rc);
2231         cleanup_phase = 2;
2232
2233         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2234         if (IS_ERR(second_node)){
2235                 rc = PTR_ERR(second_node);
2236                 GOTO(cleanup, rc);
2237         }
2238         second_tree.lt_fd = second_filp->private_data;
2239         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2240         if (rc != 0)
2241                 GOTO(cleanup, rc);
2242         cleanup_phase = 3;
2243
2244         rc = join_sanity_check(head, tail);
2245         if (rc)
2246                 GOTO(cleanup, rc);
2247
2248         rc = join_file(head, filp, tail_filp);
2249         if (rc)
2250                 GOTO(cleanup, rc);
2251 cleanup:
2252         switch (cleanup_phase) {
2253         case 3:
2254                 ll_tree_unlock(&second_tree);
2255                 obd_cancel_unused(ll_i2dtexp(second),
2256                                   ll_i2info(second)->lli_smd, 0, NULL);
2257         case 2:
2258                 ll_tree_unlock(&first_tree);
2259                 obd_cancel_unused(ll_i2dtexp(first),
2260                                   ll_i2info(first)->lli_smd, 0, NULL);
2261         case 1:
2262                 filp_close(tail_filp, 0);
2263                 if (tail)
2264                         iput(tail);
2265                 if (head && rc == 0) {
2266                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
2267                                        &hlli->lli_smd);
2268                         hlli->lli_smd = NULL;
2269                 }
2270         case 0:
2271                 break;
2272         default:
2273                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2274                 LBUG();
2275         }
2276         RETURN(rc);
2277 }
2278
/*
 * Close the MDS open handle (if any) that intent @it obtained for
 * @dentry.  Used when an open was performed only as the vehicle for
 * another operation (setstripe, join) and no file descriptor will keep
 * the handle alive.
 *
 * The request reference held by the intent is always dropped, even when
 * allocating the client handle fails.
 */
int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct obd_client_handle *och;
        int rc;
        ENTRY;

        LASSERT(inode);

        /* Root ? Do nothing. */
        if (dentry->d_inode->i_sb->s_root == dentry)
                RETURN(0);

        /* No open handle to close? Move away */
        if (!it_disposition(it, DISP_OPEN_OPEN))
                RETURN(0);

        LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);

        OBD_ALLOC(och, sizeof(*och));
        if (!och)
                GOTO(out, rc = -ENOMEM);

        /* Build a client handle from the intent's reply and close it. */
        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
                    ll_i2info(inode), it, och);

        rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
                                       inode, och);
 out:
        /* this one is in place of ll_file_open */
        ptlrpc_req_finished(it->d.lustre.it_data);
        it_clear_disposition(it, DISP_ENQ_OPEN_REF);
        RETURN(rc);
}
2313
2314 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2315                   unsigned long arg)
2316 {
2317         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2318         int flags;
2319         ENTRY;
2320
2321         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2322                inode->i_generation, inode, cmd);
2323         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2324
2325         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2326         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2327                 RETURN(-ENOTTY);
2328
2329         switch(cmd) {
2330         case LL_IOC_GETFLAGS:
2331                 /* Get the current value of the file flags */
2332                 return put_user(fd->fd_flags, (int *)arg);
2333         case LL_IOC_SETFLAGS:
2334         case LL_IOC_CLRFLAGS:
2335                 /* Set or clear specific file flags */
2336                 /* XXX This probably needs checks to ensure the flags are
2337                  *     not abused, and to handle any flag side effects.
2338                  */
2339                 if (get_user(flags, (int *) arg))
2340                         RETURN(-EFAULT);
2341
2342                 if (cmd == LL_IOC_SETFLAGS) {
2343                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2344                             !(file->f_flags & O_DIRECT)) {
2345                                 CERROR("%s: unable to disable locking on "
2346                                        "non-O_DIRECT file\n", current->comm);
2347                                 RETURN(-EINVAL);
2348                         }
2349
2350                         fd->fd_flags |= flags;
2351                 } else {
2352                         fd->fd_flags &= ~flags;
2353                 }
2354                 RETURN(0);
2355         case LL_IOC_LOV_SETSTRIPE:
2356                 RETURN(ll_lov_setstripe(inode, file, arg));
2357         case LL_IOC_LOV_SETEA:
2358                 RETURN(ll_lov_setea(inode, file, arg));
2359         case LL_IOC_LOV_GETSTRIPE:
2360                 RETURN(ll_lov_getstripe(inode, arg));
2361         case LL_IOC_RECREATE_OBJ:
2362                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2363         case EXT3_IOC_GETFLAGS:
2364         case EXT3_IOC_SETFLAGS:
2365                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2366         case EXT3_IOC_GETVERSION_OLD:
2367         case EXT3_IOC_GETVERSION:
2368                 RETURN(put_user(inode->i_generation, (int *)arg));
2369         case LL_IOC_JOIN: {
2370                 char *ftail;
2371                 int rc;
2372
2373                 ftail = getname((const char *)arg);
2374                 if (IS_ERR(ftail))
2375                         RETURN(PTR_ERR(ftail));
2376                 rc = ll_file_join(inode, file, ftail);
2377                 putname(ftail);
2378                 RETURN(rc);
2379         }
2380         case LL_IOC_GROUP_LOCK:
2381                 RETURN(ll_get_grouplock(inode, file, arg));
2382         case LL_IOC_GROUP_UNLOCK:
2383                 RETURN(ll_put_grouplock(inode, file, arg));
2384         case IOC_OBD_STATFS:
2385                 RETURN(ll_obd_statfs(inode, (void *)arg));
2386
2387         /* We need to special case any other ioctls we want to handle,
2388          * to send them to the MDS/OST as appropriate and to properly
2389          * network encode the arg field.
2390         case EXT3_IOC_SETVERSION_OLD:
2391         case EXT3_IOC_SETVERSION:
2392         */
2393         case LL_IOC_FLUSHCTX:
2394                 RETURN(ll_flush_ctx(inode));
2395         default: {
2396                 int err;
2397
2398                 if (LLIOC_STOP == 
2399                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2400                         RETURN(err);
2401
2402                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2403                                      (void *)arg));
2404         }
2405         }
2406 }
2407
2408 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2409 {
2410         struct inode *inode = file->f_dentry->d_inode;
2411         struct ll_inode_info *lli = ll_i2info(inode);
2412         struct lov_stripe_md *lsm = lli->lli_smd;
2413         loff_t retval;
2414         ENTRY;
2415         retval = offset + ((origin == 2) ? i_size_read(inode) :
2416                            (origin == 1) ? file->f_pos : 0);
2417         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2418                inode->i_ino, inode->i_generation, inode, retval, retval,
2419                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2420         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2421
2422         if (origin == 2) { /* SEEK_END */
2423                 int nonblock = 0, rc;
2424
2425                 if (file->f_flags & O_NONBLOCK)
2426                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2427
2428                 if (lsm != NULL) {
2429                         rc = ll_glimpse_size(inode, nonblock);
2430                         if (rc != 0)
2431                                 RETURN(rc);
2432                 }
2433
2434                 ll_inode_size_lock(inode, 0);
2435                 offset += i_size_read(inode);
2436                 ll_inode_size_unlock(inode, 0);
2437         } else if (origin == 1) { /* SEEK_CUR */
2438                 offset += file->f_pos;
2439         }
2440
2441         retval = -EINVAL;
2442         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2443                 if (offset != file->f_pos) {
2444                         file->f_pos = offset;
2445 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2446                         file->f_reada = 0;
2447                         file->f_version = ++event;
2448 #endif
2449                 }
2450                 retval = offset;
2451         }
2452         
2453         RETURN(retval);
2454 }
2455
/* VFS fsync: wait for in-flight page I/O, harvest any asynchronous
 * writeback errors recorded on the inode/stripes, sync metadata with
 * the MDS, and (when @data is set and the file has objects) ask the
 * OSTs to commit the objects too.  Returns 0 or the first error seen. */
int ll_fsync(struct file *file, struct dentry *dentry, int data)
{
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct ptlrpc_request *req;
        struct obd_capa *oc;
        int rc, err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
        rc = filemap_fdatawait(inode->i_mapping);

        /* catch async errors that were recorded back when async writeback
         * failed for pages in this mapping. */
        err = lli->lli_async_rc;
        lli->lli_async_rc = 0;
        if (rc == 0)
                rc = err;
        if (lsm) {
                /* Per-stripe async errors are cleared as they are read. */
                err = lov_test_and_clear_async_rc(lsm);
                if (rc == 0)
                        rc = err;
        }

        /* Sync attributes with the MDS, passing a capability if one is
         * cached for this inode. */
        oc = ll_mdscapa_get(inode);
        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
                      &req);
        capa_put(oc);
        if (!rc)
                rc = err;
        /* The reply is released only when md_sync succeeded; on failure
         * there is no request reference to drop. */
        if (!err)
                ptlrpc_req_finished(req);

        if (data && lsm) {
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (!oa)
                        RETURN(rc ? rc : -ENOMEM);

                /* Identify the object set and carry timestamps along so
                 * the OSTs persist them with the data. */
                oa->o_id = lsm->lsm_object_id;
                oa->o_gr = lsm->lsm_object_gr;
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                           OBD_MD_FLGROUP);

                /* Commit the whole file range on the OSTs. */
                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                               0, OBD_OBJECT_EOF, oc);
                capa_put(oc);
                if (!rc)
                        rc = err;
                OBDO_FREE(oa);
        }

        RETURN(rc);
}
2519
/* VFS lock entry point (fcntl POSIX locks and, with HAVE_F_OP_FLOCK,
 * flock): translate the kernel file_lock into an LDLM flock enqueue on
 * the MDS.  F_GETLK becomes a TEST_LOCK enqueue; F_UNLCK is expressed
 * as an LCK_NL request (see the comment in the type switch below). */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        /* Flock locks share the inode's FID namespace, with the resource
         * tagged LDLM_FLOCK to keep it distinct from data/metadata locks. */
        struct ldlm_res_id res_id =
                { .name = { fid_seq(ll_inode2fid(inode)),
                            fid_oid(ll_inode2fid(inode)),
                            fid_ver(ll_inode2fid(inode)),
                            LDLM_FLOCK} };
        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
                ldlm_flock_completion_ast, NULL, file_lock };
        struct lustre_handle lockh = {0};
        ldlm_policy_data_t flock;
        int flags = 0;
        int rc;
        ENTRY;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
               inode->i_ino, file_lock);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

        if (file_lock->fl_flags & FL_FLOCK) {
                LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                /* set missing params for flock() calls */
                file_lock->fl_end = OFFSET_MAX;
                file_lock->fl_pid = current->tgid;
        }
        flock.l_flock.pid = file_lock->fl_pid;
        flock.l_flock.start = file_lock->fl_start;
        flock.l_flock.end = file_lock->fl_end;

        /* Map the POSIX lock type onto an LDLM lock mode. */
        switch (file_lock->fl_type) {
        case F_RDLCK:
                einfo.ei_mode = LCK_PR;
                break;
        case F_UNLCK:
                /* An unlock request may or may not have any relation to
                 * existing locks so we may not be able to pass a lock handle
                 * via a normal ldlm_lock_cancel() request. The request may even
                 * unlock a byte range in the middle of an existing lock. In
                 * order to process an unlock request we need all of the same
                 * information that is given with a normal read or write record
                 * lock request. To avoid creating another ldlm unlock (cancel)
                 * message we'll treat a LCK_NL flock request as an unlock. */
                einfo.ei_mode = LCK_NL;
                break;
        case F_WRLCK:
                einfo.ei_mode = LCK_PW;
                break;
        default:
                CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
                LBUG();
        }

        /* Map the fcntl command onto enqueue flags (blocking, try-lock,
         * or test-only). */
        switch (cmd) {
        case F_SETLKW:
#ifdef F_SETLKW64
        case F_SETLKW64:
#endif
                flags = 0;
                break;
        case F_SETLK:
#ifdef F_SETLK64
        case F_SETLK64:
#endif
                flags = LDLM_FL_BLOCK_NOWAIT;
                break;
        case F_GETLK:
#ifdef F_GETLK64
        case F_GETLK64:
#endif
                flags = LDLM_FL_TEST_LOCK;
                /* Save the old mode so that if the mode in the lock changes we
                 * can decrement the appropriate reader or writer refcount. */
                file_lock->fl_type = einfo.ei_mode;
                break;
        default:
                CERROR("unknown fcntl lock command: %d\n", cmd);
                LBUG();
        }

        CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
               "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
        /* On success, mirror the granted lock into the kernel's local lock
         * lists (skipped for F_GETLK test requests). */
        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
                ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
            !(flags & LDLM_FL_TEST_LOCK))
                posix_lock_file_wait(file, file_lock);
#endif

        RETURN(rc);
}
2618
2619 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2620 {
2621         ENTRY;
2622
2623         RETURN(-ENOSYS);
2624 }
2625
2626 int ll_have_md_lock(struct inode *inode, __u64 bits)
2627 {
2628         struct lustre_handle lockh;
2629         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2630         struct lu_fid *fid;
2631         int flags;
2632         ENTRY;
2633
2634         if (!inode)
2635                RETURN(0);
2636
2637         fid = &ll_i2info(inode)->lli_fid;
2638         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2639
2640         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2641         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2642                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2643                 RETURN(1);
2644         }
2645         RETURN(0);
2646 }
2647
2648 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2649                             struct lustre_handle *lockh)
2650 {
2651         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2652         struct lu_fid *fid;
2653         ldlm_mode_t rc;
2654         int flags;
2655         ENTRY;
2656
2657         fid = &ll_i2info(inode)->lli_fid;
2658         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2659
2660         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
2661         rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2662                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2663         RETURN(rc);
2664 }
2665
2666 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2667         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2668                               * and return success */
2669                 inode->i_nlink = 0;
2670                 /* This path cannot be hit for regular files unless in
2671                  * case of obscure races, so no need to to validate
2672                  * size. */
2673                 if (!S_ISREG(inode->i_mode) &&
2674                     !S_ISDIR(inode->i_mode))
2675                         return 0;
2676         }
2677
2678         if (rc) {
2679                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2680                 return -abs(rc);
2681
2682         }
2683
2684         return 0;
2685 }
2686
/* Revalidate the attributes of @dentry's inode with the MDS.  Two paths:
 * when the server supports getattr-by-FID (OBD_CONNECT_ATTRFID) an
 * IT_GETATTR intent lock is enqueued; otherwise a plain getattr RPC is
 * issued, but only if no covering UPDATE|LOOKUP metadata lock is already
 * cached.  Finally the file size is refreshed via a glimpse when the
 * file has objects. */
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
{
        struct inode *inode = dentry->d_inode;
        struct ptlrpc_request *req = NULL;
        struct ll_sb_info *sbi;
        struct obd_export *exp;
        int rc;
        ENTRY;

        if (!inode) {
                CERROR("REPORT THIS LINE TO PETER\n");
                RETURN(0);
        }
        sbi = ll_i2sbi(inode);

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
               inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

        exp = ll_i2mdexp(inode);

        if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
                struct lookup_intent oit = { .it_op = IT_GETATTR };
                struct md_op_data *op_data;

                /* Call getattr by fid, so do not provide name at all. */
                op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
                                             dentry->d_inode, NULL, 0, 0,
                                             LUSTRE_OPC_ANY, NULL);
                if (IS_ERR(op_data))
                        RETURN(PTR_ERR(op_data));

                oit.it_flags |= O_CHECK_STALE;
                rc = md_intent_lock(exp, op_data, NULL, 0,
                                    /* we are not interested in name
                                       based lookup */
                                    &oit, 0, &req,
                                    ll_md_blocking_ast, 0);
                ll_finish_md_op_data(op_data);
                oit.it_flags &= ~O_CHECK_STALE;
                if (rc < 0) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        GOTO (out, rc);
                }

                /* Updates the inode from the intent reply and consumes the
                 * intent's reference on success. */
                rc = ll_revalidate_it_finish(req, &oit, dentry);
                if (rc != 0) {
                        ll_intent_release(&oit);
                        GOTO(out, rc);
                }

                /* Unlinked? Unhash dentry, so it is not picked up later by
                   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
                   here to preserve get_cwd functionality on 2.6.
                   Bug 10503 */
                if (!dentry->d_inode->i_nlink) {
                        spin_lock(&dcache_lock);
                        ll_drop_dentry(dentry);
                        spin_unlock(&dcache_lock);
                }

                ll_lookup_finish_locks(&oit, dentry);
        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
                                                     MDS_INODELOCK_LOOKUP)) {
                struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                obd_valid valid = OBD_MD_FLGETATTR;
                struct obd_capa *oc;
                int ealen = 0;

                /* Regular files may carry striping EAs; size the reply
                 * buffer for the largest possible one. */
                if (S_ISREG(inode->i_mode)) {
                        rc = ll_get_max_mdsize(sbi, &ealen);
                        if (rc)
                                RETURN(rc);
                        valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
                }
                /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
                 * capa for this inode. Because we only keep capas of dirs
                 * fresh. */
                oc = ll_mdscapa_get(inode);
                rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
                                ealen, &req);
                capa_put(oc);
                if (rc) {
                        rc = ll_inode_revalidate_fini(inode, rc);
                        RETURN(rc);
                }

                rc = ll_prep_inode(&inode, req, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        /* if object not yet allocated, don't validate size */
        if (ll_i2info(inode)->lli_smd == NULL)
                GOTO(out, rc = 0);

        /* ll_glimpse_size will prefer locally cached writes if they extend
         * the file */
        rc = ll_glimpse_size(inode, 0);
        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
2790
2791 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2792                   struct lookup_intent *it, struct kstat *stat)
2793 {
2794         struct inode *inode = de->d_inode;
2795         int res = 0;
2796
2797         res = ll_inode_revalidate_it(de, it);
2798         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2799
2800         if (res)
2801                 return res;
2802
2803         stat->dev = inode->i_sb->s_dev;
2804         stat->ino = inode->i_ino;
2805         stat->mode = inode->i_mode;
2806         stat->nlink = inode->i_nlink;
2807         stat->uid = inode->i_uid;
2808         stat->gid = inode->i_gid;
2809         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2810         stat->atime = inode->i_atime;
2811         stat->mtime = inode->i_mtime;
2812         stat->ctime = inode->i_ctime;
2813 #ifdef HAVE_INODE_BLKSIZE
2814         stat->blksize = inode->i_blksize;
2815 #else
2816         stat->blksize = 1 << inode->i_blkbits;
2817 #endif
2818
2819         ll_inode_size_lock(inode, 0);
2820         stat->size = i_size_read(inode);
2821         stat->blocks = inode->i_blocks;
2822         ll_inode_size_unlock(inode, 0);
2823
2824         return 0;
2825 }
2826 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2827 {
2828         struct lookup_intent it = { .it_op = IT_GETATTR };
2829
2830         return ll_getattr_it(mnt, de, &it, stat);
2831 }
2832
2833 static
2834 int lustre_check_acl(struct inode *inode, int mask)
2835 {
2836 #ifdef CONFIG_FS_POSIX_ACL
2837         struct ll_inode_info *lli = ll_i2info(inode);
2838         struct posix_acl *acl;
2839         int rc;
2840         ENTRY;
2841
2842         spin_lock(&lli->lli_lock);
2843         acl = posix_acl_dup(lli->lli_posix_acl);
2844         spin_unlock(&lli->lli_lock);
2845
2846         if (!acl)
2847                 RETURN(-EAGAIN);
2848
2849         rc = posix_acl_permission(inode, acl, mask);
2850         posix_acl_release(acl);
2851
2852         RETURN(rc);
2853 #else
2854         return -EAGAIN;
2855 #endif
2856 }
2857
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
/* VFS permission check (2.6.10+): remote-client mounts defer to the
 * server-side check; everyone else goes through generic_permission()
 * with lustre_check_acl as the ACL callback. */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);
        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
        return generic_permission(inode, mask, lustre_check_acl);
}
#else
/* Pre-2.6.10 kernels lack the ACL callback in generic_permission(), so
 * the owner/group/other mode-bit logic, ACL fallback, and capability
 * overrides are open-coded here.
 * NOTE(review): the "else if (1)" below makes the trailing else block
 * reachable only via the check_groups goto — presumably mirroring the
 * structure of the kernel's generic code; confirm before restructuring. */
int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
{
        int mode = inode->i_mode;
        int rc;

        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
               inode->i_ino, inode->i_generation, inode, mask);

        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                return lustre_check_remote_perm(inode, mask);

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);

        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
                return -EROFS;
        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
                return -EACCES;
        if (current->fsuid == inode->i_uid) {
                /* Owner: use the user bits. */
                mode >>= 6;
        } else if (1) {
                /* Not the owner: if the "other" bits do not already grant
                 * the request, try the group bits, then the ACL. */
                if (((mode >> 3) & mask & S_IRWXO) != mask)
                        goto check_groups;
                rc = lustre_check_acl(inode, mask);
                if (rc == -EAGAIN)
                        goto check_groups;
                if (rc == -EACCES)
                        goto check_capabilities;
                return rc;
        } else {
check_groups:
                if (in_group_p(inode->i_gid))
                        mode >>= 3;
        }
        if ((mode & mask & S_IRWXO) == mask)
                return 0;

check_capabilities:
        /* CAP_DAC_OVERRIDE bypasses everything except executing a file
         * with no execute bits set anywhere. */
        if (!(mask & MAY_EXEC) ||
            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
                if (capable(CAP_DAC_OVERRIDE))
                        return 0;

        /* CAP_DAC_READ_SEARCH covers reads and directory searches. */
        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                return 0;

        return -EACCES;
}
#endif
2920
/* -o localflock - only provides locally consistent flock locks */
/* Default table: no .flock/.lock methods, so the kernel's built-in
 * node-local lock handling applies. */
struct file_operations ll_file_operations = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
};
2933
/* Cluster-coherent locking: flock and POSIX lock requests are routed
 * through the DLM via ll_file_flock. */
struct file_operations ll_file_operations_flock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_flock,
#endif
        .lock           = ll_file_flock
};
2949
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
        .read           = ll_file_read,
        .write          = ll_file_write,
        .ioctl          = ll_file_ioctl,
        .open           = ll_file_open,
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
        .sendfile       = ll_file_sendfile,
        .fsync          = ll_fsync,
#ifdef HAVE_F_OP_FLOCK
        .flock          = ll_file_noflock,
#endif
        .lock           = ll_file_noflock
};
2966
/* Inode operations for regular files. */
struct inode_operations ll_file_inode_operations = {
#ifdef HAVE_VFS_INTENT_PATCHES
        .setattr_raw    = ll_setattr_raw,
#endif
        .setattr        = ll_setattr,
        .truncate       = ll_truncate,
        .getattr        = ll_getattr,
        .permission     = ll_inode_permission,
        .setxattr       = ll_setxattr,
        .getxattr       = ll_getxattr,
        .listxattr      = ll_listxattr,
        .removexattr    = ll_removexattr,
};
2980
/* dynamic ioctl number support routines */
/* Registry of dynamically registered ioctl handlers: ioc_sem protects
 * the ioc_head list of llioc_data entries below. */
static struct llioc_ctl_data {
        struct rw_semaphore ioc_sem;
        struct list_head    ioc_head;
} llioc = {
        __RWSEM_INITIALIZER(llioc.ioc_sem),
        CFS_LIST_HEAD_INIT(llioc.ioc_head)
};


/* One registration: iocd_cmd[] holds iocd_count ioctl numbers that are
 * dispatched to iocd_cb; iocd_size records the allocation size so the
 * entry can be freed on unregister. */
struct llioc_data {
        struct list_head        iocd_list;
        unsigned int            iocd_size;
        llioc_callback_t        iocd_cb;
        unsigned int            iocd_count;
        unsigned int            iocd_cmd[0];
};
2998
2999 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3000 {
3001         unsigned int size;
3002         struct llioc_data *in_data = NULL;
3003         ENTRY;
3004
3005         if (cb == NULL || cmd == NULL ||
3006             count > LLIOC_MAX_CMD || count < 0)
3007                 RETURN(NULL);
3008
3009         size = sizeof(*in_data) + count * sizeof(unsigned int);
3010         OBD_ALLOC(in_data, size);
3011         if (in_data == NULL)
3012                 RETURN(NULL);
3013
3014         memset(in_data, 0, sizeof(*in_data));
3015         in_data->iocd_size = size;
3016         in_data->iocd_cb = cb;
3017         in_data->iocd_count = count;
3018         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3019
3020         down_write(&llioc.ioc_sem);
3021         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3022         up_write(&llioc.ioc_sem);
3023
3024         RETURN(in_data);
3025 }
3026
3027 void ll_iocontrol_unregister(void *magic)
3028 {
3029         struct llioc_data *tmp;
3030
3031         if (magic == NULL)
3032                 return;
3033
3034         down_write(&llioc.ioc_sem);
3035         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3036                 if (tmp == magic) {
3037                         unsigned int size = tmp->iocd_size;
3038
3039                         list_del(&tmp->iocd_list);
3040                         up_write(&llioc.ioc_sem);
3041
3042                         OBD_FREE(tmp, size);
3043                         return;
3044                 }
3045         }
3046         up_write(&llioc.ioc_sem);
3047
3048         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3049 }
3050
/* Exported so other modules can register additional ioctl handlers. */
EXPORT_SYMBOL(ll_iocontrol_register);
EXPORT_SYMBOL(ll_iocontrol_unregister);
3053
3054 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
3055                         unsigned int cmd, unsigned long arg, int *rcp)
3056 {
3057         enum llioc_iter ret = LLIOC_CONT;
3058         struct llioc_data *data;
3059         int rc = -EINVAL, i;
3060
3061         down_read(&llioc.ioc_sem);
3062         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3063                 for (i = 0; i < data->iocd_count; i++) {
3064                         if (cmd != data->iocd_cmd[i]) 
3065                                 continue;
3066
3067                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3068                         break;
3069                 }
3070
3071                 if (ret == LLIOC_STOP)
3072                         break;
3073         }
3074         up_read(&llioc.ioc_sem);
3075
3076         if (rcp)
3077                 *rcp = rc;
3078         return ret;
3079 }