X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_object.c;h=fae592f75a5fd9bba21f4dfb83194f921008e4d5;hb=77eea1985bb1655e58c8b7df00703b4f08b58ec7;hp=5ffb7e49c0e14c7153ff44112de536e98852735b;hpb=2c67b17fd183ef60baba74e914e96b9b292bfc39;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 5ffb7e4..fae592f 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -431,7 +431,7 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); - if (!is_remote_trans(th)) + if (!is_only_remote_trans(th)) /* * track all UID/GID changes via llog */ @@ -482,7 +482,7 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } - if (!is_remote_trans(th)) + if (!is_only_remote_trans(th)) /* * once transaction is committed put proper command on * the queue going to our OST @@ -879,7 +879,7 @@ static int osp_declare_object_create(const struct lu_env *env, ENTRY; - if (is_remote_trans(th)) { + if (is_only_remote_trans(th)) { LASSERT(fid_is_sane(fid)); rc = osp_md_declare_object_create(env, dt, attr, hint, dof, th); @@ -953,7 +953,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_fid *fid = &osi->osi_fid; ENTRY; - if (is_remote_trans(th)) { + if (is_only_remote_trans(th)) { LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu))); rc = osp_md_object_create(env, dt, attr, hint, dof, th); @@ -1075,6 +1075,418 @@ int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +struct osp_orphan_it { + int ooi_pos0; + int ooi_pos1; + int ooi_pos2; + int ooi_total_npages; + int ooi_valid_npages; + unsigned int ooi_swab:1; + __u64 ooi_next; + struct dt_object *ooi_obj; + struct lu_orphan_ent *ooi_ent; + struct page *ooi_cur_page; + struct lu_idxpage *ooi_cur_idxpage; + struct page **ooi_pages; +}; + +static int osp_orphan_index_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, + const struct dt_key *key, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ignore_quota) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle) +{ + return -EOPNOTSUPP; +} + +static int osp_orphan_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa) +{ + return -EOPNOTSUPP; +} + +static struct dt_it *osp_orphan_it_init(const struct lu_env *env, + struct dt_object *dt, + __u32 attr, + struct lustre_capa *capa) +{ + struct osp_orphan_it *it; + + OBD_ALLOC_PTR(it); + if (it == NULL) + return ERR_PTR(-ENOMEM); + + it->ooi_pos2 = -1; + it->ooi_obj = dt; + + return (struct dt_it *)it; +} + +static void osp_orphan_it_fini(const struct lu_env *env, + struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct page **pages = it->ooi_pages; + int npages = it->ooi_total_npages; + int i; + + if (pages != NULL) { + for (i = 0; i < npages; i++) { + if (pages[i] != NULL) { + if (pages[i] == it->ooi_cur_page) { + kunmap(pages[i]); + it->ooi_cur_page = NULL; + } + __free_page(pages[i]); + } + } + OBD_FREE(pages, npages * sizeof(*pages)); + } + OBD_FREE_PTR(it); +} + +static int osp_orphan_it_fetch(const struct lu_env *env, + struct osp_orphan_it *it) +{ + struct lu_device *dev = it->ooi_obj->do_lu.lo_dev; + struct osp_device *osp = lu2osp_dev(dev); + struct page **pages; + struct ptlrpc_request *req = NULL; + struct ptlrpc_bulk_desc *desc; + struct idx_info *ii; + int npages; + int rc; + int i; + ENTRY; + + /* 1MB bulk */ + npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20); + npages /= PAGE_CACHE_SIZE; + + OBD_ALLOC(pages, npages * sizeof(*pages)); + if (pages == NULL) + RETURN(-ENOMEM); + + it->ooi_pages = pages; + it->ooi_total_npages = npages; + for (i = 0; i < npages; i++) { + pages[i] = alloc_page(GFP_IOFS); + if (pages[i] == NULL) + RETURN(-ENOMEM); + } + + req = ptlrpc_request_alloc(osp->opd_obd->u.cli.cl_import, + &RQF_OBD_IDX_READ); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + req->rq_request_portal = OST_IDX_PORTAL; + ptlrpc_at_set_req_timeout(req); + + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } + + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); + + ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); + memset(ii, 0, sizeof(*ii)); + ii->ii_fid.f_seq = FID_SEQ_LAYOUT_RBTREE; + ii->ii_fid.f_oid = osp->opd_index; + ii->ii_fid.f_ver = 0; + ii->ii_magic = IDX_INFO_MAGIC; + ii->ii_flags = II_FL_NOHASH; + ii->ii_count = npages * LU_PAGE_COUNT; + ii->ii_hash_start = it->ooi_next; + ii->ii_attrs = + osp->opd_storage->dd_lu_dev.ld_site->ld_seq_site->ss_node_id; + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); + + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); + + ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); + if (ii->ii_magic != IDX_INFO_MAGIC) + GOTO(out, rc = -EPROTO); + + npages = (ii->ii_count + LU_PAGE_COUNT - 1) >> + (PAGE_CACHE_SHIFT - LU_PAGE_SHIFT); + if (npages > it->ooi_total_npages) { + CERROR("%s: returned more pages than expected, %u > %u\n", + osp->opd_obd->obd_name, npages, it->ooi_total_npages); + GOTO(out, rc = -EINVAL); + } + + it->ooi_valid_npages = npages; + if (ptlrpc_rep_need_swab(req)) + it->ooi_swab = 1; + + it->ooi_next = ii->ii_hash_end; + + GOTO(out, rc = 0); + +out: + ptlrpc_req_finished(req); + + return rc; +} + +static int osp_orphan_it_next(const struct lu_env *env, + struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct lu_idxpage *idxpage; + struct page **pages; + int rc; + int i; + ENTRY; + +again2: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos2++; + if (it->ooi_pos2 < idxpage->lip_nr) { + it->ooi_ent = + (struct lu_orphan_ent *)idxpage->lip_entries + + it->ooi_pos2; + if (it->ooi_swab) + lustre_swab_orphan_ent(it->ooi_ent); + RETURN(0); + } + + it->ooi_cur_idxpage = NULL; + it->ooi_pos1++; + +again1: + if (it->ooi_pos1 < LU_PAGE_COUNT) { + it->ooi_cur_idxpage = (void *)it->ooi_cur_page + + LU_PAGE_SIZE * it->ooi_pos1; + if (it->ooi_swab) + lustre_swab_lip_header(it->ooi_cur_idxpage); + if (it->ooi_cur_idxpage->lip_magic != LIP_MAGIC) { + struct osp_device *osp = + lu2osp_dev(it->ooi_obj->do_lu.lo_dev); + + CERROR("%s: invalid magic (%x != %x) for page " + "%d/%d while read layout orphan index\n", + osp->opd_obd->obd_name, + it->ooi_cur_idxpage->lip_magic, + LIP_MAGIC, it->ooi_pos0, it->ooi_pos1); + /* Skip this lu_page next time. */ + it->ooi_pos2 = idxpage->lip_nr - 1; + RETURN(-EINVAL); + } + it->ooi_pos2 = -1; + goto again2; + } + + kunmap(it->ooi_cur_page); + it->ooi_cur_page = NULL; + it->ooi_pos0++; + +again0: + pages = it->ooi_pages; + if (it->ooi_pos0 < it->ooi_valid_npages) { + it->ooi_cur_page = kmap(pages[it->ooi_pos0]); + it->ooi_pos1 = 0; + goto again1; + } + + for (i = 0; i < it->ooi_total_npages; i++) { + if (pages[i] != NULL) + __free_page(pages[i]); + } + OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages)); + + it->ooi_pos0 = 0; + it->ooi_total_npages = 0; + it->ooi_valid_npages = 0; + it->ooi_swab = 0; + it->ooi_ent = NULL; + it->ooi_cur_page = NULL; + it->ooi_cur_idxpage = NULL; + it->ooi_pages = NULL; + } + + if (it->ooi_next == II_END_OFF) + RETURN(1); + + rc = osp_orphan_it_fetch(env, it); + if (rc == 0) + goto again0; + + RETURN(rc); +} + +static int osp_orphan_it_get(const struct lu_env *env, + struct dt_it *di, + const struct dt_key *key) +{ + return -ENOSYS; +} + +static void osp_orphan_it_put(const struct lu_env *env, + struct dt_it *di) +{ +} + +static struct dt_key *osp_orphan_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct lu_orphan_ent *ent = it->ooi_ent; + + if (likely(ent != NULL)) + return (struct dt_key *)(&ent->loe_key); + + return NULL; +} + +static int osp_orphan_it_key_size(const struct lu_env *env, + const struct dt_it *di) +{ + return sizeof(struct lu_fid); +} + +static int osp_orphan_it_rec(const struct lu_env *env, + const struct dt_it *di, + struct dt_rec *rec, + __u32 attr) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + struct lu_orphan_ent *ent = it->ooi_ent; + + if (likely(ent != NULL)) { + *(struct lu_orphan_rec *)rec = ent->loe_rec; + return 0; + } + + return -EINVAL; +} + +static __u64 osp_orphan_it_store(const struct lu_env *env, + const struct dt_it *di) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + + return it->ooi_next; +} + +/** + * \retval +1: locate to the exactly position + * \retval 0: cannot locate to the exactly position, + * call next() to move to a valid position. + * \retval -ve: on error + */ +static int osp_orphan_it_load(const struct lu_env *env, + const struct dt_it *di, + __u64 hash) +{ + struct osp_orphan_it *it = (struct osp_orphan_it *)di; + int rc; + + it->ooi_next = hash; + rc = osp_orphan_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; +} + +static int osp_orphan_it_key_rec(const struct lu_env *env, + const struct dt_it *di, + void *key_rec) +{ + return 0; +} + +static const struct dt_index_operations osp_orphan_index_ops = { + .dio_lookup = osp_orphan_index_lookup, + .dio_declare_insert = osp_orphan_index_declare_insert, + .dio_insert = osp_orphan_index_insert, + .dio_declare_delete = osp_orphan_index_declare_delete, + .dio_delete = osp_orphan_index_delete, + .dio_it = { + .init = osp_orphan_it_init, + .fini = osp_orphan_it_fini, + .next = osp_orphan_it_next, + .get = osp_orphan_it_get, + .put = osp_orphan_it_put, + .key = osp_orphan_it_key, + .key_size = osp_orphan_it_key_size, + .rec = osp_orphan_it_rec, + .store = osp_orphan_it_store, + .load = osp_orphan_it_load, + .key_rec = osp_orphan_it_key_rec, + } +}; + +static int osp_index_try(const struct lu_env *env, + struct dt_object *dt, + const struct dt_index_features *feat) +{ + if (fid_is_last_id(lu_object_fid(&dt->do_lu))) { + dt->do_index_ops = &osp_orphan_index_ops; + + return 0; + } + + return -EINVAL; +} + struct dt_object_operations osp_obj_ops = { .do_declare_attr_get = osp_declare_attr_get, .do_attr_get = osp_attr_get, @@ -1088,6 +1500,7 @@ struct dt_object_operations osp_obj_ops = { .do_create = osp_object_create, .do_declare_destroy = osp_declare_object_destroy, .do_destroy = osp_object_destroy, + .do_index_try = osp_index_try, }; static int osp_object_init(const struct lu_env *env, struct lu_object *o,