X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=d84a259e24aefebc33723765281d8c8defe60702;hp=fd262ac969a6510c7ba797a0c4c68f8eefc26ab2;hb=de2d5808bd2987f76d2486272e1a9c192ba277d4;hpb=a52aa3b4d9dd5efb87bc983c13b9f58d61ce8a8e diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index fd262ac..d84a259 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -38,20 +38,20 @@ static const char dot[] = "."; static const char dotdot[] = ".."; -static int osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int osp_md_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); - struct update_request *update; - struct lu_fid *fid1; - int sizes[2] = {sizeof(struct obdo), 0}; - char *bufs[2] = {NULL, NULL}; - int buf_count; - int rc; + struct osp_thread_info *osi = osp_env_info(env); + struct dt_update_request *update; + struct lu_fid *fid1; + int sizes[2] = {sizeof(struct obdo), 0}; + char *bufs[2] = {NULL, NULL}; + int buf_count; + int rc; update = out_find_create_update_loc(th, dt); if (IS_ERR(update)) { @@ -62,22 +62,18 @@ static int osp_md_declare_object_create(const struct lu_env *env, } osi->osi_obdo.o_valid = 0; - LASSERT(S_ISDIR(attr->la_mode)); obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); bufs[0] = (char *)&osi->osi_obdo; buf_count = 1; fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu); - if (hint->dah_parent) { + if (hint != NULL && hint->dah_parent) { struct lu_fid *fid2; - struct lu_fid *tmp_fid = &osi->osi_fid; fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu); - fid_cpu_to_le(tmp_fid, fid2); - sizes[1] = sizeof(*tmp_fid); - bufs[1] = (char *)tmp_fid; + sizes[1] = sizeof(*fid2); + bufs[1] = (char *)fid2; buf_count++; } @@ -98,20 +94,20 @@ static int osp_md_declare_object_create(const struct lu_env *env, CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1)); - rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, 0, + rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0, NULL, NULL); if (rc != 0) GOTO(out, rc); if (S_ISDIR(lu_object_attr(&dt->do_lu))) { /* decrease for ".." */ - rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, + rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0, NULL, NULL); if (rc != 0) GOTO(out, rc); } - rc = out_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL, + rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL, NULL); if (rc != 0) GOTO(out, rc); @@ -122,7 +118,7 @@ static int osp_md_declare_object_create(const struct lu_env *env, update_inc_batchid(update); } - rc = out_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes, + rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes, (const char **)bufs); out: if (rc) @@ -132,14 +128,10 @@ out: return rc; } -static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); - CDEBUG(D_INFO, "create object "DFID"\n", PFID(&dt->do_lu.lo_header->loh_fid)); @@ -147,7 +139,6 @@ static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, * if creation reaches here, it means the object has been created * successfully */ dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); - obj->opo_empty = 1; return 0; } @@ -156,9 +147,9 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct update_request *update; - struct lu_fid *fid; - int rc; + struct dt_update_request *update; + struct lu_fid *fid; + int rc; update = out_find_create_update_loc(th, dt); if (IS_ERR(update)) { @@ -170,7 +161,7 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL); + rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL); return rc; } @@ -188,9 +179,9 @@ static int osp_md_object_ref_del(const struct lu_env *env, static int osp_md_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct update_request *update; - struct lu_fid *fid; - int rc; + struct dt_update_request *update; + struct lu_fid *fid; + int rc; update = out_find_create_update_loc(th, dt); if (IS_ERR(update)) { @@ -202,7 +193,7 @@ static int osp_md_declare_ref_add(const struct lu_env *env, fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL); + rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL); return rc; } @@ -225,22 +216,19 @@ static void osp_md_ah_init(const struct lu_env *env, { LASSERT(ah); - memset(ah, 0, sizeof(*ah)); ah->dah_parent = parent; ah->dah_mode = child_mode; } -static int osp_md_declare_attr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_attr *attr, - struct thandle *th) +int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); - struct update_request *update; - struct lu_fid *fid; - int size = sizeof(struct obdo); - char *buf; - int rc; + struct osp_thread_info *osi = osp_env_info(env); + struct dt_update_request *update; + struct lu_fid *fid; + int size = sizeof(struct obdo); + char *buf; + int rc; update = out_find_create_update_loc(th, dt); if (IS_ERR(update)) { @@ -251,24 +239,22 @@ static int osp_md_declare_attr_set(const struct lu_env *env, } osi->osi_obdo.o_valid = 0; - LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE))); obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, attr->la_valid); lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); buf = (char *)&osi->osi_obdo; fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, + rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size, (const char **)&buf); return rc; } -static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa) +int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th, + struct lustre_capa *capa) { CDEBUG(D_INFO, "attr set object "DFID"\n", PFID(&dt->do_lu.lo_header->loh_fid)); @@ -331,13 +317,12 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct dt_device *dt_dev = &osp->opd_dt_dev; - struct update_request *update; - struct ptlrpc_request *req = NULL; - int size = strlen((char *)key) + 1; - int rc; - struct update_reply *reply; - struct lu_fid *fid; - + struct dt_update_request *update; + struct object_update_reply *reply; + struct ptlrpc_request *req = NULL; + int size = strlen((char *)key) + 1; + struct lu_fid *fid; + int rc; ENTRY; /* Because it needs send the update buffer right away, @@ -348,9 +333,9 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OBJ_INDEX_LOOKUP, - lu_object_fid(&dt->do_lu), 1, &size, - (const char **)&key); + rc = out_insert_update(env, update, OUT_INDEX_LOOKUP, + lu_object_fid(&dt->do_lu), + 1, &size, (const char **)&key); if (rc) { CERROR("%s: Insert update error: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); @@ -361,26 +346,19 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, if (rc < 0) GOTO(out, rc); - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply->ur_version != UPDATE_REPLY_V1) { + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { CERROR("%s: Wrong version %x expected %x: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, - reply->ur_version, UPDATE_REPLY_V1, -EPROTO); + reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); GOTO(out, rc = -EPROTO); } - rc = update_get_reply_result(reply, NULL, 0); - if (rc < 0) { - CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc); - GOTO(out, rc); - } - - rc = update_get_reply_buf(reply, lbuf, 0); + rc = object_update_result_data_get(reply, lbuf, 0); if (rc < 0) - GOTO(out, rc); + GOTO(out, rc = size); if (lbuf->lb_len != sizeof(*fid)) { CERROR("%s: lookup "DFID" %s wrong size %d\n", @@ -391,7 +369,8 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, } fid = lbuf->lb_buf; - fid_le_to_cpu(fid, fid); + if (ptlrpc_rep_need_swab(req)) + lustre_swab_lu_fid(fid); if (!fid_is_sane(fid)) { CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n", dt_dev->dd_lu_dev.ld_obd->obd_name, @@ -418,13 +397,13 @@ static int osp_md_declare_insert(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct update_request *update; - struct lu_fid *fid; - struct lu_fid *rec_fid = (struct lu_fid *)rec; - int size[2] = {strlen((char *)key) + 1, + struct dt_update_request *update; + struct lu_fid *fid; + struct lu_fid *rec_fid = (struct lu_fid *)rec; + int size[2] = {strlen((char *)key) + 1, sizeof(*rec_fid)}; - const char *bufs[2] = {(char *)key, (char *)rec_fid}; - int rc; + const char *bufs[2] = {(char *)key, (char *)rec_fid}; + int rc; update = out_find_create_update_loc(th, dt); if (IS_ERR(update)) { @@ -442,7 +421,7 @@ static int osp_md_declare_insert(const struct lu_env *env, fid_cpu_to_le(rec_fid, rec_fid); - rc = out_insert_update(env, update, OBJ_INDEX_INSERT, fid, + rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid, ARRAY_SIZE(size), size, bufs); return rc; } @@ -463,7 +442,7 @@ static int osp_md_declare_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct update_request *update; + struct dt_update_request *update; struct lu_fid *fid; int size = strlen((char *)key) + 1; int rc; @@ -478,7 +457,7 @@ static int osp_md_declare_delete(const struct lu_env *env, fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size, + rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size, (const char **)&key); return rc; @@ -496,94 +475,102 @@ static int osp_md_index_delete(const struct lu_env *env, return 0; } -/** - * Creates or initializes iterator context. - * - * Note: for OSP, these index iterate api is only used to check - * whether the directory is empty now (see mdd_dir_is_empty). - * Since dir_empty will be return by OBJ_ATTR_GET(see osp_attr_get/ - * out_attr_get). So the implementation of these iterator is simplied - * to make mdd_dir_is_empty happy. The real iterator should be - * implemented, if we need it one day. - */ -static struct dt_it *osp_it_init(const struct lu_env *env, - struct dt_object *dt, - __u32 attr, - struct lustre_capa *capa) +int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) { - lu_object_get(&dt->do_lu); - return (struct dt_it *)dt; -} - -static void osp_it_fini(const struct lu_env *env, struct dt_it *di) -{ - struct dt_object *dt = (struct dt_object *)di; - lu_object_put(env, &dt->do_lu); -} - -static int osp_it_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) -{ - return 1; -} - -static void osp_it_put(const struct lu_env *env, struct dt_it *di) -{ - return; -} + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + int rc; + ENTRY; -static int osp_it_next(const struct lu_env *env, struct dt_it *di) -{ - struct dt_object *dt = (struct dt_object *)di; - struct osp_object *o = dt2osp_obj(dt); +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (ent == NULL) { + it->ooi_ent = + (struct lu_dirent *)idxpage->lip_entries; + RETURN(0); + } else if (le16_to_cpu(ent->lde_reclen) != 0 && + it->ooi_pos_ent < idxpage->lip_nr) { + ent = (struct lu_dirent *)(((char *)ent) + + le16_to_cpu(ent->lde_reclen)); + it->ooi_ent = ent; + RETURN(0); + } else { + it->ooi_ent = NULL; + } + } - if (o->opo_empty) - return 1; + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; - return 0; + RETURN(rc); } static struct dt_key *osp_it_key(const struct lu_env *env, const struct dt_it *di) { - LBUG(); - return NULL; + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + + return (struct dt_key *)ent->lde_name; } static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di) { - LBUG(); - return 0; -} + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; -static int osp_it_rec(const struct lu_env *env, const struct dt_it *di, - struct dt_rec *lde, __u32 attr) -{ - LBUG(); - return 0; + return (int)le16_to_cpu(ent->lde_namelen); } -static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) +static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) { - LBUG(); + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + int reclen; + + reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr); + memcpy(rec, ent, reclen); return 0; } +/** + * Locate the iteration cursor to the specified position (cookie). + * + * \param[in] env pointer to the thread context + * \param[in] di pointer to the iteration structure + * \param[in] hash the specified position + * + * \retval positive number for locating to the exactly position + * or the next + * \retval 0 for arriving at the end of the iteration + * \retval negative error number on failure + */ static int osp_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) { - LBUG(); - return 0; -} + struct osp_it *it = (struct osp_it *)di; + int rc; -static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void *key_rec) -{ - LBUG(); - return 0; + it->ooi_next = hash; + rc = osp_md_index_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; } -static const struct dt_index_operations osp_md_index_ops = { +const struct dt_index_operations osp_md_index_ops = { .dio_lookup = osp_md_index_lookup, .dio_declare_insert = osp_md_declare_insert, .dio_insert = osp_md_index_insert, @@ -594,10 +581,10 @@ static const struct dt_index_operations osp_md_index_ops = { .fini = osp_it_fini, .get = osp_it_get, .put = osp_it_put, - .next = osp_it_next, + .next = osp_md_index_it_next, .key = osp_it_key, .key_size = osp_it_key_size, - .rec = osp_it_rec, + .rec = osp_md_index_it_rec, .store = osp_it_store, .load = osp_it_load, .key_rec = osp_it_key_rec, @@ -616,23 +603,22 @@ static int osp_md_object_lock(const struct lu_env *env, struct dt_object *dt, struct lustre_handle *lh, struct ldlm_enqueue_info *einfo, - void *policy) + ldlm_policy_data_t *policy) { - struct osp_thread_info *info = osp_env_info(env); - struct ldlm_res_id *res_id = &info->osi_resid; - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct osp_device *osp = dt2osp_dev(dt_dev); - struct ptlrpc_request *req = NULL; - int rc = 0; - __u64 flags = 0; - ldlm_mode_t mode; + struct ldlm_res_id *res_id; + struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); + struct osp_device *osp = dt2osp_dev(dt_dev); + struct ptlrpc_request *req; + int rc = 0; + __u64 flags = 0; + ldlm_mode_t mode; - fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id); + res_id = einfo->ei_res_id; + LASSERT(res_id != NULL); mode = ldlm_lock_match(osp->opd_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, res_id, - einfo->ei_type, - (ldlm_policy_data_t *)policy, + einfo->ei_type, policy, einfo->ei_mode, lh, 0); if (mode > 0) return ELDLM_OK; @@ -650,6 +636,19 @@ static int osp_md_object_lock(const struct lu_env *env, return rc == ELDLM_OK ? 0 : -EIO; } +static int osp_md_object_unlock(const struct lu_env *env, + struct dt_object *dt, + struct ldlm_enqueue_info *einfo, + ldlm_policy_data_t *policy) +{ + struct lustre_handle *lockh = einfo->ei_cbdata; + + /* unlock finally */ + ldlm_lock_decref(lockh, einfo->ei_mode); + + return 0; +} + struct dt_object_operations osp_md_obj_ops = { .do_read_lock = osp_md_object_read_lock, .do_write_lock = osp_md_object_write_lock, @@ -671,6 +670,53 @@ struct dt_object_operations osp_md_obj_ops = { .do_xattr_get = osp_xattr_get, .do_declare_xattr_set = osp_declare_xattr_set, .do_xattr_set = osp_xattr_set, + .do_declare_xattr_del = osp_declare_xattr_del, + .do_xattr_del = osp_xattr_del, .do_index_try = osp_md_index_try, .do_object_lock = osp_md_object_lock, + .do_object_unlock = osp_md_object_unlock, +}; + +static ssize_t osp_md_declare_write(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + loff_t pos, struct thandle *th) +{ + struct dt_update_request *update; + struct lu_fid *fid; + int sizes[2] = {buf->lb_len, sizeof(pos)}; + const char *bufs[2] = {(char *)buf->lb_buf, + (char *)&pos}; + ssize_t rc; + + update = out_find_create_update_loc(th, dt); + if (IS_ERR(update)) { + CERROR("%s: Get OSP update buf failed: rc = %d\n", + dt->do_lu.lo_dev->ld_obd->obd_name, + (int)PTR_ERR(update)); + return PTR_ERR(update); + } + + pos = cpu_to_le64(pos); + bufs[1] = (char *)&pos; + fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + rc = out_insert_update(env, update, OUT_WRITE, fid, + ARRAY_SIZE(sizes), sizes, bufs); + + return rc; + +} + +static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *handle, + struct lustre_capa *capa, int ignore_quota) +{ + return buf->lb_len; +} + +/* These body operation will be used to write symlinks during migration etc */ +struct dt_body_operations osp_md_body_ops = { + .dbo_declare_write = osp_md_declare_write, + .dbo_write = osp_md_write, };