X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_object.c;h=0cf918a877f68f0f78d99ffb4fe8208b9ba7f098;hb=c159c408293fbebf71a948e630aa9f637f3c8ffe;hp=96425cdd2c2c5ca806bd444ed47fd549ede03f44;hpb=837eab06232794ccf5dd523f778debc9253dad1c;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 96425cd..0cf918a 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -54,6 +54,7 @@ /* fid_be_cpu(), fid_cpu_to_be(). */ #include +#include #include #include #include @@ -62,6 +63,19 @@ static const struct lu_object_operations mdd_lu_obj_ops; +static int mdd_xattr_get(const struct lu_env *env, + struct md_object *obj, struct lu_buf *buf, + const char *name); + +int mdd_data_get(const struct lu_env *env, struct mdd_object *obj, + void **data) +{ + LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n", + PFID(mdd_object_fid(obj))); + mdo_data_get(env, obj, data); + return 0; +} + int mdd_la_get(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *la, struct lustre_capa *capa) { @@ -81,6 +95,15 @@ static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags) obj->mod_flags |= IMMUTE_OBJ; } +struct mdd_thread_info *mdd_env_info(const struct lu_env *env) +{ + struct mdd_thread_info *info; + + info = lu_context_key_get(&env->le_ctx, &mdd_thread_key); + LASSERT(info != NULL); + return info; +} + struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len) { struct lu_buf *buf; @@ -91,6 +114,90 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len) return buf; } +void mdd_buf_put(struct lu_buf *buf) +{ + if (buf == NULL || buf->lb_buf == NULL) + return; + if (buf->lb_vmalloc) + OBD_VFREE(buf->lb_buf, buf->lb_len); + else + OBD_FREE(buf->lb_buf, buf->lb_len); + buf->lb_buf = NULL; +} + +const struct lu_buf *mdd_buf_get_const(const struct lu_env *env, + const void *area, ssize_t len) +{ + struct lu_buf *buf; + + buf = &mdd_env_info(env)->mti_buf; + buf->lb_buf = (void *)area; + buf->lb_len = len; + return buf; +} + +#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */ +struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf; + + if ((len > buf->lb_len) && (buf->lb_buf != NULL)) { + if (buf->lb_vmalloc) + OBD_VFREE(buf->lb_buf, buf->lb_len); + else + OBD_FREE(buf->lb_buf, buf->lb_len); + buf->lb_buf = NULL; + } + if (buf->lb_buf == NULL) { + buf->lb_len = len; + if (buf->lb_len <= BUF_VMALLOC_SIZE) { + OBD_ALLOC(buf->lb_buf, buf->lb_len); + buf->lb_vmalloc = 0; + } + if (buf->lb_buf == NULL) { + OBD_VMALLOC(buf->lb_buf, buf->lb_len); + buf->lb_vmalloc = 1; + } + if (buf->lb_buf == NULL) + buf->lb_len = 0; + } + return buf; +} + +/** Increase the size of the \a mti_big_buf. 
+ * preserves old data in buffer + * old buffer remains unchanged on error + * \retval 0 or -ENOMEM + */ +int mdd_buf_grow(const struct lu_env *env, ssize_t len) +{ + struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf; + struct lu_buf buf; + + LASSERT(len >= oldbuf->lb_len); + if (len > BUF_VMALLOC_SIZE) { + OBD_VMALLOC(buf.lb_buf, len); + buf.lb_vmalloc = 1; + } else { + OBD_ALLOC(buf.lb_buf, len); + buf.lb_vmalloc = 0; + } + if (buf.lb_buf == NULL) + return -ENOMEM; + + buf.lb_len = len; + memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len); + + if (oldbuf->lb_vmalloc) + OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len); + else + OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len); + + memcpy(oldbuf, &buf, sizeof(buf)); + + return 0; +} + struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env, struct mdd_device *mdd) { @@ -135,26 +242,6 @@ struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env, return mti->mti_max_lmm; } -const struct lu_buf *mdd_buf_get_const(const struct lu_env *env, - const void *area, ssize_t len) -{ - struct lu_buf *buf; - - buf = &mdd_env_info(env)->mti_buf; - buf->lb_buf = (void *)area; - buf->lb_len = len; - return buf; -} - -struct mdd_thread_info *mdd_env_info(const struct lu_env *env) -{ - struct mdd_thread_info *info; - - info = lu_context_key_get(&env->le_ctx, &mdd_thread_key); - LASSERT(info != NULL); - return info; -} - struct lu_object *mdd_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d) @@ -180,18 +267,21 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env, static int mdd_object_init(const struct lu_env *env, struct lu_object *o, const struct lu_object_conf *_) { - struct mdd_device *d = lu2mdd_dev(o->lo_dev); - struct lu_object *below; + struct mdd_device *d = lu2mdd_dev(o->lo_dev); + struct mdd_object *mdd_obj = lu2mdd_obj(o); + struct lu_object *below; struct lu_device *under; ENTRY; - under = &d->mdd_child->dd_lu_dev; - below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); - mdd_pdlock_init(lu2mdd_obj(o)); + mdd_obj->mod_cltime = 0; + under = &d->mdd_child->dd_lu_dev; + below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); + mdd_pdlock_init(mdd_obj); if (below == NULL) - RETURN(-ENOMEM); + RETURN(-ENOMEM); lu_object_add(o, below); + RETURN(0); } @@ -206,43 +296,22 @@ static int mdd_object_start(const struct lu_env *env, struct lu_object *o) static void mdd_object_free(const struct lu_env *env, struct lu_object *o) { struct mdd_object *mdd = lu2mdd_obj(o); - + lu_object_fini(o); OBD_FREE_PTR(mdd); } -/* orphan handling is here */ -static void mdd_object_delete(const struct lu_env *env, struct lu_object *o) +static int mdd_object_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lu_object *o) { - struct mdd_object *mdd_obj = lu2mdd_obj(o); - struct thandle *handle = NULL; - ENTRY; - - if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL) - return; - - if (mdd_obj->mod_flags & ORPHAN_OBJ) { - mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev), - MDD_TXN_INDEX_DELETE_OP); - handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev)); - if (IS_ERR(handle)) - CERROR("Cannot get thandle\n"); - else { - mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); - /* let's remove obj from the orphan list */ - __mdd_orphan_del(env, mdd_obj, handle); - mdd_write_unlock(env, mdd_obj); - mdd_trans_stop(env, lu2mdd_dev(o->lo_dev), - 0, handle); - } - } + return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o); } static const struct lu_object_operations mdd_lu_obj_ops = { - 
.loo_object_init = mdd_object_init, - .loo_object_start = mdd_object_start, - .loo_object_free = mdd_object_free, - .loo_object_delete = mdd_object_delete + .loo_object_init = mdd_object_init, + .loo_object_start = mdd_object_start, + .loo_object_free = mdd_object_free, + .loo_object_print = mdd_object_print, }; struct mdd_object *mdd_object_find(const struct lu_env *env, @@ -252,6 +321,235 @@ struct mdd_object *mdd_object_find(const struct lu_env *env, return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f)); } +static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd, + const char *path, struct lu_fid *fid) +{ + struct lu_buf *buf; + struct lu_fid *f = &mdd_env_info(env)->mti_fid; + struct mdd_object *obj; + struct lu_name *lname = &mdd_env_info(env)->mti_name; + char *name; + int rc = 0; + ENTRY; + + /* temp buffer for path element */ + buf = mdd_buf_alloc(env, PATH_MAX); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + + lname->ln_name = name = buf->lb_buf; + lname->ln_namelen = 0; + *f = mdd->mdd_root_fid; + + while(1) { + while (*path == '/') + path++; + if (*path == '\0') + break; + while (*path != '/' && *path != '\0') { + *name = *path; + path++; + name++; + lname->ln_namelen++; + } + + *name = '\0'; + /* find obj corresponding to fid */ + obj = mdd_object_find(env, mdd, f); + if (obj == NULL) + GOTO(out, rc = -EREMOTE); + if (IS_ERR(obj)) + GOTO(out, rc = -PTR_ERR(obj)); + /* get child fid from parent and name */ + rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL); + mdd_object_put(env, obj); + if (rc) + break; + + name = buf->lb_buf; + lname->ln_namelen = 0; + } + + if (!rc) + *fid = *f; +out: + RETURN(rc); +} + +/** The maximum depth that fid2path() will search. + * This is limited only because we want to store the fids for + * historical path lookup purposes. + */ +#define MAX_PATH_DEPTH 100 + +/** mdd_path() lookup structure. */ +struct path_lookup_info { + __u64 pli_recno; /**< history point */ + struct lu_fid pli_fid; + struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */ + struct mdd_object *pli_mdd_obj; + char *pli_path; /**< full path */ + int pli_pathlen; + int pli_linkno; /**< which hardlink to follow */ + int pli_fidcount; /**< number of \a pli_fids */ +}; + +static int mdd_path_current(const struct lu_env *env, + struct path_lookup_info *pli) +{ + struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj); + struct mdd_object *mdd_obj; + struct lu_buf *buf = NULL; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_name *tmpname = &mdd_env_info(env)->mti_name; + struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; + char *ptr; + int reclen; + int rc; + ENTRY; + + ptr = pli->pli_path + pli->pli_pathlen - 1; + *ptr = 0; + --ptr; + pli->pli_fidcount = 0; + pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj); + + while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) { + mdd_obj = mdd_object_find(env, mdd, + &pli->pli_fids[pli->pli_fidcount]); + if (mdd_obj == NULL) + GOTO(out, rc = -EREMOTE); + if (IS_ERR(mdd_obj)) + GOTO(out, rc = -PTR_ERR(mdd_obj)); + rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu); + if (rc <= 0) { + mdd_object_put(env, mdd_obj); + if (rc == -1) + rc = -EREMOTE; + else if (rc == 0) + /* Do I need to error out here? 
*/ + rc = -ENOENT; + GOTO(out, rc); + } + + /* Get parent fid and object name */ + mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD); + buf = mdd_links_get(env, mdd_obj); + mdd_read_unlock(env, mdd_obj); + mdd_object_put(env, mdd_obj); + if (IS_ERR(buf)) + GOTO(out, rc = PTR_ERR(buf)); + + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ + mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); + + /* If set, use link #linkno for path lookup, otherwise use + link #0. Only do this for the final path element. */ + if ((pli->pli_fidcount == 0) && + (pli->pli_linkno < leh->leh_reccount)) { + int count; + for (count = 0; count < pli->pli_linkno; count++) { + lee = (struct link_ea_entry *) + ((char *)lee + reclen); + mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); + } + if (pli->pli_linkno < leh->leh_reccount - 1) + /* indicate to user there are more links */ + pli->pli_linkno++; + } + + /* Pack the name in the end of the buffer */ + ptr -= tmpname->ln_namelen; + if (ptr - 1 <= pli->pli_path) + GOTO(out, rc = -EOVERFLOW); + strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen); + *(--ptr) = '/'; + + /* Store the parent fid for historic lookup */ + if (++pli->pli_fidcount >= MAX_PATH_DEPTH) + GOTO(out, rc = -EOVERFLOW); + pli->pli_fids[pli->pli_fidcount] = *tmpfid; + } + + /* Verify that our path hasn't changed since we started the lookup */ + rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid); + if (rc) { + CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc); + GOTO (out, rc = -EAGAIN); + } + if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) { + CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID + " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]), + PFID(&pli->pli_fid)); + GOTO(out, rc = -EAGAIN); + } + + memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr); + + EXIT; +out: + if (buf && !IS_ERR(buf) && buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + return rc; +} + +/* Returns the full path to this fid, as of changelog record recno. */ +static int mdd_path(const struct lu_env *env, struct md_object *obj, + char *path, int pathlen, __u64 recno, int *linkno) +{ + struct path_lookup_info *pli; + int tries = 3; + int rc = -EAGAIN; + ENTRY; + + if (pathlen < 3) + RETURN(-EOVERFLOW); + + if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) { + path[0] = '/'; + path[1] = '\0'; + RETURN(0); + } + + OBD_ALLOC_PTR(pli); + if (pli == NULL) + RETURN(-ENOMEM); + + pli->pli_mdd_obj = md2mdd_obj(obj); + pli->pli_recno = recno; + pli->pli_path = path; + pli->pli_pathlen = pathlen; + pli->pli_linkno = *linkno; + + /* Retry multiple times in case file is being moved */ + while (tries-- && rc == -EAGAIN) + rc = mdd_path_current(env, pli); + +#if 0 /* We need old path names only for replication */ + /* For historical path lookup, the current links may not have existed + * at "recno" time. We must switch over to earlier links/parents + * by using the changelog records. If the earlier parent doesn't + * exist, we must search back through the changelog to reconstruct + * its parents, then check if it exists, etc. + * We may ignore this problem for the initial implementation and + * state that an "original" hardlink must still exist for us to find + * historic path name. 
*/ + if (pli->pli_recno != -1) + rc = mdd_path_historic(env, pli); +#endif + + /* return next link index to caller */ + *linkno = pli->pli_linkno; + + OBD_FREE_PTR(pli); + + RETURN (rc); +} + int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj) { struct lu_attr *la = &mdd_env_info(env)->mti_la; @@ -318,7 +616,7 @@ static int __mdd_lmm_get(const struct lu_env *env, RETURN(0); rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size, - MDS_LOV_MD_NAME); + XATTR_NAME_LOV); if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) { rc = mdd_get_default_md(mdd_obj, ma->ma_lmm, @@ -355,7 +653,7 @@ static int __mdd_lmv_get(const struct lu_env *env, RETURN(0); rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size, - MDS_LMV_MD_NAME); + XATTR_NAME_LMV); if (rc > 0) { ma->ma_valid |= MA_LMV; rc = 0; @@ -486,10 +784,13 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj, int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, struct md_attr *ma, - struct thandle *handle) + struct thandle *handle, + const struct md_op_spec *spec) { struct lu_attr *attr = &ma->ma_attr; struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint; + struct dt_object_format *dof = &mdd_env_info(env)->mti_dof; + const struct dt_index_features *feat = spec->sp_feat; int rc; ENTRY; @@ -497,11 +798,19 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct dt_object *next = mdd_object_child(c); LASSERT(next); + if (feat != &dt_directory_features && feat != NULL) + dof->dof_type = DFT_INDEX; + else + dof->dof_type = dt_mode_to_dft(attr->la_mode); + + dof->u.dof_idx.di_feat = feat; + /* @hint will be initialized by underlying device. */ next->do_ops->do_ah_init(env, hint, p ? mdd_object_child(p) : NULL, attr->la_mode & S_IFMT); - rc = mdo_create_obj(env, c, attr, hint, handle); + + rc = mdo_create_obj(env, c, attr, hint, dof, handle); LASSERT(ergo(rc == 0, mdd_object_exists(c))); } else rc = -EEXIST; @@ -605,9 +914,9 @@ int mdd_attr_check_set_internal_locked(const struct lu_env *env, RETURN(rc); } -static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj, - const struct lu_buf *buf, const char *name, - int fl, struct thandle *handle) +int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj, + const struct lu_buf *buf, const char *name, + int fl, struct thandle *handle) { struct lustre_capa *capa = mdd_object_capa(env, obj); int rc = -EINVAL; @@ -669,7 +978,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, la->la_valid &= ~LA_ATIME; RETURN(0); } - + /* Check if flags change. */ if (la->la_valid & LA_FLAGS) { unsigned int oldflags = 0; @@ -685,7 +994,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if (mdd_is_immutable(obj)) oldflags |= LUSTRE_IMMUTABLE_FL; if (mdd_is_append(obj)) - oldflags |= LUSTRE_APPEND_FL; + oldflags |= LUSTRE_APPEND_FL; if ((oldflags ^ newflags) && !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE)) RETURN(-EPERM); @@ -829,6 +1138,60 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, RETURN(0); } +/** Store a data change changelog record + * If this fails, we must fail the whole transaction; we don't + * want the change to commit without the log entry. 
+ * \param mdd_obj - mdd_object of change + * \param handle - transacion handle + */ +static int mdd_changelog_data_store(const struct lu_env *env, + struct mdd_device *mdd, + enum changelog_rec_type type, + struct mdd_object *mdd_obj, + struct thandle *handle) +{ + const struct lu_fid *tfid = mdo2fid(mdd_obj); + struct llog_changelog_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; + + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + RETURN(0); + + LASSERT(handle != NULL); + LASSERT(mdd_obj != NULL); + + if ((type == CL_SETATTR) && + cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) { + /* Don't need multiple updates in this log */ + /* Don't check under lock - no big deal if we get an extra + entry */ + RETURN(0); + } + + reclen = llog_data_len(sizeof(*rec)); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + rec = (struct llog_changelog_rec *)buf->lb_buf; + + rec->cr_flags = CLF_VERSION; + rec->cr_type = (__u32)type; + rec->cr_tfid = *tfid; + rec->cr_namelen = 0; + mdd_obj->mod_cltime = cfs_time_current_64(); + + rc = mdd_changelog_llog_write(mdd, rec, handle); + if (rc < 0) { + CERROR("changelog failed: rc=%d op%d t"DFID"\n", + rc, type, PFID(tfid)); + return -EFAULT; + } + + return 0; +} + /* set attr and LOV EA at once, return updated attr */ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, const struct md_attr *ma) @@ -840,9 +1203,18 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, struct llog_cookie *logcookies = NULL; int rc, lmm_size = 0, cookie_size = 0; struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qnids[MAXQUOTAS] = { 0, 0 }; + unsigned int qoids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, block_count = 0; + int inode_pending = 0, block_pending = 0; +#endif ENTRY; - mdd_txn_param_build(env, mdd, MDD_TXN_ATTR_SET_OP); + mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma, + MDD_TXN_ATTR_SET_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); @@ -856,13 +1228,13 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, GOTO(cleanup, rc = -ENOMEM); rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size, - MDS_LOV_MD_NAME); + XATTR_NAME_LOV); if (rc < 0) GOTO(cleanup, rc); } - if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME)) + if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)) CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n", ma->ma_attr.la_mtime, ma->ma_attr.la_ctime); @@ -871,6 +1243,35 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, if (rc) GOTO(cleanup, rc); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); + if (!rc) { + quota_opc = FSFILT_OP_SETATTR; + mdd_quota_wrapper(la_copy, qnids); + mdd_quota_wrapper(la_tmp, qoids); + /* get file quota for new owner */ + lquota_chkquota(mds_quota_interface_ref, obd, + qnids[USRQUOTA], qnids[GRPQUOTA], 1, + &inode_pending, NULL, 0, NULL, 0); + block_count = (la_tmp->la_blocks + 7) >> 3; + if (block_count) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); + /* get block quota for new owner */ + lquota_chkquota(mds_quota_interface_ref, obd, + qnids[USRQUOTA], + qnids[GRPQUOTA], + block_count, &block_pending, + NULL, LQUOTA_FLAGS_BLK, + data, 1); + } 
+ } + } +#endif + if (la_copy->la_valid & LA_FLAGS) { rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy, handle, 1); @@ -907,12 +1308,32 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, } cleanup: + if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))) + rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj, + handle); mdd_trans_stop(env, mdd, rc, handle); if (rc == 0 && (lmm != NULL && lmm_size > 0 )) { /*set obd attr, if needed*/ rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size, logcookies); } +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) { + if (inode_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qnids[USRQUOTA], qnids[GRPQUOTA], + inode_pending, 0); + if (block_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qnids[USRQUOTA], qnids[GRPQUOTA], + block_pending, 1); + /* Trigger dqrel/dqacq for original owner and new owner. + * If failed, the next call for lquota_chkquota will + * process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc, + quota_opc); + } +#endif RETURN(rc); } @@ -976,6 +1397,12 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj, RETURN(PTR_ERR(handle)); rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle); + + /* Only record user xattr changes */ + if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) && + (strncmp("user.", name, 5) == 0)) + rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj, + handle); mdd_trans_stop(env, mdd, rc, handle); RETURN(rc); @@ -1007,6 +1434,13 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj, rc = mdo_xattr_del(env, mdd_obj, name, handle, mdd_object_capa(env, mdd_obj)); mdd_write_unlock(env, mdd_obj); + + /* Only record user xattr changes */ + if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) && + (strncmp("user.", name, 5) != 0)) + rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj, + handle); + mdd_trans_stop(env, mdd, rc, handle); RETURN(rc); @@ -1020,6 +1454,12 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj, struct mdd_object *mdd_obj = md2mdd_obj(obj); struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0; +#endif int rc; ENTRY; @@ -1062,11 +1502,26 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj, GOTO(cleanup, rc); rc = mdd_finish_unlink(env, mdd_obj, ma, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } +#endif + EXIT; cleanup: mdd_write_unlock(env, mdd_obj); mdd_trans_stop(env, mdd, rc, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) + /* Trigger dqrel on the owner of child. 
If failed, + * the next call for lquota_chkquota will process it */ + lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, + quota_opc); +#endif return rc; } @@ -1105,20 +1560,53 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *mdd_obj = md2mdd_obj(obj); const struct lu_fid *pfid = spec->u.sp_pfid; struct thandle *handle; - int rc; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, block_count = 0; + int inode_pending = 0, block_pending = 0; +#endif + int rc = 0; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + /* get file quota for child */ + lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA], + qids[GRPQUOTA], 1, &inode_pending, NULL, 0, + NULL, 0); + switch (ma->ma_attr.la_mode & S_IFMT) { + case S_IFLNK: + case S_IFDIR: + block_count = 2; + break; + case S_IFREG: + block_count = 1; + break; + } + /* get block quota for child */ + if (block_count) + lquota_chkquota(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + block_count, &block_pending, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); + } +#endif + mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); rc = mdd_oc_sanity_check(env, mdd_obj, ma); if (rc) GOTO(unlock, rc); - rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle); + rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec); if (rc) GOTO(unlock, rc); @@ -1132,7 +1620,7 @@ static int mdd_object_create(const struct lu_env *env, rc = __mdd_xattr_set(env, mdd_obj, mdd_buf_get_const(env, lmv, lmv_size), - MDS_LMV_MD_NAME, 0, handle); + XATTR_NAME_LMV, 0, handle); if (rc) GOTO(unlock, rc); @@ -1158,7 +1646,8 @@ static int mdd_object_create(const struct lu_env *env, pfid = spec->u.sp_ea.fid; } #endif - rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle); + rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle, + spec); } EXIT; unlock: @@ -1167,6 +1656,23 @@ unlock: mdd_write_unlock(env, mdd_obj); mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) { + if (inode_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + inode_pending, 0); + if (block_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + block_pending, 1); + /* Trigger dqacq on the owner of child. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, + FSFILT_OP_CREATE_PARTIAL_CHILD); + } +#endif return rc; } @@ -1319,6 +1825,7 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, if (S_ISREG(mdd_object_type(obj))) { /* Return LOV & COOKIES unconditionally here. We clean evth up. * Caller must be ready for that. 
*/ + rc = __mdd_lmm_get(env, obj, ma); if ((ma->ma_valid & MA_LOV)) rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), @@ -1333,9 +1840,17 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, static int mdd_close(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { - int rc; struct mdd_object *mdd_obj = md2mdd_obj(obj); struct thandle *handle; + int rc; + int reset = 1; + +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0; +#endif ENTRY; rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP); @@ -1349,14 +1864,39 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; + if (mdd_obj->mod_count == 0) { + /* remove link to object from orphan index */ + if (mdd_obj->mod_flags & ORPHAN_OBJ) + __mdd_orphan_del(env, mdd_obj, handle); + } + rc = mdd_iattr_get(env, mdd_obj, ma); - if (rc == 0 && mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) - rc = mdd_object_kill(env, mdd_obj, ma); - else + if (rc == 0) { + if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { + rc = mdd_object_kill(env, mdd_obj, ma); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } +#endif + if (rc == 0) + reset = 0; + } + } + + if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); - + mdd_write_unlock(env, mdd_obj); mdd_trans_stop(env, mdo2mdd(obj), rc, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) + /* Trigger dqrel on the owner of child. If failed, + * the next call for lquota_chkquota will process it */ + lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc, + quota_opc); +#endif RETURN(rc); } @@ -1473,7 +2013,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, * iterate through directory and fill pages from @rdpg */ iops = &next->do_index_ops->dio_it; - it = iops->init(env, next, 0, mdd_object_capa(env, obj)); + it = iops->init(env, next, mdd_object_capa(env, obj)); if (IS_ERR(it)) return PTR_ERR(it); @@ -1615,4 +2155,5 @@ const struct md_object_operations mdd_obj_ops = { .moo_readlink = mdd_readlink, .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, + .moo_path = mdd_path, };
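
A note on the path assembly added above: mdd_path_current() walks from the target object toward the filesystem root, reading one parent FID and one name per step from the link EA (mdd_links_get() / mdd_lee_unpack()), writing each name in front of the previously built component at the tail end of the caller's buffer, and finally memmove()ing the finished string to the start of the buffer. The result is re-checked with mdd_path2fid(), and mdd_path() retries on -EAGAIN in case a concurrent rename changed the path while it was being assembled. The fragment below is a minimal userspace sketch of just that prepend-into-the-tail technique; struct node, node_find(), build_path() and the sample table are hypothetical stand-ins for the FID/link-EA machinery and are not part of the patch.

/*
 * Minimal, self-contained sketch of the "prepend into the buffer tail"
 * technique used by mdd_path_current() above.  The parent/name lookup is
 * faked with a static table (hypothetical data, not Lustre code); only the
 * buffer handling mirrors the kernel logic: start at the end of the buffer,
 * copy each name in front of the previous one, then slide the result to
 * the beginning of the buffer.
 */
#include <stdio.h>
#include <string.h>

struct node {
	int id;           /* stands in for the object FID    */
	int parent;       /* stands in for the parent FID    */
	const char *name; /* name of this node in its parent */
};

/* hypothetical namespace: id 0 is the root */
static const struct node nodes[] = {
	{ 1, 0, "etc" },
	{ 2, 1, "lustre" },
	{ 3, 2, "mdd_object.c" },
};

static const struct node *node_find(int id)
{
	size_t i;

	for (i = 0; i < sizeof(nodes) / sizeof(nodes[0]); i++)
		if (nodes[i].id == id)
			return &nodes[i];
	return NULL;
}

/* Build the full path of @id into @path (of size @pathlen); root is id 0. */
static int build_path(int id, char *path, size_t pathlen)
{
	char *ptr = path + pathlen - 1;

	*ptr = '\0';
	while (id != 0) {
		const struct node *n = node_find(id);
		size_t len;

		if (n == NULL)
			return -1;              /* unknown object */
		len = strlen(n->name);
		if ((size_t)(ptr - path) < len + 1)
			return -1;              /* no room, like -EOVERFLOW */
		ptr -= len;
		memcpy(ptr, n->name, len);      /* name, no NUL needed here */
		*(--ptr) = '/';                 /* separator in front of it */
		id = n->parent;                 /* continue with the parent */
	}
	/* Slide the assembled path to the start of the caller's buffer. */
	memmove(path, ptr, strlen(ptr) + 1);
	return 0;
}

int main(void)
{
	char path[64];

	if (build_path(3, path, sizeof(path)) == 0)
		printf("%s\n", path);   /* prints /etc/lustre/mdd_object.c */
	return 0;
}

Building the string at the tail avoids shifting the partial path each time another parent name is discovered; the kernel version in the patch additionally caps the walk at MAX_PATH_DEPTH (100) parents, returns -EOVERFLOW when either that limit or the supplied pathlen is exceeded, and keeps the intermediate FIDs in pli_fids[] for the (disabled) historical lookup path.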