X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_object.c;h=966d75190bba13582b9f73179db8e6fefec7b614;hb=cd3aa902bfcf529a6774df3a207a2c3287c994fb;hp=5ffb7e49c0e14c7153ff44112de536e98852735b;hpb=2c67b17fd183ef60baba74e914e96b9b292bfc39;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 5ffb7e4..966d751 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -87,7 +87,7 @@ static int osp_oac_init(struct osp_object *obj) static struct osp_xattr_entry * osp_oac_xattr_find_locked(struct osp_object_attr *ooa, - const char *name, int namelen, bool unlink) + const char *name, size_t namelen, bool unlink) { struct osp_xattr_entry *oxe; @@ -121,13 +121,13 @@ static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj, } static struct osp_xattr_entry * -osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len) +osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) { struct osp_object_attr *ooa = obj->opo_ooa; struct osp_xattr_entry *oxe; struct osp_xattr_entry *tmp = NULL; - int namelen = strlen(name); - int size = sizeof(*oxe) + namelen + 1 + len; + size_t namelen = strlen(name); + size_t size = sizeof(*oxe) + namelen + 1 + len; LASSERT(ooa != NULL); @@ -163,14 +163,14 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len) static struct osp_xattr_entry * osp_oac_xattr_replace(struct osp_object *obj, - struct osp_xattr_entry **poxe, int len) + struct osp_xattr_entry **poxe, size_t len) { struct osp_object_attr *ooa = obj->opo_ooa; struct osp_xattr_entry *old = *poxe; struct osp_xattr_entry *oxe; struct osp_xattr_entry *tmp = NULL; - int namelen = old->oxe_namelen; - int size = sizeof(*oxe) + namelen + 1 + len; + size_t namelen = old->oxe_namelen; + size_t size = sizeof(*oxe) + namelen + 1 + len; LASSERT(ooa != NULL); @@ -207,7 +207,8 @@ static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe) } static int osp_get_attr_from_reply(const struct lu_env *env, - struct update_reply *reply, + struct object_update_reply *reply, + struct ptlrpc_request *req, struct lu_attr *attr, struct osp_object *obj, int index) { @@ -215,9 +216,9 @@ static int osp_get_attr_from_reply(const struct lu_env *env, struct lu_buf *rbuf = &osi->osi_lb2; struct obdo *lobdo = &osi->osi_obdo; struct obdo *wobdo; - int rc; + int rc; - rc = update_get_reply_buf(reply, rbuf, index); + rc = object_update_result_data_get(reply, rbuf, index); if (rc < 0) return rc; @@ -225,7 +226,10 @@ static int osp_get_attr_from_reply(const struct lu_env *env, if (rbuf->lb_len != sizeof(*wobdo)) return -EPROTO; - obdo_le_to_cpu(wobdo, wobdo); + LASSERT(req != NULL); + if (ptlrpc_req_need_swab(req)) + lustre_swab_obdo(wobdo); + lustre_get_wire_obdo(NULL, lobdo, wobdo); spin_lock(&obj->opo_lock); if (obj->opo_ooa != NULL) { @@ -243,7 +247,8 @@ static int osp_get_attr_from_reply(const struct lu_env *env, } static int osp_attr_get_interpterer(const struct lu_env *env, - struct update_reply *reply, + struct object_update_reply *reply, + struct ptlrpc_request *req, struct osp_object *obj, void *data, int index, int rc) { @@ -255,7 +260,8 @@ static int osp_attr_get_interpterer(const struct lu_env *env, osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS; obj->opo_non_exist = 0; - return osp_get_attr_from_reply(env, reply, NULL, obj, index); + return osp_get_attr_from_reply(env, reply, req, NULL, obj, + index); } else { if (rc == -ENOENT) { osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS; @@ -275,7 +281,6 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, { struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct update_request *update; int rc = 0; if (obj->opo_ooa == NULL) { @@ -285,14 +290,9 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, } mutex_lock(&osp->opd_async_requests_mutex); - update = osp_find_or_create_async_update_request(osp); - if (IS_ERR(update)) - rc = PTR_ERR(update); - else - rc = osp_insert_async_update(env, update, OBJ_ATTR_GET, obj, 0, - NULL, NULL, - &obj->opo_ooa->ooa_attr, - osp_attr_get_interpterer); + rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL, + &obj->opo_ooa->ooa_attr, + osp_attr_get_interpterer); mutex_unlock(&osp->opd_async_requests_mutex); return rc; @@ -301,13 +301,13 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct lustre_capa *capa) { - struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct osp_object *obj = dt2osp_obj(dt); - struct dt_device *dev = &osp->opd_dt_dev; - struct update_request *update; - struct update_reply *reply; - struct ptlrpc_request *req = NULL; - int rc = 0; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_object *obj = dt2osp_obj(dt); + struct dt_device *dev = &osp->opd_dt_dev; + struct dt_update_request *update; + struct object_update_reply *reply; + struct ptlrpc_request *req = NULL; + int rc = 0; ENTRY; if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist) @@ -328,7 +328,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OBJ_ATTR_GET, + rc = out_insert_update(env, update, OUT_ATTR_GET, lu_object_fid(&dt->do_lu), 0, NULL, NULL); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", @@ -354,22 +354,16 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS; obj->opo_non_exist = 0; - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1) + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) GOTO(out, rc = -EPROTO); - rc = osp_get_attr_from_reply(env, reply, attr, obj, 0); + rc = osp_get_attr_from_reply(env, reply, req, attr, obj, 0); if (rc != 0) GOTO(out, rc); - if (!is_ost_obj(&dt->do_lu)) { - if (attr->la_flags == 1) - obj->opo_empty = 0; - else - obj->opo_empty = 1; - } - GOTO(out, rc = 0); out: @@ -431,7 +425,7 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); - if (!is_remote_trans(th)) + if (!is_only_remote_trans(th)) /* * track all UID/GID changes via llog */ @@ -482,7 +476,7 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } - if (!is_remote_trans(th)) + if (!is_only_remote_trans(th)) /* * once transaction is committed put proper command on * the queue going to our OST @@ -498,7 +492,8 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, } static int osp_xattr_get_interpterer(const struct lu_env *env, - struct update_reply *reply, + struct object_update_reply *reply, + struct ptlrpc_request *req, struct osp_object *obj, void *data, int index, int rc) { @@ -509,9 +504,9 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, LASSERT(ooa != NULL); if (rc == 0) { - int len = sizeof(*oxe) + oxe->oxe_namelen + 1; + size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1; - rc = update_get_reply_buf(reply, rbuf, index); + rc = object_update_result_data_get(reply, rbuf, index); if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) { spin_lock(&obj->opo_lock); oxe->oxe_ready = 0; @@ -549,7 +544,6 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, { struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct update_request *update; struct osp_xattr_entry *oxe; int namelen = strlen(name); int rc = 0; @@ -572,33 +566,28 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, return -ENOMEM; mutex_lock(&osp->opd_async_requests_mutex); - update = osp_find_or_create_async_update_request(osp); - if (IS_ERR(update)) { - rc = PTR_ERR(update); + rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1, + &namelen, &name, oxe, + osp_xattr_get_interpterer); + if (rc != 0) { mutex_unlock(&osp->opd_async_requests_mutex); osp_oac_xattr_put(oxe); } else { - rc = osp_insert_async_update(env, update, OBJ_XATTR_GET, obj, - 1, &namelen, &name, oxe, - osp_xattr_get_interpterer); - if (rc != 0) { + struct dt_update_request *update; + + /* XXX: Currently, we trigger the batched async OUT + * RPC via dt_declare_xattr_get(). It is not + * perfect solution, but works well now. + * + * We will improve it in the future. */ + update = osp->opd_async_requests; + if (update != NULL && update->dur_req != NULL && + update->dur_req->ourq_count > 0) { + osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); - osp_oac_xattr_put(oxe); + rc = osp_unplug_async_request(env, osp, update); } else { - /* XXX: Currently, we trigger the batched async OUT - * RPC via dt_declare_xattr_get(). It is not - * perfect solution, but works well now. - * - * We will improve it in the future. */ - update = osp->opd_async_requests; - if (update != NULL && update->ur_buf != NULL && - update->ur_buf->ub_count > 0) { - osp->opd_async_requests = NULL; - mutex_unlock(&osp->opd_async_requests_mutex); - rc = osp_unplug_async_update(env, osp, update); - } else { - mutex_unlock(&osp->opd_async_requests_mutex); - } + mutex_unlock(&osp->opd_async_requests_mutex); } } @@ -613,9 +602,9 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, struct osp_object *obj = dt2osp_obj(dt); struct dt_device *dev = &osp->opd_dt_dev; struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; - struct update_request *update = NULL; + struct dt_update_request *update = NULL; struct ptlrpc_request *req = NULL; - struct update_reply *reply; + struct object_update_reply *reply; struct osp_xattr_entry *oxe = NULL; const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name; int namelen; @@ -658,8 +647,8 @@ unlock: if (IS_ERR(update)) GOTO(out, rc = PTR_ERR(update)); - namelen = strlen(name); - rc = out_insert_update(env, update, OBJ_XATTR_GET, + namelen = strlen(name) + 1; + rc = out_insert_update(env, update, OUT_XATTR_GET, lu_object_fid(&dt->do_lu), 1, &namelen, &name); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", @@ -696,22 +685,21 @@ unlock: GOTO(out, rc); } - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply->ur_version != UPDATE_REPLY_V1) { + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n", - dname, reply->ur_version, UPDATE_REPLY_V1, + dname, reply->ourp_magic, UPDATE_REPLY_MAGIC, PFID(lu_object_fid(&dt->do_lu)), -EPROTO); GOTO(out, rc = -EPROTO); } - rc = update_get_reply_buf(reply, rbuf, 0); + rc = object_update_result_data_get(reply, rbuf, 0); if (rc < 0) GOTO(out, rc); - LASSERT(rbuf->lb_len > 0 && rbuf->lb_len < PAGE_CACHE_SIZE); - if (buf->lb_buf == NULL) GOTO(out, rc = rbuf->lb_len); @@ -746,7 +734,7 @@ unlock: "cache for "DFID": rc = %d\n", dname, name, PFID(lu_object_fid(&dt->do_lu)), rc); spin_lock(&obj->opo_lock); - oxe->oxe_ready = 0; + old->oxe_ready = 0; spin_unlock(&obj->opo_lock); GOTO(out, rc); @@ -783,7 +771,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, int flag, struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); - struct update_request *update; + struct dt_update_request *update; struct lu_fid *fid; struct osp_xattr_entry *oxe; int sizes[3] = {strlen(name), buf->lb_len, @@ -807,7 +795,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, bufs[2] = (char *)&flag; fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OBJ_XATTR_SET, fid, + rc = out_insert_update(env, update, OUT_XATTR_SET, fid, ARRAY_SIZE(sizes), sizes, (const char **)bufs); if (rc != 0 || o->opo_ooa == NULL) return rc; @@ -833,7 +821,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, name, PFID(lu_object_fid(&dt->do_lu)), rc); spin_lock(&o->opo_lock); - oxe->oxe_ready = 0; + old->oxe_ready = 0; spin_unlock(&o->opo_lock); return 0; @@ -864,6 +852,36 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, return 0; } +int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) +{ + struct dt_update_request *update; + const struct lu_fid *fid; + int size = strlen(name); + int rc; + + update = out_find_create_update_loc(th, dt); + if (IS_ERR(update)) + return PTR_ERR(update); + + fid = lu_object_fid(&dt->do_lu); + + rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size, + (const char **)&name); + + return rc; +} + +int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th, + struct lustre_capa *capa) +{ + CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name, + PFID(&dt->do_lu.lo_header->loh_fid)); + + return 0; +} + static int osp_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -879,7 +897,7 @@ static int osp_declare_object_create(const struct lu_env *env, ENTRY; - if (is_remote_trans(th)) { + if (is_only_remote_trans(th)) { LASSERT(fid_is_sane(fid)); rc = osp_md_declare_object_create(env, dt, attr, hint, dof, th); @@ -906,9 +924,10 @@ static int osp_declare_object_create(const struct lu_env *env, if (unlikely(!fid_is_zero(fid))) { /* replay case: caller knows fid */ osi->osi_off = sizeof(osi->osi_id) * d->opd_index; + osi->osi_lb.lb_len = sizeof(osi->osi_id); + osi->osi_lb.lb_buf = NULL; rc = dt_declare_record_write(env, d->opd_last_used_oid_file, - sizeof(osi->osi_id), osi->osi_off, - th); + &osi->osi_lb, osi->osi_off, th); RETURN(rc); } @@ -930,9 +949,10 @@ static int osp_declare_object_create(const struct lu_env *env, /* common for all OSPs file hystorically */ osi->osi_off = sizeof(osi->osi_id) * d->opd_index; + osi->osi_lb.lb_len = sizeof(osi->osi_id); + osi->osi_lb.lb_buf = NULL; rc = dt_declare_record_write(env, d->opd_last_used_oid_file, - sizeof(osi->osi_id), osi->osi_off, - th); + &osi->osi_lb, osi->osi_off, th); } else { /* not needed in the cache anymore */ set_bit(LU_OBJECT_HEARD_BANSHEE, @@ -953,7 +973,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_fid *fid = &osi->osi_fid; ENTRY; - if (is_remote_trans(th)) { + if (is_only_remote_trans(th)) { LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu))); rc = osp_md_object_create(env, dt, attr, hint, dof, th); @@ -1075,6 +1095,424 @@ int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osp_orphan_index_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, + const struct dt_key *key, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ignore_quota) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, + __u32 attr, struct lustre_capa *capa) +{ + struct osp_it *it; + + OBD_ALLOC_PTR(it); + if (it == NULL) + return ERR_PTR(-ENOMEM); + + it->ooi_pos_ent = -1; + it->ooi_obj = dt; + + return (struct dt_it *)it; +} + +void osp_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct page **pages = it->ooi_pages; + int npages = it->ooi_total_npages; + int i; + + if (pages != NULL) { + for (i = 0; i < npages; i++) { + if (pages[i] != NULL) { + if (pages[i] == it->ooi_cur_page) { + kunmap(pages[i]); + it->ooi_cur_page = NULL; + } + __free_page(pages[i]); + } + } + OBD_FREE(pages, npages * sizeof(*pages)); + } + OBD_FREE_PTR(it); +} + +static int osp_it_fetch(const struct lu_env *env, struct osp_it *it) +{ + struct lu_device *dev = it->ooi_obj->do_lu.lo_dev; + struct osp_device *osp = lu2osp_dev(dev); + struct page **pages; + struct ptlrpc_request *req = NULL; + struct ptlrpc_bulk_desc *desc; + struct idx_info *ii; + int npages; + int rc; + int i; + ENTRY; + + /* 1MB bulk */ + npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20); + npages /= PAGE_CACHE_SIZE; + + OBD_ALLOC(pages, npages * sizeof(*pages)); + if (pages == NULL) + RETURN(-ENOMEM); + + it->ooi_pages = pages; + it->ooi_total_npages = npages; + for (i = 0; i < npages; i++) { + pages[i] = alloc_page(GFP_IOFS); + if (pages[i] == NULL) + RETURN(-ENOMEM); + } + + req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import, + &RQF_OBD_IDX_READ); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + req->rq_request_portal = OUT_PORTAL; + ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); + memset(ii, 0, sizeof(*ii)); + if (fid_is_last_id(lu_object_fid(&it->ooi_obj->do_lu))) { + /* LFSCK will iterate orphan object[FID_SEQ_LAYOUT_BTREE, + * ost_index, 0] with LAST_ID FID, so it needs to replace + * the FID with orphan FID here */ + ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE; + ii->ii_fid.f_oid = osp->opd_index; + ii->ii_fid.f_ver = 0; + ii->ii_flags = II_FL_NOHASH; + } else { + ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu); + ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY | + II_FL_VARREC; + } + ii->ii_magic = IDX_INFO_MAGIC; + ii->ii_count = npages * LU_PAGE_COUNT; + ii->ii_hash_start = it->ooi_next; + ii->ii_attrs = + osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; + + ptlrpc_at_set_req_timeout(req); + + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } + + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); + + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); + rc = 0; + + ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); + if (ii->ii_magic != IDX_INFO_MAGIC) + GOTO(out, rc = -EPROTO); + + npages = (ii->ii_count + LU_PAGE_COUNT - 1) >> + (PAGE_CACHE_SHIFT - LU_PAGE_SHIFT); + if (npages > it->ooi_total_npages) { + CERROR("%s: returned more pages than expected, %u > %u\n", + osp->opd_obd->obd_name, npages, it->ooi_total_npages); + GOTO(out, rc = -EINVAL); + } + + it->ooi_valid_npages = npages; + if (ptlrpc_rep_need_swab(req)) + it->ooi_swab = 1; + + it->ooi_next = ii->ii_hash_end; + +out: + ptlrpc_req_finished(req); + + return rc; +} + +int osp_it_next_page(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + struct page **pages; + int rc; + int i; + ENTRY; + +again2: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + if (it->ooi_pos_ent < idxpage->lip_nr) { + CDEBUG(D_INFO, "ooi_pos %d nr %d\n", + (int)it->ooi_pos_ent, (int)idxpage->lip_nr); + RETURN(0); + } + it->ooi_cur_idxpage = NULL; + it->ooi_pos_lu_page++; +again1: + if (it->ooi_pos_lu_page < LU_PAGE_COUNT) { + it->ooi_cur_idxpage = (void *)it->ooi_cur_page + + LU_PAGE_SIZE * it->ooi_pos_lu_page; + if (it->ooi_swab) + lustre_swab_lip_header(it->ooi_cur_idxpage); + if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) { + struct osp_device *osp = + lu2osp_dev(it->ooi_obj->do_lu.lo_dev); + + CERROR("%s: invalid magic (%x != %x) for page " + "%d/%d while read layout orphan index\n", + osp->opd_obd->obd_name, + it->ooi_cur_idxpage->lip_magic, + LIP_MAGIC, it->ooi_pos_page, + it->ooi_pos_lu_page); + /* Skip this lu_page next time. */ + it->ooi_pos_ent = idxpage->lip_nr - 1; + RETURN(-EINVAL); + } + it->ooi_pos_ent = -1; + goto again2; + } + + kunmap(it->ooi_cur_page); + it->ooi_cur_page = NULL; + it->ooi_pos_page++; + +again0: + pages = it->ooi_pages; + if (it->ooi_pos_page < it->ooi_valid_npages) { + it->ooi_cur_page = kmap(pages[it->ooi_pos_page]); + it->ooi_pos_lu_page = 0; + goto again1; + } + + for (i = 0; i < it->ooi_total_npages; i++) { + if (pages[i] != NULL) + __free_page(pages[i]); + } + OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages)); + + it->ooi_pos_page = 0; + it->ooi_total_npages = 0; + it->ooi_valid_npages = 0; + it->ooi_swab = 0; + it->ooi_ent = NULL; + it->ooi_cur_page = NULL; + it->ooi_cur_idxpage = NULL; + it->ooi_pages = NULL; + } + + if (it->ooi_next == II_END_OFF) + RETURN(1); + + rc = osp_it_fetch(env, it); + if (rc == 0) + goto again0; + + RETURN(rc); +} + +int osp_orphan_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + int rc; + ENTRY; + +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (it->ooi_pos_ent < idxpage->lip_nr) { + it->ooi_ent = + (struct lu_orphan_ent *)idxpage->lip_entries + + it->ooi_pos_ent; + if (it->ooi_swab) + lustre_swab_orphan_ent(it->ooi_ent); + RETURN(0); + } + } + + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; + + RETURN(rc); +} + +int osp_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + return 1; +} + +void osp_it_put(const struct lu_env *env, struct dt_it *di) +{ +} + +struct dt_key *osp_orphan_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_orphan_ent *ent = (struct lu_orphan_ent *)it->ooi_ent; + + if (likely(ent != NULL)) + return (struct dt_key *)(&ent->loe_key); + + return NULL; +} + +int osp_orphan_it_key_size(const struct lu_env *env, const struct dt_it *di) +{ + return sizeof(struct lu_fid); +} + +int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_orphan_ent *ent = (struct lu_orphan_ent *)it->ooi_ent; + + if (likely(ent != NULL)) { + *(struct lu_orphan_rec *)rec = ent->loe_rec; + return 0; + } + + return -EINVAL; +} + +__u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + + return it->ooi_next; +} + +/** + * \retval +1: locate to the exactly position + * \retval 0: cannot locate to the exactly position, + * call next() to move to a valid position. + * \retval -ve: on error + */ +int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di, + __u64 hash) +{ + struct osp_it *it = (struct osp_it *)di; + int rc; + + it->ooi_next = hash; + rc = osp_orphan_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} + +int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void *key_rec) +{ + return 0; +} + +static const struct dt_index_operations osp_orphan_index_ops = { + .dio_lookup = osp_orphan_index_lookup, + .dio_declare_insert = osp_orphan_index_declare_insert, + .dio_insert = osp_orphan_index_insert, + .dio_declare_delete = osp_orphan_index_declare_delete, + .dio_delete = osp_orphan_index_delete, + .dio_it = { + .init = osp_it_init, + .fini = osp_it_fini, + .next = osp_orphan_it_next, + .get = osp_it_get, + .put = osp_it_put, + .key = osp_orphan_it_key, + .key_size = osp_orphan_it_key_size, + .rec = osp_orphan_it_rec, + .store = osp_it_store, + .load = osp_orphan_it_load, + .key_rec = osp_it_key_rec, + } +}; + +static int osp_index_try(const struct lu_env *env, + struct dt_object *dt, + const struct dt_index_features *feat) +{ + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + + if (fid_is_last_id(fid) && fid_is_idif(fid)) + dt->do_index_ops = &osp_orphan_index_ops; + else + dt->do_index_ops = &osp_md_index_ops; + return 0; +} + struct dt_object_operations osp_obj_ops = { .do_declare_attr_get = osp_declare_attr_get, .do_attr_get = osp_attr_get, @@ -1088,6 +1526,7 @@ struct dt_object_operations osp_obj_ops = { .do_create = osp_object_create, .do_declare_destroy = osp_declare_object_destroy, .do_destroy = osp_object_destroy, + .do_index_try = osp_index_try, }; static int osp_object_init(const struct lu_env *env, struct lu_object *o, @@ -1106,6 +1545,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, struct lu_attr *la = &osp_env_info(env)->osi_attr; po->opo_obj.do_ops = &osp_md_obj_ops; + po->opo_obj.do_body_ops = &osp_md_body_ops; rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o), la, NULL); if (rc == 0) @@ -1139,7 +1579,7 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o) count = atomic_read(&oxe->oxe_ref); LASSERTF(count == 1, "Still has %d users on the xattr entry %.*s\n", - count - 1, oxe->oxe_namelen, oxe->oxe_buf); + count-1, (int)oxe->oxe_namelen, oxe->oxe_buf); OBD_FREE(oxe, oxe->oxe_buflen); }