X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fliblustre%2Fsuper.c;h=ad759d7273485d55f17efae98a91cb8c6feab1bd;hp=d3b6adf70afc20e5a7c4c1aa50887b1f384799b2;hb=a25adb47c7b7eeb68a922e2647d74eeff3401c6a;hpb=a1d11a561dea2ae38275eedc64d09e1fe1730d6b diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index d3b6adf..ad759d7 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -3,7 +3,7 @@ * * Lustre Light Super operations * - * Copyright (c) 2002, 2003 Cluster File Systems, Inc. + * Copyright (c) 2002-2004 Cluster File Systems, Inc. * * This file is part of Lustre, http://www.lustre.org. * @@ -29,7 +29,7 @@ #include #include #include -#include +#include #include #ifndef __CYGWIN__ # include @@ -38,47 +38,100 @@ #endif #include +#ifdef HAVE_XTIO_H +#include +#endif #include #include #include +#ifdef HAVE_FILE_H #include +#endif #undef LIST_HEAD #include "llite_lib.h" +#ifndef MAY_EXEC +#define MAY_EXEC 1 +#define MAY_WRITE 2 +#define MAY_READ 4 +#endif + +#define S_IXUGO (S_IXUSR|S_IXGRP|S_IXOTH) + +static int ll_permission(struct inode *inode, int mask) +{ + struct intnl_stat *st = llu_i2stat(inode); + mode_t mode = st->st_mode; + + if (current->fsuid == st->st_uid) + mode >>= 6; + else if (in_group_p(st->st_gid)) + mode >>= 3; + + if ((mode & mask & (MAY_READ|MAY_WRITE|MAY_EXEC)) == mask) + return 0; + + if ((mask & (MAY_READ|MAY_WRITE)) || + (st->st_mode & S_IXUGO)) + if (capable(CAP_DAC_OVERRIDE)) + return 0; + + if (mask == MAY_READ || + (S_ISDIR(st->st_mode) && !(mask & MAY_WRITE))) { + if (capable(CAP_DAC_READ_SEARCH)) + return 0; + } + + return -EACCES; +} + static void llu_fsop_gone(struct filesys *fs) { struct llu_sb_info *sbi = (struct llu_sb_info *) fs->fs_private; - struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp); - struct ll_fid rootfid; + struct obd_device *obd = class_exp2obd(sbi->ll_md_exp); + int next = 0; ENTRY; list_del(&sbi->ll_conn_chain); - obd_disconnect(sbi->ll_osc_exp, 0); - - /* NULL request to force sync on the MDS, and get the last_committed - * value to flush remaining RPCs from the sending queue on client. - * - * XXX This should be an mdc_sync() call to sync the whole MDS fs, - * which we can call for other reasons as well. - */ - if (!obd->obd_no_recov) - mdc_getstatus(sbi->ll_mdc_exp, &rootfid); + obd_disconnect(sbi->ll_dt_exp); + obd_disconnect(sbi->ll_md_exp); - obd_disconnect(sbi->ll_mdc_exp, 0); + while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL) + class_manual_cleanup(obd); OBD_FREE(sbi, sizeof(*sbi)); + liblustre_wait_idle(); EXIT; } static struct inode_ops llu_inode_ops; -void llu_update_inode(struct inode *inode, struct mds_body *body, +static ldlm_mode_t llu_take_md_lock(struct inode *inode, __u64 bits, + struct lustre_handle *lockh) +{ + ldlm_policy_data_t policy = { .l_inodebits = {bits}}; + struct lu_fid *fid; + ldlm_mode_t rc; + int flags; + ENTRY; + + fid = &llu_i2info(inode)->lli_fid; + CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); + + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; + rc = md_lock_match(llu_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh); + RETURN(rc); +} + +void llu_update_inode(struct inode *inode, struct mdt_body *body, struct lov_stripe_md *lsm) { struct llu_inode_info *lli = llu_i2info(inode); + struct intnl_stat *st = llu_i2stat(inode); LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); if (lsm != NULL) { @@ -88,88 +141,110 @@ void llu_update_inode(struct inode *inode, struct mds_body *body, if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } else { - if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { - CERROR("lsm mismatch for inode %ld\n", - lli->lli_st_ino); + if (lov_stripe_md_cmp(lli->lli_smd, lsm)) { + CERROR("lsm mismatch for inode %lld\n", + (long long)st->st_ino); LBUG(); } } } - if (body->valid & OBD_MD_FLID) - lli->lli_st_ino = body->ino; - if (body->valid & OBD_MD_FLATIME) - LTIME_S(lli->lli_st_atime) = body->atime; - if (body->valid & OBD_MD_FLMTIME) - LTIME_S(lli->lli_st_mtime) = body->mtime; - if (body->valid & OBD_MD_FLCTIME) - LTIME_S(lli->lli_st_ctime) = body->ctime; + if (body->valid & OBD_MD_FLMTIME && + body->mtime > LTIME_S(st->st_mtime)) + LTIME_S(st->st_mtime) = body->mtime; + if (body->valid & OBD_MD_FLATIME && + body->atime > LTIME_S(st->st_atime)) + LTIME_S(st->st_atime) = body->atime; + + /* mtime is always updated with ctime, but can be set in past. + As write and utime(2) may happen within 1 second, and utime's + mtime has a priority over write's one, so take mtime from mds + for the same ctimes. */ + if (body->valid & OBD_MD_FLCTIME && + body->ctime >= LTIME_S(st->st_ctime)) { + LTIME_S(st->st_ctime) = body->ctime; + if (body->valid & OBD_MD_FLMTIME) + LTIME_S(st->st_mtime) = body->mtime; + } if (body->valid & OBD_MD_FLMODE) - lli->lli_st_mode = (lli->lli_st_mode & S_IFMT)|(body->mode & ~S_IFMT); + st->st_mode = (st->st_mode & S_IFMT)|(body->mode & ~S_IFMT); if (body->valid & OBD_MD_FLTYPE) - lli->lli_st_mode = (lli->lli_st_mode & ~S_IFMT)|(body->mode & S_IFMT); + st->st_mode = (st->st_mode & ~S_IFMT)|(body->mode & S_IFMT); + if (S_ISREG(st->st_mode)) + st->st_blksize = min(2UL * PTLRPC_MAX_BRW_SIZE, LL_MAX_BLKSIZE); + else + st->st_blksize = 4096; if (body->valid & OBD_MD_FLUID) - lli->lli_st_uid = body->uid; + st->st_uid = body->uid; if (body->valid & OBD_MD_FLGID) - lli->lli_st_gid = body->gid; - if (body->valid & OBD_MD_FLFLAGS) - lli->lli_st_flags = body->flags; + st->st_gid = body->gid; if (body->valid & OBD_MD_FLNLINK) - lli->lli_st_nlink = body->nlink; - if (body->valid & OBD_MD_FLGENER) - lli->lli_st_generation = body->generation; + st->st_nlink = body->nlink; if (body->valid & OBD_MD_FLRDEV) - lli->lli_st_rdev = body->rdev; - if (body->valid & OBD_MD_FLSIZE) - lli->lli_st_size = body->size; - if (body->valid & OBD_MD_FLBLOCKS) - lli->lli_st_blocks = body->blocks; - - /* fillin fid */ - if (body->valid & OBD_MD_FLID) - lli->lli_fid.id = body->ino; - if (body->valid & OBD_MD_FLGENER) - lli->lli_fid.generation = body->generation; - if (body->valid & OBD_MD_FLTYPE) - lli->lli_fid.f_type = body->mode & S_IFMT; + st->st_rdev = body->rdev; + if (body->valid & OBD_MD_FLFLAGS) + lli->lli_st_flags = body->flags; + if (body->valid & OBD_MD_FLSIZE) { + if ((llu_i2sbi(inode)->ll_lco.lco_flags & OBD_CONNECT_SOM) && + S_ISREG(st->st_mode) && lli->lli_smd) { + struct lustre_handle lockh; + ldlm_mode_t mode; + + /* As it is possible a blocking ast has been processed + * by this time, we need to check there is an UPDATE + * lock on the client and set LLIF_MDS_SIZE_LOCK holding + * it. */ + mode = llu_take_md_lock(inode, MDS_INODELOCK_UPDATE, + &lockh); + if (mode) { + st->st_size = body->size; + lli->lli_flags |= LLIF_MDS_SIZE_LOCK; + ldlm_lock_decref(&lockh, mode); + } + } else { + st->st_size = body->size; + } + + if (body->valid & OBD_MD_FLBLOCKS) + st->st_blocks = body->blocks; + } } void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) { struct llu_inode_info *lli = llu_i2info(dst); + struct intnl_stat *st = llu_i2stat(dst); valid &= src->o_valid; if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) - CDEBUG(D_INODE, "valid %x, cur time %lu/%lu, new %lu/%lu\n", - src->o_valid, - LTIME_S(lli->lli_st_mtime), LTIME_S(lli->lli_st_ctime), + CDEBUG(D_INODE,"valid "LPX64", cur time %lu/%lu, new %lu/%lu\n", + src->o_valid, + LTIME_S(st->st_mtime), LTIME_S(st->st_ctime), (long)src->o_mtime, (long)src->o_ctime); if (valid & OBD_MD_FLATIME) - LTIME_S(lli->lli_st_atime) = src->o_atime; + LTIME_S(st->st_atime) = src->o_atime; if (valid & OBD_MD_FLMTIME) - LTIME_S(lli->lli_st_mtime) = src->o_mtime; - if (valid & OBD_MD_FLCTIME && src->o_ctime > LTIME_S(lli->lli_st_ctime)) - LTIME_S(lli->lli_st_ctime) = src->o_ctime; + LTIME_S(st->st_mtime) = src->o_mtime; + if (valid & OBD_MD_FLCTIME && src->o_ctime > LTIME_S(st->st_ctime)) + LTIME_S(st->st_ctime) = src->o_ctime; if (valid & OBD_MD_FLSIZE) - lli->lli_st_size = src->o_size; + st->st_size = src->o_size; if (valid & OBD_MD_FLBLOCKS) /* allocation of space */ - lli->lli_st_blocks = src->o_blocks; + st->st_blocks = src->o_blocks; if (valid & OBD_MD_FLBLKSZ) - lli->lli_st_blksize = src->o_blksize; + st->st_blksize = src->o_blksize; if (valid & OBD_MD_FLTYPE) - lli->lli_st_mode = (lli->lli_st_mode & ~S_IFMT) | (src->o_mode & S_IFMT); + st->st_mode = (st->st_mode & ~S_IFMT) | (src->o_mode & S_IFMT); if (valid & OBD_MD_FLMODE) - lli->lli_st_mode = (lli->lli_st_mode & S_IFMT) | (src->o_mode & ~S_IFMT); + st->st_mode = (st->st_mode & S_IFMT) | (src->o_mode & ~S_IFMT); if (valid & OBD_MD_FLUID) - lli->lli_st_uid = src->o_uid; + st->st_uid = src->o_uid; if (valid & OBD_MD_FLGID) - lli->lli_st_gid = src->o_gid; + st->st_gid = src->o_gid; if (valid & OBD_MD_FLFLAGS) lli->lli_st_flags = src->o_flags; - if (valid & OBD_MD_FLGENER) - lli->lli_st_generation = src->o_generation; } #define S_IRWXUGO (S_IRWXU|S_IRWXG|S_IRWXO) @@ -178,51 +253,52 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) { struct llu_inode_info *lli = llu_i2info(src); + struct intnl_stat *st = llu_i2stat(src); obd_flag newvalid = 0; if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n", - valid, LTIME_S(lli->lli_st_mtime), - LTIME_S(lli->lli_st_ctime)); + valid, LTIME_S(st->st_mtime), + LTIME_S(st->st_ctime)); if (valid & OBD_MD_FLATIME) { - dst->o_atime = LTIME_S(lli->lli_st_atime); + dst->o_atime = LTIME_S(st->st_atime); newvalid |= OBD_MD_FLATIME; } if (valid & OBD_MD_FLMTIME) { - dst->o_mtime = LTIME_S(lli->lli_st_mtime); + dst->o_mtime = LTIME_S(st->st_mtime); newvalid |= OBD_MD_FLMTIME; } if (valid & OBD_MD_FLCTIME) { - dst->o_ctime = LTIME_S(lli->lli_st_ctime); + dst->o_ctime = LTIME_S(st->st_ctime); newvalid |= OBD_MD_FLCTIME; } if (valid & OBD_MD_FLSIZE) { - dst->o_size = lli->lli_st_size; + dst->o_size = st->st_size; newvalid |= OBD_MD_FLSIZE; } if (valid & OBD_MD_FLBLOCKS) { /* allocation of space (x512 bytes) */ - dst->o_blocks = lli->lli_st_blocks; + dst->o_blocks = st->st_blocks; newvalid |= OBD_MD_FLBLOCKS; } if (valid & OBD_MD_FLBLKSZ) { /* optimal block size */ - dst->o_blksize = lli->lli_st_blksize; + dst->o_blksize = st->st_blksize; newvalid |= OBD_MD_FLBLKSZ; } if (valid & OBD_MD_FLTYPE) { - dst->o_mode = (dst->o_mode & S_IALLUGO)|(lli->lli_st_mode & S_IFMT); + dst->o_mode = (dst->o_mode & S_IALLUGO)|(st->st_mode & S_IFMT); newvalid |= OBD_MD_FLTYPE; } if (valid & OBD_MD_FLMODE) { - dst->o_mode = (dst->o_mode & S_IFMT)|(lli->lli_st_mode & S_IALLUGO); + dst->o_mode = (dst->o_mode & S_IFMT)|(st->st_mode & S_IALLUGO); newvalid |= OBD_MD_FLMODE; } if (valid & OBD_MD_FLUID) { - dst->o_uid = lli->lli_st_uid; + dst->o_uid = st->st_uid; newvalid |= OBD_MD_FLUID; } if (valid & OBD_MD_FLGID) { - dst->o_gid = lli->lli_st_gid; + dst->o_gid = st->st_gid; newvalid |= OBD_MD_FLGID; } if (valid & OBD_MD_FLFLAGS) { @@ -233,6 +309,10 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) dst->o_generation = lli->lli_st_generation; newvalid |= OBD_MD_FLGENER; } + if (valid & OBD_MD_FLFID) { + dst->o_fid = st->st_ino; + newvalid |= OBD_MD_FLFID; + } dst->o_valid |= newvalid; } @@ -240,32 +320,33 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) /* * really does the getattr on the inode and updates its fields */ -int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) +int llu_inode_getattr(struct inode *inode, struct obdo *obdo) { struct llu_inode_info *lli = llu_i2info(inode); - struct obd_export *exp = llu_i2obdexp(inode); struct ptlrpc_request_set *set; - struct obdo oa; - obd_flag refresh_valid; + struct lov_stripe_md *lsm = lli->lli_smd; + struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; LASSERT(lsm); - LASSERT(lli); - memset(&oa, 0, sizeof oa); - oa.o_id = lsm->lsm_object_id; - oa.o_mode = S_IFREG; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | - OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME; + oinfo.oi_md = lsm; + oinfo.oi_oa = obdo; + oinfo.oi_oa->o_id = lsm->lsm_object_id; + oinfo.oi_oa->o_gr = lsm->lsm_object_gr; + oinfo.oi_oa->o_mode = S_IFREG; + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | + OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | + OBD_MD_FLCTIME; set = ptlrpc_prep_set(); if (set == NULL) { CERROR ("ENOMEM allocing request set\n"); rc = -ENOMEM; } else { - rc = obd_getattr_async(exp, &oa, lsm, set); + rc = obd_getattr_async(llu_i2obdexp(inode), &oinfo, set); if (rc == 0) rc = ptlrpc_set_wait(set); ptlrpc_set_destroy(set); @@ -273,36 +354,39 @@ int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) if (rc) RETURN(rc); - refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME | OBD_MD_FLSIZE; - - /* We set this flag in commit write as we extend the file size. When - * the bit is set and the lock is canceled that covers the file size, - * we clear the bit. This is enough to protect the window where our - * local size extension is needed for writeback. However, it relies on - * behaviour that won't be true in the near future. This assumes that - * all getattr callers get extent locks, which they currnetly do. It - * also assumes that we only send discarding asts for {0,eof} truncates - * as is currently the case. This will have to be replaced by the - * proper eoc communication between clients and the ost, which is on - * its way. */ - if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) { - if (oa.o_size < lli->lli_st_size) - refresh_valid &= ~OBD_MD_FLSIZE; - else - clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags); - } - - obdo_refresh_inode(inode, &oa, refresh_valid); + oinfo.oi_oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | + OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLSIZE; + obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid); + CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %Lu, " + "blksize %Lu\n", lli->lli_smd->lsm_object_id, + (long long unsigned)llu_i2stat(inode)->st_size, + (long long unsigned)llu_i2stat(inode)->st_blocks, + (long long unsigned)llu_i2stat(inode)->st_blksize); RETURN(0); } static struct inode* llu_new_inode(struct filesys *fs, - struct ll_fid *fid) + struct lu_fid *fid) { - struct inode *inode; + struct inode *inode; struct llu_inode_info *lli; + struct intnl_stat st = { + .st_dev = 0, +#if 0 +#ifndef AUTOMOUNT_FILE_NAME + .st_mode = fid->f_type & S_IFMT, +#else + .st_mode = fid->f_type /* all of the bits! */ +#endif +#endif + /* FIXME: fix this later */ + .st_mode = 0, + + .st_uid = geteuid(), + .st_gid = getegid(), + }; OBD_ALLOC(lli, sizeof(*lli)); if (!lli) @@ -318,53 +402,34 @@ static struct inode* llu_new_inode(struct filesys *fs, lli->lli_sysio_fid.fid_data = &lli->lli_fid; lli->lli_sysio_fid.fid_len = sizeof(lli->lli_fid); - - memcpy(&lli->lli_fid, fid, sizeof(*fid)); + lli->lli_fid = *fid; /* file identifier is needed by functions like _sysio_i_find() */ - inode = _sysio_i_new(fs, &lli->lli_sysio_fid, -#ifndef AUTOMOUNT_FILE_NAME - fid->f_type & S_IFMT, -#else - fid->f_type, /* all of the bits! */ -#endif - 0, 0, - &llu_inode_ops, lli); + inode = _sysio_i_new(fs, &lli->lli_sysio_fid, + &st, 0, &llu_inode_ops, lli); - if (!inode) - OBD_FREE(lli, sizeof(*lli)); + if (!inode) + OBD_FREE(lli, sizeof(*lli)); return inode; } -static int llu_have_md_lock(struct inode *inode) +static int llu_have_md_lock(struct inode *inode, __u64 lockpart) { - struct llu_sb_info *sbi = llu_i2sbi(inode); - struct llu_inode_info *lli = llu_i2info(inode); struct lustre_handle lockh; - struct ldlm_res_id res_id = { .name = {0} }; - struct obd_device *obddev; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; + struct lu_fid *fid; int flags; ENTRY; LASSERT(inode); - obddev = sbi->ll_mdc_exp->exp_obd; - res_id.name[0] = lli->lli_st_ino; - res_id.name[1] = lli->lli_st_generation; - - CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]); + fid = &llu_i2info(inode)->lli_fid; + CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); - flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PR, &lockh)) { - ldlm_lock_decref(&lockh, LCK_PR); - RETURN(1); - } - - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PW, &lockh)) { - ldlm_lock_decref(&lockh, LCK_PW); + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK; + if (md_lock_match(llu_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) { RETURN(1); } RETURN(0); @@ -372,7 +437,6 @@ static int llu_have_md_lock(struct inode *inode) static int llu_inode_revalidate(struct inode *inode) { - struct llu_inode_info *lli = llu_i2info(inode); struct lov_stripe_md *lsm = NULL; ENTRY; @@ -381,27 +445,28 @@ static int llu_inode_revalidate(struct inode *inode) RETURN(0); } - if (!llu_have_md_lock(inode)) { + if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) { struct lustre_md md; struct ptlrpc_request *req = NULL; struct llu_sb_info *sbi = llu_i2sbi(inode); - struct ll_fid fid; - unsigned long valid = 0; + unsigned long valid = OBD_MD_FLGETATTR; int rc, ealen = 0; /* Why don't we update all valid MDS fields here, if we're * doing an RPC anyways? -phil */ - if (S_ISREG(lli->lli_st_mode)) { - ealen = obd_size_diskmd(sbi->ll_osc_exp, NULL); + if (S_ISREG(llu_i2stat(inode)->st_mode)) { + ealen = obd_size_diskmd(sbi->ll_dt_exp, NULL); valid |= OBD_MD_FLEASIZE; } - ll_inode2fid(&fid, inode); - rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req); + rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), + NULL, valid, ealen, &req); if (rc) { - CERROR("failure %d inode %lu\n", rc, lli->lli_st_ino); + CERROR("failure %d inode %llu\n", rc, + (long long)llu_i2stat(inode)->st_ino); RETURN(-abs(rc)); } - rc = mdc_req2lustre_md(req, 0, sbi->ll_osc_exp, &md); + rc = md_get_lustre_md(sbi->ll_md_exp, req, REPLY_REC_OFF, + sbi->ll_dt_exp, sbi->ll_md_exp, &md); /* XXX Too paranoid? */ if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) && @@ -420,11 +485,7 @@ static int llu_inode_revalidate(struct inode *inode) llu_update_inode(inode, md.body, md.lsm); if (md.lsm != NULL && llu_i2info(inode)->lli_smd != md.lsm) - obd_free_memmd(sbi->ll_osc_exp, &md.lsm); - - if (md.body->valid & OBD_MD_FLSIZE) - set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, - &llu_i2info(inode)->lli_flags); + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); ptlrpc_req_finished(req); } @@ -434,33 +495,12 @@ static int llu_inode_revalidate(struct inode *inode) /* ll_glimpse_size will prefer locally cached writes if they extend * the file */ - { - struct ost_lvb lvb; - ldlm_error_t err; - - err = llu_glimpse_size(inode, &lvb); - lli->lli_st_size = lvb.lvb_size; - } - RETURN(0); + RETURN(llu_glimpse_size(inode)); } static void copy_stat_buf(struct inode *ino, struct intnl_stat *b) { - struct llu_inode_info *lli = llu_i2info(ino); - - b->st_dev = lli->lli_st_dev; - b->st_ino = lli->lli_st_ino; - b->st_mode = lli->lli_st_mode; - b->st_nlink = lli->lli_st_nlink; - b->st_uid = lli->lli_st_uid; - b->st_gid = lli->lli_st_gid; - b->st_rdev = lli->lli_st_rdev; - b->st_size = lli->lli_st_size; - b->st_blksize = lli->lli_st_blksize; - b->st_blocks = lli->lli_st_blocks; - b->st_atime = lli->lli_st_atime; - b->st_mtime = lli->lli_st_mtime; - b->st_ctime = lli->lli_st_ctime; + *b = *llu_i2stat(ino); } static int llu_iop_getattr(struct pnode *pno, @@ -470,6 +510,8 @@ static int llu_iop_getattr(struct pnode *pno, int rc; ENTRY; + liblustre_wait_event(0); + if (!ino) { LASSERT(pno); LASSERT(pno->p_base->pb_ino); @@ -484,50 +526,45 @@ static int llu_iop_getattr(struct pnode *pno, rc = llu_inode_revalidate(ino); if (!rc) { copy_stat_buf(ino, b); - - if (llu_i2info(ino)->lli_it) { - struct lookup_intent *it; - - LL_GET_INTENT(ino, it); - it->it_op_release(it); - OBD_FREE(it, sizeof(*it)); - } + LASSERT(!llu_i2info(ino)->lli_it); } + liblustre_wait_event(0); RETURN(rc); } static int null_if_equal(struct ldlm_lock *lock, void *data) { - if (data == lock->l_ast_data) + if (data == lock->l_ast_data) { lock->l_ast_data = NULL; - if (lock->l_req_mode != lock->l_granted_mode) - return LDLM_ITER_STOP; + if (lock->l_req_mode != lock->l_granted_mode) + LDLM_ERROR(lock,"clearing inode with ungranted lock\n"); + } return LDLM_ITER_CONTINUE; } void llu_clear_inode(struct inode *inode) { - struct ll_fid fid; struct llu_inode_info *lli = llu_i2info(inode); struct llu_sb_info *sbi = llu_i2sbi(inode); ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%lu(%p)\n", lli->lli_st_ino, - lli->lli_st_generation, inode); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu/%lu(%p)\n", + (long long)llu_i2stat(inode)->st_ino, lli->lli_st_generation, + inode); - ll_inode2fid(&fid, inode); - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(lli->lli_flags)); - mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode); + lli->lli_flags &= ~LLIF_MDS_SIZE_LOCK; + md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode), + null_if_equal, inode); if (lli->lli_smd) - obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd, + obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd, null_if_equal, inode); if (lli->lli_smd) { - obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd); + obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd); lli->lli_smd = NULL; } @@ -545,6 +582,7 @@ void llu_iop_gone(struct inode *inode) struct llu_inode_info *lli = llu_i2info(inode); ENTRY; + liblustre_wait_event(0); llu_clear_inode(inode); OBD_FREE(lli, sizeof(*lli)); @@ -554,35 +592,107 @@ void llu_iop_gone(struct inode *inode) static int inode_setattr(struct inode * inode, struct iattr * attr) { unsigned int ia_valid = attr->ia_valid; - struct llu_inode_info *lli = llu_i2info(inode); + struct intnl_stat *st = llu_i2stat(inode); int error = 0; - if (ia_valid & ATTR_SIZE) { - error = llu_vmtruncate(inode, attr->ia_size); - if (error) - goto out; - } + /* + * inode_setattr() is only ever invoked with ATTR_SIZE (by + * llu_setattr_raw()) when file has no bodies. Check this. + */ + LASSERT(ergo(ia_valid & ATTR_SIZE, llu_i2info(inode)->lli_smd == NULL)); + if (ia_valid & ATTR_SIZE) + st->st_size = attr->ia_size; if (ia_valid & ATTR_UID) - lli->lli_st_uid = attr->ia_uid; + st->st_uid = attr->ia_uid; if (ia_valid & ATTR_GID) - lli->lli_st_gid = attr->ia_gid; + st->st_gid = attr->ia_gid; if (ia_valid & ATTR_ATIME) - lli->lli_st_atime = attr->ia_atime; + st->st_atime = attr->ia_atime; if (ia_valid & ATTR_MTIME) - lli->lli_st_mtime = attr->ia_mtime; + st->st_mtime = attr->ia_mtime; if (ia_valid & ATTR_CTIME) - lli->lli_st_ctime = attr->ia_ctime; + st->st_ctime = attr->ia_ctime; if (ia_valid & ATTR_MODE) { - lli->lli_st_mode = attr->ia_mode; - if (!in_group_p(lli->lli_st_gid) && !capable(CAP_FSETID)) - lli->lli_st_mode &= ~S_ISGID; + st->st_mode = attr->ia_mode; + if (!in_group_p(st->st_gid) && !capable(CAP_FSETID)) + st->st_mode &= ~S_ISGID; } /* mark_inode_dirty(inode); */ -out: return error; } +int llu_md_setattr(struct inode *inode, struct md_op_data *op_data, + struct md_open_data **mod) +{ + struct lustre_md md; + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct ptlrpc_request *request = NULL; + int rc; + ENTRY; + + llu_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0, LUSTRE_OPC_ANY); + rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, + 0, &request, mod); + + if (rc) { + ptlrpc_req_finished(request); + if (rc != -EPERM && rc != -EACCES) + CERROR("md_setattr fails: rc = %d\n", rc); + RETURN(rc); + } + + rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, + sbi->ll_dt_exp, sbi->ll_md_exp, &md); + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } + + /* We call inode_setattr to adjust timestamps. + * If there is at least some data in file, we cleared ATTR_SIZE + * above to avoid invoking vmtruncate, otherwise it is important + * to call vmtruncate in inode_setattr to update inode->i_size + * (bug 6196) */ + inode_setattr(inode, &op_data->op_attr); + llu_update_inode(inode, md.body, md.lsm); + ptlrpc_req_finished(request); + + RETURN(rc); +} + +/* Close IO epoch and send Size-on-MDS attribute update. */ +static int llu_setattr_done_writing(struct inode *inode, + struct md_op_data *op_data, + struct md_open_data *mod) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct intnl_stat *st = llu_i2stat(inode); + int rc = 0; + ENTRY; + + LASSERT(op_data != NULL); + if (!S_ISREG(st->st_mode)) + RETURN(0); + + /* XXX: pass och here for the recovery purpose. */ + CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n", + op_data->op_ioepoch, PFID(&lli->lli_fid)); + + op_data->op_flags = MF_EPOCH_CLOSE | MF_SOM_CHANGE; + rc = md_done_writing(llu_i2sbi(inode)->ll_md_exp, op_data, mod); + if (rc == -EAGAIN) { + /* MDS has instructed us to obtain Size-on-MDS attribute + * from OSTs and send setattr to back to MDS. */ + rc = llu_sizeonmds_update(inode, mod, &op_data->op_handle, + op_data->op_ioepoch); + } else if (rc) { + CERROR("inode %llu mdc truncate failed: rc = %d\n", + st->st_ino, rc); + } + RETURN(rc); +} + /* If this inode has objects allocated to it (lsm != NULL), then the OST * object(s) determine the file size and mtime. Otherwise, the MDS will * keep these values until such a time that objects are allocated for it. @@ -600,19 +710,20 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) { struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd; struct llu_sb_info *sbi = llu_i2sbi(inode); - struct llu_inode_info *lli = llu_i2info(inode); - struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct intnl_stat *st = llu_i2stat(inode); int ia_valid = attr->ia_valid; - int rc = 0; + struct md_op_data op_data = { { 0 } }; + struct md_open_data *mod = NULL; + int rc = 0, rc1 = 0; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu\n", lli->lli_st_ino); + CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu\n", (long long)st->st_ino); if (ia_valid & ATTR_SIZE) { if (attr->ia_size > ll_file_maxbytes(inode)) { CDEBUG(D_INODE, "file too large %llu > "LPU64"\n", - attr->ia_size, ll_file_maxbytes(inode)); + (long long)attr->ia_size, + ll_file_maxbytes(inode)); RETURN(-EFBIG); } @@ -632,42 +743,48 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) attr->ia_mtime = CURRENT_TIME; attr->ia_valid |= ATTR_MTIME_SET; } + if ((attr->ia_valid & ATTR_CTIME) && !(attr->ia_valid & ATTR_MTIME)) { + /* To avoid stale mtime on mds, obtain it from ost and send + to mds. */ + rc = llu_glimpse_size(inode); + if (rc) + RETURN(rc); + + attr->ia_valid |= ATTR_MTIME_SET | ATTR_MTIME; + attr->ia_mtime = inode->i_stbuf.st_mtime; + } if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME)) CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n", LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime), LTIME_S(CURRENT_TIME)); + + /* NB: ATTR_SIZE will only be set after this point if the size + * resides on the MDS, ie, this file has no objects. */ if (lsm) attr->ia_valid &= ~ATTR_SIZE; /* If only OST attributes being set on objects, don't do MDS RPC. * In that case, we need to check permissions and update the local * inode ourselves so we can call obdo_from_inode() always. */ - if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { - struct lustre_md md; - llu_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - - rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, - attr, NULL, 0, NULL, 0, &request); + if (ia_valid & (lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { + memcpy(&op_data.op_attr, attr, sizeof(*attr)); - if (rc) { - ptlrpc_req_finished(request); - if (rc != -EPERM && rc != -EACCES) - CERROR("mdc_setattr fails: rc = %d\n", rc); + /* Open epoch for truncate. */ + if (ia_valid & ATTR_SIZE) + op_data.op_flags = MF_EPOCH_OPEN; + rc = llu_md_setattr(inode, &op_data, &mod); + if (rc) RETURN(rc); - } - rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); - if (rc) { - ptlrpc_req_finished(request); - RETURN(rc); - } - llu_update_inode(inode, md.body, md.lsm); - ptlrpc_req_finished(request); + if (op_data.op_ioepoch) + CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID" for " + "truncate\n", op_data.op_ioepoch, + PFID(&llu_i2info(inode)->lli_fid)); - if (!md.lsm || !S_ISREG(lli->lli_st_mode)) { + if (!lsm || !S_ISREG(st->st_mode)) { CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); - RETURN(0); + GOTO(out, rc); } } else { /* The OST doesn't check permissions, but the alternative is @@ -677,67 +794,94 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) if (ia_valid & (ATTR_MTIME | ATTR_ATIME)) { /* from sys_utime() */ if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { - if (current->fsuid != lli->lli_st_uid && - (rc = ll_permission(inode, 0/*MAY_WRITE*/, NULL)) != 0) + if (current->fsuid != st->st_uid && + (rc = ll_permission(inode, MAY_WRITE)) != 0) RETURN(rc); } else { - /* from inode_change_ok() */ - if (current->fsuid != lli->lli_st_uid && - !capable(CAP_FOWNER)) - RETURN(-EPERM); + /* from inode_change_ok() */ + if (current->fsuid != st->st_uid && + !capable(CAP_FOWNER)) + RETURN(-EPERM); } } - /* Won't invoke vmtruncate, as we already cleared ATTR_SIZE */ + + /* Won't invoke llu_vmtruncate(), as we already cleared + * ATTR_SIZE */ inode_setattr(inode, attr); } if (ia_valid & ATTR_SIZE) { ldlm_policy_data_t policy = { .l_extent = {attr->ia_size, OBD_OBJECT_EOF} }; - struct lustre_handle lockh = { 0 }; - int err, ast_flags = 0; + struct lustre_handle lockh = { 0, }; + struct lustre_handle match_lockh = { 0, }; + + int err; + int flags = LDLM_FL_TEST_LOCK; /* for assertion check below */ + int lock_mode; + obd_flag obd_flags; + + /* check that there are no matching locks */ + LASSERT(obd_match(sbi->ll_dt_exp, lsm, LDLM_EXTENT, &policy, + LCK_PW, &flags, inode, &match_lockh) <= 0); + /* XXX when we fix the AST intents to pass the discard-range * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA * XXX here. */ - if (attr->ia_size == 0) - ast_flags = LDLM_AST_DISCARD_DATA; + flags = (attr->ia_size == 0) ? LDLM_AST_DISCARD_DATA : 0; - rc = llu_extent_lock(NULL, inode, lsm, LCK_PW, &policy, - &lockh, ast_flags); + if (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK) { + lock_mode = LCK_NL; + obd_flags = OBD_FL_TRUNCLOCK; + CDEBUG(D_INODE, "delegating locking to the OST"); + } else { + lock_mode = LCK_PW; + obd_flags = 0; + } + + /* with lock_mode == LK_NL no lock is taken. */ + rc = llu_extent_lock(NULL, inode, lsm, lock_mode, &policy, + &lockh, flags); if (rc != ELDLM_OK) { if (rc > 0) - RETURN(-ENOLCK); - RETURN(rc); + GOTO(out, rc = -ENOLCK); + GOTO(out, rc); } - - rc = llu_vmtruncate(inode, attr->ia_size); - if (rc == 0) - set_bit(LLI_F_HAVE_OST_SIZE_LOCK, - &llu_i2info(inode)->lli_flags); + rc = llu_vmtruncate(inode, attr->ia_size, obd_flags); /* unlock now as we don't mind others file lockers racing with * the mds updates below? */ - err = llu_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); + err = llu_extent_unlock(NULL, inode, lsm, lock_mode, &lockh); if (err) { CERROR("llu_extent_unlock failed: %d\n", err); if (!rc) rc = err; } } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { + struct obd_info oinfo = { { { 0 } } }; struct obdo oa; - CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", - lli->lli_st_ino, LTIME_S(attr->ia_mtime)); + CDEBUG(D_INODE, "set mtime on OST inode %llu to %lu\n", + (long long)st->st_ino, LTIME_S(attr->ia_mtime)); oa.o_id = lsm->lsm_object_id; oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); - rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL); + + oinfo.oi_oa = &oa; + oinfo.oi_md = lsm; + + rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL); if (rc) - CERROR("obd_setattr fails: rc=%d\n", rc); + CERROR("obd_setattr_async fails: rc=%d\n", rc); } - RETURN(rc); + EXIT; +out: + if (op_data.op_ioepoch) + rc1 = llu_setattr_done_writing(inode, &op_data, mod); + return rc ? rc : rc1; } /* here we simply act as a thin layer to glue it with @@ -749,8 +893,14 @@ static int llu_iop_setattr(struct pnode *pno, struct intnl_stat *stbuf) { struct iattr iattr; + int rc; ENTRY; + liblustre_wait_event(0); + + LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME | + SETATTR_UID | SETATTR_GID | + SETATTR_LEN | SETATTR_MODE))); memset(&iattr, 0, sizeof(iattr)); if (mask & SETATTR_MODE) { @@ -759,11 +909,11 @@ static int llu_iop_setattr(struct pnode *pno, } if (mask & SETATTR_MTIME) { iattr.ia_mtime = stbuf->st_mtime; - iattr.ia_valid |= ATTR_MTIME; + iattr.ia_valid |= ATTR_MTIME | ATTR_MTIME_SET; } if (mask & SETATTR_ATIME) { iattr.ia_atime = stbuf->st_atime; - iattr.ia_valid |= ATTR_ATIME; + iattr.ia_valid |= ATTR_ATIME | ATTR_ATIME_SET; } if (mask & SETATTR_UID) { iattr.ia_uid = stbuf->st_uid; @@ -778,9 +928,12 @@ static int llu_iop_setattr(struct pnode *pno, iattr.ia_valid |= ATTR_SIZE; } - iattr.ia_valid |= ATTR_RAW; + iattr.ia_valid |= ATTR_RAW | ATTR_CTIME; + iattr.ia_ctime = CURRENT_TIME; - RETURN(llu_setattr_raw(ino, &iattr)); + rc = llu_setattr_raw(ino, &iattr); + liblustre_wait_event(0); + RETURN(rc); } #define EXT2_LINK_MAX 32000 @@ -793,18 +946,23 @@ static int llu_iop_symlink_raw(struct pnode *pno, const char *tgt) int len = qstr->len; struct ptlrpc_request *request = NULL; struct llu_sb_info *sbi = llu_i2sbi(dir); - struct mdc_op_data op_data; + struct md_op_data op_data; int err = -EMLINK; ENTRY; - if (llu_i2info(dir)->lli_st_nlink >= EXT2_LINK_MAX) + liblustre_wait_event(0); + if (llu_i2stat(dir)->st_nlink >= EXT2_LINK_MAX) RETURN(err); - llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0); - err = mdc_create(sbi->ll_mdc_exp, &op_data, - tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO, - current->fsuid, current->fsgid, 0, &request); + llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0, + LUSTRE_OPC_SYMLINK); + + err = md_create(sbi->ll_md_exp, &op_data, + tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO, + current->fsuid, current->fsgid, current->cap_effective, + 0, &request); ptlrpc_req_finished(request); + liblustre_wait_event(0); RETURN(err); } @@ -814,9 +972,9 @@ static int llu_readlink_internal(struct inode *inode, { struct llu_inode_info *lli = llu_i2info(inode); struct llu_sb_info *sbi = llu_i2sbi(inode); - struct ll_fid fid; - struct mds_body *body; - int rc, symlen = lli->lli_st_size + 1; + struct mdt_body *body; + struct intnl_stat *st = llu_i2stat(inode); + int rc, symlen = st->st_size + 1; ENTRY; *request = NULL; @@ -827,37 +985,38 @@ static int llu_readlink_internal(struct inode *inode, RETURN(0); } - ll_inode2fid(&fid, inode); - rc = mdc_getattr(sbi->ll_mdc_exp, &fid, - OBD_MD_LINKNAME, symlen, request); + rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), NULL, + OBD_MD_LINKNAME, symlen, request); if (rc) { - CERROR("inode %lu: rc = %d\n", lli->lli_st_ino, rc); + CERROR("inode %llu: rc = %d\n", (long long)st->st_ino, rc); RETURN(rc); } - body = lustre_msg_buf ((*request)->rq_repmsg, 0, sizeof (*body)); - LASSERT (body != NULL); - LASSERT_REPSWABBED (*request, 0); + body = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF, + sizeof(*body)); + LASSERT(body != NULL); + LASSERT(lustre_rep_swabbed(*request, REPLY_REC_OFF)); if ((body->valid & OBD_MD_LINKNAME) == 0) { CERROR ("OBD_MD_LINKNAME not set on reply\n"); GOTO (failed, rc = -EPROTO); } - - LASSERT (symlen != 0); + + LASSERT(symlen != 0); if (body->eadatasize != symlen) { - CERROR ("inode %lu: symlink length %d not expected %d\n", - lli->lli_st_ino, body->eadatasize - 1, symlen - 1); - GOTO (failed, rc = -EPROTO); + CERROR("inode %llu: symlink length %d not expected %d\n", + (long long)st->st_ino, body->eadatasize - 1, symlen - 1); + GOTO(failed, rc = -EPROTO); } - *symname = lustre_msg_buf ((*request)->rq_repmsg, 1, symlen); + *symname = lustre_msg_buf((*request)->rq_repmsg, REPLY_REC_OFF + 1, + symlen); if (*symname == NULL || - strnlen (*symname, symlen) != symlen - 1) { + strnlen(*symname, symlen) != symlen - 1) { /* not full/NULL terminated */ - CERROR ("inode %lu: symlink not NULL terminated string" - "of length %d\n", lli->lli_st_ino, symlen - 1); - GOTO (failed, rc = -EPROTO); + CERROR("inode %llu: symlink not NULL terminated string" + "of length %d\n", (long long)st->st_ino, symlen - 1); + GOTO(failed, rc = -EPROTO); } OBD_ALLOC(lli->lli_symlink_name, symlen); @@ -880,15 +1039,18 @@ static int llu_iop_readlink(struct pnode *pno, char *data, size_t bufsize) int rc; ENTRY; + liblustre_wait_event(0); rc = llu_readlink_internal(inode, &request, &symname); if (rc) GOTO(out, rc); LASSERT(symname); strncpy(data, symname, bufsize); + rc = strlen(symname); ptlrpc_req_finished(request); out: + liblustre_wait_event(0); RETURN(rc); } @@ -899,18 +1061,18 @@ static int llu_iop_mknod_raw(struct pnode *pno, struct ptlrpc_request *request = NULL; struct inode *dir = pno->p_parent->p_base->pb_ino; struct llu_sb_info *sbi = llu_i2sbi(dir); - struct mdc_op_data op_data; + struct md_op_data op_data; int err = -EMLINK; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu\n", - pno->p_base->pb_name.name, llu_i2info(dir)->lli_st_ino); + liblustre_wait_event(0); + CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%llu\n", + (int)pno->p_base->pb_name.len, pno->p_base->pb_name.name, + (long long)llu_i2stat(dir)->st_ino); - if (llu_i2info(dir)->lli_st_nlink >= EXT2_LINK_MAX) + if (llu_i2stat(dir)->st_nlink >= EXT2_LINK_MAX) RETURN(err); - mode &= ~current->fs->umask; - switch (mode & S_IFMT) { case 0: case S_IFREG: @@ -919,12 +1081,14 @@ static int llu_iop_mknod_raw(struct pnode *pno, case S_IFBLK: case S_IFIFO: case S_IFSOCK: - llu_prepare_mdc_op_data(&op_data, dir, NULL, - pno->p_base->pb_name.name, - pno->p_base->pb_name.len, - 0); - err = mdc_create(sbi->ll_mdc_exp, &op_data, NULL, 0, mode, - current->fsuid, current->fsgid, dev, &request); + llu_prep_md_op_data(&op_data, dir, NULL, + pno->p_base->pb_name.name, + pno->p_base->pb_name.len, 0, + LUSTRE_OPC_MKNOD); + + err = md_create(sbi->ll_md_exp, &op_data, NULL, 0, mode, + current->fsuid, current->fsgid, + current->cap_effective, dev, &request); ptlrpc_req_finished(request); break; case S_IFDIR: @@ -933,6 +1097,7 @@ static int llu_iop_mknod_raw(struct pnode *pno, default: err = -EINVAL; } + liblustre_wait_event(0); RETURN(err); } @@ -943,20 +1108,26 @@ static int llu_iop_link_raw(struct pnode *old, struct pnode *new) const char *name = new->p_base->pb_name.name; int namelen = new->p_base->pb_name.len; struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct md_op_data op_data; int rc; ENTRY; LASSERT(src); LASSERT(dir); - llu_prepare_mdc_op_data(&op_data, src, dir, name, namelen, 0); - rc = mdc_link(llu_i2sbi(src)->ll_mdc_exp, &op_data, &request); + liblustre_wait_event(0); + llu_prep_md_op_data(&op_data, src, dir, name, namelen, 0, + LUSTRE_OPC_ANY); + rc = md_link(llu_i2sbi(src)->ll_md_exp, &op_data, &request); ptlrpc_req_finished(request); + liblustre_wait_event(0); RETURN(rc); } +/* + * libsysio will clear the inode immediately after return + */ static int llu_iop_unlink_raw(struct pnode *pno) { struct inode *dir = pno->p_base->pb_parent->pb_ino; @@ -965,85 +1136,75 @@ static int llu_iop_unlink_raw(struct pnode *pno) int len = qstr->len; struct inode *target = pno->p_base->pb_ino; struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct md_op_data op_data; int rc; ENTRY; LASSERT(target); - llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0); - rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request); - if (!rc) { + liblustre_wait_event(0); + llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0, + LUSTRE_OPC_ANY); + rc = md_unlink(llu_i2sbi(dir)->ll_md_exp, &op_data, &request); + if (!rc) rc = llu_objects_destroy(request, dir); - - llu_i2info(target)->lli_stale_flag = 1; - unhook_stale_inode(pno); - } - ptlrpc_req_finished(request); + liblustre_wait_event(0); + RETURN(rc); } -/* FIXME - * following cases need to be considered later: - * - rename an opened file/dir - * - an opened file be removed in rename - * - rename to remove and hardlink (?opened) - */ static int llu_iop_rename_raw(struct pnode *old, struct pnode *new) { struct inode *src = old->p_parent->p_base->pb_ino; struct inode *tgt = new->p_parent->p_base->pb_ino; - struct inode *tgtinode = new->p_base->pb_ino; const char *oldname = old->p_base->pb_name.name; int oldnamelen = old->p_base->pb_name.len; const char *newname = new->p_base->pb_name.name; int newnamelen = new->p_base->pb_name.len; struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; + struct md_op_data op_data; int rc; ENTRY; LASSERT(src); LASSERT(tgt); - llu_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0); - rc = mdc_rename(llu_i2sbi(src)->ll_mdc_exp, &op_data, - oldname, oldnamelen, newname, newnamelen, - &request); + liblustre_wait_event(0); + llu_prep_md_op_data(&op_data, src, tgt, NULL, 0, 0, + LUSTRE_OPC_ANY); + rc = md_rename(llu_i2sbi(src)->ll_md_exp, &op_data, + oldname, oldnamelen, newname, newnamelen, + &request); if (!rc) { rc = llu_objects_destroy(request, src); - - if (tgtinode) { - llu_i2info(tgtinode)->lli_stale_flag = 1; - unhook_stale_inode(new); - } } ptlrpc_req_finished(request); + liblustre_wait_event(0); RETURN(rc); } #ifdef _HAVE_STATVFS static int llu_statfs_internal(struct llu_sb_info *sbi, - struct obd_statfs *osfs, - unsigned long max_age) + struct obd_statfs *osfs, __u64 max_age) { struct obd_statfs obd_osfs; int rc; ENTRY; - rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_md_exp), osfs, max_age); if (rc) { - CERROR("mdc_statfs fails: rc = %d\n", rc); + CERROR("md_statfs fails: rc = %d\n", rc); RETURN(rc); } CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); - rc = obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age); + rc = obd_statfs_rqset(class_exp2obd(sbi->ll_dt_exp), + &obd_statfs, max_age); if (rc) { CERROR("obd_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -1080,7 +1241,7 @@ static int llu_statfs(struct llu_sb_info *sbi, struct statfs *sfs) /* For now we will always get up-to-date statfs values, but in the * future we may allow some amount of caching on the client (e.g. * from QOS or lprocfs updates). */ - rc = llu_statfs_internal(sbi, &osfs, jiffies - 1); + rc = llu_statfs_internal(sbi, &osfs, cfs_time_current_64() - HZ); if (rc) return rc; @@ -1111,6 +1272,8 @@ static int llu_iop_statvfs(struct pnode *pno, int rc; ENTRY; + liblustre_wait_event(0); + #ifndef __CYGWIN__ LASSERT(pno->p_base->pb_ino); rc = llu_statfs(llu_i2sbi(pno->p_base->pb_ino), &fs); @@ -1131,6 +1294,7 @@ static int llu_iop_statvfs(struct pnode *pno, buf->f_namemax = fs.f_namelen; #endif + liblustre_wait_event(0); RETURN(0); } #endif /* _HAVE_STATVFS */ @@ -1142,21 +1306,26 @@ static int llu_iop_mkdir_raw(struct pnode *pno, mode_t mode) const char *name = qstr->name; int len = qstr->len; struct ptlrpc_request *request = NULL; - struct llu_inode_info *lli = llu_i2info(dir); - struct mdc_op_data op_data; + struct intnl_stat *st = llu_i2stat(dir); + struct md_op_data op_data; int err = -EMLINK; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%lu(%p)\n", - name, lli->lli_st_ino, lli->lli_st_generation, dir); - if (lli->lli_st_nlink >= EXT2_LINK_MAX) + liblustre_wait_event(0); + CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%llu/%lu(%p)\n", len, name, + (long long)st->st_ino, llu_i2info(dir)->lli_st_generation, dir); + + if (st->st_nlink >= EXT2_LINK_MAX) RETURN(err); - mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR; - llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0); - err = mdc_create(llu_i2sbi(dir)->ll_mdc_exp, &op_data, NULL, 0, mode, - current->fsuid, current->fsgid, 0, &request); + llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0, + LUSTRE_OPC_MKDIR); + + err = md_create(llu_i2sbi(dir)->ll_md_exp, &op_data, NULL, 0, + mode | S_IFDIR, current->fsuid, current->fsgid, + current->cap_effective, 0, &request); ptlrpc_req_finished(request); + liblustre_wait_event(0); RETURN(err); } @@ -1167,37 +1336,546 @@ static int llu_iop_rmdir_raw(struct pnode *pno) const char *name = qstr->name; int len = qstr->len; struct ptlrpc_request *request = NULL; - struct mdc_op_data op_data; - struct llu_inode_info *lli = llu_i2info(dir); + struct md_op_data op_data; int rc; ENTRY; - CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%lu(%p)\n", - name, lli->lli_st_ino, lli->lli_st_generation, dir); - llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, S_IFDIR); - rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request); + liblustre_wait_event(0); + CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%llu/%lu(%p)\n", len, name, + (long long)llu_i2stat(dir)->st_ino, + llu_i2info(dir)->lli_st_generation, dir); + + llu_prep_md_op_data(&op_data, dir, NULL, name, len, S_IFDIR, + LUSTRE_OPC_ANY); + rc = md_unlink(llu_i2sbi(dir)->ll_md_exp, &op_data, &request); ptlrpc_req_finished(request); - /* libsysio: remove the pnode right away */ - if (!rc) { - llu_i2info(pno->p_base->pb_ino)->lli_stale_flag = 1; - unhook_stale_inode(pno); + liblustre_wait_event(0); + RETURN(rc); +} + +#ifdef O_DIRECT +#define FCNTL_FLMASK (O_APPEND|O_NONBLOCK|O_ASYNC|O_DIRECT) +#else +#define FCNTL_FLMASK (O_APPEND|O_NONBLOCK|O_ASYNC) +#endif +#define FCNTL_FLMASK_INVALID (O_NONBLOCK|O_ASYNC) + +/* refer to ll_file_flock() for details */ +static int llu_file_flock(struct inode *ino, + int cmd, + struct file_lock *file_lock) +{ + struct llu_inode_info *lli = llu_i2info(ino); + struct intnl_stat *st = llu_i2stat(ino); + struct ldlm_res_id res_id = + { .name = {fid_seq(&lli->lli_fid), + fid_oid(&lli->lli_fid), + fid_ver(&lli->lli_fid), + LDLM_FLOCK} }; + struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL, + ldlm_flock_completion_ast, NULL, file_lock }; + + struct lustre_handle lockh = {0}; + ldlm_policy_data_t flock; + int flags = 0; + int rc; + + CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu file_lock=%p\n", + (unsigned long long)st->st_ino, file_lock); + + flock.l_flock.pid = file_lock->fl_pid; + flock.l_flock.start = file_lock->fl_start; + flock.l_flock.end = file_lock->fl_end; + + switch (file_lock->fl_type) { + case F_RDLCK: + einfo.ei_mode = LCK_PR; + break; + case F_UNLCK: + einfo.ei_mode = LCK_NL; + break; + case F_WRLCK: + einfo.ei_mode = LCK_PW; + break; + default: + CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type); + LBUG(); } + switch (cmd) { + case F_SETLKW: +#ifdef F_SETLKW64 +#if F_SETLKW64 != F_SETLKW + case F_SETLKW64: +#endif +#endif + flags = 0; + break; + case F_SETLK: +#ifdef F_SETLK64 +#if F_SETLK64 != F_SETLK + case F_SETLK64: +#endif +#endif + flags = LDLM_FL_BLOCK_NOWAIT; + break; + case F_GETLK: +#ifdef F_GETLK64 +#if F_GETLK64 != F_GETLK + case F_GETLK64: +#endif +#endif + flags = LDLM_FL_TEST_LOCK; + file_lock->fl_type = einfo.ei_mode; + break; + default: + CERROR("unknown fcntl cmd: %d\n", cmd); + LBUG(); + } + + CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, flags=%#x, mode=%u, " + "start="LPU64", end="LPU64"\n", (unsigned long long)st->st_ino, + flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start, + flock.l_flock.end); + + rc = ldlm_cli_enqueue(llu_i2mdexp(ino), NULL, &einfo, &res_id, + &flock, &flags, NULL, 0, NULL, &lockh, 0); RETURN(rc); } -static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap) +static int assign_type(struct file_lock *fl, int type) { - CERROR("liblustre did not support fcntl\n"); - return -ENOSYS; + switch (type) { + case F_RDLCK: + case F_WRLCK: + case F_UNLCK: + fl->fl_type = type; + return 0; + default: + return -EINVAL; + } +} + +static int flock_to_posix_lock(struct inode *ino, + struct file_lock *fl, + struct flock *l) +{ + switch (l->l_whence) { + /* XXX: only SEEK_SET is supported in lustre */ + case SEEK_SET: + fl->fl_start = 0; + break; + default: + return -EINVAL; + } + + fl->fl_end = l->l_len - 1; + if (l->l_len < 0) + return -EINVAL; + if (l->l_len == 0) + fl->fl_end = OFFSET_MAX; + + fl->fl_pid = getpid(); + fl->fl_flags = FL_POSIX; + fl->fl_notify = NULL; + fl->fl_insert = NULL; + fl->fl_remove = NULL; + /* XXX: these fields can't be filled with suitable values, + but I think lustre doesn't use them. + */ + fl->fl_owner = NULL; + fl->fl_file = NULL; + + return assign_type(fl, l->l_type); +} + +static int llu_fcntl_getlk(struct inode *ino, struct flock *flock) +{ + struct file_lock fl; + int error; + + error = EINVAL; + if ((flock->l_type != F_RDLCK) && (flock->l_type != F_WRLCK)) + goto out; + + error = flock_to_posix_lock(ino, &fl, flock); + if (error) + goto out; + + error = llu_file_flock(ino, F_GETLK, &fl); + if (error) + goto out; + + flock->l_type = F_UNLCK; + if (fl.fl_type != F_UNLCK) { + flock->l_pid = fl.fl_pid; + flock->l_start = fl.fl_start; + flock->l_len = fl.fl_end == OFFSET_MAX ? 0: + fl.fl_end - fl.fl_start + 1; + flock->l_whence = SEEK_SET; + flock->l_type = fl.fl_type; + } + +out: + return error; +} + +static int llu_fcntl_setlk(struct inode *ino, int cmd, struct flock *flock) +{ + struct file_lock fl; + int flags = llu_i2info(ino)->lli_open_flags + 1; + int error; + + error = flock_to_posix_lock(ino, &fl, flock); + if (error) + goto out; + if (cmd == F_SETLKW) + fl.fl_flags |= FL_SLEEP; + + error = -EBADF; + switch (flock->l_type) { + case F_RDLCK: + if (!(flags & FMODE_READ)) + goto out; + break; + case F_WRLCK: + if (!(flags & FMODE_WRITE)) + goto out; + break; + case F_UNLCK: + break; + default: + error = -EINVAL; + goto out; + } + + error = llu_file_flock(ino, cmd, &fl); + if (error) + goto out; + +out: + return error; +} + +static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap, int *rtn) +{ + struct llu_inode_info *lli = llu_i2info(ino); + long flags; + struct flock *flock; + long err = 0; + + liblustre_wait_event(0); + switch (cmd) { + case F_GETFL: + *rtn = lli->lli_open_flags; + break; + case F_SETFL: + flags = va_arg(ap, long); + flags &= FCNTL_FLMASK; + if (flags & FCNTL_FLMASK_INVALID) { + LCONSOLE_ERROR_MSG(0x010, "liblustre does not support " + "the O_NONBLOCK or O_ASYNC flags. " + "Please fix your application.\n"); + *rtn = -EINVAL; + err = EINVAL; + break; + } + lli->lli_open_flags = (int)(flags & FCNTL_FLMASK) | + (lli->lli_open_flags & ~FCNTL_FLMASK); + *rtn = 0; + break; + case F_GETLK: +#ifdef F_GETLK64 +#if F_GETLK64 != F_GETLK + case F_GETLK64: +#endif +#endif + flock = va_arg(ap, struct flock *); + err = llu_fcntl_getlk(ino, flock); + *rtn = err? -1: 0; + break; + case F_SETLK: +#ifdef F_SETLKW64 +#if F_SETLKW64 != F_SETLKW + case F_SETLKW64: +#endif +#endif + case F_SETLKW: +#ifdef F_SETLK64 +#if F_SETLK64 != F_SETLK + case F_SETLK64: +#endif +#endif + flock = va_arg(ap, struct flock *); + err = llu_fcntl_setlk(ino, cmd, flock); + *rtn = err? -1: 0; + break; + default: + CERROR("unsupported fcntl cmd %x\n", cmd); + *rtn = -ENOSYS; + err = ENOSYS; + break; + } + + liblustre_wait_event(0); + return err; +} + +static int llu_get_grouplock(struct inode *inode, unsigned long arg) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct ll_file_data *fd = lli->lli_file_data; + ldlm_policy_data_t policy = { .l_extent = { .start = 0, + .end = OBD_OBJECT_EOF}}; + struct lustre_handle lockh = { 0 }; + struct lov_stripe_md *lsm = lli->lli_smd; + ldlm_error_t err; + int flags = 0; + ENTRY; + + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) { + RETURN(-EINVAL); + } + + policy.l_extent.gid = arg; + if (lli->lli_open_flags & O_NONBLOCK) + flags = LDLM_FL_BLOCK_NOWAIT; + + err = llu_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, + flags); + if (err) + RETURN(err); + + fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK; + fd->fd_gid = arg; + memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh)); + + RETURN(0); +} + +static int llu_put_grouplock(struct inode *inode, unsigned long arg) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct ll_file_data *fd = lli->lli_file_data; + struct lov_stripe_md *lsm = lli->lli_smd; + ldlm_error_t err; + ENTRY; + + if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) + RETURN(-EINVAL); + + if (fd->fd_gid != arg) + RETURN(-EINVAL); + + fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK); + + err = llu_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh); + if (err) + RETURN(err); + + fd->fd_gid = 0; + memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh)); + + RETURN(0); +} + +static int llu_lov_dir_setstripe(struct inode *ino, unsigned long arg) +{ + struct llu_sb_info *sbi = llu_i2sbi(ino); + struct ptlrpc_request *request = NULL; + struct md_op_data op_data; + struct lov_user_md lum, *lump = (struct lov_user_md *)arg; + int rc = 0; + + llu_prep_md_op_data(&op_data, ino, NULL, NULL, 0, 0, + LUSTRE_OPC_ANY); + + LASSERT(sizeof(lum) == sizeof(*lump)); + LASSERT(sizeof(lum.lmm_objects[0]) == + sizeof(lump->lmm_objects[0])); + rc = copy_from_user(&lum, lump, sizeof(lum)); + if (rc) + return(-EFAULT); + + if (lum.lmm_magic != LOV_USER_MAGIC) + RETURN(-EINVAL); + + if (lum.lmm_magic != cpu_to_le32(LOV_USER_MAGIC)) + lustre_swab_lov_user_md(&lum); + + /* swabbing is done in lov_setstripe() on server side */ + rc = md_setattr(sbi->ll_md_exp, &op_data, &lum, + sizeof(lum), NULL, 0, &request, NULL); + if (rc) { + ptlrpc_req_finished(request); + if (rc != -EPERM && rc != -EACCES) + CERROR("md_setattr fails: rc = %d\n", rc); + return rc; + } + ptlrpc_req_finished(request); + + return rc; +} + +static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, + struct lov_user_md *lum, int lum_size) +{ + struct llu_sb_info *sbi = llu_i2sbi(ino); + struct llu_inode_info *lli = llu_i2info(ino); + struct llu_inode_info *lli2 = NULL; + struct lov_stripe_md *lsm; + struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags}; + struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR, + llu_md_blocking_ast, ldlm_completion_ast, NULL, NULL }; + + struct ptlrpc_request *req = NULL; + struct lustre_md md; + struct md_op_data data; + struct lustre_handle lockh; + int rc = 0; + ENTRY; + + lsm = lli->lli_smd; + if (lsm) { + CDEBUG(D_IOCTL, "stripe already exists for ino "DFID"\n", + PFID(&lli->lli_fid)); + return -EEXIST; + } + + OBD_ALLOC(lli2, sizeof(struct llu_inode_info)); + if (!lli2) + return -ENOMEM; + + memcpy(lli2, lli, sizeof(struct llu_inode_info)); + lli2->lli_open_count = 0; + lli2->lli_it = NULL; + lli2->lli_file_data = NULL; + lli2->lli_smd = NULL; + lli2->lli_symlink_name = NULL; + ino->i_private = lli2; + + llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR, + LUSTRE_OPC_ANY); + + rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data, + &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY); + if (rc) + GOTO(out, rc); + + req = oit.d.lustre.it_data; + rc = it_open_error(DISP_IT_EXECD, &oit); + if (rc) { + req->rq_replay = 0; + GOTO(out, rc); + } + + rc = it_open_error(DISP_OPEN_OPEN, &oit); + if (rc) { + req->rq_replay = 0; + GOTO(out, rc); + } + + rc = md_get_lustre_md(sbi->ll_md_exp, req, + DLM_REPLY_REC_OFF, sbi->ll_dt_exp, sbi->ll_md_exp, &md); + if (rc) + GOTO(out, rc); + + llu_update_inode(ino, md.body, md.lsm); + lli->lli_smd = lli2->lli_smd; + lli2->lli_smd = NULL; + + llu_local_open(lli2, &oit); + + /* release intent */ + if (lustre_handle_is_used(&lockh)) + ldlm_lock_decref(&lockh, LCK_CR); + + ptlrpc_req_finished(req); + req = NULL; + + rc = llu_file_release(ino); + out: + ino->i_private = lli; + if (lli2) + OBD_FREE(lli2, sizeof(struct llu_inode_info)); + if (req != NULL) + ptlrpc_req_finished(req); + RETURN(rc); +} + +static int llu_lov_file_setstripe(struct inode *ino, unsigned long arg) +{ + struct lov_user_md lum, *lump = (struct lov_user_md *)arg; + int rc; + int flags = FMODE_WRITE; + ENTRY; + + LASSERT(sizeof(lum) == sizeof(*lump)); + LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0])); + rc = copy_from_user(&lum, lump, sizeof(lum)); + if (rc) + RETURN(-EFAULT); + + rc = llu_lov_setstripe_ea_info(ino, flags, &lum, sizeof(lum)); + RETURN(rc); +} + +static int llu_lov_setstripe(struct inode *ino, unsigned long arg) +{ + struct intnl_stat *st = llu_i2stat(ino); + if (S_ISREG(st->st_mode)) + return llu_lov_file_setstripe(ino, arg); + if (S_ISDIR(st->st_mode)) + return llu_lov_dir_setstripe(ino, arg); + + return -EINVAL; +} + +static int llu_lov_getstripe(struct inode *ino, unsigned long arg) +{ + struct lov_stripe_md *lsm = llu_i2info(ino)->lli_smd; + + if (!lsm) + RETURN(-ENODATA); + + return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, llu_i2obdexp(ino), 0, lsm, + (void *)arg); } static int llu_iop_ioctl(struct inode *ino, unsigned long int request, va_list ap) { - CERROR("liblustre did not support ioctl\n"); - return -ENOSYS; + unsigned long arg; + int rc; + + liblustre_wait_event(0); + + switch (request) { + case LL_IOC_GROUP_LOCK: + arg = va_arg(ap, unsigned long); + rc = llu_get_grouplock(ino, arg); + break; + case LL_IOC_GROUP_UNLOCK: + arg = va_arg(ap, unsigned long); + rc = llu_put_grouplock(ino, arg); + break; + case LL_IOC_LOV_SETSTRIPE: + arg = va_arg(ap, unsigned long); + rc = llu_lov_setstripe(ino, arg); + break; + case LL_IOC_LOV_GETSTRIPE: + arg = va_arg(ap, unsigned long); + rc = llu_lov_getstripe(ino, arg); + break; + default: + CERROR("did not support ioctl cmd %lx\n", request); + rc = -ENOSYS; + break; + } + + liblustre_wait_event(0); + return rc; } /* @@ -1205,11 +1883,13 @@ static int llu_iop_ioctl(struct inode *ino, unsigned long int request, */ static int llu_iop_sync(struct inode *inode) { + liblustre_wait_event(0); return 0; } static int llu_iop_datasync(struct inode *inode) { + liblustre_wait_event(0); return 0; } @@ -1221,37 +1901,68 @@ struct filesys_ops llu_filesys_ops = struct inode *llu_iget(struct filesys *fs, struct lustre_md *md) { struct inode *inode; - struct ll_fid fid; + struct lu_fid fid; struct file_identifier fileid = {&fid, sizeof(fid)}; - if ((md->body->valid & - (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) != - (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) - CERROR("invalide fields!\n"); + if ((md->body->valid & (OBD_MD_FLID | OBD_MD_FLTYPE)) != + (OBD_MD_FLID | OBD_MD_FLTYPE)) { + CERROR("bad md body valid mask "LPX64"\n", md->body->valid); + LBUG(); + return ERR_PTR(-EPERM); + } /* try to find existing inode */ - fid.id = md->body->ino; - fid.generation = md->body->generation; - fid.f_type = md->body->mode & S_IFMT; + fid = md->body->fid1; inode = _sysio_i_find(fs, &fileid); if (inode) { - if (llu_i2info(inode)->lli_st_generation == - md->body->generation) { + if (inode->i_zombie/* || + lli->lli_st_generation != md->body->generation*/) { + I_RELE(inode); + } + else { llu_update_inode(inode, md->body, md->lsm); return inode; - } else - I_RELE(inode); + } } inode = llu_new_inode(fs, &fid); if (inode) llu_update_inode(inode, md->body, md->lsm); - + return inode; } -extern struct list_head lustre_profile_list; +static int +llu_init_ea_size(struct obd_export *md_exp, struct obd_export *dt_exp) +{ + struct lov_stripe_md lsm = { .lsm_magic = LOV_MAGIC }; + __u32 valsize = sizeof(struct lov_desc); + int rc, easize, def_easize, cookiesize; + struct lov_desc desc; + __u32 stripes; + ENTRY; + + rc = obd_get_info(dt_exp, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC, + &valsize, &desc); + if (rc) + RETURN(rc); + + stripes = min(desc.ld_tgt_count, (__u32)LOV_MAX_STRIPE_COUNT); + lsm.lsm_stripe_count = stripes; + easize = obd_size_diskmd(dt_exp, &lsm); + + lsm.lsm_stripe_count = desc.ld_default_stripe_count; + def_easize = obd_size_diskmd(dt_exp, &lsm); + + cookiesize = stripes * sizeof(struct llog_cookie); + + CDEBUG(D_HA, "updating max_mdsize/max_cookiesize: %d/%d\n", + easize, cookiesize); + + rc = md_init_ea_size(md_exp, easize, def_easize, cookiesize); + RETURN(rc); +} static int llu_fsswop_mount(const char *source, @@ -1264,85 +1975,67 @@ llu_fsswop_mount(const char *source, struct inode *root; struct pnode_base *rootpb; struct obd_device *obd; - struct ll_fid rootfid; + struct lu_fid rootfid; struct llu_sb_info *sbi; struct obd_statfs osfs; static struct qstr noname = { NULL, 0, 0 }; struct ptlrpc_request *request = NULL; - struct lustre_handle mdc_conn = {0, }; - struct lustre_handle osc_conn = {0, }; + struct lustre_handle md_conn = {0, }; + struct lustre_handle dt_conn = {0, }; struct lustre_md md; class_uuid_t uuid; + struct config_llog_instance cfg = {0, }; + char ll_instance[sizeof(sbi) * 2 + 1]; struct lustre_profile *lprof; + char *zconf_mgsnid, *zconf_profile; char *osc = NULL, *mdc = NULL; - int err = -EINVAL; + int async = 1, err = -EINVAL; + struct obd_connect_data ocd = {0,}; ENTRY; + if (ll_parse_mount_target(source, + &zconf_mgsnid, + &zconf_profile)) { + CERROR("mal-formed target %s\n", source); + RETURN(err); + } + if (!zconf_mgsnid || !zconf_profile) { + printf("Liblustre: invalid target %s\n", source); + RETURN(err); + } /* allocate & initialize sbi */ OBD_ALLOC(sbi, sizeof(*sbi)); if (!sbi) RETURN(-ENOMEM); INIT_LIST_HEAD(&sbi->ll_conn_chain); - generate_random_uuid(uuid); + ll_generate_random_uuid(uuid); class_uuid_unparse(uuid, &sbi->ll_sb_uuid); - /* zeroconf */ - if (g_zconf) { - struct config_llog_instance cfg; - int len; - - if (!g_zconf_mdsname) { - CERROR("no mds name\n"); - GOTO(out_free, err = -EINVAL); - } + /* generate a string unique to this super, let's try + the address of the super itself.*/ + sprintf(ll_instance, "%p", sbi); - /* generate a string unique to this super, let's try - the address of the super itself.*/ - len = (sizeof(sbi) * 2) + 1; - OBD_ALLOC(sbi->ll_instance, len); - if (sbi->ll_instance == NULL) - GOTO(out_free, err = -ENOMEM); - sprintf(sbi->ll_instance, "%p", sbi); - - cfg.cfg_instance = sbi->ll_instance; - cfg.cfg_uuid = sbi->ll_sb_uuid; - err = liblustre_process_log(&cfg, 1); - if (err < 0) { - CERROR("Unable to process log: %s\n", g_zconf_profile); - - GOTO(out_free, err); - } - - lprof = class_get_profile(g_zconf_profile); - if (lprof == NULL) { - CERROR("No profile found: %s\n", g_zconf_profile); - GOTO(out_free, err = -EINVAL); - } - if (osc) - OBD_FREE(osc, strlen(osc) + 1); - OBD_ALLOC(osc, strlen(lprof->lp_osc) + - strlen(sbi->ll_instance) + 2); - sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance); - - if (mdc) - OBD_FREE(mdc, strlen(mdc) + 1); - OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + - strlen(sbi->ll_instance) + 2); - sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance); - } else { - /* setup from dump_file */ - if (list_empty(&lustre_profile_list)) { - CERROR("no profile\n"); - GOTO(out_free, err = -EINVAL); - } + /* retrive & parse config log */ + cfg.cfg_instance = ll_instance; + cfg.cfg_uuid = sbi->ll_sb_uuid; + err = liblustre_process_log(&cfg, zconf_mgsnid, zconf_profile, 1); + if (err < 0) { + CERROR("Unable to process log: %s\n", zconf_profile); + GOTO(out_free, err); + } - lprof = list_entry(lustre_profile_list.next, - struct lustre_profile, lp_list); - osc = lprof->lp_osc; - mdc = lprof->lp_mdc; + lprof = class_get_profile(zconf_profile); + if (lprof == NULL) { + CERROR("No profile found: %s\n", zconf_profile); + GOTO(out_free, err = -EINVAL); } + OBD_ALLOC(osc, strlen(lprof->lp_dt) + strlen(ll_instance) + 2); + sprintf(osc, "%s-%s", lprof->lp_dt, ll_instance); + + OBD_ALLOC(mdc, strlen(lprof->lp_md) + strlen(ll_instance) + 2); + sprintf(mdc, "%s-%s", lprof->lp_md, ll_instance); if (!osc) { CERROR("no osc\n"); @@ -1364,21 +2057,24 @@ llu_fsswop_mount(const char *source, CERROR("MDC %s: not setup or attached\n", mdc); GOTO(out_free, err = -EINVAL); } + obd_set_info_async(obd->obd_self_export, strlen("async"), "async", + sizeof(async), &async, NULL); - if (mdc_init_ea_size(obd, osc)) - GOTO(out_free, err = -EINVAL); + ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION; + ocd.ocd_ibits_known = MDS_INODELOCK_FULL; + ocd.ocd_version = LUSTRE_VERSION_CODE; /* setup mdc */ - err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(NULL, &md_conn, obd, &sbi->ll_sb_uuid, &ocd); if (err) { CERROR("cannot connect to %s: rc = %d\n", mdc, err); GOTO(out_free, err); } - sbi->ll_mdc_exp = class_conn2export(&mdc_conn); + sbi->ll_md_exp = class_conn2export(&md_conn); err = obd_statfs(obd, &osfs, 100000000); if (err) - GOTO(out_mdc, err); + GOTO(out_md, err); /* * FIXME fill fs stat data into sbi here!!! FIXME @@ -1388,64 +2084,77 @@ llu_fsswop_mount(const char *source, obd = class_name2obd(osc); if (!obd) { CERROR("OSC %s: not setup or attached\n", osc); - GOTO(out_mdc, err = -EINVAL); + GOTO(out_md, err = -EINVAL); } + obd_set_info_async(obd->obd_self_export, strlen("async"), "async", + sizeof(async), &async, NULL); + + obd->obd_upcall.onu_owner = &sbi->ll_lco; + obd->obd_upcall.onu_upcall = ll_ocd_update; - err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid); + ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL | + OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK; + ocd.ocd_version = LUSTRE_VERSION_CODE; + err = obd_connect(NULL, &dt_conn, obd, &sbi->ll_sb_uuid, &ocd); if (err) { CERROR("cannot connect to %s: rc = %d\n", osc, err); - GOTO(out_mdc, err); + GOTO(out_md, err); } - sbi->ll_osc_exp = class_conn2export(&osc_conn); + sbi->ll_dt_exp = class_conn2export(&dt_conn); + sbi->ll_lco.lco_flags = ocd.ocd_connect_flags; + + llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp); - err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid); + err = md_getstatus(sbi->ll_md_exp, &rootfid, NULL); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_osc, err); + GOTO(out_dt, err); } - CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id); - sbi->ll_rootino = rootfid.id; + CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid)); + sbi->ll_root_fid = rootfid; /* fetch attr of root inode */ - err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, - OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); + err = md_getattr(sbi->ll_md_exp, &rootfid, NULL, + OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request); if (err) { - CERROR("mdc_getattr failed for root: rc = %d\n", err); - GOTO(out_osc, err); + CERROR("md_getattr failed for root: rc = %d\n", err); + GOTO(out_dt, err); } - err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); + err = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, + sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (err) { CERROR("failed to understand root inode md: rc = %d\n",err); GOTO(out_request, err); } - LASSERT(sbi->ll_rootino != 0); + LASSERT(fid_is_sane(&sbi->ll_root_fid)); root = llu_iget(fs, &md); - if (root == NULL) { + if (!root || IS_ERR(root)) { CERROR("fail to generate root inode\n"); GOTO(out_request, err = -EBADF); } - /* - * Generate base path-node for root. - */ - rootpb = _sysio_pb_new(&noname, NULL, root); - if (!rootpb) { - err = -ENOMEM; - goto out_inode; - } + /* + * Generate base path-node for root. + */ + rootpb = _sysio_pb_new(&noname, NULL, root); + if (!rootpb) { + err = -ENOMEM; + goto out_inode; + } - err = _sysio_do_mount(fs, rootpb, flags, tocover, mntp); - if (err) { + err = _sysio_do_mount(fs, rootpb, flags, tocover, mntp); + if (err) { _sysio_pb_gone(rootpb); - goto out_inode; + goto out_inode; } ptlrpc_req_finished(request); - printf("LibLustre: namespace mounted successfully!\n"); + CDEBUG(D_SUPER, "LibLustre: %s mounted successfully!\n", source); + liblustre_wait_idle(); return 0; @@ -1453,12 +2162,17 @@ out_inode: _sysio_i_gone(root); out_request: ptlrpc_req_finished(request); -out_osc: - obd_disconnect(sbi->ll_osc_exp, 0); -out_mdc: - obd_disconnect(sbi->ll_mdc_exp, 0); +out_dt: + obd_disconnect(sbi->ll_dt_exp); +out_md: + obd_disconnect(sbi->ll_md_exp); out_free: + if (osc) + OBD_FREE(osc, strlen(osc) + 1); + if (mdc) + OBD_FREE(mdc, strlen(mdc) + 1); OBD_FREE(sbi, sizeof(*sbi)); + liblustre_wait_idle(); return err; } @@ -1470,7 +2184,7 @@ static struct inode_ops llu_inode_ops = { inop_lookup: llu_iop_lookup, inop_getattr: llu_iop_getattr, inop_setattr: llu_iop_setattr, - inop_getdirentries: llu_iop_getdirentries, + inop_filldirentries: llu_iop_filldirentries, inop_mkdir: llu_iop_mkdir_raw, inop_rmdir: llu_iop_rmdir_raw, inop_symlink: llu_iop_symlink_raw, @@ -1480,8 +2194,9 @@ static struct inode_ops llu_inode_ops = { inop_link: llu_iop_link_raw, inop_unlink: llu_iop_unlink_raw, inop_rename: llu_iop_rename_raw, - inop_ipreadv: llu_iop_ipreadv, - inop_ipwritev: llu_iop_ipwritev, + inop_pos: llu_iop_pos, + inop_read: llu_iop_read, + inop_write: llu_iop_write, inop_iodone: llu_iop_iodone, inop_fcntl: llu_iop_fcntl, inop_sync: llu_iop_sync, @@ -1493,4 +2208,3 @@ static struct inode_ops llu_inode_ops = { #endif inop_gone: llu_iop_gone, }; -