CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
"CPU partitions OSS IO threads should run on");
-/**
- * Validate the object identifier a client packed into \a oa and normalise
- * it in place.
- *
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
- *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- *       pack ost_id. Because non-zero oi_seq will make it difficult to tell
- *       whether this is oi_fid or real ostid. So it will check
- *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
- *    c. Old FID-disable osc will send IDIF.
- *    d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- *
- * \param[in]     exp    export the request arrived on; its connect flags
- *                       decide whether the legacy echo conversion applies
- * \param[in,out] oa     object attributes sent by the client; a NULL pointer
- *                       is rejected with -EPROTO
- * \param[in,out] ioobj  optional IO object; when non-NULL its max-brw value
- *                       is checked and its ioo_oid is overwritten with the
- *                       validated id from \a oa
- *
- * \retval 0        the id is acceptable (and may have been rewritten)
- * \retval -EPROTO  \a oa is NULL, the object id is 0, the sequence is of an
- *                  unsupported kind, or the ioobj max-brw value is not a
- *                  power of two
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
-			     struct obd_ioobj *ioobj)
-{
-	int rc = 0;
-
-	/* The NULL test has to lead the condition: evaluating oi_seq first
-	 * would dereference a NULL oa before the guard could reject it.  A
-	 * NULL oa now falls through to the -EPROTO path in the else branch. */
-	if (unlikely(oa != NULL &&
-		     !(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
-		     fid_seq_is_echo(oa->o_oi.oi.oi_seq))) {
-		/* Sigh 2.[123] client still sends echo req with oi_id = 0
-		 * during create, and we will reset this to 1, since this
-		 * oi_id is basically useless in the following create process,
-		 * but oi_id == 0 will make it difficult to tell whether it is
-		 * real FID or ost_id. */
-		oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
-		oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
-		oa->o_oi.oi_fid.f_ver = 0;
-	} else {
-		if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
-			GOTO(out, rc = -EPROTO);
-
-		/* Note: this check might be forced in 2.5 or 2.6, i.e.
-		 * all of the requests are required to setup FLGROUP */
-		if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
-			ostid_set_seq_mdt0(&oa->o_oi);
-			if (ioobj)
-				ostid_set_seq_mdt0(&ioobj->ioo_oid);
-			oa->o_valid |= OBD_MD_FLGROUP;
-		}
-
-		/* only IDIF, MDT0, normal and echo sequences are accepted */
-		if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-			       fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
-			       fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
-			       fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
-			GOTO(out, rc = -EPROTO);
-	}
-
-	if (ioobj != NULL) {
-		unsigned max_brw = ioobj_max_brw_get(ioobj);
-
-		/* x & (x - 1) is non-zero unless x is 0 or a power of two */
-		if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-			CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-			       ": rc = -EPROTO\n", exp->exp_obd->obd_name,
-			       obd_export_nid2str(exp), max_brw,
-			       POSTID(&oa->o_oi));
-			GOTO(out, rc = -EPROTO);
-		}
-		/* keep the ioobj id in sync with the (possibly rewritten) oa */
-		ioobj->ioo_oid = oa->o_oi;
-	}
-
-out:
-	if (rc != 0)
-		CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
-		       exp->exp_obd->obd_name, obd_export_nid2str(exp),
-		       oa ? ostid_seq(&oa->o_oi) : -1,
-		       oa ? ostid_id(&oa->o_oi) : -1, rc);
-	return rc;
-}
-
-struct ost_prolong_data { /* Input and result of one lock-prolong pass:
- * filled in by the hpreq check callbacks and consumed by
- * ost_prolong_locks() / ost_prolong_lock_one(). */
- struct ptlrpc_request *opd_req; /* request whose xid is logged on refresh */
- struct obd_export *opd_exp; /* export whose exp_bl_list is scanned */
- struct obdo *opd_oa; /* obdo; its o_handle feeds the fast path */
- struct ldlm_res_id opd_resid; /* resource name a lock must match */
- struct ldlm_extent opd_extent; /* extent covered by the request */
- ldlm_mode_t opd_mode; /* lock mode(s) recorded by the callers */
- unsigned int opd_locks; /* number of locks refreshed so far */
- int opd_timeout; /* value handed to ldlm_refresh_waiting_lock() */
-};
-
-/**
- * Timeout used when prolonging locks: the current service time estimate of
- * the corresponding portal (= OST_IO_PORTAL), but never less than
- * ldlm_timeout.  When adaptive timeouts are off, half of obd_timeout is
- * used instead.
- */
-static inline int prolong_timeout(struct ptlrpc_request *req)
-{
-	struct ptlrpc_service_part *part = req->rq_rqbd->rqbd_svcpt;
-
-	if (!AT_OFF)
-		return max(at_est2timeout(at_get(&part->scp_at_estimate)),
-			   ldlm_timeout);
-
-	return obd_timeout / 2;
-}
-
-/**
- * Refresh the eviction timer of \a lock and count it in \a opd, provided
- * the lock is being cancelled (blocking AST sent) and is not destroyed yet.
- * Any other lock is left untouched.
- */
-static void ost_prolong_lock_one(struct ost_prolong_data *opd,
-				 struct ldlm_lock *lock)
-{
-	LASSERT(lock->l_export == opd->opd_exp);
-
-	/* XXX: never try to grab resource lock here because we're inside
-	 * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
-	 * res lock and then exp_bl_list_lock. */
-
-	/* nothing to do for a lock that is already cancelled, nor for one
-	 * that is not being cancelled at all */
-	if ((lock->l_flags & LDLM_FL_DESTROYED) ||
-	    !(lock->l_flags & LDLM_FL_AST_SENT))
-		return;
-
-	LDLM_DEBUG(lock,
-		   "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
-		   opd->opd_req->rq_xid, opd->opd_extent.start,
-		   opd->opd_extent.end, opd->opd_timeout);
-
-	/* this may be a lock the user holds while doing I/O, so push its
-	 * eviction timer out */
-	ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
-	opd->opd_locks++;
-}
-
-/**
- * Prolong the locks that cover the IO described by \a data.
- *
- * When the obdo carries a lock handle, that single lock is tried first: if
- * it is granted in PW mode and contains the whole extent, it alone is
- * refreshed.  Otherwise every lock on the export's blocking list with the
- * same resource name and an overlapping extent is refreshed.
- */
-static void ost_prolong_locks(struct ost_prolong_data *data)
-{
-	struct obd_export *exp = data->opd_exp;
-	struct obdo *oa = data->opd_oa;
-	struct ldlm_lock *lock = NULL;
-	int covered;
-	ENTRY;
-
-	/* mostly a request should be covered by only one lock, so look the
-	 * handle up first */
-	if (oa->o_valid & OBD_MD_FLHANDLE)
-		lock = ldlm_handle2lock(&oa->o_handle);
-
-	if (lock != NULL) {
-		/* does this lock cover the whole IO region exclusively? */
-		covered = lock->l_granted_mode == LCK_PW &&
-			  ldlm_extent_contain(&lock->l_policy_data.l_extent,
-					      &data->opd_extent);
-		if (covered)
-			ost_prolong_lock_one(data, lock);
-
-		/* drop the reference taken by the handle lookup */
-		LDLM_LOCK_PUT(lock);
-
-		if (covered) {
-			RETURN_EXIT;
-		}
-	}
-
-	/* slow path: walk the locks on the export's blocking list */
-	spin_lock_bh(&exp->exp_bl_list_lock);
-	cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
-		LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-		LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-
-		if (ldlm_res_eq(&data->opd_resid,
-				&lock->l_resource->lr_name) &&
-		    ldlm_extent_overlap(&lock->l_policy_data.l_extent,
-					&data->opd_extent))
-			ost_prolong_lock_one(data, lock);
-	}
-	spin_unlock_bh(&exp->exp_bl_list_lock);
-
-	EXIT;
-}
-
-/**
- * Tell whether the OST_READ/OST_WRITE request \a req matches \a lock.
- *
- * The request matches when the lock's resource name equals the object id of
- * the request's ioobj, the granted mode is PW (or PR as well for reads), and
- * the lock extent overlaps the extent spanned by the first and the last
- * remote niobuf.
- *
- * \retval 0         no match
- * \retval non-zero  the request matches the lock
- */
-static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
-				   struct ldlm_lock *lock)
-{
-	struct niobuf_remote *first;
-	struct niobuf_remote *last;
-	struct obd_ioobj *ioo;
-	struct ldlm_extent ext;
-	int opc, mode;
-	ENTRY;
-
-	opc = lustre_msg_get_opc(req->rq_reqmsg);
-	LASSERT(opc == OST_READ || opc == OST_WRITE);
-
-	ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-	LASSERT(ioo != NULL);
-
-	first = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-	LASSERT(first != NULL);
-
-	/* the IO extent runs from the first buffer to the end of the last */
-	last = first + (ioo->ioo_bufcnt - 1);
-	ext.start = first->offset;
-	ext.end = last->offset + last->len - 1;
-
-	LASSERT(lock->l_resource != NULL);
-	if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
-		RETURN(0);
-
-	/* a read is satisfied by either PW or PR, a write by PW only */
-	mode = (opc == OST_READ) ? (LCK_PW | LCK_PR) : LCK_PW;
-	if ((lock->l_granted_mode & mode) == 0)
-		RETURN(0);
-
-	RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
-}
-
-/**
- * High-priority queue request check for whether the given PTLRPC request
- * (\a req) is blocking an LDLM lock cancel.
- *
- * Builds the prolong descriptor from the request body, ioobj and remote
- * niobufs, and refreshes every lock it matches.
- *
- * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
- * function looks only at OST_READs and OST_WRITEs.
- *
- * \retval 1  at least one lock was refreshed
- * \retval 0  no lock was refreshed
- */
-static int ost_rw_hpreq_check(struct ptlrpc_request *req)
-{
-	struct obd_device *obd = req->rq_export->exp_obd;
-	struct ost_prolong_data opd = { 0 };
-	struct niobuf_remote *first;
-	struct niobuf_remote *last;
-	struct obd_ioobj *ioo;
-	struct ost_body *body;
-	int opc;
-	ENTRY;
-
-	/*
-	 * Sanity checks are LASSERTs because malformed RPCs should have
-	 * been filtered out in ost_hpreq_handler().
-	 */
-	opc = lustre_msg_get_opc(req->rq_reqmsg);
-	LASSERT(opc == OST_READ || opc == OST_WRITE);
-
-	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-	LASSERT(body != NULL);
-
-	ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-	LASSERT(ioo != NULL);
-
-	first = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-	LASSERT(first != NULL);
-	LASSERT(!(first->flags & OBD_BRW_SRVLOCK));
-
-	last = first + (ioo->ioo_bufcnt - 1);
-
-	opd.opd_req = req;
-	opd.opd_exp = req->rq_export;
-	opd.opd_oa = &body->oa;
-	/* reads may be covered by PR as well as PW */
-	opd.opd_mode = (opc == OST_READ) ? (LCK_PW | LCK_PR) : LCK_PW;
-	opd.opd_extent.start = first->offset;
-	opd.opd_extent.end = last->offset + last->len - 1;
-	opd.opd_timeout = prolong_timeout(req);
-	ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
-
-	DEBUG_REQ(D_RPCTRACE, req,
-	       "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-	       obd->obd_name, current->comm,
-	       opd.opd_resid.name[0], opd.opd_resid.name[1],
-	       opd.opd_extent.start, opd.opd_extent.end);
-
-	ost_prolong_locks(&opd);
-
-	CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-	       obd->obd_name, opd.opd_locks, req);
-
-	RETURN(opd.opd_locks > 0);
-}
-
-/**
- * Finalisation hook for read/write h-p requests: runs the lock refresh of
- * ost_rw_hpreq_check() once more and ignores its result.
- */
-static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
-{
-	/* only the refresh side effect is wanted here */
-	ost_rw_hpreq_check(req);
-}
-
-/**
- * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs: the request
- * matches only when its obdo carries a lock handle whose cookie equals the
- * cookie of \a lock.
- *
- * \retval 1  the handle in the request names \a lock
- * \retval 0  no handle in the request, or a different lock
- */
-static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
-				      struct ldlm_lock *lock)
-{
-	struct ost_body *body;
-	ENTRY;
-
-	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-	LASSERT(body != NULL);
-
-	/* without a handle there is nothing to compare against */
-	if (!(body->oa.o_valid & OBD_MD_FLHANDLE))
-		RETURN(0);
-
-	if (body->oa.o_handle.cookie != lock->l_handle.h_cookie)
-		RETURN(0);
-
-	RETURN(1);
-}
-
-/**
- * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
- *
- * The extent starts at o_size and ends at o_size + o_blocks, or at
- * OBD_OBJECT_EOF when o_blocks is OBD_OBJECT_EOF.
- *
- * \retval 1  at least one lock was refreshed
- * \retval 0  no lock was refreshed
- */
-static int ost_punch_hpreq_check(struct ptlrpc_request *req)
-{
-	struct obd_device *obd = req->rq_export->exp_obd;
-	struct ost_prolong_data opd = { 0 };
-	struct ost_body *body;
-	struct obdo *oa;
-	ENTRY;
-
-	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-	LASSERT(body != NULL);
-
-	oa = &body->oa;
-	LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
-		!(oa->o_flags & OBD_FL_SRVLOCK));
-
-	opd.opd_req = req;
-	opd.opd_exp = req->rq_export;
-	opd.opd_oa = oa;
-	opd.opd_mode = LCK_PW;
-	opd.opd_timeout = prolong_timeout(req);
-
-	opd.opd_extent.start = oa->o_size;
-	if (oa->o_blocks == OBD_OBJECT_EOF)
-		opd.opd_extent.end = OBD_OBJECT_EOF;
-	else
-		opd.opd_extent.end = oa->o_size + oa->o_blocks;
-
-	ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
-
-	CDEBUG(D_DLMTRACE,
-	       "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-	       obd->obd_name,
-	       opd.opd_resid.name[0], opd.opd_resid.name[1],
-	       opd.opd_extent.start, opd.opd_extent.end);
-
-	ost_prolong_locks(&opd);
-
-	CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-	       obd->obd_name, opd.opd_locks, req);
-
-	RETURN(opd.opd_locks > 0);
-}
-
-/**
- * Finalisation hook for punch h-p requests: runs the lock refresh of
- * ost_punch_hpreq_check() once more and ignores its result.
- */
-static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
-{
-	/* only the refresh side effect is wanted here */
-	ost_punch_hpreq_check(req);
-}
-
-/* High-priority request callbacks installed for OST_READ / OST_WRITE. */
-struct ptlrpc_hpreq_ops ost_hpreq_rw = {
-	.hpreq_check      = ost_rw_hpreq_check,
-	.hpreq_lock_match = ost_rw_hpreq_lock_match,
-	.hpreq_fini       = ost_rw_hpreq_fini,
-};
-
-/* High-priority request callbacks installed for OST_PUNCH. */
-struct ptlrpc_hpreq_ops ost_hpreq_punch = {
-	.hpreq_check      = ost_punch_hpreq_check,
-	.hpreq_lock_match = ost_punch_hpreq_lock_match,
-	.hpreq_fini       = ost_punch_hpreq_fini,
-};
-
-/**
- * Assign high priority operations to the request if needed.
- *
- * For OST_READ/OST_WRITE the body, the single ioobj and the remote niobufs
- * are checked, and ost_hpreq_rw is installed unless the first niobuf has
- * OBD_BRW_SRVLOCK set.  For OST_PUNCH, ost_hpreq_punch is installed unless
- * the obdo flags carry OBD_FL_SRVLOCK.  Requests without an export and
- * other opcodes are left alone.
- *
- * \retval 0        request accepted (with or without h-p ops)
- * \retval -EFAULT  a required buffer is missing/short or a count is invalid
- * \retval other    error returned by ost_validate_obdo()
- */
-static int ost_io_hpreq_handler(struct ptlrpc_request *req)
-{
-	struct niobuf_remote *nb;
-	struct obd_ioobj *ioo;
-	struct ost_body *body;
-	int objcount, niocount;
-	int opc;
-	int rc;
-	int i;
-	ENTRY;
-
-	if (req->rq_export == NULL)
-		RETURN(0);
-
-	opc = lustre_msg_get_opc(req->rq_reqmsg);
-	switch (opc) {
-	case OST_READ:
-	case OST_WRITE:
-		/* RPCs on the H-P queue can be inspected before
-		 * ost_handler() initializes their pills, so we
-		 * initialize that here.  Capsule initialization is
-		 * idempotent, as is setting the pill's format (provided
-		 * it doesn't change).
-		 */
-		req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-		req_capsule_set(&req->rq_pill,
-				opc == OST_READ ? &RQF_OST_BRW_READ :
-						  &RQF_OST_BRW_WRITE);
-
-		body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-		if (body == NULL) {
-			CERROR("Missing/short ost_body\n");
-			RETURN(-EFAULT);
-		}
-
-		/* exactly one ioobj is accepted per request */
-		objcount = req_capsule_get_size(&req->rq_pill,
-						&RMF_OBD_IOOBJ,
-						RCL_CLIENT) /
-			   sizeof(*ioo);
-		if (objcount == 0) {
-			CERROR("Missing/short ioobj\n");
-			RETURN(-EFAULT);
-		}
-		if (objcount > 1) {
-			CERROR("too many ioobjs (%d)\n", objcount);
-			RETURN(-EFAULT);
-		}
-
-		ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-		if (ioo == NULL) {
-			CERROR("Missing/short ioobj\n");
-			RETURN(-EFAULT);
-		}
-
-		rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
-		if (rc) {
-			CERROR("invalid object ids\n");
-			RETURN(rc);
-		}
-
-		/* add up the buffers of all ioobjs; none may be empty */
-		niocount = 0;
-		for (i = 0; i < objcount; i++) {
-			if (ioo[i].ioo_bufcnt == 0) {
-				CERROR("ioo[%d] has zero bufcnt\n", i);
-				RETURN(-EFAULT);
-			}
-			niocount += ioo[i].ioo_bufcnt;
-		}
-		if (niocount > PTLRPC_MAX_BRW_PAGES) {
-			DEBUG_REQ(D_RPCTRACE, req,
-				  "bulk has too many pages (%d)",
-				  niocount);
-			RETURN(-EFAULT);
-		}
-
-		nb = req_capsule_client_get(&req->rq_pill,
-					    &RMF_NIOBUF_REMOTE);
-		if (nb == NULL) {
-			CERROR("Missing/short niobuf\n");
-			RETURN(-EFAULT);
-		}
-
-		/* no h-p ops when the first niobuf has OBD_BRW_SRVLOCK set */
-		if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
-			req->rq_ops = &ost_hpreq_rw;
-		break;
-
-	case OST_PUNCH:
-		req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-		req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
-
-		body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-		if (body == NULL) {
-			CERROR("Missing/short ost_body\n");
-			RETURN(-EFAULT);
-		}
-
-		/* no h-p ops when the obdo flags carry OBD_FL_SRVLOCK */
-		if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
-		    !(body->oa.o_flags & OBD_FL_SRVLOCK))
-			req->rq_ops = &ost_hpreq_punch;
-		break;
-
-	default:
-		break;
-	}
-
-	RETURN(0);
-}
-
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
static struct cfs_cpt_table *ost_io_cptable;
LPROC_SEQ_FOPS_RO_TYPE(ost, uuid);
static struct lprocfs_seq_vars lprocfs_ost_obd_vars[] = {
- { "uuid", &ost_uuid_fops },
+ { .name = "uuid",
+ .fops = &ost_uuid_fops },
{ 0 }
};
#endif /* LPROCFS */
GOTO(out_service, rc);
}
- mask = cfs_cpt_table->ctb_nodemask;
+ mask = cfs_cpt_nodemask(cfs_cpt_table, CFS_CPT_ANY);
/* even if the CPT feature is disabled at the libcfs level by setting the
 * partition number to 1, we still want to set node affinity for io service */
if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
.so_thr_init = tgt_io_thread_init,
.so_thr_done = tgt_io_thread_done,
.so_req_handler = tgt_request_handle,
- .so_hpreq_handler = ost_io_hpreq_handler,
+ .so_hpreq_handler = tgt_hpreq_handler,
.so_req_printer = target_print_req,
},
};
GOTO(out_seq, rc);
}
+ /* Index read service */
+ memset(&svc_conf, 0, sizeof(svc_conf));
+ svc_conf = (typeof(svc_conf)) {
+ .psc_name = "ost_idx_read",
+ .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
+ .psc_buf = {
+ .bc_nbufs = OST_NBUFS,
+ .bc_buf_size = OST_BUFSIZE,
+ .bc_req_max_size = OST_MAXREQSIZE,
+ .bc_rep_max_size = OST_MAXREPSIZE,
+ .bc_req_portal = OST_IDX_PORTAL,
+ .bc_rep_portal = OSC_REPLY_PORTAL,
+ },
+ .psc_thr = {
+ .tc_thr_name = "ll_ost_idx",
+ .tc_thr_factor = OSS_CR_THR_FACTOR,
+ .tc_nthrs_init = OSS_CR_NTHRS_INIT,
+ .tc_nthrs_base = OSS_CR_NTHRS_BASE,
+ .tc_nthrs_max = OSS_CR_NTHRS_MAX,
+ .tc_nthrs_user = oss_num_create_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_DT_THREAD,
+ },
+ .psc_cpt = {
+ .cc_pattern = oss_cpts,
+ },
+ .psc_ops = {
+ .so_req_handler = tgt_request_handle,
+ .so_req_printer = target_print_req,
+ },
+ };
+ ost->ost_idx_service = ptlrpc_register_service(&svc_conf,
+ obd->obd_proc_entry);
+ if (IS_ERR(ost->ost_idx_service)) {
+ rc = PTR_ERR(ost->ost_idx_service);
+ CERROR("failed to start OST index read service: rc = %d\n", rc);
+ ost->ost_idx_service = NULL;
+ GOTO(out_out, rc);
+ }
+
ping_evictor_start();
RETURN(0);
+
+out_out:
+ ptlrpc_unregister_service(ost->ost_out_service);
+ ost->ost_out_service = NULL;
out_seq:
ptlrpc_unregister_service(ost->ost_seq_service);
ost->ost_seq_service = NULL;
ptlrpc_unregister_service(ost->ost_io_service);
ptlrpc_unregister_service(ost->ost_seq_service);
ptlrpc_unregister_service(ost->ost_out_service);
+ ptlrpc_unregister_service(ost->ost_idx_service);
ost->ost_service = NULL;
ost->ost_create_service = NULL;
ost->ost_io_service = NULL;
ost->ost_seq_service = NULL;
ost->ost_out_service = NULL;
+ ost->ost_idx_service = NULL;
mutex_unlock(&ost->ost_health_mutex);