1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #define DEBUG_SUBSYSTEM S_LDLM
26 # include <linux/module.h>
28 # include <liblustre.h>
30 #include <linux/obd_ost.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_net.h>
/* client_import_connect(): client-side connect handler shared by OSC/MDC.
 * Registers a local connection (class_connect), creates a client-side DLM
 * namespace, sends the import's connect RPC, and on success records the
 * remote handle and connection on the import/export.
 * NOTE(review): this listing is incomplete -- the embedded source line
 * numbers jump (44->45->47->54, 83->87, ...), so braces, `if (rc)` guards,
 * RETURN paths and cleanup labels between the visible lines are missing
 * from this excerpt.  Comments below only describe what is visible. */
35 int client_import_connect(struct lustre_handle *dlm_handle,
36 struct obd_device *obd,
37 struct obd_uuid *cluuid)
39 struct client_obd *cli = &obd->u.cli;
40 struct obd_import *imp = cli->cl_import;
41 struct obd_export *exp;
42 struct ptlrpc_request *request;
43 /* XXX maybe this is a good time to create a connect struct? */
/* Three request buffers are prepped ("3" in ptlrpc_prep_req below): target
 * UUID, client UUID, and presumably the DLM handle -- the third size/tmp
 * entries are not visible in this excerpt. */
44 int rc, size[] = {sizeof(imp->imp_target_uuid),
45 sizeof(obd->obd_uuid),
47 char *tmp[] = {imp->imp_target_uuid.uuid,
54 rc = class_connect(dlm_handle, obd, cluuid);
/* Only the first connection does real setup; later ones just refcount
 * (the early-return for cl_conn_count > 1 is in a missing line). */
59 if (cli->cl_conn_count > 1)
62 if (obd->obd_namespace != NULL)
63 CERROR("already have namespace!\n");
64 obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
65 LDLM_NAMESPACE_CLIENT);
66 if (obd->obd_namespace == NULL)
67 GOTO(out_disco, rc = -ENOMEM);
69 request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
71 GOTO(out_ldlm, rc = -ENOMEM);
73 request->rq_level = LUSTRE_CONN_NEW;
74 request->rq_replen = lustre_msg_size(0, NULL);
76 lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_PEER);
78 imp->imp_dlm_handle = *dlm_handle;
80 imp->imp_level = LUSTRE_CONN_CON;
81 rc = ptlrpc_queue_wait(request);
/* Error path: the guarding `if (rc)` is not visible here; this undoes the
 * local connect when the RPC fails. */
83 class_disconnect(dlm_handle, 0);
87 exp = class_conn2export(dlm_handle);
88 exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
89 class_export_put(exp);
/* Reply op-flags tell us whether the target supports replay; if so mark
 * the import replayable and start pinging it. */
91 msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
92 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
93 imp->imp_replayable = 1;
94 CDEBUG(D_HA, "connected to replayable target: %s\n",
95 imp->imp_target_uuid.uuid);
96 ptlrpc_pinger_add_import(imp);
98 imp->imp_level = LUSTRE_CONN_FULL;
99 imp->imp_remote_handle = request->rq_repmsg->handle;
100 CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
101 imp->imp_remote_handle.cookie);
105 ptlrpc_req_finished(request);
/* Cleanup path (the out_ldlm/out_disco labels themselves are in missing
 * lines): tear down the namespace and undo the local connect. */
108 ldlm_namespace_free(obd->obd_namespace);
109 obd->obd_namespace = NULL;
111 cli->cl_conn_count--;
112 class_disconnect(dlm_handle, 0);
/* client_import_disconnect(): drop one reference to a client connection;
 * on the last reference, cancel unused DLM locks, free the namespace and
 * (unless obd_no_recov forces a local-only shutdown) send a DISCONNECT
 * RPC to the target before tearing down the local connection state.
 * NOTE(review): incomplete listing -- embedded line numbers jump
 * (125->129, 140->145, ...), so guards, braces, labels and RETURN paths
 * between the visible lines are missing from this excerpt. */
119 int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
121 struct obd_device *obd = class_conn2obd(dlm_handle);
122 struct client_obd *cli = &obd->u.cli;
123 struct obd_import *imp = cli->cl_import;
124 struct ptlrpc_request *request = NULL;
125 int rc = 0, err, rq_opc;
/* The `if (!obd)` guard for this error report is in a missing line. */
129 CERROR("invalid connection for disconnect: cookie "LPX64"\n",
130 dlm_handle ? dlm_handle->cookie : -1UL);
/* Map the connect opcode to the matching disconnect opcode; the default
 * branch (missing its `default:` label here) rejects unknown targets. */
134 switch (imp->imp_connect_op) {
135 case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
136 case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
137 case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
139 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
140 imp->imp_target_uuid.uuid, imp->imp_connect_op);
145 if (!cli->cl_conn_count) {
146 CERROR("disconnecting disconnected device (%s)\n",
148 GOTO(out_sem, rc = -EINVAL);
151 cli->cl_conn_count--;
/* More references remain: skip the wire disconnect entirely. */
152 if (cli->cl_conn_count)
153 GOTO(out_no_disconnect, rc = 0);
155 if (obd->obd_namespace != NULL) {
156 /* obd_no_recov == local only */
157 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
158 obd->obd_no_recov, NULL);
159 ldlm_namespace_free(obd->obd_namespace);
160 obd->obd_namespace = NULL;
163 /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
164 if (obd->obd_no_recov) {
165 ptlrpc_set_import_active(imp, 0);
/* Normal path (its `else` is in a missing line): send the DISCONNECT RPC
 * and wait for the reply. */
167 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
169 GOTO(out_req, rc = -ENOMEM);
171 request->rq_replen = lustre_msg_size(0, NULL);
173 rc = ptlrpc_queue_wait(request);
177 if (imp->imp_replayable)
178 ptlrpc_pinger_del_import(imp);
183 ptlrpc_req_finished(request);
/* Local teardown always happens; err vs rc merging is in missing lines. */
185 err = class_disconnect(dlm_handle, 0);
193 /* --------------------------------------------------------------------------
194 * from old lib/target.c
195 * -------------------------------------------------------------------------- */
/* target_handle_reconnect(): server-side check when a connect request
 * matches an existing export.  If the client's handle cookie matches the
 * one we remember, treat it as a benign reconnect and hand back our
 * export cookie; on a mismatch, report it and zero the handle so the
 * caller can reject/replace the stale connection.
 * NOTE(review): incomplete listing -- the `else` for the mismatch branch,
 * the function's braces and its RETURN paths are in missing lines. */
197 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
198 struct obd_uuid *cluuid)
200 if (exp->exp_connection) {
201 struct lustre_handle *hdl;
202 hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
203 /* Might be a re-connect after a partition. */
204 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
205 CERROR("%s reconnecting\n", cluuid->uuid);
206 conn->cookie = exp->exp_handle.h_cookie;
/* Mismatch branch (its `} else {` is in a missing line): same client UUID
 * but a different handle -- probably a stale or duplicate connection. */
209 CERROR("%s reconnecting from %s, "
210 "handle mismatch (ours "LPX64", theirs "
211 LPX64")\n", cluuid->uuid,
212 exp->exp_connection->c_remote_uuid.uuid,
213 hdl->cookie, conn->cookie);
214 /* XXX disconnect them here? */
215 memset(conn, 0, sizeof *conn);
216 /* This is a little scary, but right now we build this
217 * file separately into each server module, so I won't
218 * go _immediately_ to hell.
/* Export exists but has no connection yet: just hand back our cookie. */
224 conn->cookie = exp->exp_handle.h_cookie;
225 CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
227 CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
/* target_handle_connect(): server-side CONNECT handler.  Parses target and
 * client UUIDs out of the request, locates the target obd, finds or creates
 * an export for the client, advertises recovery/replay capability in the
 * reply flags, wires up the network connection, and builds the "fake" DLM
 * import used for server-originated lock callbacks to this client.
 * NOTE(review): incomplete listing -- embedded line numbers jump
 * (242->245, 300->304, 322->326, ...), so declarations (`str`, `tmp`, `p`,
 * `flags`), guards, braces, labels (`out`, `dont_check_exports`) and
 * RETURN paths are missing from this excerpt. */
231 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
233 struct obd_device *target;
234 struct obd_export *export = NULL;
235 struct obd_import *dlmimp;
236 struct lustre_handle conn;
237 struct obd_uuid tgtuuid;
238 struct obd_uuid cluuid;
239 struct obd_uuid remote_uuid;
242 int rc = 0, abort_recovery;
/* Buffer 0: target UUID (string, bounded to fit tgtuuid). */
245 LASSERT_REQSWAB (req, 0);
246 str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
248 CERROR("bad target UUID for connect\n");
249 GOTO(out, rc = -EINVAL);
252 obd_str2uuid (&tgtuuid, str);
253 target = class_uuid2obd(&tgtuuid);
254 if (!target || target->obd_stopping || !target->obd_set_up) {
255 CERROR("UUID '%s' is not available for connect\n", str);
256 GOTO(out, rc = -ENODEV);
/* Buffer 1: client UUID. */
259 LASSERT_REQSWAB (req, 1);
260 str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1);
262 CERROR("bad client UUID for connect\n");
263 GOTO(out, rc = -EINVAL);
266 obd_str2uuid (&cluuid, str);
268 /* XXX extract a nettype and format accordingly */
269 snprintf(remote_uuid.uuid, sizeof remote_uuid,
270 "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
/* Sample the abort flag under the lock; the `if (abort_recovery)` guard
 * for the call below is in a missing line. */
272 spin_lock_bh(&target->obd_processing_task_lock);
273 abort_recovery = target->obd_abort_recovery;
274 spin_unlock_bh(&target->obd_processing_task_lock);
276 target_abort_recovery(target);
/* Buffer 2: the client's lustre_handle for this connection. */
278 tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
280 GOTO(out, rc = -EPROTO);
282 memcpy(&conn, tmp, sizeof conn);
284 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
288 /* lctl gets a backstage, all-access pass. */
289 if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
290 goto dont_check_exports;
/* Search existing exports for this client UUID; on a match, drop the lock
 * and let target_handle_reconnect() decide reconnect-vs-reject. */
292 spin_lock(&target->obd_dev_lock);
293 list_for_each(p, &target->obd_exports) {
294 export = list_entry(p, struct obd_export, exp_obd_chain);
295 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
296 spin_unlock(&target->obd_dev_lock);
297 LASSERT(export->exp_obd == target);
299 rc = target_handle_reconnect(&conn, export, &cluuid);
304 /* If we found an export, we already unlocked. */
306 spin_unlock(&target->obd_dev_lock);
308 /* Tell the client if we're in recovery. */
309 /* If this is the first client, start the recovery timer */
310 if (target->obd_recovering) {
311 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
312 target_start_recovery_timer(target, handler);
315 /* Tell the client if we support replayable requests */
316 if (target->obd_replayable)
317 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
/* New client (no existing export): refuse while recovering, otherwise do
 * the real obd_connect().  rc == EALREADY marks a reconnect below. */
319 if (export == NULL) {
320 if (target->obd_recovering) {
321 CERROR("denying connection for new client %s: "
322 "in recovery\n", cluuid.uuid);
326 rc = obd_connect(&conn, target, &cluuid);
330 /* If all else goes well, this is our RPC return code. */
333 if (rc && rc != EALREADY)
336 req->rq_repmsg->handle = conn;
338 /* If the client and the server are the same node, we will already
339 * have an export that really points to the client's DLM export,
340 * because we have a shared handles table.
342 * XXX this will go away when shaver stops sending the "connect" handle
343 * in the real "remote handle" field of the request --phik 24 Apr 2003
345 if (req->rq_export != NULL)
346 class_export_put(req->rq_export);
348 /* ownership of this export ref transfers to the request */
349 export = req->rq_export = class_conn2export(&conn);
350 LASSERT(export != NULL);
/* Swap any placeholder connections for the real peer connection; both the
 * request and the export take their own references. */
352 if (req->rq_connection != NULL)
353 ptlrpc_put_connection(req->rq_connection);
354 if (export->exp_connection != NULL)
355 ptlrpc_put_connection(export->exp_connection);
356 export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
358 req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
360 if (rc == EALREADY) {
361 /* We indicate the reconnection in a flag, not an error code. */
362 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
/* Re-read the client's handle (the memcpy length argument continues on a
 * missing line). */
366 memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
/* Build the "fake" reverse import used to send DLM callbacks (e.g. lock
 * blocking ASTs) from this server to the client. */
369 if (export->exp_ldlm_data.led_import != NULL)
370 class_destroy_import(export->exp_ldlm_data.led_import);
371 dlmimp = export->exp_ldlm_data.led_import = class_new_import();
372 dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
373 dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
374 dlmimp->imp_remote_handle = conn;
375 dlmimp->imp_obd = target;
376 dlmimp->imp_dlm_fake = 1;
377 dlmimp->imp_level = LUSTRE_CONN_FULL;
378 class_import_put(dlmimp);
/* target_handle_disconnect(): server-side DISCONNECT handler.  Packs an
 * empty reply, performs the obd-level disconnect (its status becomes the
 * RPC status), destroys the client's reverse DLM import, and releases the
 * request's export reference.
 * NOTE(review): incomplete listing -- braces, the pack-failure check and
 * the RETURN are in missing lines. */
385 int target_handle_disconnect(struct ptlrpc_request *req)
387 struct lustre_handle *conn = &req->rq_reqmsg->handle;
388 struct obd_import *dlmimp;
392 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
/* Disconnect result is reported via rq_status, not the handler rc. */
396 req->rq_status = obd_disconnect(conn, 0);
398 dlmimp = req->rq_export->exp_ldlm_data.led_import;
399 class_destroy_import(dlmimp);
401 class_export_put(req->rq_export);
402 req->rq_export = NULL;
410 void target_cancel_recovery_timer(struct obd_device *obd)
412 del_timer(&obd->obd_recovery_timer);
/* abort_delayed_replies(): fail every reply parked on the obd's delayed
 * reply queue with -ENOTCONN, then free each queued request copy.  Used
 * when recovery is aborted and the delayed final replies will never be
 * sent normally.
 * NOTE(review): incomplete listing -- line 424 (presumably the call that
 * actually sends the error reply) and the closing braces are missing. */
415 static void abort_delayed_replies(struct obd_device *obd)
417 struct ptlrpc_request *req;
418 struct list_head *tmp, *n;
419 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
420 req = list_entry(tmp, struct ptlrpc_request, rq_list);
421 DEBUG_REQ(D_ERROR, req, "aborted:");
422 req->rq_status = -ENOTCONN;
423 req->rq_type = PTL_RPC_MSG_ERR;
/* These are duplicated requests (see target_queue_final_reply), so the
 * message buffer and the request struct are freed here. */
425 list_del(&req->rq_list);
426 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
427 OBD_FREE(req, sizeof *req);
/* abort_recovery_queue(): fail every replay request still waiting on the
 * obd's recovery queue with -ENOTCONN.  For each request an error reply
 * is packed (and, in lines missing from this excerpt, presumably sent);
 * the duplicated request and its message buffer are then freed and the
 * export reference taken at queueing time is dropped.
 * NOTE(review): incomplete listing -- the `rc` declaration, the pack
 * continuation/guard (443-446, 449) and closing braces are missing. */
431 static void abort_recovery_queue(struct obd_device *obd)
433 struct ptlrpc_request *req;
434 struct list_head *tmp, *n;
437 list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
438 req = list_entry(tmp, struct ptlrpc_request, rq_list);
439 DEBUG_REQ(D_ERROR, req, "aborted:");
440 req->rq_status = -ENOTCONN;
441 req->rq_type = PTL_RPC_MSG_ERR;
442 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
/* Pack-failure branch: log and fall through to freeing the request. */
447 DEBUG_REQ(D_ERROR, req,
448 "packing failed for abort-reply; skipping");
450 list_del(&req->rq_list);
451 class_export_put(req->rq_export);
452 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
453 OBD_FREE(req, sizeof *req);
/* target_abort_recovery(): give up on client recovery.  Under the
 * processing-task lock, clears the recovering/abort flags and the
 * recoverable-client count, wakes the transno waiter and cancels the
 * recovery timer; then (outside the lock) disconnects all exports,
 * aborts queued delayed replies and replay requests, and notifies
 * userspace via the recovery-over upcall.  `void *data` is the obd,
 * matching the callback signature it is registered with.
 * NOTE(review): incomplete listing -- ENTRY/EXIT and the early-return's
 * closing lines are missing. */
457 void target_abort_recovery(void *data)
459 struct obd_device *obd = data;
461 CERROR("disconnecting clients and aborting recovery\n");
462 spin_lock_bh(&obd->obd_processing_task_lock);
/* Nothing to do if recovery already finished or was never running. */
463 if (!obd->obd_recovering) {
464 spin_unlock_bh(&obd->obd_processing_task_lock);
469 obd->obd_recovering = obd->obd_abort_recovery = 0;
470 obd->obd_recoverable_clients = 0;
471 wake_up(&obd->obd_next_transno_waitq);
472 target_cancel_recovery_timer(obd);
473 spin_unlock_bh(&obd->obd_processing_task_lock);
474 class_disconnect_exports(obd, 0);
475 abort_delayed_replies(obd);
476 abort_recovery_queue(obd);
477 ptlrpc_run_recovery_over_upcall(obd);
480 static void target_recovery_expired(unsigned long castmeharder)
482 struct obd_device *obd = (struct obd_device *)castmeharder;
483 CERROR("recovery timed out, aborting\n");
484 spin_lock_bh(&obd->obd_processing_task_lock);
485 obd->obd_abort_recovery = 1;
486 wake_up(&obd->obd_next_transno_waitq);
487 spin_unlock_bh(&obd->obd_processing_task_lock);
/* reset_recovery_timer(): push the recovery deadline out by another
 * OBD_RECOVERY_TIMEOUT, but only while recovery is still in progress
 * (the early return for !recovering is in lines missing from this
 * excerpt).  Called after each successfully replayed request so an
 * actively replaying client set keeps the window open.
 * NOTE(review): incomplete listing -- the `recovering` declaration and
 * the guard between lines 495 and 499 are missing. */
490 static void reset_recovery_timer(struct obd_device *obd)
/* Sample obd_recovering under obd_dev_lock (note: other readers in this
 * file use obd_processing_task_lock for this flag -- verify which lock
 * is authoritative before relying on this). */
493 spin_lock(&obd->obd_dev_lock);
494 recovering = obd->obd_recovering;
495 spin_unlock(&obd->obd_dev_lock);
499 CERROR("timer will expire in %ld seconds\n", OBD_RECOVERY_TIMEOUT / HZ);
500 mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
504 /* Only start it the first time called */
/* target_start_recovery_timer(): one-shot arming of the recovery timer on
 * the first client connect during recovery.  obd_recovery_handler doubles
 * as the "already started" flag; later calls bail out under the lock.
 * NOTE(review): incomplete listing -- braces, the early-return body and
 * ENTRY/EXIT lines are missing.  Also note init_timer() is called AFTER
 * .function/.data are assigned; on old kernels init_timer() did not clear
 * those fields, but confirm against the target kernel before reusing. */
505 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
507 spin_lock_bh(&obd->obd_processing_task_lock);
508 if (obd->obd_recovery_handler) {
509 spin_unlock_bh(&obd->obd_processing_task_lock);
512 CERROR("%s: starting recovery timer\n", obd->obd_name);
513 obd->obd_recovery_handler = handler;
514 obd->obd_recovery_timer.function = target_recovery_expired;
515 obd->obd_recovery_timer.data = (unsigned long)obd;
516 init_timer(&obd->obd_recovery_timer);
517 spin_unlock_bh(&obd->obd_processing_task_lock);
/* Actually arms the timer (mod_timer) for the first time. */
519 reset_recovery_timer(obd);
/* check_for_next_transno(): wait-condition helper for l_wait_event() in
 * process_recovery_queue().  Returns (via the `wake_up` local, whose
 * declaration and RETURN are in missing lines) true when the head of the
 * recovery queue carries exactly the transno we are waiting for, or when
 * recovery has stopped.
 * NOTE(review): incomplete listing -- braces, locals and RETURN missing;
 * locking context (caller holds obd_processing_task_lock?) cannot be
 * confirmed from this excerpt. */
522 static int check_for_next_transno(struct obd_device *obd)
524 struct ptlrpc_request *req;
527 req = list_entry(obd->obd_recovery_queue.next,
528 struct ptlrpc_request, rq_list);
/* The queue is sorted by transno, so the head can never be behind the
 * transno we are waiting for. */
529 LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
531 wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
532 (obd->obd_recovering) == 0;
533 CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
534 req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
535 obd->obd_recovering, wake_up);
/* process_recovery_queue(): drain the recovery queue in transno order.
 * The caller must already own the queue (obd_processing_task == our pid).
 * For each request: if the head's transno is not yet the one we need,
 * sleep until it arrives (or recovery aborts); otherwise dequeue it, run
 * the registered recovery handler, extend the recovery timer, free the
 * duplicated request, and advance obd_next_recovery_transno.  Releases
 * ownership (obd_processing_task = 0) when the queue empties.
 * NOTE(review): incomplete listing -- the enclosing loop construct, the
 * `continue`/RETURN statements after the abort and empty-queue branches,
 * and closing braces are in missing lines. */
539 static void process_recovery_queue(struct obd_device *obd)
541 struct ptlrpc_request *req;
542 int abort_recovery = 0;
543 struct l_wait_info lwi = { 0 };
547 spin_lock_bh(&obd->obd_processing_task_lock);
548 LASSERT(obd->obd_processing_task == current->pid);
549 req = list_entry(obd->obd_recovery_queue.next,
550 struct ptlrpc_request, rq_list);
/* Head is a future transno: wait for the right one to be queued. */
552 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
553 spin_unlock_bh(&obd->obd_processing_task_lock);
554 CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
556 obd->obd_next_recovery_transno,
557 req->rq_reqmsg->transno)
558 l_wait_event(obd->obd_next_transno_waitq,
559 check_for_next_transno(obd), &lwi);
560 spin_lock_bh(&obd->obd_processing_task_lock);
561 abort_recovery = obd->obd_abort_recovery;
562 spin_unlock_bh(&obd->obd_processing_task_lock);
563 if (abort_recovery) {
564 target_abort_recovery(obd);
569 list_del_init(&req->rq_list);
570 spin_unlock_bh(&obd->obd_processing_task_lock);
/* Replay the request through the service handler; its return code is
 * deliberately ignored (the client learns the result via the reply). */
572 DEBUG_REQ(D_ERROR, req, "processing: ");
573 (void)obd->obd_recovery_handler(req);
574 reset_recovery_timer(obd);
575 /* bug 1580: decide how to properly sync() in recovery */
576 //mds_fsync_super(mds->mds_sb);
/* Free the duplicate made in target_queue_recovery_request() and drop
 * the export ref taken there. */
577 class_export_put(req->rq_export);
578 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
579 OBD_FREE(req, sizeof *req);
580 spin_lock_bh(&obd->obd_processing_task_lock);
581 obd->obd_next_recovery_transno++;
582 if (list_empty(&obd->obd_recovery_queue)) {
583 obd->obd_processing_task = 0;
584 spin_unlock_bh(&obd->obd_processing_task_lock);
587 spin_unlock_bh(&obd->obd_processing_task_lock);
/* target_queue_recovery_request(): during recovery, either process a
 * replayed request immediately (when we already own the queue, or its
 * transno precedes the one being waited for) or duplicate it and insert
 * it into obd_recovery_queue sorted by transno.  If nobody owns the
 * queue, this thread claims ownership and drains it.
 * NOTE(review): incomplete listing -- the no-transno early return, the
 * OBD_ALLOC failure branches (LBUG per the XXX), `break`/RETURN
 * statements and closing braces are in missing lines. */
592 int target_queue_recovery_request(struct ptlrpc_request *req,
593 struct obd_device *obd)
595 struct list_head *tmp;
597 __u64 transno = req->rq_reqmsg->transno;
598 struct ptlrpc_request *saved_req;
599 struct lustre_msg *reqmsg;
601 /* CAVEAT EMPTOR: The incoming request message has been swabbed
602 * (i.e. buflens etc are in my own byte order), but type-dependent
603 * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
/* No transno (its guard is in a missing line): nothing to order on. */
606 INIT_LIST_HEAD(&req->rq_list);
607 DEBUG_REQ(D_HA, req, "not queueing");
611 /* XXX If I were a real man, these LBUGs would be sane cleanups. */
612 /* XXX just like the request-dup code in queue_final_reply */
613 OBD_ALLOC(saved_req, sizeof *saved_req);
616 OBD_ALLOC(reqmsg, req->rq_reqlen);
620 spin_lock_bh(&obd->obd_processing_task_lock);
622 /* If we're processing the queue, we want don't want to queue this
625 * Also, if this request has a transno less than the one we're waiting
626 * for, we should process it now. It could (and currently always will)
627 * be an open request for a descriptor that was opened some time ago.
629 if (obd->obd_processing_task == current->pid ||
630 transno < obd->obd_next_recovery_transno) {
631 /* Processing the queue right now, don't re-add. */
632 LASSERT(list_empty(&req->rq_list));
633 spin_unlock_bh(&obd->obd_processing_task_lock);
634 OBD_FREE(reqmsg, req->rq_reqlen);
635 OBD_FREE(saved_req, sizeof *saved_req);
/* Duplicate the request and its message so the service thread can reuse
 * the originals; the copy owns an extra export reference until replay. */
639 memcpy(saved_req, req, sizeof *req);
640 memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
642 req->rq_reqmsg = reqmsg;
643 class_export_get(req->rq_export);
644 INIT_LIST_HEAD(&req->rq_list);
/* Insert sorted by transno (first entry with a larger transno marks the
 * insertion point; the `break`/inserted flag is in missing lines). */
647 list_for_each(tmp, &obd->obd_recovery_queue) {
648 struct ptlrpc_request *reqiter =
649 list_entry(tmp, struct ptlrpc_request, rq_list);
651 if (reqiter->rq_reqmsg->transno > transno) {
652 list_add_tail(&req->rq_list, &reqiter->rq_list);
659 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
662 if (obd->obd_processing_task != 0) {
663 /* Someone else is processing this queue, we'll leave it to
666 if (transno == obd->obd_next_recovery_transno)
667 wake_up(&obd->obd_next_transno_waitq);
668 spin_unlock_bh(&obd->obd_processing_task_lock);
672 /* Nobody is processing, and we know there's (at least) one to process
673 * now, so we'll do the honours.
675 obd->obd_processing_task = current->pid;
676 spin_unlock_bh(&obd->obd_processing_task_lock);
678 process_recovery_queue(obd);
682 struct obd_device * target_req2obd(struct ptlrpc_request *req)
684 return req->rq_export->exp_obd;
/* target_queue_final_reply(): during recovery, hold a client's final
 * reply on obd_delayed_reply_queue (after duplicating the request) and
 * decrement the recoverable-client count.  When the last client checks
 * in, reprocess the namespace, flush all delayed replies (the actual
 * send is in lines missing from this excerpt), and cancel the recovery
 * timer; otherwise just report how many clients remain.
 * NOTE(review): incomplete listing -- the rc!=0 error-pack branch, the
 * OBD_ALLOC failure handling, the `if (recovery_done)` guard and the
 * closing braces/RETURN are in missing lines. */
687 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
689 struct obd_device *obd = target_req2obd(req);
690 struct ptlrpc_request *saved_req;
691 struct lustre_msg *reqmsg;
692 int recovery_done = 0;
/* rc != 0 branch (guard missing): turn the reply into an error message. */
695 /* Just like ptlrpc_error, but without the sending. */
696 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
698 req->rq_type = PTL_RPC_MSG_ERR;
701 LASSERT(list_empty(&req->rq_list));
702 /* XXX just like the request-dup code in queue_recovery_request */
703 OBD_ALLOC(saved_req, sizeof *saved_req);
706 OBD_ALLOC(reqmsg, req->rq_reqlen);
709 memcpy(saved_req, req, sizeof *saved_req);
710 memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
712 req->rq_reqmsg = reqmsg;
713 list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
715 spin_lock_bh(&obd->obd_processing_task_lock);
716 --obd->obd_recoverable_clients;
717 recovery_done = (obd->obd_recoverable_clients == 0);
718 spin_unlock_bh(&obd->obd_processing_task_lock);
/* Recovery-complete branch (its `if (recovery_done)` is missing): wake
 * the world, replay delayed replies, and stop the recovery timer. */
721 struct list_head *tmp, *n;
722 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
723 CERROR("%s: all clients recovered, sending delayed replies\n",
725 obd->obd_recovering = 0;
726 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
727 req = list_entry(tmp, struct ptlrpc_request, rq_list);
728 DEBUG_REQ(D_ERROR, req, "delayed:");
730 list_del(&req->rq_list);
731 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
732 OBD_FREE(req, sizeof *req);
734 target_cancel_recovery_timer(obd);
736 CERROR("%s: %d recoverable clients remain\n",
737 obd->obd_name, obd->obd_recoverable_clients);
/* ptlrpc_abort_reply(): guarantee the ACK callback for this reply either
 * has already run or can never run, so the request struct can be safely
 * reused/freed.  Serialises with the callback via rq_lock, then loops
 * trying PtlMDUnlink() on the reply MD, sleeping briefly between attempts
 * until either the unlink succeeds (ACK suppressed) or the ACK arrives.
 * NOTE(review): incomplete listing -- the `flags`/`rc` declarations, the
 * surrounding retry loop construct, the switch/if on the PtlMDUnlink
 * result, and RETURN/EXIT paths are all in missing lines; do not infer
 * the exact retry structure from this excerpt. */
743 static void ptlrpc_abort_reply (struct ptlrpc_request *req)
745 /* On return, we must be sure that the ACK callback has either
746 * happened or will not happen. Note that the SENT callback will
747 * happen come what may since we successfully posted the PUT. */
749 struct l_wait_info lwi;
753 /* serialise with ACK callback */
754 spin_lock_irqsave (&req->rq_lock, flags);
755 if (!req->rq_want_ack) {
756 spin_unlock_irqrestore (&req->rq_lock, flags);
757 /* The ACK callback has happened already. Although the
758 * SENT callback might still be outstanding (yes really) we
759 * don't care; this is just like normal completion. */
762 spin_unlock_irqrestore (&req->rq_lock, flags);
764 /* Have a bash at unlinking the MD. This will fail until the SENT
765 * callback has happened since the MD is busy from the PUT. If the
766 * ACK still hasn't arrived after then, a successful unlink will
767 * ensure the ACK callback never happens. */
768 rc = PtlMDUnlink (req->rq_reply_md_h);
/* Unlink succeeded (guard in missing line): ACK can no longer fire. */
773 /* SENT callback happened; ACK callback preempted */
774 LASSERT (req->rq_want_ack);
775 spin_lock_irqsave (&req->rq_lock, flags);
776 req->rq_want_ack = 0;
777 spin_unlock_irqrestore (&req->rq_lock, flags);
782 /* Still sending or ACK callback in progress: wait until
783 * either callback has completed and try again.
784 * Actually we can't wait for the SENT callback because
785 * there's no state the SENT callback can touch that will
786 * allow it to communicate with us! So we just wait here
787 * for a short time, effectively polling for the SENT
788 * callback by calling PtlMDUnlink() again, to see if it
789 * has finished. Note that if the ACK does arrive, its
790 * callback wakes us in short order. --eeb */
791 lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
792 rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
794 CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
795 /* NB go back and test rq_want_ack with locking, to ensure
796 * if ACK callback happened, it has completed stopped
797 * referencing this req. */
/* target_send_reply(): send (or, under fail_id fault injection, drop) the
 * reply for a processed request.  If the request holds ack-locks
 * (rq_ack_locks[0].mode set), mark the reply as wanting an ACK, then wait
 * until the client ACKs, the request is resent, or the transno commits to
 * disk; finally tear down the reply MD via ptlrpc_abort_reply() and
 * release the ack-locks (unless resent, in which case they are kept).
 * NOTE(review): incomplete listing -- declarations (`flags`, `netrc`,
 * `i`), several guards (`if (exp)`, `} else {` for the drop path,
 * `if (!exp) EXIT`), and closing braces are in missing lines. */
802 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
807 struct ptlrpc_req_ack_lock *ack_lock;
808 struct l_wait_info lwi = { 0 };
809 wait_queue_t commit_wait;
810 struct obd_device *obd =
811 req->rq_export ? req->rq_export->exp_obd : NULL;
/* exp is non-NULL only when there are ack-locks to cancel on ACK/commit. */
812 struct obd_export *exp =
813 (req->rq_export && req->rq_ack_locks[0].mode) ?
814 req->rq_export : NULL;
817 exp->exp_outstanding_reply = req;
818 spin_lock_irqsave (&req->rq_lock, flags);
819 req->rq_want_ack = 1;
820 spin_unlock_irqrestore (&req->rq_lock, flags);
/* Fault injection: unless fail_id fires, send the reply (or an error
 * reply for rc != 0); otherwise drop it on the floor below. */
823 if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
825 DEBUG_REQ(D_NET, req, "sending reply");
826 netrc = ptlrpc_reply(req);
827 } else if (rc == -ENOTCONN) {
828 DEBUG_REQ(D_HA, req, "processing error (%d)", rc);
829 netrc = ptlrpc_error(req);
831 DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
832 netrc = ptlrpc_error(req);
/* Drop path (its `} else {` is in a missing line). */
835 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
836 DEBUG_REQ(D_ERROR, req, "dropping reply");
837 if (!exp && req->rq_repmsg) {
838 OBD_FREE(req->rq_repmsg, req->rq_replen);
839 req->rq_repmsg = NULL;
841 init_waitqueue_head(&req->rq_wait_for_rep);
845 /* a failed send simulates the callbacks */
846 LASSERT(netrc == 0 || req->rq_want_ack == 0);
848 LASSERT(req->rq_want_ack == 0);
851 LASSERT(obd != NULL);
/* Wait for ACK, resend, or the transno committing to disk -- whichever
 * comes first releases us. */
853 init_waitqueue_entry(&commit_wait, current);
854 add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
855 rc = l_wait_event(req->rq_wait_for_rep,
856 !req->rq_want_ack || req->rq_resent ||
857 req->rq_transno <= obd->obd_last_committed, &lwi);
858 remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
860 spin_lock_irqsave (&req->rq_lock, flags);
861 /* If we got here because the ACK callback ran, this acts as a
862 * barrier to ensure the callback completed the wakeup. */
863 spin_unlock_irqrestore (&req->rq_lock, flags);
865 /* If we committed the transno already, then we might wake up before
866 * the ack arrives. We need to stop waiting for the ack before we can
867 * reuse this request structure. We are guaranteed by this point that
868 * this cannot abort the sending of the actual reply.*/
869 ptlrpc_abort_reply(req);
/* Resent request: the locks must stay held for the retransmission. */
871 if (req->rq_resent) {
872 DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
877 DEBUG_REQ(D_HA, req, "cancelling locks for %s",
878 req->rq_want_ack ? "commit" : "ack");
880 exp->exp_outstanding_reply = NULL;
/* Release all (up to 4) ack-locks; the skip for unused slots (mode == 0)
 * is in a missing line. */
882 for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
885 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
889 int target_handle_ping(struct ptlrpc_request *req)
891 return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);