struct md_object *obj, struct lu_buf *buf,
const char *name);
+/** Fetch the opaque private data attached to \a obj.
+ *
+ * Delegates to mdo_data_get(); asserts that the object exists on disk
+ * before dereferencing it.
+ *
+ * \param[out] data receives the opaque pointer from the mdo layer
+ * \retval 0 always — mdo_data_get()'s own result is ignored here
+ */
+int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
+ void **data)
+{
+ LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
+ PFID(mdd_object_fid(obj)));
+ mdo_data_get(env, obj, data);
+ return 0;
+}
+
int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
struct lu_attr *la, struct lustre_capa *capa)
{
return buf;
}
-/* preserve old data */
+/** Increase the size of the \a mti_big_buf.
+ * preserves old data in buffer
+ * old buffer remains unchanged on error
+ * \retval 0 or -ENOMEM
+ */
int mdd_buf_grow(const struct lu_env *env, ssize_t len)
{
struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
static int mdd_object_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
- return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
+ struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
+
+ /* Dump mdd-private state (open count, valid bits, changelog time and
+ * flags) instead of just the lu_object address.
+ * NOTE(review): %llu assumes mod_cltime is unsigned long long
+ * compatible and %lx assumes mod_flags is unsigned long — confirm
+ * against the struct mdd_object declaration. */
+ return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
+ "valid=%x, cltime=%llu, flags=%lx)",
+ mdd, mdd->mod_count, mdd->mod_valid,
+ mdd->mod_cltime, mdd->mod_flags);
}
static const struct lu_object_operations mdd_lu_obj_ops = {
/** mdd_path() lookup structure. */
struct path_lookup_info {
__u64 pli_recno; /**< history point */
+ __u64 pli_currec; /**< current record */
struct lu_fid pli_fid;
struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
struct mdd_object *pli_mdd_obj;
/* Get parent fid and object name */
mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
buf = mdd_links_get(env, mdd_obj);
- if (IS_ERR(buf))
- GOTO(out, rc = PTR_ERR(buf));
mdd_read_unlock(env, mdd_obj);
mdd_object_put(env, mdd_obj);
- if (rc < 0)
- GOTO(out, rc);
+ if (IS_ERR(buf))
+ GOTO(out, rc = PTR_ERR(buf));
leh = buf->lb_buf;
lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
pli->pli_fids[pli->pli_fidcount] = *tmpfid;
}
- /* Verify that our path hasn't changed since we started the lookup */
+ /* Verify that our path hasn't changed since we started the lookup.
+ Record the current index, and verify the path resolves to the
+ same fid. If it does, then the path is correct as of this index. */
+ spin_lock(&mdd->mdd_cl.mc_lock);
+ pli->pli_currec = mdd->mdd_cl.mc_index;
+ spin_unlock(&mdd->mdd_cl.mc_lock);
rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
if (rc) {
CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
return rc;
}
+/** Placeholder for changelog-based historic path lookup.
+ *
+ * Not implemented yet: callers asking for a path "as of recno" currently
+ * receive the present-day path (see the discussion in mdd_path() below).
+ * \retval 0 always
+ */
+static int mdd_path_historic(const struct lu_env *env,
+ struct path_lookup_info *pli)
+{
+ return 0;
+}
+
/* Returns the full path to this fid, as of changelog record recno. */
static int mdd_path(const struct lu_env *env, struct md_object *obj,
- char *path, int pathlen, __u64 recno, int *linkno)
+ char *path, int pathlen, __u64 *recno, int *linkno)
{
struct path_lookup_info *pli;
int tries = 3;
RETURN(-ENOMEM);
pli->pli_mdd_obj = md2mdd_obj(obj);
- pli->pli_recno = recno;
+ pli->pli_recno = *recno;
pli->pli_path = path;
pli->pli_pathlen = pathlen;
pli->pli_linkno = *linkno;
while (tries-- && rc == -EAGAIN)
rc = mdd_path_current(env, pli);
-#if 0 /* We need old path names only for replication */
/* For historical path lookup, the current links may not have existed
* at "recno" time. We must switch over to earlier links/parents
* by using the changelog records. If the earlier parent doesn't
* We may ignore this problem for the initial implementation and
* state that an "original" hardlink must still exist for us to find
* historic path name. */
- if (pli->pli_recno != -1)
+ if (pli->pli_recno != -1) {
rc = mdd_path_historic(env, pli);
-#endif
-
- /* return next link index to caller */
- *linkno = pli->pli_linkno;
+ } else {
+ *recno = pli->pli_currec;
+ /* Return next link index to caller */
+ *linkno = pli->pli_linkno;
+ }
OBD_FREE_PTR(pli);
RETURN(rc);
}
-static int mdd_get_default_md(struct mdd_object *mdd_obj,
- struct lov_mds_md *lmm, int *size)
+int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
+ int *size)
{
struct lov_desc *ldesc;
struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
unsigned int qnids[MAXQUOTAS] = { 0, 0 };
unsigned int qoids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
ENTRY;
mdd_quota_wrapper(la_copy, qnids);
mdd_quota_wrapper(la_tmp, qoids);
/* get file quota for new owner */
- lquota_chkquota(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA], 1,
- &inode_pending, NULL, 0, NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, qnids,
+ inode_pending, 1, NULL, 0, NULL, 0);
block_count = (la_tmp->la_blocks + 7) >> 3;
- if (block_count)
+ if (block_count) {
+ void *data = NULL;
+ mdd_data_get(env, mdd_obj, &data);
/* get block quota for new owner */
lquota_chkquota(mds_quota_interface_ref, obd,
- qnids[USRQUOTA],
- qnids[GRPQUOTA],
- block_count, &block_pending,
- NULL, LQUOTA_FLAGS_BLK,
- NULL, 0);
+ qnids, block_pending,
+ block_count, NULL,
+ LQUOTA_FLAGS_BLK, data, 1);
+ }
}
}
#endif
}
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA],
- block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+ block_pending, 1);
/* Trigger dqrel/dqacq for original owner and new owner.
* If failed, the next call for lquota_chkquota will
* process it. */
struct mds_obd *mds = &obd->u.mds;
unsigned int qids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
int rc = 0;
ENTRY;
quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
mdd_quota_wrapper(&ma->ma_attr, qids);
/* get file quota for child */
- lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
- qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
- NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, qids,
+ inode_pending, 1, NULL, 0, NULL, 0);
switch (ma->ma_attr.la_mode & S_IFMT) {
case S_IFLNK:
case S_IFDIR:
}
/* get block quota for child */
if (block_count)
- lquota_chkquota(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- block_count, &block_pending, NULL,
+ lquota_chkquota(mds_quota_interface_ref, obd, qids,
+ block_pending, block_count, NULL,
LQUOTA_FLAGS_BLK, NULL, 0);
}
#endif
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+ block_pending, 1);
/* Trigger dqacq on the owner of child. If failed,
* the next call for lquota_chkquota will process it. */
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
- FSFILT_OP_CREATE_PARTIAL_CHILD);
+ quota_opc);
}
#endif
return rc;
struct md_attr *ma)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
+ struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle;
int rc;
int reset = 1;
/* release open count */
mdd_obj->mod_count --;
- if (mdd_obj->mod_count == 0) {
+ if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
/* remove link to object from orphan index */
- if (mdd_obj->mod_flags & ORPHAN_OBJ)
- __mdd_orphan_del(env, mdd_obj, handle);
+ rc = __mdd_orphan_del(env, mdd_obj, handle);
+ if (rc == 0) {
+ CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+ "list, OSS objects to be destroyed.\n",
+ PFID(mdd_object_fid(mdd_obj)));
+ } else {
+ CERROR("Object "DFID" can not be deleted from orphan "
+ "list, maybe cause OST objects can not be "
+ "destroyed (err: %d).\n",
+ PFID(mdd_object_fid(mdd_obj)), rc);
+ /* If object was not deleted from orphan list, do not
+ * destroy OSS objects, which will be done when next
+ * recovery. */
+ GOTO(out, rc);
+ }
}
rc = mdd_iattr_get(env, mdd_obj, ma);
- if (rc == 0) {
- if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
- rc = mdd_object_kill(env, mdd_obj, ma);
+ /* Object maybe not in orphan list originally, it is rare case for
+ * mdd_finish_unlink() failure. */
+ if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
- if (mds->mds_quota) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
- mdd_quota_wrapper(&ma->ma_attr, qids);
- }
+ if (mds->mds_quota) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qids);
+ }
#endif
- if (rc == 0)
- reset = 0;
+ /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
+ if (ma->ma_valid & MA_FLAGS &&
+ ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
+ rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
+ } else {
+ rc = mdd_object_kill(env, mdd_obj, ma);
+ if (rc == 0)
+ reset = 0;
}
+
+ if (rc != 0)
+ CERROR("Error when prepare to delete Object "DFID" , "
+ "which will cause OST objects can not be "
+ "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
}
+ EXIT;
+out:
if (reset)
ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
quota_opc);
#endif
- RETURN(rc);
+ return rc;
}
/*
RETURN(rc);
}
-static int mdd_dir_page_build(const struct lu_env *env, int first,
- void *area, int nob, const struct dt_it_ops *iops,
- struct dt_it *it, __u64 *start, __u64 *end,
- struct lu_dirent **last)
+/** Fill in optional per-entry attributes for a lu_dirent under construction.
+ *
+ * \param attr requested attributes: LUDA_FID and/or LUDA_TYPE.
+ *             LUDA_TYPE without LUDA_FID is rejected with -EINVAL since
+ *             the type lookup needs the fid to find the object.
+ * \param ent  directory entry being built; lde_attrs is set here and is
+ *             converted to little-endian before returning (on all paths,
+ *             including errors).
+ * \retval 0 on success; also 0 for remote objects, where the type
+ *           attribute is silently skipped
+ * \retval negative errno on lookup/unpack failure
+ */
+static int mdd_append_attrs(const struct lu_env *env,
+ struct mdd_device *mdd,
+ __u32 attr,
+ const struct dt_it_ops *iops,
+ struct dt_it *it,
+ struct lu_dirent*ent)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lu_fid *fid = &info->mti_fid2;
+ /* NOTE(review): this reads an __le16 field; le16_to_cpu() is the
+ * semantically correct helper (cpu_to_le16() performs the identical
+ * byte swap on all architectures, so behavior is unaffected). */
+ int len = cpu_to_le16(ent->lde_namelen);
+ const unsigned align = sizeof(struct luda_type) - 1;
+ struct lu_fid_pack *pack;
+ struct mdd_object *obj;
+ struct luda_type *lt;
+ int rc = 0;
+
+ if (attr & LUDA_FID) {
+ pack = (struct lu_fid_pack *)iops->rec(env, it);
+ if (IS_ERR(pack)) {
+ rc = PTR_ERR(pack);
+ ent->lde_attrs = 0;
+ goto out;
+ }
+ rc = fid_unpack(pack, fid);
+ if (rc != 0) {
+ ent->lde_attrs = 0;
+ goto out;
+ }
+
+ fid_cpu_to_le(&ent->lde_fid, fid);
+ ent->lde_attrs = LUDA_FID;
+ }
+
+ /* check if file type is required */
+ if (attr & LUDA_TYPE) {
+ if (!(attr & LUDA_FID)) {
+ CERROR("wrong attr : [%x]\n",attr);
+ rc = -EINVAL;
+ goto out;
+ }
+
+ obj = mdd_object_find(env, mdd, fid);
+ if (obj == NULL) /* remote object */
+ goto out;
+
+ if (IS_ERR(obj)) {
+ rc = PTR_ERR(obj);
+ goto out;
+ }
+
+ if (mdd_object_exists(obj) == +1) {
+ /* the luda_type record is stored right after the
+ * name, rounded up to luda_type alignment */
+ len = (len + align) & ~align;
+
+ lt = (void *) ent->lde_name + len;
+ lt->lt_type = cpu_to_le16(mdd_object_type(obj));
+
+ ent->lde_attrs |= LUDA_TYPE;
+ }
+ mdd_object_put(env, obj);
+ }
+out:
+ ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
+ return rc;
+}
+
+/** Build one page worth of lu_dirent records from the directory iterator.
+ *
+ * Iterates \a it, packing each entry (name, hash, optional fid/type via
+ * mdd_append_attrs()) into \a area until \a nob bytes are consumed.
+ * Zero-length keys and -ESTALE results from the iterator are skipped.
+ * When an entry does not fit, the previous entry's reclen is enlarged to
+ * absorb the leftover space so the page stays fully covered.
+ *
+ * \param[out] start hash of the first entry placed in this page
+ * \param[out] end   hash of the last entry visited (resume point)
+ * \param[in,out] last previous entry, back-patched on page overflow;
+ *                     -EINVAL if overflow happens with no previous entry
+ * \retval 0 on page-full, >0 on iterator end, negative errno on error
+ */
+static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
+ int first, void *area, int nob,
+ const struct dt_it_ops *iops, struct dt_it *it,
+ __u64 *start, __u64 *end,
+ struct lu_dirent **last, __u32 attr)
{
- struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_fid_pack *pack = &info->mti_pack;
int result;
struct lu_dirent *ent;
+ __u64 hash = 0;
if (first) {
memset(area, 0, sizeof (struct lu_dirpage));
nob -= sizeof (struct lu_dirpage);
}
- LASSERT(nob > sizeof *ent);
-
ent = area;
- result = 0;
do {
char *name;
int len;
int recsize;
- __u64 hash;
- name = (char *)iops->key(env, it);
len = iops->key_size(env, it);
- pack = (struct lu_fid_pack *)iops->rec(env, it);
- result = fid_unpack(pack, fid);
- if (result != 0)
- break;
+ /* IAM iterator can return record with zero len. */
+ if (len == 0)
+ goto next;
- recsize = (sizeof(*ent) + len + 7) & ~7;
+ name = (char *)iops->key(env, it);
hash = iops->store(env, it);
- *end = hash;
- CDEBUG(D_INFO, "%p %p %d "DFID": "LPU64" (%d) \"%*.*s\"\n",
- name, ent, nob, PFID(fid), hash, len, len, len, name);
+ if (unlikely(first)) {
+ first = 0;
+ *start = hash;
+ }
+
+ recsize = lu_dirent_calc_size(len, attr);
+
+ CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
+ name, ent, nob, hash, len, len, len, name);
if (nob >= recsize) {
- ent->lde_fid = *fid;
- fid_cpu_to_le(&ent->lde_fid, &ent->lde_fid);
- ent->lde_hash = hash;
+ ent->lde_hash = cpu_to_le64(hash);
ent->lde_namelen = cpu_to_le16(len);
ent->lde_reclen = cpu_to_le16(recsize);
memcpy(ent->lde_name, name, len);
- if (first && ent == area)
- *start = hash;
- *last = ent;
- ent = (void *)ent + recsize;
- nob -= recsize;
- result = iops->next(env, it);
+
+ /* a stale entry is dropped, not treated as an error */
+ result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
+ if (result == -ESTALE)
+ goto next;
+ if (result != 0)
+ goto out;
} else {
/*
* record doesn't fit into page, enlarge previous one.
*/
- LASSERT(*last != NULL);
- (*last)->lde_reclen =
- cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
- nob);
- break;
+ if (*last) {
+ (*last)->lde_reclen =
+ cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
+ nob);
+ result = 0;
+ } else
+ result = -EINVAL;
+
+ goto out;
}
+ *last = ent;
+ ent = (void *)ent + recsize;
+ nob -= recsize;
+
+next:
+ result = iops->next(env, it);
+ if (result == -ESTALE)
+ goto next;
} while (result == 0);
+out:
+ *end = hash;
return result;
}
const struct dt_it_ops *iops;
struct page *pg;
struct lu_dirent *last = NULL;
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
int i;
int rc;
int nob;
i++, nob -= CFS_PAGE_SIZE) {
LASSERT(i < rdpg->rp_npages);
pg = rdpg->rp_pages[i];
- rc = mdd_dir_page_build(env, !i, cfs_kmap(pg),
+ rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
min_t(int, nob, CFS_PAGE_SIZE), iops,
- it, &hash_start, &hash_end, &last);
- if (rc != 0 || i == rdpg->rp_npages - 1)
- last->lde_reclen = 0;
+ it, &hash_start, &hash_end, &last,
+ rdpg->rp_attrs);
+ if (rc != 0 || i == rdpg->rp_npages - 1) {
+ if (last)
+ last->lde_reclen = 0;
+ }
cfs_kunmap(pg);
}
if (rc > 0) {
struct lu_dirpage *dp;
dp = cfs_kmap(rdpg->rp_pages[0]);
- dp->ldp_hash_start = rdpg->rp_hash;
- dp->ldp_hash_end = hash_end;
+ dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
+ dp->ldp_hash_end = cpu_to_le64(hash_end);
if (i == 0)
/*
* No pages were processed, mark this.
*/
dp->ldp_flags |= LDF_EMPTY;
+
dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
cfs_kunmap(rdpg->rp_pages[0]);
}
return rc;
}
-static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
- const struct lu_rdpg *rdpg)
+int mdd_readpage(const struct lu_env *env, struct md_object *obj,
+ const struct lu_rdpg *rdpg)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
int rc;
pg = rdpg->rp_pages[0];
dp = (struct lu_dirpage*)cfs_kmap(pg);
memset(dp, 0 , sizeof(struct lu_dirpage));
- dp->ldp_hash_start = rdpg->rp_hash;
- dp->ldp_hash_end = DIR_END_OFF;
+ dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
+ dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
dp->ldp_flags |= LDF_EMPTY;
dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
cfs_kunmap(pg);
return next->do_ops->do_object_sync(env, next);
}
+/** Return the version of \a obj from the underlying dt object.
+ * \pre the object must exist on disk (LASSERT)
+ */
+static dt_obj_version_t mdd_version_get(const struct lu_env *env,
+ struct md_object *obj)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_get(env, mdd_object_child(mdd_obj));
+}
+
+
+/** Set the version of \a obj on the underlying dt object.
+ * \pre the object must exist on disk (LASSERT)
+ */
+static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
+ dt_obj_version_t version)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ /* no 'return <expr>' in a void function (C11 6.8.6.4 constraint) */
+ do_version_set(env, mdd_object_child(mdd_obj), version);
+}
+
+
const struct md_object_operations mdd_obj_ops = {
.moo_permission = mdd_permission,
.moo_attr_get = mdd_attr_get,
.moo_readlink = mdd_readlink,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = mdd_object_sync,
+ .moo_version_get = mdd_version_get,
+ .moo_version_set = mdd_version_set,
.moo_path = mdd_path,
};