X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_object.c;h=efd2a3ddbcb23031214c8e6db4def39f1b92b7ef;hb=75752e918b942000e24c979edcb3eed415dc5197;hp=c9e1e844ab538670bb0a7c2a52f7929e861e80be;hpb=46b927d45eb2ee5db3e35df2a0ade4c11ba9f345;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index c9e1e84..efd2a3d 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -207,7 +207,8 @@ static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe) } static int osp_get_attr_from_reply(const struct lu_env *env, - struct update_reply *reply, + struct object_update_reply *reply, + struct ptlrpc_request *req, struct lu_attr *attr, struct osp_object *obj, int index) { @@ -215,9 +216,9 @@ static int osp_get_attr_from_reply(const struct lu_env *env, struct lu_buf *rbuf = &osi->osi_lb2; struct obdo *lobdo = &osi->osi_obdo; struct obdo *wobdo; - int rc; + int rc; - rc = update_get_reply_buf(reply, rbuf, index); + rc = object_update_result_data_get(reply, rbuf, index); if (rc < 0) return rc; @@ -225,7 +226,10 @@ static int osp_get_attr_from_reply(const struct lu_env *env, if (rbuf->lb_len != sizeof(*wobdo)) return -EPROTO; - obdo_le_to_cpu(wobdo, wobdo); + LASSERT(req != NULL); + if (ptlrpc_req_need_swab(req)) + lustre_swab_obdo(wobdo); + lustre_get_wire_obdo(NULL, lobdo, wobdo); spin_lock(&obj->opo_lock); if (obj->opo_ooa != NULL) { @@ -243,7 +247,8 @@ static int osp_get_attr_from_reply(const struct lu_env *env, } static int osp_attr_get_interpterer(const struct lu_env *env, - struct update_reply *reply, + struct object_update_reply *reply, + struct ptlrpc_request *req, struct osp_object *obj, void *data, int index, int rc) { @@ -255,7 +260,8 @@ static int osp_attr_get_interpterer(const struct lu_env *env, osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS; obj->opo_non_exist = 0; - return osp_get_attr_from_reply(env, reply, NULL, obj, index); + return osp_get_attr_from_reply(env, reply, req, NULL, obj, + index); } else { if (rc == -ENOENT) { osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS; @@ -275,7 +281,7 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, { struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct update_request *update; + struct dt_update_request *update; int rc = 0; if (obj->opo_ooa == NULL) { @@ -289,7 +295,7 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) rc = PTR_ERR(update); else - rc = osp_insert_async_update(env, update, OBJ_ATTR_GET, obj, 0, + rc = osp_insert_async_update(env, update, OUT_ATTR_GET, obj, 0, NULL, NULL, &obj->opo_ooa->ooa_attr, osp_attr_get_interpterer); @@ -301,13 +307,13 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt, int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct lustre_capa *capa) { - struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct osp_object *obj = dt2osp_obj(dt); - struct dt_device *dev = &osp->opd_dt_dev; - struct update_request *update; - struct update_reply *reply; - struct ptlrpc_request *req = NULL; - int rc = 0; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_object *obj = dt2osp_obj(dt); + struct dt_device *dev = &osp->opd_dt_dev; + struct dt_update_request *update; + struct object_update_reply *reply; + struct ptlrpc_request *req = NULL; + int rc = 0; ENTRY; if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist) @@ -328,7 +334,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OBJ_ATTR_GET, + rc = out_insert_update(env, update, OUT_ATTR_GET, lu_object_fid(&dt->do_lu), 0, NULL, NULL); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", @@ -354,22 +360,16 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS; obj->opo_non_exist = 0; - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1) + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) GOTO(out, rc = -EPROTO); - rc = osp_get_attr_from_reply(env, reply, attr, obj, 0); + rc = osp_get_attr_from_reply(env, reply, req, attr, obj, 0); if (rc != 0) GOTO(out, rc); - if (!is_ost_obj(&dt->do_lu)) { - if (attr->la_flags == 1) - obj->opo_empty = 0; - else - obj->opo_empty = 1; - } - GOTO(out, rc = 0); out: @@ -431,10 +431,16 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); - /* - * track all UID/GID changes via llog - */ - rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); + if (!is_only_remote_trans(th)) + /* + * track all UID/GID changes via llog + */ + rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); + else + /* It is for OST-object attr_set directly without updating + * local MDT-object attribute. It is usually used by LFSCK. */ + rc = osp_md_declare_attr_set(env, dt, attr, th); + if (rc != 0 || o->opo_ooa == NULL) RETURN(rc); @@ -476,19 +482,24 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } - /* - * once transaction is committed put proper command on - * the queue going to our OST - */ - rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); - - /* XXX: send new uid/gid to OST ASAP? */ + if (!is_only_remote_trans(th)) + /* + * once transaction is committed put proper command on + * the queue going to our OST + */ + rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); + /* XXX: send new uid/gid to OST ASAP? */ + else + /* It is for OST-object attr_set directly without updating + * local MDT-object attribute. It is usually used by LFSCK. */ + rc = osp_md_attr_set(env, dt, attr, th, capa); RETURN(rc); } static int osp_xattr_get_interpterer(const struct lu_env *env, - struct update_reply *reply, + struct object_update_reply *reply, + struct ptlrpc_request *req, struct osp_object *obj, void *data, int index, int rc) { @@ -501,7 +512,7 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, if (rc == 0) { int len = sizeof(*oxe) + oxe->oxe_namelen + 1; - rc = update_get_reply_buf(reply, rbuf, index); + rc = object_update_result_data_get(reply, rbuf, index); if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) { spin_lock(&obj->opo_lock); oxe->oxe_ready = 0; @@ -539,7 +550,7 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, { struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct update_request *update; + struct dt_update_request *update; struct osp_xattr_entry *oxe; int namelen = strlen(name); int rc = 0; @@ -568,7 +579,7 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, mutex_unlock(&osp->opd_async_requests_mutex); osp_oac_xattr_put(oxe); } else { - rc = osp_insert_async_update(env, update, OBJ_XATTR_GET, obj, + rc = osp_insert_async_update(env, update, OUT_XATTR_GET, obj, 1, &namelen, &name, oxe, osp_xattr_get_interpterer); if (rc != 0) { @@ -581,8 +592,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, * * We will improve it in the future. */ update = osp->opd_async_requests; - if (update != NULL && update->ur_buf != NULL && - update->ur_buf->ub_count > 0) { + if (update != NULL && update->dur_req != NULL && + update->dur_req->ourq_count > 0) { osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); rc = osp_unplug_async_update(env, osp, update); @@ -603,9 +614,9 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, struct osp_object *obj = dt2osp_obj(dt); struct dt_device *dev = &osp->opd_dt_dev; struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; - struct update_request *update = NULL; + struct dt_update_request *update = NULL; struct ptlrpc_request *req = NULL; - struct update_reply *reply; + struct object_update_reply *reply; struct osp_xattr_entry *oxe = NULL; const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name; int namelen; @@ -648,8 +659,8 @@ unlock: if (IS_ERR(update)) GOTO(out, rc = PTR_ERR(update)); - namelen = strlen(name); - rc = out_insert_update(env, update, OBJ_XATTR_GET, + namelen = strlen(name) + 1; + rc = out_insert_update(env, update, OUT_XATTR_GET, lu_object_fid(&dt->do_lu), 1, &namelen, &name); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", @@ -686,22 +697,21 @@ unlock: GOTO(out, rc); } - reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY, - UPDATE_BUFFER_SIZE); - if (reply->ur_version != UPDATE_REPLY_V1) { + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n", - dname, reply->ur_version, UPDATE_REPLY_V1, + dname, reply->ourp_magic, UPDATE_REPLY_MAGIC, PFID(lu_object_fid(&dt->do_lu)), -EPROTO); GOTO(out, rc = -EPROTO); } - rc = update_get_reply_buf(reply, rbuf, 0); + rc = object_update_result_data_get(reply, rbuf, 0); if (rc < 0) GOTO(out, rc); - LASSERT(rbuf->lb_len > 0 && rbuf->lb_len < PAGE_CACHE_SIZE); - if (buf->lb_buf == NULL) GOTO(out, rc = rbuf->lb_len); @@ -736,7 +746,7 @@ unlock: "cache for "DFID": rc = %d\n", dname, name, PFID(lu_object_fid(&dt->do_lu)), rc); spin_lock(&obj->opo_lock); - oxe->oxe_ready = 0; + old->oxe_ready = 0; spin_unlock(&obj->opo_lock); GOTO(out, rc); @@ -773,7 +783,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, int flag, struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); - struct update_request *update; + struct dt_update_request *update; struct lu_fid *fid; struct osp_xattr_entry *oxe; int sizes[3] = {strlen(name), buf->lb_len, @@ -797,7 +807,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, bufs[2] = (char *)&flag; fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OBJ_XATTR_SET, fid, + rc = out_insert_update(env, update, OUT_XATTR_SET, fid, ARRAY_SIZE(sizes), sizes, (const char **)bufs); if (rc != 0 || o->opo_ooa == NULL) return rc; @@ -823,7 +833,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, name, PFID(lu_object_fid(&dt->do_lu)), rc); spin_lock(&o->opo_lock); - oxe->oxe_ready = 0; + old->oxe_ready = 0; spin_unlock(&o->opo_lock); return 0; @@ -854,6 +864,36 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, return 0; } +int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) +{ + struct dt_update_request *update; + const struct lu_fid *fid; + int size = strlen(name); + int rc; + + update = out_find_create_update_loc(th, dt); + if (IS_ERR(update)) + return PTR_ERR(update); + + fid = lu_object_fid(&dt->do_lu); + + rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size, + (const char **)&name); + + return rc; +} + +int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th, + struct lustre_capa *capa) +{ + CDEBUG(D_INFO, "xattr %s del object "DFID"\n", name, + PFID(&dt->do_lu.lo_header->loh_fid)); + + return 0; +} + static int osp_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -869,7 +909,7 @@ static int osp_declare_object_create(const struct lu_env *env, ENTRY; - if (is_remote_trans(th)) { + if (is_only_remote_trans(th)) { LASSERT(fid_is_sane(fid)); rc = osp_md_declare_object_create(env, dt, attr, hint, dof, th); @@ -896,9 +936,10 @@ static int osp_declare_object_create(const struct lu_env *env, if (unlikely(!fid_is_zero(fid))) { /* replay case: caller knows fid */ osi->osi_off = sizeof(osi->osi_id) * d->opd_index; + osi->osi_lb.lb_len = sizeof(osi->osi_id); + osi->osi_lb.lb_buf = NULL; rc = dt_declare_record_write(env, d->opd_last_used_oid_file, - sizeof(osi->osi_id), osi->osi_off, - th); + &osi->osi_lb, osi->osi_off, th); RETURN(rc); } @@ -920,9 +961,10 @@ static int osp_declare_object_create(const struct lu_env *env, /* common for all OSPs file hystorically */ osi->osi_off = sizeof(osi->osi_id) * d->opd_index; + osi->osi_lb.lb_len = sizeof(osi->osi_id); + osi->osi_lb.lb_buf = NULL; rc = dt_declare_record_write(env, d->opd_last_used_oid_file, - sizeof(osi->osi_id), osi->osi_off, - th); + &osi->osi_lb, osi->osi_off, th); } else { /* not needed in the cache anymore */ set_bit(LU_OBJECT_HEARD_BANSHEE, @@ -943,7 +985,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_fid *fid = &osi->osi_fid; ENTRY; - if (is_remote_trans(th)) { + if (is_only_remote_trans(th)) { LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu))); rc = osp_md_object_create(env, dt, attr, hint, dof, th); @@ -1065,6 +1107,424 @@ int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +static int osp_orphan_index_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, + const struct dt_key *key, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ignore_quota) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, + __u32 attr, struct lustre_capa *capa) +{ + struct osp_it *it; + + OBD_ALLOC_PTR(it); + if (it == NULL) + return ERR_PTR(-ENOMEM); + + it->ooi_pos_ent = -1; + it->ooi_obj = dt; + + return (struct dt_it *)it; +} + +void osp_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct page **pages = it->ooi_pages; + int npages = it->ooi_total_npages; + int i; + + if (pages != NULL) { + for (i = 0; i < npages; i++) { + if (pages[i] != NULL) { + if (pages[i] == it->ooi_cur_page) { + kunmap(pages[i]); + it->ooi_cur_page = NULL; + } + __free_page(pages[i]); + } + } + OBD_FREE(pages, npages * sizeof(*pages)); + } + OBD_FREE_PTR(it); +} + +static int osp_it_fetch(const struct lu_env *env, struct osp_it *it) +{ + struct lu_device *dev = it->ooi_obj->do_lu.lo_dev; + struct osp_device *osp = lu2osp_dev(dev); + struct page **pages; + struct ptlrpc_request *req = NULL; + struct ptlrpc_bulk_desc *desc; + struct idx_info *ii; + int npages; + int rc; + int i; + ENTRY; + + /* 1MB bulk */ + npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20); + npages /= PAGE_CACHE_SIZE; + + OBD_ALLOC(pages, npages * sizeof(*pages)); + if (pages == NULL) + RETURN(-ENOMEM); + + it->ooi_pages = pages; + it->ooi_total_npages = npages; + for (i = 0; i < npages; i++) { + pages[i] = alloc_page(GFP_IOFS); + if (pages[i] == NULL) + RETURN(-ENOMEM); + } + + req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import, + &RQF_OBD_IDX_READ); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + req->rq_request_portal = OUT_PORTAL; + ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); + memset(ii, 0, sizeof(*ii)); + if (fid_is_last_id(lu_object_fid(&it->ooi_obj->do_lu))) { + /* LFSCK will iterate orphan object[FID_SEQ_LAYOUT_BTREE, + * ost_index, 0] with LAST_ID FID, so it needs to replace + * the FID with orphan FID here */ + ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE; + ii->ii_fid.f_oid = osp->opd_index; + ii->ii_fid.f_ver = 0; + ii->ii_flags = II_FL_NOHASH; + } else { + ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu); + ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY | + II_FL_VARREC; + } + ii->ii_magic = IDX_INFO_MAGIC; + ii->ii_count = npages * LU_PAGE_COUNT; + ii->ii_hash_start = it->ooi_next; + ii->ii_attrs = + osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; + + ptlrpc_at_set_req_timeout(req); + + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } + + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); + + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); + rc = 0; + + ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); + if (ii->ii_magic != IDX_INFO_MAGIC) + GOTO(out, rc = -EPROTO); + + npages = (ii->ii_count + LU_PAGE_COUNT - 1) >> + (PAGE_CACHE_SHIFT - LU_PAGE_SHIFT); + if (npages > it->ooi_total_npages) { + CERROR("%s: returned more pages than expected, %u > %u\n", + osp->opd_obd->obd_name, npages, it->ooi_total_npages); + GOTO(out, rc = -EINVAL); + } + + it->ooi_valid_npages = npages; + if (ptlrpc_rep_need_swab(req)) + it->ooi_swab = 1; + + it->ooi_next = ii->ii_hash_end; + +out: + ptlrpc_req_finished(req); + + return rc; +} + +int osp_it_next_page(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + struct page **pages; + int rc; + int i; + ENTRY; + +again2: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + if (it->ooi_pos_ent < idxpage->lip_nr) { + CDEBUG(D_INFO, "ooi_pos %d nr %d\n", + (int)it->ooi_pos_ent, (int)idxpage->lip_nr); + RETURN(0); + } + it->ooi_cur_idxpage = NULL; + it->ooi_pos_lu_page++; +again1: + if (it->ooi_pos_lu_page < LU_PAGE_COUNT) { + it->ooi_cur_idxpage = (void *)it->ooi_cur_page + + LU_PAGE_SIZE * it->ooi_pos_lu_page; + if (it->ooi_swab) + lustre_swab_lip_header(it->ooi_cur_idxpage); + if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) { + struct osp_device *osp = + lu2osp_dev(it->ooi_obj->do_lu.lo_dev); + + CERROR("%s: invalid magic (%x != %x) for page " + "%d/%d while read layout orphan index\n", + osp->opd_obd->obd_name, + it->ooi_cur_idxpage->lip_magic, + LIP_MAGIC, it->ooi_pos_page, + it->ooi_pos_lu_page); + /* Skip this lu_page next time. */ + it->ooi_pos_ent = idxpage->lip_nr - 1; + RETURN(-EINVAL); + } + it->ooi_pos_ent = -1; + goto again2; + } + + kunmap(it->ooi_cur_page); + it->ooi_cur_page = NULL; + it->ooi_pos_page++; + +again0: + pages = it->ooi_pages; + if (it->ooi_pos_page < it->ooi_valid_npages) { + it->ooi_cur_page = kmap(pages[it->ooi_pos_page]); + it->ooi_pos_lu_page = 0; + goto again1; + } + + for (i = 0; i < it->ooi_total_npages; i++) { + if (pages[i] != NULL) + __free_page(pages[i]); + } + OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages)); + + it->ooi_pos_page = 0; + it->ooi_total_npages = 0; + it->ooi_valid_npages = 0; + it->ooi_swab = 0; + it->ooi_ent = NULL; + it->ooi_cur_page = NULL; + it->ooi_cur_idxpage = NULL; + it->ooi_pages = NULL; + } + + if (it->ooi_next == II_END_OFF) + RETURN(1); + + rc = osp_it_fetch(env, it); + if (rc == 0) + goto again0; + + RETURN(rc); +} + +int osp_orphan_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + int rc; + ENTRY; + +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (it->ooi_pos_ent < idxpage->lip_nr) { + it->ooi_ent = + (struct lu_orphan_ent *)idxpage->lip_entries + + it->ooi_pos_ent; + if (it->ooi_swab) + lustre_swab_orphan_ent(it->ooi_ent); + RETURN(0); + } + } + + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; + + RETURN(rc); +} + +int osp_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + return 1; +} + +void osp_it_put(const struct lu_env *env, struct dt_it *di) +{ +} + +struct dt_key *osp_orphan_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_orphan_ent *ent = (struct lu_orphan_ent *)it->ooi_ent; + + if (likely(ent != NULL)) + return (struct dt_key *)(&ent->loe_key); + + return NULL; +} + +int osp_orphan_it_key_size(const struct lu_env *env, const struct dt_it *di) +{ + return sizeof(struct lu_fid); +} + +int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + struct osp_it *it = (struct osp_it *)di; + struct lu_orphan_ent *ent = (struct lu_orphan_ent *)it->ooi_ent; + + if (likely(ent != NULL)) { + *(struct lu_orphan_rec *)rec = ent->loe_rec; + return 0; + } + + return -EINVAL; +} + +__u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) +{ + struct osp_it *it = (struct osp_it *)di; + + return it->ooi_next; +} + +/** + * \retval +1: locate to the exactly position + * \retval 0: cannot locate to the exactly position, + * call next() to move to a valid position. + * \retval -ve: on error + */ +int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di, + __u64 hash) +{ + struct osp_it *it = (struct osp_it *)di; + int rc; + + it->ooi_next = hash; + rc = osp_orphan_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} + +int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void *key_rec) +{ + return 0; +} + +static const struct dt_index_operations osp_orphan_index_ops = { + .dio_lookup = osp_orphan_index_lookup, + .dio_declare_insert = osp_orphan_index_declare_insert, + .dio_insert = osp_orphan_index_insert, + .dio_declare_delete = osp_orphan_index_declare_delete, + .dio_delete = osp_orphan_index_delete, + .dio_it = { + .init = osp_it_init, + .fini = osp_it_fini, + .next = osp_orphan_it_next, + .get = osp_it_get, + .put = osp_it_put, + .key = osp_orphan_it_key, + .key_size = osp_orphan_it_key_size, + .rec = osp_orphan_it_rec, + .store = osp_it_store, + .load = osp_orphan_it_load, + .key_rec = osp_it_key_rec, + } +}; + +static int osp_index_try(const struct lu_env *env, + struct dt_object *dt, + const struct dt_index_features *feat) +{ + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + + if (fid_is_last_id(fid) && fid_is_idif(fid)) + dt->do_index_ops = &osp_orphan_index_ops; + else + dt->do_index_ops = &osp_md_index_ops; + return 0; +} + struct dt_object_operations osp_obj_ops = { .do_declare_attr_get = osp_declare_attr_get, .do_attr_get = osp_attr_get, @@ -1078,6 +1538,7 @@ struct dt_object_operations osp_obj_ops = { .do_create = osp_object_create, .do_declare_destroy = osp_declare_object_destroy, .do_destroy = osp_object_destroy, + .do_index_try = osp_index_try, }; static int osp_object_init(const struct lu_env *env, struct lu_object *o, @@ -1096,6 +1557,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, struct lu_attr *la = &osp_env_info(env)->osi_attr; po->opo_obj.do_ops = &osp_md_obj_ops; + po->opo_obj.do_body_ops = &osp_md_body_ops; rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o), la, NULL); if (rc == 0)