- rs->rs_opc = lustre_msg_get_opc(rs->rs_msg);
-
- cfs_spin_lock(&exp->exp_uncommitted_replies_lock);
- CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
- rs->rs_transno, exp->exp_last_committed);
- if (rs->rs_transno > exp->exp_last_committed) {
- /* not committed already */
- cfs_list_add_tail(&rs->rs_obd_list,
- &exp->exp_uncommitted_replies);
- }
- cfs_spin_unlock (&exp->exp_uncommitted_replies_lock);
-
- cfs_spin_lock(&exp->exp_lock);
- cfs_list_add_tail(&rs->rs_exp_list, &exp->exp_outstanding_replies);
- cfs_spin_unlock(&exp->exp_lock);
-
- netrc = target_send_reply_msg (req, rc, fail_id);
-
- cfs_spin_lock(&svc->srv_rs_lock);
-
- cfs_atomic_inc(&svc->srv_n_difficult_replies);
-
- if (netrc != 0) {
- /* error sending: reply is off the net. Also we need +1
- * reply ref until ptlrpc_handle_rs() is done
- * with the reply state (if the send was successful, there
- * would have been +1 ref for the net, which
- * reply_out_callback leaves alone) */
- rs->rs_on_net = 0;
- ptlrpc_rs_addref(rs);
- }
-
- cfs_spin_lock(&rs->rs_lock);
- if (rs->rs_transno <= exp->exp_last_committed ||
- (!rs->rs_on_net && !rs->rs_no_ack) ||
- cfs_list_empty(&rs->rs_exp_list) || /* completed already */
- cfs_list_empty(&rs->rs_obd_list)) {
- CDEBUG(D_HA, "Schedule reply immediately\n");
- ptlrpc_dispatch_difficult_reply(rs);
- } else {
- cfs_list_add (&rs->rs_list, &svc->srv_active_replies);
- rs->rs_scheduled = 0; /* allow notifier to schedule */
- }
- cfs_spin_unlock(&rs->rs_lock);
- cfs_spin_unlock(&svc->srv_rs_lock);
- EXIT;
-}
-
-int target_handle_ping(struct ptlrpc_request *req)
-{
- obd_ping(req->rq_export);
- return req_capsule_server_pack(&req->rq_pill);
-}
-
-void target_committed_to_req(struct ptlrpc_request *req)
-{
- struct obd_export *exp = req->rq_export;
-
- if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL)
- lustre_msg_set_last_committed(req->rq_repmsg,
- exp->exp_last_committed);
- else
- DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
- "%d)", exp->exp_obd->obd_no_transno,
- req->rq_repmsg == NULL);
-
- CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
- exp->exp_last_committed, req->rq_transno, req->rq_xid);
-}
-EXPORT_SYMBOL(target_committed_to_req);
-
-int target_handle_qc_callback(struct ptlrpc_request *req)
-{
- struct obd_quotactl *oqctl;
- struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
-
- oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL) {
- CERROR("Can't unpack obd_quotactl\n");
- RETURN(-EPROTO);
- }
-
- cli->cl_qchk_stat = oqctl->qc_stat;
-
- return 0;
-}
-
-#ifdef HAVE_QUOTA_SUPPORT
-int target_handle_dqacq_callback(struct ptlrpc_request *req)
-{
-#ifdef __KERNEL__
- struct obd_device *obd = req->rq_export->exp_obd;
- struct obd_device *master_obd = NULL, *lov_obd = NULL;
- struct obd_device_target *obt;
- struct lustre_quota_ctxt *qctxt;
- struct qunit_data *qdata = NULL;
- int rc = 0;
- ENTRY;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DROP_QUOTA_REQ))
- RETURN(rc);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc) {
- CERROR("packing reply failed!: rc = %d\n", rc);
- RETURN(rc);
- }
-
- LASSERT(req->rq_export);
-
- qdata = quota_get_qdata(req, QUOTA_REQUEST, QUOTA_EXPORT);
- if (IS_ERR(qdata)) {
- rc = PTR_ERR(qdata);
- CDEBUG(D_ERROR, "Can't unpack qunit_data(rc: %d)\n", rc);
- req->rq_status = rc;
- GOTO(out, rc);
- }
-
- /* we use the observer */
- if (obd_pin_observer(obd, &lov_obd) ||
- obd_pin_observer(lov_obd, &master_obd)) {
- CERROR("Can't find the observer, it is recovering\n");
- req->rq_status = -EAGAIN;
- GOTO(out, rc);
- }
-
- obt = &master_obd->u.obt;
- qctxt = &obt->obt_qctxt;
-
- if (!qctxt->lqc_setup || !qctxt->lqc_valid) {
- /* quota_type has not been processed yet, return EAGAIN
- * until we know whether or not quotas are supposed to
- * be enabled */
- CDEBUG(D_QUOTA, "quota_type not processed yet, return "
- "-EAGAIN\n");
- req->rq_status = -EAGAIN;
- GOTO(out, rc);
- }
-
- cfs_down_read(&obt->obt_rwsem);
- if (qctxt->lqc_lqs_hash == NULL) {
- cfs_up_read(&obt->obt_rwsem);
- /* quota_type has not been processed yet, return EAGAIN
- * until we know whether or not quotas are supposed to
- * be enabled */
- CDEBUG(D_QUOTA, "quota_ctxt is not ready yet, return "
- "-EAGAIN\n");
- req->rq_status = -EAGAIN;
- GOTO(out, rc);
- }
-
- LASSERT(qctxt->lqc_handler);
- rc = qctxt->lqc_handler(master_obd, qdata,
- lustre_msg_get_opc(req->rq_reqmsg));
- cfs_up_read(&obt->obt_rwsem);
- if (rc && rc != -EDQUOT)
- CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
- "dqacq/dqrel failed! (rc:%d)\n", rc);
- req->rq_status = rc;
-
- rc = quota_copy_qdata(req, qdata, QUOTA_REPLY, QUOTA_EXPORT);
- if (rc < 0) {
- CERROR("Can't pack qunit_data(rc: %d)\n", rc);
- GOTO(out, rc);
- }
-
- /* Block the quota req. b=14840 */
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout);
- EXIT;
-
-out:
- if (master_obd)
- obd_unpin_observer(lov_obd);
- if (lov_obd)
- obd_unpin_observer(obd);
-
- rc = ptlrpc_reply(req);
- return rc;
-#else
- return 0;
-#endif /* !__KERNEL__ */
+ rs->rs_opc = lustre_msg_get_opc(req->rq_reqmsg);
+
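+ /* A difficult reply must be held until its transno is committed to
+  * disk; track it on the export's uncommitted-replies list so that
+  * ptlrpc_commit_replies() can finalize it once exp_last_committed
+  * catches up. */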
+ spin_lock(&exp->exp_uncommitted_replies_lock);
+ CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
+ rs->rs_transno, exp->exp_last_committed);
+ if (rs->rs_transno > exp->exp_last_committed) {
+ /* not committed already */
+ list_add_tail(&rs->rs_obd_list,
+ &exp->exp_uncommitted_replies);
+ }
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
+
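+ /* Also track the reply on the export's outstanding list until the
+  * client acknowledges it or the export is cleaned up. */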
+ spin_lock(&exp->exp_lock);
+ list_add_tail(&rs->rs_exp_list, &exp->exp_outstanding_replies);
+ spin_unlock(&exp->exp_lock);
+
+ netrc = target_send_reply_msg(req, rc, fail_id);
+
+ spin_lock(&svcpt->scp_rep_lock);
+
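+ /* scp_nreps_difficult counts difficult replies still in flight on
+  * this service partition; shutdown waits for it to drain. */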
+ atomic_inc(&svcpt->scp_nreps_difficult);
+
+ if (netrc != 0) {
+ /* error sending: reply is off the net.  Also we need +1 reply
+  * ref until ptlrpc_handle_rs() is done with the reply state
+  * (if the send was successful, there would have been +1 ref
+  * for the net, which reply_out_callback leaves alone) */
+ rs->rs_on_net = 0;
+ ptlrpc_rs_addref(rs);
+ }
+
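+ /* Decide whether the reply can be finalized right away: its transno
+  * has already committed, it is off the net with no ack expected, or
+  * it has already completed.  Otherwise park it on scp_rep_active and
+  * let the commit/ack callbacks reschedule it. */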
+ spin_lock(&rs->rs_lock);
+ if (rs->rs_transno <= exp->exp_last_committed ||
+ (!rs->rs_on_net && !rs->rs_no_ack) ||
+ list_empty(&rs->rs_exp_list) || /* completed already */
+ list_empty(&rs->rs_obd_list)) {
+ CDEBUG(D_HA, "Schedule reply immediately\n");
+ ptlrpc_dispatch_difficult_reply(rs);
+ } else {
+ list_add(&rs->rs_list, &svcpt->scp_rep_active);
+ rs->rs_scheduled = 0; /* allow notifier to schedule */
+ }
+ spin_unlock(&rs->rs_lock);
+ spin_unlock(&svcpt->scp_rep_lock);
+ EXIT;