Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2001-2005 Cluster File Systems, Inc.
8  *   Author Nathan <nathan@clusterfs.com>
9  *   Author LinSongTao <lincent@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #ifndef EXPORT_SYMTAB
28 # define EXPORT_SYMTAB
29 #endif
30 #define DEBUG_SUBSYSTEM S_MGS
/* Debug mask used for MGS trace messages.  Parenthesized so the mask
 * survives being combined with other operators at the point of use
 * (e.g. "D_MGS & x"). */
#define D_MGS (D_CONFIG | D_ERROR)
32
33 #ifdef __KERNEL__
34 # include <linux/module.h>
35 # include <linux/pagemap.h>
36 # include <linux/miscdevice.h>
37 # include <linux/init.h>
38 #else
39 # include <liblustre.h>
40 #endif
41
42 #include <linux/obd_class.h>
43 #include <linux/lustre_dlm.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_fsfilt.h>
46 #include <linux/lustre_commit_confd.h>
47 #include <linux/lustre_disk.h>
48 #include "mgs_internal.h"
49
50 static int mgs_cleanup(struct obd_device *obd);
51
52 /* Establish a connection to the MGS.*/
53 static int mgs_connect(struct lustre_handle *conn, struct obd_device *obd,
54                        struct obd_uuid *cluuid, struct obd_connect_data *data)
55 {
56         struct obd_export *exp;
57         int rc;
58         ENTRY;
59
60         if (!conn || !obd || !cluuid)
61                 RETURN(-EINVAL);
62
63         rc = class_connect(conn, obd, cluuid);
64         if (rc)
65                 RETURN(rc);
66         exp = class_conn2export(conn);
67         LASSERT(exp);
68
69         if (data != NULL) {
70                 data->ocd_connect_flags &= MGMT_CONNECT_SUPPORTED;
71                 exp->exp_connect_flags = data->ocd_connect_flags;
72         }
73
74         if (rc) {
75                 class_disconnect(exp);
76         } else {
77                 class_export_put(exp);
78         }
79
80         RETURN(rc);
81 }
82
83 static int mgs_disconnect(struct obd_export *exp)
84 {
85         unsigned long irqflags;
86         int rc;
87         ENTRY;
88
89         LASSERT(exp);
90         class_export_get(exp);
91
92         /* Disconnect early so that clients can't keep using export */
93         rc = class_disconnect(exp);
94         ldlm_cancel_locks_for_export(exp);
95
96         /* complete all outstanding replies */
97         spin_lock_irqsave(&exp->exp_lock, irqflags);
98         while (!list_empty(&exp->exp_outstanding_replies)) {
99                 struct ptlrpc_reply_state *rs =
100                         list_entry(exp->exp_outstanding_replies.next,
101                                    struct ptlrpc_reply_state, rs_exp_list);
102                 struct ptlrpc_service *svc = rs->rs_service;
103
104                 spin_lock(&svc->srv_lock);
105                 list_del_init(&rs->rs_exp_list);
106                 ptlrpc_schedule_difficult_reply(rs);
107                 spin_unlock(&svc->srv_lock);
108         }
109         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
110
111         class_export_put(exp);
112         RETURN(rc);
113 }
114
115 static int mgs_handle(struct ptlrpc_request *req);
116
117 /* Start the MGS obd */
118 static int mgs_setup(struct obd_device *obd, obd_count len, void *buf)
119 {
120         struct lprocfs_static_vars lvars;
121         struct mgs_obd *mgs = &obd->u.mgs;
122         struct lustre_mount_info *lmi;
123         struct lustre_sb_info *lsi;
124         struct vfsmount *mnt;
125         int rc = 0;
126         ENTRY;
127
128         CDEBUG(D_CONFIG, "Starting MGS\n");
129
130         /* Find our disk */
131         lmi = server_get_mount(obd->obd_name);
132         if (!lmi) 
133                 RETURN(rc = -EINVAL);
134
135         mnt = lmi->lmi_mnt;
136         lsi = s2lsi(lmi->lmi_sb);
137         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
138         if (IS_ERR(obd->obd_fsops))
139                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
140
141         /* namespace for mgs llog */
142         obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER);
143         if (obd->obd_namespace == NULL) {
144                 mgs_cleanup(obd);
145                 GOTO(err_ops, rc = -ENOMEM);
146         }
147
148         /* ldlm setup */
149         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
150                            "mgs_ldlm_client", &obd->obd_ldlm_client);
151
152         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
153
154         rc = mgs_fs_setup(obd, mnt);
155         if (rc) {
156                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
157                        obd->obd_name, rc);
158                 GOTO(err_ns, rc);
159         }
160
161         rc = llog_start_commit_thread();
162         if (rc < 0)
163                 GOTO(err_fs, rc);
164
165         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
166                         &llog_lvfs_ops);
167         if (rc)
168                 GOTO(err_fs, rc);
169
170         /* Internal mgs setup */
171         mgs_init_db_list(obd);
172
173         /* Start the service threads */
174         mgs->mgs_service =
175                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
176                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL, 
177                                 MGC_REPLY_PORTAL, MGS_SERVICE_WATCHDOG_TIMEOUT,
178                                 mgs_handle, LUSTRE_MGS_NAME, 
179                                 obd->obd_proc_entry, NULL, MGS_NUM_THREADS);
180
181         if (!mgs->mgs_service) {
182                 CERROR("failed to start service\n");
183                 GOTO(err_fs, rc = -ENOMEM);
184         }
185
186         rc = ptlrpc_start_threads(obd, mgs->mgs_service, "lustre_mgs");
187         if (rc)
188                 GOTO(err_thread, rc);
189
190         /* Setup proc */
191         lprocfs_init_vars(mgs, &lvars);
192         lprocfs_obd_setup(obd, lvars.obd_vars);
193
194         ldlm_timeout = 6;
195         ping_evictor_start();
196
197         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
198
199         RETURN(0);
200
201 err_thread:
202         ptlrpc_unregister_service(mgs->mgs_service);
203 err_fs:
204         /* No extra cleanup needed for llog_init_commit_thread() */
205         mgs_fs_cleanup(obd);
206 err_ns:
207         ldlm_namespace_free(obd->obd_namespace, 0);
208         obd->obd_namespace = NULL;
209 err_ops:
210         fsfilt_put_ops(obd->obd_fsops);
211 err_put:
212         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
213         mgs->mgs_sb = 0;
214         return rc;
215 }
216
217 static int mgs_precleanup(struct obd_device *obd, int stage)
218 {
219         int rc = 0;
220         ENTRY;
221
222         switch (stage) {
223         case OBD_CLEANUP_SELF_EXP:
224                 mgs_cleanup_db_list(obd);
225                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
226                 rc = obd_llog_finish(obd, 0);
227         }
228         RETURN(rc);
229 }
230
231 static int mgs_cleanup(struct obd_device *obd)
232 {
233         struct mgs_obd *mgs = &obd->u.mgs;
234         lvfs_sbdev_type save_dev;
235         ENTRY;
236
237         ping_evictor_stop();
238
239         if (mgs->mgs_sb == NULL)
240                 RETURN(0);
241
242         save_dev = lvfs_sbdev(mgs->mgs_sb);
243         
244         lprocfs_obd_cleanup(obd);
245
246         ptlrpc_unregister_service(mgs->mgs_service);
247
248         mgs_fs_cleanup(obd);
249
250         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
251         mgs->mgs_sb = NULL;
252
253         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
254
255         LASSERT(!obd->obd_recovering);
256
257         lvfs_clear_rdonly(save_dev);
258
259         fsfilt_put_ops(obd->obd_fsops);
260
261         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
262
263         RETURN(0);
264 }
265
266 /* similar to filter_prepare_destroy */
267 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
268                             struct lustre_handle *lockh)
269 {
270         /* FIXME resource should be based on fsname, 
271            one lock per fs.  One lock per config log? */
272         struct ldlm_res_id res_id = {.name = {12321}};
273         int rc, flags = 0;
274
275         CERROR("mgs_lock %s\n", fsname);
276
277         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
278                               LDLM_PLAIN, NULL, LCK_EX, &flags,
279                               ldlm_blocking_ast, ldlm_completion_ast, 
280                               NULL, NULL, NULL, 0, NULL, lockh);
281         if (rc) {
282                 CERROR("can't take cfg lock %d\n", rc);
283         }
284
285         return rc;
286 }
287
288 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
289 {
290         CERROR("mgs_unlock\n");
291         
292         ldlm_lock_decref(lockh, LCK_EX);
293         return 0;
294 }
295
296 static int mgs_handle_target_add(struct ptlrpc_request *req)
297 {    
298         struct obd_device *obd = req->rq_export->exp_obd;
299         struct lustre_handle lockh;
300         struct mgmt_target_info *mti, *rep_mti;
301         int rep_size = sizeof(*mti);
302         int rc, lockrc;
303         ENTRY;
304
305         mti = lustre_swab_reqbuf(req, 0, sizeof(*mti),
306                                  lustre_swab_mgmt_target_info);
307         
308         CDEBUG(D_MGS, "adding %s, index=%d\n", mti->mti_svname, 
309                mti->mti_stripe_index);
310
311         /* set the new target index if needed */
312         if (mti->mti_flags & LDD_F_NEED_INDEX) {
313                 rc = mgs_set_next_index(obd, mti);
314                 if (rc) {
315                         CERROR("Can't get index (%d)\n", rc);
316                         GOTO(out, rc);
317                 }
318         }
319
320         /* revoke the config lock so everyone will update */
321         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
322         if (lockrc != ELDLM_OK) {
323                 LCONSOLE_ERROR("Can't signal other nodes to update their "
324                                "configuration (%d). Updating local logs "
325                                "anyhow; you might have to manually restart "
326                                "other servers to get the latest configuration."
327                                "\n", lockrc);
328         }
329
330         /* create the log for the new target 
331            and update the client/mdt logs */
332         rc = mgs_write_log_target(obd, mti);
333         
334         /* done with log update */
335         if (lockrc == ELDLM_OK)
336                 mgs_put_cfg_lock(&lockh);
337
338         if (rc) {
339                 CERROR("Failed to write %s log (%d)\n", 
340                        mti->mti_svname, rc);
341                 GOTO(out, rc);
342         }
343
344 out:
345         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname, 
346                mti->mti_stripe_index, rc);
347         lustre_pack_reply(req, 1, &rep_size, NULL); 
348         /* send back the whole mti in the reply */
349         rep_mti = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep_mti));
350         memcpy(rep_mti, mti, sizeof(*rep_mti));
351         RETURN(rc);
352 }
353
354 int mgs_handle(struct ptlrpc_request *req)
355 {
356         int fail = OBD_FAIL_MGMT_ALL_REPLY_NET;
357         int rc = 0;
358         ENTRY;
359
360         OBD_FAIL_RETURN(OBD_FAIL_MGMT_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
361
362         LASSERT(current->journal_info == NULL);
363         if (req->rq_reqmsg->opc != MGMT_CONNECT) {
364                 if (req->rq_export == NULL) {
365                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
366                                req->rq_reqmsg->opc);
367                         req->rq_status = -ENOTCONN;
368                         GOTO(out, rc = -ENOTCONN);
369                 }
370         }
371
372         switch (req->rq_reqmsg->opc) {
373         case MGMT_CONNECT:
374                 DEBUG_REQ(D_MGS, req, "connect");
375                 OBD_FAIL_RETURN(OBD_FAIL_MGMT_CONNECT_NET, 0);
376                 rc = target_handle_connect(req, mgs_handle);
377                 break;
378         case MGMT_DISCONNECT:
379                 DEBUG_REQ(D_MGS, req, "disconnect");
380                 OBD_FAIL_RETURN(OBD_FAIL_MGMT_DISCONNECT_NET, 0);
381                 rc = target_handle_disconnect(req);
382                 req->rq_status = rc;            /* superfluous? */
383                 break;
384
385         case MGMT_TARGET_ADD:
386                 DEBUG_REQ(D_MGS, req, "target add\n");
387                 rc = mgs_handle_target_add(req);
388                 break;
389         case MGMT_TARGET_DEL:
390                 DEBUG_REQ(D_MGS, req, "target del\n");
391                 //rc = mgs_handle_target_del(req);
392                 break;
393
394         case LDLM_ENQUEUE:
395                 DEBUG_REQ(D_MGS, req, "enqueue");
396                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
397                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
398                                          ldlm_server_blocking_ast, NULL);
399                 fail = OBD_FAIL_LDLM_REPLY;
400                 break;
401         case LDLM_BL_CALLBACK:
402         case LDLM_CP_CALLBACK:
403                 DEBUG_REQ(D_MGS, req, "callback");
404                 CERROR("callbacks should not happen on MGS\n");
405                 LBUG();
406                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
407                 break;
408
409         case OBD_PING:
410                 DEBUG_REQ(D_INFO, req, "ping");
411                 rc = target_handle_ping(req);
412                 break;
413
414         case OBD_LOG_CANCEL:
415                 DEBUG_REQ(D_MGS, req, "log cancel\n");
416                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
417                 rc = -ENOTSUPP; /* la la la */
418                 break;
419
420         case LLOG_ORIGIN_HANDLE_CREATE:
421                 DEBUG_REQ(D_MGS, req, "llog_init");
422                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
423                 rc = llog_origin_handle_create(req);
424                 break;
425         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
426                 DEBUG_REQ(D_MGS, req, "llog next block");
427                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
428                 rc = llog_origin_handle_next_block(req);
429                 break;
430         case LLOG_ORIGIN_HANDLE_READ_HEADER:
431                 DEBUG_REQ(D_MGS, req, "llog read header");
432                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
433                 rc = llog_origin_handle_read_header(req);
434                 break;
435         case LLOG_ORIGIN_HANDLE_CLOSE:
436                 DEBUG_REQ(D_MGS, req, "llog close");
437                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
438                 rc = llog_origin_handle_close(req);
439                 break;
440         case LLOG_CATINFO:
441                 DEBUG_REQ(D_MGS, req, "llog catinfo");
442                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
443                 rc = llog_catinfo(req);
444                 break;
445         default:
446                 req->rq_status = -ENOTSUPP;
447                 rc = ptlrpc_error(req);
448                 RETURN(rc);
449         }
450
451         LASSERT(current->journal_info == NULL);
452         
453         CDEBUG(D_CONFIG | (rc?D_ERROR:0), "MGS handle cmd=%d rc=%d\n",
454                req->rq_reqmsg->opc, rc);
455
456  out:
457         target_send_reply(req, rc, fail);
458         RETURN(0);
459 }
460
461 static inline int mgs_destroy_export(struct obd_export *exp)
462 {
463         ENTRY;
464
465         target_destroy_export(exp);
466
467         RETURN(0);
468 }
469
470
471 /* use obd ops to offer management infrastructure */
472 static struct obd_ops mgs_obd_ops = {
473         .o_owner           = THIS_MODULE,
474         .o_connect         = mgs_connect,
475         .o_disconnect      = mgs_disconnect,
476         .o_setup           = mgs_setup,
477         .o_precleanup      = mgs_precleanup,
478         .o_cleanup         = mgs_cleanup,
479         .o_destroy_export  = mgs_destroy_export,
480         .o_iocontrol       = mgs_iocontrol,
481 };
482
483 static int __init mgs_init(void)
484 {
485         struct lprocfs_static_vars lvars;
486
487         lprocfs_init_vars(mgs, &lvars);
488         class_register_type(&mgs_obd_ops, lvars.module_vars, LUSTRE_MGS_NAME);
489
490         return 0;
491 }
492
493 static void /*__exit*/ mgs_exit(void)
494 {
495         class_unregister_type(LUSTRE_MGS_NAME);
496 }
497
498 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
499 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
500 MODULE_LICENSE("GPL");
501
502 module_init(mgs_init);
503 module_exit(mgs_exit);