Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / llite / file.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
/* Allocate one struct ll_file_data from ll_file_data_slab and return the
 * pointer that OBD_SLAB_ALLOC_PTR left in fd.  ll_file_open() in this file
 * treats a NULL result as an allocation failure (-ENOMEM). */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
48 }
49
/*
 * Close the MDS open handle @och that belongs to @inode.
 *
 * Sends the close through mdc_close() with an obdo describing the inode
 * and, when the close succeeded, calls ll_objects_destroy() on the reply.
 * Every path that reaches the "out" label clears the open replay data of
 * @och.  The caller keeps ownership of @och itself; it is not freed here.
 *
 * Returns 0 on success (including the "no MDC device" and forced-umount
 * shortcuts), -ENOMEM if the obdo cannot be allocated, or the error from
 * mdc_close() / ll_objects_destroy().
 */
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * here we check if this is forced umount. If so this is called on
68          * canceling "open lock" and we do not call mdc_close() in this case, as
69          * it will not be successful, as import is already deactivated.
70          */
71         if (obd->obd_force)
72                 GOTO(out, rc = 0);
73
74         OBDO_ALLOC(oa);
        /* NOTE(review): this early return bypasses the "out" label, so
         * mdc_clear_open_replay_data(och) is not called on this path. */
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
        /* Describe the inode to the MDS: id plus type/mode/size/blocks/times. */
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
        /* Tell the MDS when this client still has dirty data for the inode. */
84         if (ll_is_inode_dirty(inode)) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
        /* NOTE(review): the comparison is against positive EAGAIN; confirm
         * that mdc_close() really reports this condition as a positive
         * value rather than -EAGAIN. */
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         OBDO_FREE(oa);
101
        /* Only look at the close reply for objects to destroy if the close
         * itself did not fail. */
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133          } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och=*och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody have freed this och
150                       already */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have good enough OPEN lock on the file and if
176            we can skip talking to MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
/* Forward declaration; the definition is not in this file (presumably it
 * lives in the LOV layer -- TODO confirm).  Used by ll_file_release(). */
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, fput() the caller does not, so we need
222  * to make every effort to clean up all of our state here.  Also, applications
223  * rarely check close errors and even if an error is returned they will not
224  * re-try the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237
238         if (S_ISDIR(inode->i_mode))
239                 ll_stop_statahead(inode);
240
241         /* don't do anything for / */
242         if (inode->i_sb->s_root == file->f_dentry)
243                 RETURN(0);
244
245         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
246         fd = LUSTRE_FPRIVATE(file);
247         LASSERT(fd != NULL);
248
249         if (lsm)
250                 lov_test_and_clear_async_rc(lsm);
251         lli->lli_async_rc = 0;
252
253         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
254         RETURN(rc);
255 }
256
/*
 * Send an open intent for the dentry of @file to the MDS.
 *
 * @lmm/@lmmsize: striping data handed to mdc_intent_lock(); when both are
 *                zero/NULL the MDS_OPEN_LOCK flag is added to @itp.
 * @itp:          intent to enqueue; on return DISP_ENQ_COMPLETE is cleared
 *                and the intent lock has been dropped via
 *                ll_intent_drop_lock().
 *
 * Exit paths:
 *  - "out":       finishes the request stored in itp->d.lustre.it_data;
 *  - "out_stale": skips that step; taken for -ESTALE after the open handle
 *                 was released with ll_release_openhandle().
 *
 * Returns -ENOENT if the dentry has no parent, the enqueue or open error,
 * or the result of ll_prep_inode().
 */
257 static int ll_intent_file_open(struct file *file, void *lmm,
258                                int lmmsize, struct lookup_intent *itp)
259 {
260         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
261         struct mdc_op_data data;
262         struct dentry *parent = file->f_dentry->d_parent;
263         const char *name = file->f_dentry->d_name.name;
264         const int len = file->f_dentry->d_name.len;
265         struct inode *inode = file->f_dentry->d_inode;
266         struct ptlrpc_request *req;
267         int rc;
268         ENTRY;
269
270         if (!parent)
271                 RETURN(-ENOENT);
272
273         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
274                                name, len, O_RDWR, NULL);
275
276         /* Usually we come here only for NFSD, and we want open lock.
277            But we can also get here with pre 2.6.15 patchless kernels, and in
278            that case that lock is also ok */
279         /* We can also get here if there was cached open handle in revalidate_it
280          * but it disappeared while we were getting from there to ll_file_open.
281          * But this means this file was closed and immediately opened which
282          * makes a good candidate for using OPEN lock */
283         /* If lmmsize & lmm are not 0, we are just setting stripe info
284          * parameters. No need for the open lock */
285         if (!lmm && !lmmsize)
286                 itp->it_flags |= MDS_OPEN_LOCK;
287
288         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
289                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
290         if (rc == -ESTALE) {
291                 /* reason for keeping a separate exit path - don't flood the
292                 * log with messages about -ESTALE errors.
293                 */
294                 if (!it_disposition(itp, DISP_OPEN_OPEN) || 
295                      it_open_error(DISP_OPEN_OPEN, itp))
296                         GOTO(out, rc);
                /* The open itself succeeded on the MDS: release that open
                 * handle, then leave without finishing the request again. */
297                 ll_release_openhandle(file->f_dentry, itp);
298                 GOTO(out_stale, rc);
299         }
300
        /* Report the enqueue error if there is one, else the open error. */
301         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
302                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
303                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
304                 GOTO(out, rc);
305         }
306
        /* A lock was granted with the intent: attach the inode to it. */
307         if (itp->d.lustre.it_lock_mode)
308                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
309                                   inode);
310
311         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
312                            req, DLM_REPLY_REC_OFF, NULL);
313 out:
314         ptlrpc_req_finished(itp->d.lustre.it_data);
315
316 out_stale:
317         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
318         ll_intent_drop_lock(itp);
319
320         RETURN(rc);
321 }
322
323
324 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
325                         struct obd_client_handle *och)
326 {
327         struct ptlrpc_request *req = it->d.lustre.it_data;
328         struct mds_body *body;
329
330         LASSERT(och);
331
332         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
333         LASSERT(body != NULL);                  /* reply already checked out */
334         /* and swabbed in mdc_enqueue */
335         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
336
337         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
338         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
339         lli->lli_io_epoch = body->io_epoch;
340
341         mdc_set_open_replay_data(och, it->d.lustre.it_data);
342 }
343
344 int ll_local_open(struct file *file, struct lookup_intent *it,
345                   struct ll_file_data *fd, struct obd_client_handle *och)
346 {
347         ENTRY;
348
349         LASSERT(!LUSTRE_FPRIVATE(file));
350
351         LASSERT(fd != NULL);
352
353         if (och)
354                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
355         LUSTRE_FPRIVATE(file) = fd;
356         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
357         fd->fd_omode = it->it_flags;
358
359         RETURN(0);
360 }
361
362 /* Open a file, and (for the very first open) create objects on the OSTs at
363  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
364  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
365  * lli_open_sem to ensure no other process will create objects, send the
366  * stripe MD to the MDS, or try to destroy the objects if that fails.
367  *
368  * If we already have the stripe MD locally then we don't request it in
369  * mdc_open(), by passing a lmm_size = 0.
370  *
371  * It is up to the application to ensure no other processes open this file
372  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
373  * used.  We might be able to avoid races of that sort by getting lli_open_sem
374  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
375  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
376  */
377 int ll_file_open(struct inode *inode, struct file *file)
378 {
379         struct ll_inode_info *lli = ll_i2info(inode);
380         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
381                                           .it_flags = file->f_flags };
382         struct lov_stripe_md *lsm;
383         struct ptlrpc_request *req = NULL;
384         struct obd_client_handle **och_p;
385         __u64 *och_usecount;
386         struct ll_file_data *fd;
387         int rc = 0;
388         ENTRY;
389
390         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
391                inode->i_generation, inode, file->f_flags);
392
393         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
394                 lli->lli_opendir_pid = current->pid;
395
396         /* don't do anything for / */
397         if (inode->i_sb->s_root == file->f_dentry)
398                 RETURN(0);
399
400 #ifdef HAVE_VFS_INTENT_PATCHES
401         it = file->f_it;
402 #else
403         it = file->private_data; /* XXX: compat macro */
404         file->private_data = NULL; /* prevent ll_local_open assertion */
405 #endif
406
407         fd = ll_file_data_get();
408         if (fd == NULL) {
409                 lli->lli_opendir_pid = 0;
410                 RETURN(-ENOMEM);
411         }
412         if (!it || !it->d.lustre.it_disposition) {
413                 /* Convert f_flags into access mode. We cannot use file->f_mode,
414                  * because everything but O_ACCMODE mask was stripped from it */
415                 if ((oit.it_flags + 1) & O_ACCMODE)
416                         oit.it_flags++;
417                 if (file->f_flags & O_TRUNC)
418                         oit.it_flags |= FMODE_WRITE;
419
420                 /* kernel only call f_op->open in dentry_open.  filp_open calls
421                  * dentry_open after call to open_namei that checks permissions.
422                  * Only nfsd_open call dentry_open directly without checking
423                  * permissions and because of that this code below is safe. */
424                 if (oit.it_flags & FMODE_WRITE)
425                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
426
427                 /* We do not want O_EXCL here, presumably we opened the file
428                  * already? XXX - NFS implications? */
429                 oit.it_flags &= ~O_EXCL;
430
431                 it = &oit;
432         }
433
434 restart:
435         /* Let's see if we have file open on MDS already. */
436         if (it->it_flags & FMODE_WRITE) {
437                 och_p = &lli->lli_mds_write_och;
438                 och_usecount = &lli->lli_open_fd_write_count;
439         } else if (it->it_flags & FMODE_EXEC) {
440                 och_p = &lli->lli_mds_exec_och;
441                 och_usecount = &lli->lli_open_fd_exec_count;
442          } else {
443                 och_p = &lli->lli_mds_read_och;
444                 och_usecount = &lli->lli_open_fd_read_count;
445         }
446
447         LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
448                  it->d.lustre.it_disposition);
449
450         down(&lli->lli_och_sem);
451         if (*och_p) { /* Open handle is present */
452                 if (it_disposition(it, DISP_OPEN_OPEN)) {
453                         /* Well, there's extra open request that we do not need,
454                            let's close it somehow. This will decref request. */
455                         rc = it_open_error(DISP_OPEN_OPEN, it);
456                         if (rc) {
457                                 ll_file_data_put(fd);
458                                 GOTO(out_och_free, rc);
459                         }       
460                         ll_release_openhandle(file->f_dentry, it);
461                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
462                                              LPROC_LL_OPEN);
463                 }
464                 (*och_usecount)++;
465
466                 rc = ll_local_open(file, it, fd, NULL);
467
468                 LASSERTF(rc == 0, "rc = %d\n", rc);
469         } else {
470                 LASSERT(*och_usecount == 0);
471                 if (!it->d.lustre.it_disposition) {
472                         /* We cannot just request lock handle now, new ELC code
473                            means that one of other OPEN locks for this file
474                            could be cancelled, and since blocking ast handler
475                            would attempt to grab och_sem as well, that would
476                            result in a deadlock */
477                         up(&lli->lli_och_sem);
478                         rc = ll_intent_file_open(file, NULL, 0, it);
479                         if (rc) {
480                                 ll_file_data_put(fd);
481                                 GOTO(out_openerr, rc);
482                         }
483
484                         /* Got some error? Release the request */
485                         if (it->d.lustre.it_status < 0) {
486                                 req = it->d.lustre.it_data;
487                                 ptlrpc_req_finished(req);
488                         }
489                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
490                                           file->f_dentry->d_inode);
491                         goto restart;
492                 }
493  
494                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
495                 if (!*och_p) {
496                         ll_file_data_put(fd);
497                         GOTO(out_och_free, rc = -ENOMEM);
498                 }
499                 (*och_usecount)++;
500                req = it->d.lustre.it_data;
501
502                 /* mdc_intent_lock() didn't get a request ref if there was an
503                  * open error, so don't do cleanup on the request here
504                  * (bug 3430) */
505                 /* XXX (green): Should not we bail out on any error here, not
506                  * just open error? */
507                 rc = it_open_error(DISP_OPEN_OPEN, it);
508                 if (rc) {
509                         ll_file_data_put(fd);
510                         GOTO(out_och_free, rc);
511                 }
512
513                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
514                 rc = ll_local_open(file, it, fd, *och_p);
515                 LASSERTF(rc == 0, "rc = %d\n", rc);
516         }
517         up(&lli->lli_och_sem);
518
519         /* Must do this outside lli_och_sem lock to prevent deadlock where
520            different kind of OPEN lock for this same inode gets cancelled
521            by ldlm_cancel_lru */
522         if (!S_ISREG(inode->i_mode))
523                 GOTO(out, rc);
524
525         lsm = lli->lli_smd;
526         if (lsm == NULL) {
527                 if (file->f_flags & O_LOV_DELAY_CREATE ||
528                     !(file->f_mode & FMODE_WRITE)) {
529                         CDEBUG(D_INODE, "object creation was delayed\n");
530                         GOTO(out, rc);
531                 }
532         }
533         file->f_flags &= ~O_LOV_DELAY_CREATE;
534         GOTO(out, rc);
535  out:
536         ptlrpc_req_finished(req);
537         if (req)
538                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
539         if (rc == 0) {
540                 ll_open_complete(inode);
541         } else {
542 out_och_free:
543                 if (*och_p) {
544                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
545                         *och_p = NULL; /* OBD_FREE writes some magic there */
546                         (*och_usecount)--;
547                 }
548                 up(&lli->lli_och_sem);
549 out_openerr:
550                 lli->lli_opendir_pid = 0;
551         }
552         return rc;
553 }
554
555 /* Fills the obdo with the attributes for the inode defined by lsm */
556 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
557                    struct obdo *oa)
558 {
559         struct ptlrpc_request_set *set;
560         struct obd_info oinfo = { { { 0 } } };
561         int rc;
562         ENTRY;
563
564         LASSERT(lsm != NULL);
565
566         memset(oa, 0, sizeof *oa);
567         oinfo.oi_md = lsm;
568         oinfo.oi_oa = oa;
569         oa->o_id = lsm->lsm_object_id;
570         oa->o_mode = S_IFREG;
571         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
572                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
573                 OBD_MD_FLCTIME;
574
575         set = ptlrpc_prep_set();
576         if (set == NULL) {
577                 rc = -ENOMEM;
578         } else {
579                 rc = obd_getattr_async(exp, &oinfo, set);
580                 if (rc == 0)
581                         rc = ptlrpc_set_wait(set);
582                 ptlrpc_set_destroy(set);
583         }
584         if (rc)
585                 RETURN(rc);
586
587         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
588                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
589         RETURN(0);
590 }
591
592 static inline void ll_remove_suid(struct inode *inode)
593 {
594         unsigned int mode;
595
596         /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
597         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
598
599         /* was any of the uid bits set? */
600         mode &= inode->i_mode;
601         if (mode && !capable(CAP_FSETID)) {
602                 inode->i_mode &= ~mode;
603                 // XXX careful here - we cannot change the size
604         }
605 }
606
607 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
608 {
609         struct ll_inode_info *lli = ll_i2info(inode);
610         struct lov_stripe_md *lsm = lli->lli_smd;
611         struct obd_export *exp = ll_i2obdexp(inode);
612         struct {
613                 char name[16];
614                 struct ldlm_lock *lock;
615                 struct lov_stripe_md *lsm;
616         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
617         __u32 stripe, vallen = sizeof(stripe);
618         int rc;
619         ENTRY;
620
621         if (lsm->lsm_stripe_count == 1)
622                 GOTO(check, stripe = 0);
623
624         /* get our offset in the lov */
625         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
626         if (rc != 0) {
627                 CERROR("obd_get_info: rc = %d\n", rc);
628                 RETURN(rc);
629         }
630         LASSERT(stripe < lsm->lsm_stripe_count);
631
632 check:
633         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
634             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[1]){
635                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
636                            lsm->lsm_oinfo[stripe]->loi_id,
637                            lsm->lsm_oinfo[stripe]->loi_gr);
638                 RETURN(-ELDLM_NO_LOCK_DATA);
639         }
640
641         RETURN(stripe);
642 }
643
/* Take an additional reference on the page passed as @data so that it
 * cannot go away. */
void ll_pin_extent_cb(void *data)
{
        struct page *pinned = data;

        page_cache_get(pinned);
}
653 /* Flush the page from page cache for an extent as it is canceled.
654  * Page to remove is delivered as @data.
655  *
656  * No one can dirty the extent until we've finished our work and they cannot
657  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
658  * but other kernel actors could have pages locked.
659  *
660  * If @discard is set, there is no need to write the page if it is dirty.
661  *
662  * The page reference held on entry is dropped before returning.  The
663  * function always returns 0: a writepage failure is only logged and
664  * recorded in the mapping flags, it is not passed back to the caller.
665  *
666  * Called with the DLM lock held. */
663 int ll_page_removal_cb(void *data, int discard)
664 {
665         int rc;
666         struct page *page = data;
667         struct address_space *mapping;
668
669         ENTRY;
670
671         /* We have page reference already from ll_pin_page */
        /* NOTE(review): no ll_pin_page is visible in this file; the
         * reference presumably comes from ll_pin_extent_cb() -- confirm. */
672         lock_page(page);
673
674         /* Already truncated by somebody */
675         if (!page->mapping)
676                 GOTO(out, rc = 0);
677
        /* Remember the mapping now; page->mapping is re-checked further
         * down after the page lock may have been dropped and re-taken. */
678         mapping = page->mapping;
679
        /* Tear down user mappings covering the byte range of this page. */
680         ll_teardown_mmaps(mapping,
681                           (__u64)page->index << PAGE_CACHE_SHIFT,
682                           ((__u64)page->index<<PAGE_CACHE_SHIFT)|
683                                                               ~PAGE_CACHE_MASK);        
684         LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
685
        /* Write the page out first if it is dirty and may not be discarded. */
686         if (!discard && clear_page_dirty_for_io(page)) {
687                 LASSERT(page->mapping);
688                 rc = ll_call_writepage(page->mapping->host, page);
689                 /* either waiting for io to complete or reacquiring
690                  * the lock that the failed writepage released */
691                 lock_page(page);
692                 wait_on_page_writeback(page);
693                 if (rc != 0) {
694                         CERROR("writepage inode %lu(%p) of page %p "
695                                "failed: %d\n", mapping->host->i_ino,
696                                mapping->host, page, rc);
                        /* Record the failure on the mapping. */
697 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
698                         if (rc == -ENOSPC)
699                                 set_bit(AS_ENOSPC, &mapping->flags);
700                         else
701                                 set_bit(AS_EIO, &mapping->flags);
702 #else
703                         mapping->gfp_mask |= AS_EIO_MASK;
704 #endif
705                 }
706         }
707         if (page->mapping != NULL) {
708                 struct ll_async_page *llap = llap_cast_private(page);
709                 // checking again to account for writeback's lock_page()
710                 LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
711                 if (llap)
712                         ll_ra_accounting(llap, page->mapping);
713                 ll_truncate_complete_page(page);
714         }
715         EXIT;
716 out:
717         LASSERT(!PageWriteback(page));
718         unlock_page(page);
        /* Drop the page reference that was held on entry. */
719         page_cache_release(page);
720
721         return 0;
722 }
723
724 int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
725                              void *data, int flag)
726 {
727         struct inode *inode;
728         struct ll_inode_info *lli;
729         struct lov_stripe_md *lsm;
730         int stripe;
731         __u64 kms;
732
733         ENTRY;
734
735         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
736                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
737                 LBUG();
738         }
739
740         inode = ll_inode_from_lock(lock);
741         if (inode == NULL)
742                 RETURN(0);
743         lli = ll_i2info(inode);
744         if (lli == NULL)
745                 GOTO(iput, 0);
746         if (lli->lli_smd == NULL)
747                 GOTO(iput, 0);
748         lsm = lli->lli_smd;
749
750         stripe = ll_lock_to_stripe_offset(inode, lock);
751         if (stripe < 0)
752                 GOTO(iput, 0);
753
754         lov_stripe_lock(lsm);
755         lock_res_and_lock(lock);
756         kms = ldlm_extent_shift_kms(lock,
757                                     lsm->lsm_oinfo[stripe]->loi_kms);
758
759         if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
760                 LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
761                            lsm->lsm_oinfo[stripe]->loi_kms, kms);
762         lsm->lsm_oinfo[stripe]->loi_kms = kms;
763         unlock_res_and_lock(lock);
764         lov_stripe_unlock(lsm);
765         ll_try_done_writing(inode);
766         EXIT;
767 iput:
768         iput(inode);
769
770         return 0;
771 }
772
/* Disabled code: everything up to the matching #endif is compiled out.
 * NOTE(review): it would need work before being re-enabled:
 *  - it accesses lsm_oinfo[stripe] with '.', whereas the live code in this
 *    file dereferences lsm_oinfo[stripe] with '->';
 *  - it calls ll_i2info(inode) without first checking the result of
 *    ll_inode_from_lock() for NULL, unlike ll_extent_lock_cancel_cb();
 *  - LBUG() is placed ahead of the statements that follow it in the
 *    blocked-lock branch. */
773 #if 0
774 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
775 {
776         /* XXX ALLOCATE - 160 bytes */
777         struct inode *inode = ll_inode_from_lock(lock);
778         struct ll_inode_info *lli = ll_i2info(inode);
779         struct lustre_handle lockh = { 0 };
780         struct ost_lvb *lvb;
781         int stripe;
782         ENTRY;
783
784         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
785                      LDLM_FL_BLOCK_CONV)) {
786                 LBUG(); /* not expecting any blocked async locks yet */
787                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
788                            "lock, returning");
789                 ldlm_lock_dump(D_OTHER, lock, 0);
790                 ldlm_reprocess_all(lock->l_resource);
791                 RETURN(0);
792         }
793
794         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
795
796         stripe = ll_lock_to_stripe_offset(inode, lock);
797         if (stripe < 0)
798                 goto iput;
799
        /* The lock carries a value block: take the size it reports into
         * the stripe's rss and fold it into the stripe's kms. */
800         if (lock->l_lvb_len) {
801                 struct lov_stripe_md *lsm = lli->lli_smd;
802                 __u64 kms;
803                 lvb = lock->l_lvb_data;
804                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
805
806                 lock_res_and_lock(lock);
807                 ll_inode_size_lock(inode, 1);
808                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
809                 kms = ldlm_extent_shift_kms(NULL, kms);
810                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
811                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
812                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
813                 lsm->lsm_oinfo[stripe].loi_kms = kms;
814                 ll_inode_size_unlock(inode, 1);
815                 unlock_res_and_lock(lock);
816         }
817
818 iput:
819         iput(inode);
820         wake_up(&lock->l_waitq);
821
822         ldlm_lock2handle(lock, &lockh);
823         ldlm_lock_decref(&lockh, LCK_PR);
824         RETURN(0);
825 }
826 #endif
827
828 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
829 {
830         struct ptlrpc_request *req = reqp;
831         struct inode *inode = ll_inode_from_lock(lock);
832         struct ll_inode_info *lli;
833         struct lov_stripe_md *lsm;
834         struct ost_lvb *lvb;
835         int rc, stripe;
836         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
837         ENTRY;
838
839         if (inode == NULL)
840                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
841         lli = ll_i2info(inode);
842         if (lli == NULL)
843                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
844         lsm = lli->lli_smd;
845         if (lsm == NULL)
846                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
847
848         /* First, find out which stripe index this lock corresponds to. */
849         stripe = ll_lock_to_stripe_offset(inode, lock);
850         if (stripe < 0)
851                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
852
853         rc = lustre_pack_reply(req, 2, size, NULL);
854         if (rc)
855                 GOTO(iput, rc);
856
857         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
858         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
859         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
860         lvb->lvb_atime = LTIME_S(inode->i_atime);
861         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
862
863         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
864                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
865                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
866                    lvb->lvb_atime, lvb->lvb_ctime);
867  iput:
868         iput(inode);
869
870  out:
871         /* These errors are normal races, so we don't want to fill the console
872          * with messages by calling ptlrpc_error() */
873         if (rc == -ELDLM_NO_LOCK_DATA)
874                 lustre_pack_reply(req, 1, NULL, NULL);
875
876         req->rq_status = rc;
877         return rc;
878 }
879
880 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
881                      lstat_t *st)
882 {
883         struct lustre_handle lockh = { 0 };
884         struct ldlm_enqueue_info einfo = { 0 };
885         struct obd_info oinfo = { { { 0 } } };
886         struct ost_lvb lvb;
887         int rc;
888         
889         ENTRY;
890         
891         einfo.ei_type = LDLM_EXTENT;
892         einfo.ei_mode = LCK_PR;
893         einfo.ei_cb_bl = osc_extent_blocking_cb;
894         einfo.ei_cb_cp = ldlm_completion_ast;
895         einfo.ei_cb_gl = ll_glimpse_callback;
896         einfo.ei_cbdata = NULL;
897
898         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
899         oinfo.oi_lockh = &lockh;
900         oinfo.oi_md = lsm;
901         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
902
903         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
904         if (rc == -ENOENT)
905                 RETURN(rc);
906         if (rc != 0) {
907                 CERROR("obd_enqueue returned rc %d, "
908                        "returning -EIO\n", rc);
909                 RETURN(rc > 0 ? -EIO : rc);
910         }
911         
912         lov_stripe_lock(lsm);
913         memset(&lvb, 0, sizeof(lvb));
914         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
915         st->st_size = lvb.lvb_size;
916         st->st_blocks = lvb.lvb_blocks;
917         st->st_mtime = lvb.lvb_mtime;
918         st->st_atime = lvb.lvb_atime;
919         st->st_ctime = lvb.lvb_ctime;
920         lov_stripe_unlock(lsm);
921         
922         RETURN(rc);
923 }
924
925 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
926  * file (because it prefers KMS over RSS when larger) */
927 int ll_glimpse_size(struct inode *inode, int ast_flags)
928 {
929         struct ll_inode_info *lli = ll_i2info(inode);
930         struct ll_sb_info *sbi = ll_i2sbi(inode);
931         struct lustre_handle lockh = { 0 };
932         struct ldlm_enqueue_info einfo = { 0 };
933         struct obd_info oinfo = { { { 0 } } };
934         struct ost_lvb lvb;
935         int rc;
936         ENTRY;
937
938         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
939
940         if (!lli->lli_smd) {
941                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
942                 RETURN(0);
943         }
944
945         /* NOTE: this looks like DLM lock request, but it may not be one. Due
946          *       to LDLM_FL_HAS_INTENT flag, this is glimpse request, that
947          *       won't revoke any conflicting DLM locks held. Instead,
948          *       ll_glimpse_callback() will be called on each client
949          *       holding a DLM lock against this file, and resulting size
950          *       will be returned for each stripe. DLM lock on [0, EOF] is
951          *       acquired only if there were no conflicting locks. */
952         einfo.ei_type = LDLM_EXTENT;
953         einfo.ei_mode = LCK_PR;
954         einfo.ei_cb_bl = osc_extent_blocking_cb;
955         einfo.ei_cb_cp = ldlm_completion_ast;
956         einfo.ei_cb_gl = ll_glimpse_callback;
957         einfo.ei_cbdata = inode;
958
959         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
960         oinfo.oi_lockh = &lockh;
961         oinfo.oi_md = lli->lli_smd;
962         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
963
964         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
965         if (rc == -ENOENT)
966                 RETURN(rc);
967         if (rc != 0) {
968                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
969                 RETURN(rc > 0 ? -EIO : rc);
970         }
971
972         ll_inode_size_lock(inode, 1);
973         inode_init_lvb(inode, &lvb);
974         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
975         i_size_write(inode, lvb.lvb_size);
976         inode->i_blocks = lvb.lvb_blocks;
977         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
978         LTIME_S(inode->i_atime) = lvb.lvb_atime;
979         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
980         ll_inode_size_unlock(inode, 1);
981
982         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
983                i_size_read(inode), (long long)inode->i_blocks);
984
985         RETURN(rc);
986 }
987
988 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
989                    struct lov_stripe_md *lsm, int mode,
990                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
991                    int ast_flags)
992 {
993         struct ll_sb_info *sbi = ll_i2sbi(inode);
994         struct ost_lvb lvb;
995         struct ldlm_enqueue_info einfo = { 0 };
996         struct obd_info oinfo = { { { 0 } } };
997         int rc;
998         ENTRY;
999
1000         LASSERT(!lustre_handle_is_used(lockh));
1001         LASSERT(lsm != NULL);
1002
1003         /* don't drop the mmapped file to LRU */
1004         if (mapping_mapped(inode->i_mapping))
1005                 ast_flags |= LDLM_FL_NO_LRU;
1006
1007         /* XXX phil: can we do this?  won't it screw the file size up? */
1008         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1009             (sbi->ll_flags & LL_SBI_NOLCK))
1010                 RETURN(0);
1011
1012         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1013                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1014
1015         einfo.ei_type = LDLM_EXTENT;
1016         einfo.ei_mode = mode;
1017         einfo.ei_cb_bl = osc_extent_blocking_cb;
1018         einfo.ei_cb_cp = ldlm_completion_ast;
1019         einfo.ei_cb_gl = ll_glimpse_callback;
1020         einfo.ei_cbdata = inode;
1021
1022         oinfo.oi_policy = *policy;
1023         oinfo.oi_lockh = lockh;
1024         oinfo.oi_md = lsm;
1025         oinfo.oi_flags = ast_flags;
1026
1027         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
1028         *policy = oinfo.oi_policy;
1029         if (rc > 0)
1030                 rc = -EIO;
1031
1032         ll_inode_size_lock(inode, 1);
1033         inode_init_lvb(inode, &lvb);
1034         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1035
1036         if (policy->l_extent.start == 0 &&
1037             policy->l_extent.end == OBD_OBJECT_EOF) {
1038                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1039                  * the kms under both a DLM lock and the
1040                  * ll_inode_size_lock().  If we don't get the
1041                  * ll_inode_size_lock() here we can match the DLM lock and
1042                  * reset i_size from the kms before the truncating path has
1043                  * updated the kms.  generic_file_write can then trust the
1044                  * stale i_size when doing appending writes and effectively
1045                  * cancel the result of the truncate.  Getting the
1046                  * ll_inode_size_lock() after the enqueue maintains the DLM
1047                  * -> ll_inode_size_lock() acquiring order. */
1048                 i_size_write(inode, lvb.lvb_size);
1049                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1050                        inode->i_ino, i_size_read(inode));
1051         }
1052
1053         if (rc == 0) {
1054                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1055                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1056                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1057         }
1058         ll_inode_size_unlock(inode, 1);
1059
1060         RETURN(rc);
1061 }
1062
1063 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1064                      struct lov_stripe_md *lsm, int mode,
1065                      struct lustre_handle *lockh)
1066 {
1067         struct ll_sb_info *sbi = ll_i2sbi(inode);
1068         int rc;
1069         ENTRY;
1070
1071         /* XXX phil: can we do this?  won't it screw the file size up? */
1072         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1073             (sbi->ll_flags & LL_SBI_NOLCK))
1074                 RETURN(0);
1075
1076         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1077
1078         RETURN(rc);
1079 }
1080
1081 static void ll_set_file_contended(struct inode *inode)
1082 {
1083         struct ll_inode_info *lli = ll_i2info(inode);
1084
1085         lli->lli_contention_time = cfs_time_current();
1086         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1087 }
1088
1089 void ll_clear_file_contended(struct inode *inode)
1090 {
1091         struct ll_inode_info *lli = ll_i2info(inode);
1092
1093         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1094 }
1095
1096 static int ll_is_file_contended(struct file *file)
1097 {
1098         struct inode *inode = file->f_dentry->d_inode;
1099         struct ll_inode_info *lli = ll_i2info(inode);
1100         struct ll_sb_info *sbi = ll_i2sbi(inode);
1101         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1102         ENTRY;
1103
1104         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1105                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1106                        " osc connect flags = 0x"LPX64"\n",
1107                        sbi->ll_lco.lco_flags);
1108                 RETURN(0);
1109         }
1110         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1111                 RETURN(1);
1112         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1113                 cfs_time_t cur_time = cfs_time_current();
1114                 cfs_time_t retry_time;
1115
1116                 retry_time = cfs_time_add(
1117                         lli->lli_contention_time,
1118                         cfs_time_seconds(sbi->ll_contention_time));
1119                 if (cfs_time_after(cur_time, retry_time)) {
1120                         ll_clear_file_contended(inode);
1121                         RETURN(0);
1122                 }
1123                 RETURN(1);
1124         }
1125         RETURN(0);
1126 }
1127
1128 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1129                                  const char *buf, size_t count,
1130                                  loff_t start, loff_t end, int rw)
1131 {
1132         int append;
1133         int tree_locked = 0;
1134         int rc;
1135         struct inode * inode = file->f_dentry->d_inode;
1136
1137         append = (rw == WRITE) && (file->f_flags & O_APPEND);
1138
1139         if (append || !ll_is_file_contended(file)) {
1140                 struct ll_lock_tree_node *node;
1141                 int ast_flags;
1142
1143                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1144                 if (file->f_flags & O_NONBLOCK)
1145                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1146                 node = ll_node_from_inode(inode, start, end,
1147                                           (rw == WRITE) ? LCK_PW : LCK_PR);
1148                 if (IS_ERR(node)) {
1149                         rc = PTR_ERR(node);
1150                         GOTO(out, rc);
1151                 }
1152                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1153                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1154                 if (rc == 0)
1155                         tree_locked = 1;
1156                 else if (rc == -EUSERS)
1157                         ll_set_file_contended(inode);
1158                 else
1159                         GOTO(out, rc);
1160         }
1161         RETURN(tree_locked);
1162 out:
1163         return rc;
1164 }
1165
1166 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1167                             loff_t *ppos)
1168 {
1169         struct inode *inode = file->f_dentry->d_inode;
1170         struct ll_inode_info *lli = ll_i2info(inode);
1171         struct lov_stripe_md *lsm = lli->lli_smd;
1172         struct ll_sb_info *sbi = ll_i2sbi(inode);
1173         struct ll_lock_tree tree;
1174         struct ost_lvb lvb;
1175         struct ll_ra_read bead;
1176         int ra = 0;
1177         loff_t end;
1178         ssize_t retval, chunk, sum = 0;
1179         int tree_locked;
1180
1181         __u64 kms;
1182         ENTRY;
1183         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1184                inode->i_ino, inode->i_generation, inode, count, *ppos);
1185         /* "If nbyte is 0, read() will return 0 and have no other results."
1186          *                      -- Single Unix Spec */
1187         if (count == 0)
1188                 RETURN(0);
1189
1190         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1191
1192         if (!lsm) {
1193                 /* Read on file with no objects should return zero-filled
1194                  * buffers up to file size (we can get non-zero sizes with
1195                  * mknod + truncate, then opening file for read. This is a
1196                  * common pattern in NFS case, it seems). Bug 6243 */
1197                 int notzeroed;
1198                 /* Since there are no objects on OSTs, we have nothing to get
1199                  * lock on and so we are forced to access inode->i_size
1200                  * unguarded */
1201
1202                 /* Read beyond end of file */
1203                 if (*ppos >= i_size_read(inode))
1204                         RETURN(0);
1205
1206                 if (count > i_size_read(inode) - *ppos)
1207                         count = i_size_read(inode) - *ppos;
1208                 /* Make sure to correctly adjust the file pos pointer for
1209                  * EFAULT case */
1210                 notzeroed = clear_user(buf, count);
1211                 count -= notzeroed;
1212                 *ppos += count;
1213                 if (!count)
1214                         RETURN(-EFAULT);
1215                 RETURN(count);
1216         }
1217 repeat:
1218         if (sbi->ll_max_rw_chunk != 0) {
1219                 /* first, let's know the end of the current stripe */
1220                 end = *ppos;
1221                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1222                                 (obd_off *)&end);
1223
1224                 /* correct, the end is beyond the request */
1225                 if (end > *ppos + count - 1)
1226                         end = *ppos + count - 1;
1227
1228                 /* and chunk shouldn't be too large even if striping is wide */
1229                 if (end - *ppos > sbi->ll_max_rw_chunk)
1230                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1231         } else {
1232                 end = *ppos + count - 1;
1233         }
1234
1235         tree_locked = ll_file_get_tree_lock(&tree, file, buf,
1236                                             count, *ppos, end, READ);
1237         if (tree_locked < 0)
1238                 GOTO(out, retval = tree_locked);
1239
1240         ll_inode_size_lock(inode, 1);
1241         /*
1242          * Consistency guarantees: following possibilities exist for the
1243          * relation between region being read and real file size at this
1244          * moment:
1245          *
1246          *  (A): the region is completely inside of the file;
1247          *
1248          *  (B-x): x bytes of region are inside of the file, the rest is
1249          *  outside;
1250          *
1251          *  (C): the region is completely outside of the file.
1252          *
1253          * This classification is stable under DLM lock acquired by
1254          * ll_tree_lock() above, because to change class, other client has to
1255          * take DLM lock conflicting with our lock. Also, any updates to
1256          * ->i_size by other threads on this client are serialized by
1257          * ll_inode_size_lock(). This guarantees that short reads are handled
1258          * correctly in the face of concurrent writes and truncates.
1259          */
1260         inode_init_lvb(inode, &lvb);
1261         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1262         kms = lvb.lvb_size;
1263         if (*ppos + count - 1 > kms) {
1264                 /* A glimpse is necessary to determine whether we return a
1265                  * short read (B) or some zeroes at the end of the buffer (C) */
1266                 ll_inode_size_unlock(inode, 1);
1267                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1268                 if (retval) {
1269                         ll_tree_unlock(&tree);
1270                         goto out;
1271                 }
1272         } else {
1273                 /* region is within kms and, hence, within real file size (A).
1274                  * We need to increase i_size to cover the read region so that
1275                  * generic_file_read() will do its job, but that doesn't mean
1276                  * the kms size is _correct_, it is only the _minimum_ size.
1277                  * If someone does a stat they will get the correct size which
1278                  * will always be >= the kms value here.  b=11081 */
1279                 if (i_size_read(inode) < kms)
1280                         i_size_write(inode, kms);
1281                 ll_inode_size_unlock(inode, 1);
1282         }
1283
1284         chunk = end - *ppos + 1;
1285         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1286                inode->i_ino, chunk, *ppos, i_size_read(inode));
1287
1288         /* turn off the kernel's read-ahead */
1289         if (tree_locked) {
1290 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1291                 file->f_ramax = 0;
1292 #else
1293                 file->f_ra.ra_pages = 0;
1294 #endif
1295                 /* initialize read-ahead window once per syscall */
1296                 if (ra == 0) {
1297                         ra = 1;
1298                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1299                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1300                         ll_ra_read_in(file, &bead);
1301                 }
1302
1303                 /* BUG: 5972 */
1304                 file_accessed(file);
1305                 retval = generic_file_read(file, buf, chunk, ppos);
1306                 ll_tree_unlock(&tree);
1307         } else {
1308                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1309         }
1310         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1311         if (retval > 0) {
1312                 buf += retval;
1313                 count -= retval;
1314                 sum += retval;
1315                 if (retval == chunk && count > 0)
1316                         goto repeat;
1317         }
1318
1319  out:
1320         if (ra != 0)
1321                 ll_ra_read_ex(file, &bead);
1322         retval = (sum > 0) ? sum : retval;
1323         RETURN(retval);
1324 }
1325
1326 /*
1327  * Write to a file (through the page cache).
1328  */
1329 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1330                              loff_t *ppos)
1331 {
1332         struct inode *inode = file->f_dentry->d_inode;
1333         struct ll_sb_info *sbi = ll_i2sbi(inode);
1334         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1335         struct ll_lock_tree tree;
1336         loff_t maxbytes = ll_file_maxbytes(inode);
1337         loff_t lock_start, lock_end, end;
1338         ssize_t retval, chunk, sum = 0;
1339         int tree_locked;
1340         ENTRY;
1341
1342         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1343                inode->i_ino, inode->i_generation, inode, count, *ppos);
1344         
1345         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1346
1347         /* POSIX, but surprised the VFS doesn't check this already */
1348         if (count == 0)
1349                 RETURN(0);
1350
1351         /* If file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl wasn't
1352          * called on the file, don't fail the below assertion (bug 2388). */
1353         if (file->f_flags & O_LOV_DELAY_CREATE &&
1354             ll_i2info(inode)->lli_smd == NULL)
1355                 RETURN(-EBADF);
1356
1357         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1358
1359         down(&ll_i2info(inode)->lli_write_sem);
1360
1361 repeat:
1362         chunk = 0; /* just to fix gcc's warning */
1363         end = *ppos + count - 1;
1364
1365         if (file->f_flags & O_APPEND) {
1366                 lock_start = 0;
1367                 lock_end = OBD_OBJECT_EOF;
1368         } else if (sbi->ll_max_rw_chunk != 0) {
1369                 /* first, let's know the end of the current stripe */
1370                 end = *ppos;
1371                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1372                                 (obd_off *)&end);
1373
1374                 /* correct, the end is beyond the request */
1375                 if (end > *ppos + count - 1)
1376                         end = *ppos + count - 1;
1377
1378                 /* and chunk shouldn't be too large even if striping is wide */
1379                 if (end - *ppos > sbi->ll_max_rw_chunk)
1380                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1381                 lock_start = *ppos;
1382                 lock_end = end;
1383         } else {
1384                 lock_start = *ppos;
1385                 lock_end = *ppos + count - 1;
1386         }
1387
1388         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1389                                             lock_start, lock_end, WRITE);
1390         if (tree_locked < 0)
1391                 GOTO(out, retval = tree_locked);
1392
1393         /* This is ok, g_f_w will overwrite this under i_sem if it races
1394          * with a local truncate, it just makes our maxbyte checking easier.
1395          * The i_size value gets updated in ll_extent_lock() as a consequence
1396          * of the [0,EOF] extent lock we requested above. */
1397         if (file->f_flags & O_APPEND) {
1398                 *ppos = i_size_read(inode);
1399                 end = *ppos + count - 1;
1400         }
1401
1402         if (*ppos >= maxbytes) {
1403                 send_sig(SIGXFSZ, current, 0);
1404                 GOTO(out_unlock, retval = -EFBIG);
1405         }
1406         if (end > maxbytes - 1)
1407                 end = maxbytes - 1;
1408
1409         /* generic_file_write handles O_APPEND after getting i_mutex */
1410         chunk = end - *ppos + 1;
1411         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1412                inode->i_ino, chunk, *ppos);
1413         if (tree_locked)
1414                 retval = generic_file_write(file, buf, chunk, ppos);
1415         else
1416                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1417                                              ppos, WRITE);
1418         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1419
1420 out_unlock:
1421         if (tree_locked)
1422                 ll_tree_unlock(&tree);
1423
1424 out:
1425         if (retval > 0) {
1426                 buf += retval;
1427                 count -= retval;
1428                 sum += retval;
1429                 if (retval == chunk && count > 0)
1430                         goto repeat;
1431         }
1432
1433         up(&ll_i2info(inode)->lli_write_sem);
1434
1435         retval = (sum > 0) ? sum : retval;
1436         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1437                            retval > 0 ? retval : 0);
1438         RETURN(retval);
1439 }
1440
1441 /*
1442  * Send file content (through pagecache) somewhere with helper
1443  */
1444 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1445 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1446                                 read_actor_t actor, void *target)
1447 {
1448         struct inode *inode = in_file->f_dentry->d_inode;
1449         struct ll_inode_info *lli = ll_i2info(inode);
1450         struct lov_stripe_md *lsm = lli->lli_smd;
1451         struct ll_lock_tree tree;
1452         struct ll_lock_tree_node *node;
1453         struct ost_lvb lvb;
1454         struct ll_ra_read bead;
1455         int rc;
1456         ssize_t retval;
1457         __u64 kms;
1458         ENTRY;
1459         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1460                inode->i_ino, inode->i_generation, inode, count, *ppos);
1461
1462         /* "If nbyte is 0, read() will return 0 and have no other results."
1463          *                      -- Single Unix Spec */
1464         if (count == 0)
1465                 RETURN(0);
1466
1467         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1468         /* turn off the kernel's read-ahead */
1469         in_file->f_ra.ra_pages = 0;
1470
1471         /* File with no objects, nothing to lock */
1472         if (!lsm)
1473                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1474
1475         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1476         if (IS_ERR(node))
1477                 RETURN(PTR_ERR(node));
1478
1479         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1480         rc = ll_tree_lock(&tree, node, NULL, count,
1481                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1482         if (rc != 0)
1483                 RETURN(rc);
1484
1485         ll_clear_file_contended(inode);
1486         ll_inode_size_lock(inode, 1);
1487         /*
1488          * Consistency guarantees: following possibilities exist for the
1489          * relation between region being read and real file size at this
1490          * moment:
1491          *
1492          *  (A): the region is completely inside of the file;
1493          *
1494          *  (B-x): x bytes of region are inside of the file, the rest is
1495          *  outside;
1496          *
1497          *  (C): the region is completely outside of the file.
1498          *
1499          * This classification is stable under DLM lock acquired by
1500          * ll_tree_lock() above, because to change class, other client has to
1501          * take DLM lock conflicting with our lock. Also, any updates to
1502          * ->i_size by other threads on this client are serialized by
1503          * ll_inode_size_lock(). This guarantees that short reads are handled
1504          * correctly in the face of concurrent writes and truncates.
1505          */
1506         inode_init_lvb(inode, &lvb);
1507         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1508         kms = lvb.lvb_size;
1509         if (*ppos + count - 1 > kms) {
1510                 /* A glimpse is necessary to determine whether we return a
1511                  * short read (B) or some zeroes at the end of the buffer (C) */
1512                 ll_inode_size_unlock(inode, 1);
1513                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1514                 if (retval)
1515                         goto out;
1516         } else {
1517                 /* region is within kms and, hence, within real file size (A) */
1518                 i_size_write(inode, kms);
1519                 ll_inode_size_unlock(inode, 1);
1520         }
1521
1522         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1523                inode->i_ino, count, *ppos, i_size_read(inode));
1524
1525         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1526         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1527         ll_ra_read_in(in_file, &bead);
1528         /* BUG: 5972 */
1529         file_accessed(in_file);
1530         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1531         ll_ra_read_ex(in_file, &bead);
1532
1533  out:
1534         ll_tree_unlock(&tree);
1535         RETURN(retval);
1536 }
1537 #endif
1538
1539 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1540                                unsigned long arg)
1541 {
1542         struct ll_inode_info *lli = ll_i2info(inode);
1543         struct obd_export *exp = ll_i2obdexp(inode);
1544         struct ll_recreate_obj ucreatp;
1545         struct obd_trans_info oti = { 0 };
1546         struct obdo *oa = NULL;
1547         int lsm_size;
1548         int rc = 0;
1549         struct lov_stripe_md *lsm, *lsm2;
1550         ENTRY;
1551
1552         if (!capable (CAP_SYS_ADMIN))
1553                 RETURN(-EPERM);
1554
1555         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1556                             sizeof(struct ll_recreate_obj));
1557         if (rc) {
1558                 RETURN(-EFAULT);
1559         }
1560         OBDO_ALLOC(oa);
1561         if (oa == NULL)
1562                 RETURN(-ENOMEM);
1563
1564         down(&lli->lli_size_sem);
1565         lsm = lli->lli_smd;
1566         if (lsm == NULL)
1567                 GOTO(out, rc = -ENOENT);
1568         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1569                    (lsm->lsm_stripe_count));
1570
1571         OBD_ALLOC(lsm2, lsm_size);
1572         if (lsm2 == NULL)
1573                 GOTO(out, rc = -ENOMEM);
1574
1575         oa->o_id = ucreatp.lrc_id;
1576         oa->o_nlink = ucreatp.lrc_ost_idx;
1577         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1578         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1579         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1580                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1581
1582         memcpy(lsm2, lsm, lsm_size);
1583         rc = obd_create(exp, oa, &lsm2, &oti);
1584
1585         OBD_FREE(lsm2, lsm_size);
1586         GOTO(out, rc);
1587 out:
1588         up(&lli->lli_size_sem);
1589         OBDO_FREE(oa);
1590         return rc;
1591 }
1592
1593 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1594                                     int flags, struct lov_user_md *lum,
1595                                     int lum_size)
1596 {
1597         struct ll_inode_info *lli = ll_i2info(inode);
1598         struct lov_stripe_md *lsm;
1599         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1600         int rc = 0;
1601         ENTRY;
1602
1603         down(&lli->lli_size_sem);
1604         lsm = lli->lli_smd;
1605         if (lsm) {
1606                 up(&lli->lli_size_sem);
1607                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1608                        inode->i_ino);
1609                 RETURN(-EEXIST);
1610         }
1611
1612         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1613         if (rc)
1614                 GOTO(out, rc);
1615         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1616                 GOTO(out_req_free, rc = -ENOENT);
1617         rc = oit.d.lustre.it_status;
1618         if (rc < 0)
1619                 GOTO(out_req_free, rc);
1620
1621         ll_release_openhandle(file->f_dentry, &oit);
1622
1623  out:
1624         up(&lli->lli_size_sem);
1625         ll_intent_release(&oit);
1626         RETURN(rc);
1627 out_req_free:
1628         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1629         goto out;
1630 }
1631
1632 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1633                              struct lov_mds_md **lmmp, int *lmm_size, 
1634                              struct ptlrpc_request **request)
1635 {
1636         struct ll_sb_info *sbi = ll_i2sbi(inode);
1637         struct ll_fid  fid;
1638         struct mds_body  *body;
1639         struct lov_mds_md *lmm = NULL;
1640         struct ptlrpc_request *req = NULL;
1641         int rc, lmmsize;
1642
1643         ll_inode2fid(&fid, inode);
1644
1645         rc = ll_get_max_mdsize(sbi, &lmmsize);
1646         if (rc)
1647                 RETURN(rc);
1648
1649         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
1650                         filename, strlen(filename) + 1,
1651                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
1652                         lmmsize, &req);
1653         if (rc < 0) {
1654                 CDEBUG(D_INFO, "mdc_getattr_name failed "
1655                                 "on %s: rc %d\n", filename, rc);
1656                 GOTO(out, rc);
1657         }
1658
1659         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1660                         sizeof(*body));
1661         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1662         /* swabbed by mdc_getattr_name */
1663         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF));
1664
1665         lmmsize = body->eadatasize;
1666
1667         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1668                         lmmsize == 0) {
1669                 GOTO(out, rc = -ENODATA);
1670         }
1671
1672         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1673                         lmmsize);
1674         LASSERT(lmm != NULL);
1675         LASSERT(lustre_rep_swabbed(req, REPLY_REC_OFF + 1));
1676
1677         /*
1678          * This is coming from the MDS, so is probably in
1679          * little endian.  We convert it to host endian before
1680          * passing it to userspace.
1681          */
1682         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1683                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1684                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1685         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1686                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1687         }
1688
1689         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1690                 struct lov_stripe_md *lsm;
1691                 struct lov_user_md_join *lmj;
1692                 int lmj_size, i, aindex = 0;
1693
1694                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
1695                 if (rc < 0)
1696                         GOTO(out, rc = -ENOMEM);
1697                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
1698                 if (rc)
1699                         GOTO(out_free_memmd, rc);
1700
1701                 lmj_size = sizeof(struct lov_user_md_join) +
1702                         lsm->lsm_stripe_count *
1703                         sizeof(struct lov_user_ost_data_join);
1704                 OBD_ALLOC(lmj, lmj_size);
1705                 if (!lmj)
1706                         GOTO(out_free_memmd, rc = -ENOMEM);
1707
1708                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1709                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1710                         struct lov_extent *lex =
1711                                 &lsm->lsm_array->lai_ext_array[aindex];
1712
1713                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1714                                 aindex ++;
1715                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1716                                         LPU64" len %d\n", aindex, i,
1717                                         lex->le_start, (int)lex->le_len);
1718                         lmj->lmm_objects[i].l_extent_start =
1719                                 lex->le_start;
1720
1721                         if ((int)lex->le_len == -1)
1722                                 lmj->lmm_objects[i].l_extent_end = -1;
1723                         else
1724                                 lmj->lmm_objects[i].l_extent_end =
1725                                         lex->le_start + lex->le_len;
1726                         lmj->lmm_objects[i].l_object_id =
1727                                 lsm->lsm_oinfo[i]->loi_id;
1728                         lmj->lmm_objects[i].l_object_gr =
1729                                 lsm->lsm_oinfo[i]->loi_gr;
1730                         lmj->lmm_objects[i].l_ost_gen =
1731                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1732                         lmj->lmm_objects[i].l_ost_idx =
1733                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1734                 }
1735                 lmm = (struct lov_mds_md *)lmj;
1736                 lmmsize = lmj_size;
1737 out_free_memmd:
1738                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
1739         }
1740 out:
1741         *lmmp = lmm;
1742         *lmm_size = lmmsize;
1743         *request = req;
1744         return rc;
1745 }
1746 static int ll_lov_setea(struct inode *inode, struct file *file,
1747                             unsigned long arg)
1748 {
1749         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1750         struct lov_user_md  *lump;
1751         int lum_size = sizeof(struct lov_user_md) +
1752                        sizeof(struct lov_user_ost_data);
1753         int rc;
1754         ENTRY;
1755
1756         if (!capable (CAP_SYS_ADMIN))
1757                 RETURN(-EPERM);
1758
1759         OBD_ALLOC(lump, lum_size);
1760         if (lump == NULL) {
1761                 RETURN(-ENOMEM);
1762         }
1763         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1764         if (rc) {
1765                 OBD_FREE(lump, lum_size);
1766                 RETURN(-EFAULT);
1767         }
1768
1769         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1770
1771         OBD_FREE(lump, lum_size);
1772         RETURN(rc);
1773 }
1774
1775 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1776                             unsigned long arg)
1777 {
1778         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1779         int rc;
1780         int flags = FMODE_WRITE;
1781         ENTRY;
1782
1783         /* Bug 1152: copy properly when this is no longer true */
1784         LASSERT(sizeof(lum) == sizeof(*lump));
1785         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1786         rc = copy_from_user(&lum, lump, sizeof(lum));
1787         if (rc)
1788                 RETURN(-EFAULT);
1789
1790         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1791         if (rc == 0) {
1792                  put_user(0, &lump->lmm_stripe_count);
1793                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
1794                                     0, ll_i2info(inode)->lli_smd, lump);
1795         }
1796         RETURN(rc);
1797 }
1798
1799 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1800 {
1801         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1802
1803         if (!lsm)
1804                 RETURN(-ENODATA);
1805
1806         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1807                             (void *)arg);
1808 }
1809
1810 static int ll_get_grouplock(struct inode *inode, struct file *file,
1811                             unsigned long arg)
1812 {
1813         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1814         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1815                                                     .end = OBD_OBJECT_EOF}};
1816         struct lustre_handle lockh = { 0 };
1817         struct ll_inode_info *lli = ll_i2info(inode);
1818         struct lov_stripe_md *lsm = lli->lli_smd;
1819         int flags = 0, rc;
1820         ENTRY;
1821
1822         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1823                 RETURN(-EINVAL);
1824         }
1825
1826         policy.l_extent.gid = arg;
1827         if (file->f_flags & O_NONBLOCK)
1828                 flags = LDLM_FL_BLOCK_NOWAIT;
1829
1830         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1831         if (rc)
1832                 RETURN(rc);
1833
1834         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1835         fd->fd_gid = arg;
1836         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1837
1838         RETURN(0);
1839 }
1840
1841 static int ll_put_grouplock(struct inode *inode, struct file *file,
1842                             unsigned long arg)
1843 {
1844         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1845         struct ll_inode_info *lli = ll_i2info(inode);
1846         struct lov_stripe_md *lsm = lli->lli_smd;
1847         int rc;
1848         ENTRY;
1849
1850         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1851                 /* Ugh, it's already unlocked. */
1852                 RETURN(-EINVAL);
1853         }
1854
1855         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1856                 RETURN(-EINVAL);
1857
1858         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1859
1860         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1861         if (rc)
1862                 RETURN(rc);
1863
1864         fd->fd_gid = 0;
1865         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1866
1867         RETURN(0);
1868 }
1869
1870 static int join_sanity_check(struct inode *head, struct inode *tail)
1871 {
1872         ENTRY;
1873         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1874                 CERROR("server do not support join \n");
1875                 RETURN(-EINVAL);
1876         }
1877         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1878                 CERROR("tail ino %lu and ino head %lu must be regular\n",
1879                        head->i_ino, tail->i_ino);
1880                 RETURN(-EINVAL);
1881         }
1882         if (head->i_ino == tail->i_ino) {
1883                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
1884                 RETURN(-EINVAL);
1885         }
1886         if (i_size_read(head) % JOIN_FILE_ALIGN) {
1887                 CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
1888                 RETURN(-EINVAL);
1889         }
1890         RETURN(0);
1891 }
1892
1893 static int join_file(struct inode *head_inode, struct file *head_filp,
1894                      struct file *tail_filp)
1895 {
1896         struct dentry *tail_dentry = tail_filp->f_dentry;
1897         struct lookup_intent oit = {.it_op = IT_OPEN,
1898                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1899         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
1900                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
1901
1902         struct lustre_handle lockh;
1903         struct mdc_op_data *op_data;
1904         int    rc;
1905         loff_t data;
1906         ENTRY;
1907
1908         tail_dentry = tail_filp->f_dentry;
1909
1910         OBD_ALLOC_PTR(op_data);
1911         if (op_data == NULL) {
1912                 RETURN(-ENOMEM);
1913         }
1914
1915         data = i_size_read(head_inode);
1916         ll_prepare_mdc_op_data(op_data, head_inode,
1917                                tail_dentry->d_parent->d_inode,
1918                                tail_dentry->d_name.name,
1919                                tail_dentry->d_name.len, 0, &data);
1920         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
1921                          op_data, &lockh, NULL, 0, 0);
1922
1923         if (rc < 0)
1924                 GOTO(out, rc);
1925
1926         rc = oit.d.lustre.it_status;
1927
1928         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
1929                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
1930                 ptlrpc_req_finished((struct ptlrpc_request *)
1931                                     oit.d.lustre.it_data);
1932                 GOTO(out, rc);
1933         }
1934
1935         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
1936                                            * away */
1937                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
1938                 oit.d.lustre.it_lock_mode = 0;
1939         }
1940         ll_release_openhandle(head_filp->f_dentry, &oit);
1941 out:
1942         if (op_data)
1943                 OBD_FREE_PTR(op_data);
1944         ll_intent_release(&oit);
1945         RETURN(rc);
1946 }
1947
1948 static int ll_file_join(struct inode *head, struct file *filp,
1949                         char *filename_tail)
1950 {
1951         struct inode *tail = NULL, *first = NULL, *second = NULL;
1952         struct dentry *tail_dentry;
1953         struct file *tail_filp, *first_filp, *second_filp;
1954         struct ll_lock_tree first_tree, second_tree;
1955         struct ll_lock_tree_node *first_node, *second_node;
1956         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
1957         int rc = 0, cleanup_phase = 0;
1958         ENTRY;
1959
1960         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
1961                head->i_ino, head->i_generation, head, filename_tail);
1962
1963         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
1964         if (IS_ERR(tail_filp)) {
1965                 CERROR("Can not open tail file %s", filename_tail);
1966                 rc = PTR_ERR(tail_filp);
1967                 GOTO(cleanup, rc);
1968         }
1969         tail = igrab(tail_filp->f_dentry->d_inode);
1970
1971         tlli = ll_i2info(tail);
1972         tail_dentry = tail_filp->f_dentry;
1973         LASSERT(tail_dentry);
1974         cleanup_phase = 1;
1975
1976         /*reorder the inode for lock sequence*/
1977         first = head->i_ino > tail->i_ino ? head : tail;
1978         second = head->i_ino > tail->i_ino ? tail : head;
1979         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
1980         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
1981
1982         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
1983                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
1984         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
1985         if (IS_ERR(first_node)){
1986                 rc = PTR_ERR(first_node);
1987                 GOTO(cleanup, rc);
1988         }
1989         first_tree.lt_fd = first_filp->private_data;
1990         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
1991         if (rc != 0)
1992                 GOTO(cleanup, rc);
1993         cleanup_phase = 2;
1994
1995         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
1996         if (IS_ERR(second_node)){
1997                 rc = PTR_ERR(second_node);
1998                 GOTO(cleanup, rc);
1999         }
2000         second_tree.lt_fd = second_filp->private_data;
2001         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2002         if (rc != 0)
2003                 GOTO(cleanup, rc);
2004         cleanup_phase = 3;
2005
2006         rc = join_sanity_check(head, tail);
2007         if (rc)
2008                 GOTO(cleanup, rc);
2009
2010         rc = join_file(head, filp, tail_filp);
2011         if (rc)
2012                 GOTO(cleanup, rc);
2013 cleanup:
2014         switch (cleanup_phase) {
2015         case 3:
2016                 ll_tree_unlock(&second_tree);
2017                 obd_cancel_unused(ll_i2obdexp(second),
2018                                   ll_i2info(second)->lli_smd, 0, NULL);
2019         case 2:
2020                 ll_tree_unlock(&first_tree);
2021                 obd_cancel_unused(ll_i2obdexp(first),
2022                                   ll_i2info(first)->lli_smd, 0, NULL);
2023         case 1:
2024                 filp_close(tail_filp, 0);
2025                 if (tail)
2026                         iput(tail);
2027                 if (head && rc == 0) {
2028                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2029                                        &hlli->lli_smd);
2030                         hlli->lli_smd = NULL;
2031                 }
2032         case 0:
2033                 break;
2034         default:
2035                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2036                 LBUG();
2037         }
2038         RETURN(rc);
2039 }
2040
2041 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2042 {
2043         struct inode *inode = dentry->d_inode;
2044         struct obd_client_handle *och;
2045         int rc;
2046         ENTRY;
2047
2048         LASSERT(inode);
2049
2050         /* Root ? Do nothing. */
2051         if (dentry->d_inode->i_sb->s_root == dentry)
2052                 RETURN(0);
2053
2054         /* No open handle to close? Move away */
2055         if (!it_disposition(it, DISP_OPEN_OPEN))
2056                 RETURN(0);
2057
2058         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2059
2060         OBD_ALLOC(och, sizeof(*och));
2061         if (!och)
2062                 GOTO(out, rc = -ENOMEM);
2063
2064         ll_och_fill(ll_i2info(inode), it, och);
2065
2066         rc = ll_close_inode_openhandle(inode, och);
2067
2068         OBD_FREE(och, sizeof(*och));
2069  out:
2070         /* this one is in place of ll_file_open */
2071         ptlrpc_req_finished(it->d.lustre.it_data);
2072         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2073         RETURN(rc);
2074 }
2075
2076 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2077                   unsigned long arg)
2078 {
2079         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2080         int flags;
2081         ENTRY;
2082
2083         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2084                inode->i_generation, inode, cmd);
2085         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2086
2087         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
2088         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2089                 RETURN(-ENOTTY);
2090
2091         switch(cmd) {
2092         case LL_IOC_GETFLAGS:
2093                 /* Get the current value of the file flags */
2094                 return put_user(fd->fd_flags, (int *)arg);
2095         case LL_IOC_SETFLAGS:
2096         case LL_IOC_CLRFLAGS:
2097                 /* Set or clear specific file flags */
2098                 /* XXX This probably needs checks to ensure the flags are
2099                  *     not abused, and to handle any flag side effects.
2100                  */
2101                 if (get_user(flags, (int *) arg))
2102                         RETURN(-EFAULT);
2103
2104                 if (cmd == LL_IOC_SETFLAGS) {
2105                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2106                             !(file->f_flags & O_DIRECT)) {
2107                                 CERROR("%s: unable to disable locking on "
2108                                        "non-O_DIRECT file\n", current->comm);
2109                                 RETURN(-EINVAL);
2110                         }
2111
2112                         fd->fd_flags |= flags;
2113                 } else {
2114                         fd->fd_flags &= ~flags;
2115                 }
2116                 RETURN(0);
2117         case LL_IOC_LOV_SETSTRIPE:
2118                 RETURN(ll_lov_setstripe(inode, file, arg));
2119         case LL_IOC_LOV_SETEA:
2120                 RETURN(ll_lov_setea(inode, file, arg));
2121         case LL_IOC_LOV_GETSTRIPE:
2122                 RETURN(ll_lov_getstripe(inode, arg));
2123         case LL_IOC_RECREATE_OBJ:
2124                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2125         case EXT3_IOC_GETFLAGS:
2126         case EXT3_IOC_SETFLAGS:
2127                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2128         case EXT3_IOC_GETVERSION_OLD:
2129         case EXT3_IOC_GETVERSION:
2130                 RETURN(put_user(inode->i_generation, (int *)arg));
2131         case LL_IOC_JOIN: {
2132                 char *ftail;
2133                 int rc;
2134
2135                 ftail = getname((const char *)arg);
2136                 if (IS_ERR(ftail))
2137                         RETURN(PTR_ERR(ftail));
2138                 rc = ll_file_join(inode, file, ftail);
2139                 putname(ftail);
2140                 RETURN(rc);
2141         }
2142         case LL_IOC_GROUP_LOCK:
2143                 RETURN(ll_get_grouplock(inode, file, arg));
2144         case LL_IOC_GROUP_UNLOCK:
2145                 RETURN(ll_put_grouplock(inode, file, arg));
2146         case IOC_OBD_STATFS:
2147                 RETURN(ll_obd_statfs(inode, (void *)arg));
2148         case OBD_IOC_GETNAME_OLD:
2149         case OBD_IOC_GETNAME: {
2150                 struct obd_device *obd =
2151                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2152                 if (!obd)
2153                         RETURN(-EFAULT);
2154                 if (copy_to_user((void *)arg, obd->obd_name,
2155                                 strlen(obd->obd_name) + 1))
2156                         RETURN (-EFAULT);
2157                 RETURN(0);
2158         }
2159
2160         /* We need to special case any other ioctls we want to handle,
2161          * to send them to the MDS/OST as appropriate and to properly
2162          * network encode the arg field.
2163         case EXT3_IOC_SETVERSION_OLD:
2164         case EXT3_IOC_SETVERSION:
2165         */
2166         default: {
2167                 int err;
2168
2169                 if (LLIOC_STOP == 
2170                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2171                         RETURN(err);
2172
2173                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2174                                      (void *)arg));
2175         }
2176         }
2177 }
2178
2179 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2180 {
2181         struct inode *inode = file->f_dentry->d_inode;
2182         struct ll_inode_info *lli = ll_i2info(inode);
2183         struct lov_stripe_md *lsm = lli->lli_smd;
2184         loff_t retval;
2185         ENTRY;
2186         retval = offset + ((origin == 2) ? i_size_read(inode) :
2187                            (origin == 1) ? file->f_pos : 0);
2188         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2189                inode->i_ino, inode->i_generation, inode, retval, retval,
2190                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2191         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2192
2193         if (origin == 2) { /* SEEK_END */
2194                 int nonblock = 0, rc;
2195
2196                 if (file->f_flags & O_NONBLOCK)
2197                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2198
2199                 if (lsm != NULL) {
2200                         rc = ll_glimpse_size(inode, nonblock);
2201                         if (rc != 0)
2202                                 RETURN(rc);
2203                 }
2204
2205                 ll_inode_size_lock(inode, 0);
2206                 offset += i_size_read(inode);
2207                 ll_inode_size_unlock(inode, 0);
2208         } else if (origin == 1) { /* SEEK_CUR */
2209                 offset += file->f_pos;
2210         }
2211
2212         retval = -EINVAL;
2213         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2214                 if (offset != file->f_pos) {
2215                         file->f_pos = offset;
2216 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2217                         file->f_reada = 0;
2218                         file->f_version = ++event;
2219 #else
2220                         file->f_version = 0;
2221 #endif
2222                 }
2223                 retval = offset;
2224         }
2225
2226         RETURN(retval);
2227 }
2228
2229 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2230 {
2231         struct inode *inode = dentry->d_inode;
2232         struct ll_inode_info *lli = ll_i2info(inode);
2233         struct lov_stripe_md *lsm = lli->lli_smd;
2234         struct ll_fid fid;
2235         struct ptlrpc_request *req;
2236         int rc, err;
2237         ENTRY;
2238         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2239                inode->i_generation, inode);
2240         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2241
2242         /* fsync's caller has already called _fdata{sync,write}, we want
2243          * that IO to finish before calling the osc and mdc sync methods */
2244         rc = filemap_fdatawait(inode->i_mapping);
2245
2246         /* catch async errors that were recorded back when async writeback
2247          * failed for pages in this mapping. */
2248         err = lli->lli_async_rc;
2249         lli->lli_async_rc = 0;
2250         if (rc == 0)
2251                 rc = err;
2252         if (lsm) {
2253                 err = lov_test_and_clear_async_rc(lsm);
2254                 if (rc == 0)
2255                         rc = err;
2256         }
2257
2258         ll_inode2fid(&fid, inode);
2259         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2260         if (!rc)
2261                 rc = err;
2262         if (!err)
2263                 ptlrpc_req_finished(req);
2264
2265         if (data && lsm) {
2266                 struct obdo *oa;
2267
2268                 OBDO_ALLOC(oa);
2269                 if (!oa)
2270                         RETURN(rc ? rc : -ENOMEM);
2271
2272                 oa->o_id = lsm->lsm_object_id;
2273                 oa->o_valid = OBD_MD_FLID;
2274                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2275                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2276
2277                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2278                                0, OBD_OBJECT_EOF);
2279                 if (!rc)
2280                         rc = err;
2281                 OBDO_FREE(oa);
2282         }
2283
2284         RETURN(rc);
2285 }
2286
2287 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2288 {
2289         struct inode *inode = file->f_dentry->d_inode;
2290         struct ll_sb_info *sbi = ll_i2sbi(inode);
2291         struct ldlm_res_id res_id =
2292                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2293         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2294                 ldlm_flock_completion_ast, NULL, file_lock };
2295         struct lustre_handle lockh = {0};
2296         ldlm_policy_data_t flock;
2297         int flags = 0;
2298         int rc;
2299         ENTRY;
2300
2301         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2302                inode->i_ino, file_lock);
2303         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2304
2305         if (file_lock->fl_flags & FL_FLOCK) {
2306                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2307                 /* set missing params for flock() calls */
2308                 file_lock->fl_end = OFFSET_MAX;
2309                 file_lock->fl_pid = current->tgid;
2310         }
2311         flock.l_flock.pid = file_lock->fl_pid;
2312         flock.l_flock.start = file_lock->fl_start;
2313         flock.l_flock.end = file_lock->fl_end;
2314
2315         switch (file_lock->fl_type) {
2316         case F_RDLCK:
2317                 einfo.ei_mode = LCK_PR;
2318                 break;
2319         case F_UNLCK:
2320                 /* An unlock request may or may not have any relation to
2321                  * existing locks so we may not be able to pass a lock handle
2322                  * via a normal ldlm_lock_cancel() request. The request may even
2323                  * unlock a byte range in the middle of an existing lock. In
2324                  * order to process an unlock request we need all of the same
2325                  * information that is given with a normal read or write record
2326                  * lock request. To avoid creating another ldlm unlock (cancel)
2327                  * message we'll treat a LCK_NL flock request as an unlock. */
2328                 einfo.ei_mode = LCK_NL;
2329                 break;
2330         case F_WRLCK:
2331                 einfo.ei_mode = LCK_PW;
2332                 break;
2333         default:
2334                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2335                 LBUG();
2336         }
2337
2338         switch (cmd) {
2339         case F_SETLKW:
2340 #ifdef F_SETLKW64
2341         case F_SETLKW64:
2342 #endif
2343                 flags = 0;
2344                 break;
2345         case F_SETLK:
2346 #ifdef F_SETLK64
2347         case F_SETLK64:
2348 #endif
2349                 flags = LDLM_FL_BLOCK_NOWAIT;
2350                 break;
2351         case F_GETLK:
2352 #ifdef F_GETLK64
2353         case F_GETLK64:
2354 #endif
2355                 flags = LDLM_FL_TEST_LOCK;
2356                 /* Save the old mode so that if the mode in the lock changes we
2357                  * can decrement the appropriate reader or writer refcount. */
2358                 file_lock->fl_type = einfo.ei_mode;
2359                 break;
2360         default:
2361                 CERROR("unknown fcntl lock command: %d\n", cmd);
2362                 LBUG();
2363         }
2364
2365         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2366                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2367                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2368
2369         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
2370                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2371         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2372                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2373 #ifdef HAVE_F_OP_FLOCK
2374         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2375             !(flags & LDLM_FL_TEST_LOCK))
2376                 posix_lock_file_wait(file, file_lock);
2377 #endif
2378
2379         RETURN(rc);
2380 }
2381
2382 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2383 {
2384         ENTRY;
2385
2386         RETURN(-ENOSYS);
2387 }
2388
2389 int ll_have_md_lock(struct inode *inode, __u64 bits)
2390 {
2391         struct lustre_handle lockh;
2392         struct ldlm_res_id res_id = { .name = {0} };
2393         struct obd_device *obddev;
2394         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2395         int flags;
2396         ENTRY;
2397
2398         if (!inode)
2399                RETURN(0);
2400
2401         obddev = ll_i2mdcexp(inode)->exp_obd;
2402         res_id.name[0] = inode->i_ino;
2403         res_id.name[1] = inode->i_generation;
2404
2405         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2406
2407         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2408         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2409                             &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
2410                 RETURN(1);
2411         }
2412
2413         RETURN(0);
2414 }
2415
2416 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2417         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2418                               * and return success */
2419                 inode->i_nlink = 0;
2420                 /* This path cannot be hit for regular files unless in
2421                  * case of obscure races, so no need to to validate
2422                  * size. */
2423                 if (!S_ISREG(inode->i_mode) &&
2424                     !S_ISDIR(inode->i_mode))
2425                         return 0;
2426         }
2427
2428         if (rc) {
2429                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2430                 return -abs(rc);
2431
2432         }
2433
2434         return 0;
2435 }
2436
2437 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2438 {
2439         struct inode *inode = dentry->d_inode;
2440         struct ptlrpc_request *req = NULL;
2441         struct obd_export *exp;
2442         int rc;
2443         ENTRY;
2444
2445         if (!inode) {
2446                 CERROR("REPORT THIS LINE TO PETER\n");
2447                 RETURN(0);
2448         }
2449         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2450                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2451 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2452         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2453 #endif
2454
2455         exp = ll_i2mdcexp(inode);
2456
2457         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2458                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2459                 struct mdc_op_data op_data;
2460
2461                 /* Call getattr by fid, so do not provide name at all. */
2462                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2463                                        dentry->d_inode, NULL, 0, 0, NULL);
2464                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2465                                      /* we are not interested in name
2466                                         based lookup */
2467                                      &oit, 0, &req,
2468                                      ll_mdc_blocking_ast, 0);
2469                 if (rc < 0) {
2470                         rc = ll_inode_revalidate_fini(inode, rc);
2471                         GOTO (out, rc);
2472                 }
2473                 
2474                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2475                 if (rc != 0) {
2476                         ll_intent_release(&oit);
2477                         GOTO(out, rc);
2478                 }
2479
2480                 /* Unlinked? Unhash dentry, so it is not picked up later by
2481                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2482                    here to preserve get_cwd functionality on 2.6.
2483                    Bug 10503 */
2484                 if (!dentry->d_inode->i_nlink) {
2485                         spin_lock(&dcache_lock);
2486                         ll_drop_dentry(dentry);
2487                         spin_unlock(&dcache_lock);
2488                 }
2489
2490                 ll_lookup_finish_locks(&oit, dentry);
2491         } else if (!ll_have_md_lock(dentry->d_inode,
2492                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2493                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2494                 struct ll_fid fid;
2495                 obd_valid valid = OBD_MD_FLGETATTR;
2496                 int ealen = 0;
2497
2498                 if (S_ISREG(inode->i_mode)) {
2499                         rc = ll_get_max_mdsize(sbi, &ealen);
2500                         if (rc) 
2501                                 RETURN(rc); 
2502                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2503                 }
2504                 ll_inode2fid(&fid, inode);
2505                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2506                 if (rc) {
2507                         rc = ll_inode_revalidate_fini(inode, rc);
2508                         RETURN(rc);
2509                 }
2510
2511                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2512                                    NULL);
2513                 if (rc)
2514                         GOTO(out, rc);
2515         }
2516
2517         /* if object not yet allocated, don't validate size */
2518         if (ll_i2info(inode)->lli_smd == NULL) 
2519                 GOTO(out, rc = 0);
2520
2521         /* ll_glimpse_size will prefer locally cached writes if they extend
2522          * the file */
2523         rc = ll_glimpse_size(inode, 0);
2524
2525 out:
2526         ptlrpc_req_finished(req);
2527         RETURN(rc);
2528 }
2529
2530 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2531 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2532                   struct lookup_intent *it, struct kstat *stat)
2533 {
2534         struct inode *inode = de->d_inode;
2535         int res = 0;
2536
2537         res = ll_inode_revalidate_it(de, it);
2538         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2539
2540         if (res)
2541                 return res;
2542
2543         stat->dev = inode->i_sb->s_dev;
2544         stat->ino = inode->i_ino;
2545         stat->mode = inode->i_mode;
2546         stat->nlink = inode->i_nlink;
2547         stat->uid = inode->i_uid;
2548         stat->gid = inode->i_gid;
2549         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2550         stat->atime = inode->i_atime;
2551         stat->mtime = inode->i_mtime;
2552         stat->ctime = inode->i_ctime;
2553 #ifdef HAVE_INODE_BLKSIZE
2554         stat->blksize = inode->i_blksize;
2555 #else
2556         stat->blksize = 1<<inode->i_blkbits;
2557 #endif
2558
2559         ll_inode_size_lock(inode, 0);
2560         stat->size = i_size_read(inode);
2561         stat->blocks = inode->i_blocks;
2562         ll_inode_size_unlock(inode, 0);
2563
2564         return 0;
2565 }
2566 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2567 {
2568         struct lookup_intent it = { .it_op = IT_GETATTR };
2569
2570         return ll_getattr_it(mnt, de, &it, stat);
2571 }
2572 #endif
2573
2574 static
2575 int lustre_check_acl(struct inode *inode, int mask)
2576 {
2577 #ifdef CONFIG_FS_POSIX_ACL
2578         struct ll_inode_info *lli = ll_i2info(inode);
2579         struct posix_acl *acl;
2580         int rc;
2581         ENTRY;
2582
2583         spin_lock(&lli->lli_lock);
2584         acl = posix_acl_dup(lli->lli_posix_acl);
2585         spin_unlock(&lli->lli_lock);
2586
2587         if (!acl)
2588                 RETURN(-EAGAIN);
2589
2590         rc = posix_acl_permission(inode, acl, mask);
2591         posix_acl_release(acl);
2592
2593         RETURN(rc);
2594 #else
2595         return -EAGAIN;
2596 #endif
2597 }
2598
2599 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2600 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2601 {
2602         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2603                inode->i_ino, inode->i_generation, inode, mask);
2604
2605         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2606         return generic_permission(inode, mask, lustre_check_acl);
2607 }
2608 #else
2609 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2610 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2611 #else
2612 int ll_inode_permission(struct inode *inode, int mask)
2613 #endif
2614 {
2615         int mode = inode->i_mode;
2616         int rc;
2617
2618         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2619                inode->i_ino, inode->i_generation, inode, mask);
2620         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2621
2622         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2623             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2624                 return -EROFS;
2625         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2626                 return -EACCES;
2627         if (current->fsuid == inode->i_uid) {
2628                 mode >>= 6;
2629         } else if (1) {
2630                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2631                         goto check_groups;
2632                 rc = lustre_check_acl(inode, mask);
2633                 if (rc == -EAGAIN)
2634                         goto check_groups;
2635                 if (rc == -EACCES)
2636                         goto check_capabilities;
2637                 return rc;
2638         } else {
2639 check_groups:
2640                 if (in_group_p(inode->i_gid))
2641                         mode >>= 3;
2642         }
2643         if ((mode & mask & S_IRWXO) == mask)
2644                 return 0;
2645
2646 check_capabilities:
2647         if (!(mask & MAY_EXEC) ||
2648             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2649                 if (capable(CAP_DAC_OVERRIDE))
2650                         return 0;
2651
2652         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2653             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2654                 return 0;
2655
2656         return -EACCES;
2657 }
2658 #endif
2659
2660 /* -o localflock - only provides locally consistent flock locks */
2661 struct file_operations ll_file_operations = {
2662         .read           = ll_file_read,
2663         .write          = ll_file_write,
2664         .ioctl          = ll_file_ioctl,
2665         .open           = ll_file_open,
2666         .release        = ll_file_release,
2667         .mmap           = ll_file_mmap,
2668         .llseek         = ll_file_seek,
2669 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2670         .sendfile       = ll_file_sendfile,
2671 #endif
2672         .fsync          = ll_fsync,
2673 };
2674
2675 struct file_operations ll_file_operations_flock = {
2676         .read           = ll_file_read,
2677         .write          = ll_file_write,
2678         .ioctl          = ll_file_ioctl,
2679         .open           = ll_file_open,
2680         .release        = ll_file_release,
2681         .mmap           = ll_file_mmap,
2682         .llseek         = ll_file_seek,
2683 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2684         .sendfile       = ll_file_sendfile,
2685 #endif
2686         .fsync          = ll_fsync,
2687 #ifdef HAVE_F_OP_FLOCK
2688         .flock          = ll_file_flock,
2689 #endif
2690         .lock           = ll_file_flock
2691 };
2692
2693 /* These are for -o noflock - to return ENOSYS on flock calls */
2694 struct file_operations ll_file_operations_noflock = {
2695         .read           = ll_file_read,
2696         .write          = ll_file_write,
2697         .ioctl          = ll_file_ioctl,
2698         .open           = ll_file_open,
2699         .release        = ll_file_release,
2700         .mmap           = ll_file_mmap,
2701         .llseek         = ll_file_seek,
2702 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2703         .sendfile       = ll_file_sendfile,
2704 #endif
2705         .fsync          = ll_fsync,
2706 #ifdef HAVE_F_OP_FLOCK
2707         .flock          = ll_file_noflock,
2708 #endif
2709         .lock           = ll_file_noflock
2710 };
2711
2712 struct inode_operations ll_file_inode_operations = {
2713 #ifdef HAVE_VFS_INTENT_PATCHES
2714         .setattr_raw    = ll_setattr_raw,
2715 #endif
2716         .setattr        = ll_setattr,
2717         .truncate       = ll_truncate,
2718 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2719         .getattr        = ll_getattr,
2720 #else
2721         .revalidate_it  = ll_inode_revalidate_it,
2722 #endif
2723         .permission     = ll_inode_permission,
2724         .setxattr       = ll_setxattr,
2725         .getxattr       = ll_getxattr,
2726         .listxattr      = ll_listxattr,
2727         .removexattr    = ll_removexattr,
2728 };
2729
2730 /* dynamic ioctl number support routins */
2731 static struct llioc_ctl_data {
2732         struct rw_semaphore ioc_sem;
2733         struct list_head    ioc_head;
2734 } llioc = { 
2735         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2736         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2737 };
2738
2739
/*
 * One registered block of dynamic ioctl commands.  Allocated by
 * ll_iocontrol_register() with room for iocd_count command numbers after
 * the fixed part.
 */
struct llioc_data {
        /* linkage on llioc.ioc_head */
        struct list_head        iocd_list;
        /* total allocation size in bytes, needed again when freeing */
        unsigned int            iocd_size;
        /* handler invoked when one of the commands below matches */
        llioc_callback_t        iocd_cb;
        /* number of entries in iocd_cmd[] */
        unsigned int            iocd_count;
        /* the command numbers served by iocd_cb; must stay the last member */
        unsigned int            iocd_cmd[0];
};
2747
2748 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2749 {
2750         unsigned int size;
2751         struct llioc_data *in_data = NULL;
2752         ENTRY;
2753
2754         if (cb == NULL || cmd == NULL ||
2755             count > LLIOC_MAX_CMD || count < 0)
2756                 RETURN(NULL);
2757
2758         size = sizeof(*in_data) + count * sizeof(unsigned int);
2759         OBD_ALLOC(in_data, size);
2760         if (in_data == NULL)
2761                 RETURN(NULL);
2762
2763         memset(in_data, 0, sizeof(*in_data));
2764         in_data->iocd_size = size;
2765         in_data->iocd_cb = cb;
2766         in_data->iocd_count = count;
2767         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2768
2769         down_write(&llioc.ioc_sem);
2770         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2771         up_write(&llioc.ioc_sem);
2772
2773         RETURN(in_data);
2774 }
2775
2776 void ll_iocontrol_unregister(void *magic)
2777 {
2778         struct llioc_data *tmp;
2779
2780         if (magic == NULL)
2781                 return;
2782
2783         down_write(&llioc.ioc_sem);
2784         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2785                 if (tmp == magic) {
2786                         unsigned int size = tmp->iocd_size;
2787
2788                         list_del(&tmp->iocd_list);
2789                         up_write(&llioc.ioc_sem);
2790
2791                         OBD_FREE(tmp, size);
2792                         return;
2793                 }
2794         }
2795         up_write(&llioc.ioc_sem);
2796
2797         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2798 }
2799
/* Export the dynamic ioctl registration API for use by other modules. */
EXPORT_SYMBOL(ll_iocontrol_register);
EXPORT_SYMBOL(ll_iocontrol_unregister);
2802
2803 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2804                         unsigned int cmd, unsigned long arg, int *rcp)
2805 {
2806         enum llioc_iter ret = LLIOC_CONT;
2807         struct llioc_data *data;
2808         int rc = -EINVAL, i;
2809
2810         down_read(&llioc.ioc_sem);
2811         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2812                 for (i = 0; i < data->iocd_count; i++) {
2813                         if (cmd != data->iocd_cmd[i]) 
2814                                 continue;
2815
2816                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2817                         break;
2818                 }
2819
2820                 if (ret == LLIOC_STOP)
2821                         break;
2822         }
2823         up_read(&llioc.ioc_sem);
2824
2825         if (rcp)
2826                 *rcp = rc;
2827         return ret;
2828 }