1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
7 * Author: Mike Shaver <shaver@clusterfs.com>
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 * Target-common OBD method implementations and utility functions.
28 #define DEBUG_SUBSYSTEM S_OST /* XXX WRONG */
30 #include <linux/module.h>
31 #include <linux/obd_ost.h>
32 #include <linux/lustre_net.h>
33 #include <linux/lustre_dlm.h>
/*
 * Handle a connect request from a client UUID that already owns an export
 * on this target.
 *
 * conn:   handle sent by the client; rewritten in place.
 * exp:    the existing export found for that client.
 * cluuid: client UUID; only used for log messages in the visible code.
 *
 * When the export has a connection, conn is compared byte-for-byte with the
 * handle stored in the LDLM import of the export (imp_handle).  A match is
 * logged as a reconnect and conn is pointed at the export (address plus
 * exp_cookie).  The statements numbered 48-55 log a handle mismatch and zero
 * conn; presumably they form the else branch of that comparison.  The
 * statements numbered 64-68 point conn at the export and log the resulting
 * handle.
 *
 * NOTE(review): the numbering on these lines skips values (45 -> 48,
 * 58 -> 64), so the return statements and closing braces are not visible in
 * this copy.  target_handle_connect() tests the result against positive
 * EALREADY.
 */
35 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
36 struct obd_uuid *cluuid)
38 if (exp->exp_connection) {
39 struct lustre_handle *hdl;
/* hdl is the handle recorded for this client by target_handle_connect(). */
40 hdl = &exp->exp_ldlm_data.led_import.imp_handle;
41 /* Might be a re-connect after a partition. */
42 if (!memcmp(conn, hdl, sizeof *conn)) {
43 CERROR("%s reconnecting\n", cluuid->uuid);
/* The handle returned to the client is the export address plus its cookie. */
44 conn->addr = (__u64) (unsigned long)exp;
45 conn->cookie = exp->exp_cookie;
48 CERROR("%s reconnecting from %s, "
49 "handle mismatch (ours "LPX64"/"LPX64", "
50 "theirs "LPX64"/"LPX64")\n", cluuid->uuid,
51 exp->exp_connection->c_remote_uuid.uuid,
53 hdl->cookie, conn->addr, conn->cookie);
54 /* XXX disconnect them here? */
55 memset(conn, 0, sizeof *conn);
56 /* This is a little scary, but right now we build this
57 * file separately into each server module, so I won't
58 * go _immediately_ to hell.
64 conn->addr = (__u64) (unsigned long)exp;
65 conn->cookie = exp->exp_cookie;
66 CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", cluuid->uuid, exp);
67 CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
68 (long long)conn->addr, (long long)conn->cookie);
/*
 * Service a connect RPC aimed at one of the OBD targets on this node.
 *
 * req:     incoming connect request; its reply message, rq_export and
 *          rq_connection are filled in here.
 * handler: handed to target_start_recovery_timer() when the target has
 *          OBD_RECOVERING set.
 *
 * Visible steps, in order: read the target and client UUIDs from request
 * buffers 0 and 1, map the target UUID to an obd_dev[] slot, run a pending
 * recovery abort, pack an empty reply, search the target exports for the
 * same client UUID (handing a hit to target_handle_reconnect()), add the
 * recovery/replayable flags to the reply, call obd_connect(), copy the
 * resulting handle into the reply, and finally tie together the export, its
 * ptlrpc connection and its embedded LDLM import.
 *
 * NOTE(review): several conditions, labels (out, dont_check_exports) and the
 * return statement sit on lines that are not visible in this copy, so the
 * exact branch structure between these steps is not documented here.
 */
73 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
75 struct obd_device *target;
76 struct obd_export *export = NULL;
77 struct obd_import *dlmimp;
78 struct lustre_handle conn;
79 struct obd_uuid tgtuuid;
80 struct obd_uuid cluuid;
/*
 * Reject over-long UUID buffers before copying them.
 * NOTE(review): 37 is presumably the UUID string capacity - confirm against
 * struct obd_uuid and consider a named constant.
 */
85 if (req->rq_reqmsg->buflens[0] > 37) {
86 CERROR("bad target UUID for connect\n");
87 GOTO(out, rc = -EINVAL);
89 obd_str2uuid(&tgtuuid, lustre_msg_buf(req->rq_reqmsg, 0));
91 if (req->rq_reqmsg->buflens[1] > 37) {
92 CERROR("bad client UUID for connect\n");
93 GOTO(out, rc = -EINVAL);
95 obd_str2uuid(&cluuid, lustre_msg_buf(req->rq_reqmsg, 1));
/* Map the target UUID to an obd_dev[] index; both visible failures give -ENODEV. */
97 i = class_uuid2dev(&tgtuuid);
99 CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid);
100 GOTO(out, rc = -ENODEV);
103 target = &obd_dev[i];
105 GOTO(out, rc = -ENODEV);
/*
 * OBD_ABORT_RECOVERY is set by target_recovery_expired(); the abort itself
 * is performed here, with the processing-task lock held as
 * target_abort_recovery() expects (it drops and retakes the lock itself).
 */
107 spin_lock_bh(&target->obd_processing_task_lock);
108 if (target->obd_flags & OBD_ABORT_RECOVERY)
109 target_abort_recovery(target);
110 spin_unlock_bh(&target->obd_processing_task_lock);
/* Start from the handle the client placed in the request header. */
112 conn.addr = req->rq_reqmsg->addr;
113 conn.cookie = req->rq_reqmsg->cookie;
/* The connect reply carries no buffers; flags and the handle go in its header. */
115 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
119 /* lctl gets a backstage, all-access pass. */
120 if (!strcmp(cluuid.uuid, "OBD_CLASS_UUID"))
121 goto dont_check_exports;
/* Look for an export that already belongs to this client UUID. */
123 spin_lock(&target->obd_dev_lock);
124 list_for_each(p, &target->obd_exports) {
125 export = list_entry(p, struct obd_export, exp_obd_chain);
/* The comparison covers the whole obd_uuid structure, not just the string. */
126 if (!memcmp(&cluuid, &export->exp_client_uuid,
127 sizeof(export->exp_client_uuid))) {
128 spin_unlock(&target->obd_dev_lock);
129 LASSERT(export->exp_obd == target);
131 rc = target_handle_reconnect(&conn, export, &cluuid);
136 /* If we found an export, we already unlocked. */
138 spin_unlock(&target->obd_dev_lock);
140 /* Tell the client if we're in recovery. */
141 /* If this is the first client, start the recovery timer */
142 if (target->obd_flags & OBD_RECOVERING) {
143 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
144 target_start_recovery_timer(target, handler);
147 /* Tell the client if we support replayable requests */
148 if (target->obd_flags & OBD_REPLAYABLE)
149 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
/*
 * NOTE(review): the condition enclosing the next statements is not visible;
 * presumably they only run when no existing export was found.
 */
152 if (target->obd_flags & OBD_RECOVERING) {
153 CERROR("denying connection for new client %s: "
154 "in recovery\n", cluuid.uuid);
/* target_revoke_connection is registered as the recovd callback for this client. */
158 rc = obd_connect(&conn, target, &cluuid, ptlrpc_recovd,
159 target_revoke_connection);
163 /* If all else goes well, this is our RPC return code. */
/* Positive EALREADY (a reconnect) is not treated as a failure here. */
166 if (rc && rc != EALREADY)
/* Return the (possibly rewritten) handle to the client in the reply header. */
169 req->rq_repmsg->addr = conn.addr;
170 req->rq_repmsg->cookie = conn.cookie;
172 export = class_conn2export(&conn);
/* Attach the export to the request and swap the request connection for the export one. */
175 req->rq_export = export;
176 export->exp_connection = ptlrpc_get_connection(&req->rq_peer, &cluuid);
177 if (req->rq_connection != NULL)
178 ptlrpc_put_connection(req->rq_connection);
179 req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
181 if (rc == EALREADY) {
182 /* We indicate the reconnection in a flag, not an error code. */
183 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
/* Chain the export on its connection so target_disconnect_client() can find it. */
187 spin_lock(&export->exp_connection->c_lock);
188 list_add(&export->exp_conn_chain, &export->exp_connection->c_exports);
189 spin_unlock(&export->exp_connection->c_lock);
190 recovd_conn_manage(export->exp_connection, ptlrpc_recovd,
191 target_revoke_connection);
/*
 * Set up the LDLM import embedded in the export by hand.  imp_handle keeps
 * the handle the client sent in this request; target_handle_reconnect()
 * compares later connect requests against it.
 */
193 dlmimp = &export->exp_ldlm_data.led_import;
194 dlmimp->imp_connection = req->rq_connection;
195 dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
196 dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
197 dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
198 dlmimp->imp_obd = /* LDLM! */ NULL;
199 dlmimp->imp_recover = NULL;
200 INIT_LIST_HEAD(&dlmimp->imp_replay_list);
201 INIT_LIST_HEAD(&dlmimp->imp_sending_list);
202 INIT_LIST_HEAD(&dlmimp->imp_delayed_list);
203 spin_lock_init(&dlmimp->imp_lock);
204 dlmimp->imp_level = LUSTRE_CONN_FULL;
/*
 * Service a disconnect RPC: pack an empty reply, call obd_disconnect() and
 * store its result in req->rq_status, then clear req->rq_export.
 */
211 int target_handle_disconnect(struct ptlrpc_request *req)
/*
 * NOTE(review): the request message itself is reinterpreted as a
 * lustre_handle.  This assumes the message starts with the addr/cookie pair
 * in the same layout - confirm against the message structure definition.
 */
213 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
217 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
/* The disconnect result travels in the reply status, not in the local rc. */
221 req->rq_status = obd_disconnect(conn);
222 req->rq_export = NULL;
/*
 * Disconnect every export chained on a ptlrpc connection (conn->c_exports).
 *
 * For each export a handle is rebuilt from the export address and its
 * exp_cookie and passed to obd_disconnect().  Used as the RECOVER phase
 * action of target_revoke_connection().
 */
226 static int target_disconnect_client(struct ptlrpc_connection *conn)
228 struct list_head *expiter, *n;
229 struct lustre_handle hdl;
230 struct obd_export *exp;
/*
 * The _safe iterator is used; presumably obd_disconnect() unlinks the export
 * from this list - confirm.
 */
234 list_for_each_safe(expiter, n, &conn->c_exports) {
235 exp = list_entry(expiter, struct obd_export, exp_conn_chain);
237 CDEBUG(D_HA, "disconnecting export %p/%s\n",
238 exp, exp->exp_client_uuid.uuid);
/* Same handle encoding that target_handle_reconnect() hands to clients. */
239 hdl.addr = (__u64)(unsigned long)exp;
240 hdl.cookie = exp->exp_cookie;
241 rc = obd_disconnect(&hdl);
/* NOTE(review): the test guarding this message is not visible; presumably rc != 0. */
243 CERROR("disconnecting export %p failed: %d\n", exp, rc);
246 /* XXX spank the connection (it's frozen in _RECOVD for now!) */
/*
 * PREPARE phase action of target_revoke_connection().  The only visible
 * effect is marking the recovd data of the connection as RD_PREPARED.
 */
250 static int target_fence_failed_connection(struct ptlrpc_connection *conn)
254 conn->c_recovd_data.rd_phase = RD_PREPARED;
/*
 * recovd callback for a client connection; it is the function passed to
 * obd_connect() and recovd_conn_manage() in target_handle_connect().
 *
 * rd:    recovd data, converted to its connection with class_rd2conn().
 * phase: PTLRPC_RECOVD_PHASE_PREPARE -> target_fence_failed_connection(),
 *        PTLRPC_RECOVD_PHASE_RECOVER -> target_disconnect_client().
 *
 * NOTE(review): the handling of PTLRPC_RECOVD_PHASE_FAILURE and any default
 * case is not visible in this copy.
 */
259 int target_revoke_connection(struct recovd_data *rd, int phase)
261 struct ptlrpc_connection *conn = class_rd2conn(rd);
267 case PTLRPC_RECOVD_PHASE_PREPARE:
268 RETURN(target_fence_failed_connection(conn));
269 case PTLRPC_RECOVD_PHASE_RECOVER:
270 RETURN(target_disconnect_client(conn));
271 case PTLRPC_RECOVD_PHASE_FAILURE:
/*
 * Fail every request parked on obd->obd_delayed_reply_queue.
 *
 * Each request is marked -ENOTCONN / PTL_RPC_MSG_ERR, sent with
 * ptlrpc_reply(), unlinked from the queue and released with OBD_FREE.
 * Presumably the entries are the heap copies allocated in
 * target_queue_final_reply(), which is why they are freed here - confirm.
 */
284 static void abort_delayed_replies(struct obd_device *obd)
286 struct ptlrpc_request *req;
287 struct list_head *tmp, *n;
/* _safe iteration: the current entry is unlinked and freed inside the loop. */
288 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
289 req = list_entry(tmp, struct ptlrpc_request, rq_list);
290 DEBUG_REQ(D_ERROR, req, "aborted:");
291 req->rq_status = -ENOTCONN;
292 req->rq_type = PTL_RPC_MSG_ERR;
293 ptlrpc_reply(req->rq_svc, req);
294 list_del(&req->rq_list);
295 OBD_FREE(req, sizeof *req);
/*
 * Abort recovery on an OBD device.
 *
 * data: the struct obd_device, passed as void *.
 *
 * Zeroes obd_recoverable_clients, clears OBD_RECOVERING and
 * OBD_ABORT_RECOVERY, fails all delayed replies and disconnects every
 * client.
 *
 * Locking: the callers visible in this file invoke it with
 * obd_processing_task_lock held.  The lock is dropped around
 * class_disconnect_all() and taken again afterwards, so it is held on
 * return as well.
 */
299 void target_abort_recovery(void *data)
301 struct obd_device *obd = data;
302 CERROR("disconnecting clients and aborting recovery\n");
303 obd->obd_recoverable_clients = 0;
304 obd->obd_flags &= ~(OBD_RECOVERING | OBD_ABORT_RECOVERY);
305 abort_delayed_replies(obd);
/* class_disconnect_all() runs without the processing-task lock. */
306 spin_unlock_bh(&obd->obd_processing_task_lock);
307 class_disconnect_all(obd);
308 spin_lock_bh(&obd->obd_processing_task_lock);
/*
 * Recovery timer callback, installed by target_start_recovery_timer().
 *
 * castmeharder: the struct obd_device pointer stored in the timer data field.
 *
 * The abort is not performed here.  The callback only sets
 * OBD_ABORT_RECOVERY under the processing-task lock and wakes
 * obd_next_transno_waitq; target_handle_connect() and
 * process_recovery_queue() test the flag and call target_abort_recovery().
 */
311 static void target_recovery_expired(unsigned long castmeharder)
313 struct obd_device *obd = (struct obd_device *)castmeharder;
314 CERROR("recovery timed out, aborting\n");
315 spin_lock_bh(&obd->obd_processing_task_lock);
316 obd->obd_flags |= OBD_ABORT_RECOVERY;
/* Wake a processor sleeping in process_recovery_queue() so it sees the flag. */
317 wake_up(&obd->obd_next_transno_waitq);
318 spin_unlock_bh(&obd->obd_processing_task_lock);
/*
 * (Re)arm the recovery timer to expire OBD_RECOVERY_TIMEOUT jiffies from
 * now, and log the timeout in seconds (OBD_RECOVERY_TIMEOUT / HZ).
 */
321 static void reset_recovery_timer(struct obd_device *obd)
323 CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
324 OBD_RECOVERY_TIMEOUT / HZ);
325 mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
329 /* Only start it the first time called */
/*
 * Record the recovery handler for an OBD and start its recovery timer.
 *
 * obd:     device entering recovery.
 * handler: request handler later invoked by process_recovery_queue().
 *
 * obd_recovery_handler doubles as the already-started marker: when it is
 * set, the lock is released and (presumably, the return is not visible) the
 * function does nothing more.
 */
330 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
332 spin_lock_bh(&obd->obd_processing_task_lock);
333 if (obd->obd_recovery_handler) {
334 spin_unlock_bh(&obd->obd_processing_task_lock);
337 CERROR("%s: starting recovery timer\n", obd->obd_name);
338 obd->obd_recovery_handler = handler;
/*
 * NOTE(review): function and data are assigned before init_timer(); confirm
 * that init_timer() on the target kernel leaves those fields untouched.
 */
339 obd->obd_recovery_timer.function = target_recovery_expired;
340 obd->obd_recovery_timer.data = (unsigned long)obd;
341 init_timer(&obd->obd_recovery_timer);
342 spin_unlock_bh(&obd->obd_processing_task_lock);
/* The timer is armed after the lock has been released. */
344 reset_recovery_timer(obd);
/* Stop the recovery timer of an OBD by calling del_timer() on obd_recovery_timer. */
347 static void cancel_recovery_timer(struct obd_device *obd)
349 del_timer(&obd->obd_recovery_timer);
/*
 * wait_event() condition used by process_recovery_queue().
 *
 * Returns non-zero when the request at the head of obd_recovery_queue
 * carries obd_next_recovery_transno, or when OBD_RECOVERING is no longer
 * set on the device.
 *
 * NOTE(review): the queue head is read here without taking
 * obd_processing_task_lock and without a list_empty() check in the visible
 * lines; on an empty queue list_entry() would not yield a real request.
 * Confirm that callers guarantee a non-empty queue.
 */
352 static int check_for_next_transno(struct obd_device *obd)
354 struct ptlrpc_request *req;
355 req = list_entry(obd->obd_recovery_queue.next,
356 struct ptlrpc_request, rq_list);
/* The queue is kept sorted by transno, so the head can never be behind. */
357 LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
359 return req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
360 (obd->obd_flags & OBD_RECOVERING) == 0;
/*
 * Drain obd_recovery_queue in transno order.  Must be called by the thread
 * whose pid is recorded in obd_processing_task (asserted below).
 *
 * For the request at the head of the queue:
 *  - if its transno is not obd_next_recovery_transno, drop the lock and
 *    sleep on obd_next_transno_waitq until check_for_next_transno() is
 *    true; after waking, abort recovery if OBD_ABORT_RECOVERY is set;
 *  - otherwise unlink it, run obd_recovery_handler on it with the lock
 *    dropped, re-arm the recovery timer, free the request and advance
 *    obd_next_recovery_transno.
 * When the queue is empty obd_processing_task is reset to 0.
 *
 * NOTE(review): the loop construct and the statements that leave or
 * restart it are on lines not visible in this copy.
 */
363 static void process_recovery_queue(struct obd_device *obd)
365 struct ptlrpc_request *req;
370 spin_lock_bh(&obd->obd_processing_task_lock);
371 LASSERT(obd->obd_processing_task == current->pid);
372 req = list_entry(obd->obd_recovery_queue.next,
373 struct ptlrpc_request, rq_list);
/* Gap in the sequence: wait for the missing transno to be queued. */
375 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
376 spin_unlock_bh(&obd->obd_processing_task_lock);
377 CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
379 obd->obd_next_recovery_transno,
380 req->rq_reqmsg->transno);
381 wait_event(obd->obd_next_transno_waitq,
382 check_for_next_transno(obd));
383 spin_lock_bh(&obd->obd_processing_task_lock);
/* The wakeup may have come from target_recovery_expired(). */
384 if (obd->obd_flags & OBD_ABORT_RECOVERY) {
385 target_abort_recovery(obd);
388 spin_unlock_bh(&obd->obd_processing_task_lock);
/* The head request is the expected one: take it off the queue and handle it unlocked. */
393 list_del_init(&req->rq_list);
394 spin_unlock_bh(&obd->obd_processing_task_lock);
396 DEBUG_REQ(D_ERROR, req, "processing: ");
/* The handler result is deliberately discarded. */
397 (void)obd->obd_recovery_handler(req);
/* Each processed request pushes the recovery timeout out again. */
398 reset_recovery_timer(obd);
399 #warning FIXME: mds_fsync_super(mds->mds_sb);
400 OBD_FREE(req, sizeof *req);
401 spin_lock_bh(&obd->obd_processing_task_lock);
402 obd->obd_next_recovery_transno++;
403 if (list_empty(&obd->obd_recovery_queue)) {
/* Nothing left: give up the processor role so a later caller can take it. */
404 obd->obd_processing_task = 0;
405 spin_unlock_bh(&obd->obd_processing_task_lock);
408 spin_unlock_bh(&obd->obd_processing_task_lock);
/*
 * Queue a request received during recovery so that requests are handled in
 * transno order.
 *
 * req: the incoming request; its transno is read from the request message.
 * obd: the device whose recovery queue is used.
 *
 * Visible behaviour:
 *  - one early path (its condition is not visible) only initialises
 *    req->rq_list and logs that the request is not being queued;
 *  - if the calling thread is the current queue processor, nothing is
 *    added (the request is being processed right now);
 *  - otherwise a copy of the request is allocated and the request is
 *    inserted in front of the first queued entry with a larger transno, or
 *    at the tail when there is none;
 *  - if another thread is processing the queue, it is woken when this
 *    transno is the one it waits for; if nobody is, the caller records its
 *    pid in obd_processing_task and runs process_recovery_queue().
 *
 * NOTE(review): return values, the allocation-failure check and the
 * statement that presumably switches req to saved_req (numbers 437-438 and
 * 440) are on lines not visible in this copy - confirm before relying on
 * which object ends up on the queue.
 */
413 int target_queue_recovery_request(struct ptlrpc_request *req,
414 struct obd_device *obd)
416 struct list_head *tmp;
418 __u64 transno = req->rq_reqmsg->transno;
419 struct ptlrpc_request *saved_req;
422 INIT_LIST_HEAD(&req->rq_list);
423 DEBUG_REQ(D_HA, req, "not queueing");
427 spin_lock_bh(&obd->obd_processing_task_lock);
429 if (obd->obd_processing_task == current->pid) {
430 /* Processing the queue right now, don't re-add. */
431 LASSERT(list_empty(&req->rq_list));
432 spin_unlock_bh(&obd->obd_processing_task_lock);
/* The copy is made because the request must outlive this call while queued (presumably). */
436 OBD_ALLOC(saved_req, sizeof *saved_req);
439 memcpy(saved_req, req, sizeof *req);
441 INIT_LIST_HEAD(&req->rq_list);
/* Sorted insert: list_add_tail() on an entry links the new node just before it. */
444 list_for_each(tmp, &obd->obd_recovery_queue) {
445 struct ptlrpc_request *reqiter =
446 list_entry(tmp, struct ptlrpc_request, rq_list);
448 if (reqiter->rq_reqmsg->transno > transno) {
449 list_add_tail(&req->rq_list, &reqiter->rq_list);
456 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
459 if (obd->obd_processing_task != 0) {
460 /* Someone else is processing this queue, we'll leave it to
463 if (transno == obd->obd_next_recovery_transno)
464 wake_up(&obd->obd_next_transno_waitq);
465 spin_unlock_bh(&obd->obd_processing_task_lock);
469 /* Nobody is processing, and we know there's (at least) one to process
470 * now, so we'll do the honours.
472 obd->obd_processing_task = current->pid;
473 spin_unlock_bh(&obd->obd_processing_task_lock);
475 process_recovery_queue(obd);
/*
 * Return the OBD device that owns the export a request arrived on.
 * req->rq_export is dereferenced without a NULL check, so the request must
 * already have an export attached.
 */
479 struct obd_device * target_req2obd(struct ptlrpc_request *req)
481 return req->rq_export->exp_obd;
/*
 * Park the final reply of a recovering client until all recoverable
 * clients have finished, then send every parked reply.
 *
 * req: the request whose reply is being delayed.
 * rc:  not read on any visible line; presumably it selects the error-reply
 *      path below - confirm.
 *
 * A copy of the request is allocated and the request is linked on
 * obd_delayed_reply_queue.  obd_recoverable_clients is decremented; when it
 * reaches zero the LDLM namespace is reprocessed, OBD_RECOVERING is
 * cleared, every queued request is replied to, unlinked and freed, and the
 * recovery timer is cancelled.  Otherwise the number of remaining clients
 * is logged.
 *
 * NOTE(review): no unlock is visible between the spin_lock_bh() at the top
 * and the spin_unlock_bh() at the bottom, so ptlrpc_reply() and
 * ldlm_reprocess_all_ns() appear to run with the lock held.  The return
 * statement is not visible in this copy.
 */
484 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
486 struct obd_device *obd = target_req2obd(req);
487 struct ptlrpc_request *saved_req;
489 spin_lock_bh(&obd->obd_processing_task_lock);
491 /* Just like ptlrpc_error, but without the sending. */
492 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
494 req->rq_type = PTL_RPC_MSG_ERR;
497 LASSERT(list_empty(&req->rq_list));
/*
 * NOTE(review): these two statements carry consecutive numbers (498, 499),
 * so the allocation result is passed to memcpy() without a NULL check.
 * The statement that presumably switches req to saved_req (number 500) is
 * not visible.
 */
498 OBD_ALLOC(saved_req, sizeof *saved_req);
499 memcpy(saved_req, req, sizeof *saved_req);
501 list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
/* The last recoverable client to finish releases everything that was parked. */
502 if (--obd->obd_recoverable_clients == 0) {
503 struct list_head *tmp, *n;
504 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
506 "all clients recovered, sending delayed replies\n");
507 obd->obd_flags &= ~OBD_RECOVERING;
/* req is reused as the loop cursor from here on. */
508 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
509 req = list_entry(tmp, struct ptlrpc_request, rq_list);
510 DEBUG_REQ(D_ERROR, req, "delayed:");
511 ptlrpc_reply(req->rq_svc, req);
512 list_del(&req->rq_list);
513 OBD_FREE(req, sizeof *req);
515 cancel_recovery_timer(obd);
517 CERROR("%d recoverable clients remain\n",
518 obd->obd_recoverable_clients);
521 spin_unlock_bh(&obd->obd_processing_task_lock);