X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=2ce7a689d07c0863055aec6e589b9e271cfb041f;hp=57426d9ca5ca2b9582eeb8919350d7618118da2f;hb=c6f3d533542b9462b5b8df95183d80321d4d9c34;hpb=2728de4db8c0f8177202e2ffcad74a1893b210c9 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 57426d9..2ce7a68 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -66,7 +66,6 @@ #include "mdt_internal.h" - static unsigned int max_mod_rpcs_per_client = 8; module_param(max_mod_rpcs_per_client, uint, 0644); MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); @@ -94,7 +93,6 @@ enum ldlm_mode mdt_dlm_lock_modes[] = { }; static struct mdt_device *mdt_dev(struct lu_device *d); -static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static const struct lu_object_operations mdt_obj_ops; @@ -270,18 +268,13 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; - char *name = NULL; + char *filename = info->mti_filename; struct mdt_object *parent; u32 mode; int rc = 0; LASSERT(!info->mti_cross_ref); - OBD_ALLOC(name, NAME_MAX + 1); - if (name == NULL) - return -ENOMEM; - lname->ln_name = name; - /* * We may want to allow this to mount a completely separate * fileset from the MDT in the future, but keeping it to @@ -317,8 +310,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, break; } - strncpy(name, s1, lname->ln_namelen); - name[lname->ln_namelen] = '\0'; + strncpy(filename, s1, lname->ln_namelen); + filename[lname->ln_namelen] = '\0'; + lname->ln_name = filename; parent = mdt_object_find(info->mti_env, mdt, fid); if (IS_ERR(parent)) { @@ -343,8 +337,6 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } } - OBD_FREE(name, NAME_MAX + 1); - return rc; } @@ -413,13 +405,16 @@ out: static int mdt_statfs(struct tgt_session_info *tsi) { - struct ptlrpc_request *req = tgt_ses_req(tsi); - struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_device *mdt = info->mti_mdt; - struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; - struct ptlrpc_service_part *svcpt; - struct obd_statfs *osfs; - int rc; + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct mdt_device *mdt = info->mti_mdt; + struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; + struct md_device *next = mdt->mdt_child; + struct ptlrpc_service_part *svcpt; + struct obd_statfs *osfs; + struct mdt_body *reqbody = NULL; + struct mdt_statfs_cache *msf; + int rc; ENTRY; @@ -441,11 +436,39 @@ static int mdt_statfs(struct tgt_session_info *tsi) if (!osfs) GOTO(out, rc = -EPROTO); - rc = tgt_statfs_internal(tsi->tsi_env, &mdt->mdt_lut, osfs, - ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS, - NULL); - if (unlikely(rc)) - GOTO(out, rc); + if (mdt_is_sum_statfs_client(req->rq_export) && + lustre_packed_msg_size(req->rq_reqmsg) == + req_capsule_fmt_size(req->rq_reqmsg->lm_magic, + &RQF_MDS_STATFS_NEW, RCL_CLIENT)) { + req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW); + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + } + + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + msf = &mdt->mdt_sum_osfs; + else + msf = &mdt->mdt_osfs; + + if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, + next, osfs); + else + rc = dt_statfs(info->mti_env, mdt->mdt_bottom, + osfs); + if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); + } else { + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); + } /* at least try to account for cached pages. its still racy and * might be under-reporting if clients haven't announced their @@ -490,11 +513,12 @@ out: * Pack size attributes into the reply. */ int mdt_pack_size2body(struct mdt_thread_info *info, - const struct lu_fid *fid, bool dom_lock) + const struct lu_fid *fid, struct lustre_handle *lh) { struct mdt_body *b; struct md_attr *ma = &info->mti_attr; int dom_stripe; + bool dom_lock = false; ENTRY; @@ -509,6 +533,16 @@ int mdt_pack_size2body(struct mdt_thread_info *info, if (dom_stripe == LMM_NO_DOM) RETURN(-ENOENT); + if (lustre_handle_is_used(lh)) { + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(lh); + if (lock != NULL) { + dom_lock = ldlm_has_dom(lock); + LDLM_LOCK_PUT(lock); + } + } + /* no DoM lock, no size in reply */ if (!dom_lock) RETURN(0); @@ -539,13 +573,13 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody, struct md_object *next = mdt_object_child(o); struct lu_buf *buf = &info->mti_buf; struct mdt_device *mdt = info->mti_mdt; + struct req_capsule *pill = info->mti_pill; int rc; ENTRY; - buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL); - buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL, - RCL_SERVER); + buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL); + buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); if (buf->lb_len == 0) RETURN(0); @@ -563,25 +597,26 @@ again: exp_connect_large_acl(info->mti_exp) && buf->lb_buf != info->mti_big_acl) { if (info->mti_big_acl == NULL) { + info->mti_big_aclsize = + MIN(mdt->mdt_max_ea_size, + XATTR_SIZE_MAX); OBD_ALLOC_LARGE(info->mti_big_acl, - mdt->mdt_max_ea_size); + info->mti_big_aclsize); if (info->mti_big_acl == NULL) { + info->mti_big_aclsize = 0; CERROR("%s: unable to grow " DFID" ACL buffer\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o))); RETURN(-ENOMEM); } - - info->mti_big_aclsize = - mdt->mdt_max_ea_size; } CDEBUG(D_INODE, "%s: grow the "DFID " ACL buffer to size %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), - mdt->mdt_max_ea_size); + info->mti_big_aclsize); buf->lb_buf = info->mti_big_acl; buf->lb_len = info->mti_big_aclsize; @@ -593,6 +628,36 @@ again: mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc); } } else { + int client; + int server; + int acl_buflen; + int lmm_buflen = 0; + int lmmsize = 0; + + acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); + if (acl_buflen >= rc) + goto map; + + /* If LOV/LMA EA is small, we can reuse part of their buffer */ + client = ptlrpc_req_get_repsize(pill->rc_req); + server = lustre_packed_msg_size(pill->rc_req->rq_repmsg); + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) { + lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER); + lmmsize = repbody->mbo_eadatasize; + } + + if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) { + CDEBUG(D_INODE, "%s: client prepared buffer size %d " + "is not big enough with the ACL size %d (%d)\n", + mdt_obd_name(mdt), client, rc, + server - acl_buflen - lmm_buflen + rc + lmmsize); + repbody->mbo_aclsize = 0; + repbody->mbo_valid &= ~OBD_MD_FLACL; + RETURN(-ERANGE); + } + +map: if (buf->lb_buf == info->mti_big_acl) info->mti_big_acl_used = 1; @@ -603,6 +668,8 @@ again: CERROR("%s: nodemap_map_acl unable to parse "DFID " ACL: rc = %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc); + repbody->mbo_aclsize = 0; + repbody->mbo_valid &= ~OBD_MD_FLACL; } else { repbody->mbo_aclsize = rc; repbody->mbo_valid |= OBD_MD_FLACL; @@ -873,6 +940,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, return -EINVAL; } + LASSERT(buf->lb_buf); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -925,8 +994,8 @@ got: return rc; } -static int mdt_attr_get_pfid(struct mdt_thread_info *info, - struct mdt_object *o, struct lu_fid *pfid) +int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid) { struct lu_buf *buf = &info->mti_buf; struct link_ea_header *leh; @@ -996,7 +1065,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); if (S_ISREG(mode)) - (void) mdt_get_som(info, o, &ma->ma_attr); + (void) mdt_get_som(info, o, ma); ma->ma_valid |= MA_INODE; } @@ -1026,6 +1095,15 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); } + /* + * In the handle of MA_INODE, we may already get the SOM attr. + */ + if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) { + rc = mdt_get_som(info, o, ma); + if (rc != 0) + GOTO(out, rc); + } + if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); @@ -1272,6 +1350,12 @@ static int mdt_getattr(struct tgt_session_info *tsi) LASSERT(obj != NULL); LASSERT(lu_object_assert_exists(&obj->mot_obj)); + /* Special case for Data-on-MDT files to get data version */ + if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) { + rc = mdt_data_version_get(tsi); + GOTO(out, rc); + } + /* Unlike intent case where we need to pre-fill out buffers early on * in intent policy for ldlm reasons, here we can have a much better * guess at EA size by just reading it from disk. @@ -1281,7 +1365,6 @@ static int mdt_getattr(struct tgt_session_info *tsi) /* No easy way to know how long is the symlink, but it cannot * be more than PATH_MAX, so we allocate +1 */ rc = PATH_MAX + 1; - /* A special case for fs ROOT: getattr there might fetch * default EA for entire fs, not just for this dir! */ @@ -1325,12 +1408,12 @@ static int mdt_getattr(struct tgt_session_info *tsi) info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); rc = mdt_getattr_internal(info, obj, 0); - EXIT; + EXIT; out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; out: mdt_thread_info_fini(info); return rc; @@ -1370,8 +1453,9 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, if (rc) GOTO(out, rc); + mutex_lock(&obj->mot_som_mutex); rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); - + mutex_unlock(&obj->mot_som_mutex); mdt_object_unlock(info, obj, lh, 1); out: RETURN(rc); @@ -1444,12 +1528,12 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) /* permission check. Make sure the calling process having permission * to write both files. */ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); @@ -1538,9 +1622,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct lu_name *lname = NULL; struct mdt_lock_handle *lhp = NULL; struct ldlm_lock *lock; + struct req_capsule *pill = info->mti_pill; __u64 try_bits = 0; bool is_resent; int ma_need = 0; + bool deal_with_dom = false; int rc; ENTRY; @@ -1569,12 +1655,15 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_lock_reg_init(lhc, LCK_PR); /* - * Object's name is on another MDS, no lookup or layout - * lock is needed here but update lock is. + * Object's name entry is on another MDS, it will + * request PERM lock only because LOOKUP lock is owned + * by the MDS where name entry resides. + * + * TODO: it should try layout lock too. - Jinshan */ child_bits &= ~(MDS_INODELOCK_LOOKUP | MDS_INODELOCK_LAYOUT); - child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE; + child_bits |= MDS_INODELOCK_PERM; rc = mdt_object_lock(info, child, lhc, child_bits); if (rc < 0) @@ -1594,18 +1683,20 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (unlikely(rc != 0)) mdt_object_unlock(info, child, lhc, 1); - RETURN(rc); - } + mdt_pack_secctx_in_reply(info, child); + + RETURN(rc); + } lname = &info->mti_name; - mdt_name_unpack(info->mti_pill, &RMF_NAME, lname, MNF_FIX_ANON); + mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON); if (lu_name_is_valid(lname)) { CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", " "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), PNAME(lname), ldlm_rep); } else { - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); if (unlikely(reqbody == NULL)) RETURN(err_serious(-EPROTO)); @@ -1758,16 +1849,17 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, child_bits &= ~MDS_INODELOCK_UPDATE; rc = mdt_object_lock(info, child, lhc, child_bits); } - if (unlikely(rc != 0)) - GOTO(out_child, rc); - } + if (unlikely(rc != 0)) + GOTO(out_child, rc); + } - lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); - /* finally, we can get attr for child. */ - rc = mdt_getattr_internal(info, child, ma_need); - if (unlikely(rc != 0)) { - mdt_object_unlock(info, child, lhc, 1); + /* finally, we can get attr for child. */ + rc = mdt_getattr_internal(info, child, ma_need); + if (unlikely(rc != 0)) { + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_lock, rc); } else if (lock) { /* Debugging code. */ LDLM_DEBUG(lock, "Returning lock to client"); @@ -1779,35 +1871,36 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (S_ISREG(lu_object_attr(&child->mot_obj)) && mdt_object_exists(child) && !mdt_object_remote(child) && - child != parent) { - LDLM_LOCK_PUT(lock); - mdt_object_put(info->mti_env, child); - /* NB: call the mdt_pack_size2body always after - * mdt_object_put(), that is why this speacial - * exit path is used. */ - rc = mdt_pack_size2body(info, child_fid, - child_bits & MDS_INODELOCK_DOM); - if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { - /* DOM lock was taken in advance but this is - * not DoM file. Drop the lock. */ - lock_res_and_lock(lock); - ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); - unlock_res_and_lock(lock); - } + child != parent) + deal_with_dom = true; + } - GOTO(out_parent, rc = 0); - } - } - if (lock) - LDLM_LOCK_PUT(lock); + mdt_pack_secctx_in_reply(info, child); - EXIT; +out_lock: + if (lock) + LDLM_LOCK_PUT(lock); + + EXIT; out_child: - mdt_object_put(info->mti_env, child); + mdt_object_put(info->mti_env, child); + if (deal_with_dom) { + rc = mdt_pack_size2body(info, child_fid, + &lhc->mlh_reg_lh); + if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { + /* DOM lock was taken in advance but this is + * not DoM file. Drop the lock. + */ + lock_res_and_lock(lock); + ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); + unlock_res_and_lock(lock); + } + rc = 0; + } out_parent: - if (lhp) - mdt_object_unlock(info, parent, lhp, 1); - return rc; + if (lhp) + mdt_object_unlock(info, parent, lhp, 1); + return rc; } /* normal handler: should release the child lock */ @@ -2003,6 +2096,34 @@ static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op) return 0; } +static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op) +{ + return op == REINT_OPEN && + !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT)); +} + +static void mdt_preset_secctx_size(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + + if (req_capsule_has_field(pill, &RMF_FILE_SECCTX, + RCL_SERVER) && + req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT)) { + if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) != 0) { + /* pre-set size in server part with max size */ + req_capsule_set_size(pill, &RMF_FILE_SECCTX, + RCL_SERVER, + info->mti_mdt->mdt_max_ea_size); + } else { + req_capsule_set_size(pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } + } + +} + static int mdt_reint_internal(struct mdt_thread_info *info, struct mdt_lock_handle *lhc, __u32 op) @@ -2013,19 +2134,25 @@ static int mdt_reint_internal(struct mdt_thread_info *info, ENTRY; - rc = mdt_reint_unpack(info, op); - if (rc != 0) { - CERROR("Can't unpack reint, rc %d\n", rc); - RETURN(err_serious(rc)); - } + rc = mdt_reint_unpack(info, op); + if (rc != 0) { + CERROR("Can't unpack reint, rc %d\n", rc); + RETURN(err_serious(rc)); + } + - /* for replay (no_create) lmm is not needed, client has it already */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + /* check if the file system is set to readonly. O_RDONLY open + * is still allowed even the file system is set to readonly mode */ + if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op)) + RETURN(err_serious(-EROFS)); + + /* for replay (no_create) lmm is not needed, client has it already */ + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, DEF_REP_MD_SIZE); /* llog cookies are always 0, the field is kept for compatibility */ - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD @@ -2035,33 +2162,35 @@ static int mdt_reint_internal(struct mdt_thread_info *info, req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); - rc = req_capsule_server_pack(pill); - if (rc != 0) { - CERROR("Can't pack response, rc %d\n", rc); - RETURN(err_serious(rc)); - } + mdt_preset_secctx_size(info); + + rc = req_capsule_server_pack(pill); + if (rc != 0) { + CERROR("Can't pack response, rc %d\n", rc); + RETURN(err_serious(rc)); + } - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) { - repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); - LASSERT(repbody); + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) { + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + LASSERT(repbody); repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - } + } - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); - /* for replay no cookkie / lmm need, because client have this already */ - if (info->mti_spec.no_create) - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); + /* for replay no cookkie / lmm need, because client have this already */ + if (info->mti_spec.no_create) + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); - rc = mdt_init_ucred_reint(info); - if (rc) - GOTO(out_shrink, rc); + rc = mdt_init_ucred_reint(info); + if (rc) + GOTO(out_shrink, rc); - rc = mdt_fix_attr_ucred(info, op); - if (rc != 0) - GOTO(out_ucred, rc = err_serious(rc)); + rc = mdt_fix_attr_ucred(info, op); + if (rc != 0) + GOTO(out_ucred, rc = err_serious(rc)); rc = mdt_check_resent(info, mdt_reconstruct, lhc); if (rc < 0) { @@ -2069,18 +2198,31 @@ static int mdt_reint_internal(struct mdt_thread_info *info, } else if (rc == 1) { DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); - GOTO(out_ucred, rc); - } - rc = mdt_reint_rec(info, lhc); - EXIT; + GOTO(out_ucred, rc); + } + rc = mdt_reint_rec(info, lhc); + EXIT; out_ucred: - mdt_exit_ucred(info); + mdt_exit_ucred(info); out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - return rc; + mdt_client_compatibility(info); + + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + + /* + * Data-on-MDT optimization - read data along with OPEN and return it + * in reply. Do that only if we have both DOM and LAYOUT locks. + */ + if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) && + info->mti_attr.ma_lmm != NULL && + mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + rc = mdt_dom_read_on_open(info, info->mti_mdt, + &lhc->mlh_reg_lh); + } + + return rc; } static long mdt_reint_opcode(struct ptlrpc_request *req, @@ -2122,7 +2264,7 @@ static int mdt_reint(struct tgt_session_info *tsi) [REINT_OPEN] = &RQF_MDS_REINT_OPEN, [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK, - [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME, + [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE, [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC, }; @@ -2146,7 +2288,7 @@ static int mdt_reint(struct tgt_session_info *tsi) } /* this should sync the whole device */ -static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) +int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; @@ -2298,10 +2440,12 @@ static int mdt_quotactl(struct tgt_session_info *tsi) /* master quotactl */ case Q_SETINFO: case Q_SETQUOTA: + case LUSTRE_Q_SETDEFAULT: if (!nodemap_can_setquota(nodemap)) GOTO(out_nodemap, rc = -EPERM); case Q_GETINFO: case Q_GETQUOTA: + case LUSTRE_Q_GETDEFAULT: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ @@ -2351,6 +2495,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case Q_GETQUOTA: + case LUSTRE_Q_SETDEFAULT: + case LUSTRE_Q_GETDEFAULT: /* forward quotactl request to QMT */ rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); break; @@ -2558,6 +2704,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, { struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd; struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct ldlm_cb_set_arg *arg = data; bool commit_async = false; int rc; ENTRY; @@ -2570,17 +2717,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, unlock_res_and_lock(lock); RETURN(0); } - /* There is no lock conflict if l_blocking_lock == NULL, - * it indicates a blocking ast sent from ldlm_lock_decref_internal - * when the last reference to a local lock was released */ - if (lock->l_req_mode & (LCK_PW | LCK_EX) && - lock->l_blocking_lock != NULL) { + + /* A blocking ast may be sent from ldlm_lock_decref_internal + * when the last reference to a local lock was released and + * during blocking event from ldlm_work_bl_ast_lock(). + * The 'data' parameter is l_ast_data in the first case and + * callback arguments in the second one. Distinguish them by that. + */ + if (!data || data == lock->l_ast_data || !arg->bl_desc) + goto skip_cos_checks; + + if (lock->l_req_mode & (LCK_PW | LCK_EX)) { if (mdt_cos_is_enabled(mdt)) { - if (lock->l_client_cookie != - lock->l_blocking_lock->l_client_cookie) + if (!arg->bl_desc->bl_same_client) mdt_set_lock_sync(lock); } else if (mdt_slc_is_enabled(mdt) && - ldlm_is_cos_incompat(lock->l_blocking_lock)) { + arg->bl_desc->bl_cos_incompat) { mdt_set_lock_sync(lock); /* * we may do extra commit here, but there is a small @@ -2594,11 +2746,11 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, */ commit_async = true; } - } else if (lock->l_req_mode == LCK_COS && - lock->l_blocking_lock != NULL) { + } else if (lock->l_req_mode == LCK_COS) { commit_async = true; } +skip_cos_checks: rc = ldlm_blocking_ast_nocheck(lock); if (commit_async) { @@ -2724,12 +2876,17 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, return 1; } +static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock) +{ + mdt_object_get(NULL, lock->l_ast_data); +} + int mdt_remote_object_lock_try(struct mdt_thread_info *mti, struct mdt_object *o, const struct lu_fid *fid, struct lustre_handle *lh, enum ldlm_mode mode, __u64 *ibits, __u64 trybits, bool cache) { - struct ldlm_enqueue_info *einfo = &mti->mti_einfo; + struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo; union ldlm_policy_data *policy = &mti->mti_policy; struct ldlm_res_id *res_id = &mti->mti_res_id; int rc = 0; @@ -2752,21 +2909,16 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, * if we cache lock, couple lock with mdt_object, so that object * can be easily found in lock ASTs. */ - mdt_object_get(mti->mti_env, o); einfo->ei_cbdata = o; + einfo->ei_cb_created = mdt_remote_object_lock_created_cb; } - memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = *ibits; policy->l_inodebits.try_bits = trybits; rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, policy); - if (rc < 0 && cache) { - mdt_object_put(mti->mti_env, o); - einfo->ei_cbdata = NULL; - } /* Return successfully acquired bits to a caller */ if (rc == 0) { @@ -2795,7 +2947,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0; + __u64 dlmflags = 0, *cookie = NULL; int rc; ENTRY; @@ -2815,6 +2967,11 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, /* Only enqueue LOOKUP lock for remote object */ LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP)); + /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */ + if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX && + *ibits == MDS_INODELOCK_OPEN) + dlmflags |= LDLM_FL_CANCEL_ON_BLOCK; + if (lh->mlh_type == MDT_PDO_LOCK) { /* check for exists after object is locked */ if (mdt_object_exists(o) == 0) { @@ -2827,10 +2984,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } } - fid_build_reg_res_name(mdt_object_fid(o), res_id); dlmflags |= LDLM_FL_ATOMIC_CB; + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2844,12 +3003,16 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * is never going to be sent to client and we do not * want it slowed down due to possible cancels. */ - policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; - policy->l_inodebits.try_bits = 0; - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + policy->l_inodebits.bits = + *ibits & MDS_INODELOCK_UPDATE; + policy->l_inodebits.try_bits = + trybits & MDS_INODELOCK_UPDATE; + /* at least one of them should be set */ + LASSERT(policy->l_inodebits.bits | + policy->l_inodebits.try_bits); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, policy, res_id, + dlmflags, cookie); if (unlikely(rc != 0)) GOTO(out_unlock, rc); } @@ -2869,10 +3032,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + cookie); out_unlock: if (rc != 0) mdt_object_unlock(info, o, lh, 1); @@ -2949,6 +3111,10 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, } } + /* other components like LFSCK can use lockless access + * and populate cache, so we better invalidate it */ + mo_invalidate(info->mti_env, mdt_object_child(o)); + RETURN(0); } @@ -3155,11 +3321,12 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, * - create lu_object, corresponding to the fid in mdt_body, and save it in * @info; * - * - if HABEO_CORPUS flag is set for this request type check whether object + * - if HAS_BODY flag is set for this request type check whether object * actually exists on storage (lu_object_exists()). * */ -static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) +static int mdt_body_unpack(struct mdt_thread_info *info, + enum tgt_handler_flags flags) { const struct mdt_body *body; struct mdt_object *obj; @@ -3185,7 +3352,7 @@ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) obj = mdt_object_find(env, info->mti_mdt, &body->mbo_fid1); if (!IS_ERR(obj)) { - if ((flags & HABEO_CORPUS) && !mdt_object_exists(obj)) { + if ((flags & HAS_BODY) && !mdt_object_exists(obj)) { mdt_object_put(env, obj); rc = -ENOENT; } else { @@ -3198,23 +3365,25 @@ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) RETURN(rc); } -static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) +static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, + enum tgt_handler_flags flags) { - struct req_capsule *pill = info->mti_pill; - int rc; - ENTRY; + struct req_capsule *pill = info->mti_pill; + int rc; - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) - rc = mdt_body_unpack(info, flags); - else - rc = 0; + ENTRY; + + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) + rc = mdt_body_unpack(info, flags); + else + rc = 0; - if (rc == 0 && (flags & HABEO_REFERO)) { - /* Pack reply. */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + if (rc == 0 && (flags & HAS_REPLY)) { + /* Pack reply. */ + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, DEF_REP_MD_SIZE); - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); @@ -3225,9 +3394,14 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); - rc = req_capsule_server_pack(pill); - } - RETURN(rc); + mdt_preset_secctx_size(info); + + rc = req_capsule_server_pack(pill); + if (rc) + CWARN("%s: cannot pack response: rc = %d\n", + mdt_obd_name(info->mti_mdt), rc); + } + RETURN(rc); } void mdt_lock_handle_init(struct mdt_lock_handle *lh) @@ -3342,48 +3516,14 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) return tgt_connect(tsi); } -enum mdt_it_code { - MDT_IT_OPEN, - MDT_IT_OCREAT, - MDT_IT_CREATE, - MDT_IT_GETATTR, - MDT_IT_READDIR, - MDT_IT_LOOKUP, - MDT_IT_UNLINK, - MDT_IT_TRUNC, - MDT_IT_GETXATTR, - MDT_IT_LAYOUT, - MDT_IT_QUOTA, - MDT_IT_GLIMPSE, - MDT_IT_BRW, - MDT_IT_NR -}; - -static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, __u64); - -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags); - -static int mdt_intent_layout(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); -static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); -static int mdt_intent_glimpse(enum mdt_it_code opcode, +static int mdt_intent_glimpse(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace, lockp, flags); } -static int mdt_intent_brw(enum mdt_it_code opcode, +static int mdt_intent_brw(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { @@ -3391,90 +3531,6 @@ static int mdt_intent_brw(enum mdt_it_code opcode, lockp, flags); } -static struct mdt_it_flavor { - const struct req_format *it_fmt; - __u32 it_flags; - int (*it_act)(enum mdt_it_code , - struct mdt_thread_info *, - struct ldlm_lock **, - __u64); - long it_reint; -} mdt_it_flavor[] = { - [MDT_IT_OPEN] = { - .it_fmt = &RQF_LDLM_INTENT, - /*.it_flags = HABEO_REFERO,*/ - .it_flags = 0, - .it_act = mdt_intent_reint, - .it_reint = REINT_OPEN - }, - [MDT_IT_OCREAT] = { - .it_fmt = &RQF_LDLM_INTENT, - /* - * OCREAT is not a MUTABOR request as if the file - * already exists. - * We do the extra check of OBD_CONNECT_RDONLY in - * mdt_reint_open() when we really need to create - * the object. - */ - .it_flags = 0, - .it_act = mdt_intent_reint, - .it_reint = REINT_OPEN - }, - [MDT_IT_CREATE] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = MUTABOR, - .it_act = mdt_intent_reint, - .it_reint = REINT_CREATE - }, - [MDT_IT_GETATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - }, - [MDT_IT_READDIR] = { - .it_fmt = NULL, - .it_flags = 0, - .it_act = NULL - }, - [MDT_IT_LOOKUP] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - }, - [MDT_IT_UNLINK] = { - .it_fmt = &RQF_LDLM_INTENT_UNLINK, - .it_flags = MUTABOR, - .it_act = NULL, - .it_reint = REINT_UNLINK - }, - [MDT_IT_TRUNC] = { - .it_fmt = NULL, - .it_flags = MUTABOR, - .it_act = NULL - }, - [MDT_IT_GETXATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETXATTR, - .it_flags = HABEO_CORPUS, - .it_act = mdt_intent_getxattr - }, - [MDT_IT_LAYOUT] = { - .it_fmt = &RQF_LDLM_INTENT_LAYOUT, - .it_flags = 0, - .it_act = mdt_intent_layout - }, - [MDT_IT_GLIMPSE] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = 0, - .it_act = mdt_intent_glimpse, - }, - [MDT_IT_BRW] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = 0, - .it_act = mdt_intent_brw, - }, - -}; - int mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp, struct mdt_lock_handle *lh, @@ -3614,10 +3670,10 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, dlmreq->lock_handle[0].cookie); } -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *ldlm_rep = NULL; @@ -3646,7 +3702,10 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode, if (ldlm_rep == NULL || OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { mdt_object_unlock(info, info->mti_object, lhc, 1); - RETURN(err_serious(-EFAULT)); + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); } ldlm_rep->lock_policy_res2 = clear_serious(rc); @@ -3664,7 +3723,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode, RETURN(rc); } -static int mdt_intent_getattr(enum mdt_it_code opcode, +static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3687,18 +3746,19 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - switch (opcode) { - case MDT_IT_LOOKUP: + switch (it_opc) { + case IT_LOOKUP: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; - break; - case MDT_IT_GETATTR: + break; + case IT_GETATTR: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM; - break; - default: - CERROR("Unsupported intent (%d)\n", opcode); - GOTO(out_shrink, rc = -EINVAL); - } + break; + default: + CERROR("%s: unsupported intent %#x\n", + mdt_obd_name(info->mti_mdt), (unsigned int)it_opc); + GOTO(out_shrink, rc = -EINVAL); + } rc = mdt_init_ucred_intent_getattr(info, reqbody); if (rc) @@ -3733,7 +3793,7 @@ out_shrink: return rc; } -static int mdt_intent_layout(enum mdt_it_code opcode, +static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3747,12 +3807,6 @@ static int mdt_intent_layout(enum mdt_it_code opcode, int rc = 0; ENTRY; - if (opcode != MDT_IT_LAYOUT) { - CERROR("%s: Unknown intent (%d)\n", mdt_obd_name(info->mti_mdt), - opcode); - RETURN(-EINVAL); - } - fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); intent = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT); @@ -3812,6 +3866,8 @@ static int mdt_intent_layout(enum mdt_it_code opcode, if (layout_size > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = layout_size; } + CDEBUG(D_INFO, "%s: layout_size %d\n", + mdt_obd_name(info->mti_mdt), layout_size); } /* @@ -3883,13 +3939,13 @@ out_obj: out: lhc->mlh_reg_lh.cookie = 0; - return rc; + RETURN(rc); } -static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_open(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *rep = NULL; @@ -3907,22 +3963,20 @@ static int mdt_intent_reint(enum mdt_it_code opcode, if (opc < 0) RETURN(opc); - if (mdt_it_flavor[opcode].it_reint != opc) { - CERROR("Reint code %ld doesn't match intent: %d\n", - opc, opcode); - RETURN(err_serious(-EPROTO)); - } - /* Get lock from request for possible resent case. */ mdt_intent_fixup_resent(info, *lockp, lhc, flags); rc = mdt_reint_internal(info, lhc, opc); - /* Check whether the reply has been packed successfully. */ - if (mdt_info_req(info)->rq_repmsg != NULL) - rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); - if (rep == NULL) - RETURN(err_serious(-EFAULT)); + /* Check whether the reply has been packed successfully. */ + if (mdt_info_req(info)->rq_repmsg != NULL) + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (rep == NULL) { + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); + } /* MDC expects this in any case */ if (rc != 0) @@ -3962,75 +4016,60 @@ static int mdt_intent_reint(enum mdt_it_code opcode, RETURN(ELDLM_LOCK_ABORTED); } -static int mdt_intent_code(enum ldlm_intent_flags itcode) -{ +static int mdt_intent_opc(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + u64 flags /* LDLM_FL_* */) +{ + struct req_capsule *pill = info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + const struct req_format *it_format; + int (*it_handler)(enum ldlm_intent_flags, + struct mdt_thread_info *, + struct ldlm_lock **, + u64); + enum tgt_handler_flags it_handler_flags = 0; + struct ldlm_reply *rep; int rc; + ENTRY; - switch (itcode) { + switch (it_opc) { case IT_OPEN: - rc = MDT_IT_OPEN; - break; case IT_OPEN|IT_CREAT: - rc = MDT_IT_OCREAT; - break; - case IT_CREAT: - rc = MDT_IT_CREATE; - break; - case IT_READDIR: - rc = MDT_IT_READDIR; + /* + * OCREAT is not a IS_MUTABLE request since the file may + * already exist. We do the extra check of + * OBD_CONNECT_RDONLY in mdt_reint_open() when we + * really need to create the object. + */ + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_open; break; case IT_GETATTR: - rc = MDT_IT_GETATTR; - break; case IT_LOOKUP: - rc = MDT_IT_LOOKUP; - break; - case IT_UNLINK: - rc = MDT_IT_UNLINK; - break; - case IT_TRUNC: - rc = MDT_IT_TRUNC; + it_format = &RQF_LDLM_INTENT_GETATTR; + it_handler = &mdt_intent_getattr; + it_handler_flags = HAS_REPLY; break; case IT_GETXATTR: - rc = MDT_IT_GETXATTR; + it_format = &RQF_LDLM_INTENT_GETXATTR; + it_handler = &mdt_intent_getxattr; + it_handler_flags = HAS_BODY; break; case IT_LAYOUT: - rc = MDT_IT_LAYOUT; - break; - case IT_QUOTA_DQACQ: - case IT_QUOTA_CONN: - rc = MDT_IT_QUOTA; + it_format = &RQF_LDLM_INTENT_LAYOUT; + it_handler = &mdt_intent_layout; break; case IT_GLIMPSE: - rc = MDT_IT_GLIMPSE; + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_glimpse; break; case IT_BRW: - rc = MDT_IT_BRW; - break; - default: - CERROR("Unknown intent opcode: 0x%08x\n", itcode); - rc = -EINVAL; + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_brw; break; - } - return rc; -} - -static int mdt_intent_opc(enum ldlm_intent_flags itopc, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, __u64 flags) -{ - struct req_capsule *pill = info->mti_pill; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_it_flavor *flv; - int opc; - int rc; - ENTRY; - - opc = mdt_intent_code(itopc); - if (opc < 0) - RETURN(-EINVAL); - - if (opc == MDT_IT_QUOTA) { + case IT_QUOTA_DQACQ: + case IT_QUOTA_CONN: { struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev; if (qmt == NULL) @@ -4046,33 +4085,31 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc, flags); RETURN(rc); } + default: + CERROR("%s: unknown intent code %#x\n", + mdt_obd_name(info->mti_mdt), it_opc); + RETURN(-EPROTO); + } - flv = &mdt_it_flavor[opc]; - if (flv->it_fmt != NULL) - req_capsule_extend(pill, flv->it_fmt); + req_capsule_extend(pill, it_format); - rc = mdt_unpack_req_pack_rep(info, flv->it_flags); + rc = mdt_unpack_req_pack_rep(info, it_handler_flags); if (rc < 0) RETURN(rc); - if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export)) + if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export)) RETURN(-EROFS); - if (flv->it_act != NULL) { - struct ldlm_reply *rep; + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); + /* execute policy */ + rc = (*it_handler)(it_opc, info, lockp, flags); - /* execute policy */ - rc = flv->it_act(opc, info, lockp, flags); - - /* Check whether the reply has been packed successfully. */ - if (req->rq_repmsg != NULL) { - rep = req_capsule_server_get(info->mti_pill, - &RMF_DLM_REP); - rep->lock_policy_res2 = - ptlrpc_status_hton(rep->lock_policy_res2); - } + /* Check whether the reply has been packed successfully. */ + if (req->rq_repmsg != NULL) { + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + rep->lock_policy_res2 = + ptlrpc_status_hton(rep->lock_policy_res2); } RETURN(rc); @@ -4090,9 +4127,12 @@ static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req, LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE)); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - enum ldlm_mode mode, __u64 flags, void *data) +static int mdt_intent_policy(const struct lu_env *env, + struct ldlm_namespace *ns, + struct ldlm_lock **lockp, + void *req_cookie, + enum ldlm_mode mode, + __u64 flags, void *data) { struct tgt_session_info *tsi; struct mdt_thread_info *info; @@ -4106,7 +4146,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, LASSERT(req != NULL); - tsi = tgt_ses_info(req->rq_svc_thread->t_env); + tsi = tgt_ses_info(env); info = tsi2mdt_info(tsi); LASSERT(info != NULL); @@ -4835,44 +4875,47 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC, 0, MDS_DISCONNECT, tgt_disconnect, &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION), TGT_RPC_HANDLER(MDS_FIRST_OPC, - HABEO_REFERO, MDS_SET_INFO, mdt_set_info, + HAS_REPLY, MDS_SET_INFO, mdt_set_info, &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION), TGT_MDT_HDL(0, MDS_GET_INFO, mdt_get_info), -TGT_MDT_HDL(0 | HABEO_REFERO, MDS_GET_ROOT, mdt_get_root), -TGT_MDT_HDL(HABEO_CORPUS, MDS_GETATTR, mdt_getattr), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, +TGT_MDT_HDL(HAS_REPLY, MDS_GET_ROOT, mdt_get_root), +TGT_MDT_HDL(HAS_BODY, MDS_GETATTR, mdt_getattr), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_GETATTR_NAME, mdt_getattr_name), -TGT_MDT_HDL(HABEO_CORPUS, MDS_GETXATTR, mdt_tgt_getxattr), -TGT_MDT_HDL(0 | HABEO_REFERO, MDS_STATFS, mdt_statfs), -TGT_MDT_HDL(0 | MUTABOR, MDS_REINT, mdt_reint), -TGT_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_READPAGE, mdt_readpage), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_SYNC, mdt_sync), +TGT_MDT_HDL(HAS_BODY, MDS_GETXATTR, mdt_tgt_getxattr), +TGT_MDT_HDL(HAS_REPLY, MDS_STATFS, mdt_statfs), +TGT_MDT_HDL(IS_MUTABLE, MDS_REINT, mdt_reint), +TGT_MDT_HDL(HAS_BODY, MDS_CLOSE, mdt_close), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_READPAGE, mdt_readpage), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_SYNC, mdt_sync), TGT_MDT_HDL(0, MDS_QUOTACTL, mdt_quotactl), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_PROGRESS, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_PROGRESS, mdt_hsm_progress), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_REGISTER, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_REGISTER, mdt_hsm_ct_register), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_UNREGISTER, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_UNREGISTER, mdt_hsm_ct_unregister), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_STATE_GET, mdt_hsm_state_get), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_STATE_SET, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_STATE_SET, mdt_hsm_state_set), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_ACTION, mdt_hsm_action), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST, mdt_hsm_request), -TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR, +TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_SWAP_LAYOUTS, mdt_swap_layouts), }; static struct tgt_handler mdt_io_ops[] = { -TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read), -TGT_OST_HDL(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write), -TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO | MUTABOR, - OST_PUNCH, mdt_punch_hdl), -TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC, mdt_data_sync), +TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY, OST_BRW_READ, tgt_brw_read, + mdt_hp_brw), +TGT_OST_HDL_HP(HAS_BODY | IS_MUTABLE, OST_BRW_WRITE, tgt_brw_write, + mdt_hp_brw), +TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE, + OST_PUNCH, mdt_punch_hdl, + mdt_hp_punch), +TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, mdt_data_sync), }; static struct tgt_handler mdt_sec_ctx_ops[] = { @@ -4882,7 +4925,7 @@ TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, mdt_sec_ctx_handle) }; static struct tgt_handler mdt_quota_ops[] = { -TGT_QUOTA_HDL(HABEO_REFERO, QUOTA_DQACQ, mdt_quota_dqacq), +TGT_QUOTA_HDL(HAS_REPLY, QUOTA_DQACQ, mdt_quota_dqacq), }; static struct tgt_opc_slice mdt_common_slice[] = { @@ -4995,6 +5038,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) mdt_hsm_cdt_fini(m); + if (m->mdt_los != NULL) { + local_oid_storage_fini(env, m->mdt_los); + m->mdt_los = NULL; + } + if (m->mdt_namespace != NULL) { ldlm_namespace_free_post(m->mdt_namespace); d->ld_obd->obd_namespace = m->mdt_namespace = NULL; @@ -5038,6 +5086,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, struct seq_server_site *ss_site; const char *identity_upcall = "NONE"; struct md_device *next; + struct lu_fid fid; int rc; long node_id; mntopt_t mntopts; @@ -5062,7 +5111,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ + m->mdt_max_mdsize = MAX_MD_SIZE_OLD; m->mdt_opts.mo_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; @@ -5082,15 +5131,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, } /* DoM files get IO lock at open by default */ - m->mdt_opts.mo_dom_lock = 1; + m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; + /* DoM files are read at open and data is packed in the reply */ + m->mdt_opts.mo_dom_read_open = 1; m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); init_rwsem(&m->mdt_squash.rsi_sem); spin_lock_init(&m->mdt_lock); - m->mdt_enable_remote_dir = 0; + m->mdt_enable_remote_dir = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_enable_dir_migration = 1; m->mdt_enable_remote_dir_gid = 0; + m->mdt_enable_remote_rename = 1; atomic_set(&m->mdt_mds_mds_conns, 0); atomic_set(&m->mdt_async_commit_count, 0); @@ -5156,18 +5210,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* set obd_namespace for compatibility with old code */ obd->obd_namespace = m->mdt_namespace; - rc = mdt_hsm_cdt_init(m); - if (rc != 0) { - CERROR("%s: error initializing coordinator, rc %d\n", - mdt_obd_name(m), rc); - GOTO(err_free_ns, rc); - } - rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice, OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_MDS_ALL_REPLY_NET); if (rc) - GOTO(err_free_hsm, rc); + GOTO(err_free_ns, rc); /* Amount of available space excluded from granting and reserved * for metadata. It is in percentage and 50% is default value. */ @@ -5182,6 +5229,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_tgt, rc); + fid.f_seq = FID_SEQ_LOCAL_NAME; + fid.f_oid = 1; + fid.f_ver = 0; + rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los); + if (rc != 0) + GOTO(err_fs_cleanup, rc); + + rc = mdt_hsm_cdt_init(m); + if (rc != 0) { + CERROR("%s: error initializing coordinator, rc %d\n", + mdt_obd_name(m), rc); + GOTO(err_los_fini, rc); + } + tgt_adapt_sptlrpc_conf(&m->mdt_lut); next = m->mdt_child; @@ -5212,7 +5273,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (IS_ERR(m->mdt_identity_cache)) { rc = PTR_ERR(m->mdt_identity_cache); m->mdt_identity_cache = NULL; - GOTO(err_fs_cleanup, rc); + GOTO(err_free_hsm, rc); } rc = mdt_procfs_init(m, dev); @@ -5248,12 +5309,15 @@ err_recovery: target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; +err_free_hsm: + mdt_hsm_cdt_fini(m); +err_los_fini: + local_oid_storage_fini(env, m->mdt_los); + m->mdt_los = NULL; err_fs_cleanup: mdt_fs_cleanup(env, m); err_tgt: tgt_fini(env, &m->mdt_lut); -err_free_hsm: - mdt_hsm_cdt_fini(m); err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); obd->obd_namespace = m->mdt_namespace = NULL; @@ -5374,6 +5438,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, lu_object_add_top(h, o); o->lo_ops = &mdt_obj_ops; spin_lock_init(&mo->mot_write_lock); + mutex_init(&mo->mot_som_mutex); mutex_init(&mo->mot_lov_mutex); init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); @@ -5549,6 +5614,7 @@ static int mdt_connect_internal(const struct lu_env *env, struct mdt_device *mdt, struct obd_connect_data *data, bool reconnect) { + const char *obd_name = mdt_obd_name(mdt); LASSERT(data != NULL); data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; @@ -5577,8 +5643,7 @@ static int mdt_connect_internal(const struct lu_env *env, "ocd_version: %x ocd_grant: %d ocd_index: %u " "ocd_brw_size unexpectedly zero, network data " "corruption? Refusing to connect this client\n", - mdt_obd_name(mdt), - exp->exp_client_uuid.uuid, + obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, data->ocd_grant, data->ocd_index); return -EPROTO; @@ -5599,12 +5664,12 @@ static int mdt_connect_internal(const struct lu_env *env, data->ocd_grant_max_blks = ddp->ddp_max_extent_blks; } - if (OCD_HAS_FLAG(data, GRANT)) { - /* Save connect_data we have so far because tgt_grant_connect() - * uses it to calculate grant. */ - exp->exp_connect_data = *data; + /* Save connect_data we have so far because tgt_grant_connect() + * uses it to calculate grant, and we want to save the client + * version before it is overwritten by LUSTRE_VERSION_CODE. */ + exp->exp_connect_data = *data; + if (OCD_HAS_FLAG(data, GRANT)) tgt_grant_connect(env, exp, data, !reconnect); - } if (OCD_HAS_FLAG(data, MAXBYTES)) data->ocd_maxbytes = mdt->mdt_lut.lut_dt_conf.ddp_maxbytes; @@ -5624,7 +5689,7 @@ static int mdt_connect_internal(const struct lu_env *env, if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) { CWARN("%s: MDS requires FID support, but client not\n", - mdt_obd_name(mdt)); + obd_name); return -EBADE; } @@ -5658,7 +5723,8 @@ static int mdt_connect_internal(const struct lu_env *env, /* The client set in ocd_cksum_types the checksum types it * supports. We have to mask off the algorithms that we don't * support */ - data->ocd_cksum_types &= cksum_types_supported_server(); + data->ocd_cksum_types &= + obd_cksum_types_supported_server(obd_name); if (unlikely(data->ocd_cksum_types == 0)) { CERROR("%s: Connect with checksum support but no " @@ -5726,7 +5792,7 @@ static int mdt_export_cleanup(struct obd_export *exp) /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ - class_handle_unhash(&mfd->mfd_handle); + class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); @@ -5767,7 +5833,7 @@ static int mdt_export_cleanup(struct obd_export *exp) * archive request into a noop if it's not actually * dirty. */ - if (mfd->mfd_mode & FMODE_WRITE) + if (mfd->mfd_open_flags & MDS_FMODE_WRITE) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ @@ -6666,12 +6732,12 @@ struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info) * \param mdt mdt device * \param val 0 disables COS, other values enable COS */ -void mdt_enable_cos(struct mdt_device *mdt, int val) +void mdt_enable_cos(struct mdt_device *mdt, bool val) { struct lu_env env; int rc; - mdt->mdt_opts.mo_cos = !!val; + mdt->mdt_opts.mo_cos = val; rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) { CWARN("%s: lu_env initialization failed, cannot "