Whamcloud - gitweb
land b1_5 onto HEAD
[fs/lustre-release.git] / lustre / liblustre / super.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Light Super operations
5  *
6  *  Copyright (c) 2002-2004 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LLITE
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <time.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <fcntl.h>
33 #include <sys/queue.h>
34 #ifndef __CYGWIN__
35 # include <sys/statvfs.h>
36 #else
37 # include <sys/statfs.h>
38 #endif
39
40 #ifdef HAVE_XTIO_H
41 #include <xtio.h>
42 #endif
43 #include <sysio.h>
44 #include <fs.h>
45 #include <mount.h>
46 #include <inode.h>
47 #ifdef HAVE_FILE_H
48 #include <file.h>
49 #endif
50
51 #undef LIST_HEAD
52
53 #include "llite_lib.h"
54
55 #ifndef MAY_EXEC
56 #define MAY_EXEC        1
57 #define MAY_WRITE       2
58 #define MAY_READ        4
59 #endif
60
61 #define S_IXUGO (S_IXUSR|S_IXGRP|S_IXOTH)
62
63 static int ll_permission(struct inode *inode, int mask)
64 {
65         struct intnl_stat *st = llu_i2stat(inode);
66         mode_t mode = st->st_mode;
67
68         if (current->fsuid == st->st_uid)
69                 mode >>= 6;
70         else if (in_group_p(st->st_gid))
71                 mode >>= 3;
72
73         if ((mode & mask & (MAY_READ|MAY_WRITE|MAY_EXEC)) == mask)
74                 return 0;
75
76         if ((mask & (MAY_READ|MAY_WRITE)) ||
77             (st->st_mode & S_IXUGO))
78                 if (capable(CAP_DAC_OVERRIDE))
79                         return 0;
80
81         if (mask == MAY_READ ||
82             (S_ISDIR(st->st_mode) && !(mask & MAY_WRITE))) {
83                 if (capable(CAP_DAC_READ_SEARCH))
84                         return 0;
85         }
86
87         return -EACCES;
88 }
89
90 static void llu_fsop_gone(struct filesys *fs)
91 {
92         struct llu_sb_info *sbi = (struct llu_sb_info *) fs->fs_private;
93         struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp);
94         int next = 0;
95         ENTRY;
96
97         list_del(&sbi->ll_conn_chain);
98         obd_disconnect(sbi->ll_osc_exp);
99         obd_disconnect(sbi->ll_mdc_exp);
100
101         while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL)
102                 class_manual_cleanup(obd);
103
104         OBD_FREE(sbi, sizeof(*sbi));
105
106         EXIT;
107 }
108
109 static struct inode_ops llu_inode_ops;
110
111 void llu_update_inode(struct inode *inode, struct mds_body *body,
112                       struct lov_stripe_md *lsm)
113 {
114         struct llu_inode_info *lli = llu_i2info(inode);
115         struct intnl_stat *st = llu_i2stat(inode);
116
117         LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
118         if (lsm != NULL) {
119                 if (lli->lli_smd == NULL) {
120                         lli->lli_smd = lsm;
121                         lli->lli_maxbytes = lsm->lsm_maxbytes;
122                         if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES)
123                                 lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
124                 } else {
125                         if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
126                                 CERROR("lsm mismatch for inode %lld\n",
127                                        (long long)st->st_ino);
128                                 LBUG();
129                         }
130                 }
131         }
132
133         if (body->valid & OBD_MD_FLID)
134                 st->st_ino = body->ino;
135         if (body->valid & OBD_MD_FLATIME &&
136             body->atime > LTIME_S(st->st_atime))
137                 LTIME_S(st->st_atime) = body->atime;
138         
139         /* mtime is always updated with ctime, but can be set in past.
140            As write and utime(2) may happen within 1 second, and utime's
141            mtime has a priority over write's one, so take mtime from mds 
142            for the same ctimes. */
143         if (body->valid & OBD_MD_FLCTIME &&
144             body->ctime >= LTIME_S(st->st_ctime)) {
145                 LTIME_S(st->st_ctime) = body->ctime;
146                 if (body->valid & OBD_MD_FLMTIME)
147                         LTIME_S(st->st_mtime) = body->mtime;
148         }
149         if (body->valid & OBD_MD_FLMODE)
150                 st->st_mode = (st->st_mode & S_IFMT)|(body->mode & ~S_IFMT);
151         if (body->valid & OBD_MD_FLTYPE)
152                 st->st_mode = (st->st_mode & ~S_IFMT)|(body->mode & S_IFMT);
153         if (S_ISREG(st->st_mode))
154                 st->st_blksize = min(2UL * PTLRPC_MAX_BRW_SIZE, LL_MAX_BLKSIZE);
155         else
156                 st->st_blksize = 4096;
157         if (body->valid & OBD_MD_FLUID)
158                 st->st_uid = body->uid;
159         if (body->valid & OBD_MD_FLGID)
160                 st->st_gid = body->gid;
161         if (body->valid & OBD_MD_FLNLINK)
162                 st->st_nlink = body->nlink;
163         if (body->valid & OBD_MD_FLRDEV)
164                 st->st_rdev = body->rdev;
165         if (body->valid & OBD_MD_FLSIZE)
166                 st->st_size = body->size;
167         if (body->valid & OBD_MD_FLBLOCKS)
168                 st->st_blocks = body->blocks;
169         if (body->valid & OBD_MD_FLFLAGS)
170                 lli->lli_st_flags = body->flags;
171         if (body->valid & OBD_MD_FLGENER)
172                 lli->lli_st_generation = body->generation;
173
174         /* fillin fid */
175         if (body->valid & OBD_MD_FLID)
176                 lli->lli_fid.id = body->ino;
177         if (body->valid & OBD_MD_FLGENER)
178                 lli->lli_fid.generation = body->generation;
179         if (body->valid & OBD_MD_FLTYPE)
180                 lli->lli_fid.f_type = body->mode & S_IFMT;
181 }
182
183 void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid)
184 {
185         struct llu_inode_info *lli = llu_i2info(dst);
186         struct intnl_stat *st = llu_i2stat(dst);
187
188         valid &= src->o_valid;
189
190         if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME))
191                 CDEBUG(D_INODE,"valid "LPX64", cur time %lu/%lu, new %lu/%lu\n",
192                        src->o_valid,
193                        LTIME_S(st->st_mtime), LTIME_S(st->st_ctime),
194                        (long)src->o_mtime, (long)src->o_ctime);
195
196         if (valid & OBD_MD_FLATIME)
197                 LTIME_S(st->st_atime) = src->o_atime;
198         if (valid & OBD_MD_FLMTIME)
199                 LTIME_S(st->st_mtime) = src->o_mtime;
200         if (valid & OBD_MD_FLCTIME && src->o_ctime > LTIME_S(st->st_ctime))
201                 LTIME_S(st->st_ctime) = src->o_ctime;
202         if (valid & OBD_MD_FLSIZE)
203                 st->st_size = src->o_size;
204         if (valid & OBD_MD_FLBLOCKS) /* allocation of space */
205                 st->st_blocks = src->o_blocks;
206         if (valid & OBD_MD_FLBLKSZ)
207                 st->st_blksize = src->o_blksize;
208         if (valid & OBD_MD_FLTYPE)
209                 st->st_mode = (st->st_mode & ~S_IFMT) | (src->o_mode & S_IFMT);
210         if (valid & OBD_MD_FLMODE)
211                 st->st_mode = (st->st_mode & S_IFMT) | (src->o_mode & ~S_IFMT);
212         if (valid & OBD_MD_FLUID)
213                 st->st_uid = src->o_uid;
214         if (valid & OBD_MD_FLGID)
215                 st->st_gid = src->o_gid;
216         if (valid & OBD_MD_FLFLAGS)
217                 lli->lli_st_flags = src->o_flags;
218         if (valid & OBD_MD_FLGENER)
219                 lli->lli_st_generation = src->o_generation;
220 }
221
222 #define S_IRWXUGO       (S_IRWXU|S_IRWXG|S_IRWXO)
223 #define S_IALLUGO       (S_ISUID|S_ISGID|S_ISVTX|S_IRWXUGO)
224
225 void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid)
226 {
227         struct llu_inode_info *lli = llu_i2info(src);
228         struct intnl_stat *st = llu_i2stat(src);
229         obd_flag newvalid = 0;
230
231         if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME))
232                 CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n",
233                        valid, LTIME_S(st->st_mtime),
234                        LTIME_S(st->st_ctime));
235
236         if (valid & OBD_MD_FLATIME) {
237                 dst->o_atime = LTIME_S(st->st_atime);
238                 newvalid |= OBD_MD_FLATIME;
239         }
240         if (valid & OBD_MD_FLMTIME) {
241                 dst->o_mtime = LTIME_S(st->st_mtime);
242                 newvalid |= OBD_MD_FLMTIME;
243         }
244         if (valid & OBD_MD_FLCTIME) {
245                 dst->o_ctime = LTIME_S(st->st_ctime);
246                 newvalid |= OBD_MD_FLCTIME;
247         }
248         if (valid & OBD_MD_FLSIZE) {
249                 dst->o_size = st->st_size;
250                 newvalid |= OBD_MD_FLSIZE;
251         }
252         if (valid & OBD_MD_FLBLOCKS) {  /* allocation of space (x512 bytes) */
253                 dst->o_blocks = st->st_blocks;
254                 newvalid |= OBD_MD_FLBLOCKS;
255         }
256         if (valid & OBD_MD_FLBLKSZ) {   /* optimal block size */
257                 dst->o_blksize = st->st_blksize;
258                 newvalid |= OBD_MD_FLBLKSZ;
259         }
260         if (valid & OBD_MD_FLTYPE) {
261                 dst->o_mode = (dst->o_mode & S_IALLUGO)|(st->st_mode & S_IFMT);
262                 newvalid |= OBD_MD_FLTYPE;
263         }
264         if (valid & OBD_MD_FLMODE) {
265                 dst->o_mode = (dst->o_mode & S_IFMT)|(st->st_mode & S_IALLUGO);
266                 newvalid |= OBD_MD_FLMODE;
267         }
268         if (valid & OBD_MD_FLUID) {
269                 dst->o_uid = st->st_uid;
270                 newvalid |= OBD_MD_FLUID;
271         }
272         if (valid & OBD_MD_FLGID) {
273                 dst->o_gid = st->st_gid;
274                 newvalid |= OBD_MD_FLGID;
275         }
276         if (valid & OBD_MD_FLFLAGS) {
277                 dst->o_flags = lli->lli_st_flags;
278                 newvalid |= OBD_MD_FLFLAGS;
279         }
280         if (valid & OBD_MD_FLGENER) {
281                 dst->o_generation = lli->lli_st_generation;
282                 newvalid |= OBD_MD_FLGENER;
283         }
284         if (valid & OBD_MD_FLFID) {
285                 dst->o_fid = st->st_ino;
286                 newvalid |= OBD_MD_FLFID;
287         }
288
289         dst->o_valid |= newvalid;
290 }
291
292 /*
293  * really does the getattr on the inode and updates its fields
294  */
295 int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm)
296 {
297         struct llu_inode_info *lli = llu_i2info(inode);
298         struct obd_export *exp = llu_i2obdexp(inode);
299         struct ptlrpc_request_set *set;
300         struct obd_info oinfo = { { { 0 } } };
301         struct obdo oa = { 0 };
302         obd_flag refresh_valid;
303         int rc;
304         ENTRY;
305
306         LASSERT(lsm);
307         LASSERT(lli);
308
309         oinfo.oi_md = lsm;
310         oinfo.oi_oa = &oa;
311         oa.o_id = lsm->lsm_object_id;
312         oa.o_mode = S_IFREG;
313         oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
314                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
315                 OBD_MD_FLCTIME;
316
317         set = ptlrpc_prep_set();
318         if (set == NULL) {
319                 CERROR ("ENOMEM allocing request set\n");
320                 rc = -ENOMEM;
321         } else {
322                 rc = obd_getattr_async(exp, &oinfo, set);
323                 if (rc == 0)
324                         rc = ptlrpc_set_wait(set);
325                 ptlrpc_set_destroy(set);
326         }
327         if (rc)
328                 RETURN(rc);
329
330         refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
331                         OBD_MD_FLCTIME | OBD_MD_FLSIZE;
332
333         obdo_refresh_inode(inode, &oa, refresh_valid);
334
335         RETURN(0);
336 }
337
338 static struct inode* llu_new_inode(struct filesys *fs,
339                                    struct ll_fid *fid)
340 {
341         struct inode *inode;
342         struct llu_inode_info *lli;
343         struct intnl_stat st = {
344                 .st_dev  = 0,
345 #ifndef AUTOMOUNT_FILE_NAME
346                 .st_mode = fid->f_type & S_IFMT,
347 #else
348                 .st_mode = fid->f_type /* all of the bits! */
349 #endif
350                 .st_uid  = geteuid(),
351                 .st_gid  = getegid(),
352         };
353
354         OBD_ALLOC(lli, sizeof(*lli));
355         if (!lli)
356                 return NULL;
357
358         /* initialize lli here */
359         lli->lli_sbi = llu_fs2sbi(fs);
360         lli->lli_smd = NULL;
361         lli->lli_symlink_name = NULL;
362         lli->lli_flags = 0;
363         lli->lli_maxbytes = (__u64)(~0UL);
364         lli->lli_file_data = NULL;
365
366         lli->lli_sysio_fid.fid_data = &lli->lli_fid;
367         lli->lli_sysio_fid.fid_len = sizeof(lli->lli_fid);
368         lli->lli_fid = *fid;
369
370         /* file identifier is needed by functions like _sysio_i_find() */
371         inode = _sysio_i_new(fs, &lli->lli_sysio_fid,
372                              &st, 0, &llu_inode_ops, lli);
373
374         if (!inode)
375                 OBD_FREE(lli, sizeof(*lli));
376
377         return inode;
378 }
379
380 static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
381 {
382         struct llu_sb_info *sbi = llu_i2sbi(inode);
383         struct llu_inode_info *lli = llu_i2info(inode);
384         struct lustre_handle lockh;
385         struct ldlm_res_id res_id = { .name = {0} };
386         struct obd_device *obddev;
387         ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
388         int flags;
389         ENTRY;
390
391         LASSERT(inode);
392
393         obddev = sbi->ll_mdc_exp->exp_obd;
394         res_id.name[0] = llu_i2stat(inode)->st_ino;
395         res_id.name[1] = lli->lli_st_generation;
396
397         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
398
399         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
400         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
401                             &policy, LCK_PW | LCK_PR, &lockh)) {
402                 RETURN(1);
403         }
404         RETURN(0);
405 }
406
407 static int llu_inode_revalidate(struct inode *inode)
408 {
409         struct lov_stripe_md *lsm = NULL;
410         ENTRY;
411
412         if (!inode) {
413                 CERROR("REPORT THIS LINE TO PETER\n");
414                 RETURN(0);
415         }
416
417         if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) {
418                 struct lustre_md md;
419                 struct ptlrpc_request *req = NULL;
420                 struct llu_sb_info *sbi = llu_i2sbi(inode);
421                 struct ll_fid fid;
422                 unsigned long valid = OBD_MD_FLGETATTR;
423                 int rc, ealen = 0;
424
425                 /* Why don't we update all valid MDS fields here, if we're
426                  * doing an RPC anyways?  -phil */
427                 if (S_ISREG(llu_i2stat(inode)->st_mode)) {
428                         ealen = obd_size_diskmd(sbi->ll_osc_exp, NULL);
429                         valid |= OBD_MD_FLEASIZE;
430                 }
431                 ll_inode2fid(&fid, inode);
432                 rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
433                 if (rc) {
434                         CERROR("failure %d inode %llu\n", rc,
435                                (long long)llu_i2stat(inode)->st_ino);
436                         RETURN(-abs(rc));
437                 }
438                 rc = mdc_req2lustre_md(req, REPLY_REC_OFF, sbi->ll_osc_exp,&md);
439
440                 /* XXX Too paranoid? */
441                 if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) &&
442                     !((md.body->valid & OBD_MD_FLNLINK) &&
443                       (md.body->nlink == 0))) {
444                         CERROR("Asked for %s eadata but got %s (%d)\n",
445                                (valid & OBD_MD_FLEASIZE) ? "some" : "no",
446                                (md.body->valid & OBD_MD_FLEASIZE) ? "some":"none",
447                                 md.body->eadatasize);
448                 }
449                 if (rc) {
450                         ptlrpc_req_finished(req);
451                         RETURN(rc);
452                 }
453
454
455                 llu_update_inode(inode, md.body, md.lsm);
456                 if (md.lsm != NULL && llu_i2info(inode)->lli_smd != md.lsm)
457                         obd_free_memmd(sbi->ll_osc_exp, &md.lsm);
458
459                 if (md.body->valid & OBD_MD_FLSIZE)
460                         set_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
461                                 &llu_i2info(inode)->lli_flags);
462                 ptlrpc_req_finished(req);
463         }
464
465         lsm = llu_i2info(inode)->lli_smd;
466         if (!lsm)       /* object not yet allocated, don't validate size */
467                 RETURN(0);
468
469         /* ll_glimpse_size will prefer locally cached writes if they extend
470          * the file */
471         RETURN(llu_glimpse_size(inode));
472 }
473
474 static void copy_stat_buf(struct inode *ino, struct intnl_stat *b)
475 {
476         *b = *llu_i2stat(ino);
477 }
478
479 static int llu_iop_getattr(struct pnode *pno,
480                            struct inode *ino,
481                            struct intnl_stat *b)
482 {
483         int rc;
484         ENTRY;
485
486         liblustre_wait_event(0);
487
488         if (!ino) {
489                 LASSERT(pno);
490                 LASSERT(pno->p_base->pb_ino);
491                 ino = pno->p_base->pb_ino;
492         } else {
493                 LASSERT(!pno || pno->p_base->pb_ino == ino);
494         }
495
496         /* libsysio might call us directly without intent lock,
497          * we must re-fetch the attrs here
498          */
499         rc = llu_inode_revalidate(ino);
500         if (!rc) {
501                 copy_stat_buf(ino, b);
502                 LASSERT(!llu_i2info(ino)->lli_it);
503         }
504
505         liblustre_wait_event(0);
506         RETURN(rc);
507 }
508
509 static int null_if_equal(struct ldlm_lock *lock, void *data)
510 {
511         if (data == lock->l_ast_data) {
512                 lock->l_ast_data = NULL;
513
514                 if (lock->l_req_mode != lock->l_granted_mode)
515                         LDLM_ERROR(lock,"clearing inode with ungranted lock\n");
516         }
517
518         return LDLM_ITER_CONTINUE;
519 }
520
521 void llu_clear_inode(struct inode *inode)
522 {
523         struct ll_fid fid;
524         struct llu_inode_info *lli = llu_i2info(inode);
525         struct llu_sb_info *sbi = llu_i2sbi(inode);
526         ENTRY;
527
528         CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu/%lu(%p)\n",
529                (long long)llu_i2stat(inode)->st_ino, lli->lli_st_generation,
530                inode);
531
532         ll_inode2fid(&fid, inode);
533         clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(lli->lli_flags));
534         mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
535
536         if (lli->lli_smd)
537                 obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd,
538                                   null_if_equal, inode);
539
540         if (lli->lli_smd) {
541                 obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd);
542                 lli->lli_smd = NULL;
543         }
544
545         if (lli->lli_symlink_name) {
546                 OBD_FREE(lli->lli_symlink_name,
547                          strlen(lli->lli_symlink_name) + 1);
548                 lli->lli_symlink_name = NULL;
549         }
550
551         EXIT;
552 }
553
554 void llu_iop_gone(struct inode *inode)
555 {
556         struct llu_inode_info *lli = llu_i2info(inode);
557         ENTRY;
558
559         liblustre_wait_event(0);
560         llu_clear_inode(inode);
561
562         OBD_FREE(lli, sizeof(*lli));
563         EXIT;
564 }
565
566 static int inode_setattr(struct inode * inode, struct iattr * attr)
567 {
568         unsigned int ia_valid = attr->ia_valid;
569         struct intnl_stat *st = llu_i2stat(inode);
570         int error = 0;
571
572         /*
573          * inode_setattr() is only ever invoked with ATTR_SIZE (by
574          * llu_setattr_raw()) when file has no bodies. Check this.
575          */
576         LASSERT(ergo(ia_valid & ATTR_SIZE, llu_i2info(inode)->lli_smd == NULL));
577
578         if (ia_valid & ATTR_SIZE)
579                 st->st_size = attr->ia_size;
580         if (ia_valid & ATTR_UID)
581                 st->st_uid = attr->ia_uid;
582         if (ia_valid & ATTR_GID)
583                 st->st_gid = attr->ia_gid;
584         if (ia_valid & ATTR_ATIME)
585                 st->st_atime = attr->ia_atime;
586         if (ia_valid & ATTR_MTIME)
587                 st->st_mtime = attr->ia_mtime;
588         if (ia_valid & ATTR_CTIME)
589                 st->st_ctime = attr->ia_ctime;
590         if (ia_valid & ATTR_MODE) {
591                 st->st_mode = attr->ia_mode;
592                 if (!in_group_p(st->st_gid) && !capable(CAP_FSETID))
593                         st->st_mode &= ~S_ISGID;
594         }
595         /* mark_inode_dirty(inode); */
596         return error;
597 }
598
599 /* If this inode has objects allocated to it (lsm != NULL), then the OST
600  * object(s) determine the file size and mtime.  Otherwise, the MDS will
601  * keep these values until such a time that objects are allocated for it.
602  * We do the MDS operations first, as it is checking permissions for us.
603  * We don't to the MDS RPC if there is nothing that we want to store there,
604  * otherwise there is no harm in updating mtime/atime on the MDS if we are
605  * going to do an RPC anyways.
606  *
607  * If we are doing a truncate, we will send the mtime and ctime updates
608  * to the OST with the punch RPC, otherwise we do an explicit setattr RPC.
609  * I don't believe it is possible to get e.g. ATTR_MTIME_SET and ATTR_SIZE
610  * at the same time.
611  */
612 int llu_setattr_raw(struct inode *inode, struct iattr *attr)
613 {
614         struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd;
615         struct llu_sb_info *sbi = llu_i2sbi(inode);
616         struct intnl_stat *st = llu_i2stat(inode);
617         struct ptlrpc_request *request = NULL;
618         struct mdc_op_data op_data;
619         int ia_valid = attr->ia_valid;
620         int rc = 0;
621         ENTRY;
622
623         CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu\n", (long long)st->st_ino);
624
625         if (ia_valid & ATTR_SIZE) {
626                 if (attr->ia_size > ll_file_maxbytes(inode)) {
627                         CDEBUG(D_INODE, "file too large %llu > "LPU64"\n",
628                                (long long)attr->ia_size,
629                                ll_file_maxbytes(inode));
630                         RETURN(-EFBIG);
631                 }
632
633                 attr->ia_valid |= ATTR_MTIME | ATTR_CTIME;
634         }
635
636         /* We mark all of the fields "set" so MDS/OST does not re-set them */
637         if (attr->ia_valid & ATTR_CTIME) {
638                 attr->ia_ctime = CURRENT_TIME;
639                 attr->ia_valid |= ATTR_CTIME_SET;
640         }
641         if (!(ia_valid & ATTR_ATIME_SET) && (attr->ia_valid & ATTR_ATIME)) {
642                 attr->ia_atime = CURRENT_TIME;
643                 attr->ia_valid |= ATTR_ATIME_SET;
644         }
645         if (!(ia_valid & ATTR_MTIME_SET) && (attr->ia_valid & ATTR_MTIME)) {
646                 attr->ia_mtime = CURRENT_TIME;
647                 attr->ia_valid |= ATTR_MTIME_SET;
648         }
649         if ((attr->ia_valid & ATTR_CTIME) && !(attr->ia_valid & ATTR_MTIME)) {
650                 /* To avoid stale mtime on mds, obtain it from ost and send 
651                    to mds. */
652                 rc = llu_glimpse_size(inode);
653                 if (rc) 
654                         RETURN(rc);
655                 
656                 attr->ia_valid |= ATTR_MTIME_SET | ATTR_MTIME;
657                 attr->ia_mtime = inode->i_stbuf.st_mtime;
658         }
659
660         if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
661                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n",
662                        LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
663                        LTIME_S(CURRENT_TIME));
664         if (lsm)
665                 attr->ia_valid &= ~ATTR_SIZE;
666
667         /* If only OST attributes being set on objects, don't do MDS RPC.
668          * In that case, we need to check permissions and update the local
669          * inode ourselves so we can call obdo_from_inode() always. */
670         if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
671                 struct lustre_md md;
672                 llu_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
673
674                 rc = mdc_setattr(sbi->ll_mdc_exp, &op_data,
675                                   attr, NULL, 0, NULL, 0, &request);
676
677                 if (rc) {
678                         ptlrpc_req_finished(request);
679                         if (rc != -EPERM && rc != -EACCES)
680                                 CERROR("mdc_setattr fails: rc = %d\n", rc);
681                         RETURN(rc);
682                 }
683
684                 rc = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp,
685                                        &md);
686                 if (rc) {
687                         ptlrpc_req_finished(request);
688                         RETURN(rc);
689                 }
690
691                 /* We call inode_setattr to adjust timestamps.
692                  * If there is at least some data in file, we cleared ATTR_SIZE
693                  * above to avoid invoking vmtruncate, otherwise it is important
694                  * to call vmtruncate in inode_setattr to update inode->i_size
695                  * (bug 6196) */
696                 inode_setattr(inode, attr);
697                 llu_update_inode(inode, md.body, md.lsm);
698                 ptlrpc_req_finished(request);
699
700                 if (!lsm || !S_ISREG(st->st_mode)) {
701                         CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
702                         RETURN(0);
703                 }
704         } else {
705                 /* The OST doesn't check permissions, but the alternative is
706                  * a gratuitous RPC to the MDS.  We already rely on the client
707                  * to do read/write/truncate permission checks, so is mtime OK?
708                  */
709                 if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) {
710                         /* from sys_utime() */
711                         if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) {
712                                 if (current->fsuid != st->st_uid &&
713                                     (rc = ll_permission(inode, MAY_WRITE)) != 0)
714                                         RETURN(rc);
715                         } else {
716                                 /* from inode_change_ok() */
717                                 if (current->fsuid != st->st_uid &&
718                                     !capable(CAP_FOWNER))
719                                         RETURN(-EPERM);
720                         }
721                 }
722
723                 /* Won't invoke llu_vmtruncate(), as we already cleared
724                  * ATTR_SIZE */
725                 inode_setattr(inode, attr);
726         }
727
728         if (ia_valid & ATTR_SIZE) {
729                 ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
730                                                            OBD_OBJECT_EOF} };
731                 struct lustre_handle lockh = { 0, };
732                 struct lustre_handle match_lockh = { 0, };
733
734                 int err;
735                 int flags = LDLM_FL_TEST_LOCK; /* for assertion check below */
736                 int lock_mode;
737                 obd_flag obd_flags;
738
739                 /* check that there are no matching locks */
740                 LASSERT(obd_match(sbi->ll_osc_exp, lsm, LDLM_EXTENT, &policy,
741                                   LCK_PW, &flags, inode, &match_lockh) <= 0);
742
743                 /* XXX when we fix the AST intents to pass the discard-range
744                  * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
745                  * XXX here. */
746                 flags = (attr->ia_size == 0) ? LDLM_AST_DISCARD_DATA : 0;
747
748                 if (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK) {
749                         lock_mode = LCK_NL;
750                         obd_flags = OBD_FL_TRUNCLOCK;
751                         CDEBUG(D_INODE, "delegating locking to the OST");
752                 } else {
753                         lock_mode = LCK_PW;
754                         obd_flags = 0;
755                 }
756
757                 /* with lock_mode == LK_NL no lock is taken. */
758                 rc = llu_extent_lock(NULL, inode, lsm, lock_mode, &policy,
759                                      &lockh, flags);
760                 if (rc != ELDLM_OK) {
761                         if (rc > 0)
762                                 RETURN(-ENOLCK);
763                         RETURN(rc);
764                 }
765
766                 rc = llu_vmtruncate(inode, attr->ia_size, obd_flags);
767
768                 /* unlock now as we don't mind others file lockers racing with
769                  * the mds updates below? */
770                 err = llu_extent_unlock(NULL, inode, lsm, lock_mode, &lockh);
771                 if (err) {
772                         CERROR("llu_extent_unlock failed: %d\n", err);
773                         if (!rc)
774                                 rc = err;
775                 }
776         } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
777                 struct obd_info oinfo = { { { 0 } } };
778                 struct obdo oa;
779
780                 CDEBUG(D_INODE, "set mtime on OST inode %llu to %lu\n",
781                        (long long)st->st_ino, LTIME_S(attr->ia_mtime));
782                 oa.o_id = lsm->lsm_object_id;
783                 oa.o_valid = OBD_MD_FLID;
784
785                 obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
786                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME);
787
788                 oinfo.oi_oa = &oa;
789                 oinfo.oi_md = lsm;
790
791                 rc = obd_setattr_rqset(sbi->ll_osc_exp, &oinfo, NULL);
792                 if (rc)
793                         CERROR("obd_setattr_async fails: rc=%d\n", rc);
794         }
795         RETURN(rc);
796 }
797
798 /* here we simply act as a thin layer to glue it with
799  * llu_setattr_raw(), which is copy from kernel
800  */
801 static int llu_iop_setattr(struct pnode *pno,
802                            struct inode *ino,
803                            unsigned mask,
804                            struct intnl_stat *stbuf)
805 {
806         struct iattr iattr;
807         int rc;
808         ENTRY;
809
810         liblustre_wait_event(0);
811
812         LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME |
813                            SETATTR_UID | SETATTR_GID |
814                            SETATTR_LEN | SETATTR_MODE)));
815         memset(&iattr, 0, sizeof(iattr));
816
817         if (mask & SETATTR_MODE) {
818                 iattr.ia_mode = stbuf->st_mode;
819                 iattr.ia_valid |= ATTR_MODE;
820         }
821         if (mask & SETATTR_MTIME) {
822                 iattr.ia_mtime = stbuf->st_mtime;
823                 iattr.ia_valid |= ATTR_MTIME | ATTR_MTIME_SET;
824         }
825         if (mask & SETATTR_ATIME) {
826                 iattr.ia_atime = stbuf->st_atime;
827                 iattr.ia_valid |= ATTR_ATIME | ATTR_ATIME_SET;
828         }
829         if (mask & SETATTR_UID) {
830                 iattr.ia_uid = stbuf->st_uid;
831                 iattr.ia_valid |= ATTR_UID;
832         }
833         if (mask & SETATTR_GID) {
834                 iattr.ia_gid = stbuf->st_gid;
835                 iattr.ia_valid |= ATTR_GID;
836         }
837         if (mask & SETATTR_LEN) {
838                 iattr.ia_size = stbuf->st_size; /* XXX signed expansion problem */
839                 iattr.ia_valid |= ATTR_SIZE;
840         }
841
842         iattr.ia_valid |= ATTR_RAW | ATTR_CTIME;
843         iattr.ia_ctime = CURRENT_TIME;
844
845         rc = llu_setattr_raw(ino, &iattr);
846         liblustre_wait_event(0);
847         RETURN(rc);
848 }
849
850 #define EXT2_LINK_MAX           32000
851
852 static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt)
853 {
854         struct inode *dir = pno->p_base->pb_parent->pb_ino;
855         struct qstr *qstr = &pno->p_base->pb_name;
856         const char *name = qstr->name;
857         int len = qstr->len;
858         struct ptlrpc_request *request = NULL;
859         struct llu_sb_info *sbi = llu_i2sbi(dir);
860         struct mdc_op_data op_data;
861         int err = -EMLINK;
862         ENTRY;
863
864         liblustre_wait_event(0);
865         if (llu_i2stat(dir)->st_nlink >= EXT2_LINK_MAX)
866                 RETURN(err);
867
868         llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
869         err = mdc_create(sbi->ll_mdc_exp, &op_data,
870                          tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
871                          current->fsuid, current->fsgid, current->cap_effective,
872                          0, &request);
873         ptlrpc_req_finished(request);
874         liblustre_wait_event(0);
875         RETURN(err);
876 }
877
878 static int llu_readlink_internal(struct inode *inode,
879                                  struct ptlrpc_request **request,
880                                  char **symname)
881 {
882         struct llu_inode_info *lli = llu_i2info(inode);
883         struct llu_sb_info *sbi = llu_i2sbi(inode);
884         struct ll_fid fid;
885         struct mds_body *body;
886         struct intnl_stat *st = llu_i2stat(inode);
887         int rc, symlen = st->st_size + 1;
888         ENTRY;
889
890         *request = NULL;
891
892         if (lli->lli_symlink_name) {
893                 *symname = lli->lli_symlink_name;
894                 CDEBUG(D_INODE, "using cached symlink %s\n", *symname);
895                 RETURN(0);
896         }
897
898         ll_inode2fid(&fid, inode);
899         rc = mdc_getattr(sbi->ll_mdc_exp, &fid,
900                          OBD_MD_LINKNAME, symlen, request);
901         if (rc) {
902                 CERROR("inode %llu: rc = %d\n", (long long)st->st_ino, rc);
903                 RETURN(rc);
904         }
905
906         body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF,
907                               sizeof(*body));
908         LASSERT(body != NULL);
909         LASSERT_REPSWABBED(*request, REPLY_REC_OFF);
910
911         if ((body->valid & OBD_MD_LINKNAME) == 0) {
912                 CERROR ("OBD_MD_LINKNAME not set on reply\n");
913                 GOTO (failed, rc = -EPROTO);
914         }
915
916         LASSERT(symlen != 0);
917         if (body->eadatasize != symlen) {
918                 CERROR("inode %llu: symlink length %d not expected %d\n",
919                        (long long)st->st_ino, body->eadatasize - 1, symlen - 1);
920                 GOTO(failed, rc = -EPROTO);
921         }
922
923         *symname = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF + 1,
924                                    symlen);
925         if (*symname == NULL ||
926             strnlen(*symname, symlen) != symlen - 1) {
927                 /* not full/NULL terminated */
928                 CERROR("inode %llu: symlink not NULL terminated string"
929                        "of length %d\n", (long long)st->st_ino, symlen - 1);
930                 GOTO(failed, rc = -EPROTO);
931         }
932
933         OBD_ALLOC(lli->lli_symlink_name, symlen);
934         /* do not return an error if we cannot cache the symlink locally */
935         if (lli->lli_symlink_name)
936                 memcpy(lli->lli_symlink_name, *symname, symlen);
937
938         RETURN(0);
939
940  failed:
941         ptlrpc_req_finished (*request);
942         RETURN (-EPROTO);
943 }
944
945 static int llu_iop_readlink(struct pnode *pno, char *data, size_t bufsize)
946 {
947         struct inode *inode = pno->p_base->pb_ino;
948         struct ptlrpc_request *request;
949         char *symname;
950         int rc;
951         ENTRY;
952
953         liblustre_wait_event(0);
954         rc = llu_readlink_internal(inode, &request, &symname);
955         if (rc)
956                 GOTO(out, rc);
957
958         LASSERT(symname);
959         strncpy(data, symname, bufsize);
960         rc = strlen(symname);
961
962         ptlrpc_req_finished(request);
963  out:
964         liblustre_wait_event(0);
965         RETURN(rc);
966 }
967
968 static int llu_iop_mknod_raw(struct pnode *pno,
969                              mode_t mode,
970                              dev_t dev)
971 {
972         struct ptlrpc_request *request = NULL;
973         struct inode *dir = pno->p_parent->p_base->pb_ino;
974         struct llu_sb_info *sbi = llu_i2sbi(dir);
975         struct mdc_op_data op_data;
976         int err = -EMLINK;
977         ENTRY;
978
979         liblustre_wait_event(0);
980         CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%llu\n",
981                (int)pno->p_base->pb_name.len, pno->p_base->pb_name.name,
982                (long long)llu_i2stat(dir)->st_ino);
983
984         if (llu_i2stat(dir)->st_nlink >= EXT2_LINK_MAX)
985                 RETURN(err);
986
987         switch (mode & S_IFMT) {
988         case 0:
989         case S_IFREG:
990                 mode |= S_IFREG; /* for mode = 0 case, fallthrough */
991         case S_IFCHR:
992         case S_IFBLK:
993         case S_IFIFO:
994         case S_IFSOCK:
995                 llu_prepare_mdc_op_data(&op_data, dir, NULL,
996                                         pno->p_base->pb_name.name,
997                                         pno->p_base->pb_name.len,
998                                         0);
999                 err = mdc_create(sbi->ll_mdc_exp, &op_data, NULL, 0, mode,
1000                                  current->fsuid, current->fsgid,
1001                                  current->cap_effective, dev, &request);
1002                 ptlrpc_req_finished(request);
1003                 break;
1004         case S_IFDIR:
1005                 err = -EPERM;
1006                 break;
1007         default:
1008                 err = -EINVAL;
1009         }
1010         liblustre_wait_event(0);
1011         RETURN(err);
1012 }
1013
1014 static int llu_iop_link_raw(struct pnode *old, struct pnode *new)
1015 {
1016         struct inode *src = old->p_base->pb_ino;
1017         struct inode *dir = new->p_parent->p_base->pb_ino;
1018         const char *name = new->p_base->pb_name.name;
1019         int namelen = new->p_base->pb_name.len;
1020         struct ptlrpc_request *request = NULL;
1021         struct mdc_op_data op_data;
1022         int rc;
1023         ENTRY;
1024
1025         LASSERT(src);
1026         LASSERT(dir);
1027
1028         liblustre_wait_event(0);
1029         llu_prepare_mdc_op_data(&op_data, src, dir, name, namelen, 0);
1030         rc = mdc_link(llu_i2sbi(src)->ll_mdc_exp, &op_data, &request);
1031         ptlrpc_req_finished(request);
1032         liblustre_wait_event(0);
1033
1034         RETURN(rc);
1035 }
1036
1037 /*
1038  * libsysio will clear the inode immediately after return
1039  */
1040 static int llu_iop_unlink_raw(struct pnode *pno)
1041 {
1042         struct inode *dir = pno->p_base->pb_parent->pb_ino;
1043         struct qstr *qstr = &pno->p_base->pb_name;
1044         const char *name = qstr->name;
1045         int len = qstr->len;
1046         struct inode *target = pno->p_base->pb_ino;
1047         struct ptlrpc_request *request = NULL;
1048         struct mdc_op_data op_data;
1049         int rc;
1050         ENTRY;
1051
1052         LASSERT(target);
1053
1054         liblustre_wait_event(0);
1055         llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
1056         rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request);
1057         if (!rc)
1058                 rc = llu_objects_destroy(request, dir);
1059         ptlrpc_req_finished(request);
1060         liblustre_wait_event(0);
1061
1062         RETURN(rc);
1063 }
1064
1065 static int llu_iop_rename_raw(struct pnode *old, struct pnode *new)
1066 {
1067         struct inode *src = old->p_parent->p_base->pb_ino;
1068         struct inode *tgt = new->p_parent->p_base->pb_ino;
1069         const char *oldname = old->p_base->pb_name.name;
1070         int oldnamelen = old->p_base->pb_name.len;
1071         const char *newname = new->p_base->pb_name.name;
1072         int newnamelen = new->p_base->pb_name.len;
1073         struct ptlrpc_request *request = NULL;
1074         struct mdc_op_data op_data;
1075         int rc;
1076         ENTRY;
1077
1078         LASSERT(src);
1079         LASSERT(tgt);
1080
1081         liblustre_wait_event(0);
1082         llu_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0);
1083         rc = mdc_rename(llu_i2sbi(src)->ll_mdc_exp, &op_data,
1084                         oldname, oldnamelen, newname, newnamelen,
1085                         &request);
1086         if (!rc) {
1087                 rc = llu_objects_destroy(request, src);
1088         }
1089
1090         ptlrpc_req_finished(request);
1091         liblustre_wait_event(0);
1092
1093         RETURN(rc);
1094 }
1095
1096 #ifdef _HAVE_STATVFS
1097 static int llu_statfs_internal(struct llu_sb_info *sbi,
1098                                struct obd_statfs *osfs, __u64 max_age)
1099 {
1100         struct obd_statfs obd_osfs;
1101         int rc;
1102         ENTRY;
1103
1104         rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age);
1105         if (rc) {
1106                 CERROR("mdc_statfs fails: rc = %d\n", rc);
1107                 RETURN(rc);
1108         }
1109
1110         CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
1111                osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files);
1112
1113         rc = obd_statfs_rqset(class_exp2obd(sbi->ll_osc_exp),
1114                               &obd_statfs, max_age);
1115         if (rc) {
1116                 CERROR("obd_statfs fails: rc = %d\n", rc);
1117                 RETURN(rc);
1118         }
1119
1120         CDEBUG(D_SUPER, "OSC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n",
1121                obd_osfs.os_bavail, obd_osfs.os_blocks, obd_osfs.os_ffree,
1122                obd_osfs.os_files);
1123
1124         osfs->os_blocks = obd_osfs.os_blocks;
1125         osfs->os_bfree = obd_osfs.os_bfree;
1126         osfs->os_bavail = obd_osfs.os_bavail;
1127
1128         /* If we don't have as many objects free on the OST as inodes
1129          * on the MDS, we reduce the total number of inodes to
1130          * compensate, so that the "inodes in use" number is correct.
1131          */
1132         if (obd_osfs.os_ffree < osfs->os_ffree) {
1133                 osfs->os_files = (osfs->os_files - osfs->os_ffree) +
1134                         obd_osfs.os_ffree;
1135                 osfs->os_ffree = obd_osfs.os_ffree;
1136         }
1137
1138         RETURN(rc);
1139 }
1140
1141 static int llu_statfs(struct llu_sb_info *sbi, struct statfs *sfs)
1142 {
1143         struct obd_statfs osfs;
1144         int rc;
1145
1146         CDEBUG(D_VFSTRACE, "VFS Op:\n");
1147
1148         /* For now we will always get up-to-date statfs values, but in the
1149          * future we may allow some amount of caching on the client (e.g.
1150          * from QOS or lprocfs updates). */
1151         rc = llu_statfs_internal(sbi, &osfs, cfs_time_current_64() - HZ);
1152         if (rc)
1153                 return rc;
1154
1155         statfs_unpack(sfs, &osfs);
1156
1157         if (sizeof(sfs->f_blocks) == 4) {
1158                 while (osfs.os_blocks > ~0UL) {
1159                         sfs->f_bsize <<= 1;
1160
1161                         osfs.os_blocks >>= 1;
1162                         osfs.os_bfree >>= 1;
1163                         osfs.os_bavail >>= 1;
1164                 }
1165         }
1166
1167         sfs->f_blocks = osfs.os_blocks;
1168         sfs->f_bfree = osfs.os_bfree;
1169         sfs->f_bavail = osfs.os_bavail;
1170
1171         return 0;
1172 }
1173
1174 static int llu_iop_statvfs(struct pnode *pno,
1175                            struct inode *ino,
1176                            struct intnl_statvfs *buf)
1177 {
1178         struct statfs fs;
1179         int rc;
1180         ENTRY;
1181
1182         liblustre_wait_event(0);
1183
1184 #ifndef __CYGWIN__
1185         LASSERT(pno->p_base->pb_ino);
1186         rc = llu_statfs(llu_i2sbi(pno->p_base->pb_ino), &fs);
1187         if (rc)
1188                 RETURN(rc);
1189
1190         /* from native driver */
1191         buf->f_bsize = fs.f_bsize;  /* file system block size */
1192         buf->f_frsize = fs.f_bsize; /* file system fundamental block size */
1193         buf->f_blocks = fs.f_blocks;
1194         buf->f_bfree = fs.f_bfree;
1195         buf->f_bavail = fs.f_bavail;
1196         buf->f_files = fs.f_files;  /* Total number serial numbers */
1197         buf->f_ffree = fs.f_ffree;  /* Number free serial numbers */
1198         buf->f_favail = fs.f_ffree; /* Number free ser num for non-privileged*/
1199         buf->f_fsid = fs.f_fsid.__val[1];
1200         buf->f_flag = 0;            /* No equiv in statfs; maybe use type? */
1201         buf->f_namemax = fs.f_namelen;
1202 #endif
1203
1204         liblustre_wait_event(0);
1205         RETURN(0);
1206 }
1207 #endif /* _HAVE_STATVFS */
1208
1209 static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode)
1210 {
1211         struct inode *dir = pno->p_base->pb_parent->pb_ino;
1212         struct qstr *qstr = &pno->p_base->pb_name;
1213         const char *name = qstr->name;
1214         int len = qstr->len;
1215         struct ptlrpc_request *request = NULL;
1216         struct intnl_stat *st = llu_i2stat(dir);
1217         struct mdc_op_data op_data;
1218         int err = -EMLINK;
1219         ENTRY;
1220
1221         liblustre_wait_event(0);
1222         CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%llu/%lu(%p)\n", len, name,
1223                (long long)st->st_ino, llu_i2info(dir)->lli_st_generation, dir);
1224
1225         if (st->st_nlink >= EXT2_LINK_MAX)
1226                 RETURN(err);
1227
1228         llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
1229         err = mdc_create(llu_i2sbi(dir)->ll_mdc_exp, &op_data, NULL, 0, mode,
1230                          current->fsuid, current->fsgid, current->cap_effective,
1231                          0, &request);
1232         ptlrpc_req_finished(request);
1233         liblustre_wait_event(0);
1234         RETURN(err);
1235 }
1236
1237 static int llu_iop_rmdir_raw(struct pnode *pno)
1238 {
1239         struct inode *dir = pno->p_base->pb_parent->pb_ino;
1240         struct qstr *qstr = &pno->p_base->pb_name;
1241         const char *name = qstr->name;
1242         int len = qstr->len;
1243         struct ptlrpc_request *request = NULL;
1244         struct mdc_op_data op_data;
1245         int rc;
1246         ENTRY;
1247
1248         liblustre_wait_event(0);
1249         CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%llu/%lu(%p)\n", len, name,
1250                (long long)llu_i2stat(dir)->st_ino,
1251                llu_i2info(dir)->lli_st_generation, dir);
1252
1253         llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, S_IFDIR);
1254         rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request);
1255         ptlrpc_req_finished(request);
1256
1257         liblustre_wait_event(0);
1258         RETURN(rc);
1259 }
1260
/* Open flags that fcntl(F_SETFL) may modify; O_DIRECT is included only
 * where the platform defines it. */
#if defined(O_DIRECT)
# define FCNTL_FLMASK (O_APPEND|O_NONBLOCK|O_ASYNC|O_DIRECT)
#else
# define FCNTL_FLMASK (O_APPEND|O_NONBLOCK|O_ASYNC)
#endif
/* Subset of FCNTL_FLMASK that F_SETFL rejects with an error. */
#define FCNTL_FLMASK_INVALID (O_NONBLOCK|O_ASYNC)
1267
1268 /* refer to ll_file_flock() for details */
1269 static int llu_file_flock(struct inode *ino,
1270                           int cmd,
1271                           struct file_lock *file_lock)
1272 {
1273         struct llu_inode_info *lli = llu_i2info(ino);
1274         struct intnl_stat *st = llu_i2stat(ino);
1275         struct ldlm_res_id res_id =
1276                 { .name = {st->st_ino,
1277                            lli->lli_st_generation, LDLM_FLOCK} };
1278         struct lustre_handle lockh = {0};
1279         ldlm_policy_data_t flock;
1280         ldlm_mode_t mode = 0;
1281         int flags = 0;
1282         int rc;
1283
1284         CDEBUG(D_VFSTRACE, "VFS Op:inode="LPU64" file_lock=%p\n",
1285                st->st_ino, file_lock);
1286
1287         flock.l_flock.pid = file_lock->fl_pid;
1288         flock.l_flock.start = file_lock->fl_start;
1289         flock.l_flock.end = file_lock->fl_end;
1290
1291         switch (file_lock->fl_type) {
1292         case F_RDLCK:
1293                 mode = LCK_PR;
1294                 break;
1295         case F_UNLCK:
1296                 mode = LCK_NL;
1297                 break;
1298         case F_WRLCK:
1299                 mode = LCK_PW;
1300                 break;
1301         default:
1302                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
1303                 LBUG();
1304         }
1305
1306         switch (cmd) {
1307         case F_SETLKW:
1308 #ifdef F_SETLKW64
1309 #if F_SETLKW64 != F_SETLKW
1310         case F_SETLKW64:
1311 #endif
1312 #endif
1313                 flags = 0;
1314                 break;
1315         case F_SETLK:
1316 #ifdef F_SETLK64
1317 #if F_SETLK64 != F_SETLK
1318         case F_SETLK64:
1319 #endif
1320 #endif
1321                 flags = LDLM_FL_BLOCK_NOWAIT;
1322                 break;
1323         case F_GETLK:
1324 #ifdef F_GETLK64
1325 #if F_GETLK64 != F_GETLK
1326         case F_GETLK64:
1327 #endif
1328 #endif
1329                 flags = LDLM_FL_TEST_LOCK;
1330                 file_lock->fl_type = mode;
1331                 break;
1332         default:
1333                 CERROR("unknown fcntl cmd: %d\n", cmd);
1334                 LBUG();
1335         }
1336
1337         CDEBUG(D_DLMTRACE, "inode="LPU64", pid=%u, flags=%#x, mode=%u, "
1338                "start="LPU64", end="LPU64"\n", st->st_ino, flock.l_flock.pid,
1339                flags, mode, flock.l_flock.start, flock.l_flock.end);
1340
1341         rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, res_id, 
1342                               LDLM_FLOCK, &flock, mode, &flags, NULL, 
1343                               ldlm_flock_completion_ast, NULL, 
1344                               file_lock, NULL, 0, NULL, &lockh, 0);
1345         RETURN(rc);
1346 }
1347
1348 static int assign_type(struct file_lock *fl, int type)
1349 {
1350         switch (type) {
1351         case F_RDLCK:
1352         case F_WRLCK:
1353         case F_UNLCK:
1354                 fl->fl_type = type;
1355                 return 0;
1356         default:
1357                 return -EINVAL;
1358         }
1359 }
1360
1361 static int flock_to_posix_lock(struct inode *ino,
1362                                struct file_lock *fl,
1363                                struct flock *l)
1364 {
1365         switch (l->l_whence) {
1366         /* XXX: only SEEK_SET is supported in lustre */
1367         case SEEK_SET:
1368                 fl->fl_start = 0;
1369                 break;
1370         default:
1371                 return -EINVAL;
1372         }
1373
1374         fl->fl_end = l->l_len - 1;
1375         if (l->l_len < 0)
1376                 return -EINVAL;
1377         if (l->l_len == 0)
1378                 fl->fl_end = OFFSET_MAX;
1379
1380         fl->fl_pid = getpid();
1381         fl->fl_flags = FL_POSIX;
1382         fl->fl_notify = NULL;
1383         fl->fl_insert = NULL;
1384         fl->fl_remove = NULL;
1385         /* XXX: these fields can't be filled with suitable values,
1386                 but I think lustre doesn't use them.
1387          */
1388         fl->fl_owner = NULL;
1389         fl->fl_file = NULL;
1390
1391         return assign_type(fl, l->l_type);
1392 }
1393
1394 static int llu_fcntl_getlk(struct inode *ino, struct flock *flock)
1395 {
1396         struct file_lock fl;
1397         int error;
1398
1399         error = EINVAL;
1400         if ((flock->l_type != F_RDLCK) && (flock->l_type != F_WRLCK))
1401                 goto out;
1402
1403         error = flock_to_posix_lock(ino, &fl, flock);
1404         if (error)
1405                 goto out;
1406
1407         error = llu_file_flock(ino, F_GETLK, &fl);
1408         if (error)
1409                 goto out;
1410
1411         flock->l_type = F_UNLCK;
1412         if (fl.fl_type != F_UNLCK) {
1413                 flock->l_pid = fl.fl_pid;
1414                 flock->l_start = fl.fl_start;
1415                 flock->l_len = fl.fl_end == OFFSET_MAX ? 0:
1416                         fl.fl_end - fl.fl_start + 1;
1417                 flock->l_whence = SEEK_SET;
1418                 flock->l_type = fl.fl_type;
1419         }
1420
1421 out:
1422         return error;
1423 }
1424
1425 static int llu_fcntl_setlk(struct inode *ino, int cmd, struct flock *flock)
1426 {
1427         struct file_lock fl;
1428         int flags = llu_i2info(ino)->lli_open_flags + 1;
1429         int error;
1430
1431         error = flock_to_posix_lock(ino, &fl, flock);
1432         if (error)
1433                 goto out;
1434         if (cmd == F_SETLKW)
1435                 fl.fl_flags |= FL_SLEEP;
1436
1437         error = -EBADF;
1438         switch (flock->l_type) {
1439         case F_RDLCK:
1440                 if (!(flags & FMODE_READ))
1441                         goto out;
1442                 break;
1443         case F_WRLCK:
1444                 if (!(flags & FMODE_WRITE))
1445                         goto out;
1446                 break;
1447         case F_UNLCK:
1448                 break;
1449         default:
1450                 error = -EINVAL;
1451                 goto out;
1452         }
1453
1454         error = llu_file_flock(ino, cmd, &fl);
1455         if (error)
1456                 goto out;
1457
1458 out:
1459         return error;
1460 }
1461
1462 static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap, int *rtn)
1463 {
1464         struct llu_inode_info *lli = llu_i2info(ino);
1465         long flags;
1466         struct flock *flock;
1467         long err = 0;
1468
1469         liblustre_wait_event(0);
1470         switch (cmd) {
1471         case F_GETFL:
1472                 *rtn = lli->lli_open_flags;
1473                 break;
1474         case F_SETFL:
1475                 flags = va_arg(ap, long);
1476                 flags &= FCNTL_FLMASK;
1477                 if (flags & FCNTL_FLMASK_INVALID) {
1478                         CERROR("liblustre don't support O_NONBLOCK, O_ASYNC, "
1479                                "and O_DIRECT on file descriptor\n");
1480                         *rtn = -EINVAL;
1481                         err = EINVAL;
1482                         break;
1483                 }
1484                 lli->lli_open_flags = (int)(flags & FCNTL_FLMASK) |
1485                                       (lli->lli_open_flags & ~FCNTL_FLMASK);
1486                 *rtn = 0;
1487                 break;
1488         case F_GETLK:
1489 #ifdef F_GETLK64
1490 #if F_GETLK64 != F_GETLK
1491         case F_GETLK64:
1492 #endif
1493 #endif
1494                 flock = va_arg(ap, struct flock *);
1495                 err = llu_fcntl_getlk(ino, flock);
1496                 *rtn = err? -1: 0;
1497                 break;
1498         case F_SETLK:
1499 #ifdef F_SETLKW64
1500 #if F_SETLKW64 != F_SETLKW
1501         case F_SETLKW64:
1502 #endif
1503 #endif
1504         case F_SETLKW:
1505 #ifdef F_SETLK64
1506 #if F_SETLK64 != F_SETLK
1507         case F_SETLK64:
1508 #endif
1509 #endif
1510                 flock = va_arg(ap, struct flock *);
1511                 err = llu_fcntl_setlk(ino, cmd, flock);
1512                 *rtn = err? -1: 0;
1513                 break;
1514         default:
1515                 CERROR("unsupported fcntl cmd %x\n", cmd);
1516                 *rtn = -ENOSYS;
1517                 err = ENOSYS;
1518                 break;
1519         }
1520
1521         liblustre_wait_event(0);
1522         return err;
1523 }
1524
1525 static int llu_get_grouplock(struct inode *inode, unsigned long arg)
1526 {
1527         struct llu_inode_info *lli = llu_i2info(inode);
1528         struct ll_file_data *fd = lli->lli_file_data;
1529         ldlm_policy_data_t policy = { .l_extent = { .start = 0,
1530                                                     .end = OBD_OBJECT_EOF}};
1531         struct lustre_handle lockh = { 0 };
1532         struct lov_stripe_md *lsm = lli->lli_smd;
1533         ldlm_error_t err;
1534         int flags = 0;
1535         ENTRY;
1536
1537         if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1538                 RETURN(-EINVAL);
1539         }
1540
1541         policy.l_extent.gid = arg;
1542         if (lli->lli_open_flags & O_NONBLOCK)
1543                 flags = LDLM_FL_BLOCK_NOWAIT;
1544
1545         err = llu_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh,
1546                               flags);
1547         if (err)
1548                 RETURN(err);
1549
1550         fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
1551         fd->fd_gid = arg;
1552         memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
1553
1554         RETURN(0);
1555 }
1556
1557 static int llu_put_grouplock(struct inode *inode, unsigned long arg)
1558 {
1559         struct llu_inode_info *lli = llu_i2info(inode);
1560         struct ll_file_data *fd = lli->lli_file_data;
1561         struct lov_stripe_md *lsm = lli->lli_smd;
1562         ldlm_error_t err;
1563         ENTRY;
1564
1565         if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED))
1566                 RETURN(-EINVAL);
1567
1568         if (fd->fd_gid != arg)
1569                 RETURN(-EINVAL);
1570
1571         fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
1572
1573         err = llu_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
1574         if (err)
1575                 RETURN(err);
1576
1577         fd->fd_gid = 0;
1578         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
1579
1580         RETURN(0);
1581 }
1582
1583 static int llu_lov_dir_setstripe(struct inode *ino, unsigned long arg)
1584 {
1585         struct llu_sb_info *sbi = llu_i2sbi(ino); 
1586         struct ptlrpc_request *request = NULL;
1587         struct mdc_op_data op_data;
1588         struct iattr attr = { 0 };
1589         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1590         int rc = 0;
1591
1592         llu_prepare_mdc_op_data(&op_data, ino, NULL, NULL, 0, 0);
1593
1594         LASSERT(sizeof(lum) == sizeof(*lump));
1595         LASSERT(sizeof(lum.lmm_objects[0]) ==
1596                 sizeof(lump->lmm_objects[0]));
1597         rc = copy_from_user(&lum, lump, sizeof(lum));
1598         if (rc)
1599                 return(-EFAULT);
1600
1601         if (lum.lmm_magic != LOV_USER_MAGIC)
1602                 RETURN(-EINVAL);
1603
1604         if (lum.lmm_magic != cpu_to_le32(LOV_USER_MAGIC))
1605                 lustre_swab_lov_user_md(&lum);
1606
1607         /* swabbing is done in lov_setstripe() on server side */
1608         rc = mdc_setattr(sbi->ll_mdc_exp, &op_data,
1609                          &attr, &lum, sizeof(lum), NULL, 0, &request);
1610         if (rc) {
1611                 ptlrpc_req_finished(request);
1612                 if (rc != -EPERM && rc != -EACCES)
1613                         CERROR("mdc_setattr fails: rc = %d\n", rc);
1614                 return rc;
1615         }
1616         ptlrpc_req_finished(request);
1617
1618         return rc;
1619 }
1620
1621 static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
1622                                      struct lov_user_md *lum, int lum_size)
1623 {
1624         struct llu_sb_info *sbi = llu_i2sbi(ino); 
1625         struct obd_export *exp = llu_i2obdexp(ino);
1626         struct llu_inode_info *lli = llu_i2info(ino);
1627         struct llu_inode_info *lli2 = NULL;
1628         struct lov_stripe_md *lsm;
1629         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1630         struct ptlrpc_request *req = NULL;
1631         struct lustre_md md;
1632         struct mdc_op_data data;
1633         struct lustre_handle lockh;
1634         int rc = 0;
1635         ENTRY;
1636
1637         lsm = lli->lli_smd;
1638         if (lsm) {
1639                 CDEBUG(D_IOCTL, "stripe already exists for ino "LPU64"\n",
1640                        lli->lli_fid.id);
1641                 return -EEXIST;
1642         }
1643
1644         OBD_ALLOC(lli2, sizeof(struct llu_inode_info));
1645         if (!lli2)
1646                 return -ENOMEM;
1647         
1648         memcpy(lli2, lli, sizeof(struct llu_inode_info));
1649         lli2->lli_open_count = 0;
1650         lli2->lli_it = NULL;
1651         lli2->lli_file_data = NULL;
1652         lli2->lli_smd = NULL;
1653         lli2->lli_symlink_name = NULL;
1654         ino->i_private = lli2;
1655
1656         llu_prepare_mdc_op_data(&data, NULL, ino, NULL, 0, O_RDWR);
1657
1658         rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &oit, LCK_CR, &data,
1659                          &lockh, lum, lum_size, ldlm_completion_ast,
1660                          llu_mdc_blocking_ast, NULL, LDLM_FL_INTENT_ONLY);
1661         if (rc)
1662                 GOTO(out, rc);
1663         
1664         req = oit.d.lustre.it_data;
1665         rc = it_open_error(DISP_IT_EXECD, &oit);
1666         if (rc) {
1667                 req->rq_replay = 0;
1668                 GOTO(out, rc);
1669         }
1670         
1671         rc = it_open_error(DISP_OPEN_OPEN, &oit);
1672         if (rc) {
1673                 req->rq_replay = 0;
1674                 GOTO(out, rc);
1675         }
1676         
1677         rc = mdc_req2lustre_md(req, DLM_REPLY_REC_OFF, exp, &md);
1678         if (rc)
1679                 GOTO(out, rc);
1680         
1681         llu_update_inode(ino, md.body, md.lsm);
1682         lli->lli_smd = lli2->lli_smd;
1683         lli2->lli_smd = NULL;
1684
1685         llu_local_open(lli2, &oit);
1686        
1687         /* release intent */
1688         if (lustre_handle_is_used(&lockh))
1689                 ldlm_lock_decref(&lockh, LCK_CR);
1690
1691         ptlrpc_req_finished(req);
1692         req = NULL;
1693         
1694         rc = llu_file_release(ino);
1695  out:
1696         ino->i_private = lli;
1697         if (lli2)
1698                 OBD_FREE(lli2, sizeof(struct llu_inode_info));
1699         if (req != NULL)
1700                 ptlrpc_req_finished(req);
1701         RETURN(rc);
1702 }
1703
1704 static int llu_lov_file_setstripe(struct inode *ino, unsigned long arg)
1705 {
1706         struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
1707         int rc;
1708         int flags = FMODE_WRITE;
1709         ENTRY;
1710
1711         LASSERT(sizeof(lum) == sizeof(*lump));
1712         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0]));
1713         rc = copy_from_user(&lum, lump, sizeof(lum));
1714         if (rc)
1715                 RETURN(-EFAULT);
1716
1717         rc = llu_lov_setstripe_ea_info(ino, flags, &lum, sizeof(lum));
1718         RETURN(rc);
1719 }
1720
1721 static int llu_lov_setstripe(struct inode *ino, unsigned long arg)
1722 {
1723         struct intnl_stat *st = llu_i2stat(ino);
1724         if (S_ISREG(st->st_mode))
1725                 return llu_lov_file_setstripe(ino, arg);
1726         if (S_ISDIR(st->st_mode))
1727                 return llu_lov_dir_setstripe(ino, arg);
1728         
1729         return -EINVAL; 
1730 }
1731
1732 static int llu_lov_getstripe(struct inode *ino, unsigned long arg)
1733 {
1734         struct lov_stripe_md *lsm = llu_i2info(ino)->lli_smd;
1735
1736         if (!lsm)
1737                 RETURN(-ENODATA);
1738
1739         return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, llu_i2obdexp(ino), 0, lsm,
1740                             (void *)arg);
1741 }
1742
1743 static int llu_iop_ioctl(struct inode *ino, unsigned long int request,
1744                          va_list ap)
1745 {
1746         unsigned long arg;
1747         int rc;
1748
1749         liblustre_wait_event(0);
1750
1751         switch (request) {
1752         case LL_IOC_GROUP_LOCK:
1753                 arg = va_arg(ap, unsigned long);
1754                 rc = llu_get_grouplock(ino, arg);
1755                 break;
1756         case LL_IOC_GROUP_UNLOCK:
1757                 arg = va_arg(ap, unsigned long);
1758                 rc = llu_put_grouplock(ino, arg);
1759                 break;
1760         case LL_IOC_LOV_SETSTRIPE:
1761                 arg = va_arg(ap, unsigned long);
1762                 rc = llu_lov_setstripe(ino, arg);
1763                 break;
1764         case LL_IOC_LOV_GETSTRIPE:
1765                 arg = va_arg(ap, unsigned long);
1766                 rc = llu_lov_getstripe(ino, arg);
1767                 break;
1768         default:
1769                 CERROR("did not support ioctl cmd %lx\n", request);
1770                 rc = -ENOSYS;
1771                 break;
1772         }
1773
1774         liblustre_wait_event(0);
1775         return rc;
1776 }
1777
/*
 * sync method of the inode operations table.
 *
 * Reads and writes are already done synchronously, so nothing is
 * flushed here; the function only calls liblustre_wait_event(0) and
 * always returns 0.
 */
static int llu_iop_sync(struct inode *inode)
{
        int rc = 0;

        liblustre_wait_event(0);
        return rc;
}
1786
/*
 * datasync method of the inode operations table: calls
 * liblustre_wait_event(0) and always returns 0.
 */
static int llu_iop_datasync(struct inode *inode)
{
        int rc = 0;

        liblustre_wait_event(0);
        return rc;
}
1792
1793 struct filesys_ops llu_filesys_ops =
1794 {
1795         fsop_gone: llu_fsop_gone,
1796 };
1797
1798 struct inode *llu_iget(struct filesys *fs, struct lustre_md *md)
1799 {
1800         struct inode *inode;
1801         struct ll_fid fid;
1802         struct file_identifier fileid = {&fid, sizeof(fid)};
1803
1804         if ((md->body->valid &
1805              (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) !=
1806             (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) {
1807                 CERROR("bad md body valid mask "LPX64"\n", md->body->valid);
1808                 LBUG();
1809                 return ERR_PTR(-EPERM);
1810         }
1811
1812         /* try to find existing inode */
1813         fid.id = md->body->ino;
1814         fid.generation = md->body->generation;
1815         fid.f_type = md->body->mode & S_IFMT;
1816
1817         inode = _sysio_i_find(fs, &fileid);
1818         if (inode) {
1819                 struct llu_inode_info *lli = llu_i2info(inode);
1820
1821                 if (inode->i_zombie ||
1822                     lli->lli_st_generation != md->body->generation) {
1823                         I_RELE(inode);
1824                 }
1825                 else {
1826                         llu_update_inode(inode, md->body, md->lsm);
1827                         return inode;
1828                 }
1829         }
1830
1831         inode = llu_new_inode(fs, &fid);
1832         if (inode)
1833                 llu_update_inode(inode, md->body, md->lsm);
1834
1835         return inode;
1836 }
1837
1838 extern struct list_head lustre_profile_list;
1839
1840 static int
1841 llu_fsswop_mount(const char *source,
1842                  unsigned flags,
1843                  const void *data __IS_UNUSED,
1844                  struct pnode *tocover,
1845                  struct mount **mntp)
1846 {
1847         struct filesys *fs;
1848         struct inode *root;
1849         struct pnode_base *rootpb;
1850         struct obd_device *obd;
1851         struct ll_fid rootfid;
1852         struct llu_sb_info *sbi;
1853         struct obd_statfs osfs;
1854         static struct qstr noname = { NULL, 0, 0 };
1855         struct ptlrpc_request *request = NULL;
1856         struct lustre_handle mdc_conn = {0, };
1857         struct lustre_handle osc_conn = {0, };
1858         struct lustre_md md;
1859         class_uuid_t uuid;
1860         struct config_llog_instance cfg;
1861         char ll_instance[sizeof(sbi) * 2 + 1];
1862         struct lustre_profile *lprof;
1863         char *zconf_mgsnid, *zconf_profile;
1864         char *osc = NULL, *mdc = NULL;
1865         int async = 1, err = -EINVAL;
1866         struct obd_connect_data ocd = {0,};
1867
1868         ENTRY;
1869
1870         if (ll_parse_mount_target(source,
1871                                   &zconf_mgsnid,
1872                                   &zconf_profile)) {
1873                 CERROR("mal-formed target %s\n", source);
1874                 RETURN(err);
1875         }
1876         if (!zconf_mgsnid || !zconf_profile) {
1877                 printf("Liblustre: invalid target %s\n", source);
1878                 RETURN(err);
1879         }
1880         /* allocate & initialize sbi */
1881         OBD_ALLOC(sbi, sizeof(*sbi));
1882         if (!sbi)
1883                 RETURN(-ENOMEM);
1884
1885         INIT_LIST_HEAD(&sbi->ll_conn_chain);
1886         generate_random_uuid(uuid);
1887         class_uuid_unparse(uuid, &sbi->ll_sb_uuid);
1888
1889         /* generate a string unique to this super, let's try
1890          the address of the super itself.*/
1891         sprintf(ll_instance, "%p", sbi);
1892
1893         /* retrive & parse config log */
1894         cfg.cfg_instance = ll_instance;
1895         cfg.cfg_uuid = sbi->ll_sb_uuid;
1896         cfg.cfg_last_idx = 0;
1897         err = liblustre_process_log(&cfg, zconf_mgsnid, zconf_profile, 1);
1898         if (err < 0) {
1899                 CERROR("Unable to process log: %s\n", zconf_profile);
1900                 GOTO(out_free, err);
1901         }
1902
1903         lprof = class_get_profile(zconf_profile);
1904         if (lprof == NULL) {
1905                 CERROR("No profile found: %s\n", zconf_profile);
1906                 GOTO(out_free, err = -EINVAL);
1907         }
1908         OBD_ALLOC(osc, strlen(lprof->lp_osc) + strlen(ll_instance) + 2);
1909         sprintf(osc, "%s-%s", lprof->lp_osc, ll_instance);
1910
1911         OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + strlen(ll_instance) + 2);
1912         sprintf(mdc, "%s-%s", lprof->lp_mdc, ll_instance);
1913
1914         if (!osc) {
1915                 CERROR("no osc\n");
1916                 GOTO(out_free, err = -EINVAL);
1917         }
1918         if (!mdc) {
1919                 CERROR("no mdc\n");
1920                 GOTO(out_free, err = -EINVAL);
1921         }
1922
1923         fs = _sysio_fs_new(&llu_filesys_ops, flags, sbi);
1924         if (!fs) {
1925                 err = -ENOMEM;
1926                 goto out_free;
1927         }
1928
1929         obd = class_name2obd(mdc);
1930         if (!obd) {
1931                 CERROR("MDC %s: not setup or attached\n", mdc);
1932                 GOTO(out_free, err = -EINVAL);
1933         }
1934         obd_set_info_async(obd->obd_self_export, strlen("async"), "async",
1935                            sizeof(async), &async, NULL);
1936
1937         ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION;
1938         ocd.ocd_ibits_known = MDS_INODELOCK_FULL;
1939         ocd.ocd_version = LUSTRE_VERSION_CODE;
1940
1941         /* setup mdc */
1942         err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid, &ocd);
1943         if (err) {
1944                 CERROR("cannot connect to %s: rc = %d\n", mdc, err);
1945                 GOTO(out_free, err);
1946         }
1947         sbi->ll_mdc_exp = class_conn2export(&mdc_conn);
1948
1949         err = obd_statfs(obd, &osfs, 100000000);
1950         if (err)
1951                 GOTO(out_mdc, err);
1952
1953         /*
1954          * FIXME fill fs stat data into sbi here!!! FIXME
1955          */
1956
1957         /* setup osc */
1958         obd = class_name2obd(osc);
1959         if (!obd) {
1960                 CERROR("OSC %s: not setup or attached\n", osc);
1961                 GOTO(out_mdc, err = -EINVAL);
1962         }
1963         obd_set_info_async(obd->obd_self_export, strlen("async"), "async",
1964                            sizeof(async), &async, NULL);
1965
1966         obd->obd_upcall.onu_owner = &sbi->ll_lco;
1967         obd->obd_upcall.onu_upcall = ll_ocd_update;
1968
1969         ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL |
1970                                 OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK;
1971         ocd.ocd_version = LUSTRE_VERSION_CODE;
1972         err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd);
1973         if (err) {
1974                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
1975                 GOTO(out_mdc, err);
1976         }
1977         sbi->ll_osc_exp = class_conn2export(&osc_conn);
1978         sbi->ll_lco.lco_flags = ocd.ocd_connect_flags;
1979
1980         mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
1981
1982         err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
1983         if (err) {
1984                 CERROR("cannot mds_connect: rc = %d\n", err);
1985                 GOTO(out_osc, err);
1986         }
1987         CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id);
1988         sbi->ll_rootino = rootfid.id;
1989
1990         /* fetch attr of root inode */
1991         err = mdc_getattr(sbi->ll_mdc_exp, &rootfid,
1992                           OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request);
1993         if (err) {
1994                 CERROR("mdc_getattr failed for root: rc = %d\n", err);
1995                 GOTO(out_osc, err);
1996         }
1997
1998         err = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp, &md);
1999         if (err) {
2000                 CERROR("failed to understand root inode md: rc = %d\n",err);
2001                 GOTO(out_request, err);
2002         }
2003
2004         LASSERT(sbi->ll_rootino != 0);
2005
2006         root = llu_iget(fs, &md);
2007         if (!root || IS_ERR(root)) {
2008                 CERROR("fail to generate root inode\n");
2009                 GOTO(out_request, err = -EBADF);
2010         }
2011
2012         /*
2013          * Generate base path-node for root.
2014          */
2015         rootpb = _sysio_pb_new(&noname, NULL, root);
2016         if (!rootpb) {
2017                 err = -ENOMEM;
2018                 goto out_inode;
2019         }
2020
2021         err = _sysio_do_mount(fs, rootpb, flags, tocover, mntp);
2022         if (err) {
2023                 _sysio_pb_gone(rootpb);
2024                 goto out_inode;
2025         }
2026
2027         ptlrpc_req_finished(request);
2028
2029         CDEBUG(D_SUPER, "LibLustre: %s mounted successfully!\n", source);
2030         liblustre_wait_event(0);
2031
2032         return 0;
2033
2034 out_inode:
2035         _sysio_i_gone(root);
2036 out_request:
2037         ptlrpc_req_finished(request);
2038 out_osc:
2039         obd_disconnect(sbi->ll_osc_exp);
2040 out_mdc:
2041         obd_disconnect(sbi->ll_mdc_exp);
2042 out_free:
2043         if (osc)
2044                 OBD_FREE(osc, strlen(osc) + 1);
2045         if (mdc)
2046                 OBD_FREE(mdc, strlen(mdc) + 1);
2047         OBD_FREE(sbi, sizeof(*sbi));
2048         return err;
2049 }
2050
2051 struct fssw_ops llu_fssw_ops = {
2052         llu_fsswop_mount
2053 };
2054
2055 static struct inode_ops llu_inode_ops = {
2056         inop_lookup:    llu_iop_lookup,
2057         inop_getattr:   llu_iop_getattr,
2058         inop_setattr:   llu_iop_setattr,
2059         inop_filldirentries:     llu_iop_filldirentries,
2060         inop_mkdir:     llu_iop_mkdir_raw,
2061         inop_rmdir:     llu_iop_rmdir_raw,
2062         inop_symlink:   llu_iop_symlink_raw,
2063         inop_readlink:  llu_iop_readlink,
2064         inop_open:      llu_iop_open,
2065         inop_close:     llu_iop_close,
2066         inop_link:      llu_iop_link_raw,
2067         inop_unlink:    llu_iop_unlink_raw,
2068         inop_rename:    llu_iop_rename_raw,
2069         inop_pos:       llu_iop_pos,
2070         inop_read:      llu_iop_read,
2071         inop_write:     llu_iop_write,
2072         inop_iodone:    llu_iop_iodone,
2073         inop_fcntl:     llu_iop_fcntl,
2074         inop_sync:      llu_iop_sync,
2075         inop_datasync:  llu_iop_datasync,
2076         inop_ioctl:     llu_iop_ioctl,
2077         inop_mknod:     llu_iop_mknod_raw,
2078 #ifdef _HAVE_STATVFS
2079         inop_statvfs:   llu_iop_statvfs,
2080 #endif
2081         inop_gone:      llu_iop_gone,
2082 };