X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_object.c;h=afe3b9f532070058b879dcf4cb03486ec53fed25;hb=49b17944e1a61f88bddb5595bb053a555c8c08da;hp=7d66e664382f95e5b5139c21a4b5acd4b4707048;hpb=2d686e9c9cc3c3c47cce92a0ff495b04efacd3a9;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 7d66e66..afe3b9f 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -333,6 +333,36 @@ static int lov_init_released(const struct lu_env *env, return 0; } +static struct cl_object *lov_find_subobj(const struct lu_env *env, + struct lov_object *lov, + struct lov_stripe_md *lsm, + int stripe_idx) +{ + struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev); + struct lov_oinfo *oinfo = lsm->lsm_oinfo[stripe_idx]; + struct lov_thread_info *lti = lov_env_info(env); + struct lu_fid *ofid = <i->lti_fid; + struct cl_device *subdev; + int ost_idx; + int rc; + struct cl_object *result; + + if (lov->lo_type != LLT_RAID0) + GOTO(out, result = NULL); + + ost_idx = oinfo->loi_ost_idx; + rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx); + if (rc != 0) + GOTO(out, result = NULL); + + subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); + result = lov_sub_find(env, subdev, ofid, NULL); +out: + if (result == NULL) + result = ERR_PTR(-EINVAL); + return result; +} + static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) { @@ -975,6 +1005,570 @@ int lov_lock_init(const struct lu_env *env, struct cl_object *obj, io); } +/** + * We calculate on which OST the mapping will end. If the length of mapping + * is greater than (stripe_size * stripe_count) then the last_stripe will + * will be one just before start_stripe. Else we check if the mapping + * intersects each OST and find last_stripe. + * This function returns the last_stripe and also sets the stripe_count + * over which the mapping is spread + * + * \param lsm [in] striping information for the file + * \param fm_start [in] logical start of mapping + * \param fm_end [in] logical end of mapping + * \param start_stripe [in] starting stripe of the mapping + * \param stripe_count [out] the number of stripes across which to map is + * returned + * + * \retval last_stripe return the last stripe of the mapping + */ +static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, + loff_t fm_start, loff_t fm_end, + int start_stripe, int *stripe_count) +{ + int last_stripe; + loff_t obd_start; + loff_t obd_end; + int i, j; + + if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) { + last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 : + start_stripe - 1); + *stripe_count = lsm->lsm_stripe_count; + } else { + for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count; + i = (i + 1) % lsm->lsm_stripe_count, j++) { + if ((lov_stripe_intersects(lsm, i, fm_start, fm_end, + &obd_start, &obd_end)) == 0) + break; + } + *stripe_count = j; + last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count; + } + + return last_stripe; +} + +/** + * Set fe_device and copy extents from local buffer into main return buffer. + * + * \param fiemap [out] fiemap to hold all extents + * \param lcl_fm_ext [in] array of fiemap extents get from OSC layer + * \param ost_index [in] OST index to be written into the fm_device + * field for each extent + * \param ext_count [in] number of extents to be copied + * \param current_extent [in] where to start copying in the extent array + */ +static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap, + struct ll_fiemap_extent *lcl_fm_ext, + int ost_index, unsigned int ext_count, + int current_extent) +{ + char *to; + unsigned int ext; + + for (ext = 0; ext < ext_count; ext++) { + lcl_fm_ext[ext].fe_device = ost_index; + lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET; + } + + /* Copy fm_extent's from fm_local to return buffer */ + to = (char *)fiemap + fiemap_count_to_size(current_extent); + memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent)); +} + +#define FIEMAP_BUFFER_SIZE 4096 + +/** + * Non-zero fe_logical indicates that this is a continuation FIEMAP + * call. The local end offset and the device are sent in the first + * fm_extent. This function calculates the stripe number from the index. + * This function returns a stripe_no on which mapping is to be restarted. + * + * This function returns fm_end_offset which is the in-OST offset at which + * mapping should be restarted. If fm_end_offset=0 is returned then caller + * will re-calculate proper offset in next stripe. + * Note that the first extent is passed to lov_get_info via the value field. + * + * \param fiemap [in] fiemap request header + * \param lsm [in] striping information for the file + * \param fm_start [in] logical start of mapping + * \param fm_end [in] logical end of mapping + * \param start_stripe [out] starting stripe will be returned in this + */ +static loff_t fiemap_calc_fm_end_offset(struct fiemap *fiemap, + struct lov_stripe_md *lsm, + loff_t fm_start, loff_t fm_end, + int *start_stripe) +{ + loff_t local_end = fiemap->fm_extents[0].fe_logical; + loff_t lun_start; + loff_t lun_end; + loff_t fm_end_offset; + int stripe_no = -1; + int i; + + if (fiemap->fm_extent_count == 0 || + fiemap->fm_extents[0].fe_logical == 0) + return 0; + + /* Find out stripe_no from ost_index saved in the fe_device */ + for (i = 0; i < lsm->lsm_stripe_count; i++) { + struct lov_oinfo *oinfo = lsm->lsm_oinfo[i]; + + if (lov_oinfo_is_dummy(oinfo)) + continue; + + if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) { + stripe_no = i; + break; + } + } + + if (stripe_no == -1) + return -EINVAL; + + /* If we have finished mapping on previous device, shift logical + * offset to start of next device */ + if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end, + &lun_start, &lun_end) != 0 && + local_end < lun_end) { + fm_end_offset = local_end; + *start_stripe = stripe_no; + } else { + /* This is a special value to indicate that caller should + * calculate offset in next stripe. */ + fm_end_offset = 0; + *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count; + } + + return fm_end_offset; +} + +/** + * Break down the FIEMAP request and send appropriate calls to individual OSTs. + * This also handles the restarting of FIEMAP calls in case mapping overflows + * the available number of extents in single call. + * + * \param env [in] lustre environment + * \param obj [in] file object + * \param fmkey [in] fiemap request header and other info + * \param fiemap [out] fiemap buffer holding retrived map extents + * \param buflen [in/out] max buffer length of @fiemap, when iterate + * each OST, it is used to limit max map needed + * \retval 0 success + * \retval < 0 error + */ +static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, + struct ll_fiemap_info_key *fmkey, + struct fiemap *fiemap, size_t *buflen) +{ + struct lov_stripe_md *lsm; + struct cl_object *subobj = NULL; + struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov; + struct fiemap *fm_local = NULL; + struct ll_fiemap_extent *lcl_fm_ext; + loff_t fm_start; + loff_t fm_end; + loff_t fm_length; + loff_t fm_end_offset; + int count_local; + int ost_index = 0; + int start_stripe; + int current_extent = 0; + int rc = 0; + int last_stripe; + int cur_stripe = 0; + int cur_stripe_wrap = 0; + int stripe_count; + unsigned int buffer_size = FIEMAP_BUFFER_SIZE; + /* Whether have we collected enough extents */ + bool enough = false; + /* EOF for object */ + bool ost_eof = false; + /* done with required mapping for this OST? */ + bool ost_done = false; + ENTRY; + + lsm = lov_lsm_addref(cl2lov(obj)); + if (lsm == NULL) + RETURN(-ENODATA); + + /** + * If the stripe_count > 1 and the application does not understand + * DEVICE_ORDER flag, it cannot interpret the extents correctly. + */ + if (lsm->lsm_stripe_count > 1 && !(fiemap->fm_flags & + FIEMAP_FLAG_DEVICE_ORDER)) + GOTO(out, rc = -ENOTSUPP); + + if (lsm_is_released(lsm)) { + if (fiemap->fm_start < fmkey->oa.o_size) { + /** + * released file, return a minimal FIEMAP if + * request fits in file-size. + */ + fiemap->fm_mapped_extents = 1; + fiemap->fm_extents[0].fe_logical = fiemap->fm_start; + if (fiemap->fm_start + fiemap->fm_length < + fmkey->oa.o_size) + fiemap->fm_extents[0].fe_length = + fiemap->fm_length; + else + fiemap->fm_extents[0].fe_length = + fmkey->oa.o_size - fiemap->fm_start; + fiemap->fm_extents[0].fe_flags |= + FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST; + } + GOTO(out, rc = 0); + } + + if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size) + buffer_size = fiemap_count_to_size(fiemap->fm_extent_count); + + OBD_ALLOC_LARGE(fm_local, buffer_size); + if (fm_local == NULL) + GOTO(out, rc = -ENOMEM); + lcl_fm_ext = &fm_local->fm_extents[0]; + count_local = fiemap_size_to_count(buffer_size); + + fm_start = fiemap->fm_start; + fm_length = fiemap->fm_length; + /* Calculate start stripe, last stripe and length of mapping */ + start_stripe = lov_stripe_number(lsm, fm_start); + fm_end = (fm_length == ~0ULL) ? fmkey->oa.o_size : + fm_start + fm_length - 1; + /* If fm_length != ~0ULL but fm_start_fm_length-1 exceeds file size */ + if (fm_end > fmkey->oa.o_size) + fm_end = fmkey->oa.o_size; + + last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end, + start_stripe, &stripe_count); + fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end, + &start_stripe); + if (fm_end_offset == -EINVAL) + GOTO(out, rc = -EINVAL); + + /** + * Requested extent count exceeds the fiemap buffer size, shrink our + * ambition. + */ + if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen) + fiemap->fm_extent_count = fiemap_size_to_count(*buflen); + if (fiemap->fm_extent_count == 0) + count_local = 0; + + /* Check each stripe */ + for (cur_stripe = start_stripe; stripe_count > 0; + --stripe_count, + cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) { + loff_t req_fm_len; /* Stores length of required mapping */ + loff_t len_mapped_single_call; + loff_t lun_start; + loff_t lun_end; + loff_t obd_object_end; + unsigned int ext_count; + + cur_stripe_wrap = cur_stripe; + + /* Find out range of mapping on this stripe */ + if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end, + &lun_start, &obd_object_end)) == 0) + continue; + + if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe])) + GOTO(out, rc = -EIO); + + /* If this is a continuation FIEMAP call and we are on + * starting stripe then lun_start needs to be set to + * fm_end_offset */ + if (fm_end_offset != 0 && cur_stripe == start_stripe) + lun_start = fm_end_offset; + + if (fm_length != ~0ULL) { + /* Handle fm_start + fm_length overflow */ + if (fm_start + fm_length < fm_start) + fm_length = ~0ULL - fm_start; + lun_end = lov_size_to_stripe(lsm, fm_start + fm_length, + cur_stripe); + } else { + lun_end = ~0ULL; + } + + if (lun_start == lun_end) + continue; + + req_fm_len = obd_object_end - lun_start; + fm_local->fm_length = 0; + len_mapped_single_call = 0; + + /* find lobsub object */ + subobj = lov_find_subobj(env, cl2lov(obj), lsm, + cur_stripe); + if (IS_ERR(subobj)) + GOTO(out, rc = PTR_ERR(subobj)); + /* If the output buffer is very large and the objects have many + * extents we may need to loop on a single OST repeatedly */ + ost_eof = false; + ost_done = false; + do { + if (fiemap->fm_extent_count > 0) { + /* Don't get too many extents. */ + if (current_extent + count_local > + fiemap->fm_extent_count) + count_local = fiemap->fm_extent_count - + current_extent; + } + + lun_start += len_mapped_single_call; + fm_local->fm_length = req_fm_len - + len_mapped_single_call; + req_fm_len = fm_local->fm_length; + fm_local->fm_extent_count = enough ? 1 : count_local; + fm_local->fm_mapped_extents = 0; + fm_local->fm_flags = fiemap->fm_flags; + + ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx; + + if (ost_index < 0 || + ost_index >= lov->desc.ld_tgt_count) + GOTO(obj_put, rc = -EINVAL); + /* If OST is inactive, return extent with UNKNOWN + * flag. */ + if (!lov->lov_tgts[ost_index]->ltd_active) { + fm_local->fm_flags |= FIEMAP_EXTENT_LAST; + fm_local->fm_mapped_extents = 1; + + lcl_fm_ext[0].fe_logical = lun_start; + lcl_fm_ext[0].fe_length = obd_object_end - + lun_start; + lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN; + + goto inactive_tgt; + } + + fm_local->fm_start = lun_start; + fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER; + memcpy(&fmkey->fiemap, fm_local, sizeof(*fm_local)); + *buflen = fiemap_count_to_size( + fm_local->fm_extent_count); + + rc = cl_object_fiemap(env, subobj, fmkey, fm_local, + buflen); + if (rc != 0) + GOTO(obj_put, rc); +inactive_tgt: + ext_count = fm_local->fm_mapped_extents; + if (ext_count == 0) { + ost_done = true; + /* If last stripe has hold at the end, + * we need to return */ + if (cur_stripe_wrap == last_stripe) { + fiemap->fm_mapped_extents = 0; + goto finish; + } + break; + } else if (enough) { + /* + * We've collected enough extents and there are + * more extents after it. + */ + goto finish; + } + + /* If we just need num of extents, got to next device */ + if (fiemap->fm_extent_count == 0) { + current_extent += ext_count; + break; + } + + /* prepare to copy retrived map extents */ + len_mapped_single_call = + lcl_fm_ext[ext_count - 1].fe_logical - + lun_start + lcl_fm_ext[ext_count - 1].fe_length; + + /* Have we finished mapping on this device? */ + if (req_fm_len <= len_mapped_single_call) + ost_done = true; + + /* Clear the EXTENT_LAST flag which can be present on + * the last extent */ + if (lcl_fm_ext[ext_count - 1].fe_flags & + FIEMAP_EXTENT_LAST) + lcl_fm_ext[ext_count - 1].fe_flags &= + ~FIEMAP_EXTENT_LAST; + if (lov_stripe_size(lsm, + lcl_fm_ext[ext_count - 1].fe_logical + + lcl_fm_ext[ext_count - 1].fe_length, + cur_stripe) >= fmkey->oa.o_size) + ost_eof = true; + + fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext, + ost_index, ext_count, + current_extent); + current_extent += ext_count; + + /* Ran out of available extents? */ + if (current_extent >= fiemap->fm_extent_count) + enough = true; + } while (!ost_done && !ost_eof); + + cl_object_put(env, subobj); + subobj = NULL; + + if (cur_stripe_wrap == last_stripe) + goto finish; + } /* for each stripe */ +finish: + /* Indicate that we are returning device offsets unless file just has + * single stripe */ + if (lsm->lsm_stripe_count > 1) + fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER; + + if (fiemap->fm_extent_count == 0) + goto skip_last_device_calc; + + /* Check if we have reached the last stripe and whether mapping for that + * stripe is done. */ + if ((cur_stripe_wrap == last_stripe) && (ost_done || ost_eof)) + fiemap->fm_extents[current_extent - 1].fe_flags |= + FIEMAP_EXTENT_LAST; +skip_last_device_calc: + fiemap->fm_mapped_extents = current_extent; +obj_put: + if (subobj != NULL) + cl_object_put(env, subobj); +out: + if (fm_local != NULL) + OBD_FREE_LARGE(fm_local, buffer_size); + lov_lsm_put(obj, lsm); + RETURN(rc); +} + +static int lov_dispatch_obd_info_get(const struct lu_env *env, + struct cl_object *obj, + struct obd_info *oinfo, + struct ptlrpc_request_set *set) +{ + struct cl_object *subobj = NULL; + struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov; + struct lov_request_set *lovset; + struct list_head *pos; + struct lov_request *req; + int rc; + int rc2; + ENTRY; + + rc = lov_prep_getattr_set(lov2obd(lov)->obd_self_export, oinfo, + &lovset); + if (rc != 0) + RETURN(rc); + + CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes.\n", + POSTID(&oinfo->oi_md->lsm_oi), + oinfo->oi_md->lsm_stripe_count, + oinfo->oi_md->lsm_stripe_size); + + list_for_each(pos, &lovset->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + + CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx" + "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe, + POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx); + subobj = lov_find_subobj(env, cl2lov(obj), oinfo->oi_md, + req->rq_stripe); + if (IS_ERR(subobj)) + GOTO(errout, rc = PTR_ERR(subobj)); + + rc = cl_object_obd_info_get(env, subobj, &req->rq_oi, set); + cl_object_put(env, subobj); + if (rc != 0) { + CERROR("%s: getattr objid "DOSTID" subobj" + DOSTID" on OST idx %d: rc = %d\n", + lov2obd(lov)->obd_name, + POSTID(&oinfo->oi_oa->o_oi), + POSTID(&req->rq_oi.oi_oa->o_oi), + req->rq_idx, rc); + GOTO(errout, rc); + } + } + + if (!list_empty(&set->set_requests)) { + LASSERT(rc == 0); + LASSERT(set->set_interpret == NULL); + set->set_interpret = lov_getattr_interpret; + set->set_arg = lovset; + GOTO(out, rc); + } +errout: + if (rc) + atomic_set(&lovset->set_completes, 0); + rc2 = lov_fini_getattr_set(lovset); + rc = rc != 0 ? rc : rc2; +out: + RETURN(rc); +} + +static int lov_object_data_version(const struct lu_env *env, + struct cl_object *obj, __u64 *data_version, + int flags) +{ + struct ptlrpc_request_set *set; + struct obd_info oinfo = { { { 0 } } }; + struct obdo *obdo = NULL; + struct lov_stripe_md *lsm; + int rc; + ENTRY; + + lsm = lov_lsm_addref(cl2lov(obj)); + if (!lsm_has_objects(lsm)) { + /* If no stripe, we consider version is 0. */ + *data_version = 0; + GOTO(out, rc = 0); + } + + OBD_ALLOC_PTR(obdo); + if (obdo == NULL) + GOTO(out, rc = -ENOMEM); + + oinfo.oi_md = lsm; + oinfo.oi_oa = obdo; + obdo->o_oi = lsm->lsm_oi; + obdo->o_mode = S_IFREG; + obdo->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLTYPE; + if (flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) { + obdo->o_valid |= OBD_MD_FLFLAGS; + obdo->o_flags |= OBD_FL_SRVLOCK; + if (flags & LL_DV_WR_FLUSH) + obdo->o_flags |= OBD_FL_FLUSH; + } + + set = ptlrpc_prep_set(); + if (set == NULL) + GOTO(out_obdo, rc = -ENOMEM); + + rc = lov_dispatch_obd_info_get(env, obj, &oinfo, set); + if (rc == 0) + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); + if (rc == 0) { + oinfo.oi_oa->o_valid &= OBD_MD_FLDATAVERSION | OBD_MD_FLFLAGS; + if (flags & LL_DV_WR_FLUSH && + !(oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS && + oinfo.oi_oa->o_flags & OBD_FL_FLUSH)) + rc = -EOPNOTSUPP; + else if (!(obdo->o_valid & OBD_MD_FLDATAVERSION)) + rc = -EOPNOTSUPP; + else + *data_version = obdo->o_data_version; + } +out_obdo: + OBD_FREE_PTR(obdo); +out: + lov_lsm_put(obj, lsm); + RETURN(rc); +} + static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj, struct lov_user_md __user *lum) { @@ -1013,7 +1607,9 @@ static const struct cl_object_operations lov_ops = { .coo_attr_update = lov_attr_update, .coo_conf_set = lov_conf_set, .coo_getstripe = lov_object_getstripe, - .coo_find_cbdata = lov_object_find_cbdata + .coo_find_cbdata = lov_object_find_cbdata, + .coo_fiemap = lov_object_fiemap, + .coo_data_version = lov_object_data_version, }; static const struct lu_object_operations lov_lu_obj_ops = {