Whamcloud - gitweb
072a53c06c1b5530ea58d9d6fd7a9e8b13e20596
[fs/lustre-release.git] / lustre / llite / file.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/file.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44 #include <lustre_dlm.h>
45 #include <lustre_lite.h>
46 #include <linux/pagemap.h>
47 #include <linux/file.h>
48 #include "llite_internal.h"
49 #include <lustre/ll_fiemap.h>
50
51 #include "cl_object.h"
52
53 struct ll_file_data *ll_file_data_get(void)
54 {
55         struct ll_file_data *fd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
58         fd->fd_write_failed = false;
59         return fd;
60 }
61
62 static void ll_file_data_put(struct ll_file_data *fd)
63 {
64         if (fd != NULL)
65                 OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66 }
67
68 void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
69                           struct lustre_handle *fh)
70 {
71         op_data->op_fid1 = ll_i2info(inode)->lli_fid;
72         op_data->op_attr.ia_mode = inode->i_mode;
73         op_data->op_attr.ia_atime = inode->i_atime;
74         op_data->op_attr.ia_mtime = inode->i_mtime;
75         op_data->op_attr.ia_ctime = inode->i_ctime;
76         op_data->op_attr.ia_size = i_size_read(inode);
77         op_data->op_attr_blocks = inode->i_blocks;
78         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
79                                         ll_inode_to_ext_flags(inode->i_flags);
80         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
81         if (fh)
82                 op_data->op_handle = *fh;
83         op_data->op_capa1 = ll_mdscapa_get(inode);
84 }
85
86 /**
87  * Closes the IO epoch and packs all the attributes into @op_data for
88  * the CLOSE rpc.
89  */
90 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
91                              struct obd_client_handle *och)
92 {
93         ENTRY;
94
95         op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
96                                  ATTR_MTIME_SET | ATTR_CTIME_SET;
97
98         if (!(och->och_flags & FMODE_WRITE))
99                 goto out;
100
101         if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
102                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
103         else
104                 ll_ioepoch_close(inode, op_data, &och, 0);
105
106 out:
107         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
108         ll_prep_md_op_data(op_data, inode, NULL, NULL,
109                            0, 0, LUSTRE_OPC_ANY, NULL);
110         EXIT;
111 }
112
113 static int ll_close_inode_openhandle(struct obd_export *md_exp,
114                                      struct inode *inode,
115                                      struct obd_client_handle *och)
116 {
117         struct obd_export *exp = ll_i2mdexp(inode);
118         struct md_op_data *op_data;
119         struct ptlrpc_request *req = NULL;
120         struct obd_device *obd = class_exp2obd(exp);
121         int epoch_close = 1;
122         int rc;
123         ENTRY;
124
125         if (obd == NULL) {
126                 /*
127                  * XXX: in case of LMV, is this correct to access
128                  * ->exp_handle?
129                  */
130                 CERROR("Invalid MDC connection handle "LPX64"\n",
131                        ll_i2mdexp(inode)->exp_handle.h_cookie);
132                 GOTO(out, rc = 0);
133         }
134
135         OBD_ALLOC_PTR(op_data);
136         if (op_data == NULL)
137                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
138
139         ll_prepare_close(inode, op_data, och);
140         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
141         rc = md_close(md_exp, op_data, och->och_mod, &req);
142         if (rc == -EAGAIN) {
143                 /* This close must have the epoch closed. */
144                 LASSERT(epoch_close);
145                 /* MDS has instructed us to obtain Size-on-MDS attribute from
146                  * OSTs and send setattr to back to MDS. */
147                 rc = ll_som_update(inode, op_data);
148                 if (rc) {
149                         CERROR("inode %lu mdc Size-on-MDS update failed: "
150                                "rc = %d\n", inode->i_ino, rc);
151                         rc = 0;
152                 }
153         } else if (rc) {
154                 CERROR("inode %lu mdc close failed: rc = %d\n",
155                        inode->i_ino, rc);
156         }
157         ll_finish_md_op_data(op_data);
158
159         if (rc == 0) {
160                 rc = ll_objects_destroy(req, inode);
161                 if (rc)
162                         CERROR("inode %lu ll_objects destroy: rc = %d\n",
163                                inode->i_ino, rc);
164         }
165
166         EXIT;
167 out:
168
169         if (exp_connect_som(exp) && !epoch_close &&
170             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
171                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
172         } else {
173                 md_clear_open_replay_data(md_exp, och);
174                 /* Free @och if it is not waiting for DONE_WRITING. */
175                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
176                 OBD_FREE_PTR(och);
177         }
178         if (req) /* This is close request */
179                 ptlrpc_req_finished(req);
180         return rc;
181 }
182
183 int ll_md_real_close(struct inode *inode, int flags)
184 {
185         struct ll_inode_info *lli = ll_i2info(inode);
186         struct obd_client_handle **och_p;
187         struct obd_client_handle *och;
188         __u64 *och_usecount;
189         int rc = 0;
190         ENTRY;
191
192         if (flags & FMODE_WRITE) {
193                 och_p = &lli->lli_mds_write_och;
194                 och_usecount = &lli->lli_open_fd_write_count;
195         } else if (flags & FMODE_EXEC) {
196                 och_p = &lli->lli_mds_exec_och;
197                 och_usecount = &lli->lli_open_fd_exec_count;
198         } else {
199                 LASSERT(flags & FMODE_READ);
200                 och_p = &lli->lli_mds_read_och;
201                 och_usecount = &lli->lli_open_fd_read_count;
202         }
203
204         mutex_lock(&lli->lli_och_mutex);
205         if (*och_usecount) { /* There are still users of this handle, so
206                                 skip freeing it. */
207                 mutex_unlock(&lli->lli_och_mutex);
208                 RETURN(0);
209         }
210         och=*och_p;
211         *och_p = NULL;
212         mutex_unlock(&lli->lli_och_mutex);
213
214         if (och) { /* There might be a race and somebody have freed this och
215                       already */
216                 rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
217                                                inode, och);
218         }
219
220         RETURN(rc);
221 }
222
223 int ll_md_close(struct obd_export *md_exp, struct inode *inode,
224                 struct file *file)
225 {
226         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
227         struct ll_inode_info *lli = ll_i2info(inode);
228         int rc = 0;
229         ENTRY;
230
231         /* clear group lock, if present */
232         if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
233                 ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
234
235         /* Let's see if we have good enough OPEN lock on the file and if
236            we can skip talking to MDS */
237         if (file->f_dentry->d_inode) { /* Can this ever be false? */
238                 int lockmode;
239                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
240                 struct lustre_handle lockh;
241                 struct inode *inode = file->f_dentry->d_inode;
242                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
243
244                 mutex_lock(&lli->lli_och_mutex);
245                 if (fd->fd_omode & FMODE_WRITE) {
246                         lockmode = LCK_CW;
247                         LASSERT(lli->lli_open_fd_write_count);
248                         lli->lli_open_fd_write_count--;
249                 } else if (fd->fd_omode & FMODE_EXEC) {
250                         lockmode = LCK_PR;
251                         LASSERT(lli->lli_open_fd_exec_count);
252                         lli->lli_open_fd_exec_count--;
253                 } else {
254                         lockmode = LCK_CR;
255                         LASSERT(lli->lli_open_fd_read_count);
256                         lli->lli_open_fd_read_count--;
257                 }
258                 mutex_unlock(&lli->lli_och_mutex);
259
260                 if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
261                                    LDLM_IBITS, &policy, lockmode,
262                                    &lockh)) {
263                         rc = ll_md_real_close(file->f_dentry->d_inode,
264                                               fd->fd_omode);
265                 }
266         } else {
267                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
268                        file, file->f_dentry, file->f_dentry->d_name.name);
269         }
270
271         LUSTRE_FPRIVATE(file) = NULL;
272         ll_file_data_put(fd);
273         ll_capa_close(inode);
274
275         RETURN(rc);
276 }
277
278 /* While this returns an error code, fput() the caller does not, so we need
279  * to make every effort to clean up all of our state here.  Also, applications
280  * rarely check close errors and even if an error is returned they will not
281  * re-try the close call.
282  */
283 int ll_file_release(struct inode *inode, struct file *file)
284 {
285         struct ll_file_data *fd;
286         struct ll_sb_info *sbi = ll_i2sbi(inode);
287         struct ll_inode_info *lli = ll_i2info(inode);
288         int rc;
289         ENTRY;
290
291         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
292                inode->i_generation, inode);
293
294 #ifdef CONFIG_FS_POSIX_ACL
295         if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
296             inode == inode->i_sb->s_root->d_inode) {
297                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
298
299                 LASSERT(fd != NULL);
300                 if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
301                         fd->fd_flags &= ~LL_FILE_RMTACL;
302                         rct_del(&sbi->ll_rct, cfs_curproc_pid());
303                         et_search_free(&sbi->ll_et, cfs_curproc_pid());
304                 }
305         }
306 #endif
307
308         if (inode->i_sb->s_root != file->f_dentry)
309                 ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
310         fd = LUSTRE_FPRIVATE(file);
311         LASSERT(fd != NULL);
312
313         /* The last ref on @file, maybe not the the owner pid of statahead.
314          * Different processes can open the same dir, "ll_opendir_key" means:
315          * it is me that should stop the statahead thread. */
316         if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
317             lli->lli_opendir_pid != 0)
318                 ll_stop_statahead(inode, lli->lli_opendir_key);
319
320         if (inode->i_sb->s_root == file->f_dentry) {
321                 LUSTRE_FPRIVATE(file) = NULL;
322                 ll_file_data_put(fd);
323                 RETURN(0);
324         }
325
326         if (!S_ISDIR(inode->i_mode)) {
327                 lov_read_and_clear_async_rc(lli->lli_clob);
328                 lli->lli_async_rc = 0;
329         }
330
331         rc = ll_md_close(sbi->ll_md_exp, inode, file);
332
333         if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
334                 libcfs_debug_dumplog();
335
336         RETURN(rc);
337 }
338
339 static int ll_intent_file_open(struct file *file, void *lmm,
340                                int lmmsize, struct lookup_intent *itp)
341 {
342         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
343         struct dentry *parent = file->f_dentry->d_parent;
344         const char *name = file->f_dentry->d_name.name;
345         const int len = file->f_dentry->d_name.len;
346         struct md_op_data *op_data;
347         struct ptlrpc_request *req;
348         __u32 opc = LUSTRE_OPC_ANY;
349         int rc;
350         ENTRY;
351
352         if (!parent)
353                 RETURN(-ENOENT);
354
355         /* Usually we come here only for NFSD, and we want open lock.
356            But we can also get here with pre 2.6.15 patchless kernels, and in
357            that case that lock is also ok */
358         /* We can also get here if there was cached open handle in revalidate_it
359          * but it disappeared while we were getting from there to ll_file_open.
360          * But this means this file was closed and immediatelly opened which
361          * makes a good candidate for using OPEN lock */
362         /* If lmmsize & lmm are not 0, we are just setting stripe info
363          * parameters. No need for the open lock */
364         if (lmm == NULL && lmmsize == 0) {
365                 itp->it_flags |= MDS_OPEN_LOCK;
366                 if (itp->it_flags & FMODE_WRITE)
367                         opc = LUSTRE_OPC_CREATE;
368         }
369
370         op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
371                                       file->f_dentry->d_inode, name, len,
372                                       O_RDWR, opc, NULL);
373         if (IS_ERR(op_data))
374                 RETURN(PTR_ERR(op_data));
375
376         itp->it_flags |= MDS_OPEN_BY_FID;
377         rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
378                             0 /*unused */, &req, ll_md_blocking_ast, 0);
379         ll_finish_md_op_data(op_data);
380         if (rc == -ESTALE) {
381                 /* reason for keep own exit path - don`t flood log
382                 * with messages with -ESTALE errors.
383                 */
384                 if (!it_disposition(itp, DISP_OPEN_OPEN) ||
385                      it_open_error(DISP_OPEN_OPEN, itp))
386                         GOTO(out, rc);
387                 ll_release_openhandle(file->f_dentry, itp);
388                 GOTO(out, rc);
389         }
390
391         if (it_disposition(itp, DISP_LOOKUP_NEG))
392                 GOTO(out, rc = -ENOENT);
393
394         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
395                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
396                 CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
397                 GOTO(out, rc);
398         }
399
400         rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
401         if (!rc && itp->d.lustre.it_lock_mode)
402                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
403                                  itp, NULL);
404
405 out:
406         ptlrpc_req_finished(itp->d.lustre.it_data);
407         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
408         ll_intent_drop_lock(itp);
409
410         RETURN(rc);
411 }
412
413 /**
414  * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
415  * not believe attributes if a few ioepoch holders exist. Attributes for
416  * previous ioepoch if new one is opened are also skipped by MDS.
417  */
418 void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
419 {
420         if (ioepoch && lli->lli_ioepoch != ioepoch) {
421                 lli->lli_ioepoch = ioepoch;
422                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
423                        ioepoch, PFID(&lli->lli_fid));
424         }
425 }
426
427 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
428                        struct lookup_intent *it, struct obd_client_handle *och)
429 {
430         struct ptlrpc_request *req = it->d.lustre.it_data;
431         struct mdt_body *body;
432
433         LASSERT(och);
434
435         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
436         LASSERT(body != NULL);                      /* reply already checked out */
437
438         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
439         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
440         och->och_fid = lli->lli_fid;
441         och->och_flags = it->it_flags;
442         ll_ioepoch_open(lli, body->ioepoch);
443
444         return md_set_open_replay_data(md_exp, och, req);
445 }
446
447 int ll_local_open(struct file *file, struct lookup_intent *it,
448                   struct ll_file_data *fd, struct obd_client_handle *och)
449 {
450         struct inode *inode = file->f_dentry->d_inode;
451         struct ll_inode_info *lli = ll_i2info(inode);
452         ENTRY;
453
454         LASSERT(!LUSTRE_FPRIVATE(file));
455
456         LASSERT(fd != NULL);
457
458         if (och) {
459                 struct ptlrpc_request *req = it->d.lustre.it_data;
460                 struct mdt_body *body;
461                 int rc;
462
463                 rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
464                 if (rc)
465                         RETURN(rc);
466
467                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
468                 if ((it->it_flags & FMODE_WRITE) &&
469                     (body->valid & OBD_MD_FLSIZE))
470                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
471                                lli->lli_ioepoch, PFID(&lli->lli_fid));
472         }
473
474         LUSTRE_FPRIVATE(file) = fd;
475         ll_readahead_init(inode, &fd->fd_ras);
476         fd->fd_omode = it->it_flags;
477         RETURN(0);
478 }
479
480 /* Open a file, and (for the very first open) create objects on the OSTs at
481  * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
482  * creation or open until ll_lov_setstripe() ioctl is called.
483  *
484  * If we already have the stripe MD locally then we don't request it in
485  * md_open(), by passing a lmm_size = 0.
486  *
487  * It is up to the application to ensure no other processes open this file
488  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
489  * used.  We might be able to avoid races of that sort by getting lli_open_sem
490  * before returning in the O_LOV_DELAY_CREATE case and dropping it here
491  * or in ll_file_release(), but I'm not sure that is desirable/necessary.
492  */
493 int ll_file_open(struct inode *inode, struct file *file)
494 {
495         struct ll_inode_info *lli = ll_i2info(inode);
496         struct lookup_intent *it, oit = { .it_op = IT_OPEN,
497                                           .it_flags = file->f_flags };
498         struct obd_client_handle **och_p = NULL;
499         __u64 *och_usecount = NULL;
500         struct ll_file_data *fd;
501         int rc = 0, opendir_set = 0;
502         ENTRY;
503
504         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
505                inode->i_generation, inode, file->f_flags);
506
507         it = file->private_data; /* XXX: compat macro */
508         file->private_data = NULL; /* prevent ll_local_open assertion */
509
510         fd = ll_file_data_get();
511         if (fd == NULL)
512                 GOTO(out_och_free, rc = -ENOMEM);
513
514         fd->fd_file = file;
515         if (S_ISDIR(inode->i_mode)) {
516                 spin_lock(&lli->lli_sa_lock);
517                 if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
518                     lli->lli_opendir_pid == 0) {
519                         lli->lli_opendir_key = fd;
520                         lli->lli_opendir_pid = cfs_curproc_pid();
521                         opendir_set = 1;
522                 }
523                 spin_unlock(&lli->lli_sa_lock);
524         }
525
526         if (inode->i_sb->s_root == file->f_dentry) {
527                 LUSTRE_FPRIVATE(file) = fd;
528                 RETURN(0);
529         }
530
531         if (!it || !it->d.lustre.it_disposition) {
532                 /* Convert f_flags into access mode. We cannot use file->f_mode,
533                  * because everything but O_ACCMODE mask was stripped from
534                  * there */
535                 if ((oit.it_flags + 1) & O_ACCMODE)
536                         oit.it_flags++;
537                 if (file->f_flags & O_TRUNC)
538                         oit.it_flags |= FMODE_WRITE;
539
540                 /* kernel only call f_op->open in dentry_open.  filp_open calls
541                  * dentry_open after call to open_namei that checks permissions.
542                  * Only nfsd_open call dentry_open directly without checking
543                  * permissions and because of that this code below is safe. */
544                 if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
545                         oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
546
547                 /* We do not want O_EXCL here, presumably we opened the file
548                  * already? XXX - NFS implications? */
549                 oit.it_flags &= ~O_EXCL;
550
551                 /* bug20584, if "it_flags" contains O_CREAT, the file will be
552                  * created if necessary, then "IT_CREAT" should be set to keep
553                  * consistent with it */
554                 if (oit.it_flags & O_CREAT)
555                         oit.it_op |= IT_CREAT;
556
557                 it = &oit;
558         }
559
560 restart:
561         /* Let's see if we have file open on MDS already. */
562         if (it->it_flags & FMODE_WRITE) {
563                 och_p = &lli->lli_mds_write_och;
564                 och_usecount = &lli->lli_open_fd_write_count;
565         } else if (it->it_flags & FMODE_EXEC) {
566                 och_p = &lli->lli_mds_exec_och;
567                 och_usecount = &lli->lli_open_fd_exec_count;
568          } else {
569                 och_p = &lli->lli_mds_read_och;
570                 och_usecount = &lli->lli_open_fd_read_count;
571         }
572
573         mutex_lock(&lli->lli_och_mutex);
574         if (*och_p) { /* Open handle is present */
575                 if (it_disposition(it, DISP_OPEN_OPEN)) {
576                         /* Well, there's extra open request that we do not need,
577                            let's close it somehow. This will decref request. */
578                         rc = it_open_error(DISP_OPEN_OPEN, it);
579                         if (rc) {
580                                 mutex_unlock(&lli->lli_och_mutex);
581                                 GOTO(out_openerr, rc);
582                         }
583
584                         ll_release_openhandle(file->f_dentry, it);
585                 }
586                 (*och_usecount)++;
587
588                 rc = ll_local_open(file, it, fd, NULL);
589                 if (rc) {
590                         (*och_usecount)--;
591                         mutex_unlock(&lli->lli_och_mutex);
592                         GOTO(out_openerr, rc);
593                 }
594         } else {
595                 LASSERT(*och_usecount == 0);
596                 if (!it->d.lustre.it_disposition) {
597                         /* We cannot just request lock handle now, new ELC code
598                            means that one of other OPEN locks for this file
599                            could be cancelled, and since blocking ast handler
600                            would attempt to grab och_mutex as well, that would
601                            result in a deadlock */
602                         mutex_unlock(&lli->lli_och_mutex);
603                         it->it_create_mode |= M_CHECK_STALE;
604                         rc = ll_intent_file_open(file, NULL, 0, it);
605                         it->it_create_mode &= ~M_CHECK_STALE;
606                         if (rc)
607                                 GOTO(out_openerr, rc);
608
609                         goto restart;
610                 }
611                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
612                 if (!*och_p)
613                         GOTO(out_och_free, rc = -ENOMEM);
614
615                 (*och_usecount)++;
616
617                 /* md_intent_lock() didn't get a request ref if there was an
618                  * open error, so don't do cleanup on the request here
619                  * (bug 3430) */
620                 /* XXX (green): Should not we bail out on any error here, not
621                  * just open error? */
622                 rc = it_open_error(DISP_OPEN_OPEN, it);
623                 if (rc)
624                         GOTO(out_och_free, rc);
625
626                 LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
627
628                 rc = ll_local_open(file, it, fd, *och_p);
629                 if (rc)
630                         GOTO(out_och_free, rc);
631         }
632         mutex_unlock(&lli->lli_och_mutex);
633         fd = NULL;
634
635         /* Must do this outside lli_och_mutex lock to prevent deadlock where
636            different kind of OPEN lock for this same inode gets cancelled
637            by ldlm_cancel_lru */
638         if (!S_ISREG(inode->i_mode))
639                 GOTO(out_och_free, rc);
640
641         ll_capa_open(inode);
642
643         if (!lli->lli_has_smd) {
644                 if (file->f_flags & O_LOV_DELAY_CREATE ||
645                     !(file->f_mode & FMODE_WRITE)) {
646                         CDEBUG(D_INODE, "object creation was delayed\n");
647                         GOTO(out_och_free, rc);
648                 }
649         }
650         file->f_flags &= ~O_LOV_DELAY_CREATE;
651         GOTO(out_och_free, rc);
652
653 out_och_free:
654         if (rc) {
655                 if (och_p && *och_p) {
656                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
657                         *och_p = NULL; /* OBD_FREE writes some magic there */
658                         (*och_usecount)--;
659                 }
660                 mutex_unlock(&lli->lli_och_mutex);
661
662 out_openerr:
663                 if (opendir_set != 0)
664                         ll_stop_statahead(inode, lli->lli_opendir_key);
665                 if (fd != NULL)
666                         ll_file_data_put(fd);
667         } else {
668                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
669         }
670
671         if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
672                 ptlrpc_req_finished(it->d.lustre.it_data);
673                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
674         }
675
676         return rc;
677 }
678
679 /* Fills the obdo with the attributes for the lsm */
680 static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
681                           struct obd_capa *capa, struct obdo *obdo,
682                           __u64 ioepoch, int sync)
683 {
684         struct ptlrpc_request_set *set;
685         struct obd_info            oinfo = { { { 0 } } };
686         int                        rc;
687
688         ENTRY;
689
690         LASSERT(lsm != NULL);
691
692         oinfo.oi_md = lsm;
693         oinfo.oi_oa = obdo;
694         oinfo.oi_oa->o_id = lsm->lsm_object_id;
695         oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
696         oinfo.oi_oa->o_mode = S_IFREG;
697         oinfo.oi_oa->o_ioepoch = ioepoch;
698         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
699                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
700                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
701                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
702                                OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
703                                OBD_MD_FLDATAVERSION;
704         oinfo.oi_capa = capa;
705         if (sync) {
706                 oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
707                 oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
708         }
709
710         set = ptlrpc_prep_set();
711         if (set == NULL) {
712                 CERROR("can't allocate ptlrpc set\n");
713                 rc = -ENOMEM;
714         } else {
715                 rc = obd_getattr_async(exp, &oinfo, set);
716                 if (rc == 0)
717                         rc = ptlrpc_set_wait(set);
718                 ptlrpc_set_destroy(set);
719         }
720         if (rc == 0)
721                 oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
722                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
723                                          OBD_MD_FLCTIME | OBD_MD_FLSIZE |
724                                          OBD_MD_FLDATAVERSION);
725         RETURN(rc);
726 }
727
728 /**
729   * Performs the getattr on the inode and updates its fields.
730   * If @sync != 0, perform the getattr under the server-side lock.
731   */
732 int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
733                      __u64 ioepoch, int sync)
734 {
735         struct obd_capa      *capa = ll_mdscapa_get(inode);
736         struct lov_stripe_md *lsm;
737         int rc;
738         ENTRY;
739
740         lsm = ccc_inode_lsm_get(inode);
741         rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
742                             capa, obdo, ioepoch, sync);
743         capa_put(capa);
744         if (rc == 0) {
745                 obdo_refresh_inode(inode, obdo, obdo->o_valid);
746                 CDEBUG(D_INODE,
747                        "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
748                        lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
749                        (unsigned long long)inode->i_blocks,
750                        (unsigned long)ll_inode_blksize(inode));
751         }
752         ccc_inode_lsm_put(inode, lsm);
753         RETURN(rc);
754 }
755
756 int ll_merge_lvb(struct inode *inode)
757 {
758         struct ll_inode_info *lli = ll_i2info(inode);
759         struct ll_sb_info *sbi = ll_i2sbi(inode);
760         struct lov_stripe_md *lsm;
761         struct ost_lvb lvb;
762         int rc = 0;
763
764         ENTRY;
765
766         lsm = ccc_inode_lsm_get(inode);
767         ll_inode_size_lock(inode);
768         inode_init_lvb(inode, &lvb);
769
770         /* merge timestamps the most resently obtained from mds with
771            timestamps obtained from osts */
772         lvb.lvb_atime = lli->lli_lvb.lvb_atime;
773         lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
774         lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
775         if (lsm != NULL) {
776                 rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
777                 cl_isize_write_nolock(inode, lvb.lvb_size);
778
779                 CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
780                                 PFID(&lli->lli_fid), lvb.lvb_size);
781                 inode->i_blocks = lvb.lvb_blocks;
782         }
783         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
784         LTIME_S(inode->i_atime) = lvb.lvb_atime;
785         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
786         ll_inode_size_unlock(inode);
787         ccc_inode_lsm_put(inode, lsm);
788
789         RETURN(rc);
790 }
791
792 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
793                      lstat_t *st)
794 {
795         struct obdo obdo = { 0 };
796         int rc;
797
798         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
799         if (rc == 0) {
800                 st->st_size   = obdo.o_size;
801                 st->st_blocks = obdo.o_blocks;
802                 st->st_mtime  = obdo.o_mtime;
803                 st->st_atime  = obdo.o_atime;
804                 st->st_ctime  = obdo.o_ctime;
805         }
806         return rc;
807 }
808
809 void ll_io_init(struct cl_io *io, const struct file *file, int write)
810 {
811         struct inode *inode = file->f_dentry->d_inode;
812
813         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
814         if (write) {
815                 io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
816                 io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode);
817         }
818         io->ci_obj     = ll_i2info(inode)->lli_clob;
819         io->ci_lockreq = CILR_MAYBE;
820         if (ll_file_nolock(file)) {
821                 io->ci_lockreq = CILR_NEVER;
822                 io->ci_no_srvlock = 1;
823         } else if (file->f_flags & O_APPEND) {
824                 io->ci_lockreq = CILR_MANDATORY;
825         }
826 }
827
828 static ssize_t
829 ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
830                    struct file *file, enum cl_io_type iot,
831                    loff_t *ppos, size_t count)
832 {
833         struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
834         struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
835         struct cl_io         *io;
836         ssize_t               result;
837         ENTRY;
838
839         io = ccc_env_thread_io(env);
840         ll_io_init(io, file, iot == CIT_WRITE);
841
842         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
843                 struct vvp_io *vio = vvp_env_io(env);
844                 struct ccc_io *cio = ccc_env_io(env);
845                 int write_mutex_locked = 0;
846
847                 cio->cui_fd  = LUSTRE_FPRIVATE(file);
848                 vio->cui_io_subtype = args->via_io_subtype;
849
850                 switch (vio->cui_io_subtype) {
851                 case IO_NORMAL:
852                         cio->cui_iov = args->u.normal.via_iov;
853                         cio->cui_nrsegs = args->u.normal.via_nrsegs;
854                         cio->cui_tot_nrsegs = cio->cui_nrsegs;
855 #ifndef HAVE_FILE_WRITEV
856                         cio->cui_iocb = args->u.normal.via_iocb;
857 #endif
858                         if ((iot == CIT_WRITE) &&
859                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
860                                 if (mutex_lock_interruptible(&lli->
861                                                                lli_write_mutex))
862                                         GOTO(out, result = -ERESTARTSYS);
863                                 write_mutex_locked = 1;
864                         } else if (iot == CIT_READ) {
865                                 down_read(&lli->lli_trunc_sem);
866                         }
867                         break;
868                 case IO_SENDFILE:
869                         vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
870                         vio->u.sendfile.cui_target = args->u.sendfile.via_target;
871                         break;
872                 case IO_SPLICE:
873                         vio->u.splice.cui_pipe = args->u.splice.via_pipe;
874                         vio->u.splice.cui_flags = args->u.splice.via_flags;
875                         break;
876                 default:
877                         CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
878                         LBUG();
879                 }
880                 result = cl_io_loop(env, io);
881                 if (write_mutex_locked)
882                         mutex_unlock(&lli->lli_write_mutex);
883                 else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
884                         up_read(&lli->lli_trunc_sem);
885         } else {
886                 /* cl_io_rw_init() handled IO */
887                 result = io->ci_result;
888         }
889
890         if (io->ci_nob > 0) {
891                 result = io->ci_nob;
892                 *ppos = io->u.ci_wr.wr.crw_pos;
893         }
894         GOTO(out, result);
895 out:
896         cl_io_fini(env, io);
897
898         if (iot == CIT_READ) {
899                 if (result >= 0)
900                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
901                                            LPROC_LL_READ_BYTES, result);
902         } else if (iot == CIT_WRITE) {
903                 if (result >= 0) {
904                         ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
905                                            LPROC_LL_WRITE_BYTES, result);
906                         fd->fd_write_failed = false;
907                 } else {
908                         fd->fd_write_failed = true;
909                 }
910         }
911
912         return result;
913 }
914
915
916 /*
917  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
918  */
919 static int ll_file_get_iov_count(const struct iovec *iov,
920                                  unsigned long *nr_segs, size_t *count)
921 {
922         size_t cnt = 0;
923         unsigned long seg;
924
925         for (seg = 0; seg < *nr_segs; seg++) {
926                 const struct iovec *iv = &iov[seg];
927
928                 /*
929                  * If any segment has a negative length, or the cumulative
930                  * length ever wraps negative then return -EINVAL.
931                  */
932                 cnt += iv->iov_len;
933                 if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
934                         return -EINVAL;
935                 if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
936                         continue;
937                 if (seg == 0)
938                         return -EFAULT;
939                 *nr_segs = seg;
940                 cnt -= iv->iov_len;   /* This segment is no good */
941                 break;
942         }
943         *count = cnt;
944         return 0;
945 }
946
947 #ifdef HAVE_FILE_READV
948 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
949                               unsigned long nr_segs, loff_t *ppos)
950 {
951         struct lu_env      *env;
952         struct vvp_io_args *args;
953         size_t              count;
954         ssize_t             result;
955         int                 refcheck;
956         ENTRY;
957
958         result = ll_file_get_iov_count(iov, &nr_segs, &count);
959         if (result)
960                 RETURN(result);
961
962         env = cl_env_get(&refcheck);
963         if (IS_ERR(env))
964                 RETURN(PTR_ERR(env));
965
966         args = vvp_env_args(env, IO_NORMAL);
967         args->u.normal.via_iov = (struct iovec *)iov;
968         args->u.normal.via_nrsegs = nr_segs;
969
970         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
971         cl_env_put(env, &refcheck);
972         RETURN(result);
973 }
974
975 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
976                             loff_t *ppos)
977 {
978         struct lu_env *env;
979         struct iovec  *local_iov;
980         ssize_t        result;
981         int            refcheck;
982         ENTRY;
983
984         env = cl_env_get(&refcheck);
985         if (IS_ERR(env))
986                 RETURN(PTR_ERR(env));
987
988         local_iov = &vvp_env_info(env)->vti_local_iov;
989         local_iov->iov_base = (void __user *)buf;
990         local_iov->iov_len = count;
991         result = ll_file_readv(file, local_iov, 1, ppos);
992         cl_env_put(env, &refcheck);
993         RETURN(result);
994 }
995
996 #else
997 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
998                                 unsigned long nr_segs, loff_t pos)
999 {
1000         struct lu_env      *env;
1001         struct vvp_io_args *args;
1002         size_t              count;
1003         ssize_t             result;
1004         int                 refcheck;
1005         ENTRY;
1006
1007         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1008         if (result)
1009                 RETURN(result);
1010
1011         env = cl_env_get(&refcheck);
1012         if (IS_ERR(env))
1013                 RETURN(PTR_ERR(env));
1014
1015         args = vvp_env_args(env, IO_NORMAL);
1016         args->u.normal.via_iov = (struct iovec *)iov;
1017         args->u.normal.via_nrsegs = nr_segs;
1018         args->u.normal.via_iocb = iocb;
1019
1020         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1021                                     &iocb->ki_pos, count);
1022         cl_env_put(env, &refcheck);
1023         RETURN(result);
1024 }
1025
1026 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1027                             loff_t *ppos)
1028 {
1029         struct lu_env *env;
1030         struct iovec  *local_iov;
1031         struct kiocb  *kiocb;
1032         ssize_t        result;
1033         int            refcheck;
1034         ENTRY;
1035
1036         env = cl_env_get(&refcheck);
1037         if (IS_ERR(env))
1038                 RETURN(PTR_ERR(env));
1039
1040         local_iov = &vvp_env_info(env)->vti_local_iov;
1041         kiocb = &vvp_env_info(env)->vti_kiocb;
1042         local_iov->iov_base = (void __user *)buf;
1043         local_iov->iov_len = count;
1044         init_sync_kiocb(kiocb, file);
1045         kiocb->ki_pos = *ppos;
1046         kiocb->ki_left = count;
1047
1048         result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1049         *ppos = kiocb->ki_pos;
1050
1051         cl_env_put(env, &refcheck);
1052         RETURN(result);
1053 }
1054 #endif
1055
1056 /*
1057  * Write to a file (through the page cache).
1058  */
1059 #ifdef HAVE_FILE_WRITEV
1060 static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
1061                               unsigned long nr_segs, loff_t *ppos)
1062 {
1063         struct lu_env      *env;
1064         struct vvp_io_args *args;
1065         size_t              count;
1066         ssize_t             result;
1067         int                 refcheck;
1068         ENTRY;
1069
1070         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1071         if (result)
1072                 RETURN(result);
1073
1074         env = cl_env_get(&refcheck);
1075         if (IS_ERR(env))
1076                 RETURN(PTR_ERR(env));
1077
1078         args = vvp_env_args(env, IO_NORMAL);
1079         args->u.normal.via_iov = (struct iovec *)iov;
1080         args->u.normal.via_nrsegs = nr_segs;
1081
1082         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
1083         cl_env_put(env, &refcheck);
1084         RETURN(result);
1085 }
1086
1087 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1088                              loff_t *ppos)
1089 {
1090         struct lu_env    *env;
1091         struct iovec     *local_iov;
1092         ssize_t           result;
1093         int               refcheck;
1094         ENTRY;
1095
1096         env = cl_env_get(&refcheck);
1097         if (IS_ERR(env))
1098                 RETURN(PTR_ERR(env));
1099
1100         local_iov = &vvp_env_info(env)->vti_local_iov;
1101         local_iov->iov_base = (void __user *)buf;
1102         local_iov->iov_len = count;
1103
1104         result = ll_file_writev(file, local_iov, 1, ppos);
1105         cl_env_put(env, &refcheck);
1106         RETURN(result);
1107 }
1108
1109 #else /* AIO stuff */
1110 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1111                                  unsigned long nr_segs, loff_t pos)
1112 {
1113         struct lu_env      *env;
1114         struct vvp_io_args *args;
1115         size_t              count;
1116         ssize_t             result;
1117         int                 refcheck;
1118         ENTRY;
1119
1120         result = ll_file_get_iov_count(iov, &nr_segs, &count);
1121         if (result)
1122                 RETURN(result);
1123
1124         env = cl_env_get(&refcheck);
1125         if (IS_ERR(env))
1126                 RETURN(PTR_ERR(env));
1127
1128         args = vvp_env_args(env, IO_NORMAL);
1129         args->u.normal.via_iov = (struct iovec *)iov;
1130         args->u.normal.via_nrsegs = nr_segs;
1131         args->u.normal.via_iocb = iocb;
1132
1133         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1134                                   &iocb->ki_pos, count);
1135         cl_env_put(env, &refcheck);
1136         RETURN(result);
1137 }
1138
1139 static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1140                              loff_t *ppos)
1141 {
1142         struct lu_env *env;
1143         struct iovec  *local_iov;
1144         struct kiocb  *kiocb;
1145         ssize_t        result;
1146         int            refcheck;
1147         ENTRY;
1148
1149         env = cl_env_get(&refcheck);
1150         if (IS_ERR(env))
1151                 RETURN(PTR_ERR(env));
1152
1153         local_iov = &vvp_env_info(env)->vti_local_iov;
1154         kiocb = &vvp_env_info(env)->vti_kiocb;
1155         local_iov->iov_base = (void __user *)buf;
1156         local_iov->iov_len = count;
1157         init_sync_kiocb(kiocb, file);
1158         kiocb->ki_pos = *ppos;
1159         kiocb->ki_left = count;
1160
1161         result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1162         *ppos = kiocb->ki_pos;
1163
1164         cl_env_put(env, &refcheck);
1165         RETURN(result);
1166 }
1167 #endif
1168
1169
1170 #ifdef HAVE_KERNEL_SENDFILE
1171 /*
1172  * Send file content (through pagecache) somewhere with helper
1173  */
1174 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
1175                                 read_actor_t actor, void *target)
1176 {
1177         struct lu_env      *env;
1178         struct vvp_io_args *args;
1179         ssize_t             result;
1180         int                 refcheck;
1181         ENTRY;
1182
1183         env = cl_env_get(&refcheck);
1184         if (IS_ERR(env))
1185                 RETURN(PTR_ERR(env));
1186
1187         args = vvp_env_args(env, IO_SENDFILE);
1188         args->u.sendfile.via_target = target;
1189         args->u.sendfile.via_actor = actor;
1190
1191         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1192         cl_env_put(env, &refcheck);
1193         RETURN(result);
1194 }
1195 #endif
1196
1197 #ifdef HAVE_KERNEL_SPLICE_READ
1198 /*
1199  * Send file content (through pagecache) somewhere with helper
1200  */
1201 static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1202                                    struct pipe_inode_info *pipe, size_t count,
1203                                    unsigned int flags)
1204 {
1205         struct lu_env      *env;
1206         struct vvp_io_args *args;
1207         ssize_t             result;
1208         int                 refcheck;
1209         ENTRY;
1210
1211         env = cl_env_get(&refcheck);
1212         if (IS_ERR(env))
1213                 RETURN(PTR_ERR(env));
1214
1215         args = vvp_env_args(env, IO_SPLICE);
1216         args->u.splice.via_pipe = pipe;
1217         args->u.splice.via_flags = flags;
1218
1219         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1220         cl_env_put(env, &refcheck);
1221         RETURN(result);
1222 }
1223 #endif
1224
1225 static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
1226                            obd_count ost_idx)
1227 {
1228         struct obd_export *exp = ll_i2dtexp(inode);
1229         struct obd_trans_info oti = { 0 };
1230         struct obdo *oa = NULL;
1231         int lsm_size;
1232         int rc = 0;
1233         struct lov_stripe_md *lsm = NULL, *lsm2;
1234         ENTRY;
1235
1236         OBDO_ALLOC(oa);
1237         if (oa == NULL)
1238                 RETURN(-ENOMEM);
1239
1240         lsm = ccc_inode_lsm_get(inode);
1241         if (lsm == NULL)
1242                 GOTO(out, rc = -ENOENT);
1243
1244         lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1245                    (lsm->lsm_stripe_count));
1246
1247         OBD_ALLOC_LARGE(lsm2, lsm_size);
1248         if (lsm2 == NULL)
1249                 GOTO(out, rc = -ENOMEM);
1250
1251         oa->o_id = id;
1252         oa->o_seq = seq;
1253         oa->o_nlink = ost_idx;
1254         oa->o_flags |= OBD_FL_RECREATE_OBJS;
1255         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1256         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1257                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1258         obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1259         memcpy(lsm2, lsm, lsm_size);
1260         ll_inode_size_lock(inode);
1261         rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1262         ll_inode_size_unlock(inode);
1263
1264         OBD_FREE_LARGE(lsm2, lsm_size);
1265         GOTO(out, rc);
1266 out:
1267         ccc_inode_lsm_put(inode, lsm);
1268         OBDO_FREE(oa);
1269         return rc;
1270 }
1271
1272 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1273 {
1274         struct ll_recreate_obj ucreat;
1275         ENTRY;
1276
1277         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1278                 RETURN(-EPERM);
1279
1280         if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1281                                sizeof(struct ll_recreate_obj)))
1282                 RETURN(-EFAULT);
1283
1284         RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
1285                                ucreat.lrc_ost_idx));
1286 }
1287
1288 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1289 {
1290         struct lu_fid fid;
1291         obd_id id;
1292         obd_count ost_idx;
1293         ENTRY;
1294
1295         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1296                 RETURN(-EPERM);
1297
1298         if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
1299                                sizeof(struct lu_fid)))
1300                 RETURN(-EFAULT);
1301
1302         id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
1303         ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1304         RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
1305 }
1306
1307 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1308                              int flags, struct lov_user_md *lum, int lum_size)
1309 {
1310         struct lov_stripe_md *lsm = NULL;
1311         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1312         int rc = 0;
1313         ENTRY;
1314
1315         lsm = ccc_inode_lsm_get(inode);
1316         if (lsm != NULL) {
1317                 ccc_inode_lsm_put(inode, lsm);
1318                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1319                        inode->i_ino);
1320                 RETURN(-EEXIST);
1321         }
1322
1323         ll_inode_size_lock(inode);
1324         rc = ll_intent_file_open(file, lum, lum_size, &oit);
1325         if (rc)
1326                 GOTO(out, rc);
1327         rc = oit.d.lustre.it_status;
1328         if (rc < 0)
1329                 GOTO(out_req_free, rc);
1330
1331         ll_release_openhandle(file->f_dentry, &oit);
1332
1333  out:
1334         ll_inode_size_unlock(inode);
1335         ll_intent_release(&oit);
1336         ccc_inode_lsm_put(inode, lsm);
1337         RETURN(rc);
1338 out_req_free:
1339         ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1340         goto out;
1341 }
1342
1343 int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1344                              struct lov_mds_md **lmmp, int *lmm_size,
1345                              struct ptlrpc_request **request)
1346 {
1347         struct ll_sb_info *sbi = ll_i2sbi(inode);
1348         struct mdt_body  *body;
1349         struct lov_mds_md *lmm = NULL;
1350         struct ptlrpc_request *req = NULL;
1351         struct md_op_data *op_data;
1352         int rc, lmmsize;
1353
1354         rc = ll_get_max_mdsize(sbi, &lmmsize);
1355         if (rc)
1356                 RETURN(rc);
1357
1358         op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1359                                      strlen(filename), lmmsize,
1360                                      LUSTRE_OPC_ANY, NULL);
1361         if (IS_ERR(op_data))
1362                 RETURN(PTR_ERR(op_data));
1363
1364         op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1365         rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1366         ll_finish_md_op_data(op_data);
1367         if (rc < 0) {
1368                 CDEBUG(D_INFO, "md_getattr_name failed "
1369                        "on %s: rc %d\n", filename, rc);
1370                 GOTO(out, rc);
1371         }
1372
1373         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1374         LASSERT(body != NULL); /* checked by mdc_getattr_name */
1375
1376         lmmsize = body->eadatasize;
1377
1378         if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1379                         lmmsize == 0) {
1380                 GOTO(out, rc = -ENODATA);
1381         }
1382
1383         lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1384         LASSERT(lmm != NULL);
1385
1386         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1387             (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1388                 GOTO(out, rc = -EPROTO);
1389         }
1390
1391         /*
1392          * This is coming from the MDS, so is probably in
1393          * little endian.  We convert it to host endian before
1394          * passing it to userspace.
1395          */
1396         if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1397                 /* if function called for directory - we should
1398                  * avoid swab not existent lsm objects */
1399                 if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1400                         lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1401                         if (S_ISREG(body->mode))
1402                                 lustre_swab_lov_user_md_objects(
1403                                  ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1404                                  ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
1405                 } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1406                         lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1407                         if (S_ISREG(body->mode))
1408                                 lustre_swab_lov_user_md_objects(
1409                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1410                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
1411                 }
1412         }
1413
1414 out:
1415         *lmmp = lmm;
1416         *lmm_size = lmmsize;
1417         *request = req;
1418         return rc;
1419 }
1420
1421 static int ll_lov_setea(struct inode *inode, struct file *file,
1422                             unsigned long arg)
1423 {
1424         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1425         struct lov_user_md  *lump;
1426         int lum_size = sizeof(struct lov_user_md) +
1427                        sizeof(struct lov_user_ost_data);
1428         int rc;
1429         ENTRY;
1430
1431         if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1432                 RETURN(-EPERM);
1433
1434         OBD_ALLOC_LARGE(lump, lum_size);
1435         if (lump == NULL) {
1436                 RETURN(-ENOMEM);
1437         }
1438         if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1439                 OBD_FREE_LARGE(lump, lum_size);
1440                 RETURN(-EFAULT);
1441         }
1442
1443         rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1444
1445         OBD_FREE_LARGE(lump, lum_size);
1446         RETURN(rc);
1447 }
1448
1449 static int ll_lov_setstripe(struct inode *inode, struct file *file,
1450                             unsigned long arg)
1451 {
1452         struct lov_user_md_v3 lumv3;
1453         struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
1454         struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
1455         struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
1456         int lum_size;
1457         int rc;
1458         int flags = FMODE_WRITE;
1459         ENTRY;
1460
1461         /* first try with v1 which is smaller than v3 */
1462         lum_size = sizeof(struct lov_user_md_v1);
1463         if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
1464                 RETURN(-EFAULT);
1465
1466         if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1467                 lum_size = sizeof(struct lov_user_md_v3);
1468                 if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
1469                         RETURN(-EFAULT);
1470         }
1471
1472         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1473         if (rc == 0) {
1474                 struct lov_stripe_md *lsm;
1475                 put_user(0, &lumv1p->lmm_stripe_count);
1476                 lsm = ccc_inode_lsm_get(inode);
1477                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1478                                    0, lsm, (void *)arg);
1479                 ccc_inode_lsm_put(inode, lsm);
1480         }
1481         RETURN(rc);
1482 }
1483
1484 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1485 {
1486         struct lov_stripe_md *lsm;
1487         int rc = -ENODATA;
1488         ENTRY;
1489
1490         lsm = ccc_inode_lsm_get(inode);
1491         if (lsm != NULL)
1492                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1493                                    lsm, (void *)arg);
1494         ccc_inode_lsm_put(inode, lsm);
1495         RETURN(rc);
1496 }
1497
1498 int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1499 {
1500         struct ll_inode_info   *lli = ll_i2info(inode);
1501         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1502         struct ccc_grouplock    grouplock;
1503         int                     rc;
1504         ENTRY;
1505
1506         if (ll_file_nolock(file))
1507                 RETURN(-EOPNOTSUPP);
1508
1509         spin_lock(&lli->lli_lock);
1510         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1511                 CWARN("group lock already existed with gid %lu\n",
1512                       fd->fd_grouplock.cg_gid);
1513                 spin_unlock(&lli->lli_lock);
1514                 RETURN(-EINVAL);
1515         }
1516         LASSERT(fd->fd_grouplock.cg_lock == NULL);
1517         spin_unlock(&lli->lli_lock);
1518
1519         rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1520                               arg, (file->f_flags & O_NONBLOCK), &grouplock);
1521         if (rc)
1522                 RETURN(rc);
1523
1524         spin_lock(&lli->lli_lock);
1525         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1526                 spin_unlock(&lli->lli_lock);
1527                 CERROR("another thread just won the race\n");
1528                 cl_put_grouplock(&grouplock);
1529                 RETURN(-EINVAL);
1530         }
1531
1532         fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1533         fd->fd_grouplock = grouplock;
1534         spin_unlock(&lli->lli_lock);
1535
1536         CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1537         RETURN(0);
1538 }
1539
1540 int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1541 {
1542         struct ll_inode_info   *lli = ll_i2info(inode);
1543         struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1544         struct ccc_grouplock    grouplock;
1545         ENTRY;
1546
1547         spin_lock(&lli->lli_lock);
1548         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1549                 spin_unlock(&lli->lli_lock);
1550                 CWARN("no group lock held\n");
1551                 RETURN(-EINVAL);
1552         }
1553         LASSERT(fd->fd_grouplock.cg_lock != NULL);
1554
1555         if (fd->fd_grouplock.cg_gid != arg) {
1556                 CWARN("group lock %lu doesn't match current id %lu\n",
1557                        arg, fd->fd_grouplock.cg_gid);
1558                 spin_unlock(&lli->lli_lock);
1559                 RETURN(-EINVAL);
1560         }
1561
1562         grouplock = fd->fd_grouplock;
1563         memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1564         fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1565         spin_unlock(&lli->lli_lock);
1566
1567         cl_put_grouplock(&grouplock);
1568         CDEBUG(D_INFO, "group lock %lu released\n", arg);
1569         RETURN(0);
1570 }
1571
1572 /**
1573  * Close inode open handle
1574  *
1575  * \param dentry [in]     dentry which contains the inode
1576  * \param it     [in,out] intent which contains open info and result
1577  *
1578  * \retval 0     success
1579  * \retval <0    failure
1580  */
1581 int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1582 {
1583         struct inode *inode = dentry->d_inode;
1584         struct obd_client_handle *och;
1585         int rc;
1586         ENTRY;
1587
1588         LASSERT(inode);
1589
1590         /* Root ? Do nothing. */
1591         if (dentry->d_inode->i_sb->s_root == dentry)
1592                 RETURN(0);
1593
1594         /* No open handle to close? Move away */
1595         if (!it_disposition(it, DISP_OPEN_OPEN))
1596                 RETURN(0);
1597
1598         LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1599
1600         OBD_ALLOC(och, sizeof(*och));
1601         if (!och)
1602                 GOTO(out, rc = -ENOMEM);
1603
1604         ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1605                     ll_i2info(inode), it, och);
1606
1607         rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1608                                        inode, och);
1609  out:
1610         /* this one is in place of ll_file_open */
1611         if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1612                 ptlrpc_req_finished(it->d.lustre.it_data);
1613                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1614         }
1615         RETURN(rc);
1616 }
1617
1618 /**
1619  * Get size for inode for which FIEMAP mapping is requested.
1620  * Make the FIEMAP get_info call and returns the result.
1621  */
1622 int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1623               int num_bytes)
1624 {
1625         struct obd_export *exp = ll_i2dtexp(inode);
1626         struct lov_stripe_md *lsm = NULL;
1627         struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1628         int vallen = num_bytes;
1629         int rc;
1630         ENTRY;
1631
1632         /* Checks for fiemap flags */
1633         if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1634                 fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1635                 return -EBADR;
1636         }
1637
1638         /* Check for FIEMAP_FLAG_SYNC */
1639         if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1640                 rc = filemap_fdatawrite(inode->i_mapping);
1641                 if (rc)
1642                         return rc;
1643         }
1644
1645         lsm = ccc_inode_lsm_get(inode);
1646         if (lsm == NULL)
1647                 return -ENOENT;
1648
1649         /* If the stripe_count > 1 and the application does not understand
1650          * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1651          */
1652         if (lsm->lsm_stripe_count > 1 &&
1653             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1654                 GOTO(out, rc = -EOPNOTSUPP);
1655
1656         fm_key.oa.o_id = lsm->lsm_object_id;
1657         fm_key.oa.o_seq = lsm->lsm_object_seq;
1658         fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1659
1660         obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1661         obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1662         /* If filesize is 0, then there would be no objects for mapping */
1663         if (fm_key.oa.o_size == 0) {
1664                 fiemap->fm_mapped_extents = 0;
1665                 GOTO(out, rc = 0);
1666         }
1667
1668         memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1669
1670         rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1671                           fiemap, lsm);
1672         if (rc)
1673                 CERROR("obd_get_info failed: rc = %d\n", rc);
1674
1675 out:
1676         ccc_inode_lsm_put(inode, lsm);
1677         RETURN(rc);
1678 }
1679
1680 int ll_fid2path(struct inode *inode, void *arg)
1681 {
1682         struct obd_export *exp = ll_i2mdexp(inode);
1683         struct getinfo_fid2path *gfout, *gfin;
1684         int outsize, rc;
1685         ENTRY;
1686
1687         if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1688             !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1689                 RETURN(-EPERM);
1690
1691         /* Need to get the buflen */
1692         OBD_ALLOC_PTR(gfin);
1693         if (gfin == NULL)
1694                 RETURN(-ENOMEM);
1695         if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
1696                 OBD_FREE_PTR(gfin);
1697                 RETURN(-EFAULT);
1698         }
1699
1700         outsize = sizeof(*gfout) + gfin->gf_pathlen;
1701         OBD_ALLOC(gfout, outsize);
1702         if (gfout == NULL) {
1703                 OBD_FREE_PTR(gfin);
1704                 RETURN(-ENOMEM);
1705         }
1706         memcpy(gfout, gfin, sizeof(*gfout));
1707         OBD_FREE_PTR(gfin);
1708
1709         /* Call mdc_iocontrol */
1710         rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1711         if (rc)
1712                 GOTO(gf_free, rc);
1713         if (cfs_copy_to_user(arg, gfout, outsize))
1714                 rc = -EFAULT;
1715
1716 gf_free:
1717         OBD_FREE(gfout, outsize);
1718         RETURN(rc);
1719 }
1720
1721 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1722 {
1723         struct ll_user_fiemap *fiemap_s;
1724         size_t num_bytes, ret_bytes;
1725         unsigned int extent_count;
1726         int rc = 0;
1727
1728         /* Get the extent count so we can calculate the size of
1729          * required fiemap buffer */
1730         if (get_user(extent_count,
1731             &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1732                 RETURN(-EFAULT);
1733         num_bytes = sizeof(*fiemap_s) + (extent_count *
1734                                          sizeof(struct ll_fiemap_extent));
1735
1736         OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1737         if (fiemap_s == NULL)
1738                 RETURN(-ENOMEM);
1739
1740         /* get the fiemap value */
1741         if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
1742                            sizeof(*fiemap_s)))
1743                 GOTO(error, rc = -EFAULT);
1744
1745         /* If fm_extent_count is non-zero, read the first extent since
1746          * it is used to calculate end_offset and device from previous
1747          * fiemap call. */
1748         if (extent_count) {
1749                 if (copy_from_user(&fiemap_s->fm_extents[0],
1750                     (char __user *)arg + sizeof(*fiemap_s),
1751                     sizeof(struct ll_fiemap_extent)))
1752                         GOTO(error, rc = -EFAULT);
1753         }
1754
1755         rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1756         if (rc)
1757                 GOTO(error, rc);
1758
1759         ret_bytes = sizeof(struct ll_user_fiemap);
1760
1761         if (extent_count != 0)
1762                 ret_bytes += (fiemap_s->fm_mapped_extents *
1763                                  sizeof(struct ll_fiemap_extent));
1764
1765         if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1766                 rc = -EFAULT;
1767
1768 error:
1769         OBD_FREE_LARGE(fiemap_s, num_bytes);
1770         RETURN(rc);
1771 }
1772
1773 /*
1774  * Read the data_version for inode.
1775  *
1776  * This value is computed using stripe object version on OST.
1777  * Version is computed using server side locking.
1778  *
1779  * @param extent_lock  Take extent lock. Not needed if a process is already
1780  *                     holding the OST object group locks.
1781  */
1782 static int ll_data_version(struct inode *inode, __u64 *data_version,
1783                            int extent_lock)
1784 {
1785         struct lov_stripe_md *lsm = NULL;
1786         struct ll_sb_info    *sbi = ll_i2sbi(inode);
1787         struct obdo          *obdo = NULL;
1788         int                   rc;
1789         ENTRY;
1790
1791         /* If no stripe, we consider version is 0. */
1792         lsm = ccc_inode_lsm_get(inode);
1793         if (lsm == NULL) {
1794                 *data_version = 0;
1795                 CDEBUG(D_INODE, "No object for inode\n");
1796                 RETURN(0);
1797         }
1798
1799         OBD_ALLOC_PTR(obdo);
1800         if (obdo == NULL) {
1801                 ccc_inode_lsm_put(inode, lsm);
1802                 RETURN(-ENOMEM);
1803         }
1804
1805         rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1806         if (!rc) {
1807                 if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1808                         rc = -EOPNOTSUPP;
1809                 else
1810                         *data_version = obdo->o_data_version;
1811         }
1812
1813         OBD_FREE_PTR(obdo);
1814         ccc_inode_lsm_put(inode, lsm);
1815
1816         RETURN(rc);
1817 }
1818
1819 long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
1820 {
1821         struct inode *inode = file->f_dentry->d_inode;
1822         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1823         int flags;
1824
1825         ENTRY;
1826
1827         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
1828                inode->i_generation, inode, cmd);
1829         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
1830
1831         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
1832         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
1833                 RETURN(-ENOTTY);
1834
1835         switch(cmd) {
1836         case LL_IOC_GETFLAGS:
1837                 /* Get the current value of the file flags */
1838                 return put_user(fd->fd_flags, (int *)arg);
1839         case LL_IOC_SETFLAGS:
1840         case LL_IOC_CLRFLAGS:
1841                 /* Set or clear specific file flags */
1842                 /* XXX This probably needs checks to ensure the flags are
1843                  *     not abused, and to handle any flag side effects.
1844                  */
1845                 if (get_user(flags, (int *) arg))
1846                         RETURN(-EFAULT);
1847
1848                 if (cmd == LL_IOC_SETFLAGS) {
1849                         if ((flags & LL_FILE_IGNORE_LOCK) &&
1850                             !(file->f_flags & O_DIRECT)) {
1851                                 CERROR("%s: unable to disable locking on "
1852                                        "non-O_DIRECT file\n", current->comm);
1853                                 RETURN(-EINVAL);
1854                         }
1855
1856                         fd->fd_flags |= flags;
1857                 } else {
1858                         fd->fd_flags &= ~flags;
1859                 }
1860                 RETURN(0);
1861         case LL_IOC_LOV_SETSTRIPE:
1862                 RETURN(ll_lov_setstripe(inode, file, arg));
1863         case LL_IOC_LOV_SETEA:
1864                 RETURN(ll_lov_setea(inode, file, arg));
1865         case LL_IOC_LOV_GETSTRIPE:
1866                 RETURN(ll_lov_getstripe(inode, arg));
1867         case LL_IOC_RECREATE_OBJ:
1868                 RETURN(ll_lov_recreate_obj(inode, arg));
1869         case LL_IOC_RECREATE_FID:
1870                 RETURN(ll_lov_recreate_fid(inode, arg));
1871         case FSFILT_IOC_FIEMAP:
1872                 RETURN(ll_ioctl_fiemap(inode, arg));
1873         case FSFILT_IOC_GETFLAGS:
1874         case FSFILT_IOC_SETFLAGS:
1875                 RETURN(ll_iocontrol(inode, file, cmd, arg));
1876         case FSFILT_IOC_GETVERSION_OLD:
1877         case FSFILT_IOC_GETVERSION:
1878                 RETURN(put_user(inode->i_generation, (int *)arg));
1879         case LL_IOC_GROUP_LOCK:
1880                 RETURN(ll_get_grouplock(inode, file, arg));
1881         case LL_IOC_GROUP_UNLOCK:
1882                 RETURN(ll_put_grouplock(inode, file, arg));
1883         case IOC_OBD_STATFS:
1884                 RETURN(ll_obd_statfs(inode, (void *)arg));
1885
1886         /* We need to special case any other ioctls we want to handle,
1887          * to send them to the MDS/OST as appropriate and to properly
1888          * network encode the arg field.
1889         case FSFILT_IOC_SETVERSION_OLD:
1890         case FSFILT_IOC_SETVERSION:
1891         */
1892         case LL_IOC_FLUSHCTX:
1893                 RETURN(ll_flush_ctx(inode));
1894         case LL_IOC_PATH2FID: {
1895                 if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
1896                                      sizeof(struct lu_fid)))
1897                         RETURN(-EFAULT);
1898
1899                 RETURN(0);
1900         }
1901         case OBD_IOC_FID2PATH:
1902                 RETURN(ll_fid2path(inode, (void *)arg));
1903         case LL_IOC_DATA_VERSION: {
1904                 struct ioc_data_version idv;
1905                 int rc;
1906
1907                 if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
1908                         RETURN(-EFAULT);
1909
1910                 rc = ll_data_version(inode, &idv.idv_version,
1911                                      !(idv.idv_flags & LL_DV_NOFLUSH));
1912
1913                 if (rc == 0 &&
1914                     cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
1915                         RETURN(-EFAULT);
1916
1917                 RETURN(rc);
1918         }
1919
1920         case LL_IOC_GET_MDTIDX: {
1921                 int mdtidx;
1922
1923                 mdtidx = ll_get_mdt_idx(inode);
1924                 if (mdtidx < 0)
1925                         RETURN(mdtidx);
1926
1927                 if (put_user((int)mdtidx, (int*)arg))
1928                         RETURN(-EFAULT);
1929
1930                 RETURN(0);
1931         }
1932         case OBD_IOC_GETDTNAME:
1933         case OBD_IOC_GETMDNAME:
1934                 RETURN(ll_get_obd_name(inode, cmd, arg));
1935         default: {
1936                 int err;
1937
1938                 if (LLIOC_STOP ==
1939                     ll_iocontrol_call(inode, file, cmd, arg, &err))
1940                         RETURN(err);
1941
1942                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
1943                                      (void *)arg));
1944         }
1945         }
1946 }
1947
1948 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
1949 {
1950         struct inode *inode = file->f_dentry->d_inode;
1951         loff_t retval;
1952         ENTRY;
1953         retval = offset + ((origin == 2) ? i_size_read(inode) :
1954                            (origin == 1) ? file->f_pos : 0);
1955         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%s)\n",
1956                inode->i_ino, inode->i_generation, inode, retval, retval,
1957                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
1958         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
1959
1960         if (origin == 2) { /* SEEK_END */
1961                 int rc;
1962
1963                 rc = ll_glimpse_size(inode);
1964                 if (rc != 0)
1965                         RETURN(rc);
1966
1967                 offset += i_size_read(inode);
1968         } else if (origin == 1) { /* SEEK_CUR */
1969                 offset += file->f_pos;
1970         }
1971
1972         retval = -EINVAL;
1973         if (offset >= 0 && offset <= ll_file_maxbytes(inode)) {
1974                 if (offset != file->f_pos) {
1975                         file->f_pos = offset;
1976                 }
1977                 retval = offset;
1978         }
1979
1980         RETURN(retval);
1981 }
1982
1983 int ll_flush(struct file *file, fl_owner_t id)
1984 {
1985         struct inode *inode = file->f_dentry->d_inode;
1986         struct ll_inode_info *lli = ll_i2info(inode);
1987         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1988         int rc, err;
1989
1990         LASSERT(!S_ISDIR(inode->i_mode));
1991
1992         /* catch async errors that were recorded back when async writeback
1993          * failed for pages in this mapping. */
1994         rc = lli->lli_async_rc;
1995         lli->lli_async_rc = 0;
1996         err = lov_read_and_clear_async_rc(lli->lli_clob);
1997         if (rc == 0)
1998                 rc = err;
1999
2000         /* The application has been told write failure already.
2001          * Do not report failure again. */
2002         if (fd->fd_write_failed)
2003                 return 0;
2004         return rc ? -EIO : 0;
2005 }
2006
2007 /**
2008  * Called to make sure a portion of file has been written out.
2009  * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2010  *
2011  * Return how many pages have been written.
2012  */
2013 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2014                        enum cl_fsync_mode mode)
2015 {
2016         struct cl_env_nest nest;
2017         struct lu_env *env;
2018         struct cl_io *io;
2019         struct obd_capa *capa = NULL;
2020         struct cl_fsync_io *fio;
2021         int result;
2022         ENTRY;
2023
2024         if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2025             mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2026                 RETURN(-EINVAL);
2027
2028         env = cl_env_nested_get(&nest);
2029         if (IS_ERR(env))
2030                 RETURN(PTR_ERR(env));
2031
2032         capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2033
2034         io = ccc_env_thread_io(env);
2035         io->ci_obj = cl_i2info(inode)->lli_clob;
2036         io->ci_ignore_layout = 1;
2037
2038         /* initialize parameters for sync */
2039         fio = &io->u.ci_fsync;
2040         fio->fi_capa = capa;
2041         fio->fi_start = start;
2042         fio->fi_end = end;
2043         fio->fi_fid = ll_inode2fid(inode);
2044         fio->fi_mode = mode;
2045         fio->fi_nr_written = 0;
2046
2047         if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2048                 result = cl_io_loop(env, io);
2049         else
2050                 result = io->ci_result;
2051         if (result == 0)
2052                 result = fio->fi_nr_written;
2053         cl_io_fini(env, io);
2054         cl_env_nested_put(&nest, env);
2055
2056         capa_put(capa);
2057
2058         RETURN(result);
2059 }
2060
2061 #ifdef HAVE_FILE_FSYNC_4ARGS
2062 int ll_fsync(struct file *file, loff_t start, loff_t end, int data)
2063 #elif defined(HAVE_FILE_FSYNC_2ARGS)
2064 int ll_fsync(struct file *file, int data)
2065 #else
2066 int ll_fsync(struct file *file, struct dentry *dentry, int data)
2067 #endif
2068 {
2069         struct inode *inode = file->f_dentry->d_inode;
2070         struct ll_inode_info *lli = ll_i2info(inode);
2071         struct ptlrpc_request *req;
2072         struct obd_capa *oc;
2073         int rc, err;
2074         ENTRY;
2075
2076         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2077                inode->i_generation, inode);
2078         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2079
2080 #ifdef HAVE_FILE_FSYNC_4ARGS
2081         rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2082         mutex_lock(&inode->i_mutex);
2083 #else
2084         /* fsync's caller has already called _fdata{sync,write}, we want
2085          * that IO to finish before calling the osc and mdc sync methods */
2086         rc = filemap_fdatawait(inode->i_mapping);
2087 #endif
2088
2089         /* catch async errors that were recorded back when async writeback
2090          * failed for pages in this mapping. */
2091         if (!S_ISDIR(inode->i_mode)) {
2092                 err = lli->lli_async_rc;
2093                 lli->lli_async_rc = 0;
2094                 if (rc == 0)
2095                         rc = err;
2096                 err = lov_read_and_clear_async_rc(lli->lli_clob);
2097                 if (rc == 0)
2098                         rc = err;
2099         }
2100
2101         oc = ll_mdscapa_get(inode);
2102         err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2103                       &req);
2104         capa_put(oc);
2105         if (!rc)
2106                 rc = err;
2107         if (!err)
2108                 ptlrpc_req_finished(req);
2109
2110         if (data) {
2111                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2112
2113                 err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2114                                 CL_FSYNC_ALL);
2115                 if (rc == 0 && err < 0)
2116                         rc = err;
2117                 if (rc < 0)
2118                         fd->fd_write_failed = true;
2119                 else
2120                         fd->fd_write_failed = false;
2121         }
2122
2123 #ifdef HAVE_FILE_FSYNC_4ARGS
2124         mutex_unlock(&inode->i_mutex);
2125 #endif
2126         RETURN(rc);
2127 }
2128
2129 int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2130 {
2131         struct inode *inode = file->f_dentry->d_inode;
2132         struct ll_sb_info *sbi = ll_i2sbi(inode);
2133         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
2134                                            .ei_cb_cp =ldlm_flock_completion_ast,
2135                                            .ei_cbdata = file_lock };
2136         struct md_op_data *op_data;
2137         struct lustre_handle lockh = {0};
2138         ldlm_policy_data_t flock = {{0}};
2139         int flags = 0;
2140         int rc;
2141         ENTRY;
2142
2143         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2144                inode->i_ino, file_lock);
2145
2146         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2147
2148         if (file_lock->fl_flags & FL_FLOCK) {
2149                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2150                 /* flocks are whole-file locks */
2151                 flock.l_flock.end = OFFSET_MAX;
2152                 /* For flocks owner is determined by the local file desctiptor*/
2153                 flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2154         } else if (file_lock->fl_flags & FL_POSIX) {
2155                 flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2156                 flock.l_flock.start = file_lock->fl_start;
2157                 flock.l_flock.end = file_lock->fl_end;
2158         } else {
2159                 RETURN(-EINVAL);
2160         }
2161         flock.l_flock.pid = file_lock->fl_pid;
2162
2163         /* Somewhat ugly workaround for svc lockd.
2164          * lockd installs custom fl_lmops->lm_compare_owner that checks
2165          * for the fl_owner to be the same (which it always is on local node
2166          * I guess between lockd processes) and then compares pid.
2167          * As such we assign pid to the owner field to make it all work,
2168          * conflict with normal locks is unlikely since pid space and
2169          * pointer space for current->files are not intersecting */
2170         if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2171                 flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2172
2173         switch (file_lock->fl_type) {
2174         case F_RDLCK:
2175                 einfo.ei_mode = LCK_PR;
2176                 break;
2177         case F_UNLCK:
2178                 /* An unlock request may or may not have any relation to
2179                  * existing locks so we may not be able to pass a lock handle
2180                  * via a normal ldlm_lock_cancel() request. The request may even
2181                  * unlock a byte range in the middle of an existing lock. In
2182                  * order to process an unlock request we need all of the same
2183                  * information that is given with a normal read or write record
2184                  * lock request. To avoid creating another ldlm unlock (cancel)
2185                  * message we'll treat a LCK_NL flock request as an unlock. */
2186                 einfo.ei_mode = LCK_NL;
2187                 break;
2188         case F_WRLCK:
2189                 einfo.ei_mode = LCK_PW;
2190                 break;
2191         default:
2192                 CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2193                         file_lock->fl_type);
2194                 RETURN (-ENOTSUPP);
2195         }
2196
2197         switch (cmd) {
2198         case F_SETLKW:
2199 #ifdef F_SETLKW64
2200         case F_SETLKW64:
2201 #endif
2202                 flags = 0;
2203                 break;
2204         case F_SETLK:
2205 #ifdef F_SETLK64
2206         case F_SETLK64:
2207 #endif
2208                 flags = LDLM_FL_BLOCK_NOWAIT;
2209                 break;
2210         case F_GETLK:
2211 #ifdef F_GETLK64
2212         case F_GETLK64:
2213 #endif
2214                 flags = LDLM_FL_TEST_LOCK;
2215                 /* Save the old mode so that if the mode in the lock changes we
2216                  * can decrement the appropriate reader or writer refcount. */
2217                 file_lock->fl_type = einfo.ei_mode;
2218                 break;
2219         default:
2220                 CERROR("unknown fcntl lock command: %d\n", cmd);
2221                 RETURN (-EINVAL);
2222         }
2223
2224         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2225                                      LUSTRE_OPC_ANY, NULL);
2226         if (IS_ERR(op_data))
2227                 RETURN(PTR_ERR(op_data));
2228
2229         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2230                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2231                flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2232
2233         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2234                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
2235
2236         ll_finish_md_op_data(op_data);
2237
2238         if ((file_lock->fl_flags & FL_FLOCK) &&
2239             (rc == 0 || file_lock->fl_type == F_UNLCK))
2240                 flock_lock_file_wait(file, file_lock);
2241         if ((file_lock->fl_flags & FL_POSIX) &&
2242             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2243             !(flags & LDLM_FL_TEST_LOCK))
2244                 posix_lock_file_wait(file, file_lock);
2245
2246         RETURN(rc);
2247 }
2248
2249 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2250 {
2251         ENTRY;
2252
2253         RETURN(-ENOSYS);
2254 }
2255
2256 /**
2257  * test if some locks matching bits and l_req_mode are acquired
2258  * - bits can be in different locks
2259  * - if found clear the common lock bits in *bits
2260  * - the bits not found, are kept in *bits
2261  * \param inode [IN]
2262  * \param bits [IN] searched lock bits [IN]
2263  * \param l_req_mode [IN] searched lock mode
2264  * \retval boolean, true iff all bits are found
2265  */
2266 int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2267 {
2268         struct lustre_handle lockh;
2269         ldlm_policy_data_t policy;
2270         ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2271                                 (LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2272         struct lu_fid *fid;
2273         __u64 flags;
2274         int i;
2275         ENTRY;
2276
2277         if (!inode)
2278                RETURN(0);
2279
2280         fid = &ll_i2info(inode)->lli_fid;
2281         CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2282                ldlm_lockname[mode]);
2283
2284         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2285         for (i = 0; i < MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2286                 policy.l_inodebits.bits = *bits & (1 << i);
2287                 if (policy.l_inodebits.bits == 0)
2288                         continue;
2289
2290                 if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2291                                   &policy, mode, &lockh)) {
2292                         struct ldlm_lock *lock;
2293
2294                         lock = ldlm_handle2lock(&lockh);
2295                         if (lock) {
2296                                 *bits &=
2297                                       ~(lock->l_policy_data.l_inodebits.bits);
2298                                 LDLM_LOCK_PUT(lock);
2299                         } else {
2300                                 *bits &= ~policy.l_inodebits.bits;
2301                         }
2302                 }
2303         }
2304         RETURN(*bits == 0);
2305 }
2306
2307 ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2308                             struct lustre_handle *lockh, __u64 flags)
2309 {
2310         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2311         struct lu_fid *fid;
2312         ldlm_mode_t rc;
2313         ENTRY;
2314
2315         fid = &ll_i2info(inode)->lli_fid;
2316         CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2317
2318         rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2319                            fid, LDLM_IBITS, &policy,
2320                            LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2321         RETURN(rc);
2322 }
2323
2324 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
2325         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
2326                               * and return success */
2327                 clear_nlink(inode);
2328                 /* This path cannot be hit for regular files unless in
2329                  * case of obscure races, so no need to to validate
2330                  * size. */
2331                 if (!S_ISREG(inode->i_mode) &&
2332                     !S_ISDIR(inode->i_mode))
2333                         return 0;
2334         }
2335
2336         if (rc) {
2337                 CERROR("failure %d inode %lu\n", rc, inode->i_ino);
2338                 return -abs(rc);
2339         }
2340
2341         return 0;
2342 }
2343
2344 int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2345                              __u64 ibits)
2346 {
2347         struct inode *inode = dentry->d_inode;
2348         struct ptlrpc_request *req = NULL;
2349         struct obd_export *exp;
2350         int rc = 0;
2351         ENTRY;
2352
2353         if (!inode) {
2354                 CERROR("REPORT THIS LINE TO PETER\n");
2355                 RETURN(0);
2356         }
2357
2358         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2359                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2360
2361         exp = ll_i2mdexp(inode);
2362
2363         /* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
2364          *      But under CMD case, it caused some lock issues, should be fixed
2365          *      with new CMD ibits lock. See bug 12718 */
2366         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
2367                 struct lookup_intent oit = { .it_op = IT_GETATTR };
2368                 struct md_op_data *op_data;
2369
2370                 if (ibits == MDS_INODELOCK_LOOKUP)
2371                         oit.it_op = IT_LOOKUP;
2372
2373                 /* Call getattr by fid, so do not provide name at all. */
2374                 op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
2375                                              dentry->d_inode, NULL, 0, 0,
2376                                              LUSTRE_OPC_ANY, NULL);
2377                 if (IS_ERR(op_data))
2378                         RETURN(PTR_ERR(op_data));
2379
2380                 oit.it_create_mode |= M_CHECK_STALE;
2381                 rc = md_intent_lock(exp, op_data, NULL, 0,
2382                                     /* we are not interested in name
2383                                        based lookup */
2384                                     &oit, 0, &req,
2385                                     ll_md_blocking_ast, 0);
2386                 ll_finish_md_op_data(op_data);
2387                 oit.it_create_mode &= ~M_CHECK_STALE;
2388                 if (rc < 0) {
2389                         rc = ll_inode_revalidate_fini(inode, rc);
2390                         GOTO (out, rc);
2391                 }
2392
2393                 rc = ll_revalidate_it_finish(req, &oit, dentry);
2394                 if (rc != 0) {
2395                         ll_intent_release(&oit);
2396                         GOTO(out, rc);
2397                 }
2398
2399                 /* Unlinked? Unhash dentry, so it is not picked up later by
2400                    do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2401                    here to preserve get_cwd functionality on 2.6.
2402                    Bug 10503 */
2403                 if (!dentry->d_inode->i_nlink)
2404                         d_lustre_invalidate(dentry);
2405
2406                 ll_lookup_finish_locks(&oit, dentry);
2407         } else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2408                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2409                 obd_valid valid = OBD_MD_FLGETATTR;
2410                 struct md_op_data *op_data;
2411                 int ealen = 0;
2412
2413                 if (S_ISREG(inode->i_mode)) {
2414                         rc = ll_get_max_mdsize(sbi, &ealen);
2415                         if (rc)
2416                                 RETURN(rc);
2417                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2418                 }
2419
2420                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2421                                              0, ealen, LUSTRE_OPC_ANY,
2422                                              NULL);
2423                 if (IS_ERR(op_data))
2424                         RETURN(PTR_ERR(op_data));
2425
2426                 op_data->op_valid = valid;
2427                 /* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
2428                  * capa for this inode. Because we only keep capas of dirs
2429                  * fresh. */
2430                 rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2431                 ll_finish_md_op_data(op_data);
2432                 if (rc) {
2433                         rc = ll_inode_revalidate_fini(inode, rc);
2434                         RETURN(rc);
2435                 }
2436
2437                 rc = ll_prep_inode(&inode, req, NULL);
2438         }
2439 out:
2440         ptlrpc_req_finished(req);
2441         return rc;
2442 }
2443
2444 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2445                            __u64 ibits)
2446 {
2447         struct inode *inode = dentry->d_inode;
2448         int rc;
2449         ENTRY;
2450
2451         rc = __ll_inode_revalidate_it(dentry, it, ibits);
2452         if (rc != 0)
2453                 RETURN(rc);
2454
2455         /* if object isn't regular file, don't validate size */
2456         if (!S_ISREG(inode->i_mode)) {
2457                 LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2458                 LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2459                 LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2460         } else {
2461                 rc = ll_glimpse_size(inode);
2462         }
2463         RETURN(rc);
2464 }
2465
2466 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2467                   struct lookup_intent *it, struct kstat *stat)
2468 {
2469         struct inode *inode = de->d_inode;
2470         struct ll_sb_info *sbi = ll_i2sbi(inode);
2471         struct ll_inode_info *lli = ll_i2info(inode);
2472         int res = 0;
2473
2474         res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2475                                              MDS_INODELOCK_LOOKUP);
2476         ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2477
2478         if (res)
2479                 return res;
2480
2481         stat->dev = inode->i_sb->s_dev;
2482         if (ll_need_32bit_api(sbi))
2483                 stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2484         else
2485                 stat->ino = inode->i_ino;
2486         stat->mode = inode->i_mode;
2487         stat->nlink = inode->i_nlink;
2488         stat->uid = inode->i_uid;
2489         stat->gid = inode->i_gid;
2490         stat->rdev = inode->i_rdev;
2491         stat->atime = inode->i_atime;
2492         stat->mtime = inode->i_mtime;
2493         stat->ctime = inode->i_ctime;
2494         stat->blksize = 1 << inode->i_blkbits;
2495
2496         stat->size = i_size_read(inode);
2497         stat->blocks = inode->i_blocks;
2498
2499         return 0;
2500 }
2501 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2502 {
2503         struct lookup_intent it = { .it_op = IT_GETATTR };
2504
2505         return ll_getattr_it(mnt, de, &it, stat);
2506 }
2507
2508 #ifdef HAVE_LINUX_FIEMAP_H
2509 int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
2510                 __u64 start, __u64 len)
2511 {
2512         int rc;
2513         size_t num_bytes;
2514         struct ll_user_fiemap *fiemap;
2515         unsigned int extent_count = fieinfo->fi_extents_max;
2516
2517         num_bytes = sizeof(*fiemap) + (extent_count *
2518                                        sizeof(struct ll_fiemap_extent));
2519         OBD_ALLOC_LARGE(fiemap, num_bytes);
2520
2521         if (fiemap == NULL)
2522                 RETURN(-ENOMEM);
2523
2524         fiemap->fm_flags = fieinfo->fi_flags;
2525         fiemap->fm_extent_count = fieinfo->fi_extents_max;
2526         fiemap->fm_start = start;
2527         fiemap->fm_length = len;
2528         memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
2529                sizeof(struct ll_fiemap_extent));
2530
2531         rc = ll_do_fiemap(inode, fiemap, num_bytes);
2532
2533         fieinfo->fi_flags = fiemap->fm_flags;
2534         fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
2535         memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
2536                fiemap->fm_mapped_extents * sizeof(struct ll_fiemap_extent));
2537
2538         OBD_FREE_LARGE(fiemap, num_bytes);
2539         return rc;
2540 }
2541 #endif
2542
2543 struct posix_acl * ll_get_acl(struct inode *inode, int type)
2544 {
2545         struct ll_inode_info *lli = ll_i2info(inode);
2546         struct posix_acl *acl = NULL;
2547         ENTRY;
2548
2549         spin_lock(&lli->lli_lock);
2550         /* VFS' acl_permission_check->check_acl will release the refcount */
2551         acl = posix_acl_dup(lli->lli_posix_acl);
2552         spin_unlock(&lli->lli_lock);
2553
2554         RETURN(acl);
2555 }
2556
2557 #ifndef HAVE_GENERIC_PERMISSION_2ARGS
2558 static int
2559 # ifdef HAVE_GENERIC_PERMISSION_4ARGS
2560 ll_check_acl(struct inode *inode, int mask, unsigned int flags)
2561 # else
2562 ll_check_acl(struct inode *inode, int mask)
2563 # endif
2564 {
2565 # ifdef CONFIG_FS_POSIX_ACL
2566         struct posix_acl *acl;
2567         int rc;
2568         ENTRY;
2569
2570 #  ifdef HAVE_GENERIC_PERMISSION_4ARGS
2571         if (flags & IPERM_FLAG_RCU)
2572                 return -ECHILD;
2573 #  endif
2574         acl = ll_get_acl(inode, ACL_TYPE_ACCESS);
2575
2576         if (!acl)
2577                 RETURN(-EAGAIN);
2578
2579         rc = posix_acl_permission(inode, acl, mask);
2580         posix_acl_release(acl);
2581
2582         RETURN(rc);
2583 # else /* !CONFIG_FS_POSIX_ACL */
2584         return -EAGAIN;
2585 # endif /* CONFIG_FS_POSIX_ACL */
2586 }
2587 #endif /* HAVE_GENERIC_PERMISSION_2ARGS */
2588
2589 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
2590 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags)
2591 #else
2592 # ifdef HAVE_INODE_PERMISION_2ARGS
2593 int ll_inode_permission(struct inode *inode, int mask)
2594 # else
2595 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
2596 # endif
2597 #endif
2598 {
2599         int rc = 0;
2600         ENTRY;
2601
2602 #ifdef MAY_NOT_BLOCK
2603         if (mask & MAY_NOT_BLOCK)
2604                 return -ECHILD;
2605 #elif defined(HAVE_GENERIC_PERMISSION_4ARGS)
2606         if (flags & IPERM_FLAG_RCU)
2607                 return -ECHILD;
2608 #endif
2609
2610        /* as root inode are NOT getting validated in lookup operation,
2611         * need to do it before permission check. */
2612
2613         if (inode == inode->i_sb->s_root->d_inode) {
2614                 struct lookup_intent it = { .it_op = IT_LOOKUP };
2615
2616                 rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2617                                               MDS_INODELOCK_LOOKUP);
2618                 if (rc)
2619                         RETURN(rc);
2620         }
2621
2622         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2623                inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2624
2625         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2626                 return lustre_check_remote_perm(inode, mask);
2627
2628         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2629         rc = ll_generic_permission(inode, mask, flags, ll_check_acl);
2630
2631         RETURN(rc);
2632 }
2633
2634 #ifdef HAVE_FILE_READV
2635 #define READ_METHOD readv
2636 #define READ_FUNCTION ll_file_readv
2637 #define WRITE_METHOD writev
2638 #define WRITE_FUNCTION ll_file_writev
2639 #else
2640 #define READ_METHOD aio_read
2641 #define READ_FUNCTION ll_file_aio_read
2642 #define WRITE_METHOD aio_write
2643 #define WRITE_FUNCTION ll_file_aio_write
2644 #endif
2645
2646 /* -o localflock - only provides locally consistent flock locks */
2647 struct file_operations ll_file_operations = {
2648         .read           = ll_file_read,
2649         .READ_METHOD    = READ_FUNCTION,
2650         .write          = ll_file_write,
2651         .WRITE_METHOD   = WRITE_FUNCTION,
2652         .unlocked_ioctl = ll_file_ioctl,
2653         .open           = ll_file_open,
2654         .release        = ll_file_release,
2655         .mmap           = ll_file_mmap,
2656         .llseek         = ll_file_seek,
2657 #ifdef HAVE_KERNEL_SENDFILE
2658         .sendfile       = ll_file_sendfile,
2659 #endif
2660 #ifdef HAVE_KERNEL_SPLICE_READ
2661         .splice_read    = ll_file_splice_read,
2662 #endif
2663         .fsync          = ll_fsync,
2664         .flush          = ll_flush
2665 };
2666
2667 struct file_operations ll_file_operations_flock = {
2668         .read           = ll_file_read,
2669         .READ_METHOD    = READ_FUNCTION,
2670         .write          = ll_file_write,
2671         .WRITE_METHOD   = WRITE_FUNCTION,
2672         .unlocked_ioctl = ll_file_ioctl,
2673         .open           = ll_file_open,
2674         .release        = ll_file_release,
2675         .mmap           = ll_file_mmap,
2676         .llseek         = ll_file_seek,
2677 #ifdef HAVE_KERNEL_SENDFILE
2678         .sendfile       = ll_file_sendfile,
2679 #endif
2680 #ifdef HAVE_KERNEL_SPLICE_READ
2681         .splice_read    = ll_file_splice_read,
2682 #endif
2683         .fsync          = ll_fsync,
2684         .flush          = ll_flush,
2685         .flock          = ll_file_flock,
2686         .lock           = ll_file_flock
2687 };
2688
2689 /* These are for -o noflock - to return ENOSYS on flock calls */
2690 struct file_operations ll_file_operations_noflock = {
2691         .read           = ll_file_read,
2692         .READ_METHOD    = READ_FUNCTION,
2693         .write          = ll_file_write,
2694         .WRITE_METHOD   = WRITE_FUNCTION,
2695         .unlocked_ioctl = ll_file_ioctl,
2696         .open           = ll_file_open,
2697         .release        = ll_file_release,
2698         .mmap           = ll_file_mmap,
2699         .llseek         = ll_file_seek,
2700 #ifdef HAVE_KERNEL_SENDFILE
2701         .sendfile       = ll_file_sendfile,
2702 #endif
2703 #ifdef HAVE_KERNEL_SPLICE_READ
2704         .splice_read    = ll_file_splice_read,
2705 #endif
2706         .fsync          = ll_fsync,
2707         .flush          = ll_flush,
2708         .flock          = ll_file_noflock,
2709         .lock           = ll_file_noflock
2710 };
2711
2712 struct inode_operations ll_file_inode_operations = {
2713         .setattr        = ll_setattr,
2714         .getattr        = ll_getattr,
2715         .permission     = ll_inode_permission,
2716         .setxattr       = ll_setxattr,
2717         .getxattr       = ll_getxattr,
2718         .listxattr      = ll_listxattr,
2719         .removexattr    = ll_removexattr,
2720 #ifdef  HAVE_LINUX_FIEMAP_H
2721         .fiemap         = ll_fiemap,
2722 #endif
2723 #ifdef HAVE_IOP_GET_ACL
2724         .get_acl        = ll_get_acl,
2725 #endif
2726 };
2727
2728 /* dynamic ioctl number support routins */
2729 static struct llioc_ctl_data {
2730         struct rw_semaphore     ioc_sem;
2731         cfs_list_t              ioc_head;
2732 } llioc = {
2733         __RWSEM_INITIALIZER(llioc.ioc_sem),
2734         CFS_LIST_HEAD_INIT(llioc.ioc_head)
2735 };
2736
2737
2738 struct llioc_data {
2739         cfs_list_t              iocd_list;
2740         unsigned int            iocd_size;
2741         llioc_callback_t        iocd_cb;
2742         unsigned int            iocd_count;
2743         unsigned int            iocd_cmd[0];
2744 };
2745
2746 void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2747 {
2748         unsigned int size;
2749         struct llioc_data *in_data = NULL;
2750         ENTRY;
2751
2752         if (cb == NULL || cmd == NULL ||
2753             count > LLIOC_MAX_CMD || count < 0)
2754                 RETURN(NULL);
2755
2756         size = sizeof(*in_data) + count * sizeof(unsigned int);
2757         OBD_ALLOC(in_data, size);
2758         if (in_data == NULL)
2759                 RETURN(NULL);
2760
2761         memset(in_data, 0, sizeof(*in_data));
2762         in_data->iocd_size = size;
2763         in_data->iocd_cb = cb;
2764         in_data->iocd_count = count;
2765         memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2766
2767         down_write(&llioc.ioc_sem);
2768         cfs_list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2769         up_write(&llioc.ioc_sem);
2770
2771         RETURN(in_data);
2772 }
2773
2774 void ll_iocontrol_unregister(void *magic)
2775 {
2776         struct llioc_data *tmp;
2777
2778         if (magic == NULL)
2779                 return;
2780
2781         down_write(&llioc.ioc_sem);
2782         cfs_list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2783                 if (tmp == magic) {
2784                         unsigned int size = tmp->iocd_size;
2785
2786                         cfs_list_del(&tmp->iocd_list);
2787                         up_write(&llioc.ioc_sem);
2788
2789                         OBD_FREE(tmp, size);
2790                         return;
2791                 }
2792         }
2793         up_write(&llioc.ioc_sem);
2794
2795         CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2796 }
2797
2798 EXPORT_SYMBOL(ll_iocontrol_register);
2799 EXPORT_SYMBOL(ll_iocontrol_unregister);
2800
2801 enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2802                         unsigned int cmd, unsigned long arg, int *rcp)
2803 {
2804         enum llioc_iter ret = LLIOC_CONT;
2805         struct llioc_data *data;
2806         int rc = -EINVAL, i;
2807
2808         down_read(&llioc.ioc_sem);
2809         cfs_list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2810                 for (i = 0; i < data->iocd_count; i++) {
2811                         if (cmd != data->iocd_cmd[i])
2812                                 continue;
2813
2814                         ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2815                         break;
2816                 }
2817
2818                 if (ret == LLIOC_STOP)
2819                         break;
2820         }
2821         up_read(&llioc.ioc_sem);
2822
2823         if (rcp)
2824                 *rcp = rc;
2825         return ret;
2826 }
2827
2828 int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2829 {
2830         struct ll_inode_info *lli = ll_i2info(inode);
2831         struct cl_env_nest nest;
2832         struct lu_env *env;
2833         int result;
2834         ENTRY;
2835
2836         if (lli->lli_clob == NULL)
2837                 RETURN(0);
2838
2839         env = cl_env_nested_get(&nest);
2840         if (IS_ERR(env))
2841                 RETURN(PTR_ERR(env));
2842
2843         result = cl_conf_set(env, lli->lli_clob, conf);
2844         cl_env_nested_put(&nest, env);
2845         RETURN(result);
2846 }
2847
2848 /**
2849  * This function checks if there exists a LAYOUT lock on the client side,
2850  * or enqueues it if it doesn't have one in cache.
2851  *
2852  * This function will not hold layout lock so it may be revoked any time after
2853  * this function returns. Any operations depend on layout should be redone
2854  * in that case.
2855  *
2856  * This function should be called before lov_io_init() to get an uptodate
2857  * layout version, the caller should save the version number and after IO
2858  * is finished, this function should be called again to verify that layout
2859  * is not changed during IO time.
2860  */
2861 int ll_layout_refresh(struct inode *inode, __u32 *gen)
2862 {
2863         struct ll_inode_info  *lli = ll_i2info(inode);
2864         struct ll_sb_info     *sbi = ll_i2sbi(inode);
2865         struct md_op_data     *op_data = NULL;
2866         struct lookup_intent   it = { .it_op = IT_LAYOUT };
2867         struct lustre_handle   lockh = { 0 };
2868         ldlm_mode_t            mode;
2869         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
2870                                            .ei_mode = LCK_CR,
2871                                            .ei_cb_bl = ll_md_blocking_ast,
2872                                            .ei_cb_cp = ldlm_completion_ast,
2873                                            .ei_cbdata = inode };
2874         int rc;
2875         ENTRY;
2876
2877         *gen = 0;
2878         if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
2879                 RETURN(0);
2880
2881         /* sanity checks */
2882         LASSERT(fid_is_sane(ll_inode2fid(inode)));
2883         LASSERT(S_ISREG(inode->i_mode));
2884
2885         /* mostly layout lock is caching on the local side, so try to match
2886          * it before grabbing layout lock mutex. */
2887         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2888                                 LDLM_FL_LVB_READY);
2889         if (mode != 0) { /* hit cached lock */
2890                 /* lsm_layout_gen is started from 0, plus 1 here to distinguish
2891                  * the cases of no layout and first layout. */
2892                 *gen = lli->lli_layout_gen + 1;
2893
2894                 ldlm_lock_decref(&lockh, mode);
2895                 RETURN(0);
2896         }
2897
2898         op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
2899                                      0, 0, LUSTRE_OPC_ANY, NULL);
2900         if (IS_ERR(op_data))
2901                 RETURN(PTR_ERR(op_data));
2902
2903         /* take layout lock mutex to enqueue layout lock exclusively. */
2904         mutex_lock(&lli->lli_layout_mutex);
2905
2906         /* try again inside layout mutex */
2907         mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
2908                                 LDLM_FL_LVB_READY);
2909         if (mode != 0) { /* hit cached lock */
2910                 *gen = lli->lli_layout_gen + 1;
2911
2912                 ldlm_lock_decref(&lockh, mode);
2913                 mutex_unlock(&lli->lli_layout_mutex);
2914                 ll_finish_md_op_data(op_data);
2915                 RETURN(0);
2916         }
2917
2918         /* have to enqueue one */
2919         rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
2920                         NULL, 0, NULL, 0);
2921         if (it.d.lustre.it_data != NULL)
2922                 ptlrpc_req_finished(it.d.lustre.it_data);
2923         it.d.lustre.it_data = NULL;
2924
2925         if (rc == 0) {
2926                 struct ldlm_lock *lock;
2927                 struct cl_object_conf conf;
2928                 struct lustre_md md = { NULL };
2929                 void *lmm;
2930                 int lmmsize;
2931
2932                 LASSERT(lustre_handle_is_used(&lockh));
2933
2934                 /* set lock data in case this is a new lock */
2935                 ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
2936
2937                 lock = ldlm_handle2lock(&lockh);
2938                 LASSERT(lock != NULL);
2939
2940                 /* for IT_LAYOUT lock, lmm is returned in lock's lvb
2941                  * data via completion callback */
2942                 lmm = lock->l_lvb_data;
2943                 lmmsize = lock->l_lvb_len;
2944                 if (lmm != NULL) {
2945                         rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2946                                         lmm, lmmsize);
2947                         if (rc >= 0) {
2948                                 if (md.lsm != NULL)
2949                                         *gen = md.lsm->lsm_layout_gen + 1;
2950                                 rc = 0;
2951                         } else {
2952                                 CERROR("file: "DFID" unpackmd error: %d\n",
2953                                         PFID(&lli->lli_fid), rc);
2954                         }
2955                 }
2956                 LDLM_LOCK_PUT(lock);
2957
2958                 /* set layout to file. This may cause lock expiration as we
2959                  * set layout inside layout ibits lock. */
2960                 memset(&conf, 0, sizeof conf);
2961                 conf.coc_inode = inode;
2962                 conf.u.coc_md = &md;
2963                 ll_layout_conf(inode, &conf);
2964                 /* is this racy? */
2965                 lli->lli_has_smd = md.lsm != NULL;
2966                 if (md.lsm != NULL)
2967                         obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
2968         }
2969         ll_intent_drop_lock(&it);
2970
2971         mutex_unlock(&lli->lli_layout_mutex);
2972         ll_finish_md_op_data(op_data);
2973
2974         RETURN(rc);
2975 }