From 47b457491889278ef17f7dc4c72435e313829bdd Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Thu, 24 May 2012 12:52:22 +0400 Subject: [PATCH] LU-1406 ofd: add OBD methods to handle OST requests Finally add OBD handlers to process incoming requests. Add stub for quota functions, quota is non-functional in OFD. Fix couple minor issues: - slot checking in ofd_last_rcvd_update() must check lr_idx - remove osd_write_locked() assertion from __osd_xattr_set(), it is too excessive there Signed-off-by: Mikhail Pershin Change-Id: Iab270cdf5f2ad41df86845190c5c6db3436279c8 Reviewed-on: http://review.whamcloud.com/2894 Tested-by: Hudson Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Alex Zhuravlev --- lustre/include/lustre/lustre_idl.h | 1 + lustre/ofd/ofd_internal.h | 2 + lustre/ofd/ofd_obd.c | 633 ++++++++++++++++++++++++++++++++++++- lustre/ofd/ofd_trans.c | 2 +- lustre/osd-ldiskfs/osd_compat.c | 4 +- lustre/osd-ldiskfs/osd_handler.c | 1 - lustre/ost/ost_handler.c | 92 +++--- 7 files changed, 683 insertions(+), 52 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index aa15515..c574371 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1475,6 +1475,7 @@ struct lov_mds_md_v3 { /* LOV EA mds/wire data (little-endian) */ #define OBD_MD_FLCROSSREF (0x0000100000000000ULL) /* Cross-ref case */ #define OBD_MD_FLGETATTRLOCK (0x0000200000000000ULL) /* Get IOEpoch attributes * under lock */ +#define OBD_MD_FLOBJCOUNT (0x0000400000000000ULL) /* for multiple destroy */ #define OBD_MD_FLRMTLSETFACL (0x0001000000000000ULL) /* lfs lsetfacl case */ #define OBD_MD_FLRMTLGETFACL (0x0002000000000000ULL) /* lfs lgetfacl case */ diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index 6eca8c2..b5563cf 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -299,6 +299,8 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd, /* ofd_fs.c */ obd_id ofd_last_id(struct ofd_device *ofd, obd_seq seq); void ofd_last_id_set(struct ofd_device *ofd, obd_id id, obd_seq seq); +int ofd_last_id_write(const struct lu_env *env, struct ofd_device *ofd, + obd_seq seq); int ofd_group_load(const struct lu_env *env, struct ofd_device *ofd, int); int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd, struct obd_device *obd); diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 3f53b1b..a7cd74c 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -442,6 +442,31 @@ int ofd_obd_postrecov(struct obd_device *obd) RETURN(rc); } +static int ofd_adapt_sptlrpc_conf(const struct lu_env *env, + struct obd_device *obd, int initial) +{ + struct filter_obd *fo = &obd->u.filter; + struct sptlrpc_rule_set tmp_rset; + int rc; + + sptlrpc_rule_set_init(&tmp_rset); + rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial); + if (rc) { + CERROR("%s: failed get sptlrpc rules: rc = %d\n", + obd->obd_name, rc); + return rc; + } + + sptlrpc_target_update_exp_flavor(obd, &tmp_rset); + + cfs_write_lock(&fo->fo_sptlrpc_lock); + sptlrpc_rule_set_free(&fo->fo_sptlrpc_rset); + fo->fo_sptlrpc_rset = tmp_rset; + cfs_write_unlock(&fo->fo_sptlrpc_lock); + + return 0; +} + static int ofd_set_mds_conn(struct obd_export *exp, void *val) { int rc = 0; @@ -470,7 +495,10 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_CAPA_KEY)) { rc = ofd_update_capa_key(ofd, val); if (rc) - CERROR("ofd update capability key failed: %d\n", rc); + CERROR("%s: update capability key failed: rc = %d\n", + exp->exp_obd->obd_name, rc); + } else if (KEY_IS(KEY_SPTLRPC_CONF)) { + ofd_adapt_sptlrpc_conf(env, exp->exp_obd, 0); } else if (KEY_IS(KEY_MDS_CONN)) { rc = ofd_set_mds_conn(exp, val); } else if (KEY_IS(KEY_GRANT_SHRINK)) { @@ -526,6 +554,48 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, exp->exp_filter_data.fed_group); } *vallen = sizeof(*last_id); + } else if (KEY_IS(KEY_FIEMAP)) { + struct ofd_thread_info *info; + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_object *fo; + struct ll_fiemap_info_key *fm_key = key; + + if (val == NULL) { + *vallen = fiemap_count_to_size( + fm_key->fiemap.fm_extent_count); + RETURN(0); + } + + info = ofd_info_init(env, exp); + + fid_ostid_unpack(&info->fti_fid, &fm_key->oa.o_oi, 0); + + CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n", + PFID(&info->fti_fid)); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: error finding object "DFID"\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid)); + rc = PTR_ERR(fo); + } else { + struct ll_user_fiemap *fiemap = val; + + ofd_read_lock(env, fo); + if (ofd_object_exists(fo)) { + *fiemap = fm_key->fiemap; + rc = dt_fiemap_get(env, + ofd_object_child(fo), + fiemap); + } else { + rc = -ENOENT; + } + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + } + } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { + *((__u32 *) val) = ofd->ofd_sync_lock_cancel; + *vallen = sizeof(__u32); } else { CERROR("Not supported key %s\n", (char*)key); rc = -EOPNOTSUPP; @@ -687,11 +757,503 @@ out: return rc; } +int ofd_setattr(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti) +{ + struct ofd_thread_info *info; + struct ofd_device *ofd = ofd_exp(exp); + struct ldlm_namespace *ns = ofd->ofd_namespace; + struct ldlm_resource *res; + struct ofd_object *fo; + struct obdo *oa = oinfo->oi_oa; + struct filter_fid *ff = NULL; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + ofd_build_resid(&info->fti_fid, &info->fti_resid); + + rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_META_WRITE); + if (rc) + GOTO(out, rc); + + /* This would be very bad - accidentally truncating a file when + * changing the time or similar - bug 12203. */ + if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE && + oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) { + static char mdsinum[48]; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) + snprintf(mdsinum, sizeof(mdsinum) - 1, + "of parent "DFID, oinfo->oi_oa->o_parent_seq, + oinfo->oi_oa->o_parent_oid, 0); + else + mdsinum[0] = '\0'; + + CERROR("%s: setattr from %s trying to truncate object "DFID + " %s\n", exp->exp_obd->obd_name, + obd_export_nid2str(exp), PFID(&info->fti_fid), mdsinum); + GOTO(out, rc = -EPERM); + } + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: can't find object "DFID"\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid)); + GOTO(out, rc = PTR_ERR(fo)); + } + + la_from_obdo(&info->fti_attr, oinfo->oi_oa, oinfo->oi_oa->o_valid); + info->fti_attr.la_valid &= ~LA_TYPE; + + if (oa->o_valid & OBD_MD_FLFID) { + ff = &info->fti_mds_fid; + ofd_prepare_fidea(ff, oa); + } + + /* setting objects attributes (including owner/group) */ + rc = ofd_attr_set(env, fo, &info->fti_attr, ff); + if (rc) + GOTO(out_unlock, rc); + + res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); + if (res != NULL) { + ldlm_res_lvbo_update(res, NULL, 0); + ldlm_resource_putref(res); + } + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + + /* Quota release needs uid/gid info */ + rc = ofd_attr_get(env, fo, &info->fti_attr); + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_UID | LA_GID); + ofd_info2oti(info, oti); +out_unlock: + ofd_object_put(env, fo); +out: + RETURN(rc); +} + +static int ofd_punch(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti, + struct ptlrpc_request_set *rqset) +{ + struct ofd_thread_info *info; + struct ofd_device *ofd = ofd_exp(exp); + struct ldlm_namespace *ns = ofd->ofd_namespace; + struct ldlm_resource *res; + struct ofd_object *fo; + struct filter_fid *ff = NULL; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + ofd_build_resid(&info->fti_fid, &info->fti_resid); + + CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64 + ", start = "LPD64", end = "LPD64"\n", PFID(&info->fti_fid), + oinfo->oi_oa->o_valid, oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end); + + rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); + if (rc) + GOTO(out_env, rc); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: error finding object "DFID": rc = %ld\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid), + PTR_ERR(fo)); + GOTO(out_env, rc = PTR_ERR(fo)); + } + + LASSERT(oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF); + if (oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF) { + /* Truncate case */ + oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start; + } else if (oinfo->oi_policy.l_extent.end >= oinfo->oi_oa->o_size) { + oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.end; + } + + la_from_obdo(&info->fti_attr, oinfo->oi_oa, + OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME); + info->fti_attr.la_valid &= ~LA_TYPE; + info->fti_attr.la_size = oinfo->oi_policy.l_extent.start; + info->fti_attr.la_valid |= LA_SIZE; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) { + ff = &info->fti_mds_fid; + ofd_prepare_fidea(ff, oinfo->oi_oa); + } + + rc = ofd_object_punch(env, fo, oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end, &info->fti_attr, + ff); + if (rc) + GOTO(out, rc); + + res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); + if (res != NULL) { + ldlm_res_lvbo_update(res, NULL, 0); + ldlm_resource_putref(res); + } + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + /* Quota release needs uid/gid info */ + rc = ofd_attr_get(env, fo, &info->fti_attr); + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_UID | LA_GID); + ofd_info2oti(info, oti); +out: + ofd_object_put(env, fo); +out_env: + RETURN(rc); +} + +static int ofd_destroy_by_fid(const struct lu_env *env, + struct ofd_device *ofd, + const struct lu_fid *fid, int orphan) +{ + struct ofd_thread_info *info = ofd_info(env); + struct lustre_handle lockh; + int flags = LDLM_AST_DISCARD_DATA, rc = 0; + ldlm_policy_data_t policy = { + .l_extent = { 0, OBD_OBJECT_EOF } + }; + struct ofd_object *fo; + + ENTRY; + + /* Tell the clients that the object is gone now and that they should + * throw away any cached pages. */ + ofd_build_resid(fid, &info->fti_resid); + rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, + LDLM_EXTENT, &policy, LCK_PW, &flags, + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, 0, NULL, &lockh); + + /* We only care about the side-effects, just drop the lock. */ + if (rc == ELDLM_OK) + ldlm_lock_decref(&lockh, LCK_PW); + + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + + rc = ofd_object_destroy(env, fo, orphan); + + ofd_object_put(env, fo); + RETURN(rc); +} + +int ofd_destroy(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md *md, + struct obd_trans_info *oti, struct obd_export *md_exp, + void *capa) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + obd_count count; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + if (!(oa->o_valid & OBD_MD_FLGROUP)) + oa->o_seq = 0; + + /* check that o_misc makes sense */ + if (oa->o_valid & OBD_MD_FLOBJCOUNT) + count = oa->o_misc; + else + count = 1; /* default case - single destroy */ + + /** + * There can be sequence of objects to destroy. Therefore this request + * may have multiple transaction involved in. It is OK, we need only + * the highest used transno to be reported back in reply but not for + * replays, they must report their transno + */ + if (info->fti_transno == 0) /* not replay */ + info->fti_mult_trans = 1; + while (count > 0) { + int lrc; + + fid_ostid_unpack(&info->fti_fid, &oa->o_oi, 0); + lrc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 0); + if (lrc == -ENOENT) { + CDEBUG(D_INODE, + "destroying non-existent object "LPU64"\n", + oa->o_id); + /* rewrite rc with -ENOENT only if it is 0 */ + if (rc == 0) + rc = lrc; + } else if (lrc != 0) { + CEMERG("error destroying object "LPU64": %d\n", + oa->o_id, rc); + rc = lrc; + } + count--; + oa->o_id++; + } + + /* if we have transaction then there were some deletions, we don't + * need to return ENOENT in that case because it will not wait + * for commit of these deletions. The ENOENT must be returned only + * if there were no transations. + */ + if (rc == -ENOENT) { + if (info->fti_transno != 0) + rc = 0; + } else if (rc != 0) { + /* + * If we have at least one transaction then llog record + * on server will be removed upon commit, so for rc != 0 + * we return no transno and llog record will be reprocessed. + */ + info->fti_transno = 0; + } + ofd_info2oti(info, oti); + RETURN(rc); +} + +static int ofd_orphans_destroy(const struct lu_env *env, + struct obd_export *exp, struct ofd_device *ofd, + struct obdo *oa) +{ + struct ofd_thread_info *info = ofd_info(env); + obd_id last; + int skip_orphan; + int rc = 0; + struct ost_id oi = oa->o_oi; + + ENTRY; + + LASSERT(exp != NULL); + skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN); + + last = ofd_last_id(ofd, oa->o_seq); + CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n", + ofd_obd(ofd)->obd_name, oa->o_id + 1, last); + + for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) { + fid_ostid_unpack(&info->fti_fid, &oi, 0); + rc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 1); + if (rc && rc != -ENOENT) /* this is pretty fatal... */ + CEMERG("error destroying precreated id "LPU64": %d\n", + oi.oi_id, rc); + if (!skip_orphan) { + ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq); + /* update last_id on disk periodically so that if we + * restart * we don't need to re-scan all of the just + * deleted objects. */ + if ((oi.oi_id & 511) == 0) + ofd_last_id_write(env, ofd, oa->o_seq); + } + } + CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n", + ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id); + if (!skip_orphan) { + rc = ofd_last_id_write(env, ofd, oa->o_seq); + } else { + /* don't reuse orphan object, return last used objid */ + oa->o_id = last; + rc = 0; + } + RETURN(rc); +} + +int ofd_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md **ea, + struct obd_trans_info *oti) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + int rc = 0, diff; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + LASSERT(oa->o_seq >= FID_SEQ_OST_MDT0); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n", + oa->o_seq, oa->o_id); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + if (!ofd_obd(ofd)->obd_recovering || + oa->o_id > ofd_last_id(ofd, oa->o_seq)) { + CERROR("recreate objid "LPU64" > last id "LPU64"\n", + oa->o_id, ofd_last_id(ofd, oa->o_seq)); + GOTO(out, rc = -EINVAL); + } + /* do nothing because we create objects during first write */ + GOTO(out, rc = 0); + } + /* former ofd_handle_precreate */ + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_DELORPHAN)) { + /* destroy orphans */ + if (oti->oti_conn_cnt < exp->exp_conn_cnt) { + CERROR("%s: dropping old orphan cleanup request\n", + ofd_obd(ofd)->obd_name); + GOTO(out, rc = 0); + } + /* This causes inflight precreates to abort and drop lock */ + cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + if (!cfs_test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) { + CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", + exp->exp_obd->obd_name, oa->o_seq); + GOTO(out, rc = 0); + } + diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n", + ofd_last_id(ofd, oa->o_seq), diff); + if (-diff > OST_MAX_PRECREATE) { + /* FIXME: should reset precreate_next_id on MDS */ + rc = 0; + } else if (diff < 0) { + rc = ofd_orphans_destroy(env, exp, ofd, oa); + cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + } else { + /* XXX: Used by MDS for the first time! */ + cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + } + } else { + cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + if (oti->oti_conn_cnt < exp->exp_conn_cnt) { + CERROR("%s: dropping old precreate request\n", + ofd_obd(ofd)->obd_name); + GOTO(out, rc = 0); + } + /* only precreate if group == 0 and o_id is specfied */ + if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) { + diff = 1; /* shouldn't we create this right now? */ + } else { + diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + } + } + if (diff > 0) { + obd_id next_id = ofd_last_id(ofd, oa->o_seq) + 1; + int i; + + if (!(oa->o_valid & OBD_MD_FLFLAGS) || + !(oa->o_flags & OBD_FL_DELORPHAN)) { + /* don't enforce grant during orphan recovery */ + rc = ofd_grant_create(env, + ofd_obd(ofd)->obd_self_export, + &diff); + if (rc) { + CDEBUG(D_HA, "%s: failed to acquire grant space" + "for precreate (%d)\n", + ofd_obd(ofd)->obd_name, diff); + diff = 0; + } + } + + CDEBUG(D_HA, + "%s: reserve %d objects in group "LPU64" at "LPU64"\n", + ofd_obd(ofd)->obd_name, diff, oa->o_seq, next_id); + for (i = 0; i < diff; i++) { + rc = ofd_precreate_object(env, ofd, next_id + i, + oa->o_seq); + if (rc) + break; + } + if (i > 0) { + /* some objects got created, we can return + * them, even if last creation failed */ + oa->o_id = ofd_last_id(ofd, oa->o_seq); + rc = 0; + } else { + CERROR("unable to precreate: %d\n", rc); + oa->o_id = ofd_last_id(ofd, oa->o_seq); + } + + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; + + if (!(oa->o_valid & OBD_MD_FLFLAGS) || + !(oa->o_flags & OBD_FL_DELORPHAN)) + ofd_grant_commit(env, ofd_obd(ofd)->obd_self_export, + rc); + } + + ofd_info2oti(info, oti); +out: + cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]); + if (rc == 0 && ea != NULL) { + struct lov_stripe_md *lsm = *ea; + + lsm->lsm_object_id = oa->o_id; + } + return rc; +} + +int ofd_getattr(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + struct ofd_object *fo; + __u64 curr_version; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_META_READ); + if (rc) + GOTO(out, rc); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) + GOTO(out, rc = PTR_ERR(fo)); + LASSERT(fo != NULL); + rc = ofd_attr_get(env, fo, &info->fti_attr); + oinfo->oi_oa->o_valid = OBD_MD_FLID; + if (rc == 0) + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_UID | LA_GID); + + /* Store object version in reply */ + curr_version = dt_version_get(env, ofd_object_child(fo)); + if ((__s64)curr_version != -EOPNOTSUPP) { + oinfo->oi_oa->o_valid |= OBD_MD_FLDATAVERSION; + oinfo->oi_oa->o_data_version = curr_version; + } + ofd_object_put(env, fo); +out: + RETURN(rc); +} + static int ofd_sync(const struct lu_env *env, struct obd_export *exp, struct obd_info *oinfo, obd_size start, obd_size end, struct ptlrpc_request_set *set) { struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + struct ofd_object *fo; int rc = 0; ENTRY; @@ -702,7 +1264,37 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, GOTO(out, rc); } + info = ofd_info_init(env, exp); + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + + rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); + if (rc) + GOTO(out, rc); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: error finding object "DFID": rc = %ld\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid), + PTR_ERR(fo)); + GOTO(out, rc = PTR_ERR(fo)); + } + + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + rc = dt_object_sync(env, ofd_object_child(fo)); + if (rc) + GOTO(unlock, rc); + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + rc = ofd_attr_get(env, fo, &info->fti_attr); + obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS); EXIT; +unlock: + ofd_write_unlock(env, fo); + ofd_object_put(env, fo); out: return rc; } @@ -724,7 +1316,7 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, switch (cmd) { case OBD_IOC_ABORT_RECOVERY: - CERROR("aborting recovery for device %s\n", obd->obd_name); + CERROR("%s: aborting recovery\n", obd->obd_name); target_stop_recovery_thread(obd); break; case OBD_IOC_SYNC: @@ -737,8 +1329,7 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = dt_ro(&env, ofd->ofd_osd); break; default: - CERROR("Not supported cmd = %d for device %s\n", - cmd, obd->obd_name); + CERROR("%s: not supported cmd = %d\n", obd->obd_name, cmd); rc = -ENOTTY; } @@ -833,6 +1424,33 @@ static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused, return 0; } +/* + * Handle quotacheck requests. + * + * \param obd - is the obd device associated with the ofd + * \param exp - is the client's export + * \param oqctl - is the obd_quotactl request to be processed + */ +static int ofd_quotacheck(struct obd_device *obd, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + return 0; +} + +/* + * Handle quota control requests to consult current usage/limit, but also + * to configure quota enforcement + * + * \param obd - is the obd device associated with the ofd + * \param exp - is the client's export + * \param oqctl - is the obd_quotactl request to be processed + */ +static int ofd_quotactl(struct obd_device *obd, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + return 0; +} + struct obd_ops ofd_obd_ops = { .o_owner = THIS_MODULE, .o_connect = ofd_obd_connect, @@ -840,16 +1458,23 @@ struct obd_ops ofd_obd_ops = { .o_disconnect = ofd_obd_disconnect, .o_set_info_async = ofd_set_info_async, .o_get_info = ofd_get_info, + .o_create = ofd_create, .o_statfs = ofd_statfs, + .o_setattr = ofd_setattr, .o_preprw = ofd_preprw, .o_commitrw = ofd_commitrw, + .o_destroy = ofd_destroy, .o_init_export = ofd_init_export, .o_destroy_export = ofd_destroy_export, .o_postrecov = ofd_obd_postrecov, + .o_punch = ofd_punch, + .o_getattr = ofd_getattr, .o_sync = ofd_sync, .o_iocontrol = ofd_iocontrol, .o_precleanup = ofd_precleanup, .o_ping = ofd_ping, .o_health_check = ofd_health_check, .o_notify = ofd_obd_notify, + .o_quotactl = ofd_quotactl, + .o_quotacheck = ofd_quotacheck, }; diff --git a/lustre/ofd/ofd_trans.c b/lustre/ofd/ofd_trans.c index 4d09bce..d9f26fe 100644 --- a/lustre/ofd/ofd_trans.c +++ b/lustre/ofd/ofd_trans.c @@ -134,7 +134,7 @@ static int ofd_last_rcvd_update(struct ofd_thread_info *info, } /* ofd connect may cause transaction before export has last_rcvd * slot */ - if (fed->fed_ted.ted_lr_off < 0) + if (fed->fed_ted.ted_lr_idx < 0) RETURN(0); off = fed->fed_ted.ted_lr_off; diff --git a/lustre/osd-ldiskfs/osd_compat.c b/lustre/osd-ldiskfs/osd_compat.c index 1e9b397..4c0c3b6 100644 --- a/lustre/osd-ldiskfs/osd_compat.c +++ b/lustre/osd-ldiskfs/osd_compat.c @@ -204,13 +204,11 @@ int osd_last_rcvd_subdir_count(struct osd_device *osd) rc = -EFAULT; dput(dlast); return rc; - } else { - count = FILTER_SUBDIR_COUNT; } out: dput(dlast); LASSERT(count > 0); - return rc; + return count; } void osd_compat_fini(struct osd_device *dev) diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 9c1949b..de725c6 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -1918,7 +1918,6 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); - LASSERT(osd_write_locked(env, obj)); if (fl & LU_XATTR_REPLACE) fs_flags |= XATTR_REPLACE; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index bf50ab38..05e51df 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1522,52 +1522,58 @@ static int ost_connect_check_sptlrpc(struct ptlrpc_request *req) /* Ensure that data and metadata are synced to the disk when lock is cancelled * (if requested) */ -int ost_blocking_ast(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, - void *data, int flag) +int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) { - __u32 sync_lock_cancel = 0; - __u32 len = sizeof(sync_lock_cancel); - int rc = 0; - ENTRY; - - rc = obd_get_info(NULL, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL), - KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL); - - if (!rc && flag == LDLM_CB_CANCELING && - (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) && - (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL || - (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL && - lock->l_flags & LDLM_FL_CBPENDING))) { - struct obd_info *oinfo; - struct obdo *oa; - int rc; + struct lu_env env; + __u32 sync_lock_cancel = 0; + __u32 len = sizeof(sync_lock_cancel); + int rc = 0; - OBD_ALLOC_PTR(oinfo); - if (!oinfo) - RETURN(-ENOMEM); - OBDO_ALLOC(oa); - if (!oa) { - OBD_FREE_PTR(oinfo); - RETURN(-ENOMEM); - } - oa->o_id = lock->l_resource->lr_name.name[0]; - oa->o_seq = lock->l_resource->lr_name.name[1]; - oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP; - oinfo->oi_oa = oa; - - rc = obd_sync(NULL, lock->l_export, oinfo, - lock->l_policy_data.l_extent.start, - lock->l_policy_data.l_extent.end, NULL); - if (rc) - CERROR("Error %d syncing data on lock cancel\n", rc); - - OBDO_FREE(oa); - OBD_FREE_PTR(oinfo); - } + ENTRY; - rc = ldlm_server_blocking_ast(lock, desc, data, flag); - RETURN(rc); + rc = lu_env_init(&env, LCT_DT_THREAD); + if (unlikely(rc != 0)) + RETURN(rc); + + rc = obd_get_info(&env, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL), + KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL); + if (rc == 0 && flag == LDLM_CB_CANCELING && + (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) && + (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL || + (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL && + lock->l_flags & LDLM_FL_CBPENDING))) { + struct obd_info *oinfo; + struct obdo *oa; + int rc; + + OBD_ALLOC_PTR(oinfo); + if (!oinfo) + GOTO(out_env, rc = -ENOMEM); + OBDO_ALLOC(oa); + if (!oa) { + OBD_FREE_PTR(oinfo); + GOTO(out_env, rc = -ENOMEM); + } + oa->o_id = lock->l_resource->lr_name.name[0]; + oa->o_seq = lock->l_resource->lr_name.name[1]; + oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP; + oinfo->oi_oa = oa; + + rc = obd_sync(&env, lock->l_export, oinfo, + lock->l_policy_data.l_extent.start, + lock->l_policy_data.l_extent.end, NULL); + if (rc) + CERROR("Error %d syncing data on lock cancel\n", rc); + + OBDO_FREE(oa); + OBD_FREE_PTR(oinfo); + } + + rc = ldlm_server_blocking_ast(lock, desc, data, flag); +out_env: + lu_env_fini(&env); + RETURN(rc); } static int ost_filter_recovery_request(struct ptlrpc_request *req, -- 1.8.3.1