Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / llite_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002-2005 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/random.h>
29 #include <linux/version.h>
30 #include <linux/mm.h>
31
32 #include <lustre_lite.h>
33 #include <lustre_ha.h>
34 #include <lustre_dlm.h>
35 #include <lprocfs_status.h>
36 #include <lustre_disk.h>
37 #include <lustre_param.h>
38 #include <lustre_log.h>
39 #include <obd_cksum.h>
40 #include <lustre_cache.h>
41 #include "llite_internal.h"
42
43 cfs_mem_cache_t *ll_file_data_slab;
44
45 LIST_HEAD(ll_super_blocks);
46 spinlock_t ll_sb_lock = SPIN_LOCK_UNLOCKED;
47
48 extern struct address_space_operations ll_aops;
49 extern struct address_space_operations ll_dir_aops;
50
51 #ifndef log2
52 #define log2(n) ffz(~(n))
53 #endif
54
55
56 static struct ll_sb_info *ll_init_sbi(void)
57 {
58         struct ll_sb_info *sbi = NULL;
59         unsigned long pages;
60         struct sysinfo si;
61         class_uuid_t uuid;
62         int i;
63         ENTRY;
64
65         OBD_ALLOC(sbi, sizeof(*sbi));
66         if (!sbi)
67                 RETURN(NULL);
68
69         spin_lock_init(&sbi->ll_lock);
70         spin_lock_init(&sbi->ll_lco.lco_lock);
71         spin_lock_init(&sbi->ll_pp_extent_lock);
72         spin_lock_init(&sbi->ll_process_lock);
73         sbi->ll_rw_stats_on = 0;
74         INIT_LIST_HEAD(&sbi->ll_pglist);
75
76         si_meminfo(&si);
77         pages = si.totalram - si.totalhigh;
78         if (pages >> (20 - CFS_PAGE_SHIFT) < 512)
79                 sbi->ll_async_page_max = pages / 2;
80         else
81                 sbi->ll_async_page_max = (pages / 4) * 3;
82         sbi->ll_ra_info.ra_max_pages = min(num_physpages / 8,
83                                            SBI_DEFAULT_READAHEAD_MAX);
84         sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
85                                            SBI_DEFAULT_READAHEAD_WHOLE_MAX;
86         sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS;
87         sbi->ll_lockless_truncate_enable = SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE;
88         INIT_LIST_HEAD(&sbi->ll_conn_chain);
89         INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
90
91         ll_generate_random_uuid(uuid);
92         class_uuid_unparse(uuid, &sbi->ll_sb_uuid);
93         CDEBUG(D_CONFIG, "generated uuid: %s\n", sbi->ll_sb_uuid.uuid);
94
95         spin_lock(&ll_sb_lock);
96         list_add_tail(&sbi->ll_list, &ll_super_blocks);
97         spin_unlock(&ll_sb_lock);
98
99 #ifdef ENABLE_LLITE_CHECKSUM
100         sbi->ll_flags |= LL_SBI_CHECKSUM;
101 #endif
102
103 #ifdef HAVE_LRU_RESIZE_SUPPORT
104         sbi->ll_flags |= LL_SBI_LRU_RESIZE;
105 #endif
106
107 #ifdef HAVE_EXPORT___IGET
108         INIT_LIST_HEAD(&sbi->ll_deathrow);
109         spin_lock_init(&sbi->ll_deathrow_lock);
110 #endif
111         for (i = 0; i <= LL_PROCESS_HIST_MAX; i++) {
112                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_r_hist.oh_lock);
113                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
114         }
115
116         /* metadata statahead is enabled by default */
117         sbi->ll_sa_max = LL_SA_RPC_DEF;
118
119         RETURN(sbi);
120 }
121
122 void ll_free_sbi(struct super_block *sb)
123 {
124         struct ll_sb_info *sbi = ll_s2sbi(sb);
125         ENTRY;
126
127         if (sbi != NULL) {
128                 spin_lock(&ll_sb_lock);
129                 list_del(&sbi->ll_list);
130                 spin_unlock(&ll_sb_lock);
131                 OBD_FREE(sbi, sizeof(*sbi));
132         }
133         EXIT;
134 }
135
136 static struct dentry_operations ll_d_root_ops = {
137 #ifdef DCACHE_LUSTRE_INVALID
138         .d_compare = ll_dcompare,
139 #endif
140 };
141
142 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
143  * us to make MDS RPCs with large enough reply buffers to hold the
144  * maximum-sized (= maximum striped) EA and cookie without having to
145  * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
146 static int ll_init_ea_size(struct obd_export *md_exp, struct obd_export *dt_exp)
147 {
148         struct lov_stripe_md lsm = { .lsm_magic = LOV_MAGIC };
149         __u32 valsize = sizeof(struct lov_desc);
150         int rc, easize, def_easize, cookiesize;
151         struct lov_desc desc;
152         __u32 stripes;
153         ENTRY;
154
155         rc = obd_get_info(dt_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
156                           &valsize, &desc);
157         if (rc)
158                 RETURN(rc);
159
160         stripes = min(desc.ld_tgt_count, (__u32)LOV_MAX_STRIPE_COUNT);
161         lsm.lsm_stripe_count = stripes;
162         easize = obd_size_diskmd(dt_exp, &lsm);
163
164         lsm.lsm_stripe_count = desc.ld_default_stripe_count;
165         def_easize = obd_size_diskmd(dt_exp, &lsm);
166
167         cookiesize = stripes * sizeof(struct llog_cookie);
168
169         CDEBUG(D_HA, "updating max_mdsize/max_cookiesize: %d/%d\n",
170                easize, cookiesize);
171
172         rc = md_init_ea_size(md_exp, easize, def_easize, cookiesize);
173         RETURN(rc);
174 }
175
176 static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
177 {
178         struct inode *root = 0;
179         struct ll_sb_info *sbi = ll_s2sbi(sb);
180         struct obd_device *obd;
181         struct lu_fid rootfid;
182         struct obd_capa *oc = NULL;
183         struct obd_statfs osfs;
184         struct ptlrpc_request *request = NULL;
185         struct lustre_handle dt_conn = {0, };
186         struct lustre_handle md_conn = {0, };
187         struct obd_connect_data *data = NULL;
188         struct lustre_md lmd;
189         obd_valid valid;
190         int size, err, checksum;
191         ENTRY;
192
193         obd = class_name2obd(md);
194         if (!obd) {
195                 CERROR("MD %s: not setup or attached\n", md);
196                 RETURN(-EINVAL);
197         }
198
199         OBD_ALLOC_PTR(data);
200         if (data == NULL)
201                 RETURN(-ENOMEM);
202
203         if (proc_lustre_fs_root) {
204                 err = lprocfs_register_mountpoint(proc_lustre_fs_root, sb,
205                                                   dt, md);
206                 if (err < 0)
207                         CERROR("could not register mount in /proc/fs/lustre\n");
208         }
209
210         /* indicate the features supported by this client */
211         data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
212                                   OBD_CONNECT_JOIN     | OBD_CONNECT_ATTRFID  |
213                                   OBD_CONNECT_VERSION  | OBD_CONNECT_MDS_CAPA |
214                                   OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
215                                   OBD_CONNECT_FID      | OBD_CONNECT_AT;
216
217 #ifdef HAVE_LRU_RESIZE_SUPPORT
218         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
219                 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
220 #endif
221 #ifdef CONFIG_FS_POSIX_ACL
222         data->ocd_connect_flags |= OBD_CONNECT_ACL;
223 #endif
224         data->ocd_ibits_known = MDS_INODELOCK_FULL;
225         data->ocd_version = LUSTRE_VERSION_CODE;
226
227         if (sb->s_flags & MS_RDONLY)
228                 data->ocd_connect_flags |= OBD_CONNECT_RDONLY;
229         if (sbi->ll_flags & LL_SBI_USER_XATTR)
230                 data->ocd_connect_flags |= OBD_CONNECT_XATTR;
231
232 #ifdef HAVE_MS_FLOCK_LOCK
233         /* force vfs to use lustre handler for flock() calls - bug 10743 */
234         sb->s_flags |= MS_FLOCK_LOCK;
235 #endif
236         
237         if (sbi->ll_flags & LL_SBI_FLOCK)
238                 sbi->ll_fop = &ll_file_operations_flock;
239         else if (sbi->ll_flags & LL_SBI_LOCALFLOCK)
240                 sbi->ll_fop = &ll_file_operations;
241         else
242                 sbi->ll_fop = &ll_file_operations_noflock;
243
244         /* real client */
245         data->ocd_connect_flags |= OBD_CONNECT_REAL;
246         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
247                 data->ocd_connect_flags &= ~OBD_CONNECT_LCL_CLIENT;
248                 data->ocd_connect_flags |= OBD_CONNECT_RMT_CLIENT;
249         } else {
250                 data->ocd_connect_flags &= ~OBD_CONNECT_RMT_CLIENT;
251                 data->ocd_connect_flags |= OBD_CONNECT_LCL_CLIENT;
252         }
253
254         err = obd_connect(NULL, &md_conn, obd, &sbi->ll_sb_uuid, data, NULL);
255         if (err == -EBUSY) {
256                 LCONSOLE_ERROR_MSG(0x14f, "An MDT (md %s) is performing "
257                                    "recovery, of which this client is not a "
258                                    "part. Please wait for recovery to complete,"
259                                    " abort, or time out.\n", md);
260                 GOTO(out, err);
261         } else if (err) {
262                 CERROR("cannot connect to %s: rc = %d\n", md, err);
263                 GOTO(out, err);
264         }
265         sbi->ll_md_exp = class_conn2export(&md_conn);
266
267         err = obd_fid_init(sbi->ll_md_exp);
268         if (err) {
269                 CERROR("Can't init metadata layer FID infrastructure, "
270                        "rc %d\n", err);
271                 GOTO(out_md, err);
272         }
273
274         err = obd_statfs(obd, &osfs, cfs_time_current_64() - HZ, 0);
275         if (err)
276                 GOTO(out_md_fid, err);
277
278         size = sizeof(*data);
279         err = obd_get_info(sbi->ll_md_exp, sizeof(KEY_CONN_DATA),
280                            KEY_CONN_DATA,  &size, data);
281         if (err) {
282                 CERROR("Get connect data failed: %d \n", err);
283                 GOTO(out_md, err);
284         }
285
286         LASSERT(osfs.os_bsize);
287         sb->s_blocksize = osfs.os_bsize;
288         sb->s_blocksize_bits = log2(osfs.os_bsize);
289         sb->s_magic = LL_SUPER_MAGIC;
290
291         /* for bug 11559. in $LINUX/fs/read_write.c, function do_sendfile():
292          *         retval = in_file->f_op->sendfile(...);
293          *         if (*ppos > max)
294          *                 retval = -EOVERFLOW;
295          *
296          * it will check if *ppos is greater than max. However, max equals to
297          * s_maxbytes, which is a negative integer in a x86_64 box since loff_t
298          * has been defined as a signed long long ineger in linux kernel. */
299 #if BITS_PER_LONG == 64
300         sb->s_maxbytes = PAGE_CACHE_MAXBYTES >> 1;
301 #else
302         sb->s_maxbytes = PAGE_CACHE_MAXBYTES;
303 #endif
304         sbi->ll_namelen = osfs.os_namelen;
305         sbi->ll_max_rw_chunk = LL_DEFAULT_MAX_RW_CHUNK;
306
307         if ((sbi->ll_flags & LL_SBI_USER_XATTR) &&
308             !(data->ocd_connect_flags & OBD_CONNECT_XATTR)) {
309                 LCONSOLE_INFO("Disabling user_xattr feature because "
310                               "it is not supported on the server\n");
311                 sbi->ll_flags &= ~LL_SBI_USER_XATTR;
312         }
313
314         if (data->ocd_connect_flags & OBD_CONNECT_ACL) {
315 #ifdef MS_POSIXACL
316                 sb->s_flags |= MS_POSIXACL;
317 #endif
318                 sbi->ll_flags |= LL_SBI_ACL;
319         } else {
320                 LCONSOLE_INFO("client wants to enable acl, but mdt not!\n");
321 #ifdef MS_POSIXACL
322                 sb->s_flags &= ~MS_POSIXACL;
323 #endif
324                 sbi->ll_flags &= ~LL_SBI_ACL;
325         }
326
327         if (data->ocd_connect_flags & OBD_CONNECT_JOIN)
328                 sbi->ll_flags |= LL_SBI_JOIN;
329
330         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
331                 if (!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT)) {
332                         /* sometimes local client claims to be remote, but mdt
333                          * will disagree when client gss not applied. */
334                         LCONSOLE_INFO("client claims to be remote, but server "
335                                       "rejected, forced to be local.\n");
336                         sbi->ll_flags &= ~LL_SBI_RMT_CLIENT;
337                 }
338         } else {
339                 if (!(data->ocd_connect_flags & OBD_CONNECT_LCL_CLIENT)) {
340                         /* with gss applied, remote client can not claim to be
341                          * local, so mdt maybe force client to be remote. */
342                         LCONSOLE_INFO("client claims to be local, but server "
343                                       "rejected, forced to be remote.\n");
344                         sbi->ll_flags |= LL_SBI_RMT_CLIENT;
345                 }
346         }
347
348         if (data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) {
349                 LCONSOLE_INFO("client enabled MDS capability!\n");
350                 sbi->ll_flags |= LL_SBI_MDS_CAPA;
351         }
352
353         if (data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA) {
354                 LCONSOLE_INFO("client enabled OSS capability!\n");
355                 sbi->ll_flags |= LL_SBI_OSS_CAPA;
356         }
357
358         sbi->ll_sdev_orig = sb->s_dev;
359 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0))
360         /* We set sb->s_dev equal on all lustre clients in order to support
361          * NFS export clustering.  NFSD requires that the FSID be the same
362          * on all clients. */
363         /* s_dev is also used in lt_compare() to compare two fs, but that is
364          * only a node-local comparison. */
365
366         /* XXX: this will not work with LMV */
367         sb->s_dev = get_uuid2int(sbi2mdc(sbi)->cl_target_uuid.uuid,
368                                  strlen(sbi2mdc(sbi)->cl_target_uuid.uuid));
369 #endif
370
371         obd = class_name2obd(dt);
372         if (!obd) {
373                 CERROR("DT %s: not setup or attached\n", dt);
374                 GOTO(out_md_fid, err = -ENODEV);
375         }
376
377         data->ocd_connect_flags = OBD_CONNECT_GRANT     | OBD_CONNECT_VERSION  |
378                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
379                                   OBD_CONNECT_CANCELSET | OBD_CONNECT_FID      |
380                                   OBD_CONNECT_SRVLOCK   | OBD_CONNECT_TRUNCLOCK|
381                                   OBD_CONNECT_AT;
382         if (sbi->ll_flags & LL_SBI_OSS_CAPA)
383                 data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
384
385         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
386                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
387                  * disabled by default, because it can still be enabled on the
388                  * fly via /proc. As a consequence, we still need to come to an
389                  * agreement on the supported algorithms at connect time */
390                 data->ocd_connect_flags |= OBD_CONNECT_CKSUM;
391
392                 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_CKSUM_ADLER_ONLY))
393                         data->ocd_cksum_types = OBD_CKSUM_ADLER;
394                 else
395                         /* send the list of supported checksum types */
396                         data->ocd_cksum_types = OBD_CKSUM_ALL;
397         }
398
399 #ifdef HAVE_LRU_RESIZE_SUPPORT
400         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
401 #endif
402         CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d "
403                "ocd_grant: %d\n", data->ocd_connect_flags,
404                data->ocd_version, data->ocd_grant);
405
406         obd->obd_upcall.onu_owner = &sbi->ll_lco;
407         obd->obd_upcall.onu_upcall = ll_ocd_update;
408         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
409
410         err = obd_connect(NULL, &dt_conn, obd, &sbi->ll_sb_uuid, data, NULL);
411         if (err == -EBUSY) {
412                 LCONSOLE_ERROR_MSG(0x150, "An OST (dt %s) is performing "
413                                    "recovery, of which this client is not a "
414                                    "part.  Please wait for recovery to "
415                                    "complete, abort, or time out.\n", dt);
416                 GOTO(out_md_fid, err);
417         } else if (err) {
418                 CERROR("Cannot connect to %s: rc = %d\n", dt, err);
419                 GOTO(out_md_fid, err);
420         }
421
422         sbi->ll_dt_exp = class_conn2export(&dt_conn);
423
424         err = obd_fid_init(sbi->ll_dt_exp);
425         if (err) {
426                 CERROR("Can't init data layer FID infrastructure, "
427                        "rc %d\n", err);
428                 GOTO(out_dt, err);
429         }
430         
431         spin_lock(&sbi->ll_lco.lco_lock);
432         sbi->ll_lco.lco_flags = data->ocd_connect_flags;
433         spin_unlock(&sbi->ll_lco.lco_lock);
434
435         err = obd_register_page_removal_cb(sbi->ll_dt_exp,
436                                            ll_page_removal_cb, 
437                                            ll_pin_extent_cb);
438         if (err) {
439                 CERROR("cannot register page removal callback: rc = %d\n",err);
440                 GOTO(out_dt, err);
441         }
442         err = obd_register_lock_cancel_cb(sbi->ll_dt_exp,
443                                           ll_extent_lock_cancel_cb);
444         if (err) {
445                 CERROR("cannot register lock cancel callback: rc = %d\n", err);
446                 GOTO(out_page_rm_cb, err);
447         }
448
449         err = ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);;
450         if (err) {
451                 CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
452                 GOTO(out_lock_cn_cb, err);
453         }
454
455         err = obd_prep_async_page(sbi->ll_dt_exp, NULL, NULL, NULL,
456                                   0, NULL, NULL, NULL, 0, NULL);
457         if (err < 0) {
458                 LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
459                                    "filesystem. There must be at least one "
460                                    "active OST for a client to start.\n");
461                 GOTO(out_lock_cn_cb, err);
462         }
463
464         if (!ll_async_page_slab) {
465                 ll_async_page_slab_size =
466                         size_round(sizeof(struct ll_async_page)) + err;
467                 ll_async_page_slab = cfs_mem_cache_create("ll_async_page",
468                                                           ll_async_page_slab_size,
469                                                           0, 0);
470                 if (!ll_async_page_slab)
471                         GOTO(out_lock_cn_cb, err = -ENOMEM);
472         }
473
474         err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc);
475         if (err) {
476                 CERROR("cannot mds_connect: rc = %d\n", err);
477                 GOTO(out_lock_cn_cb, err);
478         }
479         CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
480         sbi->ll_root_fid = rootfid;
481
482         sb->s_op = &lustre_super_operations;
483         sb->s_export_op = &lustre_export_operations;
484
485         /* make root inode
486          * XXX: move this to after cbd setup? */
487         valid = OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS | OBD_MD_FLMDSCAPA;
488         if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
489                 valid |= OBD_MD_FLRMTPERM;
490         else if (sbi->ll_flags & LL_SBI_ACL)
491                 valid |= OBD_MD_FLACL;
492
493         err = md_getattr(sbi->ll_md_exp, &rootfid, oc, valid, 0, &request);
494         if (oc)
495                 free_capa(oc);
496         if (err) {
497                 CERROR("md_getattr failed for root: rc = %d\n", err);
498                 GOTO(out_lock_cn_cb, err);
499         }
500         memset(&lmd, 0, sizeof(lmd));
501         err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp,
502                                sbi->ll_md_exp, &lmd);
503         if (err) {
504                 CERROR("failed to understand root inode md: rc = %d\n", err);
505                 ptlrpc_req_finished (request);
506                 GOTO(out_lock_cn_cb, err);
507         }
508
509         LASSERT(fid_is_sane(&sbi->ll_root_fid));
510         root = ll_iget(sb, ll_fid_build_ino(sbi, &sbi->ll_root_fid), &lmd);
511         md_free_lustre_md(sbi->ll_md_exp, &lmd);
512         ptlrpc_req_finished(request);
513
514         if (root == NULL || is_bad_inode(root)) {
515                 if (lmd.lsm)
516                         obd_free_memmd(sbi->ll_dt_exp, &lmd.lsm);
517 #ifdef CONFIG_FS_POSIX_ACL
518                 if (lmd.posix_acl) {
519                         posix_acl_release(lmd.posix_acl);
520                         lmd.posix_acl = NULL;
521                 }
522 #endif
523                 CERROR("lustre_lite: bad iget4 for root\n");
524                 GOTO(out_root, err = -EBADF);
525         }
526
527         err = ll_close_thread_start(&sbi->ll_lcq);
528         if (err) {
529                 CERROR("cannot start close thread: rc %d\n", err);
530                 GOTO(out_root, err);
531         }
532
533 #ifdef CONFIG_FS_POSIX_ACL
534         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
535                 rct_init(&sbi->ll_rct);
536                 et_init(&sbi->ll_et);
537         }
538 #endif
539
540         checksum = sbi->ll_flags & LL_SBI_CHECKSUM;
541         err = obd_set_info_async(sbi->ll_dt_exp, sizeof(KEY_CHECKSUM),
542                                  KEY_CHECKSUM, sizeof(checksum), &checksum,
543                                  NULL);
544
545         sb->s_root = d_alloc_root(root);
546         if (data != NULL)
547                 OBD_FREE(data, sizeof(*data));
548         sb->s_root->d_op = &ll_d_root_ops;
549         RETURN(err);
550 out_root:
551         if (root)
552                 iput(root);
553 out_lock_cn_cb:
554         obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,
555                                       ll_extent_lock_cancel_cb);
556 out_page_rm_cb:
557         obd_unregister_page_removal_cb(sbi->ll_dt_exp,
558                                        ll_page_removal_cb);
559         obd_fid_fini(sbi->ll_dt_exp);
560 out_dt:
561         obd_disconnect(sbi->ll_dt_exp);
562         sbi->ll_dt_exp = NULL;
563 out_md_fid:
564         obd_fid_fini(sbi->ll_md_exp);
565 out_md:
566         obd_disconnect(sbi->ll_md_exp);
567         sbi->ll_md_exp = NULL;
568 out:
569         if (data != NULL)
570                 OBD_FREE_PTR(data);
571         lprocfs_unregister_mountpoint(sbi);
572         return err;
573 }
574
575 int ll_get_max_mdsize(struct ll_sb_info *sbi, int *lmmsize)
576 {
577         int size, rc;
578
579         *lmmsize = obd_size_diskmd(sbi->ll_dt_exp, NULL);
580         size = sizeof(int);
581         rc = obd_get_info(sbi->ll_md_exp, sizeof(KEY_MAX_EASIZE),
582                           KEY_MAX_EASIZE, &size, lmmsize);
583         if (rc)
584                 CERROR("Get max mdsize error rc %d \n", rc);
585
586         RETURN(rc);
587 }
588
589 void ll_dump_inode(struct inode *inode)
590 {
591         struct list_head *tmp;
592         int dentry_count = 0;
593
594         LASSERT(inode != NULL);
595
596         list_for_each(tmp, &inode->i_dentry)
597                 dentry_count++;
598
599         CERROR("inode %p dump: dev=%s ino=%lu mode=%o count=%u, %d dentries\n",
600                inode, ll_i2mdexp(inode)->exp_obd->obd_name, inode->i_ino,
601                inode->i_mode, atomic_read(&inode->i_count), dentry_count);
602 }
603
604 void lustre_dump_dentry(struct dentry *dentry, int recur)
605 {
606         struct list_head *tmp;
607         int subdirs = 0;
608
609         LASSERT(dentry != NULL);
610
611         list_for_each(tmp, &dentry->d_subdirs)
612                 subdirs++;
613
614         CERROR("dentry %p dump: name=%.*s parent=%.*s (%p), inode=%p, count=%u,"
615                " flags=0x%x, fsdata=%p, %d subdirs\n", dentry,
616                dentry->d_name.len, dentry->d_name.name,
617                dentry->d_parent->d_name.len, dentry->d_parent->d_name.name,
618                dentry->d_parent, dentry->d_inode, atomic_read(&dentry->d_count),
619                dentry->d_flags, dentry->d_fsdata, subdirs);
620         if (dentry->d_inode != NULL)
621                 ll_dump_inode(dentry->d_inode);
622
623         if (recur == 0)
624                 return;
625
626         list_for_each(tmp, &dentry->d_subdirs) {
627                 struct dentry *d = list_entry(tmp, struct dentry, d_child);
628                 lustre_dump_dentry(d, recur - 1);
629         }
630 }
631
632 #ifdef HAVE_EXPORT___IGET
633 static void prune_dir_dentries(struct inode *inode)
634 {
635         struct dentry *dentry, *prev = NULL;
636
637         /* due to lustre specific logic, a directory
638          * can have few dentries - a bug from VFS POV */
639 restart:
640         spin_lock(&dcache_lock);
641         if (!list_empty(&inode->i_dentry)) {
642                 dentry = list_entry(inode->i_dentry.prev,
643                                     struct dentry, d_alias);
644                 /* in order to prevent infinite loops we
645                  * break if previous dentry is busy */
646                 if (dentry != prev) {
647                         prev = dentry;
648                         dget_locked(dentry);
649                         spin_unlock(&dcache_lock);
650
651                         /* try to kill all child dentries */
652                         lock_dentry(dentry);
653                         shrink_dcache_parent(dentry);
654                         unlock_dentry(dentry);
655                         dput(dentry);
656
657                         /* now try to get rid of current dentry */
658                         d_prune_aliases(inode);
659                         goto restart;
660                 }
661         }
662         spin_unlock(&dcache_lock);
663 }
664
665 static void prune_deathrow_one(struct ll_inode_info *lli)
666 {
667         struct inode *inode = ll_info2i(lli);
668
669         /* first, try to drop any dentries - they hold a ref on the inode */
670         if (S_ISDIR(inode->i_mode))
671                 prune_dir_dentries(inode);
672         else
673                 d_prune_aliases(inode);
674
675
676         /* if somebody still uses it, leave it */
677         LASSERT(atomic_read(&inode->i_count) > 0);
678         if (atomic_read(&inode->i_count) > 1)
679                 goto out;
680
681         CDEBUG(D_INODE, "inode %lu/%u(%d) looks a good candidate for prune\n",
682                inode->i_ino,inode->i_generation, atomic_read(&inode->i_count));
683
684         /* seems nobody uses it anymore */
685         inode->i_nlink = 0;
686
687 out:
688         iput(inode);
689         return;
690 }
691
692 static void prune_deathrow(struct ll_sb_info *sbi, int try)
693 {
694         struct ll_inode_info *lli;
695         int empty;
696
697         do {
698                 if (need_resched() && try)
699                         break;
700
701                 if (try) {
702                         if (!spin_trylock(&sbi->ll_deathrow_lock))
703                                 break;
704                 } else {
705                         spin_lock(&sbi->ll_deathrow_lock);
706                 }
707
708                 empty = 1;
709                 lli = NULL;
710                 if (!list_empty(&sbi->ll_deathrow)) {
711                         lli = list_entry(sbi->ll_deathrow.next,
712                                          struct ll_inode_info,
713                                          lli_dead_list);
714                         list_del_init(&lli->lli_dead_list);
715                         if (!list_empty(&sbi->ll_deathrow))
716                                 empty = 0;
717                 }
718                 spin_unlock(&sbi->ll_deathrow_lock);
719
720                 if (lli)
721                         prune_deathrow_one(lli);
722
723         } while (empty == 0);
724 }
725 #else /* !HAVE_EXPORT___IGET */
726 #define prune_deathrow(sbi, try) do {} while (0)
727 #endif /* HAVE_EXPORT___IGET */
728
729 void client_common_put_super(struct super_block *sb)
730 {
731         struct ll_sb_info *sbi = ll_s2sbi(sb);
732         ENTRY;
733
734 #ifdef CONFIG_FS_POSIX_ACL
735         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
736                 et_fini(&sbi->ll_et);
737                 rct_fini(&sbi->ll_rct);
738         }
739 #endif
740
741         obd_cancel_unused(sbi->ll_dt_exp, NULL, 0, NULL);
742
743         ll_close_thread_shutdown(sbi->ll_lcq);
744
745         /* destroy inodes in deathrow */
746         prune_deathrow(sbi, 0);
747
748         list_del(&sbi->ll_conn_chain);
749
750         obd_fid_fini(sbi->ll_dt_exp);
751         obd_disconnect(sbi->ll_dt_exp);
752         sbi->ll_dt_exp = NULL;
753
754         lprocfs_unregister_mountpoint(sbi);
755
756         obd_fid_fini(sbi->ll_md_exp);
757         obd_disconnect(sbi->ll_md_exp);
758         sbi->ll_md_exp = NULL;
759
760         EXIT;
761 }
762
763 void ll_kill_super(struct super_block *sb)
764 {
765         struct ll_sb_info *sbi;
766
767         ENTRY;
768
769         /* not init sb ?*/
770         if (!(sb->s_flags & MS_ACTIVE))
771                 return;
772
773         sbi = ll_s2sbi(sb);
774         /* we need restore s_dev from changed for clustred NFS before put_super
775          * because new kernels have cached s_dev and change sb->s_dev in
776          * put_super not affected real removing devices */
777         if (sbi)
778                 sb->s_dev = sbi->ll_sdev_orig;
779         EXIT;
780 }
781
782 char *ll_read_opt(const char *opt, char *data)
783 {
784         char *value;
785         char *retval;
786         ENTRY;
787
788         CDEBUG(D_SUPER, "option: %s, data %s\n", opt, data);
789         if (strncmp(opt, data, strlen(opt)))
790                 RETURN(NULL);
791         if ((value = strchr(data, '=')) == NULL)
792                 RETURN(NULL);
793
794         value++;
795         OBD_ALLOC(retval, strlen(value) + 1);
796         if (!retval) {
797                 CERROR("out of memory!\n");
798                 RETURN(NULL);
799         }
800
801         memcpy(retval, value, strlen(value)+1);
802         CDEBUG(D_SUPER, "Assigned option: %s, value %s\n", opt, retval);
803         RETURN(retval);
804 }
805
806 static inline int ll_set_opt(const char *opt, char *data, int fl)
807 {
808         if (strncmp(opt, data, strlen(opt)) != 0)
809                 return(0);
810         else
811                 return(fl);
812 }
813
814 /* non-client-specific mount options are parsed in lmd_parse */
815 static int ll_options(char *options, int *flags)
816 {
817         int tmp;
818         char *s1 = options, *s2;
819         ENTRY;
820
821         if (!options) 
822                 RETURN(0);
823
824         CDEBUG(D_CONFIG, "Parsing opts %s\n", options);
825
826         while (*s1) {
827                 CDEBUG(D_SUPER, "next opt=%s\n", s1);
828                 tmp = ll_set_opt("nolock", s1, LL_SBI_NOLCK);
829                 if (tmp) {
830                         *flags |= tmp;
831                         goto next;
832                 }
833                 tmp = ll_set_opt("flock", s1, LL_SBI_FLOCK);
834                 if (tmp) {
835                         *flags |= tmp;
836                         goto next;
837                 }
838                 tmp = ll_set_opt("localflock", s1, LL_SBI_LOCALFLOCK);
839                 if (tmp) {
840                         *flags |= tmp;
841                         goto next;
842                 }
843                 tmp = ll_set_opt("noflock", s1, LL_SBI_FLOCK|LL_SBI_LOCALFLOCK);
844                 if (tmp) {
845                         *flags &= ~tmp;
846                         goto next;
847                 }
848                 tmp = ll_set_opt("user_xattr", s1, LL_SBI_USER_XATTR);
849                 if (tmp) {
850                         *flags |= tmp;
851                         goto next;
852                 }
853                 tmp = ll_set_opt("nouser_xattr", s1, LL_SBI_USER_XATTR);
854                 if (tmp) {
855                         *flags &= ~tmp;
856                         goto next;
857                 }
858                 tmp = ll_set_opt("acl", s1, LL_SBI_ACL);
859                 if (tmp) {
860                         /* Ignore deprecated mount option.  The client will
861                          * always try to mount with ACL support, whether this
862                          * is used depends on whether server supports it. */
863                         goto next;
864                 }
865                 tmp = ll_set_opt("noacl", s1, LL_SBI_ACL);
866                 if (tmp) {
867                         goto next;
868                 }
869                 tmp = ll_set_opt("remote_client", s1, LL_SBI_RMT_CLIENT);
870                 if (tmp) {
871                         *flags |= tmp;
872                         goto next;
873                 }
874
875                 tmp = ll_set_opt("checksum", s1, LL_SBI_CHECKSUM);
876                 if (tmp) {
877                         *flags |= tmp;
878                         goto next;
879                 }
880                 tmp = ll_set_opt("nochecksum", s1, LL_SBI_CHECKSUM);
881                 if (tmp) {
882                         *flags &= ~tmp;
883                         goto next;
884                 }
885                 tmp = ll_set_opt("lruresize", s1, LL_SBI_LRU_RESIZE);
886                 if (tmp) {
887                         *flags |= tmp;
888                         goto next;
889                 }
890                 tmp = ll_set_opt("nolruresize", s1, LL_SBI_LRU_RESIZE);
891                 if (tmp) {
892                         *flags &= ~tmp;
893                         goto next;
894                 }
895
896                 LCONSOLE_ERROR_MSG(0x152, "Unknown option '%s', won't mount.\n",
897                                    s1);
898                 RETURN(-EINVAL);
899
900 next:
901                 /* Find next opt */
902                 s2 = strchr(s1, ',');
903                 if (s2 == NULL)
904                         break;
905                 s1 = s2 + 1;
906         }
907         RETURN(0);
908 }
909
910 void ll_lli_init(struct ll_inode_info *lli)
911 {
912         lli->lli_inode_magic = LLI_INODE_MAGIC;
913         sema_init(&lli->lli_size_sem, 1);
914         sema_init(&lli->lli_write_sem, 1);
915         lli->lli_flags = 0;
916         lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
917         spin_lock_init(&lli->lli_lock);
918         INIT_LIST_HEAD(&lli->lli_pending_write_llaps);
919         INIT_LIST_HEAD(&lli->lli_close_list);
920         lli->lli_inode_magic = LLI_INODE_MAGIC;
921         sema_init(&lli->lli_och_sem, 1);
922         lli->lli_mds_read_och = lli->lli_mds_write_och = NULL;
923         lli->lli_mds_exec_och = NULL;
924         lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0;
925         lli->lli_open_fd_exec_count = 0;
926         INIT_LIST_HEAD(&lli->lli_dead_list);
927         lli->lli_remote_perms = NULL;
928         lli->lli_rmtperm_utime = 0;
929         sema_init(&lli->lli_rmtperm_sem, 1);
930         INIT_LIST_HEAD(&lli->lli_oss_capas);
931 }
932
933 int ll_fill_super(struct super_block *sb)
934 {
935         struct lustre_profile *lprof;
936         struct lustre_sb_info *lsi = s2lsi(sb);
937         struct ll_sb_info *sbi;
938         char  *dt = NULL, *md = NULL;
939         char  *profilenm = get_profile_name(sb);
940         struct config_llog_instance cfg = {0, };
941         char   ll_instance[sizeof(sb) * 2 + 1];
942         int    err;
943         ENTRY;
944
945         CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
946
947         cfs_module_get();
948
949         /* client additional sb info */
950         lsi->lsi_llsbi = sbi = ll_init_sbi();
951         if (!sbi) {
952                 cfs_module_put();
953                 RETURN(-ENOMEM);
954         }
955
956         err = ll_options(lsi->lsi_lmd->lmd_opts, &sbi->ll_flags);
957         if (err) 
958                 GOTO(out_free, err);
959
960         /* Generate a string unique to this super, in case some joker tries
961            to mount the same fs at two mount points.
962            Use the address of the super itself.*/
963         sprintf(ll_instance, "%p", sb);
964         cfg.cfg_instance = ll_instance;
965         cfg.cfg_uuid = lsi->lsi_llsbi->ll_sb_uuid;
966
967         /* set up client obds */
968         err = lustre_process_log(sb, profilenm, &cfg);
969         if (err < 0) {
970                 CERROR("Unable to process log: %d\n", err);
971                 GOTO(out_free, err);
972         }
973
974         lprof = class_get_profile(profilenm);
975         if (lprof == NULL) {
976                 LCONSOLE_ERROR_MSG(0x156, "The client profile '%s' could not be"
977                                    " read from the MGS.  Does that filesystem "
978                                    "exist?\n", profilenm);
979                 GOTO(out_free, err = -EINVAL);
980         }
981         CDEBUG(D_CONFIG, "Found profile %s: mdc=%s osc=%s\n", profilenm,
982                lprof->lp_md, lprof->lp_dt);
983
984         OBD_ALLOC(dt, strlen(lprof->lp_dt) +
985                   strlen(ll_instance) + 2);
986         if (!dt)
987                 GOTO(out_free, err = -ENOMEM);
988         sprintf(dt, "%s-%s", lprof->lp_dt, ll_instance);
989
990         OBD_ALLOC(md, strlen(lprof->lp_md) +
991                   strlen(ll_instance) + 2);
992         if (!md)
993                 GOTO(out_free, err = -ENOMEM);
994         sprintf(md, "%s-%s", lprof->lp_md, ll_instance);
995
996         /* connections, registrations, sb setup */
997         err = client_common_fill_super(sb, md, dt);
998
999 out_free:
1000         if (md)
1001                 OBD_FREE(md, strlen(md) + 1);
1002         if (dt)
1003                 OBD_FREE(dt, strlen(dt) + 1);
1004         if (err) 
1005                 ll_put_super(sb);
1006         else
1007                 LCONSOLE_WARN("Client %s has started\n", profilenm);        
1008
1009         RETURN(err);
1010 } /* ll_fill_super */
1011
1012
1013 void ll_put_super(struct super_block *sb)
1014 {
1015         struct config_llog_instance cfg;
1016         char   ll_instance[sizeof(sb) * 2 + 1];
1017         struct obd_device *obd;
1018         struct lustre_sb_info *lsi = s2lsi(sb);
1019         struct ll_sb_info *sbi = ll_s2sbi(sb);
1020         char *profilenm = get_profile_name(sb);
1021         int force = 1, next;
1022         ENTRY;
1023
1024         CDEBUG(D_VFSTRACE, "VFS Op: sb %p - %s\n", sb, profilenm);
1025
1026         ll_print_capa_stat(sbi);
1027
1028         sprintf(ll_instance, "%p", sb);
1029         cfg.cfg_instance = ll_instance;
1030         lustre_end_log(sb, NULL, &cfg);
1031         
1032         if (sbi->ll_md_exp) {
1033                 obd = class_exp2obd(sbi->ll_md_exp);
1034                 if (obd) 
1035                         force = obd->obd_force;
1036         }
1037         
1038         /* We need to set force before the lov_disconnect in 
1039            lustre_common_put_super, since l_d cleans up osc's as well. */
1040         if (force) {
1041                 next = 0;
1042                 while ((obd = class_devices_in_group(&sbi->ll_sb_uuid,
1043                                                      &next)) != NULL) {
1044                         obd->obd_force = force;
1045                 }
1046         }                       
1047
1048         if (sbi->ll_lcq) {
1049                 /* Only if client_common_fill_super succeeded */
1050                 client_common_put_super(sb);
1051         }
1052         next = 0;
1053         while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) !=NULL) {
1054                 class_manual_cleanup(obd);
1055         }
1056
1057         if (profilenm)
1058                 class_del_profile(profilenm);
1059
1060         ll_free_sbi(sb);
1061         lsi->lsi_llsbi = NULL;
1062
1063         lustre_common_put_super(sb);
1064
1065         LCONSOLE_WARN("client %s umount complete\n", ll_instance);
1066         
1067         cfs_module_put();
1068
1069         EXIT;
1070 } /* client_put_super */
1071
1072 #ifdef HAVE_REGISTER_CACHE
1073 #include <linux/cache_def.h>
1074 #ifdef HAVE_CACHE_RETURN_INT
1075 static int
1076 #else
1077 static void
1078 #endif
1079 ll_shrink_cache(int priority, unsigned int gfp_mask)
1080 {
1081         struct ll_sb_info *sbi;
1082         int count = 0;
1083
1084         list_for_each_entry(sbi, &ll_super_blocks, ll_list)
1085                 count += llap_shrink_cache(sbi, priority);
1086
1087 #ifdef HAVE_CACHE_RETURN_INT
1088         return count;
1089 #endif
1090 }
1091
1092 struct cache_definition ll_cache_definition = {
1093         .name = "llap_cache",
1094         .shrink = ll_shrink_cache
1095 };
1096 #endif /* HAVE_REGISTER_CACHE */
1097
1098 struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
1099 {
1100         struct inode *inode = NULL;
1101         /* NOTE: we depend on atomic igrab() -bzzz */
1102         lock_res_and_lock(lock);
1103         if (lock->l_ast_data) {
1104                 struct ll_inode_info *lli = ll_i2info(lock->l_ast_data);
1105                 if (lli->lli_inode_magic == LLI_INODE_MAGIC) {
1106                         inode = igrab(lock->l_ast_data);
1107                 } else {
1108                         inode = lock->l_ast_data;
1109                         ldlm_lock_debug(NULL, inode->i_state & I_FREEING ?
1110                                                 D_INFO : D_WARNING,
1111                                         lock, __FILE__, __func__, __LINE__,
1112                                         "l_ast_data %p is bogus: magic %08x",
1113                                         lock->l_ast_data, lli->lli_inode_magic);
1114                         inode = NULL;
1115                 }
1116         }
1117         unlock_res_and_lock(lock);
1118         return inode;
1119 }
1120
1121 static int null_if_equal(struct ldlm_lock *lock, void *data)
1122 {
1123         if (data == lock->l_ast_data) {
1124                 lock->l_ast_data = NULL;
1125
1126                 if (lock->l_req_mode != lock->l_granted_mode)
1127                         LDLM_ERROR(lock,"clearing inode with ungranted lock");
1128         }
1129
1130         return LDLM_ITER_CONTINUE;
1131 }
1132
1133 void ll_clear_inode(struct inode *inode)
1134 {
1135         struct ll_inode_info *lli = ll_i2info(inode);
1136         struct ll_sb_info *sbi = ll_i2sbi(inode);
1137         ENTRY;
1138
1139         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
1140                inode->i_generation, inode);
1141
1142         if (S_ISDIR(inode->i_mode)) {
1143                 /* these should have been cleared in ll_file_release */
1144                 LASSERT(lli->lli_sai == NULL);
1145                 LASSERT(lli->lli_opendir_key == NULL);
1146                 LASSERT(lli->lli_opendir_pid == 0);
1147         }
1148
1149         ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK;
1150         md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
1151                          null_if_equal, inode);
1152
1153         LASSERT(!lli->lli_open_fd_write_count);
1154         LASSERT(!lli->lli_open_fd_read_count);
1155         LASSERT(!lli->lli_open_fd_exec_count);
1156
1157         if (lli->lli_mds_write_och)
1158                 ll_md_real_close(inode, FMODE_WRITE);
1159         if (lli->lli_mds_exec_och)
1160                 ll_md_real_close(inode, FMODE_EXEC);
1161         if (lli->lli_mds_read_och)
1162                 ll_md_real_close(inode, FMODE_READ);
1163
1164         if (lli->lli_smd) {
1165                 obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd,
1166                                   null_if_equal, inode);
1167
1168                 obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd);
1169                 lli->lli_smd = NULL;
1170         }
1171
1172         if (lli->lli_symlink_name) {
1173                 OBD_FREE(lli->lli_symlink_name,
1174                          strlen(lli->lli_symlink_name) + 1);
1175                 lli->lli_symlink_name = NULL;
1176         }
1177
1178         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
1179                 LASSERT(lli->lli_posix_acl == NULL);
1180                 if (lli->lli_remote_perms) {
1181                         free_rmtperm_hash(lli->lli_remote_perms);
1182                         lli->lli_remote_perms = NULL;
1183                 }
1184         }
1185 #ifdef CONFIG_FS_POSIX_ACL
1186         else if (lli->lli_posix_acl) {
1187                 LASSERT(atomic_read(&lli->lli_posix_acl->a_refcount) == 1);
1188                 LASSERT(lli->lli_remote_perms == NULL);
1189                 posix_acl_release(lli->lli_posix_acl);
1190                 lli->lli_posix_acl = NULL;
1191         }
1192 #endif
1193         lli->lli_inode_magic = LLI_INODE_DEAD;
1194
1195 #ifdef HAVE_EXPORT___IGET
1196         spin_lock(&sbi->ll_deathrow_lock);
1197         list_del_init(&lli->lli_dead_list);
1198         spin_unlock(&sbi->ll_deathrow_lock);
1199 #endif
1200         ll_clear_inode_capas(inode);
1201
1202         EXIT;
1203 }
1204
1205 int ll_md_setattr(struct inode *inode, struct md_op_data *op_data,
1206                   struct md_open_data **mod)
1207 {
1208         struct lustre_md md;
1209         struct ll_sb_info *sbi = ll_i2sbi(inode);
1210         struct ptlrpc_request *request = NULL;
1211         int rc;
1212         ENTRY;
1213         
1214         op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0, 
1215                                      LUSTRE_OPC_ANY, NULL);
1216         if (IS_ERR(op_data))
1217                 RETURN(PTR_ERR(op_data));
1218
1219         rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, 
1220                         &request, mod);
1221         if (rc) {
1222                 ptlrpc_req_finished(request);
1223                 if (rc == -ENOENT) {
1224                         inode->i_nlink = 0;
1225                         /* Unlinked special device node? Or just a race?
1226                          * Pretend we done everything. */
1227                         if (!S_ISREG(inode->i_mode) &&
1228                             !S_ISDIR(inode->i_mode))
1229                                 rc = inode_setattr(inode, &op_data->op_attr);
1230                 } else if (rc != -EPERM && rc != -EACCES && rc != -ETXTBSY) {
1231                         CERROR("md_setattr fails: rc = %d\n", rc);
1232                 }
1233                 RETURN(rc);
1234         }
1235
1236         rc = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp,
1237                               sbi->ll_md_exp, &md);
1238         if (rc) {
1239                 ptlrpc_req_finished(request);
1240                 RETURN(rc);
1241         }
1242
1243         /* We call inode_setattr to adjust timestamps.
1244          * If there is at least some data in file, we cleared ATTR_SIZE
1245          * above to avoid invoking vmtruncate, otherwise it is important
1246          * to call vmtruncate in inode_setattr to update inode->i_size
1247          * (bug 6196) */
1248         rc = inode_setattr(inode, &op_data->op_attr);
1249
1250         /* Extract epoch data if obtained. */
1251         op_data->op_handle = md.body->handle;
1252         op_data->op_ioepoch = md.body->ioepoch;
1253
1254         ll_update_inode(inode, &md);
1255         ptlrpc_req_finished(request);
1256
1257         RETURN(rc);
1258 }
1259
1260 /* Close IO epoch and send Size-on-MDS attribute update. */
1261 static int ll_setattr_done_writing(struct inode *inode,
1262                                    struct md_op_data *op_data,
1263                                    struct md_open_data *mod)
1264 {
1265         struct ll_inode_info *lli = ll_i2info(inode);
1266         int rc = 0;
1267         ENTRY;
1268         
1269         LASSERT(op_data != NULL);
1270         if (!S_ISREG(inode->i_mode))
1271                 RETURN(0);
1272
1273         CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n",
1274                op_data->op_ioepoch, PFID(&lli->lli_fid));
1275
1276         op_data->op_flags = MF_EPOCH_CLOSE | MF_SOM_CHANGE;
1277         rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, mod);
1278         if (rc == -EAGAIN) {
1279                 /* MDS has instructed us to obtain Size-on-MDS attribute
1280                  * from OSTs and send setattr to back to MDS. */
1281                 rc = ll_sizeonmds_update(inode, mod, &op_data->op_handle,
1282                                          op_data->op_ioepoch);
1283         } else if (rc) {
1284                 CERROR("inode %lu mdc truncate failed: rc = %d\n",
1285                        inode->i_ino, rc);
1286         }
1287         RETURN(rc);
1288 }
1289
1290 static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
1291 {
1292         struct ll_sb_info *sbi = ll_i2sbi(inode);
1293         struct ll_inode_info *lli = ll_i2info(inode);
1294         struct lov_stripe_md *lsm = lli->lli_smd;
1295         int rc;
1296         ldlm_policy_data_t policy = { .l_extent = {new_size,
1297                                                    OBD_OBJECT_EOF } };
1298         struct lustre_handle lockh = { 0 };
1299         int local_lock = 0; /* 0 - no local lock;
1300                              * 1 - lock taken by lock_extent;
1301                              * 2 - by obd_match*/
1302         int ast_flags;
1303         int err;
1304         ENTRY;
1305
1306         UNLOCK_INODE_MUTEX(inode);
1307         UP_WRITE_I_ALLOC_SEM(inode);
1308
1309         if (sbi->ll_lockless_truncate_enable &&
1310             (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK)) {
1311                 ast_flags = LDLM_FL_BLOCK_GRANTED;
1312                 rc = obd_match(sbi->ll_dt_exp, lsm, LDLM_EXTENT,
1313                                &policy, LCK_PW, &ast_flags, inode, &lockh);
1314                 if (rc > 0) {
1315                         local_lock = 2;
1316                         rc = 0;
1317                 } else if (rc == 0) {
1318                         rc = ll_file_punch(inode, new_size, 1);
1319                 }
1320         } else {
1321                 /* XXX when we fix the AST intents to pass the discard-range
1322                  * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
1323                  * XXX here. */
1324                 ast_flags = (new_size == 0) ? LDLM_AST_DISCARD_DATA : 0;
1325                 rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy,
1326                                     &lockh, ast_flags);
1327                 if (likely(rc == 0))
1328                         local_lock = 1;
1329         }
1330
1331 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1332         DOWN_WRITE_I_ALLOC_SEM(inode);
1333         LOCK_INODE_MUTEX(inode);
1334 #else
1335         LOCK_INODE_MUTEX(inode);
1336         DOWN_WRITE_I_ALLOC_SEM(inode);
1337 #endif
1338         if (likely(rc == 0)) {
1339                 /* Only ll_inode_size_lock is taken at this level.
1340                  * lov_stripe_lock() is grabbed by ll_truncate() only over
1341                  * call to obd_adjust_kms().  If vmtruncate returns 0, then
1342                  * ll_truncate dropped ll_inode_size_lock() */
1343                 ll_inode_size_lock(inode, 0);
1344                 if (!local_lock) {
1345                         spin_lock(&lli->lli_lock);
1346                         lli->lli_flags |= LLIF_SRVLOCK;
1347                         spin_unlock(&lli->lli_lock);
1348                 }
1349                 rc = vmtruncate(inode, new_size);
1350                 if (!local_lock) {
1351                         spin_lock(&lli->lli_lock);
1352                         lli->lli_flags &= ~LLIF_SRVLOCK;
1353                         spin_unlock(&lli->lli_lock);
1354                 }
1355                 if (rc != 0) {
1356                         LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
1357                         ll_inode_size_unlock(inode, 0);
1358                 }
1359         }
1360
1361         if (local_lock) {
1362                 if (local_lock == 2)
1363                         err = obd_cancel(sbi->ll_dt_exp, lsm, LCK_PW, &lockh);
1364                 else
1365                         err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
1366                 if (unlikely(err != 0)){
1367                         CERROR("extent unlock failed: err=%d,"
1368                                " unlock method =%d\n", err, local_lock);
1369                         if (rc == 0)
1370                                 rc = err;
1371                 }
1372         }
1373         RETURN(rc);
1374 }
1375
1376 /* If this inode has objects allocated to it (lsm != NULL), then the OST
1377  * object(s) determine the file size and mtime.  Otherwise, the MDS will
1378  * keep these values until such a time that objects are allocated for it.
1379  * We do the MDS operations first, as it is checking permissions for us.
1380  * We don't to the MDS RPC if there is nothing that we want to store there,
1381  * otherwise there is no harm in updating mtime/atime on the MDS if we are
1382  * going to do an RPC anyways.
1383  *
1384  * If we are doing a truncate, we will send the mtime and ctime updates
1385  * to the OST with the punch RPC, otherwise we do an explicit setattr RPC.
1386  * I don't believe it is possible to get e.g. ATTR_MTIME_SET and ATTR_SIZE
1387  * at the same time.
1388  */
1389 int ll_setattr_raw(struct inode *inode, struct iattr *attr)
1390 {
1391         struct ll_inode_info *lli = ll_i2info(inode);
1392         struct lov_stripe_md *lsm = lli->lli_smd;
1393         struct ll_sb_info *sbi = ll_i2sbi(inode);
1394         struct md_op_data *op_data = NULL;
1395         struct md_open_data *mod = NULL;
1396         int ia_valid = attr->ia_valid;
1397         int rc = 0, rc1 = 0;
1398         ENTRY;
1399
1400         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu valid %x\n", inode->i_ino,
1401                attr->ia_valid);
1402         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_SETATTR, 1);
1403
1404         if (ia_valid & ATTR_SIZE) {
1405                 if (attr->ia_size > ll_file_maxbytes(inode)) {
1406                         CDEBUG(D_INODE, "file too large %llu > "LPU64"\n",
1407                                attr->ia_size, ll_file_maxbytes(inode));
1408                         RETURN(-EFBIG);
1409                 }
1410
1411                 attr->ia_valid |= ATTR_MTIME | ATTR_CTIME | ATTR_TRUNC;
1412         }
1413
1414         /* POSIX: check before ATTR_*TIME_SET set (from inode_change_ok) */
1415         if (ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET)) {
1416                 if (current->fsuid != inode->i_uid && !capable(CAP_FOWNER))
1417                         RETURN(-EPERM);
1418         }
1419
1420         /* We mark all of the fields "set" so MDS/OST does not re-set them */
1421         if (attr->ia_valid & ATTR_CTIME) {
1422                 attr->ia_ctime = CURRENT_TIME;
1423                 attr->ia_valid |= ATTR_CTIME_SET;
1424         }
1425         if (!(ia_valid & ATTR_ATIME_SET) && (attr->ia_valid & ATTR_ATIME)) {
1426                 attr->ia_atime = CURRENT_TIME;
1427                 attr->ia_valid |= ATTR_ATIME_SET;
1428         }
1429         if (!(ia_valid & ATTR_MTIME_SET) && (attr->ia_valid & ATTR_MTIME)) {
1430                 attr->ia_mtime = CURRENT_TIME;
1431                 attr->ia_valid |= ATTR_MTIME_SET;
1432         }
1433         if ((attr->ia_valid & ATTR_CTIME) && !(attr->ia_valid & ATTR_MTIME)) {
1434                 /* To avoid stale mtime on mds, obtain it from ost and send 
1435                    to mds. */
1436                 rc = ll_glimpse_size(inode, 0);
1437                 if (rc) 
1438                         RETURN(rc);
1439                 
1440                 attr->ia_valid |= ATTR_MTIME_SET | ATTR_MTIME;
1441                 attr->ia_mtime = inode->i_mtime;
1442         }
1443
1444         if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
1445                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n",
1446                        LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
1447                        cfs_time_current_sec());
1448
1449         /* NB: ATTR_SIZE will only be set after this point if the size
1450          * resides on the MDS, ie, this file has no objects. */
1451         if (lsm)
1452                 attr->ia_valid &= ~ATTR_SIZE;
1453
1454         /* We always do an MDS RPC, even if we're only changing the size;
1455          * only the MDS knows whether truncate() should fail with -ETXTBUSY */
1456
1457         OBD_ALLOC_PTR(op_data);
1458         if (op_data == NULL)
1459                 RETURN(-ENOMEM);
1460
1461         memcpy(&op_data->op_attr, attr, sizeof(*attr));
1462
1463         /* Open epoch for truncate. */
1464         if ((ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) &&
1465             (ia_valid & ATTR_SIZE))
1466                 op_data->op_flags = MF_EPOCH_OPEN;
1467
1468         rc = ll_md_setattr(inode, op_data, &mod);
1469         if (rc)
1470                 GOTO(out, rc);
1471
1472         if (op_data->op_ioepoch)
1473                 CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID" for "
1474                        "truncate\n", op_data->op_ioepoch, PFID(&lli->lli_fid));
1475
1476         if (!lsm || !S_ISREG(inode->i_mode)) {
1477                 CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
1478                 GOTO(out, rc = 0);
1479         }
1480
1481         /* We really need to get our PW lock before we change inode->i_size.
1482          * If we don't we can race with other i_size updaters on our node, like
1483          * ll_file_read.  We can also race with i_size propogation to other
1484          * nodes through dirtying and writeback of final cached pages.  This
1485          * last one is especially bad for racing o_append users on other
1486          * nodes. */
1487         if (ia_valid & ATTR_SIZE) {
1488                 rc = ll_setattr_do_truncate(inode, attr->ia_size);
1489         } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
1490                 obd_flag flags;
1491                 struct obd_info oinfo = { { { 0 } } };
1492                 struct obdo *oa;
1493
1494                 CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n",
1495                        inode->i_ino, LTIME_S(attr->ia_mtime));
1496
1497                 OBDO_ALLOC(oa);
1498                 if (oa) {
1499                         oa->o_id = lsm->lsm_object_id;
1500                         oa->o_gr = lsm->lsm_object_gr;
1501                         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1502
1503                         flags = OBD_MD_FLTYPE | OBD_MD_FLATIME |
1504                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1505                                 OBD_MD_FLFID | OBD_MD_FLGENER | 
1506                                 OBD_MD_FLGROUP;
1507
1508                         obdo_from_inode(oa, inode, flags);
1509
1510                         oinfo.oi_oa = oa;
1511                         oinfo.oi_md = lsm;
1512                         oinfo.oi_capa = ll_mdscapa_get(inode);
1513
1514                         /* XXX: this looks unnecessary now. */
1515                         rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
1516                         capa_put(oinfo.oi_capa);
1517                         if (rc)
1518                                 CERROR("obd_setattr_async fails: rc=%d\n", rc);
1519                         OBDO_FREE(oa);
1520                 } else {
1521                         rc = -ENOMEM;
1522                 }
1523         }
1524         EXIT;
1525 out:
1526         if (op_data) {
1527                 if (op_data->op_ioepoch)
1528                         rc1 = ll_setattr_done_writing(inode, op_data, mod);
1529                 ll_finish_md_op_data(op_data);
1530         }
1531         return rc ? rc : rc1;
1532 }
1533
1534 int ll_setattr(struct dentry *de, struct iattr *attr)
1535 {
1536         int mode;
1537
1538         if ((attr->ia_valid & (ATTR_CTIME|ATTR_SIZE|ATTR_MODE)) ==
1539             (ATTR_CTIME|ATTR_SIZE|ATTR_MODE))
1540                 attr->ia_valid |= MDS_OPEN_OWNEROVERRIDE;
1541         if ((attr->ia_valid & (ATTR_MODE|ATTR_FORCE|ATTR_SIZE)) == 
1542             (ATTR_SIZE|ATTR_MODE)) {
1543                 mode = de->d_inode->i_mode;
1544                 if (((mode & S_ISUID) && (!(attr->ia_mode & S_ISUID))) ||
1545                     ((mode & S_ISGID) && (mode & S_IXGRP) &&
1546                     (!(attr->ia_mode & S_ISGID))))
1547                         attr->ia_valid |= ATTR_FORCE;
1548         }
1549
1550         return ll_setattr_raw(de->d_inode, attr);
1551 }
1552
1553 int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs,
1554                        __u64 max_age, __u32 flags)
1555 {
1556         struct ll_sb_info *sbi = ll_s2sbi(sb);
1557         struct obd_statfs obd_osfs;
1558         int rc;
1559         ENTRY;
1560
1561         rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age, flags);
1562         if (rc) {
1563                 CERROR("md_statfs fails: rc = %d\n", rc);
1564                 RETURN(rc);
1565         }
1566
1567         osfs->os_type = sb->s_magic;
1568
1569         CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
1570                osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files);
1571
1572         rc = obd_statfs_rqset(class_exp2obd(sbi->ll_dt_exp),
1573                               &obd_osfs, max_age, flags);
1574         if (rc) {
1575                 CERROR("obd_statfs fails: rc = %d\n", rc);
1576                 RETURN(rc);
1577         }
1578
1579         CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
1580                obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree,
1581                obd_osfs.os_files);
1582
1583         osfs->os_bsize = obd_osfs.os_bsize;
1584         osfs->os_blocks = obd_osfs.os_blocks;
1585         osfs->os_bfree = obd_osfs.os_bfree;
1586         osfs->os_bavail = obd_osfs.os_bavail;
1587
1588         /* If we don't have as many objects free on the OST as inodes
1589          * on the MDS, we reduce the total number of inodes to
1590          * compensate, so that the "inodes in use" number is correct.
1591          */
1592         if (obd_osfs.os_ffree < osfs->os_ffree) {
1593                 osfs->os_files = (osfs->os_files - osfs->os_ffree) +
1594                         obd_osfs.os_ffree;
1595                 osfs->os_ffree = obd_osfs.os_ffree;
1596         }
1597
1598         RETURN(rc);
1599 }
1600 #ifndef HAVE_STATFS_DENTRY_PARAM
1601 int ll_statfs(struct super_block *sb, struct kstatfs *sfs)
1602 {
1603 #else
1604 int ll_statfs(struct dentry *de, struct kstatfs *sfs)
1605 {
1606         struct super_block *sb = de->d_sb;
1607 #endif
1608         struct obd_statfs osfs;
1609         int rc;
1610
1611         CDEBUG(D_VFSTRACE, "VFS Op: at "LPU64" jiffies\n", get_jiffies_64());
1612         ll_stats_ops_tally(ll_s2sbi(sb), LPROC_LL_STAFS, 1);
1613
1614         /* For now we will always get up-to-date statfs values, but in the
1615          * future we may allow some amount of caching on the client (e.g.
1616          * from QOS or lprocfs updates). */
1617         rc = ll_statfs_internal(sb, &osfs, cfs_time_current_64() - 1, 0);
1618         if (rc)
1619                 return rc;
1620
1621         statfs_unpack(sfs, &osfs);
1622
1623         /* We need to downshift for all 32-bit kernels, because we can't
1624          * tell if the kernel is being called via sys_statfs64() or not.
1625          * Stop before overflowing f_bsize - in which case it is better
1626          * to just risk EOVERFLOW if caller is using old sys_statfs(). */
1627         if (sizeof(long) < 8) {
1628                 while (osfs.os_blocks > ~0UL && sfs->f_bsize < 0x40000000) {
1629                         sfs->f_bsize <<= 1;
1630
1631                         osfs.os_blocks >>= 1;
1632                         osfs.os_bfree >>= 1;
1633                         osfs.os_bavail >>= 1;
1634                 }
1635         }
1636
1637         sfs->f_blocks = osfs.os_blocks;
1638         sfs->f_bfree = osfs.os_bfree;
1639         sfs->f_bavail = osfs.os_bavail;
1640
1641         return 0;
1642 }
1643
1644 void ll_inode_size_lock(struct inode *inode, int lock_lsm)
1645 {
1646         struct ll_inode_info *lli;
1647         struct lov_stripe_md *lsm;
1648
1649         lli = ll_i2info(inode);
1650         LASSERT(lli->lli_size_sem_owner != current);
1651         down(&lli->lli_size_sem);
1652         LASSERT(lli->lli_size_sem_owner == NULL);
1653         lli->lli_size_sem_owner = current;
1654         lsm = lli->lli_smd;
1655         LASSERTF(lsm != NULL || lock_lsm == 0, "lsm %p, lock_lsm %d\n",
1656                  lsm, lock_lsm);
1657         if (lock_lsm)
1658                 lov_stripe_lock(lsm);
1659 }
1660
1661 void ll_inode_size_unlock(struct inode *inode, int unlock_lsm)
1662 {
1663         struct ll_inode_info *lli;
1664         struct lov_stripe_md *lsm;
1665
1666         lli = ll_i2info(inode);
1667         lsm = lli->lli_smd;
1668         LASSERTF(lsm != NULL || unlock_lsm == 0, "lsm %p, lock_lsm %d\n",
1669                  lsm, unlock_lsm);
1670         if (unlock_lsm)
1671                 lov_stripe_unlock(lsm);
1672         LASSERT(lli->lli_size_sem_owner == current);
1673         lli->lli_size_sem_owner = NULL;
1674         up(&lli->lli_size_sem);
1675 }
1676
1677 static void ll_replace_lsm(struct inode *inode, struct lov_stripe_md *lsm)
1678 {
1679         struct ll_inode_info *lli = ll_i2info(inode);
1680
1681         dump_lsm(D_INODE, lsm);
1682         dump_lsm(D_INODE, lli->lli_smd);
1683         LASSERTF(lsm->lsm_magic == LOV_MAGIC_JOIN,
1684                  "lsm must be joined lsm %p\n", lsm);
1685         obd_free_memmd(ll_i2dtexp(inode), &lli->lli_smd);
1686         CDEBUG(D_INODE, "replace lsm %p to lli_smd %p for inode %lu%u(%p)\n",
1687                lsm, lli->lli_smd, inode->i_ino, inode->i_generation, inode);
1688         lli->lli_smd = lsm;
1689         lli->lli_maxbytes = lsm->lsm_maxbytes;
1690         if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES)
1691                 lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
1692 }
1693
1694 void ll_update_inode(struct inode *inode, struct lustre_md *md)
1695 {
1696         struct ll_inode_info *lli = ll_i2info(inode);
1697         struct mdt_body *body = md->body;
1698         struct lov_stripe_md *lsm = md->lsm;
1699         struct ll_sb_info *sbi = ll_i2sbi(inode);
1700
1701         LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
1702         if (lsm != NULL) {
1703                 if (lli->lli_smd == NULL) {
1704                         if (lsm->lsm_magic != LOV_MAGIC &&
1705                             lsm->lsm_magic != LOV_MAGIC_JOIN) {
1706                                 dump_lsm(D_ERROR, lsm);
1707                                 LBUG();
1708                         }
1709                         CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
1710                                lsm, inode->i_ino, inode->i_generation, inode);
1711                         /* ll_inode_size_lock() requires it is only called
1712                          * with lli_smd != NULL or lock_lsm == 0 or we can
1713                          * race between lock/unlock.  bug 9547 */
1714                         lli->lli_smd = lsm;
1715                         lli->lli_maxbytes = lsm->lsm_maxbytes;
1716                         if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES)
1717                                 lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
1718                 } else {
1719                         if (lli->lli_smd->lsm_magic == lsm->lsm_magic &&
1720                              lli->lli_smd->lsm_stripe_count ==
1721                                         lsm->lsm_stripe_count) {
1722                                 if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
1723                                         CERROR("lsm mismatch for inode %ld\n",
1724                                                 inode->i_ino);
1725                                         CERROR("lli_smd:\n");
1726                                         dump_lsm(D_ERROR, lli->lli_smd);
1727                                         CERROR("lsm:\n");
1728                                         dump_lsm(D_ERROR, lsm);
1729                                         LBUG();
1730                                 }
1731                         } else
1732                                 ll_replace_lsm(inode, lsm);
1733                 }
1734                 if (lli->lli_smd != lsm)
1735                         obd_free_memmd(ll_i2dtexp(inode), &lsm);
1736         }
1737
1738         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
1739                 if (body->valid & OBD_MD_FLRMTPERM)
1740                         ll_update_remote_perm(inode, md->remote_perm);
1741         }
1742 #ifdef CONFIG_FS_POSIX_ACL
1743         else if (body->valid & OBD_MD_FLACL) {
1744                 spin_lock(&lli->lli_lock);
1745                 if (lli->lli_posix_acl)
1746                         posix_acl_release(lli->lli_posix_acl);
1747                 lli->lli_posix_acl = md->posix_acl;
1748                 spin_unlock(&lli->lli_lock);
1749         }
1750 #endif
1751         inode->i_ino = ll_fid_build_ino(sbi, &body->fid1);
1752
1753         if (body->valid & OBD_MD_FLATIME &&
1754             body->atime > LTIME_S(inode->i_atime))
1755                 LTIME_S(inode->i_atime) = body->atime;
1756         
1757         /* mtime is always updated with ctime, but can be set in past.
1758            As write and utime(2) may happen within 1 second, and utime's
1759            mtime has a priority over write's one, so take mtime from mds 
1760            for the same ctimes. */
1761         if (body->valid & OBD_MD_FLCTIME &&
1762             body->ctime >= LTIME_S(inode->i_ctime)) {
1763                 LTIME_S(inode->i_ctime) = body->ctime;
1764                 if (body->valid & OBD_MD_FLMTIME) {
1765                         CDEBUG(D_INODE, "setting ino %lu mtime "
1766                                "from %lu to "LPU64"\n", inode->i_ino, 
1767                                LTIME_S(inode->i_mtime), body->mtime);
1768                         LTIME_S(inode->i_mtime) = body->mtime;
1769                 }
1770         }
1771         if (body->valid & OBD_MD_FLMODE)
1772                 inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT);
1773         if (body->valid & OBD_MD_FLTYPE)
1774                 inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT);
1775         if (S_ISREG(inode->i_mode)) {
1776                 inode->i_blkbits = min(PTLRPC_MAX_BRW_BITS + 1, LL_MAX_BLKSIZE_BITS);
1777         } else {
1778                 inode->i_blkbits = inode->i_sb->s_blocksize_bits;
1779         }
1780 #ifdef HAVE_INODE_BLKSIZE
1781         inode->i_blksize = 1<<inode->i_blkbits;
1782 #endif
1783         if (body->valid & OBD_MD_FLUID)
1784                 inode->i_uid = body->uid;
1785         if (body->valid & OBD_MD_FLGID)
1786                 inode->i_gid = body->gid;
1787         if (body->valid & OBD_MD_FLFLAGS)
1788                 inode->i_flags = ll_ext_to_inode_flags(body->flags);
1789         if (body->valid & OBD_MD_FLNLINK)
1790                 inode->i_nlink = body->nlink;
1791         if (body->valid & OBD_MD_FLRDEV)
1792                 inode->i_rdev = old_decode_dev(body->rdev);
1793
1794         if (body->valid & OBD_MD_FLID) {
1795                 /* FID shouldn't be changed! */
1796                 if (fid_is_sane(&lli->lli_fid)) {
1797                         LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1),
1798                                  "Trying to change FID "DFID
1799                                  " to the "DFID", inode %lu/%u(%p)\n",
1800                                  PFID(&lli->lli_fid), PFID(&body->fid1),
1801                                  inode->i_ino, inode->i_generation, inode);
1802                 } else 
1803                         lli->lli_fid = body->fid1;
1804         }
1805
1806         LASSERT(fid_seq(&lli->lli_fid) != 0);
1807
1808         if (body->valid & OBD_MD_FLSIZE) {
1809                 if ((ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) &&
1810                     S_ISREG(inode->i_mode) && lli->lli_smd) {
1811                         struct lustre_handle lockh;
1812                         ldlm_mode_t mode;
1813                         
1814                         /* As it is possible a blocking ast has been processed
1815                          * by this time, we need to check there is an UPDATE 
1816                          * lock on the client and set LLIF_MDS_SIZE_LOCK holding
1817                          * it. */
1818                         mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE,
1819                                                &lockh);
1820                         if (mode) {
1821                                 if (lli->lli_flags & (LLIF_DONE_WRITING |
1822                                                       LLIF_EPOCH_PENDING |
1823                                                       LLIF_SOM_DIRTY)) {
1824                                         CERROR("ino %lu flags %lu still has "
1825                                                "size authority! do not trust "
1826                                                "the size got from MDS\n",
1827                                                inode->i_ino, lli->lli_flags);
1828                                 } else {
1829                                         /* Use old size assignment to avoid
1830                                          * deadlock bz14138 & bz14326 */
1831                                         inode->i_size = body->size;
1832                                         lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
1833                                 }
1834                                 ldlm_lock_decref(&lockh, mode);
1835                         }
1836                 } else {
1837                         /* Use old size assignment to avoid
1838                          * deadlock bz14138 & bz14326 */
1839                         inode->i_size = body->size;
1840                 }
1841
1842                 if (body->valid & OBD_MD_FLBLOCKS)
1843                         inode->i_blocks = body->blocks;
1844         }
1845
1846         if (body->valid & OBD_MD_FLMDSCAPA) {
1847                 LASSERT(md->mds_capa);
1848                 ll_add_capa(inode, md->mds_capa);
1849         }
1850         if (body->valid & OBD_MD_FLOSSCAPA) {
1851                 LASSERT(md->oss_capa);
1852                 ll_add_capa(inode, md->oss_capa);
1853         }
1854 }
1855
1856 static struct backing_dev_info ll_backing_dev_info = {
1857         .ra_pages       = 0,    /* No readahead */
1858 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,12))
1859         .capabilities   = 0,    /* Does contribute to dirty memory */
1860 #else
1861         .memory_backed  = 0,    /* Does contribute to dirty memory */
1862 #endif
1863 };
1864
1865 void ll_read_inode2(struct inode *inode, void *opaque)
1866 {
1867         struct lustre_md *md = opaque;
1868         struct ll_inode_info *lli = ll_i2info(inode);
1869         ENTRY;
1870
1871         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n",
1872                inode->i_ino, inode->i_generation, inode);
1873
1874         ll_lli_init(lli);
1875
1876         LASSERT(!lli->lli_smd);
1877
1878         /* Core attributes from the MDS first.  This is a new inode, and
1879          * the VFS doesn't zero times in the core inode so we have to do
1880          * it ourselves.  They will be overwritten by either MDS or OST
1881          * attributes - we just need to make sure they aren't newer. */
1882         LTIME_S(inode->i_mtime) = 0;
1883         LTIME_S(inode->i_atime) = 0;
1884         LTIME_S(inode->i_ctime) = 0;
1885         inode->i_rdev = 0;
1886         ll_update_inode(inode, md);
1887
1888         /* OIDEBUG(inode); */
1889
1890         if (S_ISREG(inode->i_mode)) {
1891                 struct ll_sb_info *sbi = ll_i2sbi(inode);
1892                 inode->i_op = &ll_file_inode_operations;
1893                 inode->i_fop = sbi->ll_fop;
1894                 inode->i_mapping->a_ops = &ll_aops;
1895                 EXIT;
1896         } else if (S_ISDIR(inode->i_mode)) {
1897                 inode->i_op = &ll_dir_inode_operations;
1898                 inode->i_fop = &ll_dir_operations;
1899                 inode->i_mapping->a_ops = &ll_dir_aops;
1900                 EXIT;
1901         } else if (S_ISLNK(inode->i_mode)) {
1902                 inode->i_op = &ll_fast_symlink_inode_operations;
1903                 EXIT;
1904         } else {
1905                 inode->i_op = &ll_special_inode_operations;
1906
1907                 init_special_inode(inode, inode->i_mode,
1908                                    kdev_t_to_nr(inode->i_rdev));
1909
1910                 /* initializing backing dev info. */
1911                 inode->i_mapping->backing_dev_info = &ll_backing_dev_info;
1912
1913                 EXIT;
1914         }
1915 }
1916
1917 void ll_delete_inode(struct inode *inode)
1918 {
1919         struct ll_sb_info *sbi = ll_i2sbi(inode);
1920         int rc;
1921         ENTRY;
1922
1923         rc = obd_fid_delete(sbi->ll_md_exp, ll_inode2fid(inode));
1924         if (rc) {
1925                 CERROR("fid_delete() failed, rc %d\n", rc);
1926         }
1927         truncate_inode_pages(&inode->i_data, 0);
1928         clear_inode(inode);
1929
1930         EXIT;
1931 }
1932
1933 int ll_iocontrol(struct inode *inode, struct file *file,
1934                  unsigned int cmd, unsigned long arg)
1935 {
1936         struct ll_sb_info *sbi = ll_i2sbi(inode);
1937         struct ptlrpc_request *req = NULL;
1938         int rc, flags = 0;
1939         ENTRY;
1940
1941         switch(cmd) {
1942         case EXT3_IOC_GETFLAGS: {
1943                 struct mdt_body *body;
1944                 struct obd_capa *oc;
1945
1946                 oc = ll_mdscapa_get(inode);
1947                 rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
1948                                 OBD_MD_FLFLAGS, 0, &req);
1949                 capa_put(oc);
1950                 if (rc) {
1951                         CERROR("failure %d inode %lu\n", rc, inode->i_ino);
1952                         RETURN(-abs(rc));
1953                 }
1954
1955                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1956
1957                 flags = body->flags;
1958
1959                 ptlrpc_req_finished(req);
1960
1961                 RETURN(put_user(flags, (int *)arg));
1962         }
1963         case EXT3_IOC_SETFLAGS: {
1964                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
1965                 struct obd_info oinfo = { { { 0 } } };
1966                 struct md_op_data *op_data;
1967
1968                 if (get_user(flags, (int *)arg))
1969                         RETURN(-EFAULT);
1970
1971                 oinfo.oi_md = lsm;
1972                 OBDO_ALLOC(oinfo.oi_oa);
1973                 if (!oinfo.oi_oa)
1974                         RETURN(-ENOMEM);
1975
1976                 op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
1977                                              LUSTRE_OPC_ANY, NULL);
1978                 if (IS_ERR(op_data))
1979                         RETURN(PTR_ERR(op_data));
1980
1981                 ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = flags;
1982                 op_data->op_attr.ia_valid |= ATTR_ATTR_FLAG;
1983                 rc = md_setattr(sbi->ll_md_exp, op_data,
1984                                 NULL, 0, NULL, 0, &req, NULL);
1985                 ll_finish_md_op_data(op_data);
1986                 ptlrpc_req_finished(req);
1987                 if (rc || lsm == NULL) {
1988                         OBDO_FREE(oinfo.oi_oa);
1989                         RETURN(rc);
1990                 }
1991
1992                 oinfo.oi_oa->o_id = lsm->lsm_object_id;
1993                 oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
1994                 oinfo.oi_oa->o_flags = flags;
1995                 oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | 
1996                                        OBD_MD_FLGROUP;
1997                 oinfo.oi_capa = ll_mdscapa_get(inode);
1998
1999                 obdo_from_inode(oinfo.oi_oa, inode,
2000                                 OBD_MD_FLFID | OBD_MD_FLGENER);
2001                 rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
2002                 capa_put(oinfo.oi_capa);
2003                 OBDO_FREE(oinfo.oi_oa);
2004                 if (rc) {
2005                         if (rc != -EPERM && rc != -EACCES)
2006                                 CERROR("md_setattr_async fails: rc = %d\n", rc);
2007                         RETURN(rc);
2008                 }
2009
2010                 inode->i_flags = ll_ext_to_inode_flags(flags |
2011                                                        MDS_BFLAG_EXT_FLAGS);
2012                 RETURN(0);
2013         }
2014         default:
2015                 RETURN(-ENOSYS);
2016         }
2017
2018         RETURN(0);
2019 }
2020
2021 int ll_flush_ctx(struct inode *inode)
2022 {
2023         struct ll_sb_info  *sbi = ll_i2sbi(inode);
2024
2025         CDEBUG(D_SEC, "flush context for user %d\n", current->uid);
2026
2027         obd_set_info_async(sbi->ll_md_exp,
2028                            sizeof(KEY_FLUSH_CTX), KEY_FLUSH_CTX,
2029                            0, NULL, NULL);
2030         obd_set_info_async(sbi->ll_dt_exp,
2031                            sizeof(KEY_FLUSH_CTX), KEY_FLUSH_CTX,
2032                            0, NULL, NULL);
2033         return 0;
2034 }
2035
2036 /* umount -f client means force down, don't save state */
2037 #ifdef HAVE_UMOUNTBEGIN_VFSMOUNT
2038 void ll_umount_begin(struct vfsmount *vfsmnt, int flags)
2039 {
2040         struct super_block *sb = vfsmnt->mnt_sb;
2041 #else
2042 void ll_umount_begin(struct super_block *sb)
2043 {
2044 #endif
2045         struct lustre_sb_info *lsi = s2lsi(sb);
2046         struct ll_sb_info *sbi = ll_s2sbi(sb);
2047         struct obd_device *obd;
2048         struct obd_ioctl_data ioc_data = { 0 };
2049         ENTRY;
2050
2051 #ifdef HAVE_UMOUNTBEGIN_VFSMOUNT
2052         if (!(flags & MNT_FORCE)) {
2053                 EXIT;
2054                 return;
2055         }
2056 #endif
2057
2058         /* Tell the MGC we got umount -f */
2059         lsi->lsi_flags |= LSI_UMOUNT_FORCE;
2060
2061         CDEBUG(D_VFSTRACE, "VFS Op: superblock %p count %d active %d\n", sb,
2062                sb->s_count, atomic_read(&sb->s_active));
2063
2064         obd = class_exp2obd(sbi->ll_md_exp);
2065         if (obd == NULL) {
2066                 CERROR("Invalid MDC connection handle "LPX64"\n",
2067                        sbi->ll_md_exp->exp_handle.h_cookie);
2068                 EXIT;
2069                 return;
2070         }
2071         obd->obd_force = 1;
2072         obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_md_exp, sizeof ioc_data,
2073                       &ioc_data, NULL);
2074
2075         obd = class_exp2obd(sbi->ll_dt_exp);
2076         if (obd == NULL) {
2077                 CERROR("Invalid LOV connection handle "LPX64"\n",
2078                        sbi->ll_dt_exp->exp_handle.h_cookie);
2079                 EXIT;
2080                 return;
2081         }
2082
2083         obd->obd_force = 1;
2084         obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_dt_exp, sizeof ioc_data,
2085                       &ioc_data, NULL);
2086
2087         /* Really, we'd like to wait until there are no requests outstanding,
2088          * and then continue.  For now, we just invalidate the requests,
2089          * schedule, and hope.
2090          */
2091         schedule();
2092
2093         EXIT;
2094 }
2095
2096 int ll_remount_fs(struct super_block *sb, int *flags, char *data)
2097 {
2098         struct ll_sb_info *sbi = ll_s2sbi(sb);
2099         int err;
2100         __u32 read_only;
2101
2102         if ((*flags & MS_RDONLY) != (sb->s_flags & MS_RDONLY)) {
2103                 read_only = *flags & MS_RDONLY;
2104                 err = obd_set_info_async(sbi->ll_md_exp,
2105                                          sizeof(KEY_READ_ONLY),
2106                                          KEY_READ_ONLY, sizeof(read_only),
2107                                          &read_only, NULL);
2108                 if (err) {
2109                         CERROR("Failed to change the read-only flag during "
2110                                "remount: %d\n", err);
2111                         return err;
2112                 }
2113
2114                 if (read_only)
2115                         sb->s_flags |= MS_RDONLY;
2116                 else
2117                         sb->s_flags &= ~MS_RDONLY;
2118         }
2119         return 0;
2120 }
2121
2122 int ll_prep_inode(struct inode **inode,
2123                   struct ptlrpc_request *req,
2124                   struct super_block *sb)
2125 {
2126         struct ll_sb_info *sbi = NULL;
2127         struct lustre_md md;
2128         int rc = 0;
2129         ENTRY;
2130
2131         LASSERT(*inode || sb);
2132         sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode);
2133         prune_deathrow(sbi, 1);
2134         memset(&md, 0, sizeof(struct lustre_md));
2135
2136         rc = md_get_lustre_md(sbi->ll_md_exp, req, sbi->ll_dt_exp,
2137                               sbi->ll_md_exp, &md);
2138         if (rc)
2139                 RETURN(rc);
2140
2141         if (*inode) {
2142                 ll_update_inode(*inode, &md);
2143         } else {
2144                 LASSERT(sb != NULL);
2145
2146                 /*
2147                  * At this point server returns to client's same fid as client
2148                  * generated for creating. So using ->fid1 is okay here.
2149                  */
2150                 LASSERT(fid_is_sane(&md.body->fid1));
2151
2152                 *inode = ll_iget(sb, ll_fid_build_ino(sbi, &md.body->fid1), &md);
2153                 if (*inode == NULL || is_bad_inode(*inode)) {
2154                         if (md.lsm)
2155                                 obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
2156 #ifdef CONFIG_FS_POSIX_ACL
2157                         if (md.posix_acl) {
2158                                 posix_acl_release(md.posix_acl);
2159                                 md.posix_acl = NULL;
2160                         }
2161 #endif
2162                         rc = -ENOMEM;
2163                         CERROR("new_inode -fatal: rc %d\n", rc);
2164                         GOTO(out, rc);
2165                 }
2166         }
2167
2168         rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp,
2169                          ll_i2info(*inode)->lli_smd);
2170 out:
2171         md_free_lustre_md(sbi->ll_md_exp, &md);
2172         RETURN(rc);
2173 }
2174
2175 char *llap_origins[] = {
2176         [LLAP_ORIGIN_UNKNOWN] = "--",
2177         [LLAP_ORIGIN_READPAGE] = "rp",
2178         [LLAP_ORIGIN_READAHEAD] = "ra",
2179         [LLAP_ORIGIN_COMMIT_WRITE] = "cw",
2180         [LLAP_ORIGIN_WRITEPAGE] = "wp",
2181         [LLAP_ORIGIN_LOCKLESS_IO] = "ls"
2182 };
2183
2184 struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
2185                                              struct list_head *list)
2186 {
2187         struct ll_async_page *llap;
2188         struct list_head *pos;
2189
2190         list_for_each(pos, list) {
2191                 if (pos == &sbi->ll_pglist)
2192                         return NULL;
2193                 llap = list_entry(pos, struct ll_async_page, llap_pglist_item);
2194                 if (llap->llap_page == NULL)
2195                         continue;
2196                 return llap;
2197         }
2198         LBUG();
2199         return NULL;
2200 }
2201
2202 int ll_obd_statfs(struct inode *inode, void *arg)
2203 {
2204         struct ll_sb_info *sbi = NULL;
2205         struct obd_export *exp;
2206         char *buf = NULL;
2207         struct obd_ioctl_data *data = NULL;
2208         __u32 type;
2209         int len = 0, rc;
2210
2211         if (!inode || !(sbi = ll_i2sbi(inode)))
2212                 GOTO(out_statfs, rc = -EINVAL);
2213
2214         rc = obd_ioctl_getdata(&buf, &len, arg);
2215         if (rc)
2216                 GOTO(out_statfs, rc);
2217
2218         data = (void*)buf;
2219         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2 ||
2220             !data->ioc_pbuf1 || !data->ioc_pbuf2)
2221                 GOTO(out_statfs, rc = -EINVAL);
2222
2223         memcpy(&type, data->ioc_inlbuf1, sizeof(__u32));
2224         if (type == LL_STATFS_MDC)
2225                 exp = sbi->ll_md_exp;
2226         else if (type == LL_STATFS_LOV)
2227                 exp = sbi->ll_dt_exp;
2228         else 
2229                 GOTO(out_statfs, rc = -ENODEV);
2230
2231         rc = obd_iocontrol(IOC_OBD_STATFS, exp, len, buf, NULL);
2232         if (rc)
2233                 GOTO(out_statfs, rc);
2234 out_statfs:
2235         if (buf)
2236                 obd_ioctl_freedata(buf, len);
2237         return rc;
2238 }
2239
2240 int ll_process_config(struct lustre_cfg *lcfg)
2241 {
2242         char *ptr;
2243         void *sb;
2244         struct lprocfs_static_vars lvars;
2245         unsigned long x; 
2246         int rc = 0;
2247
2248         lprocfs_llite_init_vars(&lvars);
2249
2250         /* The instance name contains the sb: lustre-client-aacfe000 */
2251         ptr = strrchr(lustre_cfg_string(lcfg, 0), '-');
2252         if (!ptr || !*(++ptr)) 
2253                 return -EINVAL;
2254         if (sscanf(ptr, "%lx", &x) != 1)
2255                 return -EINVAL;
2256         sb = (void *)x;
2257         /* This better be a real Lustre superblock! */
2258         LASSERT(s2lsi((struct super_block *)sb)->lsi_lmd->lmd_magic == LMD_MAGIC);
2259
2260         /* Note we have not called client_common_fill_super yet, so 
2261            proc fns must be able to handle that! */
2262         rc = class_process_proc_param(PARAM_LLITE, lvars.obd_vars,
2263                                       lcfg, sb);
2264         return(rc);
2265 }
2266
2267 /* this function prepares md_op_data hint for passing ot down to MD stack. */
2268 struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
2269                                        struct inode *i1, struct inode *i2,
2270                                        const char *name, int namelen,
2271                                        int mode, __u32 opc, void *data)
2272 {
2273         LASSERT(i1 != NULL);
2274
2275         if (namelen > ll_i2sbi(i1)->ll_namelen)
2276                 return ERR_PTR(-ENAMETOOLONG);
2277         
2278         if (op_data == NULL)
2279                 OBD_ALLOC_PTR(op_data);
2280         
2281         if (op_data == NULL)
2282                 return ERR_PTR(-ENOMEM);
2283
2284         ll_i2gids(op_data->op_suppgids, i1, i2);
2285         op_data->op_fid1 = *ll_inode2fid(i1);
2286         op_data->op_capa1 = ll_mdscapa_get(i1);
2287
2288         if (i2) {
2289                 op_data->op_fid2 = *ll_inode2fid(i2);
2290                 op_data->op_capa2 = ll_mdscapa_get(i2);
2291         } else {
2292                 fid_zero(&op_data->op_fid2);
2293                 op_data->op_capa2 = NULL;
2294         }
2295
2296         op_data->op_name = name;
2297         op_data->op_namelen = namelen;
2298         op_data->op_mode = mode;
2299         op_data->op_mod_time = cfs_time_current_sec();
2300         op_data->op_fsuid = current->fsuid;
2301         op_data->op_fsgid = current->fsgid;
2302         op_data->op_cap = current->cap_effective;
2303         op_data->op_bias = MDS_CHECK_SPLIT;
2304         op_data->op_opc = opc;
2305         op_data->op_mds = 0;
2306         op_data->op_data = data;
2307
2308         return op_data;
2309 }
2310
2311 void ll_finish_md_op_data(struct md_op_data *op_data)
2312 {
2313         capa_put(op_data->op_capa1);
2314         capa_put(op_data->op_capa2);
2315         OBD_FREE_PTR(op_data);
2316 }