X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fofd%2Fofd_obd.c;h=a49b4903fbbb277d4eee737a126694d44c216a91;hb=52c95d51f1906d4d838dec3419e39457a8e17824;hp=6135f0ff6fc4c18f8e46b8fa4c02cf961d4bcbd2;hpb=3f90f344ae059b30e7d23e4fe554a985eb827b02;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 6135f0f..a49b490 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -44,6 +44,7 @@ #include "ofd_internal.h" #include +#include static int ofd_export_stats_init(struct ofd_device *ofd, struct obd_export *exp, void *client_nid) @@ -213,7 +214,7 @@ static int ofd_parse_connect_data(const struct lu_env *env, /* The client set in ocd_cksum_types the checksum types it * supports. We have to mask off the algorithms that we don't * support */ - data->ocd_cksum_types &= cksum_types_supported(); + data->ocd_cksum_types &= cksum_types_supported_server(); if (unlikely(data->ocd_cksum_types == 0)) { CERROR("%s: Connect with checksum support but no " @@ -442,6 +443,31 @@ int ofd_obd_postrecov(struct obd_device *obd) RETURN(rc); } +static int ofd_adapt_sptlrpc_conf(const struct lu_env *env, + struct obd_device *obd, int initial) +{ + struct filter_obd *fo = &obd->u.filter; + struct sptlrpc_rule_set tmp_rset; + int rc; + + sptlrpc_rule_set_init(&tmp_rset); + rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial); + if (rc) { + CERROR("%s: failed get sptlrpc rules: rc = %d\n", + obd->obd_name, rc); + return rc; + } + + sptlrpc_target_update_exp_flavor(obd, &tmp_rset); + + cfs_write_lock(&fo->fo_sptlrpc_lock); + sptlrpc_rule_set_free(&fo->fo_sptlrpc_rset); + fo->fo_sptlrpc_rset = tmp_rset; + cfs_write_unlock(&fo->fo_sptlrpc_lock); + + return 0; +} + static int ofd_set_mds_conn(struct obd_export *exp, void *val) { int rc = 0; @@ -470,7 +496,10 @@ static int ofd_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_CAPA_KEY)) { rc = ofd_update_capa_key(ofd, val); if (rc) - CERROR("ofd update capability key failed: %d\n", rc); + CERROR("%s: update capability key failed: rc = %d\n", + exp->exp_obd->obd_name, rc); + } else if (KEY_IS(KEY_SPTLRPC_CONF)) { + ofd_adapt_sptlrpc_conf(env, exp->exp_obd, 0); } else if (KEY_IS(KEY_MDS_CONN)) { rc = ofd_set_mds_conn(exp, val); } else if (KEY_IS(KEY_GRANT_SHRINK)) { @@ -526,6 +555,48 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp, exp->exp_filter_data.fed_group); } *vallen = sizeof(*last_id); + } else if (KEY_IS(KEY_FIEMAP)) { + struct ofd_thread_info *info; + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_object *fo; + struct ll_fiemap_info_key *fm_key = key; + + if (val == NULL) { + *vallen = fiemap_count_to_size( + fm_key->fiemap.fm_extent_count); + RETURN(0); + } + + info = ofd_info_init(env, exp); + + fid_ostid_unpack(&info->fti_fid, &fm_key->oa.o_oi, 0); + + CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n", + PFID(&info->fti_fid)); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: error finding object "DFID"\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid)); + rc = PTR_ERR(fo); + } else { + struct ll_user_fiemap *fiemap = val; + + ofd_read_lock(env, fo); + if (ofd_object_exists(fo)) { + *fiemap = fm_key->fiemap; + rc = dt_fiemap_get(env, + ofd_object_child(fo), + fiemap); + } else { + rc = -ENOENT; + } + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + } + } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { + *((__u32 *) val) = ofd->ofd_sync_lock_cancel; + *vallen = sizeof(__u32); } else { CERROR("Not supported key %s\n", (char*)key); rc = -EOPNOTSUPP; @@ -687,11 +758,503 @@ out: return rc; } +int ofd_setattr(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti) +{ + struct ofd_thread_info *info; + struct ofd_device *ofd = ofd_exp(exp); + struct ldlm_namespace *ns = ofd->ofd_namespace; + struct ldlm_resource *res; + struct ofd_object *fo; + struct obdo *oa = oinfo->oi_oa; + struct filter_fid *ff = NULL; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + ofd_build_resid(&info->fti_fid, &info->fti_resid); + + rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_META_WRITE); + if (rc) + GOTO(out, rc); + + /* This would be very bad - accidentally truncating a file when + * changing the time or similar - bug 12203. */ + if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE && + oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) { + static char mdsinum[48]; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) + snprintf(mdsinum, sizeof(mdsinum) - 1, + "of parent "DFID, oinfo->oi_oa->o_parent_seq, + oinfo->oi_oa->o_parent_oid, 0); + else + mdsinum[0] = '\0'; + + CERROR("%s: setattr from %s trying to truncate object "DFID + " %s\n", exp->exp_obd->obd_name, + obd_export_nid2str(exp), PFID(&info->fti_fid), mdsinum); + GOTO(out, rc = -EPERM); + } + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: can't find object "DFID"\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid)); + GOTO(out, rc = PTR_ERR(fo)); + } + + la_from_obdo(&info->fti_attr, oinfo->oi_oa, oinfo->oi_oa->o_valid); + info->fti_attr.la_valid &= ~LA_TYPE; + + if (oa->o_valid & OBD_MD_FLFID) { + ff = &info->fti_mds_fid; + ofd_prepare_fidea(ff, oa); + } + + /* setting objects attributes (including owner/group) */ + rc = ofd_attr_set(env, fo, &info->fti_attr, ff); + if (rc) + GOTO(out_unlock, rc); + + res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); + if (res != NULL) { + ldlm_res_lvbo_update(res, NULL, 0); + ldlm_resource_putref(res); + } + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + + /* Quota release needs uid/gid info */ + rc = ofd_attr_get(env, fo, &info->fti_attr); + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_UID | LA_GID); + ofd_info2oti(info, oti); +out_unlock: + ofd_object_put(env, fo); +out: + RETURN(rc); +} + +static int ofd_punch(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo, struct obd_trans_info *oti, + struct ptlrpc_request_set *rqset) +{ + struct ofd_thread_info *info; + struct ofd_device *ofd = ofd_exp(exp); + struct ldlm_namespace *ns = ofd->ofd_namespace; + struct ldlm_resource *res; + struct ofd_object *fo; + struct filter_fid *ff = NULL; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + ofd_build_resid(&info->fti_fid, &info->fti_resid); + + CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64 + ", start = "LPD64", end = "LPD64"\n", PFID(&info->fti_fid), + oinfo->oi_oa->o_valid, oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end); + + rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); + if (rc) + GOTO(out_env, rc); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: error finding object "DFID": rc = %ld\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid), + PTR_ERR(fo)); + GOTO(out_env, rc = PTR_ERR(fo)); + } + + LASSERT(oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF); + if (oinfo->oi_policy.l_extent.end == OBD_OBJECT_EOF) { + /* Truncate case */ + oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start; + } else if (oinfo->oi_policy.l_extent.end >= oinfo->oi_oa->o_size) { + oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.end; + } + + la_from_obdo(&info->fti_attr, oinfo->oi_oa, + OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME); + info->fti_attr.la_valid &= ~LA_TYPE; + info->fti_attr.la_size = oinfo->oi_policy.l_extent.start; + info->fti_attr.la_valid |= LA_SIZE; + + if (oinfo->oi_oa->o_valid & OBD_MD_FLFID) { + ff = &info->fti_mds_fid; + ofd_prepare_fidea(ff, oinfo->oi_oa); + } + + rc = ofd_object_punch(env, fo, oinfo->oi_policy.l_extent.start, + oinfo->oi_policy.l_extent.end, &info->fti_attr, + ff); + if (rc) + GOTO(out, rc); + + res = ldlm_resource_get(ns, NULL, &info->fti_resid, LDLM_EXTENT, 0); + if (res != NULL) { + ldlm_res_lvbo_update(res, NULL, 0); + ldlm_resource_putref(res); + } + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + /* Quota release needs uid/gid info */ + rc = ofd_attr_get(env, fo, &info->fti_attr); + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_UID | LA_GID); + ofd_info2oti(info, oti); +out: + ofd_object_put(env, fo); +out_env: + RETURN(rc); +} + +static int ofd_destroy_by_fid(const struct lu_env *env, + struct ofd_device *ofd, + const struct lu_fid *fid, int orphan) +{ + struct ofd_thread_info *info = ofd_info(env); + struct lustre_handle lockh; + int flags = LDLM_AST_DISCARD_DATA, rc = 0; + ldlm_policy_data_t policy = { + .l_extent = { 0, OBD_OBJECT_EOF } + }; + struct ofd_object *fo; + + ENTRY; + + /* Tell the clients that the object is gone now and that they should + * throw away any cached pages. */ + ofd_build_resid(fid, &info->fti_resid); + rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, + LDLM_EXTENT, &policy, LCK_PW, &flags, + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, 0, NULL, &lockh); + + /* We only care about the side-effects, just drop the lock. */ + if (rc == ELDLM_OK) + ldlm_lock_decref(&lockh, LCK_PW); + + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + + rc = ofd_object_destroy(env, fo, orphan); + + ofd_object_put(env, fo); + RETURN(rc); +} + +int ofd_destroy(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md *md, + struct obd_trans_info *oti, struct obd_export *md_exp, + void *capa) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + obd_count count; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + if (!(oa->o_valid & OBD_MD_FLGROUP)) + oa->o_seq = 0; + + /* check that o_misc makes sense */ + if (oa->o_valid & OBD_MD_FLOBJCOUNT) + count = oa->o_misc; + else + count = 1; /* default case - single destroy */ + + /** + * There can be sequence of objects to destroy. Therefore this request + * may have multiple transaction involved in. It is OK, we need only + * the highest used transno to be reported back in reply but not for + * replays, they must report their transno + */ + if (info->fti_transno == 0) /* not replay */ + info->fti_mult_trans = 1; + while (count > 0) { + int lrc; + + fid_ostid_unpack(&info->fti_fid, &oa->o_oi, 0); + lrc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 0); + if (lrc == -ENOENT) { + CDEBUG(D_INODE, + "destroying non-existent object "LPU64"\n", + oa->o_id); + /* rewrite rc with -ENOENT only if it is 0 */ + if (rc == 0) + rc = lrc; + } else if (lrc != 0) { + CEMERG("error destroying object "LPU64": %d\n", + oa->o_id, rc); + rc = lrc; + } + count--; + oa->o_id++; + } + + /* if we have transaction then there were some deletions, we don't + * need to return ENOENT in that case because it will not wait + * for commit of these deletions. The ENOENT must be returned only + * if there were no transations. + */ + if (rc == -ENOENT) { + if (info->fti_transno != 0) + rc = 0; + } else if (rc != 0) { + /* + * If we have at least one transaction then llog record + * on server will be removed upon commit, so for rc != 0 + * we return no transno and llog record will be reprocessed. + */ + info->fti_transno = 0; + } + ofd_info2oti(info, oti); + RETURN(rc); +} + +static int ofd_orphans_destroy(const struct lu_env *env, + struct obd_export *exp, struct ofd_device *ofd, + struct obdo *oa) +{ + struct ofd_thread_info *info = ofd_info(env); + obd_id last; + int skip_orphan; + int rc = 0; + struct ost_id oi = oa->o_oi; + + ENTRY; + + LASSERT(exp != NULL); + skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN); + + last = ofd_last_id(ofd, oa->o_seq); + CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n", + ofd_obd(ofd)->obd_name, oa->o_id + 1, last); + + for (oi.oi_id = last; oi.oi_id > oa->o_id; oi.oi_id--) { + fid_ostid_unpack(&info->fti_fid, &oi, 0); + rc = ofd_destroy_by_fid(env, ofd, &info->fti_fid, 1); + if (rc && rc != -ENOENT) /* this is pretty fatal... */ + CEMERG("error destroying precreated id "LPU64": %d\n", + oi.oi_id, rc); + if (!skip_orphan) { + ofd_last_id_set(ofd, oi.oi_id - 1, oa->o_seq); + /* update last_id on disk periodically so that if we + * restart * we don't need to re-scan all of the just + * deleted objects. */ + if ((oi.oi_id & 511) == 0) + ofd_last_id_write(env, ofd, oa->o_seq); + } + } + CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n", + ofd_obd(ofd)->obd_name, oa->o_seq, oa->o_id); + if (!skip_orphan) { + rc = ofd_last_id_write(env, ofd, oa->o_seq); + } else { + /* don't reuse orphan object, return last used objid */ + oa->o_id = last; + rc = 0; + } + RETURN(rc); +} + +int ofd_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct lov_stripe_md **ea, + struct obd_trans_info *oti) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + int rc = 0, diff; + + ENTRY; + + info = ofd_info_init(env, exp); + ofd_oti2info(info, oti); + + LASSERT(oa->o_seq >= FID_SEQ_OST_MDT0); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + CDEBUG(D_INFO, "ofd_create(oa->o_seq="LPU64",oa->o_id="LPU64")\n", + oa->o_seq, oa->o_id); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + if (!ofd_obd(ofd)->obd_recovering || + oa->o_id > ofd_last_id(ofd, oa->o_seq)) { + CERROR("recreate objid "LPU64" > last id "LPU64"\n", + oa->o_id, ofd_last_id(ofd, oa->o_seq)); + GOTO(out, rc = -EINVAL); + } + /* do nothing because we create objects during first write */ + GOTO(out, rc = 0); + } + /* former ofd_handle_precreate */ + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_DELORPHAN)) { + /* destroy orphans */ + if (oti->oti_conn_cnt < exp->exp_conn_cnt) { + CERROR("%s: dropping old orphan cleanup request\n", + ofd_obd(ofd)->obd_name); + GOTO(out, rc = 0); + } + /* This causes inflight precreates to abort and drop lock */ + cfs_set_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + if (!cfs_test_bit(oa->o_seq, &ofd->ofd_destroys_in_progress)) { + CERROR("%s:["LPU64"] destroys_in_progress already cleared\n", + exp->exp_obd->obd_name, oa->o_seq); + GOTO(out, rc = 0); + } + diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n", + ofd_last_id(ofd, oa->o_seq), diff); + if (-diff > OST_MAX_PRECREATE) { + /* FIXME: should reset precreate_next_id on MDS */ + rc = 0; + } else if (diff < 0) { + rc = ofd_orphans_destroy(env, exp, ofd, oa); + cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + } else { + /* XXX: Used by MDS for the first time! */ + cfs_clear_bit(oa->o_seq, &ofd->ofd_destroys_in_progress); + } + } else { + cfs_mutex_lock(&ofd->ofd_create_locks[oa->o_seq]); + if (oti->oti_conn_cnt < exp->exp_conn_cnt) { + CERROR("%s: dropping old precreate request\n", + ofd_obd(ofd)->obd_name); + GOTO(out, rc = 0); + } + /* only precreate if group == 0 and o_id is specfied */ + if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) { + diff = 1; /* shouldn't we create this right now? */ + } else { + diff = oa->o_id - ofd_last_id(ofd, oa->o_seq); + } + } + if (diff > 0) { + obd_id next_id = ofd_last_id(ofd, oa->o_seq) + 1; + int i; + + if (!(oa->o_valid & OBD_MD_FLFLAGS) || + !(oa->o_flags & OBD_FL_DELORPHAN)) { + /* don't enforce grant during orphan recovery */ + rc = ofd_grant_create(env, + ofd_obd(ofd)->obd_self_export, + &diff); + if (rc) { + CDEBUG(D_HA, "%s: failed to acquire grant space" + "for precreate (%d)\n", + ofd_obd(ofd)->obd_name, diff); + diff = 0; + } + } + + CDEBUG(D_HA, + "%s: reserve %d objects in group "LPU64" at "LPU64"\n", + ofd_obd(ofd)->obd_name, diff, oa->o_seq, next_id); + for (i = 0; i < diff; i++) { + rc = ofd_precreate_object(env, ofd, next_id + i, + oa->o_seq); + if (rc) + break; + } + if (i > 0) { + /* some objects got created, we can return + * them, even if last creation failed */ + oa->o_id = ofd_last_id(ofd, oa->o_seq); + rc = 0; + } else { + CERROR("unable to precreate: %d\n", rc); + oa->o_id = ofd_last_id(ofd, oa->o_seq); + } + + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; + + if (!(oa->o_valid & OBD_MD_FLFLAGS) || + !(oa->o_flags & OBD_FL_DELORPHAN)) + ofd_grant_commit(env, ofd_obd(ofd)->obd_self_export, + rc); + } + + ofd_info2oti(info, oti); +out: + cfs_mutex_unlock(&ofd->ofd_create_locks[oa->o_seq]); + if (rc == 0 && ea != NULL) { + struct lov_stripe_md *lsm = *ea; + + lsm->lsm_object_id = oa->o_id; + } + return rc; +} + +int ofd_getattr(const struct lu_env *env, struct obd_export *exp, + struct obd_info *oinfo) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + struct ofd_object *fo; + __u64 curr_version; + int rc = 0; + + ENTRY; + + info = ofd_info_init(env, exp); + + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_META_READ); + if (rc) + GOTO(out, rc); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) + GOTO(out, rc = PTR_ERR(fo)); + LASSERT(fo != NULL); + rc = ofd_attr_get(env, fo, &info->fti_attr); + oinfo->oi_oa->o_valid = OBD_MD_FLID; + if (rc == 0) + obdo_from_la(oinfo->oi_oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_UID | LA_GID); + + /* Store object version in reply */ + curr_version = dt_version_get(env, ofd_object_child(fo)); + if ((__s64)curr_version != -EOPNOTSUPP) { + oinfo->oi_oa->o_valid |= OBD_MD_FLDATAVERSION; + oinfo->oi_oa->o_data_version = curr_version; + } + ofd_object_put(env, fo); +out: + RETURN(rc); +} + static int ofd_sync(const struct lu_env *env, struct obd_export *exp, struct obd_info *oinfo, obd_size start, obd_size end, struct ptlrpc_request_set *set) { struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + struct ofd_object *fo; int rc = 0; ENTRY; @@ -702,7 +1265,37 @@ static int ofd_sync(const struct lu_env *env, struct obd_export *exp, GOTO(out, rc); } + info = ofd_info_init(env, exp); + fid_ostid_unpack(&info->fti_fid, &oinfo->oi_oa->o_oi, 0); + + rc = ofd_auth_capa(exp, &info->fti_fid, oinfo->oi_oa->o_seq, + oinfo_capa(oinfo), CAPA_OPC_OSS_TRUNC); + if (rc) + GOTO(out, rc); + + fo = ofd_object_find(env, ofd, &info->fti_fid); + if (IS_ERR(fo)) { + CERROR("%s: error finding object "DFID": rc = %ld\n", + exp->exp_obd->obd_name, PFID(&info->fti_fid), + PTR_ERR(fo)); + GOTO(out, rc = PTR_ERR(fo)); + } + + ofd_write_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + rc = dt_object_sync(env, ofd_object_child(fo)); + if (rc) + GOTO(unlock, rc); + + oinfo->oi_oa->o_valid = OBD_MD_FLID; + rc = ofd_attr_get(env, fo, &info->fti_attr); + obdo_from_la(oinfo->oi_oa, &info->fti_attr, OFD_VALID_FLAGS); EXIT; +unlock: + ofd_write_unlock(env, fo); + ofd_object_put(env, fo); out: return rc; } @@ -724,7 +1317,7 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, switch (cmd) { case OBD_IOC_ABORT_RECOVERY: - CERROR("aborting recovery for device %s\n", obd->obd_name); + CERROR("%s: aborting recovery\n", obd->obd_name); target_stop_recovery_thread(obd); break; case OBD_IOC_SYNC: @@ -737,8 +1330,7 @@ int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = dt_ro(&env, ofd->ofd_osd); break; default: - CERROR("Not supported cmd = %d for device %s\n", - cmd, obd->obd_name); + CERROR("%s: not supported cmd = %d\n", obd->obd_name, cmd); rc = -ENOTTY; } @@ -833,6 +1425,69 @@ static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused, return 0; } +/* + * Handle quotacheck requests. + * Although in-kernel quotacheck isn't supported any more, we still emulate it + * in order to interoperate with current MDT stack which needs proper + * quotacheck support, even for space accounting. + * + * \param obd - is the obd device associated with the ofd + * \param exp - is the client's export + * \param oqctl - is the obd_quotactl request to be processed + */ +static int ofd_quotacheck(struct obd_device *obd, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct ptlrpc_request *req; + struct obd_quotactl *body; + ENTRY; + + req = ptlrpc_request_alloc_pack(exp->exp_imp_reverse, &RQF_QC_CALLBACK, + LUSTRE_OBD_VERSION, OBD_QC_CALLBACK); + if (req == NULL) + RETURN(-ENOMEM); + + body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); + oqctl->qc_stat = 0; + memcpy(body, oqctl, sizeof(*body)); + + ptlrpc_request_set_replen(req); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + + RETURN(0); +} + +/* + * Handle quota control requests to consult current usage/limit, but also + * to configure quota enforcement + * + * \param obd - is the obd device associated with the ofd + * \param exp - is the client's export + * \param oqctl - is the obd_quotactl request to be processed + */ +static int ofd_quotactl(struct obd_device *obd, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev); + struct lu_env env; + int rc; + ENTRY; + + /* report success for quota on/off for interoperability with current MDT + * stack */ + if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF) + RETURN(0); + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + + rc = lquotactl_slv(&env, ofd->ofd_osd, oqctl); + lu_env_fini(&env); + + RETURN(rc); +} + struct obd_ops ofd_obd_ops = { .o_owner = THIS_MODULE, .o_connect = ofd_obd_connect, @@ -840,14 +1495,23 @@ struct obd_ops ofd_obd_ops = { .o_disconnect = ofd_obd_disconnect, .o_set_info_async = ofd_set_info_async, .o_get_info = ofd_get_info, + .o_create = ofd_create, .o_statfs = ofd_statfs, + .o_setattr = ofd_setattr, + .o_preprw = ofd_preprw, + .o_commitrw = ofd_commitrw, + .o_destroy = ofd_destroy, .o_init_export = ofd_init_export, .o_destroy_export = ofd_destroy_export, .o_postrecov = ofd_obd_postrecov, + .o_punch = ofd_punch, + .o_getattr = ofd_getattr, .o_sync = ofd_sync, .o_iocontrol = ofd_iocontrol, .o_precleanup = ofd_precleanup, .o_ping = ofd_ping, .o_health_check = ofd_health_check, .o_notify = ofd_obd_notify, + .o_quotactl = ofd_quotactl, + .o_quotacheck = ofd_quotacheck, };