CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
"number of OSS create threads to start");
+/**
+ * Do not return server-side uid/gid to remote client.
+ *
+ * For exports connected as remote clients, overwrite the uid/gid in
+ * \a oa with -1 ("unknown") and clear OBD_MD_FLUID/OBD_MD_FLGID from
+ * o_valid so server-side ownership is never exposed on the wire.
+ */
+static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
+{
+ if (exp_connect_rmtclient(exp)) {
+ oa->o_uid = -1;
+ oa->o_gid = -1;
+ oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
+ }
+}
+
void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
{
struct oti_req_ack_lock *ack_lock;
{
struct ost_body *body, *repbody;
__u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ struct lustre_capa *capa = NULL;
int rc;
ENTRY;
ldlm_request_cancel(req, dlm, 0);
}
+ if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
+ capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 2);
+
rc = lustre_pack_reply(req, 2, size, NULL);
if (rc)
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
sizeof(*repbody));
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
+ req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
RETURN(0);
}
oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
REQ_REC_OFF + 1);
req->rq_status = obd_getattr(exp, &oinfo);
+ ost_drop_id(exp, &repbody->oa);
RETURN(0);
}
/* check that we do support OBD_CONNECT_TRUNCLOCK. */
CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL)
- RETURN(-EFAULT);
+ /* ost_body is verified and swabbed in ost_hpreq_handler() */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
oinfo.oi_oa = &body->oa;
oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
}
repbody->oa = *oinfo.oi_oa;
+ ost_drop_id(exp, &repbody->oa);
RETURN(rc);
}
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
repbody->oa.o_blocks, capa);
+ ost_drop_id(exp, &repbody->oa);
RETURN(0);
}
oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
REQ_REC_OFF + 1);
req->rq_status = obd_setattr(exp, &oinfo, oti);
+ ost_drop_id(exp, &repbody->oa);
RETURN(0);
}
struct ost_prolong_data {
struct obd_export *opd_exp;
ldlm_policy_data_t opd_policy;
+ /* obdo in which the iterator returns a matched lock handle
+ * (sets OBD_MD_FLHANDLE); may be NULL */
+ struct obdo *opd_oa;
ldlm_mode_t opd_mode;
+ /* set to 1 by ost_prolong_locks_iter() when a waiting lock
+ * was refreshed */
+ int opd_lock_match;
+ /* timeout (seconds) used to re-arm the lock waiting timer */
+ int opd_timeout;
};
static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
return LDLM_ITER_CONTINUE;
}
+ /* Fill the obdo with the matched lock handle.
+ * XXX: it is possible in some cases the IO RPC is covered by several
+ * locks, even for the write case, so it may need to be a lock list. */
+ if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
+ opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
+ opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
+ }
+
if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
/* ignore locks not being cancelled */
return LDLM_ITER_CONTINUE;
/* OK. this is a possible lock the user holds doing I/O
* let's refresh eviction timer for it */
- ldlm_refresh_waiting_lock(lock);
+ ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
+ opd->opd_lock_match = 1;
return LDLM_ITER_CONTINUE;
}
-static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
- struct niobuf_remote *nb, struct obdo *oa,
- ldlm_mode_t mode)
+static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, struct obdo *oa,
+ ldlm_mode_t mode)
{
struct ldlm_res_id res_id;
int nrbufs = obj->ioo_bufcnt;
- struct ost_prolong_data opd;
+ struct ost_prolong_data opd = { 0 };
ENTRY;
osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
opd.opd_mode = mode;
- opd.opd_exp = exp;
+ opd.opd_exp = req->rq_export;
opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
+ /* prolong locks for the current service time of the corresponding
+ * portal (= OST_IO_PORTAL) */
+ opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+ max(at_est2timeout(at_get(&req->rq_rqbd->
+ rqbd_service->srv_at_estimate)), ldlm_timeout);
+
CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
opd.opd_policy.l_extent.end);
lock = ldlm_handle2lock(&oa->o_handle);
if (lock != NULL) {
ost_prolong_locks_iter(lock, &opd);
+ if (opd.opd_lock_match) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(1);
+ }
+
+ /* Check if the lock covers the whole IO region,
+ * otherwise iterate through the resource. */
+ if (lock->l_policy_data.l_extent.end >=
+ opd.opd_policy.l_extent.end &&
+ lock->l_policy_data.l_extent.start <=
+ opd.opd_policy.l_extent.start) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(0);
+ }
LDLM_LOCK_PUT(lock);
- EXIT;
- return;
}
}
- ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+ opd.opd_oa = oa;
+ ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
ost_prolong_locks_iter, &opd);
-
- EXIT;
+ RETURN(opd.opd_lock_match);
}
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
struct l_wait_info lwi;
struct lustre_handle lockh = { 0 };
__u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
- int objcount, niocount, npages, nob = 0, rc, i;
+ int niocount, npages, nob = 0, rc, i;
int no_reply = 0;
ENTRY;
if (exp->exp_failed)
GOTO(out, rc = -ENOTCONN);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- GOTO(out, rc = -EFAULT);
- }
-
- objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- GOTO(out, rc = -EFAULT);
- }
+ /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+ * ost_hpreq_handler(). */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
- ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
- lustre_swab_obd_ioobj);
- if (ioo == NULL) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
+ LASSERT(ioo != NULL);
niocount = ioo->ioo_bufcnt;
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
- niocount);
- GOTO(out, rc = -EFAULT);
- }
-
- remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb),
- lustre_swab_niobuf_remote);
- if (remote_nb == NULL) {
- CERROR("Missing/short niobuf\n");
- GOTO(out, rc = -EFAULT);
- }
- if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote (&remote_nb[i]);
- }
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*remote_nb));
+ LASSERT(remote_nb != NULL);
if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
if (desc == NULL) /* XXX: check all cleanup stuff */
GOTO(out, rc = -ENOMEM);
- ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
+ ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
nob = 0;
for (i = 0; i < npages; i++) {
if (exp->exp_failed)
rc = -ENOTCONN;
else {
- sptlrpc_svc_wrap_bulk(req, desc);
-
- rc = ptlrpc_start_bulk_transfer(desc);
+ rc = sptlrpc_svc_wrap_bulk(req, desc);
+ if (rc == 0)
+ rc = ptlrpc_start_bulk_transfer(desc);
}
if (rc == 0) {
ost_bulk_timeout,
desc);
rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_bulk_active(desc) ||
+ !ptlrpc_server_bulk_active(desc) ||
exp->exp_failed, &lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
/* Wait again if we changed deadline */
repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
sizeof(*repbody));
memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+ ost_drop_id(exp, &repbody->oa);
}
out_lock:
__u32 *rcs;
__u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int objcount, niocount, npages;
- int rc, swab, i, j;
+ int rc, i, j;
obd_count client_cksum = 0, server_cksum = 0;
cksum_type_t cksum_type = OBD_CKSUM_CRC32;
int no_reply = 0;
+ __u32 o_uid = 0, o_gid = 0;
ENTRY;
req->rq_bulk_write = 1;
if (exp->exp_failed)
GOTO(out, rc = -ENOTCONN);
- swab = lustre_msg_swabbed(req->rq_reqmsg);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- GOTO(out, rc = -EFAULT);
- }
+ /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+ * ost_hpreq_handler(). */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
- lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- GOTO(out, rc = -EFAULT);
- }
-
+ sizeof(*ioo);
ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
objcount * sizeof(*ioo));
- LASSERT (ioo != NULL);
- for (niocount = i = 0; i < objcount; i++) {
- if (swab)
- lustre_swab_obd_ioobj(&ioo[i]);
- if (ioo[i].ioo_bufcnt == 0) {
- CERROR("ioo[%d] has zero bufcnt\n", i);
- GOTO(out, rc = -EFAULT);
- }
+ LASSERT(ioo != NULL);
+ for (niocount = i = 0; i < objcount; i++)
niocount += ioo[i].ioo_bufcnt;
- }
-
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
- niocount);
- GOTO(out, rc = -EFAULT);
- }
- remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb),
- lustre_swab_niobuf_remote);
- if (remote_nb == NULL) {
- CERROR("Missing/short niobuf\n");
- GOTO(out, rc = -EFAULT);
- }
- if (swab) { /* swab the remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote (&remote_nb[i]);
- }
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*remote_nb));
+ LASSERT(remote_nb != NULL);
if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
GOTO(out_lock, rc = -ETIMEDOUT);
}
- ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW);
+ ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa, LCK_PW);
/* obd_preprw clobbers oa->valid, so save what we need */
if (body->oa.o_valid & OBD_MD_FLCKSUM) {
body->oa.o_valid &= ~OBD_MD_FLGRANT;
}
+ if (exp_connect_rmtclient(exp)) {
+ o_uid = body->oa.o_uid;
+ o_gid = body->oa.o_gid;
+ }
npages = OST_THREAD_POOL_SIZE;
rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
ioo, remote_nb, &npages, local_nb, oti, capa);
local_nb[i].offset & ~CFS_PAGE_MASK,
local_nb[i].len);
+ rc = sptlrpc_svc_prep_bulk(req, desc);
+ if (rc != 0)
+ GOTO(out_lock, rc);
+
/* Check if client was evicted while we were doing i/o before touching
network */
if (desc->bd_export->exp_failed)
rc = -ENOTCONN;
else
- rc = ptlrpc_start_bulk_transfer (desc);
+ rc = ptlrpc_start_bulk_transfer(desc);
if (rc == 0) {
time_t start = cfs_time_current_sec();
do {
lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
ost_bulk_timeout, desc);
rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_bulk_active(desc) ||
+ !ptlrpc_server_bulk_active(desc) ||
desc->bd_export->exp_failed, &lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
/* Wait again if we changed deadline */
DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
rc = -ENOTCONN;
ptlrpc_abort_bulk(desc);
- } else if (!desc->bd_success ||
- desc->bd_nob_transferred != desc->bd_nob) {
- DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
- desc->bd_success ?
- "truncated" : "network error on",
- desc->bd_nob_transferred, desc->bd_nob);
+ } else if (!desc->bd_success) {
+ DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
/* XXX should this be a different errno? */
rc = -ETIMEDOUT;
+ } else {
+ rc = sptlrpc_svc_unwrap_bulk(req, desc);
}
} else {
DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
}
no_reply = rc != 0;
- if (rc == 0)
- sptlrpc_svc_unwrap_bulk(req, desc);
-
repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
sizeof(*repbody));
memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* Must commit after prep above in all cases */
rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
remote_nb, npages, local_nb, oti, rc);
+ if (exp_connect_rmtclient(exp)) {
+ repbody->oa.o_uid = o_uid;
+ repbody->oa.o_gid = o_gid;
+ }
if (unlikely(client_cksum != server_cksum && rc == 0)) {
int new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
RETURN(rc);
}
+#ifdef HAVE_QUOTA_SUPPORT
+/**
+ * Handle OST_QUOTACTL: unpack the quotactl request from the capsule,
+ * pass it to obd_quotactl() (result goes to req->rq_status) and echo
+ * the control structure back in the reply.
+ */
static int ost_handle_quotactl(struct ptlrpc_request *req)
{
struct obd_quotactl *oqctl, *repoqc;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
int rc;
ENTRY;
- oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
- lustre_swab_obd_quotactl);
+ oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
if (oqctl == NULL)
GOTO(out, rc = -EPROTO);
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
GOTO(out, rc);
- repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
-
+ repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
req->rq_status = obd_quotactl(req->rq_export, oqctl);
*repoqc = *oqctl;
+
out:
RETURN(rc);
}
RETURN(-EPROTO);
rc = req_capsule_server_pack(&req->rq_pill);
- if (rc) {
- CERROR("ost: out of memory while packing quotacheck reply\n");
+ if (rc)
RETURN(-ENOMEM);
- }
req->rq_status = obd_quotacheck(req->rq_export, oqctl);
RETURN(0);
}
+/**
+ * Handle OST_QUOTA_ADJUST_QUNIT: unpack the qunit adjustment request,
+ * apply it via obd_quota_adjust_qunit() against this obd's quota
+ * context (result goes to req->rq_status) and echo the structure back
+ * in the reply.
+ */
+static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
+{
+ struct quota_adjust_qunit *oqaq, *repoqa;
+ struct lustre_quota_ctxt *qctxt;
+ int rc;
+ ENTRY;
+
+ qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
+ oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
+ if (oqaq == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ rc = req_capsule_server_pack(&req->rq_pill);
+ if (rc)
+ GOTO(out, rc);
+
+ repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
+ req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
+ *repoqa = *oqaq;
+
+ out:
+ RETURN(rc);
+}
+#endif
+
static int ost_llog_handle_connect(struct obd_export *exp,
struct ptlrpc_request *req)
{
RETURN(rc);
}
-static int filter_export_check_flavor(struct filter_obd *filter,
- struct obd_export *exp,
- struct ptlrpc_request *req)
+/* Strip the remote-client and OSS-capability feature bits from the
+ * connect reply and record the resulting flags on the export (taken
+ * under exp_lock). Used whenever the connection runs without these
+ * security features. */
+#define ost_init_sec_none(reply, exp) \
+do { \
+ reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT | \
+ OBD_CONNECT_RMT_CLIENT_FORCE | \
+ OBD_CONNECT_OSS_CAPA); \
+ spin_lock(&exp->exp_lock); \
+ exp->exp_connect_flags = reply->ocd_connect_flags; \
+ spin_unlock(&exp->exp_lock); \
+} while (0)
+
+/**
+ * Negotiate the security-related connect flags for a client connection.
+ *
+ * Based on whether the request was authenticated with GSS, whether the
+ * client is (or is forced to be) a remote client, and the target's
+ * configured security level (filter->fo_sec_level), either reject the
+ * connection or mask the reply connect flags (see ost_init_sec_none())
+ * and record them on the export.
+ *
+ * Returns 0 on success, -EFAULT if connect data is missing, -EACCES if
+ * the client cannot run at the configured security level, -EINVAL for
+ * an unknown level.
+ */
+static int ost_init_sec_level(struct ptlrpc_request *req)
{
- int rc = 0;
-
- /* FIXME
- * this should be done in filter_connect()/filter_reconnect(), but
- * we can't obtain information like NID, which stored in incoming
- * request, thus can't decide what flavor to use. so we do it here.
- *
- * This hack should be removed after the OST stack be rewritten, just
- * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
- */
- if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_INVALID)
- return 0;
-
- CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
- spin_lock(&exp->exp_lock);
- exp->exp_sp_peer = req->rq_sp_from;
-
- read_lock(&filter->fo_sptlrpc_lock);
- sptlrpc_rule_set_choose(&filter->fo_sptlrpc_rset, exp->exp_sp_peer,
- req->rq_peer.nid, &exp->exp_flvr);
- read_unlock(&filter->fo_sptlrpc_lock);
-
- if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
- CERROR("invalid rpc flavor %x, expect %x, from %s\n",
- req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
- libcfs_nid2str(req->rq_peer.nid));
- exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
- rc = -EACCES;
+ struct obd_export *exp = req->rq_export;
+ struct req_capsule *pill = &req->rq_pill;
+ struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *filter = &obd->u.filter;
+ char *client = libcfs_nid2str(req->rq_peer.nid);
+ struct obd_connect_data *data, *reply;
+ int rc = 0, remote;
+ ENTRY;
+
+ data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
+ reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
+ if (data == NULL || reply == NULL)
+ RETURN(-EFAULT);
+
+ /* connection from MDT is always trusted */
+ if (req->rq_auth_usr_mdt) {
+ ost_init_sec_none(reply, exp);
+ RETURN(0);
+ }
+
+ /* no GSS support case */
+ if (!req->rq_auth_gss) {
+ if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
+ CWARN("client %s -> target %s does not use GSS, "
+ "can not run under security level %d.\n",
+ client, obd->obd_name, filter->fo_sec_level);
+ RETURN(-EACCES);
+ } else {
+ ost_init_sec_none(reply, exp);
+ RETURN(0);
+ }
+ }
+
+ /* old version case */
+ if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
+ !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
+ if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
+ CWARN("client %s -> target %s uses old version, "
+ "can not run under security level %d.\n",
+ client, obd->obd_name, filter->fo_sec_level);
+ RETURN(-EACCES);
+ } else {
+ CWARN("client %s -> target %s uses old version, "
+ "run under security level %d.\n",
+ client, obd->obd_name, filter->fo_sec_level);
+ ost_init_sec_none(reply, exp);
+ RETURN(0);
+ }
+ }
+
+ /* decide whether the client is remote: either it asked to be
+ * (RMT_CLIENT_FORCE) or GSS authenticated it as a remote realm */
+ remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
+ if (remote) {
+ if (!req->rq_auth_remote)
+ CDEBUG(D_SEC, "client (local realm) %s -> target %s "
+ "asked to be remote.\n", client, obd->obd_name);
+ } else if (req->rq_auth_remote) {
+ remote = 1;
+ CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
+ "as remote by default.\n", client, obd->obd_name);
+ }
+
+ /* remote clients require OSS capabilities to be enabled */
+ if (remote) {
+ if (!filter->fo_fl_oss_capa) {
+ CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
+ " but OSS capabilities are not enabled: %d.\n",
+ client, obd->obd_name, filter->fo_fl_oss_capa);
+ RETURN(-EACCES);
+ }
+ }
+
+ switch (filter->fo_sec_level) {
+ case LUSTRE_SEC_NONE:
+ if (!remote) {
+ ost_init_sec_none(reply, exp);
+ break;
+ } else {
+ CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
+ "can not run under security level %d.\n",
+ client, obd->obd_name, filter->fo_sec_level);
+ RETURN(-EACCES);
+ }
+ case LUSTRE_SEC_REMOTE:
+ if (!remote)
+ ost_init_sec_none(reply, exp);
+ break;
+ case LUSTRE_SEC_ALL:
+ if (!remote) {
+ reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
+ OBD_CONNECT_RMT_CLIENT_FORCE);
+ if (!filter->fo_fl_oss_capa)
+ reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
+
+ spin_lock(&exp->exp_lock);
+ exp->exp_connect_flags = reply->ocd_connect_flags;
+ spin_unlock(&exp->exp_lock);
+ }
+ break;
+ default:
+ RETURN(-EINVAL);
}
- spin_unlock(&exp->exp_lock);
+ RETURN(rc);
+}
+
+/*
+ * FIXME
+ * This should be done in filter_connect()/filter_reconnect(), but we
+ * can't obtain information like the NID, which is stored in the
+ * incoming request, and thus can't decide what flavor to use.  So we
+ * do it here.
+ *
+ * This hack should be removed after the OST stack is rewritten, just
+ * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
+ */
+static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
+{
+ struct obd_export *exp = req->rq_export;
+ struct filter_obd *filter = &exp->exp_obd->u.filter;
+ struct sptlrpc_flavor flvr;
+ int rc = 0;
+
+ /* first connect of this export: choose the sptlrpc flavor from
+ * the target's rule set and record it under exp_lock */
+ if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
+ read_lock(&filter->fo_sptlrpc_lock);
+ sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
+ req->rq_sp_from,
+ req->rq_peer.nid,
+ &flvr);
+ read_unlock(&filter->fo_sptlrpc_lock);
+
+ spin_lock(&exp->exp_lock);
+
+ exp->exp_sp_peer = req->rq_sp_from;
+ exp->exp_flvr = flvr;
+
+ if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
+ exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
+ CERROR("unauthorized rpc flavor %x from %s, "
+ "expect %x\n", req->rq_flvr.sf_rpc,
+ libcfs_nid2str(req->rq_peer.nid),
+ exp->exp_flvr.sf_rpc);
+ rc = -EACCES;
+ }
+
+ spin_unlock(&exp->exp_lock);
+ } else {
+ /* reconnect: the RPC source part must not change; then
+ * re-validate the export's flavor against this request */
+ if (exp->exp_sp_peer != req->rq_sp_from) {
+ CERROR("RPC source %s doesn't match %s\n",
+ sptlrpc_part2name(req->rq_sp_from),
+ sptlrpc_part2name(exp->exp_sp_peer));
+ rc = -EACCES;
+ } else {
+ rc = sptlrpc_target_export_check(exp, req);
+ }
+ }
return rc;
}
case OST_SYNC:
case OST_SET_INFO:
case OST_GET_INFO:
+#ifdef HAVE_QUOTA_SUPPORT
case OST_QUOTACHECK:
case OST_QUOTACTL:
+ case OST_QUOTA_ADJUST_QUNIT:
+#endif
rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
if (rc)
CERROR("bad opc %u version %08x, expecting %08x\n",
return rc;
}
+/**
+ * Decide whether \a lock covers the IO region of this OST_READ/WRITE
+ * request: same resource, compatible granted mode, and overlapping
+ * extent. Returns 1 on a match, 0 otherwise.
+ */
+static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int objcount, niocount;
+ int mode, opc, i;
+ __u64 start, end;
+ ENTRY;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+ /* As the request may be covered by several locks, do not look at
+ * o_handle, look at the RPC IO region. */
+ /* NOTE(review): `body` is assigned but not used below; presumably
+ * the call is kept for its in-place swab side effect, and it uses
+ * lustre_swab_obdo on an ost_body-sized buffer — confirm this is
+ * intentional. */
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+ lustre_swab_obdo);
+ objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+ objcount * sizeof(*ioo));
+ LASSERT(ioo != NULL);
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+
+ nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*nb));
+ LASSERT(nb != NULL);
+
+ /* writes need PW; reads are satisfied by PR as well */
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+
+ start = nb[0].offset & CFS_PAGE_MASK;
+ end = (nb[ioo->ioo_bufcnt - 1].offset +
+ nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
+
+ LASSERT(lock->l_resource != NULL);
+ if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_gr,
+ &lock->l_resource->lr_name))
+ RETURN(0);
+
+ if (!(lock->l_granted_mode & mode))
+ RETURN(0);
+
+ if (lock->l_policy_data.l_extent.end < start ||
+ lock->l_policy_data.l_extent.start > end)
+ RETURN(0);
+
+ RETURN(1);
+}
+
+/**
+ * Swab buffers needed to call ost_rw_prolong_locks() and call it.
+ * Return the value from ost_rw_prolong_locks() which is non-zero if
+ * there is a cancelled lock which is waiting for this IO request.
+ */
+static int ost_rw_hpreq_check(struct ptlrpc_request *req)
+{
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int objcount, niocount;
+ int mode, opc, i;
+ ENTRY;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+ /* buffers were verified/swabbed earlier; plain lookups suffice */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
+
+ objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+ objcount * sizeof(*ioo));
+ LASSERT(ioo != NULL);
+
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+ nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*nb));
+ LASSERT(nb != NULL);
+ /* server-locked (SRVLOCK) requests are never marked HP, see
+ * ost_hpreq_handler() */
+ LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
+
+ /* writes need PW; reads are satisfied by PR as well */
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+ RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
+}
+
+/**
+ * Refresh the waiting timer of PW locks covering the region truncated
+ * by an OST_PUNCH request (o_size..o_size+o_blocks, EOF-capped).
+ * Returns non-zero if at least one waiting lock was refreshed.
+ */
+static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
+{
+ struct ldlm_res_id res_id = { .name = { oa->o_id } };
+ struct ost_prolong_data opd = { 0 };
+ __u64 start, end;
+ ENTRY;
+
+ start = oa->o_size;
+ end = start + oa->o_blocks;
+
+ opd.opd_mode = LCK_PW;
+ opd.opd_exp = req->rq_export;
+ opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
+ /* punch-to-EOF (or an overflowing end) covers everything */
+ if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
+ opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
+ else
+ opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
+
+ /* prolong locks for the current service time of the corresponding
+ * portal (= OST_IO_PORTAL) */
+ opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+ max(at_est2timeout(at_get(&req->rq_rqbd->
+ rqbd_service->srv_at_estimate)), ldlm_timeout);
+
+ CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+ res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
+ opd.opd_policy.l_extent.end);
+
+ opd.opd_oa = oa;
+ ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
+ ost_prolong_locks_iter, &opd);
+ RETURN(opd.opd_lock_match);
+}
+
+/**
+ * Return 1 if the punch request carries the handle of \a lock
+ * (OBD_MD_FLHANDLE set and cookies equal), i.e. the truncate is being
+ * done under that lock; 0 otherwise.
+ */
+static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct ost_body *body;
+ ENTRY;
+
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+ lustre_swab_obdo);
+ LASSERT(body != NULL);
+
+ if (body->oa.o_valid & OBD_MD_FLHANDLE &&
+ body->oa.o_handle.cookie == lock->l_handle.h_cookie)
+ RETURN(1);
+ RETURN(0);
+}
+
+/**
+ * High-priority check for OST_PUNCH: prolong the locks covering the
+ * truncated region. Only punches without OBD_FL_TRUNCLOCK are marked
+ * HP (see ost_hpreq_handler()), hence the LASSERT.
+ */
+/* NOTE(review): uses RETURN() without a preceding ENTRY — confirm
+ * this matches the ENTRY/RETURN pairing convention of this file. */
+static int ost_punch_hpreq_check(struct ptlrpc_request *req)
+{
+ struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
+ REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
+ LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+ !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
+
+ RETURN(ost_punch_prolong_locks(req, &body->oa));
+}
+
+/* High-priority request operations for bulk read/write RPCs */
+struct ptlrpc_hpreq_ops ost_hpreq_rw = {
+ .hpreq_lock_match = ost_rw_hpreq_lock_match,
+ .hpreq_check = ost_rw_hpreq_check,
+};
+
+/* High-priority request operations for truncate (punch) RPCs */
+struct ptlrpc_hpreq_ops ost_hpreq_punch = {
+ .hpreq_lock_match = ost_punch_hpreq_lock_match,
+ .hpreq_check = ost_punch_hpreq_check,
+};
+
+/**
+ * Assign high priority operations to the request if needed.
+ *
+ * Also verifies and swabs the ost_body, ioobj and niobuf_remote
+ * request buffers for OST_READ/OST_WRITE/OST_PUNCH once here, so the
+ * actual handlers can fetch them with plain lustre_msg_buf().
+ */
+static int ost_hpreq_handler(struct ptlrpc_request *req)
+{
+ ENTRY;
+ if (req->rq_export) {
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+ struct ost_body *body;
+
+ if (opc == OST_READ || opc == OST_WRITE) {
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ int objcount, niocount;
+ int swab, i;
+
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF,
+ sizeof(*body),
+ lustre_swab_obdo);
+ if (!body) {
+ CERROR("Missing/short ost_body\n");
+ RETURN(-EFAULT);
+ }
+ objcount = lustre_msg_buflen(req->rq_reqmsg,
+ REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ if (objcount == 0) {
+ CERROR("Missing/short ioobj\n");
+ RETURN(-EFAULT);
+ }
+ if (objcount > 1) {
+ CERROR("too many ioobjs (%d)\n", objcount);
+ RETURN(-EFAULT);
+ }
+
+ /* lustre_swab_reqbuf() only swabs the first ioobj;
+ * remember whether the rest still need swabbing */
+ swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
+ lustre_msg_swabbed(req->rq_reqmsg);
+ ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
+ objcount * sizeof(*ioo),
+ lustre_swab_obd_ioobj);
+ if (!ioo) {
+ CERROR("Missing/short ioobj\n");
+ RETURN(-EFAULT);
+ }
+ for (niocount = i = 0; i < objcount; i++) {
+ if (i > 0 && swab)
+ lustre_swab_obd_ioobj(&ioo[i]);
+ if (ioo[i].ioo_bufcnt == 0) {
+ CERROR("ioo[%d] has zero bufcnt\n", i);
+ RETURN(-EFAULT);
+ }
+ niocount += ioo[i].ioo_bufcnt;
+ }
+ if (niocount > PTLRPC_MAX_BRW_PAGES) {
+ DEBUG_REQ(D_ERROR, req, "bulk has too many "
+ "pages (%d)", niocount);
+ RETURN(-EFAULT);
+ }
+
+ swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
+ lustre_msg_swabbed(req->rq_reqmsg);
+ nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
+ niocount * sizeof(*nb),
+ lustre_swab_niobuf_remote);
+ if (!nb) {
+ CERROR("Missing/short niobuf\n");
+ RETURN(-EFAULT);
+ }
+
+ if (swab) {
+ /* swab remaining niobufs */
+ for (i = 1; i < niocount; i++)
+ lustre_swab_niobuf_remote(&nb[i]);
+ }
+
+ /* only client-locked IO (no SRVLOCK) is eligible
+ * for high-priority handling */
+ if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+ req->rq_ops = &ost_hpreq_rw;
+ } else if (opc == OST_PUNCH) {
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF,
+ sizeof(*body),
+ lustre_swab_obdo);
+ if (!body) {
+ CERROR("Missing/short ost_body\n");
+ RETURN(-EFAULT);
+ }
+
+ /* truncates done under a client lock (TRUNCLOCK)
+ * are not high priority */
+ if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+ !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
+ req->rq_ops = &ost_hpreq_punch;
+ }
+ }
+ RETURN(0);
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
int ost_handle(struct ptlrpc_request *req)
{
if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
RETURN(0);
if (!rc) {
- struct obd_export *exp = req->rq_export;
-
- obd = exp->exp_obd;
-
- rc = filter_export_check_flavor(&obd->u.filter,
- exp, req);
+ rc = ost_init_sec_level(req);
+ if (!rc)
+ rc = ost_connect_check_sptlrpc(req);
}
break;
}
DEBUG_REQ(D_INODE, req, "get_info");
rc = ost_get_info(req->rq_export, req);
break;
+#ifdef HAVE_QUOTA_SUPPORT
case OST_QUOTACHECK:
CDEBUG(D_INODE, "quotacheck\n");
req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
RETURN(0);
rc = ost_handle_quotactl(req);
break;
+ case OST_QUOTA_ADJUST_QUNIT:
+ CDEBUG(D_INODE, "quota_adjust_qunit\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
+ rc = ost_handle_quota_adjust_qunit(req);
+ break;
+#endif
case OBD_PING:
DEBUG_REQ(D_INODE, req, "ping");
req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
/* Insure a 4x range for dynamic threads */
if (oss_min_threads > OSS_THREADS_MAX / 4)
oss_min_threads = OSS_THREADS_MAX / 4;
- oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4);
+ oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
}
ost->ost_service =
ost_handle, LUSTRE_OSS_NAME,
obd->obd_proc_entry, target_print_req,
oss_min_threads, oss_max_threads,
- "ll_ost", LCT_DT_THREAD);
+ "ll_ost", LCT_DT_THREAD, NULL);
if (ost->ost_service == NULL) {
CERROR("failed to start service\n");
GOTO(out_lprocfs, rc = -ENOMEM);
ost_handle, "ost_create",
obd->obd_proc_entry, target_print_req,
oss_min_create_threads, oss_max_create_threads,
- "ll_ost_creat", LCT_DT_THREAD);
+ "ll_ost_creat", LCT_DT_THREAD, NULL);
if (ost->ost_create_service == NULL) {
CERROR("failed to start OST create service\n");
GOTO(out_service, rc = -ENOMEM);
ost_handle, "ost_io",
obd->obd_proc_entry, target_print_req,
oss_min_threads, oss_max_threads,
- "ll_ost_io", LCT_DT_THREAD);
+ "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
if (ost->ost_io_service == NULL) {
CERROR("failed to start OST I/O service\n");
GOTO(out_create, rc = -ENOMEM);