Whamcloud - gitweb
b=11089
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MGS
30 #define D_MGS D_CONFIG/*|D_WARNING*/
31
32 #ifdef __KERNEL__
33 # include <linux/module.h>
34 # include <linux/pagemap.h>
35 # include <linux/miscdevice.h>
36 # include <linux/init.h>
37 #else
38 # include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lprocfs_status.h>
44 #include <lustre_fsfilt.h>
45 #include <lustre_commit_confd.h>
46 #include <lustre_disk.h>
47 #include "mgs_internal.h"
48
49
50 /* Establish a connection to the MGS.*/
51 static int mgs_connect(const struct lu_env *env,
52                        struct lustre_handle *conn, struct obd_device *obd,
53                        struct obd_uuid *cluuid, struct obd_connect_data *data,
54                        void *localdata)
55 {
56         struct obd_export *exp;
57         int rc;
58         ENTRY;
59
60         if (!conn || !obd || !cluuid)
61                 RETURN(-EINVAL);
62
63         rc = class_connect(conn, obd, cluuid);
64         if (rc)
65                 RETURN(rc);
66         exp = class_conn2export(conn);
67         LASSERT(exp);
68
69         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
70
71         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
72
73         if (data != NULL) {
74                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
75                 exp->exp_connect_flags = data->ocd_connect_flags;
76                 data->ocd_version = LUSTRE_VERSION_CODE;
77         }
78
79         if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
80                 CWARN("MGS requires FID support, but client not\n");
81                 rc = -EBADE;
82         }
83
84         if (rc) {
85                 class_disconnect(exp);
86         } else {
87                 class_export_put(exp);
88         }
89
90         RETURN(rc);
91 }
92
93 static int mgs_disconnect(struct obd_export *exp)
94 {
95         int rc;
96         ENTRY;
97
98         LASSERT(exp);
99
100         class_export_get(exp);
101         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
102
103         /* Disconnect early so that clients can't keep using export */
104         rc = class_disconnect(exp);
105         ldlm_cancel_locks_for_export(exp);
106
107         /* complete all outstanding replies */
108         spin_lock(&exp->exp_lock);
109         while (!list_empty(&exp->exp_outstanding_replies)) {
110                 struct ptlrpc_reply_state *rs =
111                         list_entry(exp->exp_outstanding_replies.next,
112                                    struct ptlrpc_reply_state, rs_exp_list);
113                 struct ptlrpc_service *svc = rs->rs_service;
114
115                 spin_lock(&svc->srv_lock);
116                 list_del_init(&rs->rs_exp_list);
117                 ptlrpc_schedule_difficult_reply(rs);
118                 spin_unlock(&svc->srv_lock);
119         }
120         spin_unlock(&exp->exp_lock);
121
122         class_export_put(exp);
123         RETURN(rc);
124 }
125
126 static int mgs_cleanup(struct obd_device *obd);
127 static int mgs_handle(struct ptlrpc_request *req);
128
129 static int mgs_llog_init(struct obd_device *obd, int group,
130                          struct obd_device *tgt, int count,
131                          struct llog_catid *logid, struct obd_uuid *uuid)
132 {
133         struct obd_llog_group *olg = &obd->obd_olg;
134         int rc;
135         ENTRY;
136
137         LASSERT(group == OBD_LLOG_GROUP);
138         LASSERT(olg->olg_group == group);
139
140         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
141                         &llog_lvfs_ops);
142         RETURN(rc);
143 }
144
145 static int mgs_llog_finish(struct obd_device *obd, int count)
146 {
147         struct llog_ctxt *ctxt;
148         int rc = 0;
149         ENTRY;
150
151         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
152         if (ctxt)
153                 rc = llog_cleanup(ctxt);
154
155         RETURN(rc);
156 }
157
158 /* Start the MGS obd */
159 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
160 {
161         struct lprocfs_static_vars lvars;
162         struct mgs_obd *mgs = &obd->u.mgs;
163         struct lustre_mount_info *lmi;
164         struct lustre_sb_info *lsi;
165         struct vfsmount *mnt;
166         int rc = 0;
167         ENTRY;
168
169         CDEBUG(D_CONFIG, "Starting MGS\n");
170
171         /* Find our disk */
172         lmi = server_get_mount(obd->obd_name);
173         if (!lmi)
174                 RETURN(rc = -EINVAL);
175
176         mnt = lmi->lmi_mnt;
177         lsi = s2lsi(lmi->lmi_sb);
178         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
179         if (IS_ERR(obd->obd_fsops))
180                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
181
182         /* namespace for mgs llog */
183         obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER,
184                                                 LDLM_NAMESPACE_MODEST);
185         if (obd->obd_namespace == NULL)
186                 GOTO(err_ops, rc = -ENOMEM);
187
188         /* ldlm setup */
189         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
190                            "mgs_ldlm_client", &obd->obd_ldlm_client);
191
192         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
193
194         rc = mgs_fs_setup(obd, mnt);
195         if (rc) {
196                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
197                        obd->obd_name, rc);
198                 GOTO(err_ns, rc);
199         }
200
201         rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
202         if (rc)
203                 GOTO(err_fs, rc);
204
205         /* No recovery for MGC's */
206         obd->obd_replayable = 0;
207
208         /* Internal mgs setup */
209         mgs_init_fsdb_list(obd);
210         sema_init(&mgs->mgs_sem, 1);
211
212         /* Start the service threads */
213         mgs->mgs_service =
214                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
215                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
216                                 MGC_REPLY_PORTAL, MGS_SERVICE_WATCHDOG_TIMEOUT,
217                                 mgs_handle, LUSTRE_MGS_NAME,
218                                 obd->obd_proc_entry, NULL,
219                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
220                                 "ll_mgs", LCT_MD_THREAD);
221
222         if (!mgs->mgs_service) {
223                 CERROR("failed to start service\n");
224                 GOTO(err_llog, rc = -ENOMEM);
225         }
226
227         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
228         if (rc)
229                 GOTO(err_thread, rc);
230
231         /* Setup proc */
232         lprocfs_mgs_init_vars(&lvars);
233         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
234                 lproc_mgs_setup(obd);
235         }
236
237         ping_evictor_start();
238
239         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
240
241         RETURN(0);
242
243 err_thread:
244         ptlrpc_unregister_service(mgs->mgs_service);
245 err_llog:
246         obd_llog_finish(obd, 0);
247 err_fs:
248         /* No extra cleanup needed for llog_init_commit_thread() */
249         mgs_fs_cleanup(obd);
250 err_ns:
251         ldlm_namespace_free(obd->obd_namespace, 0);
252         obd->obd_namespace = NULL;
253 err_ops:
254         fsfilt_put_ops(obd->obd_fsops);
255 err_put:
256         server_put_mount(obd->obd_name, mnt);
257         mgs->mgs_sb = 0;
258         return rc;
259 }
260
261 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
262 {
263         int rc = 0;
264         ENTRY;
265
266         switch (stage) {
267         case OBD_CLEANUP_EARLY:
268         case OBD_CLEANUP_EXPORTS:
269                 break;
270         case OBD_CLEANUP_SELF_EXP:
271                 rc = obd_llog_finish(obd, 0);
272                 break;
273         case OBD_CLEANUP_OBD:
274                 break;
275         }
276         RETURN(rc);
277 }
278
279 static int mgs_ldlm_nsfree(void *data)
280 {
281         struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
282         int rc;
283         ENTRY;
284
285         ptlrpc_daemonize("ll_mgs_nsfree");
286         rc = ldlm_namespace_free(ns, 1 /* obd_force should always be on */);
287         RETURN(rc);
288 }
289
290 static int mgs_cleanup(struct obd_device *obd)
291 {
292         struct mgs_obd *mgs = &obd->u.mgs;
293         ENTRY;
294
295         if (mgs->mgs_sb == NULL)
296                 RETURN(0);
297
298         ping_evictor_stop();
299
300         ptlrpc_unregister_service(mgs->mgs_service);
301
302         mgs_cleanup_fsdb_list(obd);
303         lproc_mgs_cleanup(obd);
304         mgs_fs_cleanup(obd);
305
306         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
307         mgs->mgs_sb = NULL;
308
309         /* Free the namespace in it's own thread, so that if the
310            ldlm_cancel_handler put the last mgs obd ref, we won't
311            deadlock here. */
312         cfs_kernel_thread(mgs_ldlm_nsfree, obd->obd_namespace,
313                           CLONE_VM | CLONE_FILES);
314
315
316         fsfilt_put_ops(obd->obd_fsops);
317
318         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
319         RETURN(0);
320 }
321
322 /* similar to filter_prepare_destroy */
323 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
324                             struct lustre_handle *lockh)
325 {
326         struct ldlm_res_id res_id;
327         int rc, flags = 0;
328         ENTRY;
329
330         rc = mgc_fsname2resid(fsname, &res_id);
331         if (!rc)
332                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
333                                             LDLM_PLAIN, NULL, LCK_EX,
334                                             &flags, ldlm_blocking_ast,
335                                             ldlm_completion_ast, NULL,
336                                             fsname, 0, NULL, lockh);
337         if (rc)
338                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
339
340         RETURN(rc);
341 }
342
343 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
344 {
345         ENTRY;
346         ldlm_lock_decref(lockh, LCK_EX);
347         RETURN(0);
348 }
349
350 /* rc=0 means ok
351       1 means update
352      <0 means error */
353 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
354 {
355         int rc;
356         ENTRY;
357
358         rc = mgs_check_index(obd, mti);
359         if (rc == 0) {
360                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
361                                    "this MGS does not know about it. Assuming"
362                                    " writeconf.\n", mti->mti_svname);
363                 mti->mti_flags |= LDD_F_WRITECONF;
364                 rc = 1;
365         } else if (rc == -1) {
366                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
367                                    "disappeared! Regenerating all logs.\n",
368                                    mti->mti_fsname);
369                 mti->mti_flags |= LDD_F_WRITECONF;
370                 rc = 1;
371         } else {
372                 /* Index is correctly marked as used */
373
374                 /* If the logs don't contain the mti_nids then add
375                    them as failover nids */
376                 rc = mgs_check_failnid(obd, mti);
377         }
378
379         RETURN(rc);
380 }
381
382 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
383 static int mgs_handle_target_reg(struct ptlrpc_request *req)
384 {
385         struct obd_device *obd = req->rq_export->exp_obd;
386         struct lustre_handle lockh;
387         struct mgs_target_info *mti, *rep_mti;
388         int rc = 0, lockrc;
389         ENTRY;
390
391         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
392
393         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
394         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
395                                 LDD_F_UPDATE))) {
396                 /* We're just here as a startup ping. */
397                 CDEBUG(D_MGS, "Server %s is running on %s\n",
398                        mti->mti_svname, obd_export_nid2str(req->rq_export));
399                 rc = mgs_check_target(obd, mti);
400                 /* above will set appropriate mti flags */
401                 if (rc <= 0)
402                         /* Nothing wrong, or fatal error */
403                         GOTO(out_nolock, rc);
404         }
405
406         /* Revoke the config lock to make sure nobody is reading. */
407         /* Although actually I think it should be alright if
408            someone was reading while we were updating the logs - if we
409            revoke at the end they will just update from where they left off. */
410         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
411         if (lockrc != ELDLM_OK) {
412                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
413                                    "update their configuration (%d). Updating "
414                                    "local logs anyhow; you might have to "
415                                    "manually restart other nodes to get the "
416                                    "latest configuration.\n",
417                                    obd->obd_name, lockrc);
418         }
419
420         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_TARGET_REG, 10);
421
422         /* Log writing contention is handled by the fsdb_sem */
423
424         if (mti->mti_flags & LDD_F_WRITECONF) {
425                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
426                     mti->mti_stripe_index == 0) {
427                         rc = mgs_erase_logs(obd, mti->mti_fsname);
428                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
429                                       "request.  All servers must be restarted "
430                                       "in order to regenerate the logs."
431                                       "\n", obd->obd_name, mti->mti_fsname);
432                 } else if (mti->mti_flags &
433                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
434                         rc = mgs_erase_log(obd, mti->mti_svname);
435                         LCONSOLE_WARN("%s: Regenerating %s log by user "
436                                       "request.\n",
437                                       obd->obd_name, mti->mti_svname);
438                 }
439                 mti->mti_flags |= LDD_F_UPDATE;
440                 /* Erased logs means start from scratch. */
441                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
442         }
443
444         /* COMPAT_146 */
445         if (mti->mti_flags & LDD_F_UPGRADE14) {
446                 rc = mgs_upgrade_sv_14(obd, mti);
447                 if (rc) {
448                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
449                         GOTO(out, rc);
450                 }
451                 
452                 /* We're good to go */
453                 mti->mti_flags |= LDD_F_UPDATE;
454         }
455         /* end COMPAT_146 */
456
457         if (mti->mti_flags & LDD_F_UPDATE) {
458                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
459                        mti->mti_stripe_index);
460
461                 /* create or update the target log
462                    and update the client/mdt logs */
463                 rc = mgs_write_log_target(obd, mti);
464                 if (rc) {
465                         CERROR("Failed to write %s log (%d)\n",
466                                mti->mti_svname, rc);
467                         GOTO(out, rc);
468                 }
469
470                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
471                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
472                                     LDD_F_UPGRADE14);
473                 mti->mti_flags |= LDD_F_REWRITE_LDD;
474         }
475
476 out:
477         /* done with log update */
478         if (lockrc == ELDLM_OK)
479                 mgs_put_cfg_lock(&lockh);
480 out_nolock:
481         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
482                mti->mti_stripe_index, rc);
483         rc = req_capsule_server_pack(&req->rq_pill);
484         if (rc)
485                 RETURN(rc);
486
487         /* send back the whole mti in the reply */
488         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
489         *rep_mti = *mti;
490
491         /* Flush logs to disk */
492         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
493         RETURN(rc);
494 }
495
496 static int mgs_set_info_rpc(struct ptlrpc_request *req)
497 {
498         struct obd_device *obd = req->rq_export->exp_obd;
499         struct mgs_send_param *msp, *rep_msp;
500         struct lustre_handle lockh;
501         int lockrc, rc;
502         struct lustre_cfg_bufs bufs;
503         struct lustre_cfg *lcfg;
504         char fsname[MTI_NAME_MAXLEN];
505         ENTRY;
506
507         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
508         LASSERT(msp);
509
510         /* Construct lustre_cfg structure to pass to function mgs_setparam */
511         lustre_cfg_bufs_reset(&bufs, NULL);
512         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
513         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
514         rc = mgs_setparam(obd, lcfg, fsname);
515         if (rc) {
516                 CERROR("Error %d in setting the parameter %s for fs %s\n",
517                        rc, msp->mgs_param, fsname);
518                 RETURN(rc);
519         }
520
521         /* Revoke lock so everyone updates.  Should be alright if
522          * someone was already reading while we were updating the logs,
523          * so we don't really need to hold the lock while we're
524          * writing.
525          */
526         if (fsname[0]) {
527                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
528                 if (lockrc != ELDLM_OK)
529                         CERROR("lock error %d for fs %s\n", lockrc,
530                                fsname);
531                 else
532                         mgs_put_cfg_lock(&lockh);
533         }
534         lustre_cfg_free(lcfg);
535
536         rc = req_capsule_server_pack(&req->rq_pill);
537         if (rc == 0) {
538                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
539                 rep_msp = msp;
540         }
541         RETURN(rc);
542 }
543
544 /* Called whenever a target cleans up. */
545 /* XXX - Currently unused */
546 static int mgs_handle_target_del(struct ptlrpc_request *req)
547 {
548         ENTRY;
549         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
550         RETURN(0);
551 }
552
553 /* XXX - Currently unused */
554 static int mgs_handle_exception(struct ptlrpc_request *req)
555 {
556         ENTRY;
557         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
558         RETURN(0);
559 }
560
561 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
562 int mgs_handle(struct ptlrpc_request *req)
563 {
564         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
565         int opc, rc = 0;
566         ENTRY;
567
568         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
569         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_REQUEST_NET, 2);
570
571         LASSERT(current->journal_info == NULL);
572         opc = lustre_msg_get_opc(req->rq_reqmsg);
573         if (opc != MGS_CONNECT) {
574                 if (req->rq_export == NULL) {
575                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
576                                opc);
577                         req->rq_status = -ENOTCONN;
578                         GOTO(out, rc = -ENOTCONN);
579                 }
580         }
581
582         switch (opc) {
583         case MGS_CONNECT:
584                 DEBUG_REQ(D_MGS, req, "connect");
585                 /* MGS and MDS have same request format for connect */
586                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
587                 rc = target_handle_connect(req);
588                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
589                         /* Make clients trying to reconnect after a MGS restart
590                            happy; also requires obd_replayable */
591                         lustre_msg_add_op_flags(req->rq_repmsg,
592                                                 MSG_CONNECT_RECONNECT);
593                 break;
594         case MGS_DISCONNECT:
595                 DEBUG_REQ(D_MGS, req, "disconnect");
596                 /* MGS and MDS have same request format for disconnect */
597                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
598                 rc = target_handle_disconnect(req);
599                 req->rq_status = rc;            /* superfluous? */
600                 break;
601         case MGS_EXCEPTION:
602                 DEBUG_REQ(D_MGS, req, "exception");
603                 rc = mgs_handle_exception(req);
604                 break;
605         case MGS_TARGET_REG:
606                 DEBUG_REQ(D_MGS, req, "target add");
607                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
608                 rc = mgs_handle_target_reg(req);
609                 break;
610         case MGS_TARGET_DEL:
611                 DEBUG_REQ(D_MGS, req, "target del");
612                 rc = mgs_handle_target_del(req);
613                 break;
614         case MGS_SET_INFO:
615                 DEBUG_REQ(D_MGS, req, "set_info");
616                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
617                 rc = mgs_set_info_rpc(req);
618                 break;
619
620         case LDLM_ENQUEUE:
621                 DEBUG_REQ(D_MGS, req, "enqueue");
622                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
623                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
624                                          ldlm_server_blocking_ast, NULL);
625                 break;
626         case LDLM_BL_CALLBACK:
627         case LDLM_CP_CALLBACK:
628                 DEBUG_REQ(D_MGS, req, "callback");
629                 CERROR("callbacks should not happen on MGS\n");
630                 LBUG();
631                 break;
632
633         case OBD_PING:
634                 DEBUG_REQ(D_INFO, req, "ping");
635                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
636                 rc = target_handle_ping(req);
637                 break;
638         case OBD_LOG_CANCEL:
639                 DEBUG_REQ(D_MGS, req, "log cancel");
640                 rc = -ENOTSUPP; /* la la la */
641                 break;
642
643         case LLOG_ORIGIN_HANDLE_CREATE:
644                 DEBUG_REQ(D_MGS, req, "llog_init");
645                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
646                 rc = llog_origin_handle_create(req);
647                 break;
648         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
649                 DEBUG_REQ(D_MGS, req, "llog next block");
650                 req_capsule_set(&req->rq_pill,
651                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
652                 rc = llog_origin_handle_next_block(req);
653                 break;
654         case LLOG_ORIGIN_HANDLE_READ_HEADER:
655                 DEBUG_REQ(D_MGS, req, "llog read header");
656                 req_capsule_set(&req->rq_pill,
657                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
658                 rc = llog_origin_handle_read_header(req);
659                 break;
660         case LLOG_ORIGIN_HANDLE_CLOSE:
661                 DEBUG_REQ(D_MGS, req, "llog close");
662                 rc = llog_origin_handle_close(req);
663                 break;
664         case LLOG_CATINFO:
665                 DEBUG_REQ(D_MGS, req, "llog catinfo");
666                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
667                 rc = llog_catinfo(req);
668                 break;
669         default:
670                 req->rq_status = -ENOTSUPP;
671                 rc = ptlrpc_error(req);
672                 RETURN(rc);
673         }
674
675         LASSERT(current->journal_info == NULL);
676
677         if (rc)
678                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
679
680 out:
681         target_send_reply(req, rc, fail);
682         RETURN(0);
683 }
684
685 static inline int mgs_destroy_export(struct obd_export *exp)
686 {
687         ENTRY;
688
689         target_destroy_export(exp);
690
691         RETURN(0);
692 }
693
694 /* from mdt_iocontrol */
695 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
696                   void *karg, void *uarg)
697 {
698         struct obd_device *obd = exp->exp_obd;
699         struct obd_ioctl_data *data = karg;
700         struct lvfs_run_ctxt saved;
701         int rc = 0;
702
703         ENTRY;
704         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
705
706         switch (cmd) {
707
708         case OBD_IOC_PARAM: {
709                 struct lustre_handle lockh;
710                 struct lustre_cfg *lcfg;
711                 struct llog_rec_hdr rec;
712                 char fsname[MTI_NAME_MAXLEN];
713                 int lockrc;
714
715                 rec.lrh_len = llog_data_len(data->ioc_plen1);
716
717                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
718                         rec.lrh_type = OBD_CFG_REC;
719                 } else {
720                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
721                         RETURN(-EINVAL);
722                 }
723
724                 OBD_ALLOC(lcfg, data->ioc_plen1);
725                 if (lcfg == NULL)
726                         RETURN(-ENOMEM);
727                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
728                 if (rc)
729                         GOTO(out_free, rc);
730
731                 if (lcfg->lcfg_bufcount < 1)
732                         GOTO(out_free, rc = -EINVAL);
733
734                 rc = mgs_setparam(obd, lcfg, fsname);
735                 if (rc) {
736                         CERROR("setparam err %d\n", rc);
737                         GOTO(out_free, rc);
738                 }
739
740                 /* Revoke lock so everyone updates.  Should be alright if
741                    someone was already reading while we were updating the logs,
742                    so we don't really need to hold the lock while we're
743                    writing (above). */
744                 if (fsname[0]) {
745                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
746                         if (lockrc != ELDLM_OK)
747                                 CERROR("lock error %d for fs %s\n", lockrc,
748                                        fsname);
749                         else
750                                 mgs_put_cfg_lock(&lockh);
751                 }
752
753 out_free:
754                 OBD_FREE(lcfg, data->ioc_plen1);
755                 RETURN(rc);
756         }
757
758         case OBD_IOC_DUMP_LOG: {
759                 struct llog_ctxt *ctxt;
760                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
761                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
762                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
763                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
764                 llog_ctxt_put(ctxt);
765
766                 RETURN(rc);
767         }
768
769         case OBD_IOC_LLOG_CHECK:
770         case OBD_IOC_LLOG_INFO:
771         case OBD_IOC_LLOG_PRINT: {
772                 struct llog_ctxt *ctxt;
773                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
774
775                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
776                 rc = llog_ioctl(ctxt, cmd, data);
777                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
778                 llog_ctxt_put(ctxt);
779
780                 RETURN(rc);
781         }
782
783         default:
784                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
785                 RETURN(-EINVAL);
786         }
787         RETURN(0);
788 }
789
790 /* use obd ops to offer management infrastructure */
791 static struct obd_ops mgs_obd_ops = {
792         .o_owner           = THIS_MODULE,
793         .o_connect         = mgs_connect,
794         .o_disconnect      = mgs_disconnect,
795         .o_setup           = mgs_setup,
796         .o_precleanup      = mgs_precleanup,
797         .o_cleanup         = mgs_cleanup,
798         .o_destroy_export  = mgs_destroy_export,
799         .o_iocontrol       = mgs_iocontrol,
800         .o_llog_init       = mgs_llog_init,
801         .o_llog_finish     = mgs_llog_finish
802 };
803
804 static int __init mgs_init(void)
805 {
806         struct lprocfs_static_vars lvars;
807
808         lprocfs_mgs_init_vars(&lvars);
809         class_register_type(&mgs_obd_ops, NULL,
810                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
811
812         return 0;
813 }
814
815 static void /*__exit*/ mgs_exit(void)
816 {
817         class_unregister_type(LUSTRE_MGS_NAME);
818 }
819
820 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
821 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
822 MODULE_LICENSE("GPL");
823
824 module_init(mgs_init);
825 module_exit(mgs_exit);