Whamcloud - gitweb
af67705ca3e358dd37368c353f521038d2bf13c2
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MGS
30 #define D_MGS D_CONFIG/*|D_WARNING*/
31
32 #ifdef __KERNEL__
33 # include <linux/module.h>
34 # include <linux/pagemap.h>
35 # include <linux/miscdevice.h>
36 # include <linux/init.h>
37 #else
38 # include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lprocfs_status.h>
44 #include <lustre_fsfilt.h>
45 #include <lustre_commit_confd.h>
46 #include <lustre_disk.h>
47 #include "mgs_internal.h"
48
49
50 /* Establish a connection to the MGS.*/
51 static int mgs_connect(const struct lu_env *env,
52                        struct lustre_handle *conn, struct obd_device *obd,
53                        struct obd_uuid *cluuid, struct obd_connect_data *data,
54                        void *localdata)
55 {
56         struct obd_export *exp;
57         int rc;
58         ENTRY;
59
60         if (!conn || !obd || !cluuid)
61                 RETURN(-EINVAL);
62
63         rc = class_connect(conn, obd, cluuid);
64         if (rc)
65                 RETURN(rc);
66         exp = class_conn2export(conn);
67         LASSERT(exp);
68
69         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
70
71         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
72
73         if (data != NULL) {
74                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
75                 exp->exp_connect_flags = data->ocd_connect_flags;
76                 data->ocd_version = LUSTRE_VERSION_CODE;
77         }
78
79         if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
80                 CWARN("MGS requires FID support, but client not\n");
81                 rc = -EBADE;
82         }
83
84         if (rc) {
85                 class_disconnect(exp);
86         } else {
87                 class_export_put(exp);
88         }
89
90         RETURN(rc);
91 }
92
93 static int mgs_disconnect(struct obd_export *exp)
94 {
95         int rc;
96         ENTRY;
97
98         LASSERT(exp);
99
100         class_export_get(exp);
101         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
102
103         /* Disconnect early so that clients can't keep using export */
104         rc = class_disconnect(exp);
105         ldlm_cancel_locks_for_export(exp);
106
107         /* complete all outstanding replies */
108         spin_lock(&exp->exp_lock);
109         while (!list_empty(&exp->exp_outstanding_replies)) {
110                 struct ptlrpc_reply_state *rs =
111                         list_entry(exp->exp_outstanding_replies.next,
112                                    struct ptlrpc_reply_state, rs_exp_list);
113                 struct ptlrpc_service *svc = rs->rs_service;
114
115                 spin_lock(&svc->srv_lock);
116                 list_del_init(&rs->rs_exp_list);
117                 ptlrpc_schedule_difficult_reply(rs);
118                 spin_unlock(&svc->srv_lock);
119         }
120         spin_unlock(&exp->exp_lock);
121
122         class_export_put(exp);
123         RETURN(rc);
124 }
125
126 static int mgs_cleanup(struct obd_device *obd);
127 static int mgs_handle(struct ptlrpc_request *req);
128
129 static int mgs_llog_init(struct obd_device *obd, int group,
130                          struct obd_device *tgt, int count,
131                          struct llog_catid *logid, struct obd_uuid *uuid)
132 {
133         struct obd_llog_group *olg = &obd->obd_olg;
134         int rc;
135         ENTRY;
136
137         LASSERT(group == OBD_LLOG_GROUP);
138         LASSERT(olg->olg_group == group);
139
140         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
141                         &llog_lvfs_ops);
142         RETURN(rc);
143 }
144
145 static int mgs_llog_finish(struct obd_device *obd, int count)
146 {
147         struct llog_ctxt *ctxt;
148         int rc = 0;
149         ENTRY;
150
151         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
152         if (ctxt)
153                 rc = llog_cleanup(ctxt);
154
155         RETURN(rc);
156 }
157
158 /* Start the MGS obd */
159 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
160 {
161         struct lprocfs_static_vars lvars;
162         struct mgs_obd *mgs = &obd->u.mgs;
163         struct lustre_mount_info *lmi;
164         struct lustre_sb_info *lsi;
165         struct vfsmount *mnt;
166         int rc = 0;
167         ENTRY;
168
169         CDEBUG(D_CONFIG, "Starting MGS\n");
170
171         /* Find our disk */
172         lmi = server_get_mount(obd->obd_name);
173         if (!lmi)
174                 RETURN(rc = -EINVAL);
175
176         mnt = lmi->lmi_mnt;
177         lsi = s2lsi(lmi->lmi_sb);
178         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
179         if (IS_ERR(obd->obd_fsops))
180                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
181
182         /* namespace for mgs llog */
183         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
184                                                 LDLM_NAMESPACE_MODEST);
185         if (obd->obd_namespace == NULL)
186                 GOTO(err_ops, rc = -ENOMEM);
187
188         /* ldlm setup */
189         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
190                            "mgs_ldlm_client", &obd->obd_ldlm_client);
191
192         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
193
194         rc = mgs_fs_setup(obd, mnt);
195         if (rc) {
196                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
197                        obd->obd_name, rc);
198                 GOTO(err_ns, rc);
199         }
200
201         rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
202         if (rc)
203                 GOTO(err_fs, rc);
204
205         /* No recovery for MGC's */
206         obd->obd_replayable = 0;
207
208         /* Internal mgs setup */
209         mgs_init_fsdb_list(obd);
210         sema_init(&mgs->mgs_sem, 1);
211
212         /* Start the service threads */
213         mgs->mgs_service =
214                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
215                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
216                                 MGC_REPLY_PORTAL, MGS_SERVICE_WATCHDOG_TIMEOUT,
217                                 mgs_handle, LUSTRE_MGS_NAME,
218                                 obd->obd_proc_entry, NULL,
219                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
220                                 "ll_mgs", LCT_MD_THREAD);
221
222         if (!mgs->mgs_service) {
223                 CERROR("failed to start service\n");
224                 GOTO(err_llog, rc = -ENOMEM);
225         }
226
227         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
228         if (rc)
229                 GOTO(err_thread, rc);
230
231         /* Setup proc */
232         lprocfs_mgs_init_vars(&lvars);
233         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
234                 lproc_mgs_setup(obd);
235         }
236
237         ping_evictor_start();
238
239         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
240
241         RETURN(0);
242
243 err_thread:
244         ptlrpc_unregister_service(mgs->mgs_service);
245 err_llog:
246         obd_llog_finish(obd, 0);
247 err_fs:
248         /* No extra cleanup needed for llog_init_commit_thread() */
249         mgs_fs_cleanup(obd);
250 err_ns:
251         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
252         obd->obd_namespace = NULL;
253 err_ops:
254         fsfilt_put_ops(obd->obd_fsops);
255 err_put:
256         server_put_mount(obd->obd_name, mnt);
257         mgs->mgs_sb = 0;
258         return rc;
259 }
260
261 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
262 {
263         int rc = 0;
264         ENTRY;
265
266         switch (stage) {
267         case OBD_CLEANUP_EARLY:
268         case OBD_CLEANUP_EXPORTS:
269                 break;
270         case OBD_CLEANUP_SELF_EXP:
271                 rc = obd_llog_finish(obd, 0);
272                 break;
273         case OBD_CLEANUP_OBD:
274                 break;
275         }
276         RETURN(rc);
277 }
278
279 /**
280  * Performs cleanup procedures for passed \a obd given it is mgs obd.
281  */
282 static int mgs_cleanup(struct obd_device *obd)
283 {
284         struct mgs_obd *mgs = &obd->u.mgs;
285         ENTRY;
286
287         if (mgs->mgs_sb == NULL)
288                 RETURN(0);
289
290         ping_evictor_stop();
291
292         ptlrpc_unregister_service(mgs->mgs_service);
293
294         mgs_cleanup_fsdb_list(obd);
295         lproc_mgs_cleanup(obd);
296         mgs_fs_cleanup(obd);
297
298         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
299         mgs->mgs_sb = NULL;
300
301         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
302         obd->obd_namespace = NULL;
303
304         fsfilt_put_ops(obd->obd_fsops);
305
306         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
307         RETURN(0);
308 }
309
310 /* similar to filter_prepare_destroy */
311 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
312                             struct lustre_handle *lockh)
313 {
314         struct ldlm_res_id res_id;
315         int rc, flags = 0;
316         ENTRY;
317
318         rc = mgc_fsname2resid(fsname, &res_id);
319         if (!rc)
320                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
321                                             LDLM_PLAIN, NULL, LCK_EX,
322                                             &flags, ldlm_blocking_ast,
323                                             ldlm_completion_ast, NULL,
324                                             fsname, 0, NULL, lockh);
325         if (rc)
326                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
327
328         RETURN(rc);
329 }
330
331 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
332 {
333         ENTRY;
334         ldlm_lock_decref(lockh, LCK_EX);
335         RETURN(0);
336 }
337
338 /* rc=0 means ok
339       1 means update
340      <0 means error */
341 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
342 {
343         int rc;
344         ENTRY;
345
346         rc = mgs_check_index(obd, mti);
347         if (rc == 0) {
348                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
349                                    "this MGS does not know about it. Assuming"
350                                    " writeconf.\n", mti->mti_svname);
351                 mti->mti_flags |= LDD_F_WRITECONF;
352                 rc = 1;
353         } else if (rc == -1) {
354                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
355                                    "disappeared! Regenerating all logs.\n",
356                                    mti->mti_fsname);
357                 mti->mti_flags |= LDD_F_WRITECONF;
358                 rc = 1;
359         } else {
360                 /* Index is correctly marked as used */
361
362                 /* If the logs don't contain the mti_nids then add
363                    them as failover nids */
364                 rc = mgs_check_failnid(obd, mti);
365         }
366
367         RETURN(rc);
368 }
369
370 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
371 static int mgs_handle_target_reg(struct ptlrpc_request *req)
372 {
373         struct obd_device *obd = req->rq_export->exp_obd;
374         struct lustre_handle lockh;
375         struct mgs_target_info *mti, *rep_mti;
376         int rc = 0, lockrc;
377         ENTRY;
378
379         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
380
381         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
382         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
383                                 LDD_F_UPDATE))) {
384                 /* We're just here as a startup ping. */
385                 CDEBUG(D_MGS, "Server %s is running on %s\n",
386                        mti->mti_svname, obd_export_nid2str(req->rq_export));
387                 rc = mgs_check_target(obd, mti);
388                 /* above will set appropriate mti flags */
389                 if (rc <= 0)
390                         /* Nothing wrong, or fatal error */
391                         GOTO(out_nolock, rc);
392         }
393
394         /* Revoke the config lock to make sure nobody is reading. */
395         /* Although actually I think it should be alright if
396            someone was reading while we were updating the logs - if we
397            revoke at the end they will just update from where they left off. */
398         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
399         if (lockrc != ELDLM_OK) {
400                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
401                                    "update their configuration (%d). Updating "
402                                    "local logs anyhow; you might have to "
403                                    "manually restart other nodes to get the "
404                                    "latest configuration.\n",
405                                    obd->obd_name, lockrc);
406         }
407
408         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_TARGET_REG, 10);
409
410         /* Log writing contention is handled by the fsdb_sem */
411
412         if (mti->mti_flags & LDD_F_WRITECONF) {
413                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
414                     mti->mti_stripe_index == 0) {
415                         rc = mgs_erase_logs(obd, mti->mti_fsname);
416                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
417                                       "request.  All servers must be restarted "
418                                       "in order to regenerate the logs."
419                                       "\n", obd->obd_name, mti->mti_fsname);
420                 } else if (mti->mti_flags &
421                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
422                         rc = mgs_erase_log(obd, mti->mti_svname);
423                         LCONSOLE_WARN("%s: Regenerating %s log by user "
424                                       "request.\n",
425                                       obd->obd_name, mti->mti_svname);
426                 }
427                 mti->mti_flags |= LDD_F_UPDATE;
428                 /* Erased logs means start from scratch. */
429                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
430         }
431
432         /* COMPAT_146 */
433         if (mti->mti_flags & LDD_F_UPGRADE14) {
434                 rc = mgs_upgrade_sv_14(obd, mti);
435                 if (rc) {
436                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
437                         GOTO(out, rc);
438                 }
439                 
440                 /* We're good to go */
441                 mti->mti_flags |= LDD_F_UPDATE;
442         }
443         /* end COMPAT_146 */
444
445         if (mti->mti_flags & LDD_F_UPDATE) {
446                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
447                        mti->mti_stripe_index);
448
449                 /* create or update the target log
450                    and update the client/mdt logs */
451                 rc = mgs_write_log_target(obd, mti);
452                 if (rc) {
453                         CERROR("Failed to write %s log (%d)\n",
454                                mti->mti_svname, rc);
455                         GOTO(out, rc);
456                 }
457
458                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
459                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
460                                     LDD_F_UPGRADE14);
461                 mti->mti_flags |= LDD_F_REWRITE_LDD;
462         }
463
464 out:
465         /* done with log update */
466         if (lockrc == ELDLM_OK)
467                 mgs_put_cfg_lock(&lockh);
468 out_nolock:
469         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
470                mti->mti_stripe_index, rc);
471         rc = req_capsule_server_pack(&req->rq_pill);
472         if (rc)
473                 RETURN(rc);
474
475         /* send back the whole mti in the reply */
476         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
477         *rep_mti = *mti;
478
479         /* Flush logs to disk */
480         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
481         RETURN(rc);
482 }
483
484 static int mgs_set_info_rpc(struct ptlrpc_request *req)
485 {
486         struct obd_device *obd = req->rq_export->exp_obd;
487         struct mgs_send_param *msp, *rep_msp;
488         struct lustre_handle lockh;
489         int lockrc, rc;
490         struct lustre_cfg_bufs bufs;
491         struct lustre_cfg *lcfg;
492         char fsname[MTI_NAME_MAXLEN];
493         ENTRY;
494
495         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
496         LASSERT(msp);
497
498         /* Construct lustre_cfg structure to pass to function mgs_setparam */
499         lustre_cfg_bufs_reset(&bufs, NULL);
500         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
501         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
502         rc = mgs_setparam(obd, lcfg, fsname);
503         if (rc) {
504                 CERROR("Error %d in setting the parameter %s for fs %s\n",
505                        rc, msp->mgs_param, fsname);
506                 RETURN(rc);
507         }
508
509         /* Revoke lock so everyone updates.  Should be alright if
510          * someone was already reading while we were updating the logs,
511          * so we don't really need to hold the lock while we're
512          * writing.
513          */
514         if (fsname[0]) {
515                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
516                 if (lockrc != ELDLM_OK)
517                         CERROR("lock error %d for fs %s\n", lockrc,
518                                fsname);
519                 else
520                         mgs_put_cfg_lock(&lockh);
521         }
522         lustre_cfg_free(lcfg);
523
524         rc = req_capsule_server_pack(&req->rq_pill);
525         if (rc == 0) {
526                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
527                 rep_msp = msp;
528         }
529         RETURN(rc);
530 }
531
532 /* Called whenever a target cleans up. */
533 /* XXX - Currently unused */
534 static int mgs_handle_target_del(struct ptlrpc_request *req)
535 {
536         ENTRY;
537         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
538         RETURN(0);
539 }
540
541 /* XXX - Currently unused */
542 static int mgs_handle_exception(struct ptlrpc_request *req)
543 {
544         ENTRY;
545         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
546         RETURN(0);
547 }
548
549 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
550 int mgs_handle(struct ptlrpc_request *req)
551 {
552         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
553         int opc, rc = 0;
554         ENTRY;
555
556         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
557         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_REQUEST_NET, 2);
558
559         LASSERT(current->journal_info == NULL);
560         opc = lustre_msg_get_opc(req->rq_reqmsg);
561         if (opc != MGS_CONNECT) {
562                 if (req->rq_export == NULL) {
563                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
564                                opc);
565                         req->rq_status = -ENOTCONN;
566                         GOTO(out, rc = -ENOTCONN);
567                 }
568         }
569
570         switch (opc) {
571         case MGS_CONNECT:
572                 DEBUG_REQ(D_MGS, req, "connect");
573                 /* MGS and MDS have same request format for connect */
574                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
575                 rc = target_handle_connect(req);
576                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
577                         /* Make clients trying to reconnect after a MGS restart
578                            happy; also requires obd_replayable */
579                         lustre_msg_add_op_flags(req->rq_repmsg,
580                                                 MSG_CONNECT_RECONNECT);
581                 break;
582         case MGS_DISCONNECT:
583                 DEBUG_REQ(D_MGS, req, "disconnect");
584                 /* MGS and MDS have same request format for disconnect */
585                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
586                 rc = target_handle_disconnect(req);
587                 req->rq_status = rc;            /* superfluous? */
588                 break;
589         case MGS_EXCEPTION:
590                 DEBUG_REQ(D_MGS, req, "exception");
591                 rc = mgs_handle_exception(req);
592                 break;
593         case MGS_TARGET_REG:
594                 DEBUG_REQ(D_MGS, req, "target add");
595                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
596                 rc = mgs_handle_target_reg(req);
597                 break;
598         case MGS_TARGET_DEL:
599                 DEBUG_REQ(D_MGS, req, "target del");
600                 rc = mgs_handle_target_del(req);
601                 break;
602         case MGS_SET_INFO:
603                 DEBUG_REQ(D_MGS, req, "set_info");
604                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
605                 rc = mgs_set_info_rpc(req);
606                 break;
607
608         case LDLM_ENQUEUE:
609                 DEBUG_REQ(D_MGS, req, "enqueue");
610                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
611                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
612                                          ldlm_server_blocking_ast, NULL);
613                 break;
614         case LDLM_BL_CALLBACK:
615         case LDLM_CP_CALLBACK:
616                 DEBUG_REQ(D_MGS, req, "callback");
617                 CERROR("callbacks should not happen on MGS\n");
618                 LBUG();
619                 break;
620
621         case OBD_PING:
622                 DEBUG_REQ(D_INFO, req, "ping");
623                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
624                 rc = target_handle_ping(req);
625                 break;
626         case OBD_LOG_CANCEL:
627                 DEBUG_REQ(D_MGS, req, "log cancel");
628                 rc = -ENOTSUPP; /* la la la */
629                 break;
630
631         case LLOG_ORIGIN_HANDLE_CREATE:
632                 DEBUG_REQ(D_MGS, req, "llog_init");
633                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
634                 rc = llog_origin_handle_create(req);
635                 break;
636         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
637                 DEBUG_REQ(D_MGS, req, "llog next block");
638                 req_capsule_set(&req->rq_pill,
639                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
640                 rc = llog_origin_handle_next_block(req);
641                 break;
642         case LLOG_ORIGIN_HANDLE_READ_HEADER:
643                 DEBUG_REQ(D_MGS, req, "llog read header");
644                 req_capsule_set(&req->rq_pill,
645                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
646                 rc = llog_origin_handle_read_header(req);
647                 break;
648         case LLOG_ORIGIN_HANDLE_CLOSE:
649                 DEBUG_REQ(D_MGS, req, "llog close");
650                 rc = llog_origin_handle_close(req);
651                 break;
652         case LLOG_CATINFO:
653                 DEBUG_REQ(D_MGS, req, "llog catinfo");
654                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
655                 rc = llog_catinfo(req);
656                 break;
657         default:
658                 req->rq_status = -ENOTSUPP;
659                 rc = ptlrpc_error(req);
660                 RETURN(rc);
661         }
662
663         LASSERT(current->journal_info == NULL);
664
665         if (rc)
666                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
667
668 out:
669         target_send_reply(req, rc, fail);
670         RETURN(0);
671 }
672
673 static inline int mgs_destroy_export(struct obd_export *exp)
674 {
675         ENTRY;
676
677         target_destroy_export(exp);
678
679         RETURN(0);
680 }
681
682 /* from mdt_iocontrol */
683 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
684                   void *karg, void *uarg)
685 {
686         struct obd_device *obd = exp->exp_obd;
687         struct obd_ioctl_data *data = karg;
688         struct lvfs_run_ctxt saved;
689         int rc = 0;
690
691         ENTRY;
692         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
693
694         switch (cmd) {
695
696         case OBD_IOC_PARAM: {
697                 struct lustre_handle lockh;
698                 struct lustre_cfg *lcfg;
699                 struct llog_rec_hdr rec;
700                 char fsname[MTI_NAME_MAXLEN];
701                 int lockrc;
702
703                 rec.lrh_len = llog_data_len(data->ioc_plen1);
704
705                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
706                         rec.lrh_type = OBD_CFG_REC;
707                 } else {
708                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
709                         RETURN(-EINVAL);
710                 }
711
712                 OBD_ALLOC(lcfg, data->ioc_plen1);
713                 if (lcfg == NULL)
714                         RETURN(-ENOMEM);
715                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
716                 if (rc)
717                         GOTO(out_free, rc);
718
719                 if (lcfg->lcfg_bufcount < 1)
720                         GOTO(out_free, rc = -EINVAL);
721
722                 rc = mgs_setparam(obd, lcfg, fsname);
723                 if (rc) {
724                         CERROR("setparam err %d\n", rc);
725                         GOTO(out_free, rc);
726                 }
727
728                 /* Revoke lock so everyone updates.  Should be alright if
729                    someone was already reading while we were updating the logs,
730                    so we don't really need to hold the lock while we're
731                    writing (above). */
732                 if (fsname[0]) {
733                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
734                         if (lockrc != ELDLM_OK)
735                                 CERROR("lock error %d for fs %s\n", lockrc,
736                                        fsname);
737                         else
738                                 mgs_put_cfg_lock(&lockh);
739                 }
740
741 out_free:
742                 OBD_FREE(lcfg, data->ioc_plen1);
743                 RETURN(rc);
744         }
745
746         case OBD_IOC_DUMP_LOG: {
747                 struct llog_ctxt *ctxt;
748                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
749                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
750                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
751                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
752                 llog_ctxt_put(ctxt);
753
754                 RETURN(rc);
755         }
756
757         case OBD_IOC_LLOG_CHECK:
758         case OBD_IOC_LLOG_INFO:
759         case OBD_IOC_LLOG_PRINT: {
760                 struct llog_ctxt *ctxt;
761                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
762
763                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
764                 rc = llog_ioctl(ctxt, cmd, data);
765                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
766                 llog_ctxt_put(ctxt);
767
768                 RETURN(rc);
769         }
770
771         default:
772                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
773                 RETURN(-EINVAL);
774         }
775         RETURN(0);
776 }
777
778 /* use obd ops to offer management infrastructure */
779 static struct obd_ops mgs_obd_ops = {
780         .o_owner           = THIS_MODULE,
781         .o_connect         = mgs_connect,
782         .o_disconnect      = mgs_disconnect,
783         .o_setup           = mgs_setup,
784         .o_precleanup      = mgs_precleanup,
785         .o_cleanup         = mgs_cleanup,
786         .o_destroy_export  = mgs_destroy_export,
787         .o_iocontrol       = mgs_iocontrol,
788         .o_llog_init       = mgs_llog_init,
789         .o_llog_finish     = mgs_llog_finish
790 };
791
792 static int __init mgs_init(void)
793 {
794         struct lprocfs_static_vars lvars;
795
796         lprocfs_mgs_init_vars(&lvars);
797         class_register_type(&mgs_obd_ops, NULL,
798                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
799
800         return 0;
801 }
802
803 static void /*__exit*/ mgs_exit(void)
804 {
805         class_unregister_type(LUSTRE_MGS_NAME);
806 }
807
808 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
809 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
810 MODULE_LICENSE("GPL");
811
812 module_init(mgs_init);
813 module_exit(mgs_exit);