Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / lib / target.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  * Target-common OBD method implementations and utility functions.
25  */
26
27 #define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_OST /* XXX WRONG */
29
30 #include <linux/module.h>
31 #include <linux/obd_ost.h>
32 #include <linux/lustre_net.h>
33 #include <linux/lustre_dlm.h>
34
35 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
36                             struct obd_uuid *cluuid)
37 {
38         if (exp->exp_connection) {
39                 struct lustre_handle *hdl;
40                 hdl = &exp->exp_ldlm_data.led_import.imp_handle;
41                 /* Might be a re-connect after a partition. */
42                 if (!memcmp(conn, hdl, sizeof *conn)) {
43                         CERROR("%s reconnecting\n", cluuid->uuid);
44                         conn->addr = (__u64) (unsigned long)exp;
45                         conn->cookie = exp->exp_cookie;
46                         RETURN(EALREADY);
47                 } else {
48                         CERROR("%s reconnecting from %s, "
49                                "handle mismatch (ours "LPX64"/"LPX64", "
50                                "theirs "LPX64"/"LPX64")\n", cluuid->uuid,
51                                exp->exp_connection->c_remote_uuid.uuid,
52                                hdl->addr,
53                                hdl->cookie, conn->addr, conn->cookie);
54                         /* XXX disconnect them here? */
55                         memset(conn, 0, sizeof *conn);
56                         /* This is a little scary, but right now we build this
57                          * file separately into each server module, so I won't
58                          * go _immediately_ to hell.
59                          */
60                         RETURN(-EALREADY);
61                 }
62         }
63
64         conn->addr = (__u64) (unsigned long)exp;
65         conn->cookie = exp->exp_cookie;
66         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", cluuid->uuid, exp);
67         CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
68                (long long)conn->addr, (long long)conn->cookie);
69         RETURN(0);
70 }
71
72
73 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
74 {
75         struct obd_device *target;
76         struct obd_export *export = NULL;
77         struct obd_import *dlmimp;
78         struct lustre_handle conn;
79         struct obd_uuid tgtuuid;
80         struct obd_uuid cluuid;
81         struct list_head *p;
82         int rc, i;
83         ENTRY;
84
85         if (req->rq_reqmsg->buflens[0] > 37) {
86                 CERROR("bad target UUID for connect\n");
87                 GOTO(out, rc = -EINVAL);
88         }
89         obd_str2uuid(&tgtuuid, lustre_msg_buf(req->rq_reqmsg, 0));
90
91         if (req->rq_reqmsg->buflens[1] > 37) {
92                 CERROR("bad client UUID for connect\n");
93                 GOTO(out, rc = -EINVAL);
94         }
95         obd_str2uuid(&cluuid, lustre_msg_buf(req->rq_reqmsg, 1));
96
97         i = class_uuid2dev(&tgtuuid);
98         if (i == -1) {
99                 CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid);
100                 GOTO(out, rc = -ENODEV);
101         }
102
103         target = &obd_dev[i];
104         if (!target)
105                 GOTO(out, rc = -ENODEV);
106
107         spin_lock_bh(&target->obd_processing_task_lock);
108         if (target->obd_flags & OBD_ABORT_RECOVERY)
109                 target_abort_recovery(target);
110         spin_unlock_bh(&target->obd_processing_task_lock);
111
112         conn.addr = req->rq_reqmsg->addr;
113         conn.cookie = req->rq_reqmsg->cookie;
114
115         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
116         if (rc)
117                 GOTO(out, rc);
118
119         /* lctl gets a backstage, all-access pass. */
120         if (!strcmp(cluuid.uuid, "OBD_CLASS_UUID"))
121                 goto dont_check_exports;
122
123         spin_lock(&target->obd_dev_lock);
124         list_for_each(p, &target->obd_exports) {
125                 export = list_entry(p, struct obd_export, exp_obd_chain);
126                 if (!memcmp(&cluuid, &export->exp_client_uuid,
127                             sizeof(export->exp_client_uuid))) {
128                         spin_unlock(&target->obd_dev_lock);
129                         LASSERT(export->exp_obd == target);
130
131                         rc = target_handle_reconnect(&conn, export, &cluuid);
132                         break;
133                 }
134                 export = NULL;
135         }
136         /* If we found an export, we already unlocked. */
137         if (!export)
138                 spin_unlock(&target->obd_dev_lock);
139
140         /* Tell the client if we're in recovery. */
141         /* If this is the first client, start the recovery timer */
142         if (target->obd_flags & OBD_RECOVERING) {
143                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
144                 target_start_recovery_timer(target, handler);
145         }
146
147         /* Tell the client if we support replayable requests */
148         if (target->obd_flags & OBD_REPLAYABLE)
149                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
150
151         if (!export) {
152                 if (target->obd_flags & OBD_RECOVERING) {
153                         CERROR("denying connection for new client %s: "
154                                "in recovery\n", cluuid.uuid);
155                         rc = -EBUSY;
156                 } else {
157  dont_check_exports:
158                         rc = obd_connect(&conn, target, &cluuid, ptlrpc_recovd,
159                                          target_revoke_connection);
160                 }
161         }
162
163         /* If all else goes well, this is our RPC return code. */
164         req->rq_status = 0;
165
166         if (rc && rc != EALREADY)
167                 GOTO(out, rc);
168
169         req->rq_repmsg->addr = conn.addr;
170         req->rq_repmsg->cookie = conn.cookie;
171
172         export = class_conn2export(&conn);
173         LASSERT(export);
174
175         req->rq_export = export;
176         export->exp_connection = ptlrpc_get_connection(&req->rq_peer, &cluuid);
177         if (req->rq_connection != NULL)
178                 ptlrpc_put_connection(req->rq_connection);
179         req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
180
181         if (rc == EALREADY) {
182                 /* We indicate the reconnection in a flag, not an error code. */
183                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
184                 GOTO(out, rc = 0);
185         }
186
187         spin_lock(&export->exp_connection->c_lock);
188         list_add(&export->exp_conn_chain, &export->exp_connection->c_exports);
189         spin_unlock(&export->exp_connection->c_lock);
190         recovd_conn_manage(export->exp_connection, ptlrpc_recovd,
191                            target_revoke_connection);
192
193         dlmimp = &export->exp_ldlm_data.led_import;
194         dlmimp->imp_connection = req->rq_connection;
195         dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
196         dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
197         dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
198         dlmimp->imp_obd = target;
199         dlmimp->imp_recover = NULL;
200         INIT_LIST_HEAD(&dlmimp->imp_replay_list);
201         INIT_LIST_HEAD(&dlmimp->imp_sending_list);
202         INIT_LIST_HEAD(&dlmimp->imp_delayed_list);
203         spin_lock_init(&dlmimp->imp_lock);
204         dlmimp->imp_level = LUSTRE_CONN_FULL;
205 out:
206         if (rc)
207                 req->rq_status = rc;
208         RETURN(rc);
209 }
210
211 int target_handle_disconnect(struct ptlrpc_request *req)
212 {
213         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
214         int rc;
215         ENTRY;
216
217         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
218         if (rc)
219                 RETURN(rc);
220
221         req->rq_status = obd_disconnect(conn);
222         req->rq_export = NULL;
223         RETURN(0);
224 }
225
226 static int target_disconnect_client(struct ptlrpc_connection *conn)
227 {
228         struct list_head *expiter, *n;
229         struct lustre_handle hdl;
230         struct obd_export *exp;
231         int rc;
232         ENTRY;
233
234         list_for_each_safe(expiter, n, &conn->c_exports) {
235                 exp = list_entry(expiter, struct obd_export, exp_conn_chain);
236
237                 CDEBUG(D_HA, "disconnecting export %p/%s\n",
238                        exp, exp->exp_client_uuid.uuid);
239                 hdl.addr = (__u64)(unsigned long)exp;
240                 hdl.cookie = exp->exp_cookie;
241                 rc = obd_disconnect(&hdl);
242                 if (rc)
243                         CERROR("disconnecting export %p failed: %d\n", exp, rc);
244         }
245
246         /* XXX spank the connection (it's frozen in _RECOVD for now!) */
247         RETURN(0);
248 }
249
250 static int target_fence_failed_connection(struct ptlrpc_connection *conn)
251 {
252         ENTRY;
253
254         conn->c_recovd_data.rd_phase = RD_PREPARED;
255
256         RETURN(0);
257 }
258
259 int target_revoke_connection(struct recovd_data *rd, int phase)
260 {
261         struct ptlrpc_connection *conn = class_rd2conn(rd);
262
263         LASSERT(conn);
264         ENTRY;
265
266         switch (phase) {
267             case PTLRPC_RECOVD_PHASE_PREPARE:
268                 RETURN(target_fence_failed_connection(conn));
269             case PTLRPC_RECOVD_PHASE_RECOVER:
270                 RETURN(target_disconnect_client(conn));
271             case PTLRPC_RECOVD_PHASE_FAILURE:
272                 LBUG();
273                 RETURN(0);
274         }
275
276         LBUG();
277         RETURN(-ENOSYS);
278 }
279
280 /*
281  * Recovery functions 
282  */
283
284 static void abort_delayed_replies(struct obd_device *obd)
285 {
286         struct ptlrpc_request *req;
287         struct list_head *tmp, *n;
288         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
289                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
290                 DEBUG_REQ(D_ERROR, req, "aborted:");
291                 req->rq_status = -ENOTCONN;
292                 req->rq_type = PTL_RPC_MSG_ERR;
293                 ptlrpc_reply(req->rq_svc, req);
294                 list_del(&req->rq_list);
295                 OBD_FREE(req, sizeof *req);
296         }
297 }
298
299 void target_abort_recovery(void *data)
300 {
301         struct obd_device *obd = data;
302         CERROR("disconnecting clients and aborting recovery\n");
303         obd->obd_recoverable_clients = 0;
304         obd->obd_flags &= ~(OBD_RECOVERING | OBD_ABORT_RECOVERY);
305         abort_delayed_replies(obd);
306         spin_unlock_bh(&obd->obd_processing_task_lock);
307         class_disconnect_all(obd);
308         spin_lock_bh(&obd->obd_processing_task_lock);
309 }
310
311 static void target_recovery_expired(unsigned long castmeharder)
312 {
313         struct obd_device *obd = (struct obd_device *)castmeharder;
314         CERROR("recovery timed out, aborting\n");
315         spin_lock_bh(&obd->obd_processing_task_lock);
316         obd->obd_flags |= OBD_ABORT_RECOVERY;
317         wake_up(&obd->obd_next_transno_waitq);
318         spin_unlock_bh(&obd->obd_processing_task_lock);
319 }
320
321 static void reset_recovery_timer(struct obd_device *obd)
322 {
323         CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
324                OBD_RECOVERY_TIMEOUT / HZ);
325         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
326 }
327
328
329 /* Only start it the first time called */
330 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
331 {
332         spin_lock_bh(&obd->obd_processing_task_lock);
333         if (obd->obd_recovery_handler) {
334                 spin_unlock_bh(&obd->obd_processing_task_lock);
335                 return;
336         }
337         CERROR("%s: starting recovery timer\n", obd->obd_name);
338         obd->obd_recovery_handler = handler;
339         obd->obd_recovery_timer.function = target_recovery_expired;
340         obd->obd_recovery_timer.data = (unsigned long)obd;
341         init_timer(&obd->obd_recovery_timer);
342         spin_unlock_bh(&obd->obd_processing_task_lock);
343
344         reset_recovery_timer(obd);
345 }
346
347 static void cancel_recovery_timer(struct obd_device *obd)
348 {
349         del_timer(&obd->obd_recovery_timer);
350 }
351
352 static int check_for_next_transno(struct obd_device *obd)
353 {
354         struct ptlrpc_request *req;
355         req = list_entry(obd->obd_recovery_queue.next,
356                          struct ptlrpc_request, rq_list);
357         LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
358
359         return req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
360                 (obd->obd_flags & OBD_RECOVERING) == 0;
361 }
362
363 static void process_recovery_queue(struct obd_device *obd)
364 {
365         struct ptlrpc_request *req;
366         int aborted = 0;
367         ENTRY;
368
369         for (;;) {
370                 spin_lock_bh(&obd->obd_processing_task_lock);
371                 LASSERT(obd->obd_processing_task == current->pid);
372                 req = list_entry(obd->obd_recovery_queue.next,
373                                  struct ptlrpc_request, rq_list);
374
375                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
376                         struct l_wait_info lwi = { 0 };
377                         spin_unlock_bh(&obd->obd_processing_task_lock);
378                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
379                                LPD64")\n",
380                                obd->obd_next_recovery_transno,
381                                req->rq_reqmsg->transno);
382                         l_wait_event(obd->obd_next_transno_waitq,
383                                      check_for_next_transno(obd), &lwi);
384                         spin_lock_bh(&obd->obd_processing_task_lock);
385                         if (obd->obd_flags & OBD_ABORT_RECOVERY) {
386                                 target_abort_recovery(obd);
387                                 aborted = 1;
388                         }
389                         spin_unlock_bh(&obd->obd_processing_task_lock);
390                         if (aborted)
391                                 return;
392                         continue;
393                 }
394                 list_del_init(&req->rq_list);
395                 spin_unlock_bh(&obd->obd_processing_task_lock);
396
397                 DEBUG_REQ(D_ERROR, req, "processing: ");
398                 (void)obd->obd_recovery_handler(req);
399                 reset_recovery_timer(obd);
400 #warning FIXME: mds_fsync_super(mds->mds_sb);
401                 OBD_FREE(req, sizeof *req);
402                 spin_lock_bh(&obd->obd_processing_task_lock);
403                 obd->obd_next_recovery_transno++;
404                 if (list_empty(&obd->obd_recovery_queue)) {
405                         obd->obd_processing_task = 0;
406                         spin_unlock_bh(&obd->obd_processing_task_lock);
407                         break;
408                 }
409                 spin_unlock_bh(&obd->obd_processing_task_lock);
410         }
411         EXIT;
412 }
413
414 int target_queue_recovery_request(struct ptlrpc_request *req,
415                                   struct obd_device *obd)
416 {
417         struct list_head *tmp;
418         int inserted = 0;
419         __u64 transno = req->rq_reqmsg->transno;
420         struct ptlrpc_request *saved_req;
421
422         if (!transno) {
423                 INIT_LIST_HEAD(&req->rq_list);
424                 DEBUG_REQ(D_HA, req, "not queueing");
425                 return 1;
426         }
427
428         spin_lock_bh(&obd->obd_processing_task_lock);
429
430         if (obd->obd_processing_task == current->pid) {
431                 /* Processing the queue right now, don't re-add. */
432                 LASSERT(list_empty(&req->rq_list));
433                 spin_unlock_bh(&obd->obd_processing_task_lock);
434                 return 1;
435         }
436
437         OBD_ALLOC(saved_req, sizeof *saved_req);
438         if (!saved_req)
439                 LBUG();
440         memcpy(saved_req, req, sizeof *req);
441         req = saved_req;
442         INIT_LIST_HEAD(&req->rq_list);
443
444         /* XXX O(n^2) */
445         list_for_each(tmp, &obd->obd_recovery_queue) {
446                 struct ptlrpc_request *reqiter =
447                         list_entry(tmp, struct ptlrpc_request, rq_list);
448
449                 if (reqiter->rq_reqmsg->transno > transno) {
450                         list_add_tail(&req->rq_list, &reqiter->rq_list);
451                         inserted = 1;
452                         break;
453                 }
454         }
455
456         if (!inserted) {
457                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
458         }
459
460         if (obd->obd_processing_task != 0) {
461                 /* Someone else is processing this queue, we'll leave it to
462                  * them.
463                  */
464                 if (transno == obd->obd_next_recovery_transno)
465                         wake_up(&obd->obd_next_transno_waitq);
466                 spin_unlock_bh(&obd->obd_processing_task_lock);
467                 return 0;
468         }
469
470         /* Nobody is processing, and we know there's (at least) one to process
471          * now, so we'll do the honours.
472          */
473         obd->obd_processing_task = current->pid;
474         spin_unlock_bh(&obd->obd_processing_task_lock);
475
476         process_recovery_queue(obd);
477         return 0;
478 }
479
480 struct obd_device * target_req2obd(struct ptlrpc_request *req)
481 {
482         return req->rq_export->exp_obd;
483 }
484
485 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
486 {
487         struct obd_device *obd = target_req2obd(req);
488         struct ptlrpc_request *saved_req;
489
490         spin_lock_bh(&obd->obd_processing_task_lock);
491         if (rc) {
492                 /* Just like ptlrpc_error, but without the sending. */
493                 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
494                                 &req->rq_repmsg);
495                 req->rq_type = PTL_RPC_MSG_ERR;
496         }
497
498         LASSERT(list_empty(&req->rq_list));
499         OBD_ALLOC(saved_req, sizeof *saved_req);
500         memcpy(saved_req, req, sizeof *saved_req);
501         req = saved_req;
502         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
503         if (--obd->obd_recoverable_clients == 0) {
504                 struct list_head *tmp, *n;
505                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
506                 CDEBUG(D_ERROR,
507                        "all clients recovered, sending delayed replies\n");
508                 obd->obd_flags &= ~OBD_RECOVERING;
509                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
510                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
511                         DEBUG_REQ(D_ERROR, req, "delayed:");
512                         ptlrpc_reply(req->rq_svc, req);
513                         list_del(&req->rq_list);
514                         OBD_FREE(req, sizeof *req);
515                 }
516                 cancel_recovery_timer(obd);
517         } else {
518                 CERROR("%d recoverable clients remain\n",
519                        obd->obd_recoverable_clients);
520         }
521
522         spin_unlock_bh(&obd->obd_processing_task_lock);
523         return 1;
524 }