1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
23 #define DEBUG_SUBSYSTEM S_LDLM
26 # include <linux/module.h>
28 # include <liblustre.h>
30 #include <linux/obd_ost.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_net.h>
/*
 * client_import_connect(): connect a client-side obd device to its remote
 * target over its import.
 *
 * Registers the connection locally (class_connect), creates the client-side
 * DLM namespace, then sends an OST_CONNECT or MDS_CONNECT RPC — the opcode
 * is chosen by whether the obd type implements o_brw (OST does).  On success
 * the remote connect handle from the reply is saved in the import and the
 * import level is raised to LUSTRE_CONN_FULL.
 *
 * NOTE(review): the embedded numbering in this listing is non-contiguous, so
 * error-check lines, braces and returns have been elided from view; comments
 * here describe only the visible statements.
 */
35 int client_import_connect(struct lustre_handle *dlm_handle,
36 struct obd_device *obd,
37 struct obd_uuid *cluuid)
39 struct client_obd *cli = &obd->u.cli;
40 struct obd_import *imp = cli->cl_import;
41 struct obd_export *exp;
42 struct ptlrpc_request *request;
43 /* XXX maybe this is a good time to create a connect struct? */
/* Request carries target UUID, our UUID (and, presumably, the dlm handle —
 * the third buffer is elided from this listing; TODO confirm). */
44 int rc, size[] = {sizeof(imp->imp_target_uuid),
45 sizeof(obd->obd_uuid),
47 char *tmp[] = {imp->imp_target_uuid.uuid,
/* o_brw present => OST target, otherwise MDS. */
50 int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT;
55 rc = class_connect(dlm_handle, obd, cluuid);
/* Already connected at least once: presumably only refcounting is needed
 * (the early-exit path itself is elided from this listing). */
60 if (cli->cl_conn_count > 1)
63 if (obd->obd_namespace != NULL)
64 CERROR("already have namespace!\n");
/* First connect: create the client-side lock namespace for this device. */
65 obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
66 LDLM_NAMESPACE_CLIENT);
67 if (obd->obd_namespace == NULL)
68 GOTO(out_disco, rc = -ENOMEM);
70 request = ptlrpc_prep_req(imp, rq_opc, 3, size, tmp);
72 GOTO(out_ldlm, rc = -ENOMEM);
/* CONNECT is the only RPC legal at LUSTRE_CONN_NEW; reply has no bufs. */
74 request->rq_level = LUSTRE_CONN_NEW;
75 request->rq_replen = lustre_msg_size(0, NULL);
77 imp->imp_dlm_handle = *dlm_handle;
79 imp->imp_level = LUSTRE_CONN_CON;
80 rc = ptlrpc_queue_wait(request);
/* Connect RPC failed (this line sits on the elided error path): undo the
 * local class_connect. */
82 class_disconnect(dlm_handle, 0);
/* Pin the network connection on our export for the lifetime of the link. */
86 exp = class_conn2export(dlm_handle);
87 exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
88 class_export_put(exp);
/* MDS targets are always replayable; OSTs advertise it via a reply flag.
 * Replayable imports are watched by the pinger for failure detection. */
90 msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
91 if (rq_opc == MDS_CONNECT || msg_flags & MSG_CONNECT_REPLAYABLE) {
92 imp->imp_replayable = 1;
93 CDEBUG(D_HA, "connected to replayable target: %s\n",
94 imp->imp_target_uuid.uuid);
95 ptlrpc_pinger_add_import(imp);
/* Fully connected: remember the server's handle for later requests. */
97 imp->imp_level = LUSTRE_CONN_FULL;
98 imp->imp_remote_handle = request->rq_repmsg->handle;
99 CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
100 imp->imp_remote_handle.cookie);
104 ptlrpc_req_finished(request);
/* Error unwind (labels elided): free the namespace, drop the connection
 * count, and disconnect locally. */
107 ldlm_namespace_free(obd->obd_namespace);
108 obd->obd_namespace = NULL;
110 cli->cl_conn_count--;
111 class_disconnect(dlm_handle, 0);
/*
 * client_import_disconnect(): tear down a client connection made by
 * client_import_connect().
 *
 * Drops one connection reference; only when the last reference goes does it
 * cancel unused DLM locks, free the namespace, send the OST/MDS_DISCONNECT
 * RPC (skipping network work when obd_no_recov forces a local-only
 * shutdown), and finally class_disconnect() the handle.
 *
 * NOTE(review): this listing has elided lines (braces, NULL checks, returns);
 * comments describe only the visible statements.
 */
118 int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
120 struct obd_device *obd = class_conn2obd(dlm_handle);
121 struct client_obd *cli = &obd->u.cli;
122 struct obd_import *imp = cli->cl_import;
123 struct ptlrpc_request *request = NULL;
124 int rc = 0, err, rq_opc;
/* Stale/invalid handle (check itself elided): log the cookie and bail. */
128 CERROR("invalid connection for disconnect: cookie "LPX64"\n",
129 dlm_handle ? dlm_handle->cookie : -1UL);
/* Same OST/MDS discrimination as on connect: o_brw => OST. */
133 rq_opc = obd->obd_type->typ_ops->o_brw ? OST_DISCONNECT:MDS_DISCONNECT;
135 if (!cli->cl_conn_count) {
136 CERROR("disconnecting disconnected device (%s)\n",
138 GOTO(out_sem, rc = -EINVAL);
/* Reference-counted: only the last disconnect does real teardown. */
141 cli->cl_conn_count--;
142 if (cli->cl_conn_count)
143 GOTO(out_no_disconnect, rc = 0);
145 if (obd->obd_namespace != NULL) {
146 /* obd_no_recov == local only */
147 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
148 obd->obd_no_recov, NULL);
149 ldlm_namespace_free(obd->obd_namespace);
150 obd->obd_namespace = NULL;
153 /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
154 if (obd->obd_no_recov) {
155 ptlrpc_set_import_active(imp, 0);
/* Normal path: send the DISCONNECT RPC and wait for the reply. */
157 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
159 GOTO(out_req, rc = -ENOMEM);
161 request->rq_replen = lustre_msg_size(0, NULL);
163 rc = ptlrpc_queue_wait(request);
/* Replayable imports were registered with the pinger on connect; undo. */
167 if (imp->imp_replayable)
168 ptlrpc_pinger_del_import(imp);
173 ptlrpc_req_finished(request);
/* Local bookkeeping teardown; err vs. rc combination is elided here. */
175 err = class_disconnect(dlm_handle, 0);
183 /* --------------------------------------------------------------------------
184 * from old lib/target.c
185 * -------------------------------------------------------------------------- */
/*
 * target_handle_reconnect(): server-side check of a client that presents an
 * existing export.
 *
 * If the client's connect handle matches the handle we remember for that
 * export, this is a benign reconnect (e.g. after a network partition) and we
 * hand back our export cookie.  If the handles differ, the client is
 * confused (or an impostor): log the mismatch and zero the handle.
 *
 * NOTE(review): braces/returns are elided from this listing, so the exact
 * branch structure is not fully visible.
 */
187 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
188 struct obd_uuid *cluuid)
190 if (exp->exp_connection) {
191 struct lustre_handle *hdl;
192 hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
193 /* Might be a re-connect after a partition. */
194 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
195 CERROR("%s reconnecting\n", cluuid->uuid);
/* Same client as before: give it our export handle back. */
196 conn->cookie = exp->exp_handle.h_cookie;
/* Handle mismatch path: report both cookies and refuse. */
199 CERROR("%s reconnecting from %s, "
200 "handle mismatch (ours "LPX64", theirs "
201 LPX64")\n", cluuid->uuid,
202 exp->exp_connection->c_remote_uuid.uuid,
203 hdl->cookie, conn->cookie);
204 /* XXX disconnect them here? */
205 memset(conn, 0, sizeof *conn);
206 /* This is a little scary, but right now we build this
207 * file separately into each server module, so I won't
208 * go _immediately_ to hell.
/* Export exists but has no connection yet: adopt it. */
214 conn->cookie = exp->exp_handle.h_cookie;
215 CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
217 CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
/*
 * target_handle_connect(): server-side handler for an incoming CONNECT RPC.
 *
 * Extracts and validates the target and client UUIDs from the request,
 * locates the target obd device, aborts recovery if that has been flagged,
 * then either reuses an existing export for this client (reconnect) or
 * creates a fresh connection via obd_connect().  Finally it builds a "fake"
 * DLM import on the export so the server can send lock callbacks back to
 * the client.  `handler` is the service handler used if recovery must be
 * (re)timed.
 *
 * NOTE(review): many lines (NULL checks, braces, returns) are elided from
 * this listing; comments describe only the visible statements.
 */
221 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
223 struct obd_device *target;
224 struct obd_export *export = NULL;
225 struct obd_import *dlmimp;
226 struct lustre_handle conn;
227 struct obd_uuid tgtuuid;
228 struct obd_uuid cluuid;
229 struct obd_uuid remote_uuid;
232 int rc, i, abort_recovery;
/* Buffer 0: target UUID string (swab-checked, length-bounded). */
235 LASSERT_REQSWAB (req, 0);
236 str = lustre_msg_string (req->rq_reqmsg, 0, sizeof (tgtuuid.uuid) - 1);
238 CERROR("bad target UUID for connect\n");
239 GOTO(out, rc = -EINVAL);
241 obd_str2uuid (&tgtuuid, str);
/* Buffer 1: client UUID string. */
243 LASSERT_REQSWAB (req, 1);
244 str = lustre_msg_string (req->rq_reqmsg, 1, sizeof (cluuid.uuid) - 1);
246 CERROR("bad client UUID for connect\n");
247 GOTO(out, rc = -EINVAL);
249 obd_str2uuid (&cluuid, str);
/* Map the target UUID to a local obd device slot. */
251 i = class_uuid2dev(&tgtuuid);
253 CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid);
254 GOTO(out, rc = -ENODEV);
257 target = &obd_dev[i];
258 if (!target || target->obd_stopping || !target->obd_set_up) {
259 CERROR("UUID '%s' is not available for connect\n", str);
260 GOTO(out, rc = -ENODEV);
263 /* XXX extract a nettype and format accordingly */
/* Synthesize a UUID for the peer from its NID. */
264 snprintf(remote_uuid.uuid, sizeof remote_uuid,
265 "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
/* Snapshot the abort flag under the lock, act on it outside. */
267 spin_lock_bh(&target->obd_processing_task_lock);
268 abort_recovery = target->obd_abort_recovery;
269 spin_unlock_bh(&target->obd_processing_task_lock);
271 target_abort_recovery(target);
/* Buffer 2: the client's lustre_handle for this connection. */
273 tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
275 GOTO(out, rc = -EPROTO);
277 memcpy(&conn, tmp, sizeof conn);
279 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
283 /* lctl gets a backstage, all-access pass. */
284 if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
285 goto dont_check_exports;
/* Look for an existing export from this client UUID => reconnect. */
287 spin_lock(&target->obd_dev_lock);
288 list_for_each(p, &target->obd_exports) {
289 export = list_entry(p, struct obd_export, exp_obd_chain);
290 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
291 spin_unlock(&target->obd_dev_lock);
292 LASSERT(export->exp_obd == target);
294 rc = target_handle_reconnect(&conn, export, &cluuid);
299 /* If we found an export, we already unlocked. */
301 spin_unlock(&target->obd_dev_lock);
303 /* Tell the client if we're in recovery. */
304 /* If this is the first client, start the recovery timer */
305 if (target->obd_recovering) {
306 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
307 target_start_recovery_timer(target, handler);
310 /* Tell the client if we support replayable requests */
311 if (target->obd_replayable)
312 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
/* No prior export: refuse brand-new clients during recovery, otherwise
 * establish a fresh connection. */
314 if (export == NULL) {
315 if (target->obd_recovering) {
316 CERROR("denying connection for new client %s: "
317 "in recovery\n", cluuid.uuid);
321 rc = obd_connect(&conn, target, &cluuid);
325 /* If all else goes well, this is our RPC return code. */
/* EALREADY (positive, note) marks a reconnect, handled below. */
328 if (rc && rc != EALREADY)
331 req->rq_repmsg->handle = conn;
333 /* If the client and the server are the same node, we will already
334 * have an export that really points to the client's DLM export,
335 * because we have a shared handles table.
337 * XXX this will go away when shaver stops sending the "connect" handle
338 * in the real "remote handle" field of the request --phik 24 Apr 2003
340 if (req->rq_export != NULL)
341 class_export_put(req->rq_export);
343 /* ownership of this export ref transfers to the request */
344 export = req->rq_export = class_conn2export(&conn);
345 LASSERT(export != NULL);
/* Rebind the request/export to the (possibly new) peer connection. */
347 if (req->rq_connection != NULL)
348 ptlrpc_put_connection(req->rq_connection);
349 if (export->exp_connection != NULL)
350 ptlrpc_put_connection(export->exp_connection);
351 export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
353 req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
355 if (rc == EALREADY) {
356 /* We indicate the reconnection in a flag, not an error code. */
357 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
/* Re-read the client's handle from the request for the fake import. */
361 memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
/* Replace any old fake DLM import with a fresh one that points back at
 * the client; imp_dlm_fake marks it as server-originated. */
364 if (export->exp_ldlm_data.led_import != NULL)
365 class_destroy_import(export->exp_ldlm_data.led_import);
366 dlmimp = export->exp_ldlm_data.led_import = class_new_import();
367 dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
368 dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
369 dlmimp->imp_remote_handle = conn;
370 dlmimp->imp_obd = target;
371 dlmimp->imp_dlm_fake = 1;
372 dlmimp->imp_level = LUSTRE_CONN_FULL;
373 class_import_put(dlmimp);
/*
 * target_handle_disconnect(): server-side handler for a DISCONNECT RPC.
 *
 * Packs an empty reply, performs the obd-level disconnect (its status goes
 * back in rq_status), destroys the fake DLM import created at connect time,
 * and drops the request's export reference.
 *
 * NOTE(review): error checks/returns are elided from this listing.
 */
380 int target_handle_disconnect(struct ptlrpc_request *req)
382 struct lustre_handle *conn = &req->rq_reqmsg->handle;
383 struct obd_import *dlmimp;
387 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
391 req->rq_status = obd_disconnect(conn, 0);
/* Tear down the server->client lock import built in target_handle_connect. */
393 dlmimp = req->rq_export->exp_ldlm_data.led_import;
394 class_destroy_import(dlmimp);
396 class_export_put(req->rq_export);
397 req->rq_export = NULL;
/* Stop the recovery timeout timer for this device (safe if not pending). */
405 void target_cancel_recovery_timer(struct obd_device *obd)
407 del_timer(&obd->obd_recovery_timer);
/*
 * abort_delayed_replies(): fail every reply queued on
 * obd_delayed_reply_queue with -ENOTCONN and free the saved requests.
 * Used when recovery is aborted; each request here is a duplicated copy
 * (see target_queue_final_reply), so it is freed with OBD_FREE directly.
 * NOTE(review): the line that actually sends the error reply appears to be
 * elided between setting rq_type and list_del.
 */
410 static void abort_delayed_replies(struct obd_device *obd)
412 struct ptlrpc_request *req;
413 struct list_head *tmp, *n;
/* _safe iteration: we unlink entries while walking. */
414 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
415 req = list_entry(tmp, struct ptlrpc_request, rq_list);
416 DEBUG_REQ(D_ERROR, req, "aborted:");
417 req->rq_status = -ENOTCONN;
418 req->rq_type = PTL_RPC_MSG_ERR;
420 list_del(&req->rq_list);
/* Free the duplicated message buffer and request shell. */
421 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
422 OBD_FREE(req, sizeof *req);
/*
 * abort_recovery_queue(): fail every replay request still queued on
 * obd_recovery_queue with -ENOTCONN, releasing the export reference taken
 * when the request was queued and freeing the duplicated request/message.
 * Requests whose abort-reply cannot be packed are logged and skipped.
 */
426 static void abort_recovery_queue(struct obd_device *obd)
428 struct ptlrpc_request *req;
429 struct list_head *tmp, *n;
/* _safe iteration: entries are unlinked during the walk. */
432 list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
433 req = list_entry(tmp, struct ptlrpc_request, rq_list);
434 DEBUG_REQ(D_ERROR, req, "aborted:");
435 req->rq_status = -ENOTCONN;
436 req->rq_type = PTL_RPC_MSG_ERR;
437 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
442 DEBUG_REQ(D_ERROR, req,
443 "packing failed for abort-reply; skipping");
445 list_del(&req->rq_list);
/* Drop the ref taken in target_queue_recovery_request. */
446 class_export_put(req->rq_export);
447 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
448 OBD_FREE(req, sizeof *req);
/*
 * target_abort_recovery(): give up on client recovery for this device.
 *
 * Clears the recovering/abort flags and the recoverable-client count under
 * obd_processing_task_lock, wakes any thread waiting for the next transno,
 * cancels the recovery timer, then (outside the lock) disconnects all
 * exports, fails the delayed replies and queued replay requests, and runs
 * the recovery-over upcall.  `data` is a struct obd_device * (void* so it
 * can double as a callback argument).
 */
452 void target_abort_recovery(void *data)
454 struct obd_device *obd = data;
456 CERROR("disconnecting clients and aborting recovery\n");
457 spin_lock_bh(&obd->obd_processing_task_lock);
/* Not recovering: nothing to abort (early-exit body elided). */
458 if (!obd->obd_recovering) {
459 spin_unlock_bh(&obd->obd_processing_task_lock);
464 obd->obd_recovering = obd->obd_abort_recovery = 0;
465 obd->obd_recoverable_clients = 0;
/* Unblock process_recovery_queue() waiters so they see the abort. */
466 wake_up(&obd->obd_next_transno_waitq);
467 target_cancel_recovery_timer(obd);
468 spin_unlock_bh(&obd->obd_processing_task_lock);
469 class_disconnect_exports(obd, 0);
470 abort_delayed_replies(obd);
471 abort_recovery_queue(obd);
472 ptlrpc_run_recovery_over_upcall(obd);
/*
 * target_recovery_expired(): timer callback fired when clients fail to
 * reconnect within the recovery window.  Runs in (soft)irq context, so it
 * only sets obd_abort_recovery and wakes the transno waiter; the actual
 * abort happens in process context (see target_abort_recovery).
 */
475 static void target_recovery_expired(unsigned long castmeharder)
477 struct obd_device *obd = (struct obd_device *)castmeharder;
478 CERROR("recovery timed out, aborting\n");
479 spin_lock_bh(&obd->obd_processing_task_lock);
480 obd->obd_abort_recovery = 1;
481 wake_up(&obd->obd_next_transno_waitq);
482 spin_unlock_bh(&obd->obd_processing_task_lock);
/*
 * reset_recovery_timer(): push the recovery deadline forward by
 * OBD_RECOVERY_TIMEOUT from now.  Reads obd_recovering under obd_dev_lock;
 * presumably returns early when not recovering (that branch is elided from
 * this listing — TODO confirm).
 */
485 static void reset_recovery_timer(struct obd_device *obd)
488 spin_lock(&obd->obd_dev_lock);
489 recovering = obd->obd_recovering;
490 spin_unlock(&obd->obd_dev_lock);
494 CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
495 OBD_RECOVERY_TIMEOUT / HZ);
/* mod_timer() re-arms whether or not the timer is currently pending. */
496 mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
500 /* Only start it the first time called */
/*
 * target_start_recovery_timer(): arm the recovery timer and record the
 * service handler used to replay queued requests.  obd_recovery_handler
 * doubles as the "already started" flag, checked under
 * obd_processing_task_lock so concurrent first connects race safely.
 */
501 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
503 spin_lock_bh(&obd->obd_processing_task_lock);
504 if (obd->obd_recovery_handler) {
505 spin_unlock_bh(&obd->obd_processing_task_lock);
508 CERROR("%s: starting recovery timer\n", obd->obd_name);
509 obd->obd_recovery_handler = handler;
/* Timer fires target_recovery_expired(obd) on timeout. */
510 obd->obd_recovery_timer.function = target_recovery_expired;
511 obd->obd_recovery_timer.data = (unsigned long)obd;
512 init_timer(&obd->obd_recovery_timer);
513 spin_unlock_bh(&obd->obd_processing_task_lock);
/* Actually arm the timer for OBD_RECOVERY_TIMEOUT from now. */
515 reset_recovery_timer(obd);
/*
 * check_for_next_transno(): wait-condition helper for
 * process_recovery_queue().  True when the head of the recovery queue
 * carries exactly the transno we are waiting for, or when recovery has been
 * stopped (so the waiter can notice the abort).
 * NOTE(review): locking around the queue access is elided from this listing.
 */
518 static int check_for_next_transno(struct obd_device *obd)
520 struct ptlrpc_request *req;
523 req = list_entry(obd->obd_recovery_queue.next,
524 struct ptlrpc_request, rq_list);
/* Queueing keeps the list sorted, so the head is the smallest transno. */
525 LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
527 wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
528 (obd->obd_recovering) == 0;
529 CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
530 req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
531 obd->obd_recovering, wake_up);
/*
 * process_recovery_queue(): replay queued client requests in strict transno
 * order.  The caller must already own the queue (obd_processing_task ==
 * current->pid).  For each head request: if its transno is not yet the one
 * we need, sleep until the right request arrives or recovery is aborted;
 * otherwise unlink it, run it through the recorded recovery handler, free
 * the duplicated request, and advance obd_next_recovery_transno.  Returns
 * (releasing ownership) when the queue drains.
 * NOTE(review): the enclosing loop construct is elided from this listing.
 */
535 static void process_recovery_queue(struct obd_device *obd)
537 struct ptlrpc_request *req;
538 int abort_recovery = 0;
539 struct l_wait_info lwi = { 0 };
543 spin_lock_bh(&obd->obd_processing_task_lock);
544 LASSERT(obd->obd_processing_task == current->pid);
545 req = list_entry(obd->obd_recovery_queue.next,
546 struct ptlrpc_request, rq_list);
/* Head is not the next transno: wait for it (or for an abort). */
548 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
549 spin_unlock_bh(&obd->obd_processing_task_lock);
550 CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
552 obd->obd_next_recovery_transno,
553 req->rq_reqmsg->transno);
554 l_wait_event(obd->obd_next_transno_waitq,
555 check_for_next_transno(obd), &lwi);
/* Re-check the abort flag under the lock after waking. */
556 spin_lock_bh(&obd->obd_processing_task_lock);
557 abort_recovery = obd->obd_abort_recovery;
558 spin_unlock_bh(&obd->obd_processing_task_lock);
559 if (abort_recovery) {
560 target_abort_recovery(obd);
565 list_del_init(&req->rq_list);
566 spin_unlock_bh(&obd->obd_processing_task_lock);
/* Replay the request via the handler saved at recovery start. */
568 DEBUG_REQ(D_ERROR, req, "processing: ");
569 (void)obd->obd_recovery_handler(req);
/* Progress was made: give the remaining clients a fresh timeout. */
570 reset_recovery_timer(obd);
571 #warning FIXME: mds_fsync_super(mds->mds_sb);
/* Drop the ref taken when the request was queued; free the copies. */
572 class_export_put(req->rq_export);
573 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
574 OBD_FREE(req, sizeof *req);
575 spin_lock_bh(&obd->obd_processing_task_lock);
576 obd->obd_next_recovery_transno++;
/* Queue drained: release ownership of queue processing. */
577 if (list_empty(&obd->obd_recovery_queue)) {
578 obd->obd_processing_task = 0;
579 spin_unlock_bh(&obd->obd_processing_task_lock);
582 spin_unlock_bh(&obd->obd_processing_task_lock);
/*
 * target_queue_recovery_request(): insert a replayed client request into the
 * per-device recovery queue, sorted by transno.
 *
 * The request (and its message buffer) are duplicated so the service can
 * recycle the originals; an export reference is taken for the queued copy.
 * Requests are processed in transno order: if this thread is already the
 * queue processor, or the transno is older than the one being waited for,
 * the request is handled immediately instead of queued.  If nobody is
 * processing the queue, this thread claims it and drives
 * process_recovery_queue() itself.
 *
 * NOTE(review): allocation-failure branches and returns are elided from this
 * listing.
 */
587 int target_queue_recovery_request(struct ptlrpc_request *req,
588 struct obd_device *obd)
590 struct list_head *tmp;
592 __u64 transno = req->rq_reqmsg->transno;
593 struct ptlrpc_request *saved_req;
594 struct lustre_msg *reqmsg;
596 /* CAVEAT EMPTOR: The incoming request message has been swabbed
597 * (i.e. buflens etc are in my own byte order), but type-dependent
598 * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
/* Transno 0 (presumably — the test itself is elided): nothing to order. */
601 INIT_LIST_HEAD(&req->rq_list);
602 DEBUG_REQ(D_HA, req, "not queueing");
606 /* XXX If I were a real man, these LBUGs would be sane cleanups. */
607 /* XXX just like the request-dup code in queue_final_reply */
608 OBD_ALLOC(saved_req, sizeof *saved_req);
611 OBD_ALLOC(reqmsg, req->rq_reqlen);
615 spin_lock_bh(&obd->obd_processing_task_lock);
617 /* If we're processing the queue, we want don't want to queue this
620 * Also, if this request has a transno less than the one we're waiting
621 * for, we should process it now. It could (and currently always will)
622 * be an open request for a descriptor that was opened some time ago.
624 if (obd->obd_processing_task == current->pid ||
625 transno < obd->obd_next_recovery_transno) {
626 /* Processing the queue right now, don't re-add. */
627 LASSERT(list_empty(&req->rq_list));
628 spin_unlock_bh(&obd->obd_processing_task_lock);
/* Not queueing after all: discard the duplicates. */
629 OBD_FREE(reqmsg, req->rq_reqlen);
630 OBD_FREE(saved_req, sizeof *saved_req);
/* Shallow-copy the request and deep-copy its message; the queued req
 * now points at the private message copy. */
634 memcpy(saved_req, req, sizeof *req);
635 memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
637 req->rq_reqmsg = reqmsg;
/* Hold the export for as long as the copy sits on the queue. */
638 class_export_get(req->rq_export);
639 INIT_LIST_HEAD(&req->rq_list);
/* Insert in ascending transno order: before the first larger entry. */
642 list_for_each(tmp, &obd->obd_recovery_queue) {
643 struct ptlrpc_request *reqiter =
644 list_entry(tmp, struct ptlrpc_request, rq_list);
646 if (reqiter->rq_reqmsg->transno > transno) {
647 list_add_tail(&req->rq_list, &reqiter->rq_list);
/* No larger transno found: append at the tail. */
654 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
657 if (obd->obd_processing_task != 0) {
658 /* Someone else is processing this queue, we'll leave it to
/* Poke the processor if we just supplied the transno it waits on. */
661 if (transno == obd->obd_next_recovery_transno)
662 wake_up(&obd->obd_next_transno_waitq);
663 spin_unlock_bh(&obd->obd_processing_task_lock);
667 /* Nobody is processing, and we know there's (at least) one to process
668 * now, so we'll do the honours.
670 obd->obd_processing_task = current->pid;
671 spin_unlock_bh(&obd->obd_processing_task_lock);
673 process_recovery_queue(obd);
/* Map a server-side request to the obd device that owns its export. */
677 struct obd_device * target_req2obd(struct ptlrpc_request *req)
679 return req->rq_export->exp_obd;
/*
 * target_queue_final_reply(): hold back a recovered client's final reply
 * until every recoverable client has completed recovery.
 *
 * The request is duplicated (like target_queue_recovery_request) and parked
 * on obd_delayed_reply_queue; the recoverable-client count is decremented,
 * and when it reaches zero all lock namespaces are reprocessed, recovery is
 * declared over, the delayed replies are released, and the recovery timer
 * is cancelled.  `rc` non-zero (the test itself is elided) presumably makes
 * the reply an error reply — TODO confirm.
 * NOTE(review): allocation-failure branches and the reply-send call inside
 * the flush loop are elided from this listing.
 */
682 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
684 struct obd_device *obd = target_req2obd(req);
685 struct ptlrpc_request *saved_req;
686 struct lustre_msg *reqmsg;
687 int recovery_done = 0;
690 /* Just like ptlrpc_error, but without the sending. */
691 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
693 req->rq_type = PTL_RPC_MSG_ERR;
696 LASSERT(list_empty(&req->rq_list));
697 /* XXX just like the request-dup code in queue_recovery_request */
698 OBD_ALLOC(saved_req, sizeof *saved_req);
701 OBD_ALLOC(reqmsg, req->rq_reqlen);
/* Shallow-copy the request, deep-copy its message, park the copy. */
704 memcpy(saved_req, req, sizeof *saved_req);
705 memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
707 req->rq_reqmsg = reqmsg;
708 list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
/* One more client done; last one out ends recovery. */
710 spin_lock_bh(&obd->obd_processing_task_lock);
711 --obd->obd_recoverable_clients;
712 recovery_done = (obd->obd_recoverable_clients == 0);
713 spin_unlock_bh(&obd->obd_processing_task_lock);
716 struct list_head *tmp, *n;
/* Recovery complete: re-grant blocked locks, then flush the queue. */
717 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
719 "%s: all clients recovered, sending delayed replies\n",
721 obd->obd_recovering = 0;
722 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
723 req = list_entry(tmp, struct ptlrpc_request, rq_list);
724 DEBUG_REQ(D_ERROR, req, "delayed:");
726 list_del(&req->rq_list);
727 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
728 OBD_FREE(req, sizeof *req);
730 target_cancel_recovery_timer(obd);
732 CERROR("%s: %d recoverable clients remain\n",
733 obd->obd_name, obd->obd_recoverable_clients);
/*
 * ptlrpc_abort_reply(): ensure the reply's ACK callback has either run or
 * can never run, so the request structure may be safely reused/freed.
 * Serializes against the ACK callback via rq_lock/rq_want_ack and attempts
 * to unlink the reply MD; loops (loop construct elided from this listing)
 * with a short timed wait while the SENT callback is still outstanding.
 */
739 static void ptlrpc_abort_reply (struct ptlrpc_request *req)
741 /* On return, we must be sure that the ACK callback has either
742 * happened or will not happen. Note that the SENT callback will
743 * happen come what may since we successfully posted the PUT. */
745 struct l_wait_info lwi;
749 /* serialise with ACK callback */
750 spin_lock_irqsave (&req->rq_lock, flags);
751 if (!req->rq_want_ack) {
752 spin_unlock_irqrestore (&req->rq_lock, flags);
753 /* The ACK callback has happened already. Although the
754 * SENT callback might still be outstanding (yes really) we
755 * don't care; this is just like normal completion. */
758 spin_unlock_irqrestore (&req->rq_lock, flags);
760 /* Have a bash at unlinking the MD. This will fail until the SENT
761 * callback has happened since the MD is busy from the PUT. If the
762 * ACK still hasn't arrived after then, a successful unlink will
763 * ensure the ACK callback never happens. */
764 rc = PtlMDUnlink (req->rq_reply_md_h);
/* Unlink succeeded (the case dispatch itself is elided from view): the
 * ACK callback is now guaranteed not to run; clear the flag under lock. */
769 /* SENT callback happened; ACK callback preempted */
770 LASSERT (req->rq_want_ack);
771 spin_lock_irqsave (&req->rq_lock, flags);
772 req->rq_want_ack = 0;
773 spin_unlock_irqrestore (&req->rq_lock, flags);
778 /* Still sending or ACK callback in progress: wait until
779 * either callback has completed and try again.
780 * Actually we can't wait for the SENT callback because
781 * there's no state the SENT callback can touch that will
782 * allow it to communicate with us! So we just wait here
783 * for a short time, effectively polling for the SENT
784 * callback by calling PtlMDUnlink() again, to see if it
785 * has finished. Note that if the ACK does arrive, its
786 * callback wakes us in short order. --eeb */
787 lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
788 rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
790 CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
791 /* NB go back and test rq_want_ack with locking, to ensure
792 * if ACK callback happened, it has completed stopped
793 * referencing this req. */
/*
 * target_send_reply(): send (or deliberately drop, under fail_id fault
 * injection) the reply for a server-side request, then — when the request
 * holds ack-locks — wait until the client ACKs the reply or the transaction
 * commits before releasing those DLM locks.
 *
 * `rc` non-zero turns the reply into an error reply; `fail_id` is an
 * OBD_FAIL code used to simulate a dropped reply once.
 * NOTE(review): several branch/return lines are elided from this listing.
 */
798 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
803 struct ptlrpc_req_ack_lock *ack_lock;
804 struct l_wait_info lwi = { 0 };
805 wait_queue_t commit_wait;
806 struct obd_device *obd =
807 req->rq_export ? req->rq_export->exp_obd : NULL;
/* Only a request that actually holds ack-locks needs the ACK dance. */
808 struct obd_export *exp =
809 (req->rq_export && req->rq_ack_locks[0].mode) ?
810 req->rq_export : NULL;
813 exp->exp_outstanding_reply = req;
814 spin_lock_irqsave (&req->rq_lock, flags);
815 req->rq_want_ack = 1;
816 spin_unlock_irqrestore (&req->rq_lock, flags);
/* Fault injection: unless fail_id fires, really send the reply. */
819 if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
821 DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
822 netrc = ptlrpc_error(req);
824 DEBUG_REQ(D_NET, req, "sending reply");
825 netrc = ptlrpc_reply(req);
/* Injected drop: mark the fail-once and discard the reply buffer. */
828 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
829 DEBUG_REQ(D_ERROR, req, "dropping reply");
830 if (!exp && req->rq_repmsg) {
831 OBD_FREE(req->rq_repmsg, req->rq_replen);
832 req->rq_repmsg = NULL;
834 init_waitqueue_head(&req->rq_wait_for_rep);
838 /* a failed send simulates the callbacks */
839 LASSERT(netrc == 0 || req->rq_want_ack == 0);
841 LASSERT(req->rq_want_ack == 0);
844 LASSERT(obd != NULL);
/* Wait for: client ACK, a resend of this request, or transaction commit
 * (obd_commit_waitq is woken on commit). */
846 init_waitqueue_entry(&commit_wait, current);
847 add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
848 rc = l_wait_event(req->rq_wait_for_rep,
849 !req->rq_want_ack || req->rq_resent ||
850 req->rq_transno <= obd->obd_last_committed, &lwi);
851 remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
853 spin_lock_irqsave (&req->rq_lock, flags);
854 /* If we got here because the ACK callback ran, this acts as a
855 * barrier to ensure the callback completed the wakeup. */
856 spin_unlock_irqrestore (&req->rq_lock, flags);
858 /* If we committed the transno already, then we might wake up before
859 * the ack arrives. We need to stop waiting for the ack before we can
860 * reuse this request structure. We are guaranteed by this point that
861 * this cannot abort the sending of the actual reply.*/
862 ptlrpc_abort_reply(req);
864 if (req->rq_resent) {
865 DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
870 DEBUG_REQ(D_HA, req, "cancelling locks for %s",
871 req->rq_want_ack ? "commit" : "ack");
873 exp->exp_outstanding_reply = NULL;
/* Release every held ack-lock (fixed-size array of 4 slots). */
875 for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
878 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
/* PING handler: a ping needs no payload, so just pack an empty reply and
 * return the pack result as the handler status. */
882 int target_handle_ping(struct ptlrpc_request *req)
884 return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);