+/* This function makes sure dead exports are evicted in a timely manner.
+ This function is only called when some export receives a message (i.e.,
+ the network is up.) */
+static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
+{
+ struct obd_export *oldest_exp;
+ time_t oldest_time;
+
+ ENTRY;
+
+ LASSERT(exp);
+
+ /* Compensate for slow machines, etc, by faking our request time
+ into the future. Although this can break the strict time-ordering
+ of the list, we can be really lazy here - we don't have to evict
+ at the exact right moment. Eventually, all silent exports
+ will make it to the top of the list. */
+ exp->exp_last_request_time = max(exp->exp_last_request_time,
+ cfs_time_current_sec() + extra_delay);
+
+ CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n",
+ exp->exp_client_uuid.uuid,
+ exp->exp_last_request_time, exp);
+
+ /* exports may get disconnected from the chain even though the
+ export has references, so we must keep the spin lock while
+ manipulating the lists */
+ spin_lock(&exp->exp_obd->obd_dev_lock);
+
+ if (list_empty(&exp->exp_obd_chain_timed)) {
+ /* this one is not timed */
+ spin_unlock(&exp->exp_obd->obd_dev_lock);
+ EXIT;
+ return;
+ }
+
+ list_move_tail(&exp->exp_obd_chain_timed,
+ &exp->exp_obd->obd_exports_timed);
+
+ oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
+ struct obd_export, exp_obd_chain_timed);
+ oldest_time = oldest_exp->exp_last_request_time;
+ spin_unlock(&exp->exp_obd->obd_dev_lock);
+
+ if (exp->exp_obd->obd_recovering) {
+ /* be nice to everyone during recovery */
+ EXIT;
+ return;
+ }
+
+ /* Note - racing to start/reset the obd_eviction timer is safe */
+ if (exp->exp_obd->obd_eviction_timer == 0) {
+ /* Check if the oldest entry is expired. */
+ if (cfs_time_current_sec() > (oldest_time + PING_EVICT_TIMEOUT +
+ extra_delay)) {
+ /* We need a second timer, in case the net was down and
+ * it just came back. Since the pinger may skip every
+ * other PING_INTERVAL (see note in ptlrpc_pinger_main),
+ * we better wait for 3. */
+ exp->exp_obd->obd_eviction_timer =
+ cfs_time_current_sec() + 3 * PING_INTERVAL;
+ CDEBUG(D_HA, "%s: Think about evicting %s from "CFS_TIME_T"\n",
+ exp->exp_obd->obd_name, obd_export_nid2str(exp),
+ oldest_time);
+ }
+ } else {
+ if (cfs_time_current_sec() >
+ (exp->exp_obd->obd_eviction_timer + extra_delay)) {
+ /* The evictor won't evict anyone who we've heard from
+ * recently, so we don't have to check before we start
+ * it. */
+ if (!ping_evictor_wake(exp))
+ exp->exp_obd->obd_eviction_timer = 0;
+ }
+ }
+
+ EXIT;
+}
+
+static int ptlrpc_check_req(struct ptlrpc_request *req)
+{
+ if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
+ req->rq_export->exp_conn_cnt)) {
+ DEBUG_REQ(D_ERROR, req,
+ "DROPPING req from old connection %d < %d",
+ lustre_msg_get_conn_cnt(req->rq_reqmsg),
+ req->rq_export->exp_conn_cnt);
+ return -EEXIST;
+ }
+ if (unlikely(req->rq_export->exp_obd &&
+ req->rq_export->exp_obd->obd_fail)) {
+ /* Failing over, don't handle any more reqs, send
+ error response instead. */
+ CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
+ req, req->rq_export->exp_obd->obd_name);
+ req->rq_status = -ENODEV;
+ ptlrpc_error(req);
+ return -ENODEV;
+ }
+
+ return 0;
+}
+
+static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_request *rq;
+ __s32 next;
+
+ spin_lock(&svc->srv_at_lock);
+ if (list_empty(&svc->srv_at_list)) {
+ cfs_timer_disarm(&svc->srv_at_timer);
+ spin_unlock(&svc->srv_at_lock);
+ return;
+ }
+
+ /* Set timer for closest deadline */
+ rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
+ rq_timed_list);
+ next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
+ at_early_margin);
+ if (next <= 0)
+ ptlrpc_at_timer((unsigned long)svc);
+ else
+ cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
+ spin_unlock(&svc->srv_at_lock);
+ CDEBUG(D_INFO, "armed %s at %+ds\n", svc->srv_name, next);
+}
+
+/* Add rpc to early reply check list */
+static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ struct ptlrpc_request *rq;
+ int found = 0;
+
+ if (AT_OFF)
+ return(0);
+
+ if (req->rq_no_reply)
+ return 0;
+
+ if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
+ return(-ENOSYS);
+
+ spin_lock(&svc->srv_at_lock);
+
+ if (unlikely(req->rq_sent_final)) {
+ spin_unlock(&svc->srv_at_lock);
+ return 0;
+ }
+
+ LASSERT(list_empty(&req->rq_timed_list));
+ /* Add to sorted list. Presumably latest rpcs will have the latest
+ deadlines, so search backward. */
+ list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
+ if (req->rq_deadline >= rq->rq_deadline) {
+ list_add(&req->rq_timed_list, &rq->rq_timed_list);
+ found++;
+ break;
+ }
+ }
+ if (!found)
+ /* Add to front if shortest deadline or list empty */
+ list_add(&req->rq_timed_list, &svc->srv_at_list);
+
+ /* Check if we're the head of the list */
+ found = (svc->srv_at_list.next == &req->rq_timed_list);
+
+ spin_unlock(&svc->srv_at_lock);
+
+ if (found)
+ ptlrpc_at_set_timer(svc);
+
+ return 0;
+}
+
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
+ int extra_time)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ struct ptlrpc_request *reqcopy;
+ struct lustre_msg *reqmsg;
+ cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
+ time_t newdl;
+ int rc;
+ ENTRY;
+
+ /* deadline is when the client expects us to reply, margin is the
+ difference between clients' and servers' expectations */
+ DEBUG_REQ(D_ADAPTTO, req,
+ "%ssending early reply (deadline %+lds, margin %+lds) for "
+ "%d+%d", AT_OFF ? "AT off - not " : "",
+ olddl, olddl - at_get(&svc->srv_at_estimate),
+ at_get(&svc->srv_at_estimate), extra_time);
+
+ if (AT_OFF)
+ RETURN(0);
+
+ if (olddl < 0) {
+ DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
+ "not sending early reply. Consider increasing "
+ "at_early_margin (%d)?", olddl, at_early_margin);
+
+ /* Return an error so we're not re-added to the timed list. */
+ RETURN(-ETIMEDOUT);
+ }
+
+ if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
+ DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
+ "but no AT support");
+ RETURN(-ENOSYS);
+ }
+
+ if (req->rq_export && req->rq_export->exp_in_recovery) {
+ /* don't increase server estimates during recovery, and give
+ clients the full recovery time. */
+ newdl = cfs_time_current_sec() +
+ req->rq_export->exp_obd->obd_recovery_timeout;
+ } else {
+ if (extra_time) {
+ /* Fake our processing time into the future to ask the
+ clients for some extra amount of time */
+ extra_time += cfs_time_current_sec() -
+ req->rq_arrival_time.tv_sec;
+ at_add(&svc->srv_at_estimate, extra_time);
+ }
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svc->srv_at_estimate);
+ }
+ if (req->rq_deadline >= newdl) {
+ /* We're not adding any time, no need to send an early reply
+ (e.g. maybe at adaptive_max) */
+ DEBUG_REQ(D_WARNING, req, "Couldn't add any time ("
+ CFS_DURATION_T"/"CFS_DURATION_T"), "
+ "not sending early reply\n", olddl,
+ cfs_time_sub(newdl, cfs_time_current_sec()));
+ RETURN(-ETIMEDOUT);
+ }
+
+ OBD_ALLOC(reqcopy, sizeof *reqcopy);
+ if (reqcopy == NULL)
+ RETURN(-ENOMEM);
+ OBD_ALLOC(reqmsg, req->rq_reqlen);
+ if (!reqmsg) {
+ OBD_FREE(reqcopy, sizeof *reqcopy);
+ RETURN(-ENOMEM);
+ }
+
+ *reqcopy = *req;
+ reqcopy->rq_reply_state = NULL;
+ reqcopy->rq_rep_swab_mask = 0;
+ reqcopy->rq_pack_bulk = 0;
+ reqcopy->rq_pack_udesc = 0;
+ reqcopy->rq_packed_final = 0;
+ sptlrpc_svc_ctx_addref(reqcopy);
+ /* We only need the reqmsg for the magic */
+ reqcopy->rq_reqmsg = reqmsg;
+ memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
+
+ if (req->rq_sent_final) {
+ DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
+ "abort sending early reply\n");
+ GOTO(out, rc = 0);
+ }
+
+ /* Connection ref */
+ reqcopy->rq_export = class_conn2export(
+ lustre_msg_get_handle(reqcopy->rq_reqmsg));
+ if (reqcopy->rq_export == NULL)
+ GOTO(out, rc = -ENODEV);
+
+ /* RPC ref */
+ class_export_rpc_get(reqcopy->rq_export);
+ if (reqcopy->rq_export->exp_obd &&
+ reqcopy->rq_export->exp_obd->obd_fail)
+ GOTO(out_put, rc = -ENODEV);
+
+ rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
+ if (rc)
+ GOTO(out_put, rc);
+
+ rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
+
+ if (!rc) {
+ /* Adjust our own deadline to what we told the client */
+ req->rq_deadline = newdl;
+ req->rq_early_count++; /* number sent, server side */
+ } else {
+ DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
+ }
+
+ /* Free the (early) reply state from lustre_pack_reply.
+ (ptlrpc_send_reply takes it's own rs ref, so this is safe here) */
+ ptlrpc_req_drop_rs(reqcopy);
+
+out_put:
+ class_export_rpc_put(reqcopy->rq_export);
+ class_export_put(reqcopy->rq_export);
+out:
+ sptlrpc_svc_ctx_decref(reqcopy);
+ OBD_FREE(reqmsg, req->rq_reqlen);
+ OBD_FREE(reqcopy, sizeof *reqcopy);
+ RETURN(rc);
+}
+
+/* Send early replies to everybody expiring within at_early_margin
+ asking for at_extra time */
+static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_request *rq, *n;
+ struct list_head work_list;
+ time_t now = cfs_time_current_sec();
+ cfs_duration_t delay;
+ int first, counter = 0;
+ ENTRY;
+
+ spin_lock(&svc->srv_at_lock);
+ if (svc->srv_at_check == 0) {
+ spin_unlock(&svc->srv_at_lock);
+ RETURN(0);
+ }
+ delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
+ svc->srv_at_check = 0;
+
+ if (list_empty(&svc->srv_at_list)) {
+ spin_unlock(&svc->srv_at_lock);
+ RETURN(0);
+ }
+
+ /* The timer went off, but maybe the nearest rpc already completed. */
+ rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
+ rq_timed_list);
+ first = (int)(rq->rq_deadline - now);
+ if (first > at_early_margin) {
+ /* We've still got plenty of time. Reset the timer. */
+ spin_unlock(&svc->srv_at_lock);
+ ptlrpc_at_set_timer(svc);
+ RETURN(0);
+ }
+
+ /* We're close to a timeout, and we don't know how much longer the
+ server will take. Send early replies to everyone expiring soon. */
+ CFS_INIT_LIST_HEAD(&work_list);
+ list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
+ if (rq->rq_deadline <= now + at_early_margin) {
+ list_move_tail(&rq->rq_timed_list, &work_list);
+ counter++;
+ } else {
+ break;
+ }
+ }
+
+ spin_unlock(&svc->srv_at_lock);
+
+ /* we have a new earliest deadline, restart the timer */
+ ptlrpc_at_set_timer(svc);
+
+ CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
+ "replies\n", first, at_extra, counter);
+ if (first < 0) {
+ /* We're already past request deadlines before we even get a
+ chance to send early replies */
+ LCONSOLE_WARN("%s: This server is not able to keep up with "
+ "request traffic (cpu-bound).\n", svc->srv_name);
+ CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, "
+ "delay="CFS_DURATION_T"(jiff)\n",
+ counter, svc->srv_n_queued_reqs, svc->srv_n_active_reqs,
+ at_get(&svc->srv_at_estimate), delay);
+ }
+
+ /* ptlrpc_server_finish_request may delete an entry out of
+ * the work list */
+ spin_lock(&svc->srv_at_lock);
+ while (!list_empty(&work_list)) {
+ rq = list_entry(work_list.next, struct ptlrpc_request,
+ rq_timed_list);
+ list_del_init(&rq->rq_timed_list);
+ /* if the entry is still in the worklist, it hasn't been
+ deleted, and is safe to take a ref to keep the req around */
+ atomic_inc(&rq->rq_refcount);
+ spin_unlock(&svc->srv_at_lock);
+
+ if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
+ ptlrpc_at_add_timed(rq);
+
+ ptlrpc_server_drop_request(rq);
+ spin_lock(&svc->srv_at_lock);
+ }
+ spin_unlock(&svc->srv_at_lock);
+
+ RETURN(0);
+}
+
+/* Handle freshly incoming reqs, add to timed early reply list,
+ pass on to regular request queue */
+static int
+ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_request *req;
+ __u32 deadline;
+ int rc;
+ ENTRY;
+
+ LASSERT(svc);
+
+ spin_lock(&svc->srv_lock);
+ if (list_empty(&svc->srv_req_in_queue)) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+
+ req = list_entry(svc->srv_req_in_queue.next,
+ struct ptlrpc_request, rq_list);
+ list_del_init (&req->rq_list);
+ /* Consider this still a "queued" request as far as stats are
+ concerned */
+ spin_unlock(&svc->srv_lock);
+
+ /* go through security check/transform */
+ rc = sptlrpc_svc_unwrap_request(req);
+ switch (rc) {
+ case SECSVC_OK:
+ break;
+ case SECSVC_COMPLETE:
+ target_send_reply(req, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
+ goto err_req;
+ case SECSVC_DROP:
+ goto err_req;
+ default:
+ LBUG();
+ }
+
+ /* Clear request swab mask; this is a new request */
+ req->rq_req_swab_mask = 0;
+
+ rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+ if (rc != 0) {
+ CERROR("error unpacking request: ptl %d from %s x"LPU64"\n",
+ svc->srv_req_portal, libcfs_id2str(req->rq_peer),
+ req->rq_xid);
+ goto err_req;
+ }
+
+ rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
+ if (rc) {
+ CERROR ("error unpacking ptlrpc body: ptl %d from %s x"
+ LPU64"\n", svc->srv_req_portal,
+ libcfs_id2str(req->rq_peer), req->rq_xid);
+ goto err_req;
+ }
+
+ rc = -EINVAL;
+ if (lustre_msg_get_type(req->rq_reqmsg) != PTL_RPC_MSG_REQUEST) {
+ CERROR("wrong packet type received (type=%u) from %s\n",
+ lustre_msg_get_type(req->rq_reqmsg),
+ libcfs_id2str(req->rq_peer));
+ goto err_req;
+ }
+
+ CDEBUG(D_NET, "got req "LPD64"\n", req->rq_xid);
+
+ req->rq_export = class_conn2export(
+ lustre_msg_get_handle(req->rq_reqmsg));
+ if (req->rq_export) {
+ rc = ptlrpc_check_req(req);
+ if (rc == 0) {
+ rc = sptlrpc_target_export_check(req->rq_export, req);
+ if (rc)
+ DEBUG_REQ(D_ERROR, req, "DROPPING req with "
+ "illegal security flavor,");
+ }
+
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ if (rc)
+ goto err_req;
+ }
+
+ /* req_in handling should/must be fast */
+ if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
+ DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
+ cfs_time_sub(cfs_time_current_sec(),
+ req->rq_arrival_time.tv_sec));
+
+ /* Set rpc server deadline and add it to the timed list */
+ deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
+ MSGHDR_AT_SUPPORT) ?
+ /* The max time the client expects us to take */
+ lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
+ req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
+ if (unlikely(deadline == 0)) {
+ DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
+ goto err_req;
+ }
+
+ ptlrpc_at_add_timed(req);
+
+ /* Move it over to the request processing queue */
+ spin_lock(&svc->srv_lock);
+ list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ cfs_waitq_signal(&svc->srv_waitq);
+ spin_unlock(&svc->srv_lock);
+ RETURN(1);
+
+err_req:
+ spin_lock(&svc->srv_lock);
+ svc->srv_n_queued_reqs--;
+ svc->srv_n_active_reqs++;
+ spin_unlock(&svc->srv_lock);
+ ptlrpc_server_finish_request(req);
+
+ RETURN(1);
+}
+
+static int
+ptlrpc_server_handle_request(struct ptlrpc_service *svc,
+ struct ptlrpc_thread *thread)