recommit the patch for bug 5498.
lustre/llite/file.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Andreas Dilger <adilger@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LLITE
26 #include <lustre_dlm.h>
27 #include <lustre_lite.h>
28 #include <linux/pagemap.h>
29 #include <linux/file.h>
30 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
31 #include <linux/lustre_compat25.h>
32 #endif
33 #include "llite_internal.h"
34
35 /* also used by llite/special.c:ll_special_open() */
36 struct ll_file_data *ll_file_data_get(void)
37 {
38         struct ll_file_data *fd;
39
40         OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
41         return fd;
42 }
43
44 static void ll_file_data_put(struct ll_file_data *fd)
45 {
46         if (fd != NULL)
47                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
48 }
49
50 static int ll_close_inode_openhandle(struct inode *inode,
51                                      struct obd_client_handle *och)
52 {
53         struct ptlrpc_request *req = NULL;
54         struct obd_device *obd;
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         obd = class_exp2obd(ll_i2mdcexp(inode));
60         if (obd == NULL) {
61                 CERROR("Invalid MDC connection handle "LPX64"\n",
62                        ll_i2mdcexp(inode)->exp_handle.h_cookie);
63                 GOTO(out, rc = 0);
64         }
65
66         /*
67          * Here we check whether this is a forced umount. If so, we are being
68          * called on cancellation of the "open lock" and we do not call
69          * mdc_close(), as it cannot succeed: the import is already deactivated.
70          */
71         if (obd->obd_force)
72                 GOTO(out, rc = 0);
73
74         OBDO_ALLOC(oa);
75         if (!oa)
76                 RETURN(-ENOMEM); // XXX We leak openhandle and request here.
77
78         oa->o_id = inode->i_ino;
79         oa->o_valid = OBD_MD_FLID;
80         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
81                                    OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
82                                    OBD_MD_FLATIME | OBD_MD_FLMTIME |
83                                    OBD_MD_FLCTIME);
84         if (ll_is_inode_dirty(inode)) {
85                 oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
86                 oa->o_valid |= OBD_MD_FLFLAGS;
87         }
88
89         rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
90         if (rc == EAGAIN) {
91                 /* We are the last writer, so the MDS has instructed us to get
92                  * the file size and any write cookies, then close again. */
93                 ll_queue_done_writing(inode);
94                 rc = 0;
95         } else if (rc) {
96                 CERROR("inode %lu mdc close failed: rc = %d\n",
97                        inode->i_ino, rc);
98         }
99
100         OBDO_FREE(oa);
101
102         if (rc == 0) {
103                 rc = ll_objects_destroy(req, inode);
104                 if (rc)
105                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
106                                inode->i_ino, rc);
107         }
108
109         ptlrpc_req_finished(req); /* This is close request */
110         EXIT;
111 out:
112         mdc_clear_open_replay_data(och);
113
114         return rc;
115 }
116
117 int ll_mdc_real_close(struct inode *inode, int flags)
118 {
119         struct ll_inode_info *lli = ll_i2info(inode);
120         int rc = 0;
121         struct obd_client_handle **och_p;
122         struct obd_client_handle *och;
123         __u64 *och_usecount;
124
125         ENTRY;
126
127         if (flags & FMODE_WRITE) {
128                 och_p = &lli->lli_mds_write_och;
129                 och_usecount = &lli->lli_open_fd_write_count;
130         } else if (flags & FMODE_EXEC) {
131                 och_p = &lli->lli_mds_exec_och;
132                 och_usecount = &lli->lli_open_fd_exec_count;
133         } else {
134                 LASSERT(flags & FMODE_READ);
135                 och_p = &lli->lli_mds_read_och;
136                 och_usecount = &lli->lli_open_fd_read_count;
137         }
138
139         down(&lli->lli_och_sem);
140         if (*och_usecount) { /* There are still users of this handle, so
141                                 skip freeing it. */
142                 up(&lli->lli_och_sem);
143                 RETURN(0);
144         }
145         och = *och_p;
146         *och_p = NULL;
147         up(&lli->lli_och_sem);
148
149         if (och) { /* There might be a race and somebody may have already
150                       freed this och */
151                 rc = ll_close_inode_openhandle(inode, och);
152                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
153                 OBD_FREE(och, sizeof *och);
154         }
155
156         RETURN(rc);
157 }
158
159 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
160                         struct file *file)
161 {
162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
163         struct ll_inode_info *lli = ll_i2info(inode);
164         int rc = 0;
165         ENTRY;
166
167         /* clear group lock, if present */
168         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
169                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
170                 fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
171                 rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
172                                       &fd->fd_cwlockh);
173         }
174
175         /* Let's see if we have a good enough OPEN lock on the file and if
176            we can skip talking to the MDS */
177         if (file->f_dentry->d_inode) { /* Can this ever be false? */
178                 int lockmode;
179                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
180                 struct lustre_handle lockh;
181                 struct inode *inode = file->f_dentry->d_inode;
182                 struct ldlm_res_id file_res_id = {.name={inode->i_ino,
183                                                          inode->i_generation}};
184                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
185
186                 down(&lli->lli_och_sem);
187                 if (fd->fd_omode & FMODE_WRITE) {
188                         lockmode = LCK_CW;
189                         LASSERT(lli->lli_open_fd_write_count);
190                         lli->lli_open_fd_write_count--;
191                 } else if (fd->fd_omode & FMODE_EXEC) {
192                         lockmode = LCK_PR;
193                         LASSERT(lli->lli_open_fd_exec_count);
194                         lli->lli_open_fd_exec_count--;
195                 } else {
196                         lockmode = LCK_CR;
197                         LASSERT(lli->lli_open_fd_read_count);
198                         lli->lli_open_fd_read_count--;
199                 }
200                 up(&lli->lli_och_sem);
201
202                 if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
203                                      &file_res_id, LDLM_IBITS, &policy,lockmode,
204                                      &lockh)) {
205                         rc = ll_mdc_real_close(file->f_dentry->d_inode,
206                                                 fd->fd_omode);
207                 }
208         } else {
209                 CERROR("Releasing file %p with negative dentry %p, name %s\n",
210                        file, file->f_dentry, file->f_dentry->d_name.name);
211         }
212
213         LUSTRE_FPRIVATE(file) = NULL;
214         ll_file_data_put(fd);
215
216         RETURN(rc);
217 }
218
219 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
220
221 /* While this returns an error code, the caller (fput()) does not check it,
222  * so we need to make every effort to clean up all of our state here.  Also,
223  * applications rarely check close errors, and even if an error is returned
224  * they will not retry the close call.
225  */
226 int ll_file_release(struct inode *inode, struct file *file)
227 {
228         struct ll_file_data *fd;
229         struct ll_sb_info *sbi = ll_i2sbi(inode);
230         struct ll_inode_info *lli = ll_i2info(inode);
231         struct lov_stripe_md *lsm = lli->lli_smd;
232         int rc;
233
234         ENTRY;
235         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
236                inode->i_generation, inode);
237
238         if (S_ISDIR(inode->i_mode))
239                 ll_stop_statahead(inode);
240
241         /* don't do anything for / */
242         if (inode->i_sb->s_root == file->f_dentry)
243                 RETURN(0);
244
245         ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
246         fd = LUSTRE_FPRIVATE(file);
247         LASSERT(fd != NULL);
248
249         if (lsm)
250                 lov_test_and_clear_async_rc(lsm);
251         lli->lli_async_rc = 0;
252
253         rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
254         RETURN(rc);
255 }
256
257 static int ll_intent_file_open(struct file *file, void *lmm,
258                                int lmmsize, struct lookup_intent *itp)
259 {
260         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
261         struct mdc_op_data data;
262         struct dentry *parent = file->f_dentry->d_parent;
263         const char *name = file->f_dentry->d_name.name;
264         const int len = file->f_dentry->d_name.len;
265         struct inode *inode = file->f_dentry->d_inode;
266         struct ptlrpc_request *req;
267         int rc;
268         ENTRY;
269
270         if (!parent)
271                 RETURN(-ENOENT);
272
273         ll_prepare_mdc_op_data(&data, parent->d_inode, inode,
274                                name, len, O_RDWR, NULL);
275
276         /* Usually we come here only for NFSD, and we want the open lock.
277            But we can also get here with pre-2.6.15 patchless kernels, and in
278            that case that lock is also ok */
279         /* We can also get here if there was a cached open handle in
280          * revalidate_it but it disappeared while we were getting from there
281          * to ll_file_open.  That means the file was closed and immediately
282          * opened again, which makes it a good candidate for the OPEN lock */
283         /* If lmmsize and lmm are not 0, we are just setting stripe info
284          * parameters.  No need for the open lock */
285         if (!lmm && !lmmsize)
286                 itp->it_flags |= MDS_OPEN_LOCK;
287
288         rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
289                               0 /*unused */, &req, ll_mdc_blocking_ast, 0);
290         if (rc == -ESTALE) {
291                 /* The reason for keeping our own exit path here is to avoid
292                  * flooding the log with -ESTALE error messages.
293                  */
294                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
295                      it_open_error(DISP_OPEN_OPEN, itp))
296                         GOTO(out, rc);
297                 ll_release_openhandle(file->f_dentry, itp);
298                 GOTO(out_stale, rc);
299         }
300
301         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
302                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
303                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
304                 GOTO(out, rc);
305         }
306
307         if (itp->d.lustre.it_lock_mode)
308                 mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
309                                   inode);
310
311         rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
312                            req, DLM_REPLY_REC_OFF, NULL);
313 out:
314         ptlrpc_req_finished(itp->d.lustre.it_data);
315
316 out_stale:
317         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
318         ll_intent_drop_lock(itp);
319
320         RETURN(rc);
321 }
322
323
324 static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
325                         struct obd_client_handle *och)
326 {
327         struct ptlrpc_request *req = it->d.lustre.it_data;
328         struct mds_body *body;
329
330         LASSERT(och);
331
332         body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
333         LASSERT(body != NULL);                  /* reply already checked out */
334         LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in mdc_enqueue */
335
336         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
337         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
338         lli->lli_io_epoch = body->io_epoch;
339
340         mdc_set_open_replay_data(och, it->d.lustre.it_data);
341 }
342
343 int ll_local_open(struct file *file, struct lookup_intent *it,
344                   struct ll_file_data *fd, struct obd_client_handle *och)
345 {
346         ENTRY;
347
348         LASSERT(!LUSTRE_FPRIVATE(file));
349
350         LASSERT(fd != NULL);
351
352         if (och)
353                 ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
354         LUSTRE_FPRIVATE(file) = fd;
355         ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
356         fd->fd_omode = it->it_flags;
357
358         RETURN(0);
359 }
360
361 /* Open a file, and (for the very first open) create objects on the OSTs at
362  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
363  * creation or open until ll_lov_setstripe() ioctl is called.  We grab
364  * lli_open_sem to ensure no other process will create objects, send the
365  * stripe MD to the MDS, or try to destroy the objects if that fails.
366  *
367  * If we already have the stripe MD locally then we don't request it in
368  * mdc_open(), by passing a lmm_size = 0.
369  *
370  * It is up to the application to ensure no other processes open this file
371  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
372  * used.  We might be able to avoid races of that sort by getting lli_open_sem
373  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
374  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
375  */
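/* A rough userspace sketch of the delayed-create path described above (a
 * hypothetical example, assuming the lov_user_md_v1 ioctl argument from
 * lustre_user.h; not code from this file):
 *
 *      struct lov_user_md lum = { .lmm_magic = LOV_USER_MAGIC,
 *                                 .lmm_stripe_size = 1 << 20,
 *                                 .lmm_stripe_count = 4 };
 *      int fd = open(path, O_CREAT | O_RDWR | O_LOV_DELAY_CREATE, 0644);
 *      if (fd >= 0 && ioctl(fd, LL_IOC_LOV_SETSTRIPE, &lum) < 0)
 *              perror("LL_IOC_LOV_SETSTRIPE");
 */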
376 int ll_file_open(struct inode *inode, struct file *file)
377 {
378         struct ll_inode_info *lli = ll_i2info(inode);
379         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
380                                           .it_flags = file->f_flags };
381         struct lov_stripe_md *lsm;
382         struct ptlrpc_request *req = NULL;
383         struct obd_client_handle **och_p;
384         __u64 *och_usecount;
385         struct ll_file_data *fd;
386         int rc = 0;
387         ENTRY;
388
389         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
390                inode->i_generation, inode, file->f_flags);
391
392         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
393                 lli->lli_opendir_pid = current->pid;
394
395         /* don't do anything for / */
396         if (inode->i_sb->s_root == file->f_dentry)
397                 RETURN(0);
398
399 #ifdef HAVE_VFS_INTENT_PATCHES
400         it = file->f_it;
401 #else
402         it = file->private_data; /* XXX: compat macro */
403         file->private_data = NULL; /* prevent ll_local_open assertion */
404 #endif
405
406         fd = ll_file_data_get();
407         if (fd == NULL) {
408                 lli->lli_opendir_pid = 0;
409                 RETURN(-ENOMEM);
410         }
411         if (!it || !it->d.lustre.it_disposition) {
412                 /* Convert f_flags into access mode. We cannot use file->f_mode,
413                  * because everything but the O_ACCMODE mask was stripped from it */
414                 if ((oit.it_flags + 1) & O_ACCMODE)
415                         oit.it_flags++;
416                 if (file->f_flags & O_TRUNC)
417                         oit.it_flags |= FMODE_WRITE;
418
419                 /* The kernel only calls f_op->open in dentry_open.  filp_open
420                  * calls dentry_open after open_namei has checked permissions.
421                  * Only nfsd_open calls dentry_open directly without checking
422                  * permissions, and because of that the code below is safe. */
423                 if (oit.it_flags & FMODE_WRITE)
424                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
425
426                 /* We do not want O_EXCL here, presumably we opened the file
427                  * already? XXX - NFS implications? */
428                 oit.it_flags &= ~O_EXCL;
429
430                 it = &oit;
431         }
432
433         /* Let's see if we already have the file open on the MDS. */
434         if (it->it_flags & FMODE_WRITE) {
435                 och_p = &lli->lli_mds_write_och;
436                 och_usecount = &lli->lli_open_fd_write_count;
437         } else if (it->it_flags & FMODE_EXEC) {
438                 och_p = &lli->lli_mds_exec_och;
439                 och_usecount = &lli->lli_open_fd_exec_count;
440         } else {
441                 och_p = &lli->lli_mds_read_och;
442                 och_usecount = &lli->lli_open_fd_read_count;
443         }
444
445         LASSERTF(it->it_flags != 0, "it %p disp %d\n", it,
446                  it->d.lustre.it_disposition);
447
448         down(&lli->lli_och_sem);
449         if (*och_p) { /* Open handle is present */
450                 if (it_disposition(it, DISP_OPEN_OPEN)) {
451                         /* Well, there's an extra open request that we do not
452                            need; close it somehow. This will decref the request. */
453                         rc = it_open_error(DISP_OPEN_OPEN, it);
454                         if (rc) {
455                                 ll_file_data_put(fd);
456                                 GOTO(out_och_free, rc);
457                         }       
458                         ll_release_openhandle(file->f_dentry, it);
459                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
460                                              LPROC_LL_OPEN);
461                 }
462                 (*och_usecount)++;
463
464                 rc = ll_local_open(file, it, fd, NULL);
465
466                 LASSERTF(rc == 0, "rc = %d\n", rc);
467         } else {
468                 LASSERT(*och_usecount == 0);
469                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
470                 if (!*och_p) {
471                         ll_file_data_put(fd);
472                         GOTO(out_och_free, rc = -ENOMEM);
473                 }
474                 (*och_usecount)++;
475                 if (!it->d.lustre.it_disposition) {
476                         rc = ll_intent_file_open(file, NULL, 0, it);
477                         if (rc) {
478                                 ll_file_data_put(fd);
479                                 GOTO(out_och_free, rc);
480                         }
481
482                         /* Got some error? Release the request */
483                         if (it->d.lustre.it_status < 0) {
484                                 req = it->d.lustre.it_data;
485                                 ptlrpc_req_finished(req);
486                         }
487                         mdc_set_lock_data(&it->d.lustre.it_lock_handle,
488                                           file->f_dentry->d_inode);
489                 }
490                 req = it->d.lustre.it_data;
491
492                 /* mdc_intent_lock() didn't get a request ref if there was an
493                  * open error, so don't do cleanup on the request here
494                  * (bug 3430) */
495                 /* XXX (green): Shouldn't we bail out on any error here, not
496                  * just an open error? */
497                 rc = it_open_error(DISP_OPEN_OPEN, it);
498                 if (rc) {
499                         ll_file_data_put(fd);
500                         GOTO(out_och_free, rc);
501                 }
502
503                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
504                 rc = ll_local_open(file, it, fd, *och_p);
505                 LASSERTF(rc == 0, "rc = %d\n", rc);
506         }
507         up(&lli->lli_och_sem);
508
509         /* Must do this outside the lli_och_sem lock to prevent a deadlock
510            where a different kind of OPEN lock for this same inode gets
511            cancelled by ldlm_cancel_lru */
512         if (!S_ISREG(inode->i_mode))
513                 GOTO(out, rc);
514
515         lsm = lli->lli_smd;
516         if (lsm == NULL) {
517                 if (file->f_flags & O_LOV_DELAY_CREATE ||
518                     !(file->f_mode & FMODE_WRITE)) {
519                         CDEBUG(D_INODE, "object creation was delayed\n");
520                         GOTO(out, rc);
521                 }
522         }
523         file->f_flags &= ~O_LOV_DELAY_CREATE;
524         GOTO(out, rc);
525  out:
526         ptlrpc_req_finished(req);
527         if (req)
528                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
529         if (rc == 0) {
530                 ll_open_complete(inode);
531         } else {
532 out_och_free:
533                 if (*och_p) {
534                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
535                         *och_p = NULL; /* OBD_FREE writes some magic there */
536                         (*och_usecount)--;
537                 }
538                 up(&lli->lli_och_sem);
539                 lli->lli_opendir_pid = 0;
540         }
541         return rc;
542 }
543
544 /* Fills the obdo with the attributes for the inode defined by lsm */
545 int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
546                    struct obdo *oa)
547 {
548         struct ptlrpc_request_set *set;
549         struct obd_info oinfo = { { { 0 } } };
550         int rc;
551         ENTRY;
552
553         LASSERT(lsm != NULL);
554
555         memset(oa, 0, sizeof *oa);
556         oinfo.oi_md = lsm;
557         oinfo.oi_oa = oa;
558         oa->o_id = lsm->lsm_object_id;
559         oa->o_mode = S_IFREG;
560         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
561                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
562                 OBD_MD_FLCTIME;
563
564         set = ptlrpc_prep_set();
565         if (set == NULL) {
566                 rc = -ENOMEM;
567         } else {
568                 rc = obd_getattr_async(exp, &oinfo, set);
569                 if (rc == 0)
570                         rc = ptlrpc_set_wait(set);
571                 ptlrpc_set_destroy(set);
572         }
573         if (rc)
574                 RETURN(rc);
575
576         oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
577                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
578         RETURN(0);
579 }
580
581 static inline void ll_remove_suid(struct inode *inode)
582 {
583         unsigned int mode;
584
585         /* set S_ISGID if S_IXGRP is set, and always set S_ISUID */
586         mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
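        /* Worked example of the arithmetic above: S_IXGRP is 00010 and S_ISGID
         * is 02000, so multiplying the masked S_IXGRP bit by
         * (S_ISGID/S_IXGRP) == 0200 shifts it into the setgid position, and
         * S_ISUID (04000) is then OR'd in unconditionally. */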
587
588         /* were any of the set-id bits actually set? */
589         mode &= inode->i_mode;
590         if (mode && !capable(CAP_FSETID)) {
591                 inode->i_mode &= ~mode;
592                 // XXX careful here - we cannot change the size
593         }
594 }
595
596 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
597 {
598         struct ll_inode_info *lli = ll_i2info(inode);
599         struct lov_stripe_md *lsm = lli->lli_smd;
600         struct obd_export *exp = ll_i2obdexp(inode);
601         struct {
602                 char name[16];
603                 struct ldlm_lock *lock;
604                 struct lov_stripe_md *lsm;
605         } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
606         __u32 stripe, vallen = sizeof(stripe);
607         int rc;
608         ENTRY;
609
610         if (lsm->lsm_stripe_count == 1)
611                 GOTO(check, stripe = 0);
612
613         /* get our offset in the lov */
614         rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
615         if (rc != 0) {
616                 CERROR("obd_get_info: rc = %d\n", rc);
617                 RETURN(rc);
618         }
619         LASSERT(stripe < lsm->lsm_stripe_count);
620
621 check:
622         if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
623             lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[1]){
624                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
625                            lsm->lsm_oinfo[stripe]->loi_id,
626                            lsm->lsm_oinfo[stripe]->loi_gr);
627                 RETURN(-ELDLM_NO_LOCK_DATA);
628         }
629
630         RETURN(stripe);
631 }
632
633 /* Flush the page cache for an extent as it's cancelled.  When we're on an LOV,
634  * we get a lock cancellation for each stripe, so we have to map the obd's
635  * region back onto the stripes in the file that it held.
636  *
637  * No one can dirty the extent until we've finished our work and they can
638  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
639  * but other kernel actors could have pages locked.
640  *
641  * Called with the DLM lock held. */
642 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
643                               struct ldlm_lock *lock, __u32 stripe)
644 {
645         ldlm_policy_data_t tmpex;
646         unsigned long start, end, count, skip, i, j;
647         struct page *page;
648         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
649         struct lustre_handle lockh;
650         ENTRY;
651
652         memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
653         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
654                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
655                i_size_read(inode));
656
657         /* our locks are page granular thanks to osc_enqueue, so we invalidate
658          * the whole page. */
659         if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
660             ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
661                 LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
662         LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
663         LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
664
665         count = ~0;
666         skip = 0;
667         start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
668         end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
669         if (lsm->lsm_stripe_count > 1) {
670                 count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
671                 skip = (lsm->lsm_stripe_count - 1) * count;
672                 start += start/count * skip + stripe * count;
673                 if (end != ~0)
674                         end += end/count * skip + stripe * count;
675         }
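        /* Worked example of the mapping above: with a 1MB stripe size
         * (count = 256 pages of 4KB), lsm_stripe_count = 4 (skip = 768) and
         * stripe = 1, a lock on object pages [0, 255] maps to file page
         * indices [256, 511], i.e. the first 1MB chunk backed by stripe 1. */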
676         if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
677                 end = ~0;
678
679         i = i_size_read(inode) ? (i_size_read(inode) - 1) >> CFS_PAGE_SHIFT : 0;
680         if (i < end)
681                 end = i;
682
683         CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
684                "count: %lu skip: %lu end: %lu%s\n", start, start % count,
685                count, skip, end, discard ? " (DISCARDING)" : "");
686
687         /* Walk through the vmas on the inode and tear down mmapped pages that
688          * intersect with the lock.  This stops immediately if there are no
689          * mmap()ed regions of the file.  This is not efficient at all and
690          * should be short-lived.  Eventually we'll associate mmap()ed pages
691          * with the lock and will be able to find them directly */
692         for (i = start; i <= end; i += (j + skip)) {
693                 j = min(count - (i % count), end - i + 1);
694                 LASSERT(j > 0);
695                 LASSERT(inode->i_mapping);
696                 if (ll_teardown_mmaps(inode->i_mapping,
697                                       (__u64)i << CFS_PAGE_SHIFT,
698                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
699                         break;
700         }
701
702         /* This is the simplistic implementation of page eviction at
703          * cancellation.  It is careful to handle races with other page
704          * lockers correctly.  Fixes from bug 20 will make it more efficient
705          * by associating locks with pages and by batching writeback under
706          * the lock explicitly. */
707         for (i = start, j = start % count; i <= end;
708              j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
709                 if (j == count) {
710                         CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
711                         i += skip;
712                         j = 0;
713                         if (i > end)
714                                 break;
715                 }
716                 LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
717                          LPU64" >= "LPU64" start %lu i %lu end %lu\n",
718                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
719                          start, i, end);
720
721                 if (!mapping_has_pages(inode->i_mapping)) {
722                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
723                         break;
724                 }
725
726                 cond_resched();
727
728                 page = find_get_page(inode->i_mapping, i);
729                 if (page == NULL)
730                         continue;
731                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
732                                i, tmpex.l_extent.start);
733                 lock_page(page);
734
735                 /* check page->mapping to guard against racing with teardown */
736                 if (!discard && clear_page_dirty_for_io(page)) {
737                         rc = ll_call_writepage(inode, page);
738                         if (rc != 0)
739                                 CERROR("writepage of page %p failed: %d\n",
740                                        page, rc);
741                         /* either waiting for io to complete or reacquiring
742                          * the lock that the failed writepage released */
743                         lock_page(page);
744                 }
745
746                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
747                 /* check to see if another DLM lock covers this page  b=2765 */
748                 rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
749                                       LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
750                                       LDLM_FL_TEST_LOCK,
751                                       &lock->l_resource->lr_name, LDLM_EXTENT,
752                                       &tmpex, LCK_PR | LCK_PW, &lockh);
753                 if (rc2 == 0 && page->mapping != NULL) {
754                         struct ll_async_page *llap = llap_cast_private(page);
755                         // checking again to account for writeback's lock_page()
756                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
757                         if (llap)
758                                 ll_ra_accounting(llap, inode->i_mapping);
759                         ll_truncate_complete_page(page);
760                 }
761                 unlock_page(page);
762                 page_cache_release(page);
763         }
764         LASSERTF(tmpex.l_extent.start <=
765                  (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
766                   lock->l_policy_data.l_extent.end + 1),
767                  "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
768                  tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
769                  start, i, end);
770         EXIT;
771 }
772
773 static int ll_extent_lock_callback(struct ldlm_lock *lock,
774                                    struct ldlm_lock_desc *new, void *data,
775                                    int flag)
776 {
777         struct lustre_handle lockh = { 0 };
778         int rc;
779         ENTRY;
780
781         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
782                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
783                 LBUG();
784         }
785
786         switch (flag) {
787         case LDLM_CB_BLOCKING:
788                 ldlm_lock2handle(lock, &lockh);
789                 rc = ldlm_cli_cancel(&lockh);
790                 if (rc != ELDLM_OK)
791                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
792                 break;
793         case LDLM_CB_CANCELING: {
794                 struct inode *inode;
795                 struct ll_inode_info *lli;
796                 struct lov_stripe_md *lsm;
797                 int stripe;
798                 __u64 kms;
799
800                 /* This lock wasn't granted, don't try to evict pages */
801                 if (lock->l_req_mode != lock->l_granted_mode)
802                         RETURN(0);
803
804                 inode = ll_inode_from_lock(lock);
805                 if (inode == NULL)
806                         RETURN(0);
807                 lli = ll_i2info(inode);
808                 if (lli == NULL)
809                         goto iput;
810                 if (lli->lli_smd == NULL)
811                         goto iput;
812                 lsm = lli->lli_smd;
813
814                 stripe = ll_lock_to_stripe_offset(inode, lock);
815                 if (stripe < 0)
816                         goto iput;
817
818                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
819
820                 lov_stripe_lock(lsm);
821                 lock_res_and_lock(lock);
822                 kms = ldlm_extent_shift_kms(lock,
823                                             lsm->lsm_oinfo[stripe]->loi_kms);
824
825                 if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
826                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
827                                    lsm->lsm_oinfo[stripe]->loi_kms, kms);
828                 lsm->lsm_oinfo[stripe]->loi_kms = kms;
829                 unlock_res_and_lock(lock);
830                 lov_stripe_unlock(lsm);
831                 ll_try_done_writing(inode);
832         iput:
833                 iput(inode);
834                 break;
835         }
836         default:
837                 LBUG();
838         }
839
840         RETURN(0);
841 }
842
843 #if 0
844 int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
845 {
846         /* XXX ALLOCATE - 160 bytes */
847         struct inode *inode = ll_inode_from_lock(lock);
848         struct ll_inode_info *lli = ll_i2info(inode);
849         struct lustre_handle lockh = { 0 };
850         struct ost_lvb *lvb;
851         int stripe;
852         ENTRY;
853
854         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
855                      LDLM_FL_BLOCK_CONV)) {
856                 LBUG(); /* not expecting any blocked async locks yet */
857                 LDLM_DEBUG(lock, "client-side async enqueue returned a blocked "
858                            "lock, returning");
859                 ldlm_lock_dump(D_OTHER, lock, 0);
860                 ldlm_reprocess_all(lock->l_resource);
861                 RETURN(0);
862         }
863
864         LDLM_DEBUG(lock, "client-side async enqueue: granted/glimpsed");
865
866         stripe = ll_lock_to_stripe_offset(inode, lock);
867         if (stripe < 0)
868                 goto iput;
869
870         if (lock->l_lvb_len) {
871                 struct lov_stripe_md *lsm = lli->lli_smd;
872                 __u64 kms;
873                 lvb = lock->l_lvb_data;
874                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
875
876                 lock_res_and_lock(lock);
877                 ll_inode_size_lock(inode, 1);
878                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
879                 kms = ldlm_extent_shift_kms(NULL, kms);
880                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
881                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
882                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
883                 lsm->lsm_oinfo[stripe].loi_kms = kms;
884                 ll_inode_size_unlock(inode, 1);
885                 unlock_res_and_lock(lock);
886         }
887
888 iput:
889         iput(inode);
890         wake_up(&lock->l_waitq);
891
892         ldlm_lock2handle(lock, &lockh);
893         ldlm_lock_decref(&lockh, LCK_PR);
894         RETURN(0);
895 }
896 #endif
897
898 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
899 {
900         struct ptlrpc_request *req = reqp;
901         struct inode *inode = ll_inode_from_lock(lock);
902         struct ll_inode_info *lli;
903         struct lov_stripe_md *lsm;
904         struct ost_lvb *lvb;
905         int rc, stripe;
906         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
907         ENTRY;
908
909         if (inode == NULL)
910                 GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
911         lli = ll_i2info(inode);
912         if (lli == NULL)
913                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
914         lsm = lli->lli_smd;
915         if (lsm == NULL)
916                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
917
918         /* First, find out which stripe index this lock corresponds to. */
919         stripe = ll_lock_to_stripe_offset(inode, lock);
920         if (stripe < 0)
921                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
922
923         rc = lustre_pack_reply(req, 2, size, NULL);
924         if (rc) {
925                 CERROR("lustre_pack_reply: %d\n", rc);
926                 GOTO(iput, rc);
927         }
928
929         lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
930         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
931         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
932         lvb->lvb_atime = LTIME_S(inode->i_atime);
933         lvb->lvb_ctime = LTIME_S(inode->i_ctime);
934
935         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
936                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
937                    i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
938                    lvb->lvb_atime, lvb->lvb_ctime);
939  iput:
940         iput(inode);
941
942  out:
943         /* These errors are normal races, so we don't want to fill the console
944          * with messages by calling ptlrpc_error() */
945         if (rc == -ELDLM_NO_LOCK_DATA)
946                 lustre_pack_reply(req, 1, NULL, NULL);
947
948         req->rq_status = rc;
949         return rc;
950 }
951
952 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
953                      lstat_t *st)
954 {
955         struct lustre_handle lockh = { 0 };
956         struct ldlm_enqueue_info einfo = { 0 };
957         struct obd_info oinfo = { { { 0 } } };
958         struct ost_lvb lvb;
959         int rc;
960         
961         ENTRY;
962         
963         einfo.ei_type = LDLM_EXTENT;
964         einfo.ei_mode = LCK_PR;
965         einfo.ei_cb_bl = ll_extent_lock_callback;
966         einfo.ei_cb_cp = ldlm_completion_ast;
967         einfo.ei_cb_gl = ll_glimpse_callback;
968         einfo.ei_cbdata = NULL;
969
970         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
971         oinfo.oi_lockh = &lockh;
972         oinfo.oi_md = lsm;
973         oinfo.oi_flags = LDLM_FL_HAS_INTENT;
974
975         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
976         if (rc == -ENOENT)
977                 RETURN(rc);
978         if (rc != 0) {
979                 CERROR("obd_enqueue returned rc %d, "
980                        "returning -EIO\n", rc);
981                 RETURN(rc > 0 ? -EIO : rc);
982         }
983         
984         lov_stripe_lock(lsm);
985         memset(&lvb, 0, sizeof(lvb));
986         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
987         st->st_size = lvb.lvb_size;
988         st->st_blocks = lvb.lvb_blocks;
989         st->st_mtime = lvb.lvb_mtime;
990         st->st_atime = lvb.lvb_atime;
991         st->st_ctime = lvb.lvb_ctime;
992         lov_stripe_unlock(lsm);
993         
994         RETURN(rc);
995 }
996
997 /* NB: obd_merge_lvb will prefer locally cached writes if they extend the
998  * file (because it prefers KMS over RSS when larger) */
999 int ll_glimpse_size(struct inode *inode, int ast_flags)
1000 {
1001         struct ll_inode_info *lli = ll_i2info(inode);
1002         struct ll_sb_info *sbi = ll_i2sbi(inode);
1003         struct lustre_handle lockh = { 0 };
1004         struct ldlm_enqueue_info einfo = { 0 };
1005         struct obd_info oinfo = { { { 0 } } };
1006         struct ost_lvb lvb;
1007         int rc;
1008         ENTRY;
1009
1010         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
1011
1012         if (!lli->lli_smd) {
1013                 CDEBUG(D_DLMTRACE, "No objects for inode %lu\n", inode->i_ino);
1014                 RETURN(0);
1015         }
1016
1017         /* NOTE: this looks like a DLM lock request, but it may not be one.
1018          *       Due to the LDLM_FL_HAS_INTENT flag, this is a glimpse
1019          *       request that won't revoke any conflicting DLM locks held.
1020          *       Instead, ll_glimpse_callback() will be called on each client
1021          *       holding a DLM lock against this file, and the resulting size
1022          *       will be returned for each stripe. A DLM lock on [0, EOF] is
1023          *       acquired only if there were no conflicting locks. */
1024         einfo.ei_type = LDLM_EXTENT;
1025         einfo.ei_mode = LCK_PR;
1026         einfo.ei_cb_bl = ll_extent_lock_callback;
1027         einfo.ei_cb_cp = ldlm_completion_ast;
1028         einfo.ei_cb_gl = ll_glimpse_callback;
1029         einfo.ei_cbdata = inode;
1030
1031         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
1032         oinfo.oi_lockh = &lockh;
1033         oinfo.oi_md = lli->lli_smd;
1034         oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
1035
1036         rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
1037         if (rc == -ENOENT)
1038                 RETURN(rc);
1039         if (rc != 0) {
1040                 CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
1041                 RETURN(rc > 0 ? -EIO : rc);
1042         }
1043
1044         ll_inode_size_lock(inode, 1);
1045         inode_init_lvb(inode, &lvb);
1046         rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
1047         i_size_write(inode, lvb.lvb_size);
1048         inode->i_blocks = lvb.lvb_blocks;
1049         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1050         LTIME_S(inode->i_atime) = lvb.lvb_atime;
1051         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1052         ll_inode_size_unlock(inode, 1);
1053
1054         CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
1055                i_size_read(inode), (long long)inode->i_blocks);
1056
1057         RETURN(rc);
1058 }
1059
1060 int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
1061                    struct lov_stripe_md *lsm, int mode,
1062                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
1063                    int ast_flags)
1064 {
1065         struct ll_sb_info *sbi = ll_i2sbi(inode);
1066         struct ost_lvb lvb;
1067         struct ldlm_enqueue_info einfo = { 0 };
1068         struct obd_info oinfo = { { { 0 } } };
1069         int rc;
1070         ENTRY;
1071
1072         LASSERT(!lustre_handle_is_used(lockh));
1073         LASSERT(lsm != NULL);
1074
1075         /* don't drop the mmapped file to LRU */
1076         if (mapping_mapped(inode->i_mapping))
1077                 ast_flags |= LDLM_FL_NO_LRU;
1078
1079         /* XXX phil: can we do this?  won't it screw the file size up? */
1080         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1081             (sbi->ll_flags & LL_SBI_NOLCK))
1082                 RETURN(0);
1083
1084         CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
1085                inode->i_ino, policy->l_extent.start, policy->l_extent.end);
1086
1087         einfo.ei_type = LDLM_EXTENT;
1088         einfo.ei_mode = mode;
1089         einfo.ei_cb_bl = ll_extent_lock_callback;
1090         einfo.ei_cb_cp = ldlm_completion_ast;
1091         einfo.ei_cb_gl = ll_glimpse_callback;
1092         einfo.ei_cbdata = inode;
1093
1094         oinfo.oi_policy = *policy;
1095         oinfo.oi_lockh = lockh;
1096         oinfo.oi_md = lsm;
1097         oinfo.oi_flags = ast_flags;
1098
1099         rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
1100         *policy = oinfo.oi_policy;
1101         if (rc > 0)
1102                 rc = -EIO;
1103
1104         ll_inode_size_lock(inode, 1);
1105         inode_init_lvb(inode, &lvb);
1106         obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
1107
1108         if (policy->l_extent.start == 0 &&
1109             policy->l_extent.end == OBD_OBJECT_EOF) {
1110                 /* vmtruncate()->ll_truncate() first sets the i_size and then
1111                  * the kms under both a DLM lock and the
1112                  * ll_inode_size_lock().  If we don't get the
1113                  * ll_inode_size_lock() here we can match the DLM lock and
1114                  * reset i_size from the kms before the truncating path has
1115                  * updated the kms.  generic_file_write can then trust the
1116                  * stale i_size when doing appending writes and effectively
1117                  * cancel the result of the truncate.  Getting the
1118                  * ll_inode_size_lock() after the enqueue maintains the DLM
1119                  * -> ll_inode_size_lock() acquiring order. */
1120                 i_size_write(inode, lvb.lvb_size);
1121                 CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
1122                        inode->i_ino, i_size_read(inode));
1123         }
1124
1125         if (rc == 0) {
1126                 LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1127                 LTIME_S(inode->i_atime) = lvb.lvb_atime;
1128                 LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1129         }
1130         ll_inode_size_unlock(inode, 1);
1131
1132         RETURN(rc);
1133 }
1134
1135 int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
1136                      struct lov_stripe_md *lsm, int mode,
1137                      struct lustre_handle *lockh)
1138 {
1139         struct ll_sb_info *sbi = ll_i2sbi(inode);
1140         int rc;
1141         ENTRY;
1142
1143         /* XXX phil: can we do this?  won't it screw the file size up? */
1144         if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
1145             (sbi->ll_flags & LL_SBI_NOLCK))
1146                 RETURN(0);
1147
1148         rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
1149
1150         RETURN(rc);
1151 }
1152
1153 static void ll_set_file_contended(struct inode *inode)
1154 {
1155         struct ll_inode_info *lli = ll_i2info(inode);
1156
1157         lli->lli_contention_time = cfs_time_current();
1158         set_bit(LLI_F_CONTENDED, &lli->lli_flags);
1159 }
1160
1161 void ll_clear_file_contended(struct inode *inode)
1162 {
1163         struct ll_inode_info *lli = ll_i2info(inode);
1164
1165         clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
1166 }
1167
1168 static int ll_is_file_contended(struct file *file)
1169 {
1170         struct inode *inode = file->f_dentry->d_inode;
1171         struct ll_inode_info *lli = ll_i2info(inode);
1172         struct ll_sb_info *sbi = ll_i2sbi(inode);
1173         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1174         ENTRY;
1175
1176         if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
1177                 CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
1178                        " osc connect flags = 0x"LPX64"\n",
1179                        sbi->ll_lco.lco_flags);
1180                 RETURN(0);
1181         }
1182         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
1183                 RETURN(1);
1184         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
1185                 cfs_time_t cur_time = cfs_time_current();
1186                 cfs_time_t retry_time;
1187
1188                 retry_time = cfs_time_add(
1189                         lli->lli_contention_time,
1190                         cfs_time_seconds(sbi->ll_contention_time));
1191                 if (cfs_time_after(cur_time, retry_time)) {
1192                         ll_clear_file_contended(inode);
1193                         RETURN(0);
1194                 }
1195                 RETURN(1);
1196         }
1197         RETURN(0);
1198 }
1199
1200 static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
1201                                  const char *buf, size_t count,
1202                                  loff_t start, loff_t end, int rw)
1203 {
1204         int append;
1205         int tree_locked = 0;
1206         int rc;
1207         struct inode * inode = file->f_dentry->d_inode;
1208
1209         append = (rw == WRITE) && (file->f_flags & O_APPEND);
1210
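        /* Appending writes always take the DLM tree lock below; other IO on a
         * file recently marked contended skips it, and the caller (see
         * ll_file_read()) then uses ll_file_lockless_io(), i.e. server-side
         * locking, for that chunk instead. */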
1211         if (append || !ll_is_file_contended(file)) {
1212                 struct ll_lock_tree_node *node;
1213                 int ast_flags;
1214
1215                 ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
1216                 if (file->f_flags & O_NONBLOCK)
1217                         ast_flags |= LDLM_FL_BLOCK_NOWAIT;
1218                 node = ll_node_from_inode(inode, start, end,
1219                                           (rw == WRITE) ? LCK_PW : LCK_PR);
1220                 if (IS_ERR(node)) {
1221                         rc = PTR_ERR(node);
1222                         GOTO(out, rc);
1223                 }
1224                 tree->lt_fd = LUSTRE_FPRIVATE(file);
1225                 rc = ll_tree_lock(tree, node, buf, count, ast_flags);
1226                 if (rc == 0)
1227                         tree_locked = 1;
1228                 else if (rc == -EUSERS)
1229                         ll_set_file_contended(inode);
1230                 else
1231                         GOTO(out, rc);
1232         }
1233         RETURN(tree_locked);
1234 out:
1235         return rc;
1236 }
1237
1238 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1239                             loff_t *ppos)
1240 {
1241         struct inode *inode = file->f_dentry->d_inode;
1242         struct ll_inode_info *lli = ll_i2info(inode);
1243         struct lov_stripe_md *lsm = lli->lli_smd;
1244         struct ll_sb_info *sbi = ll_i2sbi(inode);
1245         struct ll_lock_tree tree;
1246         struct ost_lvb lvb;
1247         struct ll_ra_read bead;
1248         int ra = 0;
1249         loff_t end;
1250         ssize_t retval, chunk, sum = 0;
1251         int tree_locked;
1252
1253         __u64 kms;
1254         ENTRY;
1255         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1256                inode->i_ino, inode->i_generation, inode, count, *ppos);
1257         /* "If nbyte is 0, read() will return 0 and have no other results."
1258          *                      -- Single Unix Spec */
1259         if (count == 0)
1260                 RETURN(0);
1261
1262         ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
1263
1264         if (!lsm) {
1265                 /* A read on a file with no objects should return zero-filled
1266                  * buffers up to the file size (we can get non-zero sizes with
1267                  * mknod + truncate, then opening the file for read; this seems
1268                  * to be a common pattern in the NFS case). Bug 6243 */
1269                 int notzeroed;
1270                 /* Since there are no objects on the OSTs, we have nothing to
1271                  * take a lock on, so we are forced to access inode->i_size
1272                  * unguarded */
1273
1274                 /* Read beyond end of file */
1275                 if (*ppos >= i_size_read(inode))
1276                         RETURN(0);
1277
1278                 if (count > i_size_read(inode) - *ppos)
1279                         count = i_size_read(inode) - *ppos;
1280                 /* Make sure to correctly adjust the file position pointer
1281                  * for the EFAULT case */
1282                 notzeroed = clear_user(buf, count);
1283                 count -= notzeroed;
1284                 *ppos += count;
1285                 if (!count)
1286                         RETURN(-EFAULT);
1287                 RETURN(count);
1288         }
1289 repeat:
1290         if (sbi->ll_max_rw_chunk != 0) {
1291                 /* first, find the end of the current stripe */
1292                 end = *ppos;
1293                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
1294                                 (obd_off *)&end);
1295
1296                 /* correct the end if it is beyond the request */
1297                 if (end > *ppos + count - 1)
1298                         end = *ppos + count - 1;
1299
1300                 /* and chunk shouldn't be too large even if striping is wide */
1301                 if (end - *ppos > sbi->ll_max_rw_chunk)
1302                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1303         } else {
1304                 end = *ppos + count - 1;
1305         }
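        /* Rough example of the chunking above (assuming ll_max_rw_chunk is set
         * and larger than a 1MB stripe size): a 10MB read starting at offset
         * 512K is first clamped to the end of the current stripe, so the first
         * pass covers only [512K, 1M); the code then jumps back to the repeat:
         * label above to issue the remaining chunks stripe by stripe. */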
1306
1307         tree_locked = ll_file_get_tree_lock(&tree, file, buf,
1308                                             count, *ppos, end, READ);
1309         if (tree_locked < 0)
1310                 GOTO(out, retval = tree_locked);
1311
1312         ll_inode_size_lock(inode, 1);
1313         /*
1314          * Consistency guarantees: following possibilities exist for the
1315          * relation between region being read and real file size at this
1316          * moment:
1317          *
1318          *  (A): the region is completely inside of the file;
1319          *
1320          *  (B-x): x bytes of region are inside of the file, the rest is
1321          *  outside;
1322          *
1323          *  (C): the region is completely outside of the file.
1324          *
1325          * This classification is stable under DLM lock acquired by
1326          * ll_tree_lock() above, because to change class, other client has to
1327          * take DLM lock conflicting with our lock. Also, any updates to
1328          * ->i_size by other threads on this client are serialized by
1329          * ll_inode_size_lock(). This guarantees that short reads are handled
1330          * correctly in the face of concurrent writes and truncates.
1331          */
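        /* For example, reading 8192 bytes at offset 4096 from a 6144-byte file
         * falls under case (B-2048): the caller gets a short read of 2048
         * bytes. */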
1332         inode_init_lvb(inode, &lvb);
1333         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1334         kms = lvb.lvb_size;
1335         if (*ppos + count - 1 > kms) {
1336                 /* A glimpse is necessary to determine whether we return a
1337                  * short read (B) or some zeroes at the end of the buffer (C) */
1338                 ll_inode_size_unlock(inode, 1);
1339                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1340                 if (retval) {
1341                         ll_tree_unlock(&tree);
1342                         goto out;
1343                 }
1344         } else {
1345                 /* region is within kms and, hence, within real file size (A).
1346                  * We need to increase i_size to cover the read region so that
1347                  * generic_file_read() will do its job, but that doesn't mean
1348                  * the kms size is _correct_, it is only the _minimum_ size.
1349                  * If someone does a stat they will get the correct size which
1350                  * will always be >= the kms value here.  b=11081 */
1351                 if (i_size_read(inode) < kms)
1352                         i_size_write(inode, kms);
1353                 ll_inode_size_unlock(inode, 1);
1354         }
1355
1356         chunk = end - *ppos + 1;
1357         CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1358                inode->i_ino, chunk, *ppos, i_size_read(inode));
1359
1360         /* turn off the kernel's read-ahead */
1361         if (tree_locked) {
1362 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1363                 file->f_ramax = 0;
1364 #else
1365                 file->f_ra.ra_pages = 0;
1366 #endif
1367                 /* initialize read-ahead window once per syscall */
1368                 if (ra == 0) {
1369                         ra = 1;
1370                         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1371                         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1372                         ll_ra_read_in(file, &bead);
1373                 }
1374
1375                 /* BUG: 5972 */
1376                 file_accessed(file);
1377                 retval = generic_file_read(file, buf, chunk, ppos);
1378                 ll_tree_unlock(&tree);
1379         } else {
1380                 retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
1381         }
1382         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
1383         if (retval > 0) {
1384                 buf += retval;
1385                 count -= retval;
1386                 sum += retval;
1387                 if (retval == chunk && count > 0)
1388                         goto repeat;
1389         }
1390
1391  out:
1392         if (ra != 0)
1393                 ll_ra_read_ex(file, &bead);
1394         retval = (sum > 0) ? sum : retval;
1395         RETURN(retval);
1396 }
1397
1398 /*
1399  * Write to a file (through the page cache).
1400  */
1401 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1402                              loff_t *ppos)
1403 {
1404         struct inode *inode = file->f_dentry->d_inode;
1405         struct ll_sb_info *sbi = ll_i2sbi(inode);
1406         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1407         struct ll_lock_tree tree;
1408         loff_t maxbytes = ll_file_maxbytes(inode);
1409         loff_t lock_start, lock_end, end;
1410         ssize_t retval, chunk, sum = 0;
1411         int tree_locked;
1412         ENTRY;
1413
1414         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1415                inode->i_ino, inode->i_generation, inode, count, *ppos);
1416         
1417         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1418
1419         /* POSIX, but surprised the VFS doesn't check this already */
1420         if (count == 0)
1421                 RETURN(0);
1422
1423         /* If the file was opened for LL_IOC_LOV_SETSTRIPE but the ioctl was
1424          * never called on it, return -EBADF instead of failing the assertion
                below (bug 2388). */
1425         if (file->f_flags & O_LOV_DELAY_CREATE &&
1426             ll_i2info(inode)->lli_smd == NULL)
1427                 RETURN(-EBADF);
1428
1429         LASSERT(ll_i2info(inode)->lli_smd != NULL);
1430
1431         down(&ll_i2info(inode)->lli_write_sem);
1432
1433 repeat:
1434         chunk = 0; /* just to fix gcc's warning */
1435         end = *ppos + count - 1;
1436
1437         if (file->f_flags & O_APPEND) {
1438                 lock_start = 0;
1439                 lock_end = OBD_OBJECT_EOF;
1440         } else if (sbi->ll_max_rw_chunk != 0) {
1441                 /* first, determine the end of the current stripe */
1442                 end = *ppos;
1443                 obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
1444                                 (obd_off *)&end);
1445
1446                 /* clamp the end if it extends beyond the request */
1447                 if (end > *ppos + count - 1)
1448                         end = *ppos + count - 1;
1449
1450                 /* and chunk shouldn't be too large even if striping is wide */
1451                 if (end - *ppos > sbi->ll_max_rw_chunk)
1452                         end = *ppos + sbi->ll_max_rw_chunk - 1;
1453                 lock_start = *ppos;
1454                 lock_end = end;
1455         } else {
1456                 lock_start = *ppos;
1457                 lock_end = *ppos + count - 1;
1458         }
1459
1460         tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
1461                                             lock_start, lock_end, WRITE);
1462         if (tree_locked < 0)
1463                 GOTO(out, retval = tree_locked);
1464
1465         /* This is OK: generic_file_write() will overwrite *ppos under i_sem
1466          * if it races with a local truncate; it just makes our maxbytes
1467          * checking easier.  The i_size value gets updated in ll_extent_lock()
1468          * as a consequence of the [0,EOF] extent lock we requested above. */
1469         if (file->f_flags & O_APPEND) {
1470                 *ppos = i_size_read(inode);
1471                 end = *ppos + count - 1;
1472         }
1473
1474         if (*ppos >= maxbytes) {
1475                 send_sig(SIGXFSZ, current, 0);
1476                 GOTO(out_unlock, retval = -EFBIG);
1477         }
1478         if (end > maxbytes - 1)
1479                 end = maxbytes - 1;
1480
1481         /* generic_file_write handles O_APPEND after getting i_mutex */
1482         chunk = end - *ppos + 1;
1483         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
1484                inode->i_ino, chunk, *ppos);
1485         if (tree_locked)
1486                 retval = generic_file_write(file, buf, chunk, ppos);
1487         else
1488                 retval = ll_file_lockless_io(file, (char*)buf, chunk,
1489                                              ppos, WRITE);
1490         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
1491
1492 out_unlock:
1493         if (tree_locked)
1494                 ll_tree_unlock(&tree);
1495
1496 out:
1497         if (retval > 0) {
1498                 buf += retval;
1499                 count -= retval;
1500                 sum += retval;
1501                 if (retval == chunk && count > 0)
1502                         goto repeat;
1503         }
1504
1505         up(&ll_i2info(inode)->lli_write_sem);
1506
1507         retval = (sum > 0) ? sum : retval;
1508         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
1509                            retval > 0 ? retval : 0);
1510         RETURN(retval);
1511 }
1512
1513 /*
1514  * Send file content (through the page cache) to a target via the supplied read actor (sendfile)
1515  */
1516 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
1517 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1518                                 read_actor_t actor, void *target)
1519 {
1520         struct inode *inode = in_file->f_dentry->d_inode;
1521         struct ll_inode_info *lli = ll_i2info(inode);
1522         struct lov_stripe_md *lsm = lli->lli_smd;
1523         struct ll_lock_tree tree;
1524         struct ll_lock_tree_node *node;
1525         struct ost_lvb lvb;
1526         struct ll_ra_read bead;
1527         int rc;
1528         ssize_t retval;
1529         __u64 kms;
1530         ENTRY;
1531         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
1532                inode->i_ino, inode->i_generation, inode, count, *ppos);
1533
1534         /* "If nbyte is 0, read() will return 0 and have no other results."
1535          *                      -- Single Unix Spec */
1536         if (count == 0)
1537                 RETURN(0);
1538
1539         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
1540         /* turn off the kernel's read-ahead */
1541         in_file->f_ra.ra_pages = 0;
1542
1543         /* File with no objects, nothing to lock */
1544         if (!lsm)
1545                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
1546
1547         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
1548         if (IS_ERR(node))
1549                 RETURN(PTR_ERR(node));
1550
1551         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
1552         rc = ll_tree_lock(&tree, node, NULL, count,
1553                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
1554         if (rc != 0)
1555                 RETURN(rc);
1556
1557         ll_clear_file_contended(inode);
1558         ll_inode_size_lock(inode, 1);
1559         /*
1560          * Consistency guarantees: following possibilities exist for the
1561          * relation between region being read and real file size at this
1562          * moment:
1563          *
1564          *  (A): the region is completely inside of the file;
1565          *
1566          *  (B-x): x bytes of region are inside of the file, the rest is
1567          *  outside;
1568          *
1569          *  (C): the region is completely outside of the file.
1570          *
1571          * This classification is stable under DLM lock acquired by
1572          * ll_tree_lock() above, because to change class, other client has to
1573          * take DLM lock conflicting with our lock. Also, any updates to
1574          * ->i_size by other threads on this client are serialized by
1575          * ll_inode_size_lock(). This guarantees that short reads are handled
1576          * correctly in the face of concurrent writes and truncates.
1577          */
1578         inode_init_lvb(inode, &lvb);
1579         obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
1580         kms = lvb.lvb_size;
1581         if (*ppos + count - 1 > kms) {
1582                 /* A glimpse is necessary to determine whether we return a
1583                  * short read (B) or some zeroes at the end of the buffer (C) */
1584                 ll_inode_size_unlock(inode, 1);
1585                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
1586                 if (retval)
1587                         goto out;
1588         } else {
1589                 /* region is within kms and, hence, within real file size (A) */
1590                 i_size_write(inode, kms);
1591                 ll_inode_size_unlock(inode, 1);
1592         }
1593
1594         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
1595                inode->i_ino, count, *ppos, i_size_read(inode));
1596
1597         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
1598         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1599         ll_ra_read_in(in_file, &bead);
1600         /* BUG: 5972 */
1601         file_accessed(in_file);
1602         retval = generic_file_sendfile(in_file, ppos, count, actor, target);
1603         ll_ra_read_ex(in_file, &bead);
1604
1605  out:
1606         ll_tree_unlock(&tree);
1607         RETURN(retval);
1608 }
1609 #endif
1610
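/* Handler for LL_IOC_RECREATE_OBJ (CAP_SYS_ADMIN only): re-create a missing
 * OST object for this file.  The object id and OST index come from userspace;
 * obd_create() is called with OBD_FL_RECREATE_OBJS on a copy of the file's
 * stripe metadata, taken under lli_size_sem. */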
1611 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
1612                                unsigned long arg)
1613 {
1614         struct ll_inode_info *lli = ll_i2info(inode);
1615         struct obd_export *exp = ll_i2obdexp(inode);
1616         struct ll_recreate_obj ucreatp;
1617         struct obd_trans_info oti = { 0 };
1618         struct obdo *oa = NULL;
1619         int lsm_size;
1620         int rc = 0;
1621         struct lov_stripe_md *lsm, *lsm2;
1622         ENTRY;
1623
1624         if (!capable (CAP_SYS_ADMIN))
1625                 RETURN(-EPERM);
1626
1627         rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
1628                             sizeof(struct ll_recreate_obj));
1629         if (rc) {
1630                 RETURN(-EFAULT);
1631         }
1632         OBDO_ALLOC(oa);
1633         if (oa == NULL)
1634                 RETURN(-ENOMEM);
1635
1636         down(&lli->lli_size_sem);
1637         lsm = lli->lli_smd;
1638         if (lsm == NULL)
1639                 GOTO(out, rc = -ENOENT);
1640         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1641                    (lsm->lsm_stripe_count));
1642
1643         OBD_ALLOC(lsm2, lsm_size);
1644         if (lsm2 == NULL)
1645                 GOTO(out, rc = -ENOMEM);
1646
1647         oa->o_id = ucreatp.lrc_id;
1648         oa->o_nlink = ucreatp.lrc_ost_idx;
1649         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1650         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
1651         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1652                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1653
1654         oti.oti_objid = NULL;
1655         memcpy(lsm2, lsm, lsm_size);
1656         rc = obd_create(exp, oa, &lsm2, &oti);
1657
1658         OBD_FREE(lsm2, lsm_size);
1659         GOTO(out, rc);
1660 out:
1661         up(&lli->lli_size_sem);
1662         OBDO_FREE(oa);
1663         return rc;
1664 }
1665
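/* Set the striping EA on a file that does not yet have objects: re-open it
 * with an IT_OPEN intent carrying the user-supplied lov_user_md so the MDS
 * creates the objects with the requested layout, then close the open handle
 * again.  Returns -EEXIST if a stripe already exists. */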
1666 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1667                                     int flags, struct lov_user_md *lum,
1668                                     int lum_size)
1669 {
1670         struct ll_inode_info *lli = ll_i2info(inode);
1671         struct lov_stripe_md *lsm;
1672         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1673         int rc = 0;
1674         ENTRY;
1675
1676         down(&lli->lli_size_sem);
1677         lsm = lli->lli_smd;
1678         if (lsm) {
1679                 up(&lli->lli_size_sem);
1680                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1681                        inode->i_ino);
1682                 RETURN(-EEXIST);
1683         }
1684
1685         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1686         if (rc)
1687                 GOTO(out, rc);
1688         if (it_disposition(&oit, DISP_LOOKUP_NEG))
1689                 GOTO(out_req_free, rc = -ENOENT);
1690         rc = oit.d.lustre.it_status;
1691         if (rc < 0)
1692                 GOTO(out_req_free, rc);
1693
1694         ll_release_openhandle(file->f_dentry, &oit);
1695
1696  out:
1697         up(&lli->lli_size_sem);
1698         ll_intent_release(&oit);
1699         RETURN(rc);
1700 out_req_free:
1701         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1702         goto out;
1703 }
1704
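/* Fetch the LOV EA of @filename from the MDS via mdc_getattr_name(), convert
 * it to host endianness if required and, for LOV_MAGIC_JOIN files, expand it
 * into a lov_user_md_join with per-extent object data.  On success *lmmp,
 * *lmm_size and *request describe the EA and the reply buffer the caller must
 * release. */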
1705 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename, 
1706                              struct lov_mds_md **lmmp, int *lmm_size, 
1707                              struct ptlrpc_request **request)
1708 {
1709         struct ll_sb_info *sbi = ll_i2sbi(inode);
1710         struct ll_fid  fid;
1711         struct mds_body  *body;
1712         struct lov_mds_md *lmm = NULL;
1713         struct ptlrpc_request *req = NULL;
1714         int rc, lmmsize;
1715
1716         ll_inode2fid(&fid, inode);
1717
1718         rc = ll_get_max_mdsize(sbi, &lmmsize);
1719         if (rc)
1720                 RETURN(rc);
1721
1722         rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
1723                         filename, strlen(filename) + 1,
1724                         OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
1725                         lmmsize, &req);
1726         if (rc < 0) {
1727                 CDEBUG(D_INFO, "mdc_getattr_name failed "
1728                                 "on %s: rc %d\n", filename, rc);
1729                 GOTO(out, rc);
1730         }
1731
1732         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1733                         sizeof(*body));
1734         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1735         /* swabbed by mdc_getattr_name */
1736         LASSERT_REPSWABBED(req, REPLY_REC_OFF);
1737
1738         lmmsize = body->eadatasize;
1739
1740         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1741                         lmmsize == 0) {
1742                 GOTO(out, rc = -ENODATA);
1743         }
1744
1745         lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1746                         lmmsize);
1747         LASSERT(lmm != NULL);
1748         LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
1749
1750         /*
1751          * This is coming from the MDS, so is probably in
1752          * little endian.  We convert it to host endian before
1753          * passing it to userspace.
1754          */
1755         if (lmm->lmm_magic == __swab32(LOV_MAGIC)) {
1756                 lustre_swab_lov_user_md((struct lov_user_md *)lmm);
1757                 lustre_swab_lov_user_md_objects((struct lov_user_md *)lmm);
1758         } else if (lmm->lmm_magic == __swab32(LOV_MAGIC_JOIN)) {
1759                 lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
1760         }
1761
1762         if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
1763                 struct lov_stripe_md *lsm;
1764                 struct lov_user_md_join *lmj;
1765                 int lmj_size, i, aindex = 0;
1766
1767                 rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
1768                 if (rc < 0)
1769                         GOTO(out, rc = -ENOMEM);
1770                 rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
1771                 if (rc)
1772                         GOTO(out_free_memmd, rc);
1773
1774                 lmj_size = sizeof(struct lov_user_md_join) +
1775                         lsm->lsm_stripe_count *
1776                         sizeof(struct lov_user_ost_data_join);
1777                 OBD_ALLOC(lmj, lmj_size);
1778                 if (!lmj)
1779                         GOTO(out_free_memmd, rc = -ENOMEM);
1780
1781                 memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
1782                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1783                         struct lov_extent *lex =
1784                                 &lsm->lsm_array->lai_ext_array[aindex];
1785
1786                         if (lex->le_loi_idx + lex->le_stripe_count <= i)
1787                                 aindex ++;
1788                         CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
1789                                         LPU64" len %d\n", aindex, i,
1790                                         lex->le_start, (int)lex->le_len);
1791                         lmj->lmm_objects[i].l_extent_start =
1792                                 lex->le_start;
1793
1794                         if ((int)lex->le_len == -1)
1795                                 lmj->lmm_objects[i].l_extent_end = -1;
1796                         else
1797                                 lmj->lmm_objects[i].l_extent_end =
1798                                         lex->le_start + lex->le_len;
1799                         lmj->lmm_objects[i].l_object_id =
1800                                 lsm->lsm_oinfo[i]->loi_id;
1801                         lmj->lmm_objects[i].l_object_gr =
1802                                 lsm->lsm_oinfo[i]->loi_gr;
1803                         lmj->lmm_objects[i].l_ost_gen =
1804                                 lsm->lsm_oinfo[i]->loi_ost_gen;
1805                         lmj->lmm_objects[i].l_ost_idx =
1806                                 lsm->lsm_oinfo[i]->loi_ost_idx;
1807                 }
1808                 lmm = (struct lov_mds_md *)lmj;
1809                 lmmsize = lmj_size;
1810 out_free_memmd:
1811                 obd_free_memmd(sbi->ll_osc_exp, &lsm);
1812         }
1813 out:
1814         *lmmp = lmm;
1815         *lmm_size = lmmsize;
1816         *request = req;
1817         return rc;
1818 }
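/* LL_IOC_LOV_SETEA (CAP_SYS_ADMIN only): copy a lov_user_md plus one
 * lov_user_ost_data entry from userspace and apply it through
 * ll_lov_setstripe_ea_info() with MDS_OPEN_HAS_OBJS set. */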
1819 static int ll_lov_setea(struct inode *inode, struct file *file,
1820                             unsigned long arg)
1821 {
1822         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1823         struct lov_user_md  *lump;
1824         int lum_size = sizeof(struct lov_user_md) +
1825                        sizeof(struct lov_user_ost_data);
1826         int rc;
1827         ENTRY;
1828
1829         if (!capable (CAP_SYS_ADMIN))
1830                 RETURN(-EPERM);
1831
1832         OBD_ALLOC(lump, lum_size);
1833         if (lump == NULL) {
1834                 RETURN(-ENOMEM);
1835         }
1836         rc = copy_from_user(lump, (struct lov_user_md  *)arg, lum_size);
1837         if (rc) {
1838                 OBD_FREE(lump, lum_size);
1839                 RETURN(-EFAULT);
1840         }
1841
1842         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1843
1844         OBD_FREE(lump, lum_size);
1845         RETURN(rc);
1846 }
1847
1848 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1849                             unsigned long arg)
1850 {
1851         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1852         int rc;
1853         int flags = FMODE_WRITE;
1854         ENTRY;
1855
1856         /* Bug 1152: copy properly when this is no longer true */
1857         LASSERT(sizeof(lum) == sizeof(*lump));
1858         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1859         rc = copy_from_user(&lum, lump, sizeof(lum));
1860         if (rc)
1861                 RETURN(-EFAULT);
1862
1863         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
1864         if (rc == 0) {
1865                  put_user(0, &lump->lmm_stripe_count);
1866                  rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
1867                                     0, ll_i2info(inode)->lli_smd, lump);
1868         }
1869         RETURN(rc);
1870 }
1871
1872 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1873 {
1874         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1875
1876         if (!lsm)
1877                 RETURN(-ENODATA);
1878
1879         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
1880                             (void *)arg);
1881 }
1882
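/* LL_IOC_GROUP_LOCK: take a group extent lock (LCK_GROUP) over the whole
 * file [0, EOF] with the group id given in @arg and stash the lock handle in
 * the per-open file data; I/O on this descriptor then bypasses the normal
 * extent locking (LL_FILE_IGNORE_LOCK). */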
1883 static int ll_get_grouplock(struct inode *inode, struct file *file,
1884                             unsigned long arg)
1885 {
1886         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1887         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1888                                                     .end = OBD_OBJECT_EOF}};
1889         struct lustre_handle lockh = { 0 };
1890         struct ll_inode_info *lli = ll_i2info(inode);
1891         struct lov_stripe_md *lsm = lli->lli_smd;
1892         int flags = 0, rc;
1893         ENTRY;
1894
1895         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1896                 RETURN(-EINVAL);
1897         }
1898
1899         policy.l_extent.gid = arg;
1900         if (file->f_flags & O_NONBLOCK)
1901                 flags = LDLM_FL_BLOCK_NOWAIT;
1902
1903         rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags);
1904         if (rc)
1905                 RETURN(rc);
1906
1907         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1908         fd->fd_gid = arg;
1909         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1910
1911         RETURN(0);
1912 }
1913
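/* LL_IOC_GROUP_UNLOCK: drop the group lock taken by ll_get_grouplock();
 * the group id in @arg must match the one the lock was taken with. */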
1914 static int ll_put_grouplock(struct inode *inode, struct file *file,
1915                             unsigned long arg)
1916 {
1917         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1918         struct ll_inode_info *lli = ll_i2info(inode);
1919         struct lov_stripe_md *lsm = lli->lli_smd;
1920         int rc;
1921         ENTRY;
1922
1923         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1924                 /* Ugh, it's already unlocked. */
1925                 RETURN(-EINVAL);
1926         }
1927
1928         if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */
1929                 RETURN(-EINVAL);
1930
1931         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1932
1933         rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1934         if (rc)
1935                 RETURN(rc);
1936
1937         fd->fd_gid = 0;
1938         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1939
1940         RETURN(0);
1941 }
1942
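/* Validate a join request: the server must support join, both inodes must be
 * regular files, a file cannot be joined to itself, and the head file size
 * must be a multiple of JOIN_FILE_ALIGN. */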
1943 static int join_sanity_check(struct inode *head, struct inode *tail)
1944 {
1945         ENTRY;
1946         if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
1947                 CERROR("server does not support join\n");
1948                 RETURN(-EINVAL);
1949         }
1950         if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
1951                 CERROR("head ino %lu and tail ino %lu must be regular files\n",
1952                        head->i_ino, tail->i_ino);
1953                 RETURN(-EINVAL);
1954         }
1955         if (head->i_ino == tail->i_ino) {
1956                 CERROR("file %lu cannot be joined to itself\n", head->i_ino);
1957                 RETURN(-EINVAL);
1958         }
1959         if (i_size_read(head) % JOIN_FILE_ALIGN) {
1960                 CERROR("head size %llu must be a multiple of 64K\n", i_size_read(head));
1961                 RETURN(-EINVAL);
1962         }
1963         RETURN(0);
1964 }
1965
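/* Ask the MDS to append the tail file to the head file: enqueue an open
 * intent with O_JOIN_FILE, passing the current head size as intent data.
 * Any lock granted along with the open is dropped immediately and the open
 * handle is closed again. */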
1966 static int join_file(struct inode *head_inode, struct file *head_filp,
1967                      struct file *tail_filp)
1968 {
1969         struct dentry *tail_dentry = tail_filp->f_dentry;
1970         struct lookup_intent oit = {.it_op = IT_OPEN,
1971                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
1972         struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_PW,
1973                 ll_mdc_blocking_ast, ldlm_completion_ast, NULL, NULL };
1974
1975         struct lustre_handle lockh;
1976         struct mdc_op_data *op_data;
1977         int    rc;
1978         loff_t data;
1979         ENTRY;
1980
1981         tail_dentry = tail_filp->f_dentry;
1982
1983         OBD_ALLOC_PTR(op_data);
1984         if (op_data == NULL) {
1985                 RETURN(-ENOMEM);
1986         }
1987
1988         data = i_size_read(head_inode);
1989         ll_prepare_mdc_op_data(op_data, head_inode,
1990                                tail_dentry->d_parent->d_inode,
1991                                tail_dentry->d_name.name,
1992                                tail_dentry->d_name.len, 0, &data);
1993         rc = mdc_enqueue(ll_i2mdcexp(head_inode), &einfo, &oit,
1994                          op_data, &lockh, NULL, 0, 0);
1995
1996         if (rc < 0)
1997                 GOTO(out, rc);
1998
1999         rc = oit.d.lustre.it_status;
2000
2001         if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
2002                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
2003                 ptlrpc_req_finished((struct ptlrpc_request *)
2004                                     oit.d.lustre.it_data);
2005                 GOTO(out, rc);
2006         }
2007
2008         if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
2009                                            * away */
2010                 ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
2011                 oit.d.lustre.it_lock_mode = 0;
2012         }
2013         ll_release_openhandle(head_filp->f_dentry, &oit);
2014 out:
2015         if (op_data)
2016                 OBD_FREE_PTR(op_data);
2017         ll_intent_release(&oit);
2018         RETURN(rc);
2019 }
2020
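/* LL_IOC_JOIN: open the tail file, take EX extent locks on both inodes in
 * inode-number order to avoid deadlocks, sanity check the pair and ask the
 * MDS to join them.  On success the head's cached stripe metadata is freed
 * so the new joined layout is fetched on next use. */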
2021 static int ll_file_join(struct inode *head, struct file *filp,
2022                         char *filename_tail)
2023 {
2024         struct inode *tail = NULL, *first = NULL, *second = NULL;
2025         struct dentry *tail_dentry;
2026         struct file *tail_filp, *first_filp, *second_filp;
2027         struct ll_lock_tree first_tree, second_tree;
2028         struct ll_lock_tree_node *first_node, *second_node;
2029         struct ll_inode_info *hlli = ll_i2info(head), *tlli;
2030         int rc = 0, cleanup_phase = 0;
2031         ENTRY;
2032
2033         CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
2034                head->i_ino, head->i_generation, head, filename_tail);
2035
2036         tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
2037         if (IS_ERR(tail_filp)) {
2038                 CERROR("Cannot open tail file %s\n", filename_tail);
2039                 rc = PTR_ERR(tail_filp);
2040                 GOTO(cleanup, rc);
2041         }
2042         tail = igrab(tail_filp->f_dentry->d_inode);
2043
2044         tlli = ll_i2info(tail);
2045         tail_dentry = tail_filp->f_dentry;
2046         LASSERT(tail_dentry);
2047         cleanup_phase = 1;
2048
2049         /* reorder the inodes so locks are always taken in the same order */
2050         first = head->i_ino > tail->i_ino ? head : tail;
2051         second = head->i_ino > tail->i_ino ? tail : head;
2052         first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
2053         second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
2054
2055         CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
2056                head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
2057         first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
2058         if (IS_ERR(first_node)){
2059                 rc = PTR_ERR(first_node);
2060                 GOTO(cleanup, rc);
2061         }
2062         first_tree.lt_fd = first_filp->private_data;
2063         rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
2064         if (rc != 0)
2065                 GOTO(cleanup, rc);
2066         cleanup_phase = 2;
2067
2068         second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
2069         if (IS_ERR(second_node)){
2070                 rc = PTR_ERR(second_node);
2071                 GOTO(cleanup, rc);
2072         }
2073         second_tree.lt_fd = second_filp->private_data;
2074         rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
2075         if (rc != 0)
2076                 GOTO(cleanup, rc);
2077         cleanup_phase = 3;
2078
2079         rc = join_sanity_check(head, tail);
2080         if (rc)
2081                 GOTO(cleanup, rc);
2082
2083         rc = join_file(head, filp, tail_filp);
2084         if (rc)
2085                 GOTO(cleanup, rc);
2086 cleanup:
2087         switch (cleanup_phase) {
2088         case 3:
2089                 ll_tree_unlock(&second_tree);
2090                 obd_cancel_unused(ll_i2obdexp(second),
2091                                   ll_i2info(second)->lli_smd, 0, NULL);
2092         case 2:
2093                 ll_tree_unlock(&first_tree);
2094                 obd_cancel_unused(ll_i2obdexp(first),
2095                                   ll_i2info(first)->lli_smd, 0, NULL);
2096         case 1:
2097                 filp_close(tail_filp, 0);
2098                 if (tail)
2099                         iput(tail);
2100                 if (head && rc == 0) {
2101                         obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
2102                                        &hlli->lli_smd);
2103                         hlli->lli_smd = NULL;
2104                 }
2105         case 0:
2106                 break;
2107         default:
2108                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2109                 LBUG();
2110         }
2111         RETURN(rc);
2112 }
2113
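/* Close an MDS open handle that was obtained as a side effect of an intent
 * (for example an open done only to set or probe the stripe EA) and that we
 * do not intend to keep. */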
2114 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
2115 {
2116         struct inode *inode = dentry->d_inode;
2117         struct obd_client_handle *och;
2118         int rc;
2119         ENTRY;
2120
2121         LASSERT(inode);
2122
2123         /* Root ? Do nothing. */
2124         if (dentry->d_inode->i_sb->s_root == dentry)
2125                 RETURN(0);
2126
2127         /* No open handle to close? Move away */
2128         if (!it_disposition(it, DISP_OPEN_OPEN))
2129                 RETURN(0);
2130
2131         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
2132
2133         OBD_ALLOC(och, sizeof(*och));
2134         if (!och)
2135                 GOTO(out, rc = -ENOMEM);
2136
2137         ll_och_fill(ll_i2info(inode), it, och);
2138
2139         rc = ll_close_inode_openhandle(inode, och);
2140
2141         OBD_FREE(och, sizeof(*och));
2142  out:
2143         /* this one is in place of ll_file_open */
2144         ptlrpc_req_finished(it->d.lustre.it_data);
2145         it_clear_disposition(it, DISP_ENQ_OPEN_REF);
2146         RETURN(rc);
2147 }
2148
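/* Dispatch Lustre-specific ioctls on a regular file: per-fd flags, striping
 * get/set, object recreation, file join, group locks, statfs and target name
 * queries.  Commands not handled here are offered to dynamically registered
 * handlers (ll_iocontrol_call) and finally forwarded to the OSC via
 * obd_iocontrol(). */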
2149 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
2150                   unsigned long arg)
2151 {
2152         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2153         int flags;
2154         ENTRY;
2155
2156         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2157                inode->i_generation, inode, cmd);
2158         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2159
2160         /* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2161         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2162                 RETURN(-ENOTTY);
2163
2164         switch(cmd) {
2165         case LL_IOC_GETFLAGS:
2166                 /* Get the current value of the file flags */
2167                 return put_user(fd->fd_flags, (int *)arg);
2168         case LL_IOC_SETFLAGS:
2169         case LL_IOC_CLRFLAGS:
2170                 /* Set or clear specific file flags */
2171                 /* XXX This probably needs checks to ensure the flags are
2172                  *     not abused, and to handle any flag side effects.
2173                  */
2174                 if (get_user(flags, (int *) arg))
2175                         RETURN(-EFAULT);
2176
2177                 if (cmd == LL_IOC_SETFLAGS) {
2178                         if ((flags & LL_FILE_IGNORE_LOCK) &&
2179                             !(file->f_flags & O_DIRECT)) {
2180                                 CERROR("%s: unable to disable locking on "
2181                                        "non-O_DIRECT file\n", current->comm);
2182                                 RETURN(-EINVAL);
2183                         }
2184
2185                         fd->fd_flags |= flags;
2186                 } else {
2187                         fd->fd_flags &= ~flags;
2188                 }
2189                 RETURN(0);
2190         case LL_IOC_LOV_SETSTRIPE:
2191                 RETURN(ll_lov_setstripe(inode, file, arg));
2192         case LL_IOC_LOV_SETEA:
2193                 RETURN(ll_lov_setea(inode, file, arg));
2194         case LL_IOC_LOV_GETSTRIPE:
2195                 RETURN(ll_lov_getstripe(inode, arg));
2196         case LL_IOC_RECREATE_OBJ:
2197                 RETURN(ll_lov_recreate_obj(inode, file, arg));
2198         case EXT3_IOC_GETFLAGS:
2199         case EXT3_IOC_SETFLAGS:
2200                 RETURN(ll_iocontrol(inode, file, cmd, arg));
2201         case EXT3_IOC_GETVERSION_OLD:
2202         case EXT3_IOC_GETVERSION:
2203                 RETURN(put_user(inode->i_generation, (int *)arg));
2204         case LL_IOC_JOIN: {
2205                 char *ftail;
2206                 int rc;
2207
2208                 ftail = getname((const char *)arg);
2209                 if (IS_ERR(ftail))
2210                         RETURN(PTR_ERR(ftail));
2211                 rc = ll_file_join(inode, file, ftail);
2212                 putname(ftail);
2213                 RETURN(rc);
2214         }
2215         case LL_IOC_GROUP_LOCK:
2216                 RETURN(ll_get_grouplock(inode, file, arg));
2217         case LL_IOC_GROUP_UNLOCK:
2218                 RETURN(ll_put_grouplock(inode, file, arg));
2219         case IOC_OBD_STATFS:
2220                 RETURN(ll_obd_statfs(inode, (void *)arg));
2221         case OBD_IOC_GETNAME_OLD:
2222         case OBD_IOC_GETNAME: {
2223                 struct obd_device *obd =
2224                         class_exp2obd(ll_i2sbi(inode)->ll_osc_exp);
2225                 if (!obd)
2226                         RETURN(-EFAULT);
2227                 if (copy_to_user((void *)arg, obd->obd_name,
2228                                 strlen(obd->obd_name) + 1))
2229                         RETURN (-EFAULT);
2230                 RETURN(0);
2231         }
2232
2233         /* We need to special case any other ioctls we want to handle,
2234          * to send them to the MDS/OST as appropriate and to properly
2235          * network encode the arg field.
2236         case EXT3_IOC_SETVERSION_OLD:
2237         case EXT3_IOC_SETVERSION:
2238         */
2239         default: {
2240                 int err;
2241
2242                 if (LLIOC_STOP == 
2243                     ll_iocontrol_call(inode, file, cmd, arg, &err))
2244                         RETURN(err);
2245
2246                 RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
2247                                      (void *)arg));
2248         }
2249         }
2250 }
2251
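/* llseek: for SEEK_END the OST objects are glimpsed first so that i_size is
 * up to date before it is used as the base offset; the resulting offset is
 * validated against ll_file_maxbytes(). */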
2252 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2253 {
2254         struct inode *inode = file->f_dentry->d_inode;
2255         struct ll_inode_info *lli = ll_i2info(inode);
2256         struct lov_stripe_md *lsm = lli->lli_smd;
2257         loff_t retval;
2258         ENTRY;
2259         retval = offset + ((origin == 2) ? i_size_read(inode) :
2260                            (origin == 1) ? file->f_pos : 0);
2261         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
2262                inode->i_ino, inode->i_generation, inode, retval, retval,
2263                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
2264         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2265
2266         if (origin == 2) { /* SEEK_END */
2267                 int nonblock = 0, rc;
2268
2269                 if (file->f_flags & O_NONBLOCK)
2270                         nonblock = LDLM_FL_BLOCK_NOWAIT;
2271
2272                 if (lsm != NULL) {
2273                         rc = ll_glimpse_size(inode, nonblock);
2274                         if (rc != 0)
2275                                 RETURN(rc);
2276                 }
2277
2278                 ll_inode_size_lock(inode, 0);
2279                 offset += i_size_read(inode);
2280                 ll_inode_size_unlock(inode, 0);
2281         } else if (origin == 1) { /* SEEK_CUR */
2282                 offset += file->f_pos;
2283         }
2284
2285         retval = -EINVAL;
2286         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
2287                 if (offset != file->f_pos) {
2288                         file->f_pos = offset;
2289 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2290                         file->f_reada = 0;
2291                         file->f_version = ++event;
2292 #endif
2293                 }
2294                 retval = offset;
2295         }
2296
2297         RETURN(retval);
2298 }
2299
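/* fsync: wait for writeback already started by the VFS, fold in any async
 * write errors recorded against the inode and its stripes, sync metadata on
 * the MDS and, if @data is set, the objects on the OSTs as well. */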
2300 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2301 {
2302         struct inode *inode = dentry->d_inode;
2303         struct ll_inode_info *lli = ll_i2info(inode);
2304         struct lov_stripe_md *lsm = lli->lli_smd;
2305         struct ll_fid fid;
2306         struct ptlrpc_request *req;
2307         int rc, err;
2308         ENTRY;
2309         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2310                inode->i_generation, inode);
2311         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2312
2313         /* fsync's caller has already called _fdata{sync,write}, we want
2314          * that IO to finish before calling the osc and mdc sync methods */
2315         rc = filemap_fdatawait(inode->i_mapping);
2316
2317         /* catch async errors that were recorded back when async writeback
2318          * failed for pages in this mapping. */
2319         err = lli->lli_async_rc;
2320         lli->lli_async_rc = 0;
2321         if (rc == 0)
2322                 rc = err;
2323         if (lsm) {
2324                 err = lov_test_and_clear_async_rc(lsm);
2325                 if (rc == 0)
2326                         rc = err;
2327         }
2328
2329         ll_inode2fid(&fid, inode);
2330         err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
2331         if (!rc)
2332                 rc = err;
2333         if (!err)
2334                 ptlrpc_req_finished(req);
2335
2336         if (data && lsm) {
2337                 struct obdo *oa;
2338
2339                 OBDO_ALLOC(oa);
2340                 if (!oa)
2341                         RETURN(rc ? rc : -ENOMEM);
2342
2343                 oa->o_id = lsm->lsm_object_id;
2344                 oa->o_valid = OBD_MD_FLID;
2345                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
2346                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
2347
2348                 err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
2349                                0, OBD_OBJECT_EOF);
2350                 if (!rc)
2351                         rc = err;
2352                 OBDO_FREE(oa);
2353         }
2354
2355         RETURN(rc);
2356 }
2357
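/* Implement POSIX byte-range locks and flock() by enqueueing LDLM_FLOCK
 * locks on the MDS.  F_UNLCK is sent as an LCK_NL enqueue, F_SETLK uses
 * LDLM_FL_BLOCK_NOWAIT and F_GETLK uses LDLM_FL_TEST_LOCK. */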
2358 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2359 {
2360         struct inode *inode = file->f_dentry->d_inode;
2361         struct ll_sb_info *sbi = ll_i2sbi(inode);
2362         struct ldlm_res_id res_id =
2363                     { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
2364         struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
2365                 ldlm_flock_completion_ast, NULL, file_lock };
2366         struct lustre_handle lockh = {0};
2367         ldlm_policy_data_t flock;
2368         int flags = 0;
2369         int rc;
2370         ENTRY;
2371
2372         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2373                inode->i_ino, file_lock);
2374         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2375
2376         if (file_lock->fl_flags & FL_FLOCK) {
2377                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2378                 /* set missing params for flock() calls */
2379                 file_lock->fl_end = OFFSET_MAX;
2380                 file_lock->fl_pid = current->tgid;
2381         }
2382         flock.l_flock.pid = file_lock->fl_pid;
2383         flock.l_flock.start = file_lock->fl_start;
2384         flock.l_flock.end = file_lock->fl_end;
2385
2386         switch (file_lock->fl_type) {
2387         case F_RDLCK:
2388                 einfo.ei_mode = LCK_PR;
2389                 break;
2390         case F_UNLCK:
2391                 /* An unlock request may or may not have any relation to
2392                  * existing locks so we may not be able to pass a lock handle
2393                  * via a normal ldlm_lock_cancel() request. The request may even
2394                  * unlock a byte range in the middle of an existing lock. In
2395                  * order to process an unlock request we need all of the same
2396                  * information that is given with a normal read or write record
2397                  * lock request. To avoid creating another ldlm unlock (cancel)
2398                  * message we'll treat a LCK_NL flock request as an unlock. */
2399                 einfo.ei_mode = LCK_NL;
2400                 break;
2401         case F_WRLCK:
2402                 einfo.ei_mode = LCK_PW;
2403                 break;
2404         default:
2405                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
2406                 LBUG();
2407         }
2408
2409         switch (cmd) {
2410         case F_SETLKW:
2411 #ifdef F_SETLKW64
2412         case F_SETLKW64:
2413 #endif
2414                 flags = 0;
2415                 break;
2416         case F_SETLK:
2417 #ifdef F_SETLK64
2418         case F_SETLK64:
2419 #endif
2420                 flags = LDLM_FL_BLOCK_NOWAIT;
2421                 break;
2422         case F_GETLK:
2423 #ifdef F_GETLK64
2424         case F_GETLK64:
2425 #endif
2426                 flags = LDLM_FL_TEST_LOCK;
2427                 /* Save the old mode so that if the mode in the lock changes we
2428                  * can decrement the appropriate reader or writer refcount. */
2429                 file_lock->fl_type = einfo.ei_mode;
2430                 break;
2431         default:
2432                 CERROR("unknown fcntl lock command: %d\n", cmd);
2433                 LBUG();
2434         }
2435
2436         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2437                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2438                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2439
2440         rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, &einfo, res_id,
2441                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
2442         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
2443                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
2444 #ifdef HAVE_F_OP_FLOCK
2445         if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
2446             !(flags & LDLM_FL_TEST_LOCK))
2447                 posix_lock_file_wait(file, file_lock);
2448 #endif
2449
2450         RETURN(rc);
2451 }
2452
2453 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2454 {
2455         ENTRY;
2456
2457         RETURN(-ENOSYS);
2458 }
2459
2460 int ll_have_md_lock(struct inode *inode, __u64 bits)
2461 {
2462         struct lustre_handle lockh;
2463         struct ldlm_res_id res_id = { .name = {0} };
2464         struct obd_device *obddev;
2465         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2466         int flags;
2467         ENTRY;
2468
2469         if (!inode)
2470                RETURN(0);
2471
2472         obddev = ll_i2mdcexp(inode)->exp_obd;
2473         res_id.name[0] = inode->i_ino;
2474         res_id.name[1] = inode->i_generation;
2475
2476         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
2477
2478         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2479         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
2480                             &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
2481                 RETURN(1);
2482         }
2483
2484         RETURN(0);
2485 }
2486
2487 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2488         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2489                               * and return success */
2490                 inode->i_nlink = 0;
2491                 /* This path cannot be hit for regular files except in the
2492                  * case of obscure races, so there is no need to validate
2493                  * the size. */
2494                 if (!S_ISREG(inode->i_mode) &&
2495                     !S_ISDIR(inode->i_mode))
2496                         return 0;
2497         }
2498
2499         if (rc) {
2500                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2501                 return -abs(rc);
2502
2503         }
2504
2505         return 0;
2506 }
2507
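/* Revalidate the inode against the MDS: by fid through an IT_GETATTR intent
 * if the server supports OBD_CONNECT_ATTRFID, otherwise through a plain
 * mdc_getattr() when no UPDATE or LOOKUP MDS lock is cached.  If the file
 * has objects, ll_glimpse_size() then refreshes the cached size. */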
2508 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
2509 {
2510         struct inode *inode = dentry->d_inode;
2511         struct ptlrpc_request *req = NULL;
2512         struct obd_export *exp;
2513         int rc;
2514         ENTRY;
2515
2516         if (!inode) {
2517                 CERROR("REPORT THIS LINE TO PETER\n");
2518                 RETURN(0);
2519         }
2520         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2521                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2522 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
2523         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
2524 #endif
2525
2526         exp = ll_i2mdcexp(inode);
2527
2528         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2529                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2530                 struct mdc_op_data op_data;
2531
2532                 /* Call getattr by fid, so do not provide name at all. */
2533                 ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
2534                                        dentry->d_inode, NULL, 0, 0, NULL);
2535                 rc = mdc_intent_lock(exp, &op_data, NULL, 0,
2536                                      /* we are not interested in name
2537                                         based lookup */
2538                                      &oit, 0, &req,
2539                                      ll_mdc_blocking_ast, 0);
2540                 if (rc < 0) {
2541                         rc = ll_inode_revalidate_fini(inode, rc);
2542                         GOTO (out, rc);
2543                 }
2544                 
2545                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
2546                 if (rc != 0) {
2547                         ll_intent_release(&oit);
2548                         GOTO(out, rc);
2549                 }
2550
2551                 /* Unlinked? Unhash dentry, so it is not picked up later by
2552                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2553                    here to preserve get_cwd functionality on 2.6.
2554                    Bug 10503 */
2555                 if (!dentry->d_inode->i_nlink) {
2556                         spin_lock(&dcache_lock);
2557                         ll_drop_dentry(dentry);
2558                         spin_unlock(&dcache_lock);
2559                 }
2560
2561                 ll_lookup_finish_locks(&oit, dentry);
2562         } else if (!ll_have_md_lock(dentry->d_inode,
2563                                   MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
2564                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2565                 struct ll_fid fid;
2566                 obd_valid valid = OBD_MD_FLGETATTR;
2567                 int ealen = 0;
2568
2569                 if (S_ISREG(inode->i_mode)) {
2570                         rc = ll_get_max_mdsize(sbi, &ealen);
2571                         if (rc) 
2572                                 RETURN(rc); 
2573                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2574                 }
2575                 ll_inode2fid(&fid, inode);
2576                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
2577                 if (rc) {
2578                         rc = ll_inode_revalidate_fini(inode, rc);
2579                         RETURN(rc);
2580                 }
2581
2582                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
2583                                    NULL);
2584                 if (rc)
2585                         GOTO(out, rc);
2586         }
2587
2588         /* if object not yet allocated, don't validate size */
2589         if (ll_i2info(inode)->lli_smd == NULL) 
2590                 GOTO(out, rc = 0);
2591
2592         /* ll_glimpse_size will prefer locally cached writes if they extend
2593          * the file */
2594         rc = ll_glimpse_size(inode, 0);
2595
2596 out:
2597         ptlrpc_req_finished(req);
2598         RETURN(rc);
2599 }
2600
2601 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2602 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2603                   struct lookup_intent *it, struct kstat *stat)
2604 {
2605         struct inode *inode = de->d_inode;
2606         int res = 0;
2607
2608         res = ll_inode_revalidate_it(de, it);
2609         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
2610
2611         if (res)
2612                 return res;
2613
2614         stat->dev = inode->i_sb->s_dev;
2615         stat->ino = inode->i_ino;
2616         stat->mode = inode->i_mode;
2617         stat->nlink = inode->i_nlink;
2618         stat->uid = inode->i_uid;
2619         stat->gid = inode->i_gid;
2620         stat->rdev = kdev_t_to_nr(inode->i_rdev);
2621         stat->atime = inode->i_atime;
2622         stat->mtime = inode->i_mtime;
2623         stat->ctime = inode->i_ctime;
2624 #ifdef HAVE_INODE_BLKSIZE
2625         stat->blksize = inode->i_blksize;
2626 #else
2627         stat->blksize = 1<<inode->i_blkbits;
2628 #endif
2629
2630         ll_inode_size_lock(inode, 0);
2631         stat->size = i_size_read(inode);
2632         stat->blocks = inode->i_blocks;
2633         ll_inode_size_unlock(inode, 0);
2634
2635         return 0;
2636 }
2637 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2638 {
2639         struct lookup_intent it = { .it_op = IT_GETATTR };
2640
2641         return ll_getattr_it(mnt, de, &it, stat);
2642 }
2643 #endif
2644
2645 static
2646 int lustre_check_acl(struct inode *inode, int mask)
2647 {
2648 #ifdef CONFIG_FS_POSIX_ACL
2649         struct ll_inode_info *lli = ll_i2info(inode);
2650         struct posix_acl *acl;
2651         int rc;
2652         ENTRY;
2653
2654         spin_lock(&lli->lli_lock);
2655         acl = posix_acl_dup(lli->lli_posix_acl);
2656         spin_unlock(&lli->lli_lock);
2657
2658         if (!acl)
2659                 RETURN(-EAGAIN);
2660
2661         rc = posix_acl_permission(inode, acl, mask);
2662         posix_acl_release(acl);
2663
2664         RETURN(rc);
2665 #else
2666         return -EAGAIN;
2667 #endif
2668 }
2669
2670 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
2671 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2672 {
2673         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2674                inode->i_ino, inode->i_generation, inode, mask);
2675
2676         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2677         return generic_permission(inode, mask, lustre_check_acl);
2678 }
2679 #else
2680 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
2681 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2682 #else
2683 int ll_inode_permission(struct inode *inode, int mask)
2684 #endif
2685 {
2686         int mode = inode->i_mode;
2687         int rc;
2688
2689         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
2690                inode->i_ino, inode->i_generation, inode, mask);
2691         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2692
2693         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
2694             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
2695                 return -EROFS;
2696         if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
2697                 return -EACCES;
2698         if (current->fsuid == inode->i_uid) {
2699                 mode >>= 6;
2700         } else if (1) {
2701                 if (((mode >> 3) & mask & S_IRWXO) != mask)
2702                         goto check_groups;
2703                 rc = lustre_check_acl(inode, mask);
2704                 if (rc == -EAGAIN)
2705                         goto check_groups;
2706                 if (rc == -EACCES)
2707                         goto check_capabilities;
2708                 return rc;
2709         } else {
2710 check_groups:
2711                 if (in_group_p(inode->i_gid))
2712                         mode >>= 3;
2713         }
2714         if ((mode & mask & S_IRWXO) == mask)
2715                 return 0;
2716
2717 check_capabilities:
2718         if (!(mask & MAY_EXEC) ||
2719             (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
2720                 if (capable(CAP_DAC_OVERRIDE))
2721                         return 0;
2722
2723         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
2724             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
2725                 return 0;
2726
2727         return -EACCES;
2728 }
2729 #endif
2730
2731 /* -o localflock - only provides locally consistent flock locks */
2732 struct file_operations ll_file_operations = {
2733         .read           = ll_file_read,
2734         .write          = ll_file_write,
2735         .ioctl          = ll_file_ioctl,
2736         .open           = ll_file_open,
2737         .release        = ll_file_release,
2738         .mmap           = ll_file_mmap,
2739         .llseek         = ll_file_seek,
2740 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2741         .sendfile       = ll_file_sendfile,
2742 #endif
2743         .fsync          = ll_fsync,
2744 };
2745
2746 struct file_operations ll_file_operations_flock = {
2747         .read           = ll_file_read,
2748         .write          = ll_file_write,
2749         .ioctl          = ll_file_ioctl,
2750         .open           = ll_file_open,
2751         .release        = ll_file_release,
2752         .mmap           = ll_file_mmap,
2753         .llseek         = ll_file_seek,
2754 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2755         .sendfile       = ll_file_sendfile,
2756 #endif
2757         .fsync          = ll_fsync,
2758 #ifdef HAVE_F_OP_FLOCK
2759         .flock          = ll_file_flock,
2760 #endif
2761         .lock           = ll_file_flock
2762 };
2763
2764 /* -o noflock - return ENOSYS on flock calls */
2765 struct file_operations ll_file_operations_noflock = {
2766         .read           = ll_file_read,
2767         .write          = ll_file_write,
2768         .ioctl          = ll_file_ioctl,
2769         .open           = ll_file_open,
2770         .release        = ll_file_release,
2771         .mmap           = ll_file_mmap,
2772         .llseek         = ll_file_seek,
2773 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2774         .sendfile       = ll_file_sendfile,
2775 #endif
2776         .fsync          = ll_fsync,
2777 #ifdef HAVE_F_OP_FLOCK
2778         .flock          = ll_file_noflock,
2779 #endif
2780         .lock           = ll_file_noflock
2781 };
2782
2783 struct inode_operations ll_file_inode_operations = {
2784 #ifdef HAVE_VFS_INTENT_PATCHES
2785         .setattr_raw    = ll_setattr_raw,
2786 #endif
2787         .setattr        = ll_setattr,
2788         .truncate       = ll_truncate,
2789 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
2790         .getattr        = ll_getattr,
2791 #else
2792         .revalidate_it  = ll_inode_revalidate_it,
2793 #endif
2794         .permission     = ll_inode_permission,
2795         .setxattr       = ll_setxattr,
2796         .getxattr       = ll_getxattr,
2797         .listxattr      = ll_listxattr,
2798         .removexattr    = ll_removexattr,
2799 };
2800
2801 /* dynamic ioctl number support routines */
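/*
 * Registry of externally supplied ioctl handlers: a rw_semaphore protects a
 * list of llioc_data registrations, each carrying the command numbers it
 * services and the callback to invoke for them.
 */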
2802 static struct llioc_ctl_data {
2803         struct rw_semaphore ioc_sem;
2804         struct list_head    ioc_head;
2805 } llioc = { 
2806         __RWSEM_INITIALIZER(llioc.ioc_sem), 
2807         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2808 };
2809
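/*
 * One registration: iocd_cmd[] is a variable-length tail holding iocd_count
 * command numbers; iocd_size records the full allocation size so that
 * ll_iocontrol_unregister() can free the block.
 */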
2811 struct llioc_data {
2812         struct list_head        iocd_list;
2813         unsigned int            iocd_size;
2814         llioc_callback_t        iocd_cb;
2815         unsigned int            iocd_count;
2816         unsigned int            iocd_cmd[0];
2817 };
2818
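/*
 * Register @count ioctl command numbers from @cmd to be dispatched to @cb by
 * ll_iocontrol_call().  Returns an opaque cookie for ll_iocontrol_unregister(),
 * or NULL on invalid arguments or allocation failure.
 *
 * A minimal, purely illustrative caller might look like the sketch below; the
 * handler name and command numbers are made up, and the handler signature is
 * inferred from how iocd_cb is invoked in ll_iocontrol_call():
 *
 *      static enum llioc_iter my_ioc_cb(struct inode *inode, struct file *file,
 *                                       unsigned int cmd, unsigned long arg,
 *                                       void *magic, int *rcp)
 *      {
 *              *rcp = 0;
 *              return LLIOC_STOP;
 *      }
 *
 *      static unsigned int my_cmds[2] = { 0xabc1, 0xabc2 };
 *      void *cookie = ll_iocontrol_register(my_ioc_cb, 2, my_cmds);
 *      ...
 *      ll_iocontrol_unregister(cookie);
 */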
2819 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2820 {
2821         unsigned int size;
2822         struct llioc_data *in_data = NULL;
2823         ENTRY;
2824
2825         if (cb == NULL || cmd == NULL ||
2826             count > LLIOC_MAX_CMD || count < 0)
2827                 RETURN(NULL);
2828
2829         size = sizeof(*in_data) + count * sizeof(unsigned int);
2830         OBD_ALLOC(in_data, size);
2831         if (in_data == NULL)
2832                 RETURN(NULL);
2833
2834         memset(in_data, 0, sizeof(*in_data));
2835         in_data->iocd_size = size;
2836         in_data->iocd_cb = cb;
2837         in_data->iocd_count = count;
2838         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2839
2840         down_write(&llioc.ioc_sem);
2841         list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2842         up_write(&llioc.ioc_sem);
2843
2844         RETURN(in_data);
2845 }
2846
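/*
 * Drop a registration made by ll_iocontrol_register(); @magic is the cookie
 * that call returned.  An unknown cookie only triggers a warning.
 */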
2847 void ll_iocontrol_unregister(void *magic)
2848 {
2849         struct llioc_data *tmp;
2850
2851         if (magic == NULL)
2852                 return;
2853
2854         down_write(&llioc.ioc_sem);
2855         list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2856                 if (tmp == magic) {
2857                         unsigned int size = tmp->iocd_size;
2858
2859                         list_del(&tmp->iocd_list);
2860                         up_write(&llioc.ioc_sem);
2861
2862                         OBD_FREE(tmp, size);
2863                         return;
2864                 }
2865         }
2866         up_write(&llioc.ioc_sem);
2867
2868         CWARN("failed to find iocontrol registration block with magic: %p\n", magic);
2869 }
2870
2871 EXPORT_SYMBOL(ll_iocontrol_register);
2872 EXPORT_SYMBOL(ll_iocontrol_unregister);
2873
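/*
 * Dispatch @cmd to the registered handlers.  The scan stops at the first
 * handler returning LLIOC_STOP; *rcp receives the last handler's status, or
 * -EINVAL if no handler recognized the command.
 */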
2874 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
2875                         unsigned int cmd, unsigned long arg, int *rcp)
2876 {
2877         enum llioc_iter ret = LLIOC_CONT;
2878         struct llioc_data *data;
2879         int rc = -EINVAL, i;
2880
2881         down_read(&llioc.ioc_sem);
2882         list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2883                 for (i = 0; i < data->iocd_count; i++) {
2884                         if (cmd != data->iocd_cmd[i]) 
2885                                 continue;
2886
2887                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2888                         break;
2889                 }
2890
2891                 if (ret == LLIOC_STOP)
2892                         break;
2893         }
2894         up_read(&llioc.ioc_sem);
2895
2896         if (rcp)
2897                 *rcp = rc;
2898         return ret;
2899 }