Whamcloud - gitweb
ldlm and its users: do not pass struct ldlm_res_id by value, pass a pointer instead...
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <lustre_mdc.h>
29 #include <linux/pagemap.h>
30 #include <linux/file.h>
31 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
32 #include <linux/lustre_compat25.h>
33 #endif
34 #include "llite_internal.h"
35
36 /* also used by llite/special.c:ll_special_open() */
37 struct ll_file_data *ll_file_data_get(void)
38 {
39         struct ll_file_data *fd;
40
41         OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
42         return fd;
43 }
44
45 static void ll_file_data_put(struct ll_file_data *fd)
46 {
47         if (fd != NULL)
48                 OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
49 }
50
51 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
52                           struct lustre_handle *fh)
53 {
54         op_data->fid1 = ll_i2info(inode)->lli_fid;
55         op_data->attr.ia_mode = inode->i_mode;
56         op_data->attr.ia_atime = inode->i_atime;
57         op_data->attr.ia_mtime = inode->i_mtime;
58         op_data->attr.ia_ctime = inode->i_ctime;
59         op_data->attr.ia_size = inode->i_size;
60         op_data->attr_blocks = inode->i_blocks;
61         ((struct ll_iattr *)&op_data->attr)->ia_attr_flags = inode->i_flags;
62         op_data->ioepoch = ll_i2info(inode)->lli_ioepoch;
63         memcpy(&op_data->handle, fh, sizeof(op_data->handle));
64         op_data->mod_capa1 = ll_mdscapa_get(inode);
65 }
66
67 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
68                              struct obd_client_handle *och)
69 {
70         ENTRY;
71
72         op_data->attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
73                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
74
75         if (!(och->och_flags & FMODE_WRITE))
76                 goto out;
77
78         if (!S_ISREG(inode->i_mode))
79                 op_data->attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
80         else
81                 ll_epoch_close(inode, op_data, &och, 0);
82
83 out:
84         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
85         EXIT;
86 }
87
88 static int ll_close_inode_openhandle(struct obd_export *md_exp,
89                                      struct inode *inode,
90                                      struct obd_client_handle *och)
91 {
92         struct md_op_data *op_data;
93         struct ptlrpc_request *req = NULL;
94         struct obd_device *obd;
95         int epoch_close = 1;
96         int rc;
97         ENTRY;
98
99         obd = class_exp2obd(ll_i2mdexp(inode));
100         if (obd == NULL) {
101                 /*
102                  * XXX: in case of LMV, is this correct to access
103                  * ->exp_handle?
104                  */
105                 CERROR("Invalid MDC connection handle "LPX64"\n",
106                        ll_i2mdexp(inode)->exp_handle.h_cookie);
107                 GOTO(out, rc = 0);
108         }
109
110         /*
111          * here we check if this is forced umount. If so this is called on
112          * canceling "open lock" and we do not call md_close() in this case, as
113          * it will not be successful, as import is already deactivated.
114          */
115         if (obd->obd_no_recov)
116                 GOTO(out, rc = 0);
117
118         OBD_ALLOC_PTR(op_data);
119         if (op_data == NULL)
120                 GOTO(out, rc = -ENOMEM);
121
122         ll_prepare_close(inode, op_data, och);
123         epoch_close = (och->och_flags & FMODE_WRITE) &&
124                       ((op_data->flags & MF_EPOCH_CLOSE) ||
125                        !S_ISREG(inode->i_mode));
126         rc = md_close(md_exp, op_data, och, &req);
127
128         ll_finish_md_op_data(op_data);
129         if (rc == -EAGAIN) {
130                 /* This close must have closed the epoch. */
131                 LASSERT(epoch_close);
132                 /* MDS has instructed us to obtain Size-on-MDS attribute from
133                  * OSTs and send setattr to back to MDS. */
134                 rc = ll_sizeonmds_update(inode, &och->och_fh);
135                 if (rc) {
136                         CERROR("inode %lu mdc Size-on-MDS update failed: "
137                                "rc = %d\n", inode->i_ino, rc);
138                         rc = 0;
139                 }
140         } else if (rc) {
141                 CERROR("inode %lu mdc close failed: rc = %d\n",
142                        inode->i_ino, rc);
143         }
144
145         if (!epoch_close && (och->och_flags & FMODE_WRITE))
146                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
147
148         if (rc == 0) {
149                 rc = ll_objects_destroy(req, inode);
150                 if (rc)
151                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
152                                inode->i_ino, rc);
153         }
154
155         ptlrpc_req_finished(req); /* This is close request */
156         EXIT;
157 out:
158         md_clear_open_replay_data(md_exp, och);
159         if (epoch_close || !(och->och_flags & FMODE_WRITE))
160                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
161         return rc;
162 }
163
164 int ll_md_real_close(struct inode *inode, int flags)
165 {
166         struct ll_inode_info *lli = ll_i2info(inode);
167         struct obd_client_handle **och_p;
168         struct obd_client_handle *och;
169         __u64 *och_usecount;
170         int rc = 0;
171         ENTRY;
172
173         if (flags & FMODE_WRITE) {
174                 och_p = &lli->lli_mds_write_och;
175                 och_usecount = &lli->lli_open_fd_write_count;
176         } else if (flags & FMODE_EXEC) {
177                 och_p = &lli->lli_mds_exec_och;
178                 och_usecount = &lli->lli_open_fd_exec_count;
179         } else {
180                 LASSERT(flags & FMODE_READ);
181                 och_p = &lli->lli_mds_read_och;
182                 och_usecount = &lli->lli_open_fd_read_count;
183         }
184
185         down(&lli->lli_och_sem);
186         if (*och_usecount) { /* There are still users of this handle, so
187                                 skip freeing it. */
188                 up(&lli->lli_och_sem);
189                 RETURN(0);
190         }
191         och=*och_p;
192         *och_p = NULL;
193         up(&lli->lli_och_sem);
194
195         if (och) { /* There might be a race and somebody have freed this och
196                       already */
197                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
198                                                inode, och);
199                 /* Do not free @och is it is waiting for DONE_WRITING. */
200                 if (och->och_fh.cookie == DEAD_HANDLE_MAGIC)
201                         OBD_FREE(och, sizeof *och);
202         }
203
204         RETURN(rc);
205 }
206
207 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
208                 struct file *file)
209 {
210         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
211         struct ll_inode_info *lli = ll_i2info(inode);
212         int rc = 0;
213         ENTRY;
214
215         /* clear group lock, if present */
216         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
217                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
218                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
219                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
220                                       &fd->fd_cwlockh);
221         }
222
223         /* Let's see if we have good enough OPEN lock on the file and if
224            we can skip talking to MDS */
225         if (file->f_dentry->d_inode) { /* Can this ever be false? */
226                 int lockmode;
227                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
228                 struct lustre_handle lockh;
229                 struct inode *inode = file->f_dentry->d_inode;
230                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
231
232                 down(&lli->lli_och_sem);
233                 if (fd->fd_omode & FMODE_WRITE) {
234                         lockmode = LCK_CW;
235                         LASSERT(lli->lli_open_fd_write_count);
236                         lli->lli_open_fd_write_count--;
237                 } else if (fd->fd_omode & FMODE_EXEC) {
238                         lockmode = LCK_PR;
239                         LASSERT(lli->lli_open_fd_exec_count);
240                         lli->lli_open_fd_exec_count--;
241                 } else {
242                         lockmode = LCK_CR;
243                         LASSERT(lli->lli_open_fd_read_count);
244                         lli->lli_open_fd_read_count--;
245                 }
246                 up(&lli->lli_och_sem);
247
248                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
249                                    LDLM_IBITS, &policy, lockmode,
250                                    &lockh)) {
251                         rc = ll_md_real_close(file->f_dentry->d_inode,
252                                               fd->fd_omode);
253                 }
254         } else {
255                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
256                        file, file->f_dentry, file->f_dentry->d_name.name);
257         }
258
259         LUSTRE_FPRIVATE(file) = NULL;
260         ll_file_data_put(fd);
261         ll_capa_close(inode);
262
263         RETURN(rc);
264 }
265
266 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
267
268 /* While this returns an error code, fput() the caller does not, so we need
269  * to make every effort to clean up all of our state here.  Also, applications
270  * rarely check close errors and even if an error is returned they will not
271  * re-try the close call.
272  */
273 int ll_file_release(struct inode *inode, struct file *file)
274 {
275         struct ll_file_data *fd;
276         struct ll_sb_info *sbi = ll_i2sbi(inode);
277         struct ll_inode_info *lli = ll_i2info(inode);
278         struct lov_stripe_md *lsm = lli->lli_smd;
279         int rc;
280
281         ENTRY;
282         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
283                inode->i_generation, inode);
284
285         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
286         fd = LUSTRE_FPRIVATE(file);
287         LASSERT(fd != NULL);
288
289         /* don't do anything for / */
290         if (inode->i_sb->s_root == file->f_dentry) {
291                 LUSTRE_FPRIVATE(file) = NULL;
292                 ll_file_data_put(fd);
293                 RETURN(0);
294         }
295
296         if (lsm)
297                 lov_test_and_clear_async_rc(lsm);
298         lli->lli_async_rc = 0;
299
300         rc = ll_md_close(sbi->ll_md_exp, inode, file);
301         RETURN(rc);
302 }
303
304 static int ll_intent_file_open(struct file *file, void *lmm,
305                                int lmmsize, struct lookup_intent *itp)
306 {
307         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
308         struct dentry *parent = file->f_dentry->d_parent;
309         const char *name = file->f_dentry->d_name.name;
310         const int len = file->f_dentry->d_name.len;
311         struct lustre_handle lockh;
312         struct md_op_data *op_data;
313         int rc;
314
315         if (!parent)
316                 RETURN(-ENOENT);
317
318         /* Usually we come here only for NFSD, and we want open lock.
319            But we can also get here with pre 2.6.15 patchless kernels, and in
320            that case that lock is also ok */
321         /* We can also get here if there was cached open handle in revalidate_it
322          * but it disappeared while we were getting from there to ll_file_open.
323          * But this means this file was closed and immediatelly opened which
324          * makes a good candidate for using OPEN lock */
325         /* If lmmsize & lmm are not 0, we are just setting stripe info
326          * parameters. No need for the open lock */
327         if (!lmm && !lmmsize)
328                 itp->it_flags |= MDS_OPEN_LOCK;
329
330         op_data  = ll_prep_md_op_data(NULL, parent->d_inode, NULL, name, len,
331                                       O_RDWR);
332         if (op_data == NULL)
333                 RETURN(-ENOMEM);
334
335         rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_CW, op_data,
336                         &lockh, lmm, lmmsize, ldlm_completion_ast,
337                         ll_md_blocking_ast, NULL, 0);
338
339         ll_finish_md_op_data(op_data);
340         if (rc < 0) {
341                 CERROR("lock enqueue: err: %d\n", rc);
342                 RETURN(rc);
343         }
344
345         if (itp->d.lustre.it_lock_mode) { /* If we got lock - release it right
346                                            * away */
347                 ldlm_lock_decref(&lockh, itp->d.lustre.it_lock_mode);
348                 itp->d.lustre.it_lock_mode = 0;
349         }
350         rc = ll_prep_inode(&file->f_dentry->d_inode,
351                            (struct ptlrpc_request *)itp->d.lustre.it_data,
352                            DLM_REPLY_REC_OFF, NULL);
353         RETURN(rc);
354 }
355
356 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
357                        struct lookup_intent *it, struct obd_client_handle *och)
358 {
359         struct ptlrpc_request *req = it->d.lustre.it_data;
360         struct mdt_body *body;
361
362         LASSERT(och);
363
364         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
365         LASSERT(body != NULL);                      /* reply already checked out */
366         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
367
368         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
369         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
370         och->och_fid = lli->lli_fid;
371         och->och_flags = it->it_flags;
372         lli->lli_ioepoch = body->ioepoch;
373
374         return md_set_open_replay_data(md_exp, och, req);
375 }
376
377 int ll_local_open(struct file *file, struct lookup_intent *it,
378                   struct ll_file_data *fd, struct obd_client_handle *och)
379 {
380         struct inode *inode = file->f_dentry->d_inode;
381         struct ll_inode_info *lli = ll_i2info(inode);
382         ENTRY;
383
384         LASSERT(!LUSTRE_FPRIVATE(file));
385
386         LASSERT(fd != NULL);
387
388         if (och) {
389                 struct ptlrpc_request *req = it->d.lustre.it_data;
390                 struct mdt_body *body;
391                 int rc;
392
393                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
394                 if (rc)
395                         RETURN(rc);
396
397                 body = lustre_msg_buf(req->rq_repmsg,
398                                       DLM_REPLY_REC_OFF, sizeof(*body));
399
400                 if ((it->it_flags & FMODE_WRITE) &&
401                     (body->valid & OBD_MD_FLSIZE))
402                 {
403                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
404                                lli->lli_ioepoch, PFID(&lli->lli_fid));
405                 }
406         }
407
408         LUSTRE_FPRIVATE(file) = fd;
409         ll_readahead_init(inode, &fd->fd_ras);
410         fd->fd_omode = it->it_flags;
411         RETURN(0);
412 }
413
414 /* Open a file, and (for the very first open) create objects on the OSTs at
415  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
416  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
417  * lli_open_sem to ensure no other process will create objects, send the
418  * stripe MD to the MDS, or try to destroy the objects if that fails.
419  *
420  * If we already have the stripe MD locally then we don't request it in
421  * md_open(), by passing a lmm_size = 0.
422  *
423  * It is up to the application to ensure no other processes open this file
424  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
425  * used.  We might be able to avoid races of that sort by getting lli_open_sem
426  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
427  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
428  */
429 int ll_file_open(struct inode *inode, struct file *file)
430 {
431         struct ll_inode_info *lli = ll_i2info(inode);
432         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
433                                           .it_flags = file->f_flags };
434         struct lov_stripe_md *lsm;
435         struct ptlrpc_request *req = NULL;
436         struct obd_client_handle **och_p;
437         __u64 *och_usecount;
438         struct ll_file_data *fd;
439         int rc = 0;
440         ENTRY;
441
442         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
443                inode->i_generation, inode, file->f_flags);
444
445 #ifdef LUSTRE_KERNEL_VERSION
446         it = file->f_it;
447 #else
448         it = file->private_data; /* XXX: compat macro */
449         file->private_data = NULL; /* prevent ll_local_open assertion */
450 #endif
451
452         fd = ll_file_data_get();
453         if (fd == NULL)
454                 RETURN(-ENOMEM);
455
456         /* don't do anything for / */
457         if (inode->i_sb->s_root == file->f_dentry) {
458                 LUSTRE_FPRIVATE(file) = fd;
459                 RETURN(0);
460         }
461
462         if (!it || !it->d.lustre.it_disposition) {
463                 /* Convert f_flags into access mode. We cannot use file->f_mode,
464                  * because everything but O_ACCMODE mask was stripped from
465                  * there */
466                 if ((oit.it_flags + 1) & O_ACCMODE)
467                         oit.it_flags++;
468                 if (oit.it_flags & O_TRUNC)
469                         oit.it_flags |= FMODE_WRITE;
470
471                 if (oit.it_flags & O_CREAT)
472                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
473
474                 /* NFS hack - some strange NFS clients create files with zero
475                  * permission bits, and then expect to be able to open such
476                  * files. We are relying on real VFS client to do ll_permission
477                  * first before coming here, so if we got here, we either came
478                  * from NFS or all access checks ar eok, so it is safe to set
479                  * this flag in any case (XXX - race with chmod?)
480                  */
481                 oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
482
483                 /* We do not want O_EXCL here, presumably we opened the file
484                  * already? XXX - NFS implications? */
485                 oit.it_flags &= ~O_EXCL;
486
487                 it = &oit;
488         }
489
490         /* Let's see if we have file open on MDS already. */
491         if (it->it_flags & FMODE_WRITE) {
492                 och_p = &lli->lli_mds_write_och;
493                 och_usecount = &lli->lli_open_fd_write_count;
494         } else if (it->it_flags & FMODE_EXEC) {
495                 och_p = &lli->lli_mds_exec_och;
496                 och_usecount = &lli->lli_open_fd_exec_count;
497          } else {
498                 och_p = &lli->lli_mds_read_och;
499                 och_usecount = &lli->lli_open_fd_read_count;
500         }
501         down(&lli->lli_och_sem);
502         if (*och_p) { /* Open handle is present */
503                 if (it_disposition(it, DISP_OPEN_OPEN)) {
504                         /* Well, there's extra open request that we do not need,
505                            let's close it somehow. This will decref request. */
506                         ll_release_openhandle(file->f_dentry, it);
507                 }
508                 (*och_usecount)++;
509
510                 rc = ll_local_open(file, it, fd, NULL);
511                 if (rc) {
512                         up(&lli->lli_och_sem);
513                         ll_file_data_put(fd);
514                         RETURN(rc);
515                 }
516         } else {
517                 LASSERT(*och_usecount == 0);
518                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
519                 if (!*och_p) {
520                         ll_file_data_put(fd);
521                         GOTO(out_och_free, rc = -ENOMEM);
522                 }
523                 (*och_usecount)++;
524                 if (!it->d.lustre.it_disposition) {
525                         rc = ll_intent_file_open(file, NULL, 0, it);
526                         if (rc) {
527                                 ll_file_data_put(fd);
528                                 GOTO(out_och_free, rc);
529                         }
530
531                         /* Got some error? Release the request */
532                         if (it->d.lustre.it_status < 0) {
533                                 req = it->d.lustre.it_data;
534                                 ptlrpc_req_finished(req);
535                         }
536                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
537                                          &it->d.lustre.it_lock_handle,
538                                          file->f_dentry->d_inode);
539                 }
540                 req = it->d.lustre.it_data;
541
542                 /* md_intent_lock() didn't get a request ref if there was an
543                  * open error, so don't do cleanup on the request here
544                  * (bug 3430) */
545                 /* XXX (green): Should not we bail out on any error here, not
546                  * just open error? */
547                 rc = it_open_error(DISP_OPEN_OPEN, it);
548                 if (rc) {
549                         ll_file_data_put(fd);
550                         GOTO(out_och_free, rc);
551                 }
552
553                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
554                 rc = ll_local_open(file, it, fd, *och_p);
555                 if (rc) {
556                         up(&lli->lli_och_sem);
557                         ll_file_data_put(fd);
558                         GOTO(out_och_free, rc);
559                 }
560         }
561         up(&lli->lli_och_sem);
562
563         /* Must do this outside lli_och_sem lock to prevent deadlock where
564            different kind of OPEN lock for this same inode gets cancelled
565            by ldlm_cancel_lru */
566         if (!S_ISREG(inode->i_mode))
567                 GOTO(out, rc);
568
569         ll_capa_open(inode);
570
571         lsm = lli->lli_smd;
572         if (lsm == NULL) {
573                 if (file->f_flags & O_LOV_DELAY_CREATE ||
574                     !(file->f_mode & FMODE_WRITE)) {
575                         CDEBUG(D_INODE, "object creation was delayed\n");
576                         GOTO(out, rc);
577                 }
578         }
579         file->f_flags &= ~O_LOV_DELAY_CREATE;
580         GOTO(out, rc);
581 out:
582         ptlrpc_req_finished(req);
583         if (req)
584                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
585 out_och_free:
586         if (rc) {
587                 if (*och_p) {
588                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
589                         *och_p = NULL; /* OBD_FREE writes some magic there */
590                         (*och_usecount)--;
591                 }
592                 up(&lli->lli_och_sem);
593         }
594
595         return rc;
596 }
597
598 /* Fills the obdo with the attributes for the inode defined by lsm */
599 int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
600 {
601         struct ptlrpc_request_set *set;
602         struct ll_inode_info *lli = ll_i2info(inode);
603         struct lov_stripe_md *lsm = lli->lli_smd;
604
605         struct obd_info oinfo = { { { 0 } } };
606         int rc;
607         ENTRY;
608
609         LASSERT(lsm != NULL);
610
611         oinfo.oi_md = lsm;
612         oinfo.oi_oa = obdo;
613         oinfo.oi_oa->o_id = lsm->lsm_object_id;
614         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
615         oinfo.oi_oa->o_mode = S_IFREG;
616         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
617                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
618                                OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
619                                OBD_MD_FLCTIME | OBD_MD_FLGROUP;
620         oinfo.oi_capa = ll_mdscapa_get(inode);
621
622         set = ptlrpc_prep_set();
623         if (set == NULL) {
624                 CERROR("can't allocate ptlrpc set\n");
625                 rc = -ENOMEM;
626         } else {
627                 rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
628                 if (rc == 0)
629                         rc = ptlrpc_set_wait(set);
630                 ptlrpc_set_destroy(set);
631         }
632         capa_put(oinfo.oi_capa);
633         if (rc)
634                 RETURN(rc);
635
636         oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
637                                  OBD_MD_FLMTIME | OBD_MD_FLCTIME |
638                                  OBD_MD_FLSIZE);
639
640         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
641         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
642                lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks,
643                inode->i_blksize);
644         RETURN(0);
645 }
646
647 static inline void ll_remove_suid(struct inode *inode)
648 {
649         unsigned int mode;
650
651         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
652         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
653
654         /* was any of the uid bits set? */
655         mode &= inode->i_mode;
656         if (mode && !capable(CAP_FSETID)) {
657                 inode->i_mode &= ~mode;
658                 // XXX careful here - we cannot change the size
659         }
660 }
661
662 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
663 {
664         struct ll_inode_info *lli = ll_i2info(inode);
665         struct lov_stripe_md *lsm = lli->lli_smd;
666         struct obd_export *exp = ll_i2dtexp(inode);
667         struct {
668                 char name[16];
669                 struct ldlm_lock *lock;
670                 struct lov_stripe_md *lsm;
671         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
672         __u32 stripe, vallen = sizeof(stripe);
673         int rc;
674         ENTRY;
675
676         if (lsm->lsm_stripe_count == 1)
677                 GOTO(check, stripe = 0);
678
679         /* get our offset in the lov */
680         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
681         if (rc != 0) {
682                 CERROR("obd_get_info: rc = %d\n", rc);
683                 RETURN(rc);
684         }
685         LASSERT(stripe < lsm->lsm_stripe_count);
686
687 check:
688         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
689             lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
690                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
691                            lsm->lsm_oinfo[stripe].loi_id,
692                            lsm->lsm_oinfo[stripe].loi_gr);
693                 RETURN(-ELDLM_NO_LOCK_DATA);
694         }
695
696         RETURN(stripe);
697 }
698
699 /* Flush the page cache for an extent as its canceled.  When we're on an LOV,
700  * we get a lock cancellation for each stripe, so we have to map the obd's
701  * region back onto the stripes in the file that it held.
702  *
703  * No one can dirty the extent until we've finished our work and they can
704  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
705  * but other kernel actors could have pages locked.
706  *
707  * Called with the DLM lock held. */
708 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
709                               struct ldlm_lock *lock, __u32 stripe)
710 {
711         ldlm_policy_data_t tmpex;
712         unsigned long start, end, count, skip, i, j;
713         struct page *page;
714         int rc, rc2, l_flags, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
715         struct lustre_handle lockh;
716         ENTRY;
717
718         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
719         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
720                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
721                inode->i_size);
722
723         /* our locks are page granular thanks to osc_enqueue, we invalidate the
724          * whole page. */
725         if ((tmpex.l_extent.start & ~PAGE_CACHE_MASK) != 0 ||
726             ((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) != 0)
727                 LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",PAGE_SIZE);
728         LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
729         LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
730
731         count = ~0;
732         skip = 0;
733         start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
734         end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
735         if (lsm->lsm_stripe_count > 1) {
736                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
737                 skip = (lsm->lsm_stripe_count - 1) * count;
738                 start += start/count * skip + stripe * count;
739                 if (end != ~0)
740                         end += end/count * skip + stripe * count;
741         }
742         if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
743                 end = ~0;
744
745         i = inode->i_size ? (inode->i_size - 1) >> PAGE_CACHE_SHIFT : 0;
746         if (i < end)
747                 end = i;
748
749         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
750                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
751                count, skip, end, discard ? " (DISCARDING)" : "");
752
753         /* walk through the vmas on the inode and tear down mmaped pages that
754          * intersect with the lock.  this stops immediately if there are no
755          * mmap()ed regions of the file.  This is not efficient at all and
756          * should be short lived. We'll associate mmap()ed pages with the lock
757          * and will be able to find them directly */
758         for (i = start; i <= end; i += (j + skip)) {
759                 j = min(count - (i % count), end - i + 1);
760                 LASSERT(j > 0);
761                 LASSERT(inode->i_mapping);
762                 if (ll_teardown_mmaps(inode->i_mapping,
763                                       (__u64)i << PAGE_CACHE_SHIFT,
764                                       ((__u64)(i+j) << PAGE_CACHE_SHIFT) - 1) )
765                         break;
766         }
767
768         /* this is the simplistic implementation of page eviction at
769          * cancelation.  It is careful to get races with other page
770          * lockers handled correctly.  fixes from bug 20 will make it
771          * more efficient by associating locks with pages and with
772          * batching writeback under the lock explicitly. */
773         for (i = start, j = start % count; i <= end;
774              j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
775                 if (j == count) {
776                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
777                         i += skip;
778                         j = 0;
779                         if (i > end)
780                                 break;
781                 }
782                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
783                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
784                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
785                          start, i, end);
786
787                 if (!mapping_has_pages(inode->i_mapping)) {
788                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
789                         break;
790                 }
791
792                 cond_resched();
793
794                 page = find_get_page(inode->i_mapping, i);
795                 if (page == NULL)
796                         continue;
797                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
798                                i, tmpex.l_extent.start);
799                 lock_page(page);
800
801                 /* page->mapping to check with racing against teardown */
802                 if (!discard && clear_page_dirty_for_io(page)) {
803                         rc = ll_call_writepage(inode, page);
804                         if (rc != 0)
805                                 CERROR("writepage of page %p failed: %d\n",
806                                        page, rc);
807                         /* either waiting for io to complete or reacquiring
808                          * the lock that the failed writepage released */
809                         lock_page(page);
810                 }
811
812                 tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
813                 l_flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
814                 /* check to see if another DLM lock covers this page b=2765 */
815                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
816                                       l_flags, &lock->l_resource->lr_name,
817                                       LDLM_EXTENT, &tmpex, LCK_PR | LCK_PW, &lockh);
818
819                 if (rc2 <= 0 && page->mapping != NULL) {
820                         struct ll_async_page *llap = llap_cast_private(page);
821                         // checking again to account for writeback's lock_page()
822                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
823                         if (llap)
824                                 ll_ra_accounting(llap, inode->i_mapping);
825                         ll_truncate_complete_page(page);
826                 }
827                 unlock_page(page);
828                 page_cache_release(page);
829         }
830         LASSERTF(tmpex.l_extent.start <=
831                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
832                   lock->l_policy_data.l_extent.end + 1),
833                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
834                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
835                  start, i, end);
836         EXIT;
837 }
838
839 static int ll_extent_lock_callback(struct ldlm_lock *lock,
840                                    struct ldlm_lock_desc *new, void *data,
841                                    int flag)
842 {
843         struct lustre_handle lockh = { 0 };
844         int rc;
845         ENTRY;
846
847         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
848                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
849                 LBUG();
850         }
851
852         switch (flag) {
853         case LDLM_CB_BLOCKING:
854                 ldlm_lock2handle(lock, &lockh);
855                 rc = ldlm_cli_cancel(&lockh);
856                 if (rc != ELDLM_OK)
857                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
858                 break;
859         case LDLM_CB_CANCELING: {
860                 struct inode *inode;
861                 struct ll_inode_info *lli;
862                 struct lov_stripe_md *lsm;
863                 int stripe;
864                 __u64 kms;
865
866                 /* This lock wasn't granted, don't try to evict pages */
867                 if (lock->l_req_mode != lock->l_granted_mode)
868                         RETURN(0);
869
870                 inode = ll_inode_from_lock(lock);
871                 if (inode == NULL)
872                         RETURN(0);
873                 lli = ll_i2info(inode);
874                 if (lli == NULL)
875                         goto iput;
876                 if (lli->lli_smd == NULL)
877                         goto iput;
878                 lsm = lli->lli_smd;
879
880                 stripe = ll_lock_to_stripe_offset(inode, lock);
881                 if (stripe < 0)
882                         goto iput;
883
884                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
885
886                 lov_stripe_lock(lsm);
887                 lock_res_and_lock(lock);
888                 kms = ldlm_extent_shift_kms(lock,
889                                             lsm->lsm_oinfo[stripe].loi_kms);
890
891                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
892                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
893                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
894                 lsm->lsm_oinfo[stripe].loi_kms = kms;
895                 unlock_res_and_lock(lock);
896                 lov_stripe_unlock(lsm);
897         iput:
898                 iput(inode);
899                 break;
900         }
901         default:
902                 LBUG();
903         }
904
905         RETURN(0);
906 }
907
908 #if 0
909 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
910 {
911         /* XXX ALLOCATE - 160 bytes */
912         struct inode *inode = ll_inode_from_lock(lock);
913         struct ll_inode_info *lli = ll_i2info(inode);
914         struct lustre_handle lockh = { 0 };
915         struct ost_lvb *lvb;
916         int stripe;
917         ENTRY;
918
919         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
920                      LDLM_FL_BLOCK_CONV)) {
921                 LBUG(); /* not expecting any blocked async locks yet */
922                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
923                            "lock, returning");
924                 ldlm_lock_dump(D_OTHER, lock, 0);
925                 ldlm_reprocess_all(lock->l_resource);
926                 RETURN(0);
927         }
928
929         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
930
931         stripe = ll_lock_to_stripe_offset(inode, lock);
932         if (stripe < 0)
933                 goto iput;
934
935         if (lock->l_lvb_len) {
936                 struct lov_stripe_md *lsm = lli->lli_smd;
937                 __u64 kms;
938                 lvb = lock->l_lvb_data;
939                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
940
941                 LOCK_INODE_MUTEX(inode);
942                 lock_res_and_lock(lock);
943                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
944                 kms = ldlm_extent_shift_kms(NULL, kms);
945                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
946                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
947                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
948                 lsm->lsm_oinfo[stripe].loi_kms = kms;
949                 unlock_res_and_lock(lock);
950                 UNLOCK_INODE_MUTEX(inode);
951         }
952
953 iput:
954         iput(inode);
955         wake_up(&lock->l_waitq);
956
957         ldlm_lock2handle(lock, &lockh);
958         ldlm_lock_decref(&lockh, LCK_PR);
959         RETURN(0);
960 }
961 #endif
962
963 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
964 {
965         struct ptlrpc_request *req = reqp;
966         struct inode *inode = ll_inode_from_lock(lock);
967         struct ll_inode_info *lli;
968         struct lov_stripe_md *lsm;
969         struct ost_lvb *lvb;
970         int rc, stripe;
971         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
972         ENTRY;
973
974         if (inode == NULL)
975                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
976         lli = ll_i2info(inode);
977         if (lli == NULL)
978                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
979         lsm = lli->lli_smd;
980         if (lsm == NULL)
981                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
982
983         /* First, find out which stripe index this lock corresponds to. */
984         stripe = ll_lock_to_stripe_offset(inode, lock);
985         if (stripe < 0)
986                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
987
988         rc = lustre_pack_reply(req, 2, size, NULL);
989         if (rc) {
990                 CERROR("lustre_pack_reply: %d\n", rc);
991                 GOTO(iput, rc);
992         }
993
994         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
995         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
996         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
997         lvb->lvb_atime = LTIME_S(inode->i_atime);
998         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
999
1000         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
1001                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
1002                    inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
1003                    lvb->lvb_atime, lvb->lvb_ctime);
1004  iput:
1005         iput(inode);
1006
1007  out:
1008         /* These errors are normal races, so we don't want to fill the console
1009          * with messages by calling ptlrpc_error() */
1010         if (rc == -ELDLM_NO_LOCK_DATA)
1011                 lustre_pack_reply(req, 1, NULL, NULL);
1012
1013         req->rq_status = rc;
1014         return rc;
1015 }
1016
1017 static void ll_merge_lvb(struct inode *inode)
1018 {
1019         struct ll_inode_info *lli = ll_i2info(inode);
1020         struct ll_sb_info *sbi = ll_i2sbi(inode);
1021         struct ost_lvb lvb;
1022         ENTRY;
1023
1024         ll_inode_size_lock(inode, 1);
1025         inode_init_lvb(inode, &lvb);
1026         obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
1027         inode->i_size = lvb.lvb_size;
1028         inode->i_blocks = lvb.lvb_blocks;
1029         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1030         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1031         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1032         ll_inode_size_unlock(inode, 1);
1033         EXIT;
1034 }
1035
1036 int ll_local_size(struct inode *inode)
1037 {
1038         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
1039         struct ll_inode_info *lli = ll_i2info(inode);
1040         struct ll_sb_info *sbi = ll_i2sbi(inode);
1041         struct lustre_handle lockh = { 0 };
1042         int flags = 0;
1043         int rc;
1044         ENTRY;
1045
1046         if (lli->lli_smd->lsm_stripe_count == 0)
1047                 RETURN(0);
1048
1049         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
1050                        &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
1051         if (rc < 0)
1052                 RETURN(rc);
1053         else if (rc == 0)
1054                 RETURN(-ENODATA);
1055
1056         ll_merge_lvb(inode);
1057         obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
1058         RETURN(0);
1059 }
1060
1061 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1062                      lstat_t *st)
1063 {
1064         struct lustre_handle lockh = { 0 };
1065         struct obd_enqueue_info einfo = { 0 };
1066         struct obd_info oinfo = { { { 0 } } };
1067         struct ost_lvb lvb;
1068         int rc;
1069
1070         ENTRY;
1071
1072         einfo.ei_type = LDLM_EXTENT;
1073         einfo.ei_mode = LCK_PR;
1074         einfo.ei_flags = LDLM_FL_HAS_INTENT;
1075         einfo.ei_cb_bl = ll_extent_lock_callback;
1076         einfo.ei_cb_cp = ldlm_completion_ast;
1077         einfo.ei_cb_gl = ll_glimpse_callback;
1078         einfo.ei_cbdata = NULL;
1079
1080         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1081         oinfo.oi_lockh = &lockh;
1082         oinfo.oi_md = lsm;
1083
1084         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1085         if (rc == -ENOENT)
1086                 RETURN(rc);
1087         if (rc != 0) {
1088                 CERROR("obd_enqueue returned rc %d, "
1089                        "returning -EIO\n", rc);
1090                 RETURN(rc > 0 ? -EIO : rc);
1091         }
1092
1093         lov_stripe_lock(lsm);
1094         memset(&lvb, 0, sizeof(lvb));
1095         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1096         st->st_size = lvb.lvb_size;
1097         st->st_blocks = lvb.lvb_blocks;
1098         st->st_mtime = lvb.lvb_mtime;
1099         st->st_atime = lvb.lvb_atime;
1100         st->st_ctime = lvb.lvb_ctime;
1101         lov_stripe_unlock(lsm);
1102
1103         RETURN(rc);
1104 }
1105
1106 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
1107  * file (because it prefers KMS over RSS when larger) */
1108 int ll_glimpse_size(struct inode *inode, int ast_flags)
1109 {
1110         struct ll_inode_info *lli = ll_i2info(inode);
1111         struct ll_sb_info *sbi = ll_i2sbi(inode);
1112         struct lustre_handle lockh = { 0 };
1113         struct obd_enqueue_info einfo = { 0 };
1114         struct obd_info oinfo = { { { 0 } } };
1115         int rc;
1116         ENTRY;
1117
1118         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
1119                 RETURN(0);
1120
1121         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1122
1123         if (!lli->lli_smd) {
1124                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1125                 RETURN(0);
1126         }
1127
1128         /* NOTE: this looks like DLM lock request, but it may not be one. Due
1129          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
1130          *       won't revoke any conflicting DLM locks held. Instead,
1131          *       ll_glimpse_callback() will be called on each client
1132          *       holding a DLM lock against this file, and resulting size
1133          *       will be returned for each stripe. DLM lock on [0, EOF] is
1134          *       acquired only if there were no conflicting locks. */
1135         einfo.ei_type = LDLM_EXTENT;
1136         einfo.ei_mode = LCK_PR;
1137         einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
1138         einfo.ei_cb_bl = ll_extent_lock_callback;
1139         einfo.ei_cb_cp = ldlm_completion_ast;
1140         einfo.ei_cb_gl = ll_glimpse_callback;
1141         einfo.ei_cbdata = inode;
1142
1143         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1144         oinfo.oi_lockh = &lockh;
1145         oinfo.oi_md = lli->lli_smd;
1146
1147         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
1148         if (rc == -ENOENT)
1149                 RETURN(rc);
1150         if (rc != 0) {
1151                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1152                 RETURN(rc > 0 ? -EIO : rc);
1153         }
1154
1155         ll_merge_lvb(inode);
1156
1157         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
1158                inode->i_size, inode->i_blocks);
1159
1160         RETURN(rc);
1161 }
1162
1163 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1164                    struct lov_stripe_md *lsm, int mode,
1165                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1166                    int ast_flags)
1167 {
1168         struct ll_sb_info *sbi = ll_i2sbi(inode);
1169         struct ost_lvb lvb;
1170         struct obd_enqueue_info einfo = { 0 };
1171         struct obd_info oinfo = { { { 0 } } };
1172         int rc;
1173         ENTRY;
1174
1175         LASSERT(!lustre_handle_is_used(lockh));
1176         LASSERT(lsm != NULL);
1177
1178         /* don't drop the mmapped file to LRU */
1179         if (mapping_mapped(inode->i_mapping))
1180                 ast_flags |= LDLM_FL_NO_LRU;
1181
1182         /* XXX phil: can we do this?  won't it screw the file size up? */
1183         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1184             (sbi->ll_flags & LL_SBI_NOLCK))
1185                 RETURN(0);
1186
1187         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1188                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1189
1190         einfo.ei_type = LDLM_EXTENT;
1191         einfo.ei_mode = mode;
1192         einfo.ei_flags = ast_flags;
1193         einfo.ei_cb_bl = ll_extent_lock_callback;
1194         einfo.ei_cb_cp = ldlm_completion_ast;
1195         einfo.ei_cb_gl = ll_glimpse_callback;
1196         einfo.ei_cbdata = inode;
1197
1198         oinfo.oi_policy = *policy;
1199         oinfo.oi_lockh = lockh;
1200         oinfo.oi_md = lsm;
1201
1202         rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
1203         *policy = oinfo.oi_policy;
1204         if (rc > 0)
1205                 rc = -EIO;
1206
1207         ll_inode_size_lock(inode, 1);
1208         inode_init_lvb(inode, &lvb);
1209         obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
1210
1211         if (policy->l_extent.start == 0 &&
1212             policy->l_extent.end == OBD_OBJECT_EOF) {
1213                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1214                  * the kms under both a DLM lock and the
1215                  * ll_inode_size_lock().  If we don't get the
1216                  * ll_inode_size_lock() here we can match the DLM lock and
1217                  * reset i_size from the kms before the truncating path has
1218                  * updated the kms.  generic_file_write can then trust the
1219                  * stale i_size when doing appending writes and effectively
1220                  * cancel the result of the truncate.  Getting the
1221                  * ll_inode_size_lock() after the enqueue maintains the DLM
1222                  * -> ll_inode_size_lock() acquiring order. */
1223                 inode->i_size = lvb.lvb_size;
1224         }
1225
1226         if (rc == 0) {
1227                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1228                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1229                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1230         }
1231         ll_inode_size_unlock(inode, 1);
1232
1233         RETURN(rc);
1234 }
1235
1236 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1237                      struct lov_stripe_md *lsm, int mode,
1238                      struct lustre_handle *lockh)
1239 {
1240         struct ll_sb_info *sbi = ll_i2sbi(inode);
1241         int rc;
1242         ENTRY;
1243
1244         /* XXX phil: can we do this?  won't it screw the file size up? */
1245         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1246             (sbi->ll_flags & LL_SBI_NOLCK))
1247                 RETURN(0);
1248
1249         rc = obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh);
1250
1251         RETURN(rc);
1252 }
1253
1254 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1255                             loff_t *ppos)
1256 {
1257         struct inode *inode = file->f_dentry->d_inode;
1258         struct ll_inode_info *lli = ll_i2info(inode);
1259         struct lov_stripe_md *lsm = lli->lli_smd;
1260         struct ll_sb_info *sbi = ll_i2sbi(inode);
1261         struct ll_lock_tree tree;
1262         struct ll_lock_tree_node *node;
1263         struct ost_lvb lvb;
1264         struct ll_ra_read bead;
1265         int rc, ra = 0;
1266         loff_t end;
1267         ssize_t retval, chunk, sum = 0;
1268
1269         __u64 kms;
1270         ENTRY;
1271         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1272                inode->i_ino, inode->i_generation, inode, count, *ppos);
1273
1274         /* "If nbyte is 0, read() will return 0 and have no other results."
1275          *                      -- Single Unix Spec */
1276         if (count == 0)
1277                 RETURN(0);
1278
1279         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1280                             count);
1281
1282         if (!lsm) {
1283                 /* Read on file with no objects should return zero-filled
1284                  * buffers up to file size (we can get non-zero sizes with
1285                  * mknod + truncate, then opening file for read. This is a
1286                  * common pattern in NFS case, it seems). Bug 6243 */
1287                 int notzeroed;
1288                 /* Since there are no objects on OSTs, we have nothing to get
1289                  * lock on and so we are forced to access inode->i_size
1290                  * unguarded */
1291
1292                 /* Read beyond end of file */
1293                 if (*ppos >= inode->i_size)
1294                         RETURN(0);
1295
1296                 if (count > inode->i_size - *ppos)
1297                         count = inode->i_size - *ppos;
1298                 /* Make sure to correctly adjust the file pos pointer for
1299                  * EFAULT case */
1300                 notzeroed = clear_user(buf, count);
1301                 count -= notzeroed;
1302                 *ppos += count;
1303                 if (!count)
1304                         RETURN(-EFAULT);
1305                 RETURN(count);
1306         }
1307
1308 repeat:
1309         if (sbi->ll_max_rw_chunk != 0) {
1310                 /* first, let's know the end of the current stripe */
1311                 end = *ppos;
1312                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1313
1314                 /* correct, the end is beyond the request */
1315                 if (end > *ppos + count - 1)
1316                         end = *ppos + count - 1;
1317
1318                 /* and chunk shouldn't be too large even if striping is wide */
1319                 if (end - *ppos > sbi->ll_max_rw_chunk)
1320                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1321         } else {
1322                 end = *ppos + count - 1;
1323         }
1324
1325         node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
1326         tree.lt_fd = LUSTRE_FPRIVATE(file);
1327         rc = ll_tree_lock(&tree, node, buf, count,
1328                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1329         if (rc != 0)
1330                 GOTO(out, retval = rc);
1331
1332         ll_inode_size_lock(inode, 1);
1333         /*
1334          * Consistency guarantees: following possibilities exist for the
1335          * relation between region being read and real file size at this
1336          * moment:
1337          *
1338          *  (A): the region is completely inside of the file;
1339          *
1340          *  (B-x): x bytes of region are inside of the file, the rest is
1341          *  outside;
1342          *
1343          *  (C): the region is completely outside of the file.
1344          *
1345          * This classification is stable under DLM lock acquired by
1346          * ll_tree_lock() above, because to change class, other client has to
1347          * take DLM lock conflicting with our lock. Also, any updates to
1348          * ->i_size by other threads on this client are serialized by
1349          * ll_inode_size_lock(). This guarantees that short reads are handled
1350          * correctly in the face of concurrent writes and truncates.
1351          */
1352         inode_init_lvb(inode, &lvb);
1353         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1354         kms = lvb.lvb_size;
1355         if (*ppos + count - 1 > kms) {
1356                 /* A glimpse is necessary to determine whether we return a
1357                  * short read (B) or some zeroes at the end of the buffer (C) */
1358                 ll_inode_size_unlock(inode, 1);
1359                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1360                 if (retval) {
1361                         ll_tree_unlock(&tree);
1362                         goto out;
1363                 }
1364         } else {
1365                 /* region is within kms and, hence, within real file size (A) */
1366                 inode->i_size = kms;
1367                 ll_inode_size_unlock(inode, 1);
1368         }
1369
1370         chunk = end - *ppos + 1;
1371         CDEBUG(D_VFSTRACE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1372                inode->i_ino, chunk, *ppos, inode->i_size);
1373
1374         /* turn off the kernel's read-ahead */
1375 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1376         file->f_ramax = 0;
1377 #else
1378         file->f_ra.ra_pages = 0;
1379 #endif
1380         /* initialize read-ahead window once per syscall */
1381         if (ra == 0) {
1382                 ra = 1;
1383                 bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1384                 bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1385                 ll_ra_read_in(file, &bead);
1386         }
1387
1388         /* BUG: 5972 */
1389         file_accessed(file);
1390         retval = generic_file_read(file, buf, chunk, ppos);
1391         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 0);
1392
1393         ll_tree_unlock(&tree);
1394
1395         if (retval > 0) {
1396                 buf += retval;
1397                 count -= retval;
1398                 sum += retval;
1399                 if (retval == chunk && count > 0)
1400                         goto repeat;
1401         }
1402
1403  out:
1404         if (ra != 0)
1405                 ll_ra_read_ex(file, &bead);
1406         retval = (sum > 0) ? sum : retval;
1407         RETURN(retval);
1408 }
1409
1410 /*
1411  * Write to a file (through the page cache).
1412  */
1413 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1414                              loff_t *ppos)
1415 {
1416         struct inode *inode = file->f_dentry->d_inode;
1417         struct ll_sb_info *sbi = ll_i2sbi(inode);
1418         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1419         struct ll_lock_tree tree;
1420         struct ll_lock_tree_node *node;
1421         loff_t maxbytes = ll_file_maxbytes(inode);
1422         loff_t lock_start, lock_end, end;
1423         ssize_t retval, chunk, sum = 0;
1424         int rc;
1425         ENTRY;
1426         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1427                inode->i_ino, inode->i_generation, inode, count, *ppos);
1428
1429         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1430
1431         /* POSIX, but surprised the VFS doesn't check this already */
1432         if (count == 0)
1433                 RETURN(0);
1434
1435         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1436          * called on the file, don't fail the below assertion (bug 2388). */
1437         if (file->f_flags & O_LOV_DELAY_CREATE &&
1438             ll_i2info(inode)->lli_smd == NULL)
1439                 RETURN(-EBADF);
1440
1441         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1442
1443         down(&ll_i2info(inode)->lli_write_sem);
1444
1445 repeat:
1446         chunk = 0; /* just to fix gcc's warning */
1447         end = *ppos + count - 1;
1448
1449         if (file->f_flags & O_APPEND) {
1450                 lock_start = 0;
1451                 lock_end = OBD_OBJECT_EOF;
1452         } else if (sbi->ll_max_rw_chunk != 0) {
1453                 /* first, let's know the end of the current stripe */
1454                 end = *ppos;
1455                 obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
1456
1457                 /* correct, the end is beyond the request */
1458                 if (end > *ppos + count - 1)
1459                         end = *ppos + count - 1;
1460
1461                 /* and chunk shouldn't be too large even if striping is wide */
1462                 if (end - *ppos > sbi->ll_max_rw_chunk)
1463                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1464                 lock_start = *ppos;
1465                 lock_end = end;
1466         } else {
1467                 lock_start = *ppos;
1468                 lock_end = *ppos + count - 1;
1469         }
1470         node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
1471
1472         if (IS_ERR(node))
1473                 GOTO(out, retval = PTR_ERR(node));
1474
1475         tree.lt_fd = LUSTRE_FPRIVATE(file);
1476         rc = ll_tree_lock(&tree, node, buf, count,
1477                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
1478         if (rc != 0)
1479                 GOTO(out, retval = rc);
1480
1481         /* this is ok, g_f_w will overwrite this under i_mutex if it races
1482          * with a local truncate, it just makes our maxbyte checking easier */
1483         if (file->f_flags & O_APPEND) {
1484                 *ppos = inode->i_size;
1485                 end = *ppos + count - 1;
1486         }
1487
1488         if (*ppos >= maxbytes) {
1489                 send_sig(SIGXFSZ, current, 0);
1490                 GOTO(out, retval = -EFBIG);
1491         }
1492         if (*ppos + count > maxbytes)
1493                 count = maxbytes - *ppos;
1494
1495         /* generic_file_write handles O_APPEND after getting i_mutex */
1496         chunk = end - *ppos + 1;
1497         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1498                inode->i_ino, chunk, *ppos);
1499         retval = generic_file_write(file, buf, chunk, ppos);
1500         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
1501
1502 out:
1503         ll_tree_unlock(&tree);
1504
1505         if (retval > 0) {
1506                 buf += retval;
1507                 count -= retval;
1508                 sum += retval;
1509                 if (retval == chunk && count > 0)
1510                         goto repeat;
1511         }
1512
1513         up(&ll_i2info(inode)->lli_write_sem);
1514
1515         retval = (sum > 0) ? sum : retval;
1516         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
1517                             retval > 0 ? retval : 0);
1518         RETURN(retval);
1519 }
1520
1521 /*
1522  * Send file content (through pagecache) somewhere with helper
1523  */
1524 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1525 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1526                                 read_actor_t actor, void *target)
1527 {
1528         struct inode *inode = in_file->f_dentry->d_inode;
1529         struct ll_inode_info *lli = ll_i2info(inode);
1530         struct lov_stripe_md *lsm = lli->lli_smd;
1531         struct ll_lock_tree tree;
1532         struct ll_lock_tree_node *node;
1533         struct ost_lvb lvb;
1534         struct ll_ra_read bead;
1535         int rc;
1536         ssize_t retval;
1537         __u64 kms;
1538         ENTRY;
1539         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1540                inode->i_ino, inode->i_generation, inode, count, *ppos);
1541
1542         /* "If nbyte is 0, read() will return 0 and have no other results."
1543          *                      -- Single Unix Spec */
1544         if (count == 0)
1545                 RETURN(0);
1546
1547         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
1548                             count);
1549
1550         /* File with no objects, nothing to lock */
1551         if (!lsm)
1552                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1553
1554         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1555         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1556         rc = ll_tree_lock(&tree, node, NULL, count,
1557                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1558         if (rc != 0)
1559                 RETURN(rc);
1560
1561         ll_inode_size_lock(inode, 1);
1562         /*
1563          * Consistency guarantees: following possibilities exist for the
1564          * relation between region being read and real file size at this
1565          * moment:
1566          *
1567          *  (A): the region is completely inside of the file;
1568          *
1569          *  (B-x): x bytes of region are inside of the file, the rest is
1570          *  outside;
1571          *
1572          *  (C): the region is completely outside of the file.
1573          *
1574          * This classification is stable under DLM lock acquired by
1575          * ll_tree_lock() above, because to change class, other client has to
1576          * take DLM lock conflicting with our lock. Also, any updates to
1577          * ->i_size by other threads on this client are serialized by
1578          * ll_inode_size_lock(). This guarantees that short reads are handled
1579          * correctly in the face of concurrent writes and truncates.
1580          */
1581         inode_init_lvb(inode, &lvb);
1582         obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
1583         kms = lvb.lvb_size;
1584         if (*ppos + count - 1 > kms) {
1585                 /* A glimpse is necessary to determine whether we return a
1586                  * short read (B) or some zeroes at the end of the buffer (C) */
1587                 ll_inode_size_unlock(inode, 1);
1588                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1589                 if (retval)
1590                         goto out;
1591         } else {
1592                 /* region is within kms and, hence, within real file size (A) */
1593                 inode->i_size = kms;
1594                 ll_inode_size_unlock(inode, 1);
1595         }
1596
1597         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1598                inode->i_ino, count, *ppos, inode->i_size);
1599
1600         /* turn off the kernel's read-ahead */
1601         in_file->f_ra.ra_pages = 0;
1602
1603         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1604         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1605         ll_ra_read_in(in_file, &bead);
1606         /* BUG: 5972 */
1607         file_accessed(in_file);
1608         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1609         ll_ra_read_ex(in_file, &bead);
1610
1611  out:
1612         ll_tree_unlock(&tree);
1613         RETURN(retval);
1614 }
1615 #endif
1616
1617 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1618                                unsigned long arg)
1619 {
1620         struct ll_inode_info *lli = ll_i2info(inode);
1621         struct obd_export *exp = ll_i2dtexp(inode);
1622         struct ll_recreate_obj ucreatp;
1623         struct obd_trans_info oti = { 0 };
1624         struct obdo *oa = NULL;
1625         int lsm_size;
1626         int rc = 0;
1627         struct lov_stripe_md *lsm, *lsm2;
1628         ENTRY;
1629
1630         if (!capable (CAP_SYS_ADMIN))
1631                 RETURN(-EPERM);
1632
1633         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1634                             sizeof(struct ll_recreate_obj));
1635         if (rc) {
1636                 RETURN(-EFAULT);
1637         }
1638         oa = obdo_alloc();
1639         if (oa == NULL)
1640                 RETURN(-ENOMEM);
1641
1642         down(&lli->lli_open_sem);
1643         lsm = lli->lli_smd;
1644         if (lsm == NULL)
1645                 GOTO(out, rc = -ENOENT);
1646         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1647                    (lsm->lsm_stripe_count));
1648
1649         OBD_ALLOC(lsm2, lsm_size);
1650         if (lsm2 == NULL)
1651                 GOTO(out, rc = -ENOMEM);
1652
1653         oa->o_id = ucreatp.lrc_id;
1654         oa->o_gr = ucreatp.lrc_group;
1655         oa->o_nlink = ucreatp.lrc_ost_idx;
1656         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1657         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1658         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1659                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1660
1661         oti.oti_objid = NULL;
1662         memcpy(lsm2, lsm, lsm_size);
1663         rc = obd_create(exp, oa, &lsm2, &oti);
1664
1665         OBD_FREE(lsm2, lsm_size);
1666         GOTO(out, rc);
1667 out:
1668         up(&lli->lli_open_sem);
1669         obdo_free(oa);
1670         return rc;
1671 }
1672
1673 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1674                                     int flags, struct lov_user_md *lum,
1675                                     int lum_size)
1676 {
1677         struct ll_inode_info *lli = ll_i2info(inode);
1678         struct lov_stripe_md *lsm;
1679         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1680         int rc = 0;
1681         ENTRY;
1682
1683         down(&lli->lli_open_sem);
1684         lsm = lli->lli_smd;
1685         if (lsm) {
1686                 up(&lli->lli_open_sem);
1687                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1688                        inode->i_ino);
1689                 RETURN(-EEXIST);
1690         }
1691
1692         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1693         if (rc)
1694                 GOTO(out, rc);
1695         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1696                 GOTO(out_req_free, rc = -ENOENT);
1697         rc = oit.d.lustre.it_status;
1698         if (rc < 0)
1699                 GOTO(out_req_free, rc);
1700
1701         ll_release_openhandle(file->f_dentry, &oit);
1702
1703  out:
1704         up(&lli->lli_open_sem);
1705         ll_intent_release(&oit);
1706         RETURN(rc);
1707 out_req_free:
1708         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1709         goto out;
1710 }
1711
1712 static int ll_lov_setea(struct inode *inode, struct file *file,
1713                             unsigned long arg)
1714 {
1715         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1716         struct lov_user_md  *lump;
1717         int lum_size = sizeof(struct lov_user_md) +
1718                        sizeof(struct lov_user_ost_data);
1719         int rc;
1720         ENTRY;
1721
1722         if (!capable (CAP_SYS_ADMIN))
1723                 RETURN(-EPERM);
1724
1725         OBD_ALLOC(lump, lum_size);
1726         if (lump == NULL) {
1727                 RETURN(-ENOMEM);
1728         }
1729         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1730         if (rc) {
1731                 OBD_FREE(lump, lum_size);
1732                 RETURN(-EFAULT);
1733         }
1734
1735         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1736
1737         OBD_FREE(lump, lum_size);
1738         RETURN(rc);
1739 }
1740
1741 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1742                             unsigned long arg)
1743 {
1744         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1745         int rc;
1746         int flags = FMODE_WRITE;
1747         ENTRY;
1748
1749         /* Bug 1152: copy properly when this is no longer true */
1750         LASSERT(sizeof(lum) == sizeof(*lump));
1751         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1752         rc = copy_from_user(&lum, lump, sizeof(lum));
1753         if (rc)
1754                 RETURN(-EFAULT);
1755
1756         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1757         if (rc == 0) {
1758                  put_user(0, &lump->lmm_stripe_count);
1759                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1760                                     0, ll_i2info(inode)->lli_smd, lump);
1761         }
1762         RETURN(rc);
1763 }
1764
1765 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1766 {
1767         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1768
1769         if (!lsm)
1770                 RETURN(-ENODATA);
1771
1772         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
1773                             (void *)arg);
1774 }
1775
1776 static int ll_get_grouplock(struct inode *inode, struct file *file,
1777                             unsigned long arg)
1778 {
1779         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1780         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1781                                                     .end = OBD_OBJECT_EOF}};
1782         struct lustre_handle lockh = { 0 };
1783         struct ll_inode_info *lli = ll_i2info(inode);
1784         struct lov_stripe_md *lsm = lli->lli_smd;
1785         int flags = 0, rc;
1786         ENTRY;
1787
1788         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1789                 RETURN(-EINVAL);
1790         }
1791
1792         policy.l_extent.gid = arg;
1793         if (file->f_flags & O_NONBLOCK)
1794                 flags = LDLM_FL_BLOCK_NOWAIT;
1795
1796         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1797         if (rc)
1798                 RETURN(rc);
1799
1800         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1801         fd->fd_gid = arg;
1802         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1803
1804         RETURN(0);
1805 }
1806
1807 static int ll_put_grouplock(struct inode *inode, struct file *file,
1808                             unsigned long arg)
1809 {
1810         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1811         struct ll_inode_info *lli = ll_i2info(inode);
1812         struct lov_stripe_md *lsm = lli->lli_smd;
1813         int rc;
1814         ENTRY;
1815
1816         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1817                 /* Ugh, it's already unlocked. */
1818                 RETURN(-EINVAL);
1819         }
1820
1821         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1822                 RETURN(-EINVAL);
1823
1824         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1825
1826         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1827         if (rc)
1828                 RETURN(rc);
1829
1830         fd->fd_gid = 0;
1831         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1832
1833         RETURN(0);
1834 }
1835
1836 static int join_sanity_check(struct inode *head, struct inode *tail)
1837 {
1838         ENTRY;
1839         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1840                 CERROR("server do not support join \n");
1841                 RETURN(-EINVAL);
1842         }
1843         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1844                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1845                        head->i_ino, tail->i_ino);
1846                 RETURN(-EINVAL);
1847         }
1848         if (head->i_ino == tail->i_ino) {
1849                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1850                 RETURN(-EINVAL);
1851         }
1852         if (head->i_size % JOIN_FILE_ALIGN) {
1853                 CERROR("hsize %llu must be times of 64K\n", head->i_size);
1854                 RETURN(-EINVAL);
1855         }
1856         RETURN(0);
1857 }
1858
1859 static int join_file(struct inode *head_inode, struct file *head_filp,
1860                      struct file *tail_filp)
1861 {
1862         struct inode *tail_inode, *tail_parent;
1863         struct dentry *tail_dentry = tail_filp->f_dentry;
1864         struct lookup_intent oit = {.it_op = IT_OPEN,
1865                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1866         struct lustre_handle lockh;
1867         struct md_op_data *op_data;
1868         __u32  hsize = head_inode->i_size >> 32;
1869         __u32  tsize = head_inode->i_size;
1870         int    rc;
1871         ENTRY;
1872
1873         tail_dentry = tail_filp->f_dentry;
1874         tail_inode = tail_dentry->d_inode;
1875         tail_parent = tail_dentry->d_parent->d_inode;
1876
1877         op_data = ll_prep_md_op_data(NULL, head_inode, tail_parent,
1878                                      tail_dentry->d_name.name,
1879                                      tail_dentry->d_name.len, 0);
1880         if (op_data == NULL)
1881                 RETURN(-ENOMEM);
1882
1883         rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
1884                         op_data, &lockh, &tsize, 0, ldlm_completion_ast,
1885                         ll_md_blocking_ast, &hsize, 0);
1886
1887         ll_finish_md_op_data(op_data);
1888         if (rc < 0)
1889                 GOTO(out, rc);
1890
1891         rc = oit.d.lustre.it_status;
1892
1893         if (rc < 0) {
1894                 ptlrpc_req_finished((struct ptlrpc_request *)
1895                                                           oit.d.lustre.it_data);
1896                 GOTO(out, rc);
1897         }
1898
1899         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
1900                                            * away */
1901                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
1902                 oit.d.lustre.it_lock_mode = 0;
1903         }
1904         ll_release_openhandle(head_filp->f_dentry, &oit);
1905 out:
1906         ll_intent_release(&oit);
1907         RETURN(rc);
1908 }
1909
1910 static int ll_file_join(struct inode *head, struct file *filp,
1911                         char *filename_tail)
1912 {
1913         struct inode *tail = NULL, *first = NULL, *second = NULL;
1914         struct dentry *tail_dentry;
1915         struct file *tail_filp, *first_filp, *second_filp;
1916         struct ll_lock_tree first_tree, second_tree;
1917         struct ll_lock_tree_node *first_node, *second_node;
1918         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1919         int rc = 0, cleanup_phase = 0;
1920         ENTRY;
1921
1922         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1923                head->i_ino, head->i_generation, head, filename_tail);
1924
1925         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1926         if (IS_ERR(tail_filp)) {
1927                 CERROR("Can not open tail file %s", filename_tail);
1928                 rc = PTR_ERR(tail_filp);
1929                 GOTO(cleanup, rc);
1930         }
1931         tail = igrab(tail_filp->f_dentry->d_inode);
1932
1933         tlli = ll_i2info(tail);
1934         tail_dentry = tail_filp->f_dentry;
1935         LASSERT(tail_dentry);
1936         cleanup_phase = 1;
1937
1938         /*reorder the inode for lock sequence*/
1939         first = head->i_ino > tail->i_ino ? head : tail;
1940         second = head->i_ino > tail->i_ino ? tail : head;
1941         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1942         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1943
1944         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1945                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1946         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1947         if (IS_ERR(first_node)){
1948                 rc = PTR_ERR(first_node);
1949                 GOTO(cleanup, rc);
1950         }
1951         first_tree.lt_fd = first_filp->private_data;
1952         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1953         if (rc != 0)
1954                 GOTO(cleanup, rc);
1955         cleanup_phase = 2;
1956
1957         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1958         if (IS_ERR(second_node)){
1959                 rc = PTR_ERR(second_node);
1960                 GOTO(cleanup, rc);
1961         }
1962         second_tree.lt_fd = second_filp->private_data;
1963         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
1964         if (rc != 0)
1965                 GOTO(cleanup, rc);
1966         cleanup_phase = 3;
1967
1968         rc = join_sanity_check(head, tail);
1969         if (rc)
1970                 GOTO(cleanup, rc);
1971
1972         rc = join_file(head, filp, tail_filp);
1973         if (rc)
1974                 GOTO(cleanup, rc);
1975 cleanup:
1976         switch (cleanup_phase) {
1977         case 3:
1978                 ll_tree_unlock(&second_tree);
1979                 obd_cancel_unused(ll_i2dtexp(second),
1980                                   ll_i2info(second)->lli_smd, 0, NULL);
1981         case 2:
1982                 ll_tree_unlock(&first_tree);
1983                 obd_cancel_unused(ll_i2dtexp(first),
1984                                   ll_i2info(first)->lli_smd, 0, NULL);
1985         case 1:
1986                 filp_close(tail_filp, 0);
1987                 if (tail)
1988                         iput(tail);
1989                 if (head && rc == 0) {
1990                         obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
1991                                        &hlli->lli_smd);
1992                         hlli->lli_smd = NULL;
1993                 }
1994         case 0:
1995                 break;
1996         default:
1997                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1998                 LBUG();
1999         }
2000         RETURN(rc);
2001 }
2002
2003 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2004 {
2005         struct inode *inode = dentry->d_inode;
2006         struct obd_client_handle *och;
2007         int rc;
2008         ENTRY;
2009
2010         LASSERT(inode);
2011
2012         /* Root ? Do nothing. */
2013         if (dentry->d_inode->i_sb->s_root == dentry)
2014                 RETURN(0);
2015
2016         /* No open handle to close? Move away */
2017         if (!it_disposition(it, DISP_OPEN_OPEN))
2018                 RETURN(0);
2019
2020         OBD_ALLOC(och, sizeof(*och));
2021         if (!och)
2022                 GOTO(out, rc = -ENOMEM);
2023
2024         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
2025                     ll_i2info(inode), it, och);
2026
2027         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
2028                                        inode, och);
2029
2030         /* Do not free @och is it is waiting for DONE_WRITING. */
2031         if (och->och_fh.cookie == DEAD_HANDLE_MAGIC)
2032                 OBD_FREE(och, sizeof(*och));
2033  out:
2034         /* this one is in place of ll_file_open */
2035         ptlrpc_req_finished(it->d.lustre.it_data);
2036         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2037         RETURN(rc);
2038 }
2039
2040 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2041                   unsigned long arg)
2042 {
2043         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2044         int flags;
2045         ENTRY;
2046
2047         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2048                inode->i_generation, inode, cmd);
2049
2050         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2051         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2052                 RETURN(-ENOTTY);
2053
2054         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
2055         switch(cmd) {
2056         case LL_IOC_GETFLAGS:
2057                 /* Get the current value of the file flags */
2058                 return put_user(fd->fd_flags, (int *)arg);
2059         case LL_IOC_SETFLAGS:
2060         case LL_IOC_CLRFLAGS:
2061                 /* Set or clear specific file flags */
2062                 /* XXX This probably needs checks to ensure the flags are
2063                  *     not abused, and to handle any flag side effects.
2064                  */
2065                 if (get_user(flags, (int *) arg))
2066                         RETURN(-EFAULT);
2067
2068                 if (cmd == LL_IOC_SETFLAGS) {
2069                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2070                             !(file->f_flags & O_DIRECT)) {
2071                                 CERROR("%s: unable to disable locking on "
2072                                        "non-O_DIRECT file\n", current->comm);
2073                                 RETURN(-EINVAL);
2074                         }
2075
2076                         fd->fd_flags |= flags;
2077                 } else {
2078                         fd->fd_flags &= ~flags;
2079                 }
2080                 RETURN(0);
2081         case LL_IOC_LOV_SETSTRIPE:
2082                 RETURN(ll_lov_setstripe(inode, file, arg));
2083         case LL_IOC_LOV_SETEA:
2084                 RETURN(ll_lov_setea(inode, file, arg));
2085         case LL_IOC_LOV_GETSTRIPE:
2086                 RETURN(ll_lov_getstripe(inode, arg));
2087         case LL_IOC_RECREATE_OBJ:
2088                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2089         case EXT3_IOC_GETFLAGS:
2090         case EXT3_IOC_SETFLAGS:
2091                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2092         case EXT3_IOC_GETVERSION_OLD:
2093         case EXT3_IOC_GETVERSION:
2094                 RETURN(put_user(inode->i_generation, (int *)arg));
2095         case LL_IOC_JOIN: {
2096                 char *ftail;
2097                 int rc;
2098
2099                 ftail = getname((const char *)arg);
2100                 if (IS_ERR(ftail))
2101                         RETURN(PTR_ERR(ftail));
2102                 rc = ll_file_join(inode, file, ftail);
2103                 putname(ftail);
2104                 RETURN(rc);
2105         }
2106         case LL_IOC_GROUP_LOCK:
2107                 RETURN(ll_get_grouplock(inode, file, arg));
2108         case LL_IOC_GROUP_UNLOCK:
2109                 RETURN(ll_put_grouplock(inode, file, arg));
2110         case IOC_OBD_STATFS:
2111                 RETURN(ll_obd_statfs(inode, (void *)arg));
2112
2113         /* We need to special case any other ioctls we want to handle,
2114          * to send them to the MDS/OST as appropriate and to properly
2115          * network encode the arg field.
2116         case EXT3_IOC_SETVERSION_OLD:
2117         case EXT3_IOC_SETVERSION:
2118         */
2119         case LL_IOC_FLUSHCTX:
2120                 RETURN(ll_flush_ctx(inode));
2121         case LL_IOC_GETFACL: {
2122                 struct rmtacl_ioctl_data ioc;
2123
2124                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2125                         RETURN(-EFAULT);
2126
2127                 RETURN(ll_ioctl_getfacl(inode, &ioc));
2128         }
2129         case LL_IOC_SETFACL: {
2130                 struct rmtacl_ioctl_data ioc;
2131
2132                 if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
2133                         RETURN(-EFAULT);
2134
2135                 RETURN(ll_ioctl_setfacl(inode, &ioc));
2136         }
2137         default:
2138                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2139                                      (void *)arg));
2140         }
2141 }
2142
2143 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2144 {
2145         struct inode *inode = file->f_dentry->d_inode;
2146         struct ll_inode_info *lli = ll_i2info(inode);
2147         struct lov_stripe_md *lsm = lli->lli_smd;
2148         loff_t retval;
2149         ENTRY;
2150         retval = offset + ((origin == 2) ? inode->i_size :
2151                            (origin == 1) ? file->f_pos : 0);
2152         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2153                inode->i_ino, inode->i_generation, inode, retval, retval,
2154                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2155
2156         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
2157         if (origin == 2) { /* SEEK_END */
2158                 int nonblock = 0, rc;
2159
2160                 if (file->f_flags & O_NONBLOCK)
2161                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2162
2163                 if (lsm != NULL) {
2164                         rc = ll_glimpse_size(inode, nonblock);
2165                         if (rc != 0)
2166                                 RETURN(rc);
2167                 }
2168
2169                 ll_inode_size_lock(inode, 0);
2170                 offset += inode->i_size;
2171                 ll_inode_size_unlock(inode, 0);
2172         } else if (origin == 1) { /* SEEK_CUR */
2173                 offset += file->f_pos;
2174         }
2175
2176         retval = -EINVAL;
2177         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2178                 if (offset != file->f_pos) {
2179                         file->f_pos = offset;
2180 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2181                         file->f_reada = 0;
2182                         file->f_version = ++event;
2183 #endif
2184                 }
2185                 retval = offset;
2186         } else {
2187                 CERROR("invalid offset offset "LPX64" inode=%lu/%u(%p)"
2188                        "seek (%s) isize "LPU64", f_ops "LPU64"\n",
2189                        offset, inode->i_ino, inode->i_generation, inode,
2190                        origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR"
2191                        : "SEEK_SET", inode->i_size, file->f_pos);
2192         }
2193
2194         RETURN(retval);
2195 }
2196
2197 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2198 {
2199         struct inode *inode = dentry->d_inode;
2200         struct ll_inode_info *lli = ll_i2info(inode);
2201         struct lov_stripe_md *lsm = lli->lli_smd;
2202         struct ptlrpc_request *req;
2203         struct obd_capa *oc;
2204         int rc, err;
2205         ENTRY;
2206         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2207                inode->i_generation, inode);
2208
2209         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
2210
2211         /* fsync's caller has already called _fdata{sync,write}, we want
2212          * that IO to finish before calling the osc and mdc sync methods */
2213         rc = filemap_fdatawait(inode->i_mapping);
2214
2215         /* catch async errors that were recorded back when async writeback
2216          * failed for pages in this mapping. */
2217         err = lli->lli_async_rc;
2218         lli->lli_async_rc = 0;
2219         if (rc == 0)
2220                 rc = err;
2221         if (lsm) {
2222                 err = lov_test_and_clear_async_rc(lsm);
2223                 if (rc == 0)
2224                         rc = err;
2225         }
2226
2227         oc = ll_mdscapa_get(inode);
2228         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2229                       &req);
2230         capa_put(oc);
2231         if (!rc)
2232                 rc = err;
2233         if (!err)
2234                 ptlrpc_req_finished(req);
2235
2236         if (data && lsm) {
2237                 struct obdo *oa = obdo_alloc();
2238                 struct obd_capa *ocapa;
2239
2240                 if (!oa)
2241                         RETURN(rc ? rc : -ENOMEM);
2242
2243                 oa->o_id = lsm->lsm_object_id;
2244                 oa->o_gr = lsm->lsm_object_gr;
2245                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2246                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2247                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
2248                                            OBD_MD_FLGROUP);
2249
2250                 ocapa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2251                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
2252                                0, OBD_OBJECT_EOF, ocapa);
2253                 capa_put(ocapa);
2254                 if (!rc)
2255                         rc = err;
2256                 obdo_free(oa);
2257         }
2258
2259         RETURN(rc);
2260 }
2261
2262 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2263 {
2264         struct inode *inode = file->f_dentry->d_inode;
2265         struct ll_sb_info *sbi = ll_i2sbi(inode);
2266         struct ldlm_res_id res_id =
2267                 { .name = { fid_seq(ll_inode2fid(inode)),
2268                             fid_oid(ll_inode2fid(inode)),
2269                             fid_ver(ll_inode2fid(inode)),
2270                             LDLM_FLOCK} };
2271         struct lustre_handle lockh = {0};
2272         ldlm_policy_data_t flock;
2273         ldlm_mode_t mode = 0;
2274         int flags = 0;
2275         int rc;
2276         ENTRY;
2277
2278         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2279                inode->i_ino, file_lock);
2280
2281         flock.l_flock.pid = file_lock->fl_pid;
2282         flock.l_flock.start = file_lock->fl_start;
2283         flock.l_flock.end = file_lock->fl_end;
2284
2285         switch (file_lock->fl_type) {
2286         case F_RDLCK:
2287                 mode = LCK_PR;
2288                 break;
2289         case F_UNLCK:
2290                 /* An unlock request may or may not have any relation to
2291                  * existing locks so we may not be able to pass a lock handle
2292                  * via a normal ldlm_lock_cancel() request. The request may even
2293                  * unlock a byte range in the middle of an existing lock. In
2294                  * order to process an unlock request we need all of the same
2295                  * information that is given with a normal read or write record
2296                  * lock request. To avoid creating another ldlm unlock (cancel)
2297                  * message we'll treat a LCK_NL flock request as an unlock. */
2298                 mode = LCK_NL;
2299                 break;
2300         case F_WRLCK:
2301                 mode = LCK_PW;
2302                 break;
2303         default:
2304                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2305                 LBUG();
2306         }
2307
2308         switch (cmd) {
2309         case F_SETLKW:
2310 #ifdef F_SETLKW64
2311         case F_SETLKW64:
2312 #endif
2313                 flags = 0;
2314                 break;
2315         case F_SETLK:
2316 #ifdef F_SETLK64
2317         case F_SETLK64:
2318 #endif
2319                 flags = LDLM_FL_BLOCK_NOWAIT;
2320                 break;
2321         case F_GETLK:
2322 #ifdef F_GETLK64
2323         case F_GETLK64:
2324 #endif
2325                 flags = LDLM_FL_TEST_LOCK;
2326                 /* Save the old mode so that if the mode in the lock changes we
2327                  * can decrement the appropriate reader or writer refcount. */
2328                 file_lock->fl_type = mode;
2329                 break;
2330         default:
2331                 CERROR("unknown fcntl lock command: %d\n", cmd);
2332                 LBUG();
2333         }
2334
2335         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2336                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2337                flags, mode, flock.l_flock.start, flock.l_flock.end);
2338
2339         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
2340                               LDLM_FLOCK, &flock, mode, &flags, NULL,
2341                               ldlm_flock_completion_ast, NULL, file_lock,
2342                               NULL, 0, NULL, &lockh, 0);
2343         RETURN(rc);
2344 }
2345
2346 int ll_have_md_lock(struct inode *inode, __u64 bits)
2347 {
2348         struct lustre_handle lockh;
2349         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2350         struct lu_fid *fid;
2351         int flags;
2352         ENTRY;
2353
2354         if (!inode)
2355                RETURN(0);
2356
2357         fid = &ll_i2info(inode)->lli_fid;
2358         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2359
2360         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2361         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
2362                           LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2363                 RETURN(1);
2364         }
2365
2366         RETURN(0);
2367 }
2368
2369 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2370         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2371                               * and return success */
2372                 inode->i_nlink = 0;
2373                 /* This path cannot be hit for regular files unless in
2374                  * case of obscure races, so no need to to validate
2375                  * size. */
2376                 if (!S_ISREG(inode->i_mode) &&
2377                     !S_ISDIR(inode->i_mode))
2378                         return 0;
2379         }
2380
2381         if (rc) {
2382                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2383                 return -abs(rc);
2384
2385         }
2386
2387         return 0;
2388 }
2389
2390 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2391 {
2392         struct inode *inode = dentry->d_inode;
2393         struct ptlrpc_request *req = NULL;
2394         struct ll_sb_info *sbi;
2395         struct obd_export *exp;
2396         int rc;
2397         ENTRY;
2398
2399         if (!inode) {
2400                 CERROR("REPORT THIS LINE TO PETER\n");
2401                 RETURN(0);
2402         }
2403         sbi = ll_i2sbi(inode);
2404
2405         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2406                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2407 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2408         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
2409 #endif
2410
2411         exp = ll_i2mdexp(inode);
2412
2413         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2414                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2415                 struct md_op_data *op_data;
2416
2417                 /* Call getattr by fid, so do not provide name at all. */
2418                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2419                                              dentry->d_inode, NULL, 0, 0);
2420                 if (op_data == NULL)
2421                         RETURN(-ENOMEM);
2422                 it->it_flags |= O_CHECK_STALE;
2423                 rc = md_intent_lock(exp, op_data, NULL, 0,
2424                                     /* we are not interested in name
2425                                        based lookup */
2426                                     &oit, 0, &req,
2427                                     ll_md_blocking_ast, 0);
2428                 ll_finish_md_op_data(op_data);
2429                 it->it_flags &= ~ O_CHECK_STALE;
2430                 if (rc < 0) {
2431                         rc = ll_inode_revalidate_fini(inode, rc);
2432                         GOTO (out, rc);
2433                 }
2434
2435                 rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2436                 if (rc != 0) {
2437                         ll_intent_release(&oit);
2438                         GOTO(out, rc);
2439                 }
2440
2441                 /* Unlinked? Unhash dentry, so it is not picked up later by
2442                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2443                    here to preserve get_cwd functionality on 2.6.
2444                    Bug 10503 */
2445                 if (!dentry->d_inode->i_nlink) {
2446                         spin_lock(&dcache_lock);
2447                         ll_drop_dentry(dentry);
2448                         spin_unlock(&dcache_lock);
2449                 }
2450
2451                 ll_lookup_finish_locks(&oit, dentry);
2452         } else if (!ll_have_md_lock(dentry->d_inode,
2453                                     MDS_INODELOCK_UPDATE)) {
2454                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2455                 obd_valid valid = OBD_MD_FLGETATTR;
2456                 int ealen = 0;
2457                 struct obd_capa *oc;
2458
2459                 if (S_ISREG(inode->i_mode)) {
2460                         rc = ll_get_max_mdsize(sbi, &ealen);
2461                         if (rc)
2462                                 RETURN(rc);
2463                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2464                 }
2465                 oc = ll_mdscapa_get(inode);
2466                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
2467                                 ealen, &req);
2468                 capa_put(oc);
2469                 if (rc) {
2470                         rc = ll_inode_revalidate_fini(inode, rc);
2471                         RETURN(rc);
2472                 }
2473
2474                 rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
2475                                    NULL);
2476                 if (rc)
2477                         GOTO(out, rc);
2478         }
2479
2480         /* if object not yet allocated, don't validate size */
2481         if (ll_i2info(inode)->lli_smd == NULL)
2482                 GOTO(out, rc = 0);
2483
2484         /* ll_glimpse_size will prefer locally cached writes if they extend
2485            the file */
2486         rc = ll_glimpse_size(inode, 0);
2487         EXIT;
2488 out:
2489         ptlrpc_req_finished(req);
2490         return rc;
2491 }
2492
2493 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2494 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2495                   struct lookup_intent *it, struct kstat *stat)
2496 {
2497         struct inode *inode = de->d_inode;
2498         int res = 0;
2499
2500         res = ll_inode_revalidate_it(de, it);
2501         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
2502
2503         if (res)
2504                 return res;
2505
2506         stat->dev = inode->i_sb->s_dev;
2507         stat->ino = inode->i_ino;
2508         stat->mode = inode->i_mode;
2509         stat->nlink = inode->i_nlink;
2510         stat->uid = inode->i_uid;
2511         stat->gid = inode->i_gid;
2512         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2513         stat->atime = inode->i_atime;
2514         stat->mtime = inode->i_mtime;
2515         stat->ctime = inode->i_ctime;
2516         stat->blksize = inode->i_blksize;
2517
2518         ll_inode_size_lock(inode, 0);
2519         stat->size = inode->i_size;
2520         stat->blocks = inode->i_blocks;
2521         ll_inode_size_unlock(inode, 0);
2522
2523         return 0;
2524 }
2525 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2526 {
2527         struct lookup_intent it = { .it_op = IT_GETATTR };
2528
2529         return ll_getattr_it(mnt, de, &it, stat);
2530 }
2531 #endif
2532
2533 static
2534 int lustre_check_acl(struct inode *inode, int mask)
2535 {
2536 #ifdef CONFIG_FS_POSIX_ACL
2537         struct ll_inode_info *lli = ll_i2info(inode);
2538         struct posix_acl *acl;
2539         int rc;
2540         ENTRY;
2541
2542         spin_lock(&lli->lli_lock);
2543         acl = posix_acl_dup(lli->lli_posix_acl);
2544         spin_unlock(&lli->lli_lock);
2545
2546         if (!acl)
2547                 RETURN(-EAGAIN);
2548
2549         rc = posix_acl_permission(inode, acl, mask);
2550         posix_acl_release(acl);
2551
2552         RETURN(rc);
2553 #else
2554         return -EAGAIN;
2555 #endif
2556 }
2557
2558 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2559 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2560 {
2561         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2562                inode->i_ino, inode->i_generation, inode, mask);
2563         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2564                 return lustre_check_remote_perm(inode, mask);
2565         return generic_permission(inode, mask, lustre_check_acl);
2566 }
2567 #else
2568 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2569 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2570 #else
2571 int ll_inode_permission(struct inode *inode, int mask)
2572 #endif
2573 {
2574         int mode = inode->i_mode;
2575         int rc;
2576
2577         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2578                inode->i_ino, inode->i_generation, inode, mask);
2579
2580         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2581                 return lustre_check_remote_perm(inode, mask);
2582
2583         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2584             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2585                 return -EROFS;
2586         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2587                 return -EACCES;
2588         if (current->fsuid == inode->i_uid) {
2589                 mode >>= 6;
2590         } else if (1) {
2591                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2592                         goto check_groups;
2593                 rc = lustre_check_acl(inode, mask);
2594                 if (rc == -EAGAIN)
2595                         goto check_groups;
2596                 if (rc == -EACCES)
2597                         goto check_capabilities;
2598                 return rc;
2599         } else {
2600 check_groups:
2601                 if (in_group_p(inode->i_gid))
2602                         mode >>= 3;
2603         }
2604         if ((mode & mask & S_IRWXO) == mask)
2605                 return 0;
2606
2607 check_capabilities:
2608         if (!(mask & MAY_EXEC) ||
2609             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2610                 if (capable(CAP_DAC_OVERRIDE))
2611                         return 0;
2612
2613         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2614             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2615                 return 0;
2616         return -EACCES;
2617 }
2618 #endif
2619
2620 struct file_operations ll_file_operations = {
2621         .read           = ll_file_read,
2622         .write          = ll_file_write,
2623         .ioctl          = ll_file_ioctl,
2624         .open           = ll_file_open,
2625         .release        = ll_file_release,
2626         .mmap           = ll_file_mmap,
2627         .llseek         = ll_file_seek,
2628 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2629         .sendfile       = ll_file_sendfile,
2630 #endif
2631         .fsync          = ll_fsync,
2632         /* .lock           = ll_file_flock */
2633 };
2634
2635 struct file_operations ll_file_operations_flock = {
2636         .read           = ll_file_read,
2637         .write          = ll_file_write,
2638         .ioctl          = ll_file_ioctl,
2639         .open           = ll_file_open,
2640         .release        = ll_file_release,
2641         .mmap           = ll_file_mmap,
2642         .llseek         = ll_file_seek,
2643 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2644         .sendfile       = ll_file_sendfile,
2645 #endif
2646         .fsync          = ll_fsync,
2647         .lock           = ll_file_flock
2648 };
2649
2650
2651 struct inode_operations ll_file_inode_operations = {
2652 #ifdef LUSTRE_KERNEL_VERSION
2653         .setattr_raw    = ll_setattr_raw,
2654 #endif
2655         .setattr        = ll_setattr,
2656         .truncate       = ll_truncate,
2657 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2658         .getattr        = ll_getattr,
2659 #else
2660         .revalidate_it  = ll_inode_revalidate_it,
2661 #endif
2662         .permission     = ll_inode_permission,
2663         .setxattr       = ll_setxattr,
2664         .getxattr       = ll_getxattr,
2665         .listxattr      = ll_listxattr,
2666         .removexattr    = ll_removexattr,
2667 };
2668