Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_commit_confd.h>
61 #include <lustre_disk.h>
62 #include "mgs_internal.h"
63
64
65 /* Establish a connection to the MGS.*/
66 static int mgs_connect(const struct lu_env *env,
67                        struct lustre_handle *conn, struct obd_device *obd,
68                        struct obd_uuid *cluuid, struct obd_connect_data *data,
69                        void *localdata)
70 {
71         struct obd_export *exp;
72         int rc;
73         ENTRY;
74
75         if (!conn || !obd || !cluuid)
76                 RETURN(-EINVAL);
77
78         rc = class_connect(conn, obd, cluuid);
79         if (rc)
80                 RETURN(rc);
81         exp = class_conn2export(conn);
82         LASSERT(exp);
83
84         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
85
86         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
87
88         if (data != NULL) {
89                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
90                 exp->exp_connect_flags = data->ocd_connect_flags;
91                 data->ocd_version = LUSTRE_VERSION_CODE;
92         }
93
94         if (rc) {
95                 class_disconnect(exp);
96         } else {
97                 class_export_put(exp);
98         }
99
100         RETURN(rc);
101 }
102
103 static int mgs_disconnect(struct obd_export *exp)
104 {
105         int rc;
106         ENTRY;
107
108         LASSERT(exp);
109
110         class_export_get(exp);
111         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
112
113         /* Disconnect early so that clients can't keep using export */
114         rc = class_disconnect(exp);
115         ldlm_cancel_locks_for_export(exp);
116
117         /* complete all outstanding replies */
118         spin_lock(&exp->exp_lock);
119         while (!list_empty(&exp->exp_outstanding_replies)) {
120                 struct ptlrpc_reply_state *rs =
121                         list_entry(exp->exp_outstanding_replies.next,
122                                    struct ptlrpc_reply_state, rs_exp_list);
123                 struct ptlrpc_service *svc = rs->rs_service;
124
125                 spin_lock(&svc->srv_lock);
126                 list_del_init(&rs->rs_exp_list);
127                 ptlrpc_schedule_difficult_reply(rs);
128                 spin_unlock(&svc->srv_lock);
129         }
130         spin_unlock(&exp->exp_lock);
131
132         class_export_put(exp);
133         RETURN(rc);
134 }
135
136 static int mgs_cleanup(struct obd_device *obd);
137 static int mgs_handle(struct ptlrpc_request *req);
138
139 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
140                          struct obd_device *tgt, int count,
141                          struct llog_catid *logid, struct obd_uuid *uuid)
142 {
143         int rc;
144         ENTRY;
145
146         LASSERT(olg == &obd->obd_olg);
147         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
148                         &llog_lvfs_ops);
149         RETURN(rc);
150 }
151
152 static int mgs_llog_finish(struct obd_device *obd, int count)
153 {
154         struct llog_ctxt *ctxt;
155         int rc = 0;
156         ENTRY;
157
158         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
159         if (ctxt)
160                 rc = llog_cleanup(ctxt);
161
162         RETURN(rc);
163 }
164
165 /* Start the MGS obd */
166 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
167 {
168         struct lprocfs_static_vars lvars;
169         struct mgs_obd *mgs = &obd->u.mgs;
170         struct lustre_mount_info *lmi;
171         struct lustre_sb_info *lsi;
172         struct vfsmount *mnt;
173         int rc = 0;
174         ENTRY;
175
176         CDEBUG(D_CONFIG, "Starting MGS\n");
177
178         /* Find our disk */
179         lmi = server_get_mount(obd->obd_name);
180         if (!lmi)
181                 RETURN(rc = -EINVAL);
182
183         mnt = lmi->lmi_mnt;
184         lsi = s2lsi(lmi->lmi_sb);
185         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
186         if (IS_ERR(obd->obd_fsops))
187                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
188
189         /* namespace for mgs llog */
190         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
191                                                 LDLM_NAMESPACE_MODEST);
192         if (obd->obd_namespace == NULL)
193                 GOTO(err_ops, rc = -ENOMEM);
194
195         /* ldlm setup */
196         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
197                            "mgs_ldlm_client", &obd->obd_ldlm_client);
198
199         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
200
201         rc = mgs_fs_setup(obd, mnt);
202         if (rc) {
203                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
204                        obd->obd_name, rc);
205                 GOTO(err_ns, rc);
206         }
207
208         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
209         if (rc)
210                 GOTO(err_fs, rc);
211
212         /* No recovery for MGC's */
213         obd->obd_replayable = 0;
214
215         /* Internal mgs setup */
216         mgs_init_fsdb_list(obd);
217         sema_init(&mgs->mgs_sem, 1);
218
219         /* Start the service threads */
220         mgs->mgs_service =
221                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
222                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
223                                 MGC_REPLY_PORTAL, 2000,
224                                 mgs_handle, LUSTRE_MGS_NAME,
225                                 obd->obd_proc_entry, target_print_req,
226                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
227                                 "ll_mgs", LCT_MD_THREAD);
228
229         if (!mgs->mgs_service) {
230                 CERROR("failed to start service\n");
231                 GOTO(err_llog, rc = -ENOMEM);
232         }
233
234         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
235         if (rc)
236                 GOTO(err_thread, rc);
237
238         /* Setup proc */
239         lprocfs_mgs_init_vars(&lvars);
240         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
241                 lproc_mgs_setup(obd);
242         }
243
244         ping_evictor_start();
245
246         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
247
248         RETURN(0);
249
250 err_thread:
251         ptlrpc_unregister_service(mgs->mgs_service);
252 err_llog:
253         obd_llog_finish(obd, 0);
254 err_fs:
255         /* No extra cleanup needed for llog_init_commit_thread() */
256         mgs_fs_cleanup(obd);
257 err_ns:
258         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
259         obd->obd_namespace = NULL;
260 err_ops:
261         fsfilt_put_ops(obd->obd_fsops);
262 err_put:
263         server_put_mount(obd->obd_name, mnt);
264         mgs->mgs_sb = 0;
265         return rc;
266 }
267
268 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
269 {
270         int rc = 0;
271         ENTRY;
272
273         switch (stage) {
274         case OBD_CLEANUP_EARLY:
275         case OBD_CLEANUP_EXPORTS:
276                 rc = obd_llog_finish(obd, 0);
277                 break;
278         }
279         RETURN(rc);
280 }
281
282 /**
283  * Performs cleanup procedures for passed \a obd given it is mgs obd.
284  */
285 static int mgs_cleanup(struct obd_device *obd)
286 {
287         struct mgs_obd *mgs = &obd->u.mgs;
288         ENTRY;
289
290         if (mgs->mgs_sb == NULL)
291                 RETURN(0);
292
293         ping_evictor_stop();
294
295         ptlrpc_unregister_service(mgs->mgs_service);
296
297         mgs_cleanup_fsdb_list(obd);
298         lproc_mgs_cleanup(obd);
299         mgs_fs_cleanup(obd);
300
301         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
302         mgs->mgs_sb = NULL;
303
304         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
305         obd->obd_namespace = NULL;
306
307         fsfilt_put_ops(obd->obd_fsops);
308
309         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
310         RETURN(0);
311 }
312
313 /* similar to filter_prepare_destroy */
314 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
315                             struct lustre_handle *lockh)
316 {
317         struct ldlm_res_id res_id;
318         int rc, flags = 0;
319         ENTRY;
320
321         rc = mgc_fsname2resid(fsname, &res_id);
322         if (!rc)
323                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
324                                             LDLM_PLAIN, NULL, LCK_EX,
325                                             &flags, ldlm_blocking_ast,
326                                             ldlm_completion_ast, NULL,
327                                             fsname, 0, NULL, lockh);
328         if (rc)
329                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
330
331         RETURN(rc);
332 }
333
334 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
335 {
336         ENTRY;
337         ldlm_lock_decref(lockh, LCK_EX);
338         RETURN(0);
339 }
340
341 /* rc=0 means ok
342       1 means update
343      <0 means error */
344 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
345 {
346         int rc;
347         ENTRY;
348
349         rc = mgs_check_index(obd, mti);
350         if (rc == 0) {
351                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
352                                    "this MGS does not know about it. Assuming"
353                                    " writeconf.\n", mti->mti_svname);
354                 mti->mti_flags |= LDD_F_WRITECONF;
355                 rc = 1;
356         } else if (rc == -1) {
357                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
358                                    "disappeared! Regenerating all logs.\n",
359                                    mti->mti_fsname);
360                 mti->mti_flags |= LDD_F_WRITECONF;
361                 rc = 1;
362         } else {
363                 /* Index is correctly marked as used */
364
365                 /* If the logs don't contain the mti_nids then add
366                    them as failover nids */
367                 rc = mgs_check_failnid(obd, mti);
368         }
369
370         RETURN(rc);
371 }
372
373 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
374 static int mgs_handle_target_reg(struct ptlrpc_request *req)
375 {
376         struct obd_device *obd = req->rq_export->exp_obd;
377         struct lustre_handle lockh;
378         struct mgs_target_info *mti, *rep_mti;
379         int rc = 0, lockrc;
380         ENTRY;
381
382         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
383
384         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
385         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
386                                 LDD_F_UPDATE))) {
387                 /* We're just here as a startup ping. */
388                 CDEBUG(D_MGS, "Server %s is running on %s\n",
389                        mti->mti_svname, obd_export_nid2str(req->rq_export));
390                 rc = mgs_check_target(obd, mti);
391                 /* above will set appropriate mti flags */
392                 if (rc <= 0)
393                         /* Nothing wrong, or fatal error */
394                         GOTO(out_nolock, rc);
395         }
396
397         /* Revoke the config lock to make sure nobody is reading. */
398         /* Although actually I think it should be alright if
399            someone was reading while we were updating the logs - if we
400            revoke at the end they will just update from where they left off. */
401         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
402         if (lockrc != ELDLM_OK) {
403                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
404                                    "update their configuration (%d). Updating "
405                                    "local logs anyhow; you might have to "
406                                    "manually restart other nodes to get the "
407                                    "latest configuration.\n",
408                                    obd->obd_name, lockrc);
409         }
410
411         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
412
413         /* Log writing contention is handled by the fsdb_sem */
414
415         if (mti->mti_flags & LDD_F_WRITECONF) {
416                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
417                     mti->mti_stripe_index == 0) {
418                         rc = mgs_erase_logs(obd, mti->mti_fsname);
419                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
420                                       "request.  All servers must be restarted "
421                                       "in order to regenerate the logs."
422                                       "\n", obd->obd_name, mti->mti_fsname);
423                 } else if (mti->mti_flags &
424                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
425                         rc = mgs_erase_log(obd, mti->mti_svname);
426                         LCONSOLE_WARN("%s: Regenerating %s log by user "
427                                       "request.\n",
428                                       obd->obd_name, mti->mti_svname);
429                 }
430                 mti->mti_flags |= LDD_F_UPDATE;
431                 /* Erased logs means start from scratch. */
432                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
433         }
434
435         /* COMPAT_146 */
436         if (mti->mti_flags & LDD_F_UPGRADE14) {
437                 rc = mgs_upgrade_sv_14(obd, mti);
438                 if (rc) {
439                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
440                         GOTO(out, rc);
441                 }
442                 
443                 /* We're good to go */
444                 mti->mti_flags |= LDD_F_UPDATE;
445         }
446         /* end COMPAT_146 */
447
448         if (mti->mti_flags & LDD_F_UPDATE) {
449                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
450                        mti->mti_stripe_index);
451
452                 /* create or update the target log
453                    and update the client/mdt logs */
454                 rc = mgs_write_log_target(obd, mti);
455                 if (rc) {
456                         CERROR("Failed to write %s log (%d)\n",
457                                mti->mti_svname, rc);
458                         GOTO(out, rc);
459                 }
460
461                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
462                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
463                                     LDD_F_UPGRADE14);
464                 mti->mti_flags |= LDD_F_REWRITE_LDD;
465         }
466
467 out:
468         /* done with log update */
469         if (lockrc == ELDLM_OK)
470                 mgs_put_cfg_lock(&lockh);
471 out_nolock:
472         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
473                mti->mti_stripe_index, rc);
474         rc = req_capsule_server_pack(&req->rq_pill);
475         if (rc)
476                 RETURN(rc);
477
478         /* send back the whole mti in the reply */
479         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
480         *rep_mti = *mti;
481
482         /* Flush logs to disk */
483         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
484         RETURN(rc);
485 }
486
487 static int mgs_set_info_rpc(struct ptlrpc_request *req)
488 {
489         struct obd_device *obd = req->rq_export->exp_obd;
490         struct mgs_send_param *msp, *rep_msp;
491         struct lustre_handle lockh;
492         int lockrc, rc;
493         struct lustre_cfg_bufs bufs;
494         struct lustre_cfg *lcfg;
495         char fsname[MTI_NAME_MAXLEN];
496         ENTRY;
497
498         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
499         LASSERT(msp);
500
501         /* Construct lustre_cfg structure to pass to function mgs_setparam */
502         lustre_cfg_bufs_reset(&bufs, NULL);
503         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
504         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
505         rc = mgs_setparam(obd, lcfg, fsname);
506         if (rc) {
507                 CERROR("Error %d in setting the parameter %s for fs %s\n",
508                        rc, msp->mgs_param, fsname);
509                 RETURN(rc);
510         }
511
512         /* Revoke lock so everyone updates.  Should be alright if
513          * someone was already reading while we were updating the logs,
514          * so we don't really need to hold the lock while we're
515          * writing.
516          */
517         if (fsname[0]) {
518                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
519                 if (lockrc != ELDLM_OK)
520                         CERROR("lock error %d for fs %s\n", lockrc,
521                                fsname);
522                 else
523                         mgs_put_cfg_lock(&lockh);
524         }
525         lustre_cfg_free(lcfg);
526
527         rc = req_capsule_server_pack(&req->rq_pill);
528         if (rc == 0) {
529                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
530                 rep_msp = msp;
531         }
532         RETURN(rc);
533 }
534
535 /* Called whenever a target cleans up. */
536 /* XXX - Currently unused */
537 static int mgs_handle_target_del(struct ptlrpc_request *req)
538 {
539         ENTRY;
540         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
541         RETURN(0);
542 }
543
544 /* XXX - Currently unused */
545 static int mgs_handle_exception(struct ptlrpc_request *req)
546 {
547         ENTRY;
548         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
549         RETURN(0);
550 }
551
552 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
553 int mgs_handle(struct ptlrpc_request *req)
554 {
555         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
556         int opc, rc = 0;
557         ENTRY;
558
559         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
560         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
561         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
562                 RETURN(0);
563
564         LASSERT(current->journal_info == NULL);
565         opc = lustre_msg_get_opc(req->rq_reqmsg);
566         if (opc != MGS_CONNECT) {
567                 if (req->rq_export == NULL) {
568                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
569                                opc);
570                         req->rq_status = -ENOTCONN;
571                         GOTO(out, rc = -ENOTCONN);
572                 }
573         }
574
575         switch (opc) {
576         case MGS_CONNECT:
577                 DEBUG_REQ(D_MGS, req, "connect");
578                 /* MGS and MDS have same request format for connect */
579                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
580                 rc = target_handle_connect(req);
581                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
582                         /* Make clients trying to reconnect after a MGS restart
583                            happy; also requires obd_replayable */
584                         lustre_msg_add_op_flags(req->rq_repmsg,
585                                                 MSG_CONNECT_RECONNECT);
586                 break;
587         case MGS_DISCONNECT:
588                 DEBUG_REQ(D_MGS, req, "disconnect");
589                 /* MGS and MDS have same request format for disconnect */
590                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
591                 rc = target_handle_disconnect(req);
592                 req->rq_status = rc;            /* superfluous? */
593                 break;
594         case MGS_EXCEPTION:
595                 DEBUG_REQ(D_MGS, req, "exception");
596                 rc = mgs_handle_exception(req);
597                 break;
598         case MGS_TARGET_REG:
599                 DEBUG_REQ(D_MGS, req, "target add");
600                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
601                 rc = mgs_handle_target_reg(req);
602                 break;
603         case MGS_TARGET_DEL:
604                 DEBUG_REQ(D_MGS, req, "target del");
605                 rc = mgs_handle_target_del(req);
606                 break;
607         case MGS_SET_INFO:
608                 DEBUG_REQ(D_MGS, req, "set_info");
609                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
610                 rc = mgs_set_info_rpc(req);
611                 break;
612
613         case LDLM_ENQUEUE:
614                 DEBUG_REQ(D_MGS, req, "enqueue");
615                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
616                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
617                                          ldlm_server_blocking_ast, NULL);
618                 break;
619         case LDLM_BL_CALLBACK:
620         case LDLM_CP_CALLBACK:
621                 DEBUG_REQ(D_MGS, req, "callback");
622                 CERROR("callbacks should not happen on MGS\n");
623                 LBUG();
624                 break;
625
626         case OBD_PING:
627                 DEBUG_REQ(D_INFO, req, "ping");
628                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
629                 rc = target_handle_ping(req);
630                 break;
631         case OBD_LOG_CANCEL:
632                 DEBUG_REQ(D_MGS, req, "log cancel");
633                 rc = -ENOTSUPP; /* la la la */
634                 break;
635
636         case LLOG_ORIGIN_HANDLE_CREATE:
637                 DEBUG_REQ(D_MGS, req, "llog_init");
638                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
639                 rc = llog_origin_handle_create(req);
640                 break;
641         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
642                 DEBUG_REQ(D_MGS, req, "llog next block");
643                 req_capsule_set(&req->rq_pill,
644                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
645                 rc = llog_origin_handle_next_block(req);
646                 break;
647         case LLOG_ORIGIN_HANDLE_READ_HEADER:
648                 DEBUG_REQ(D_MGS, req, "llog read header");
649                 req_capsule_set(&req->rq_pill,
650                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
651                 rc = llog_origin_handle_read_header(req);
652                 break;
653         case LLOG_ORIGIN_HANDLE_CLOSE:
654                 DEBUG_REQ(D_MGS, req, "llog close");
655                 rc = llog_origin_handle_close(req);
656                 break;
657         case LLOG_CATINFO:
658                 DEBUG_REQ(D_MGS, req, "llog catinfo");
659                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
660                 rc = llog_catinfo(req);
661                 break;
662         default:
663                 req->rq_status = -ENOTSUPP;
664                 rc = ptlrpc_error(req);
665                 RETURN(rc);
666         }
667
668         LASSERT(current->journal_info == NULL);
669
670         if (rc)
671                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
672
673 out:
674         target_send_reply(req, rc, fail);
675         RETURN(0);
676 }
677
678 static inline int mgs_destroy_export(struct obd_export *exp)
679 {
680         ENTRY;
681
682         target_destroy_export(exp);
683
684         RETURN(0);
685 }
686
687 /* from mdt_iocontrol */
688 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
689                   void *karg, void *uarg)
690 {
691         struct obd_device *obd = exp->exp_obd;
692         struct obd_ioctl_data *data = karg;
693         struct lvfs_run_ctxt saved;
694         int rc = 0;
695
696         ENTRY;
697         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
698
699         switch (cmd) {
700
701         case OBD_IOC_PARAM: {
702                 struct lustre_handle lockh;
703                 struct lustre_cfg *lcfg;
704                 struct llog_rec_hdr rec;
705                 char fsname[MTI_NAME_MAXLEN];
706                 int lockrc;
707
708                 rec.lrh_len = llog_data_len(data->ioc_plen1);
709
710                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
711                         rec.lrh_type = OBD_CFG_REC;
712                 } else {
713                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
714                         RETURN(-EINVAL);
715                 }
716
717                 OBD_ALLOC(lcfg, data->ioc_plen1);
718                 if (lcfg == NULL)
719                         RETURN(-ENOMEM);
720                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
721                 if (rc)
722                         GOTO(out_free, rc);
723
724                 if (lcfg->lcfg_bufcount < 1)
725                         GOTO(out_free, rc = -EINVAL);
726
727                 rc = mgs_setparam(obd, lcfg, fsname);
728                 if (rc) {
729                         CERROR("setparam err %d\n", rc);
730                         GOTO(out_free, rc);
731                 }
732
733                 /* Revoke lock so everyone updates.  Should be alright if
734                    someone was already reading while we were updating the logs,
735                    so we don't really need to hold the lock while we're
736                    writing (above). */
737                 if (fsname[0]) {
738                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
739                         if (lockrc != ELDLM_OK)
740                                 CERROR("lock error %d for fs %s\n", lockrc,
741                                        fsname);
742                         else
743                                 mgs_put_cfg_lock(&lockh);
744                 }
745
746 out_free:
747                 OBD_FREE(lcfg, data->ioc_plen1);
748                 RETURN(rc);
749         }
750
751         case OBD_IOC_DUMP_LOG: {
752                 struct llog_ctxt *ctxt;
753                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
754                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
755                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
756                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
757                 llog_ctxt_put(ctxt);
758
759                 RETURN(rc);
760         }
761
762         case OBD_IOC_LLOG_CHECK:
763         case OBD_IOC_LLOG_INFO:
764         case OBD_IOC_LLOG_PRINT: {
765                 struct llog_ctxt *ctxt;
766                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
767
768                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
769                 rc = llog_ioctl(ctxt, cmd, data);
770                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
771                 llog_ctxt_put(ctxt);
772
773                 RETURN(rc);
774         }
775
776         default:
777                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
778                 RETURN(-EINVAL);
779         }
780         RETURN(0);
781 }
782
783 /* use obd ops to offer management infrastructure */
784 static struct obd_ops mgs_obd_ops = {
785         .o_owner           = THIS_MODULE,
786         .o_connect         = mgs_connect,
787         .o_disconnect      = mgs_disconnect,
788         .o_setup           = mgs_setup,
789         .o_precleanup      = mgs_precleanup,
790         .o_cleanup         = mgs_cleanup,
791         .o_destroy_export  = mgs_destroy_export,
792         .o_iocontrol       = mgs_iocontrol,
793         .o_llog_init       = mgs_llog_init,
794         .o_llog_finish     = mgs_llog_finish
795 };
796
797 static int __init mgs_init(void)
798 {
799         struct lprocfs_static_vars lvars;
800
801         lprocfs_mgs_init_vars(&lvars);
802         class_register_type(&mgs_obd_ops, NULL,
803                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
804
805         return 0;
806 }
807
808 static void /*__exit*/ mgs_exit(void)
809 {
810         class_unregister_type(LUSTRE_MGS_NAME);
811 }
812
813 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
814 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
815 MODULE_LICENSE("GPL");
816
817 module_init(mgs_init);
818 module_exit(mgs_exit);