-
-static int ost_llog_handle_connect(struct obd_export *exp,
- struct ptlrpc_request *req)
-{
- struct llogd_conn_body *body;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
- rc = obd_llog_connect(exp, body);
- RETURN(rc);
-}
-
-#define ost_init_sec_none(reply, exp) \
-do { \
- reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT | \
- OBD_CONNECT_RMT_CLIENT_FORCE | \
- OBD_CONNECT_OSS_CAPA); \
- cfs_spin_lock(&exp->exp_lock); \
- exp->exp_connect_flags = reply->ocd_connect_flags; \
- cfs_spin_unlock(&exp->exp_lock); \
-} while (0)
-
-static int ost_init_sec_level(struct ptlrpc_request *req)
-{
- struct obd_export *exp = req->rq_export;
- struct req_capsule *pill = &req->rq_pill;
- struct obd_device *obd = exp->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- char *client = libcfs_nid2str(req->rq_peer.nid);
- struct obd_connect_data *data, *reply;
- int rc = 0, remote;
- ENTRY;
-
- data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
- reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
- if (data == NULL || reply == NULL)
- RETURN(-EFAULT);
-
- /* connection from MDT is always trusted */
- if (req->rq_auth_usr_mdt) {
- ost_init_sec_none(reply, exp);
- RETURN(0);
- }
-
- /* no GSS support case */
- if (!req->rq_auth_gss) {
- if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
- CWARN("client %s -> target %s does not user GSS, "
- "can not run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- RETURN(-EACCES);
- } else {
- ost_init_sec_none(reply, exp);
- RETURN(0);
- }
- }
-
- /* old version case */
- if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
- !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
- if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
- CWARN("client %s -> target %s uses old version, "
- "can not run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- RETURN(-EACCES);
- } else {
- CWARN("client %s -> target %s uses old version, "
- "run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- ost_init_sec_none(reply, exp);
- RETURN(0);
- }
- }
-
- remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
- if (remote) {
- if (!req->rq_auth_remote)
- CDEBUG(D_SEC, "client (local realm) %s -> target %s "
- "asked to be remote.\n", client, obd->obd_name);
- } else if (req->rq_auth_remote) {
- remote = 1;
- CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
- "as remote by default.\n", client, obd->obd_name);
- }
-
- if (remote) {
- if (!filter->fo_fl_oss_capa) {
- CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
- " but OSS capabilities are not enabled: %d.\n",
- client, obd->obd_name, filter->fo_fl_oss_capa);
- RETURN(-EACCES);
- }
- }
-
- switch (filter->fo_sec_level) {
- case LUSTRE_SEC_NONE:
- if (!remote) {
- ost_init_sec_none(reply, exp);
- break;
- } else {
- CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
- "can not run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- RETURN(-EACCES);
- }
- case LUSTRE_SEC_REMOTE:
- if (!remote)
- ost_init_sec_none(reply, exp);
- break;
- case LUSTRE_SEC_ALL:
- if (!remote) {
- reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
- OBD_CONNECT_RMT_CLIENT_FORCE);
- if (!filter->fo_fl_oss_capa)
- reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
-
- cfs_spin_lock(&exp->exp_lock);
- exp->exp_connect_flags = reply->ocd_connect_flags;
- cfs_spin_unlock(&exp->exp_lock);
- }
- break;
- default:
- RETURN(-EINVAL);
- }
-
- RETURN(rc);
-}
-
-/*
- * FIXME
- * this should be done in filter_connect()/filter_reconnect(), but
- * we can't obtain information like NID, which stored in incoming
- * request, thus can't decide what flavor to use. so we do it here.
- *
- * This hack should be removed after the OST stack be rewritten, just
- * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
- */
-static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
-{
- struct obd_export *exp = req->rq_export;
- struct filter_obd *filter = &exp->exp_obd->u.filter;
- struct sptlrpc_flavor flvr;
- int rc = 0;
-
- if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
- LUSTRE_ECHO_NAME) == 0)) {
- exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
- return 0;
- }
-
- if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
- cfs_read_lock(&filter->fo_sptlrpc_lock);
- sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
- req->rq_sp_from,
- req->rq_peer.nid,
- &flvr);
- cfs_read_unlock(&filter->fo_sptlrpc_lock);
-
- cfs_spin_lock(&exp->exp_lock);
-
- exp->exp_sp_peer = req->rq_sp_from;
- exp->exp_flvr = flvr;
-
- if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
- exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
- CERROR("unauthorized rpc flavor %x from %s, "
- "expect %x\n", req->rq_flvr.sf_rpc,
- libcfs_nid2str(req->rq_peer.nid),
- exp->exp_flvr.sf_rpc);
- rc = -EACCES;
- }
-
- cfs_spin_unlock(&exp->exp_lock);
- } else {
- if (exp->exp_sp_peer != req->rq_sp_from) {
- CERROR("RPC source %s doesn't match %s\n",
- sptlrpc_part2name(req->rq_sp_from),
- sptlrpc_part2name(exp->exp_sp_peer));
- rc = -EACCES;
- } else {
- rc = sptlrpc_target_export_check(exp, req);
- }
- }
-
- return rc;
-}
-
-/* Ensure that data and metadata are synced to the disk when lock is cancelled
- * (if requested) */
-int ost_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- __u32 sync_lock_cancel = 0;
- __u32 len = sizeof(sync_lock_cancel);
- int rc = 0;
- ENTRY;
-
- rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
- KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
-
- if (!rc && flag == LDLM_CB_CANCELING &&
- (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
- (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
- (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
- lock->l_flags & LDLM_FL_CBPENDING))) {
- struct obdo *oa;
- int rc;
-
- OBDO_ALLOC(oa);
- oa->o_id = lock->l_resource->lr_name.name[0];
- oa->o_seq = lock->l_resource->lr_name.name[1];
- oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
-
- rc = obd_sync(lock->l_export, oa, NULL,
- lock->l_policy_data.l_extent.start,
- lock->l_policy_data.l_extent.end, NULL);
- if (rc)
- CERROR("Error %d syncing data on lock cancel\n", rc);
-
- OBDO_FREE(oa);
- }
-
- return ldlm_server_blocking_ast(lock, desc, data, flag);
-}
-
-static int ost_filter_recovery_request(struct ptlrpc_request *req,
- struct obd_device *obd, int *process)
-{
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
- case OST_CONNECT: /* This will never get here, but for completeness. */
- case OST_DISCONNECT:
- *process = 1;
- RETURN(0);
-
- case OBD_PING:
- case OST_CREATE:
- case OST_DESTROY:
- case OST_PUNCH:
- case OST_SETATTR:
- case OST_SYNC:
- case OST_WRITE:
- case OBD_LOG_CANCEL:
- case LDLM_ENQUEUE:
- *process = target_queue_recovery_request(req, obd);
- RETURN(0);
-
- default:
- DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
- *process = -EAGAIN;
- RETURN(0);
- }
-}
-
-int ost_msg_check_version(struct lustre_msg *msg)
-{
- int rc;
-
- switch(lustre_msg_get_opc(msg)) {
- case OST_CONNECT:
- case OST_DISCONNECT:
- case OBD_PING:
- case SEC_CTX_INIT:
- case SEC_CTX_INIT_CONT:
- case SEC_CTX_FINI:
- rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_OBD_VERSION);
- break;
- case OST_CREATE:
- case OST_DESTROY:
- case OST_GETATTR:
- case OST_SETATTR:
- case OST_WRITE:
- case OST_READ:
- case OST_PUNCH:
- case OST_STATFS:
- case OST_SYNC:
- case OST_SET_INFO:
- case OST_GET_INFO:
-#ifdef HAVE_QUOTA_SUPPORT
- case OST_QUOTACHECK:
- case OST_QUOTACTL:
- case OST_QUOTA_ADJUST_QUNIT:
-#endif
- rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_OST_VERSION);
- break;
- case LDLM_ENQUEUE:
- case LDLM_CONVERT:
- case LDLM_CANCEL:
- case LDLM_BL_CALLBACK:
- case LDLM_CP_CALLBACK:
- rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_DLM_VERSION);
- break;
- case LLOG_ORIGIN_CONNECT:
- case OBD_LOG_CANCEL:
- rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_LOG_VERSION);
- break;
- default:
- CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
- rc = -ENOTSUPP;
- }
- return rc;
-}
-
-/**
- * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
- * not.
- */
-static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
- struct ldlm_lock *lock)
-{
- struct niobuf_remote *nb;
- struct obd_ioobj *ioo;
- struct ost_body *body;
- int objcount, niocount;
- int mode, opc, i, rc;
- __u64 start, end;
- ENTRY;
-
- opc = lustre_msg_get_opc(req->rq_reqmsg);
- LASSERT(opc == OST_READ || opc == OST_WRITE);
-
- /* As the request may be covered by several locks, do not look at
- * o_handle, look at the RPC IO region. */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(0);
-
- objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
- RCL_CLIENT) / sizeof(*ioo);
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- RETURN(0);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
- if (rc)
- RETURN(rc);
-
- for (niocount = i = 0; i < objcount; i++)
- niocount += ioo[i].ioo_bufcnt;
-
- nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- if (nb == NULL ||
- niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
- RCL_CLIENT) / sizeof(*nb)))
- RETURN(0);
-
- mode = LCK_PW;
- if (opc == OST_READ)
- mode |= LCK_PR;
-
- start = nb[0].offset & CFS_PAGE_MASK;
- end = (nb[ioo->ioo_bufcnt - 1].offset +
- nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
-
- LASSERT(lock->l_resource != NULL);
- if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
- &lock->l_resource->lr_name))
- RETURN(0);
-
- if (!(lock->l_granted_mode & mode))
- RETURN(0);
-
- if (lock->l_policy_data.l_extent.end < start ||
- lock->l_policy_data.l_extent.start > end)
- RETURN(0);
-
- RETURN(1);
-}
-
-/**
- * High-priority queue request check for whether the given PTLRPC request (\a
- * req) is blocking an LDLM lock cancel.
- *
- * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
- * cancel, 0 if it is not, and -EFAULT if the request is malformed.
- *
- * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
- * function looks only at OST_READs and OST_WRITEs.
- */
-static int ost_rw_hpreq_check(struct ptlrpc_request *req)
-{
- struct niobuf_remote *nb;
- struct obd_ioobj *ioo;
- struct ost_body *body;
- int objcount, niocount;
- int mode, opc, i, rc;
- ENTRY;
-
- opc = lustre_msg_get_opc(req->rq_reqmsg);
- LASSERT(opc == OST_READ || opc == OST_WRITE);
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
- RCL_CLIENT) / sizeof(*ioo);
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
- if (rc)
- RETURN(rc);
-
- for (niocount = i = 0; i < objcount; i++)
- niocount += ioo[i].ioo_bufcnt;
- nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- if (nb == NULL ||
- niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
- RCL_CLIENT) / sizeof(*nb)))
- RETURN(-EFAULT);
- if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
- RETURN(-EFAULT);
-
- mode = LCK_PW;
- if (opc == OST_READ)
- mode |= LCK_PR;
- RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
-}
-
-static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
-{
- struct ldlm_res_id res_id = { .name = { oa->o_id } };
- struct ost_prolong_data opd = { 0 };
- __u64 start, end;
- ENTRY;
-
- start = oa->o_size;
- end = start + oa->o_blocks;
-
- opd.opd_mode = LCK_PW;
- opd.opd_exp = req->rq_export;
- opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
- if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
- opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
- else
- opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
-
- /* prolong locks for the current service time of the corresponding
- * portal (= OST_IO_PORTAL) */
- opd.opd_timeout = AT_OFF ? obd_timeout / 2:
- max(at_est2timeout(at_get(&req->rq_rqbd->
- rqbd_service->srv_at_estimate)), ldlm_timeout);
-
- CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
- res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
- opd.opd_policy.l_extent.end);
-
- opd.opd_oa = oa;
- ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
- ost_prolong_locks_iter, &opd);
- RETURN(opd.opd_lock_match);
-}
-
-/**
- * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
- struct ldlm_lock *lock)
-{
- struct ost_body *body;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(0); /* can't return -EFAULT here */
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLHANDLE &&
- body->oa.o_handle.cookie == lock->l_handle.h_cookie)
- RETURN(1);
- RETURN(0);
-}
-
-/**
- * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_check(struct ptlrpc_request *req)
-{
- struct ost_body *body;
- int rc;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
- !(body->oa.o_flags & OBD_FL_SRVLOCK));
-
- RETURN(ost_punch_prolong_locks(req, &body->oa));
-}
-
-struct ptlrpc_hpreq_ops ost_hpreq_rw = {
- .hpreq_lock_match = ost_rw_hpreq_lock_match,
- .hpreq_check = ost_rw_hpreq_check,
-};
-
-struct ptlrpc_hpreq_ops ost_hpreq_punch = {
- .hpreq_lock_match = ost_punch_hpreq_lock_match,
- .hpreq_check = ost_punch_hpreq_check,
-};
-
-/** Assign high priority operations to the request if needed. */
-static int ost_hpreq_handler(struct ptlrpc_request *req)
-{
- ENTRY;
- if (req->rq_export) {
- int opc = lustre_msg_get_opc(req->rq_reqmsg);
- struct ost_body *body;
-
- if (opc == OST_READ || opc == OST_WRITE) {
- struct niobuf_remote *nb;
- struct obd_ioobj *ioo;
- int objcount, niocount;
- int i;
-
- /* RPCs on the H-P queue can be inspected before
- * ost_handler() initializes their pills, so we
- * initialize that here. Capsule initialization is
- * idempotent, as is setting the pill's format (provided
- * it doesn't change).
- */
- req_capsule_init(&req->rq_pill, req, RCL_SERVER);
- if (opc == OST_READ)
- req_capsule_set(&req->rq_pill,
- &RQF_OST_BRW_READ);
- else
- req_capsule_set(&req->rq_pill,
- &RQF_OST_BRW_WRITE);
-
- body = req_capsule_client_get(&req->rq_pill,
- &RMF_OST_BODY);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- RETURN(-EFAULT);
- }
-
- objcount = req_capsule_get_size(&req->rq_pill,
- &RMF_OBD_IOOBJ,
- RCL_CLIENT) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- RETURN(-EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- RETURN(-EFAULT);
- }
-
- ioo = req_capsule_client_get(&req->rq_pill,
- &RMF_OBD_IOOBJ);
- if (ioo == NULL) {
- CERROR("Missing/short ioobj\n");
- RETURN(-EFAULT);
- }
-
- for (niocount = i = 0; i < objcount; i++) {
- if (ioo[i].ioo_bufcnt == 0) {
- CERROR("ioo[%d] has zero bufcnt\n", i);
- RETURN(-EFAULT);
- }
- niocount += ioo[i].ioo_bufcnt;
- }
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_RPCTRACE, req,
- "bulk has too many pages (%d)",
- niocount);
- RETURN(-EFAULT);
- }
-
- nb = req_capsule_client_get(&req->rq_pill,
- &RMF_NIOBUF_REMOTE);
- if (nb == NULL) {
- CERROR("Missing/short niobuf\n");
- RETURN(-EFAULT);
- }
-
- if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
- req->rq_ops = &ost_hpreq_rw;
- } else if (opc == OST_PUNCH) {
- req_capsule_init(&req->rq_pill, req, RCL_SERVER);
- req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
-
- body = req_capsule_client_get(&req->rq_pill,
- &RMF_OST_BODY);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- RETURN(-EFAULT);
- }
-
- if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
- !(body->oa.o_flags & OBD_FL_SRVLOCK))
- req->rq_ops = &ost_hpreq_punch;
- }
- }
- RETURN(0);
-}
-
-/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
-int ost_handle(struct ptlrpc_request *req)
-{
- struct obd_trans_info trans_info = { 0, };
- struct obd_trans_info *oti = &trans_info;
- int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
- struct obd_device *obd = NULL;
- ENTRY;
-
- LASSERT(current->journal_info == NULL);
-
- /* primordial rpcs don't affect server recovery */
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
- case SEC_CTX_INIT:
- case SEC_CTX_INIT_CONT:
- case SEC_CTX_FINI:
- GOTO(out, rc = 0);
- }
-
- req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-
- if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
- if (!class_connected_export(req->rq_export)) {
- CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer));
- req->rq_status = -ENOTCONN;
- GOTO(out, rc = -ENOTCONN);
- }
-
- obd = req->rq_export->exp_obd;
-
- /* Check for aborted recovery. */
- if (obd->obd_recovering) {
- rc = ost_filter_recovery_request(req, obd,
- &should_process);
- if (rc || !should_process)
- RETURN(rc);
- else if (should_process < 0) {
- req->rq_status = should_process;
- rc = ptlrpc_error(req);
- RETURN(rc);
- }
- }
- }
-
- oti_init(oti, req);
-
- rc = ost_msg_check_version(req->rq_reqmsg);
- if (rc)
- RETURN(rc);
-
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
- case OST_CONNECT: {
- CDEBUG(D_INODE, "connect\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
- RETURN(0);
- rc = target_handle_connect(req);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
- RETURN(0);
- if (!rc) {
- rc = ost_init_sec_level(req);
- if (!rc)
- rc = ost_connect_check_sptlrpc(req);
- }
- break;
- }
- case OST_DISCONNECT:
- CDEBUG(D_INODE, "disconnect\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
- RETURN(0);
- rc = target_handle_disconnect(req);
- break;
- case OST_CREATE:
- CDEBUG(D_INODE, "create\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_create(req->rq_export, req, oti);
- break;
- case OST_DESTROY:
- CDEBUG(D_INODE, "destroy\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_destroy(req->rq_export, req, oti);
- break;
- case OST_GETATTR:
- CDEBUG(D_INODE, "getattr\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
- RETURN(0);
- rc = ost_getattr(req->rq_export, req);
- break;
- case OST_SETATTR:
- CDEBUG(D_INODE, "setattr\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
- RETURN(0);
- rc = ost_setattr(req->rq_export, req, oti);
- break;
- case OST_WRITE:
- req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
- CDEBUG(D_INODE, "write\n");
- /* req->rq_request_portal would be nice, if it was set */
- if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
- CERROR("%s: deny write request from %s to portal %u\n",
- req->rq_export->exp_obd->obd_name,
- obd_export_nid2str(req->rq_export),
- req->rq_rqbd->rqbd_service->srv_req_portal);
- GOTO(out, rc = -EPROTO);
- }
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
- GOTO(out, rc = -ENOSPC);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_brw_write(req, oti);
- LASSERT(current->journal_info == NULL);
- /* ost_brw_write sends its own replies */
- RETURN(rc);
- case OST_READ:
- req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
- CDEBUG(D_INODE, "read\n");
- /* req->rq_request_portal would be nice, if it was set */
- if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
- CERROR("%s: deny read request from %s to portal %u\n",
- req->rq_export->exp_obd->obd_name,
- obd_export_nid2str(req->rq_export),
- req->rq_rqbd->rqbd_service->srv_req_portal);
- GOTO(out, rc = -EPROTO);
- }
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
- RETURN(0);
- rc = ost_brw_read(req, oti);
- LASSERT(current->journal_info == NULL);
- /* ost_brw_read sends its own replies */
- RETURN(rc);
- case OST_PUNCH:
- CDEBUG(D_INODE, "punch\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_punch(req->rq_export, req, oti);
- break;
- case OST_STATFS:
- CDEBUG(D_INODE, "statfs\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
- RETURN(0);
- rc = ost_statfs(req);
- break;
- case OST_SYNC:
- CDEBUG(D_INODE, "sync\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
- RETURN(0);
- rc = ost_sync(req->rq_export, req);
- break;
- case OST_SET_INFO:
- DEBUG_REQ(D_INODE, req, "set_info");
- req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
- rc = ost_set_info(req->rq_export, req);
- break;
- case OST_GET_INFO:
- DEBUG_REQ(D_INODE, req, "get_info");
- req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
- rc = ost_get_info(req->rq_export, req);
- break;
-#ifdef HAVE_QUOTA_SUPPORT
- case OST_QUOTACHECK:
- CDEBUG(D_INODE, "quotacheck\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
- RETURN(0);
- rc = ost_handle_quotacheck(req);
- break;
- case OST_QUOTACTL:
- CDEBUG(D_INODE, "quotactl\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
- RETURN(0);
- rc = ost_handle_quotactl(req);
- break;
- case OST_QUOTA_ADJUST_QUNIT:
- CDEBUG(D_INODE, "quota_adjust_qunit\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
- rc = ost_handle_quota_adjust_qunit(req);
- break;
-#endif
- case OBD_PING:
- DEBUG_REQ(D_INODE, req, "ping");
- req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
- rc = target_handle_ping(req);
- break;
- /* FIXME - just reply status */
- case LLOG_ORIGIN_CONNECT:
- DEBUG_REQ(D_INODE, req, "log connect");
- req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
- rc = ost_llog_handle_connect(req->rq_export, req);
- req->rq_status = rc;
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
- RETURN(ptlrpc_reply(req));
- case OBD_LOG_CANCEL:
- CDEBUG(D_INODE, "log cancel\n");
- req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
- RETURN(0);
- rc = llog_origin_handle_cancel(req);
- if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
- RETURN(0);
- req->rq_status = rc;
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
- RETURN(ptlrpc_reply(req));
- case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
- RETURN(0);
- rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
- ost_blocking_ast,
- ldlm_server_glimpse_ast);
- fail = OBD_FAIL_OST_LDLM_REPLY_NET;
- break;
- case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
- RETURN(0);
- rc = ldlm_handle_convert(req);
- break;
- case LDLM_CANCEL:
- CDEBUG(D_INODE, "cancel\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
- RETURN(0);
- rc = ldlm_handle_cancel(req);
- break;
- case LDLM_BL_CALLBACK:
- case LDLM_CP_CALLBACK:
- CDEBUG(D_INODE, "callback\n");
- CERROR("callbacks should not happen on OST\n");
- /* fall through */
- default:
- CERROR("Unexpected opcode %d\n",
- lustre_msg_get_opc(req->rq_reqmsg));
- req->rq_status = -ENOTSUPP;
- rc = ptlrpc_error(req);
- RETURN(rc);
- }
-
- LASSERT(current->journal_info == NULL);
-
- EXIT;
- /* If we're DISCONNECTing, the export_data is already freed */
- if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
- target_committed_to_req(req);
-
-out:
- if (!rc)
- oti_to_request(oti, req);
-
- target_send_reply(req, rc, fail);
- return 0;
-}
-EXPORT_SYMBOL(ost_handle);
-/*
- * free per-thread pool created by ost_thread_init().
- */
-static void ost_thread_done(struct ptlrpc_thread *thread)
-{
- struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
- * Storage */
-
- ENTRY;
-
- LASSERT(thread != NULL);
-
- /*
- * be prepared to handle partially-initialized pools (because this is
- * called from ost_thread_init() for cleanup.
- */
- tls = thread->t_data;
- if (tls != NULL) {
- OBD_FREE_PTR(tls);
- thread->t_data = NULL;
- }
- EXIT;
-}
-
-/*
- * initialize per-thread page pool (bug 5137).
- */
-static int ost_thread_init(struct ptlrpc_thread *thread)
-{
- struct ost_thread_local_cache *tls;
-
- ENTRY;
-
- LASSERT(thread != NULL);
- LASSERT(thread->t_data == NULL);
- LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
-
- OBD_ALLOC_PTR(tls);
- if (tls == NULL)
- RETURN(-ENOMEM);
- thread->t_data = tls;
- RETURN(0);
-}
-
-#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
-
-/* Sigh - really, this is an OSS, the _server_, not the _target_ */
-static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
-{
- struct ost_obd *ost = &obd->u.ost;
- struct lprocfs_static_vars lvars;
- int oss_min_threads;
- int oss_max_threads;
- int oss_min_create_threads;
- int oss_max_create_threads;
- int rc;
- ENTRY;
-
- rc = cfs_cleanup_group_info();
- if (rc)
- RETURN(rc);
-
- lprocfs_ost_init_vars(&lvars);
- lprocfs_obd_setup(obd, lvars.obd_vars);
-
- cfs_sema_init(&ost->ost_health_sem, 1);
-
- if (oss_num_threads) {
- /* If oss_num_threads is set, it is the min and the max. */
- if (oss_num_threads > OSS_THREADS_MAX)
- oss_num_threads = OSS_THREADS_MAX;
- if (oss_num_threads < OSS_THREADS_MIN)
- oss_num_threads = OSS_THREADS_MIN;
- oss_max_threads = oss_min_threads = oss_num_threads;
- } else {
- /* Base min threads on memory and cpus */
- oss_min_threads =
- cfs_num_possible_cpus() * CFS_NUM_CACHEPAGES >>
- (27 - CFS_PAGE_SHIFT);
- if (oss_min_threads < OSS_THREADS_MIN)
- oss_min_threads = OSS_THREADS_MIN;
- /* Insure a 4x range for dynamic threads */
- if (oss_min_threads > OSS_THREADS_MAX / 4)
- oss_min_threads = OSS_THREADS_MAX / 4;
- oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
- }
-
- ost->ost_service =
- ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
- OST_MAXREPSIZE, OST_REQUEST_PORTAL,
- OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
- ost_handle, LUSTRE_OSS_NAME,
- obd->obd_proc_entry, target_print_req,
- oss_min_threads, oss_max_threads,
- "ll_ost", LCT_DT_THREAD, NULL);
- if (ost->ost_service == NULL) {
- CERROR("failed to start service\n");
- GOTO(out_lprocfs, rc = -ENOMEM);
- }
-
- rc = ptlrpc_start_threads(obd, ost->ost_service);
- if (rc)
- GOTO(out_service, rc = -EINVAL);
-
- if (oss_num_create_threads) {
- if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
- oss_num_create_threads = OSS_MAX_CREATE_THREADS;
- if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
- oss_num_create_threads = OSS_MIN_CREATE_THREADS;
- oss_min_create_threads = oss_max_create_threads =
- oss_num_create_threads;
- } else {
- oss_min_create_threads = OSS_MIN_CREATE_THREADS;
- oss_max_create_threads = OSS_MAX_CREATE_THREADS;
- }
-
- ost->ost_create_service =
- ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
- OST_MAXREPSIZE, OST_CREATE_PORTAL,
- OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
- ost_handle, "ost_create",
- obd->obd_proc_entry, target_print_req,
- oss_min_create_threads, oss_max_create_threads,
- "ll_ost_creat", LCT_DT_THREAD, NULL);
- if (ost->ost_create_service == NULL) {
- CERROR("failed to start OST create service\n");
- GOTO(out_service, rc = -ENOMEM);
- }
-
- rc = ptlrpc_start_threads(obd, ost->ost_create_service);
- if (rc)
- GOTO(out_create, rc = -EINVAL);
-
- ost->ost_io_service =
- ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
- OST_MAXREPSIZE, OST_IO_PORTAL,
- OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
- ost_handle, "ost_io",
- obd->obd_proc_entry, target_print_req,
- oss_min_threads, oss_max_threads,
- "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
- if (ost->ost_io_service == NULL) {
- CERROR("failed to start OST I/O service\n");
- GOTO(out_create, rc = -ENOMEM);
- }
-
- ost->ost_io_service->srv_init = ost_thread_init;
- ost->ost_io_service->srv_done = ost_thread_done;
- ost->ost_io_service->srv_cpu_affinity = 1;
- rc = ptlrpc_start_threads(obd, ost->ost_io_service);
- if (rc)
- GOTO(out_io, rc = -EINVAL);
-
- ping_evictor_start();
-
- RETURN(0);
-
-out_io:
- ptlrpc_unregister_service(ost->ost_io_service);
- ost->ost_io_service = NULL;
-out_create:
- ptlrpc_unregister_service(ost->ost_create_service);
- ost->ost_create_service = NULL;
-out_service:
- ptlrpc_unregister_service(ost->ost_service);
- ost->ost_service = NULL;
-out_lprocfs:
- lprocfs_obd_cleanup(obd);
- RETURN(rc);
-}
-
-static int ost_cleanup(struct obd_device *obd)
-{
- struct ost_obd *ost = &obd->u.ost;
- int err = 0;
- ENTRY;
-
- ping_evictor_stop();
-
- cfs_spin_lock_bh(&obd->obd_processing_task_lock);
- if (obd->obd_recovering) {
- target_cancel_recovery_timer(obd);
- obd->obd_recovering = 0;
- }
- cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-
- cfs_down(&ost->ost_health_sem);
- ptlrpc_unregister_service(ost->ost_service);
- ptlrpc_unregister_service(ost->ost_create_service);
- ptlrpc_unregister_service(ost->ost_io_service);
- ost->ost_service = NULL;
- ost->ost_create_service = NULL;
- cfs_up(&ost->ost_health_sem);
-
- lprocfs_obd_cleanup(obd);
-
- RETURN(err);
-}
-
-static int ost_health_check(struct obd_device *obd)
-{
- struct ost_obd *ost = &obd->u.ost;
- int rc = 0;
-
- cfs_down(&ost->ost_health_sem);
- rc |= ptlrpc_service_health_check(ost->ost_service);
- rc |= ptlrpc_service_health_check(ost->ost_create_service);
- rc |= ptlrpc_service_health_check(ost->ost_io_service);
- cfs_up(&ost->ost_health_sem);
-
- /*
- * health_check to return 0 on healthy
- * and 1 on unhealthy.
- */
- if( rc != 0)
- rc = 1;
-
- return rc;
-}
-
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
-{
- return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-}
-
-/* use obd ops to offer management infrastructure */
-static struct obd_ops ost_obd_ops = {
- .o_owner = THIS_MODULE,
- .o_setup = ost_setup,
- .o_cleanup = ost_cleanup,
- .o_health_check = ost_health_check,
-};
-
-
-static int __init ost_init(void)
-{
- struct lprocfs_static_vars lvars;
- int rc;
- ENTRY;
-
- lprocfs_ost_init_vars(&lvars);
- rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
- LUSTRE_OSS_NAME, NULL);