Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MGS
30 #define D_MGS D_CONFIG/*|D_WARNING*/
31
32 #ifdef __KERNEL__
33 # include <linux/module.h>
34 # include <linux/pagemap.h>
35 # include <linux/miscdevice.h>
36 # include <linux/init.h>
37 #else
38 # include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lprocfs_status.h>
44 #include <lustre_fsfilt.h>
45 #include <lustre_commit_confd.h>
46 #include <lustre_disk.h>
47 #include "mgs_internal.h"
48
49
50 /* Establish a connection to the MGS.*/
51 static int mgs_connect(struct lustre_handle *conn, struct obd_device *obd,
52                        struct obd_uuid *cluuid, struct obd_connect_data *data,
53                        void *localdata)
54 {
55         struct obd_export *exp;
56         int rc;
57         ENTRY;
58
59         if (!conn || !obd || !cluuid)
60                 RETURN(-EINVAL);
61
62         rc = class_connect(conn, obd, cluuid);
63         if (rc)
64                 RETURN(rc);
65         exp = class_conn2export(conn);
66         LASSERT(exp);
67
68         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
69
70         if (data != NULL) {
71                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
72                 exp->exp_connect_flags = data->ocd_connect_flags;
73                 data->ocd_version = LUSTRE_VERSION_CODE;
74         }
75
76         rc = mgs_client_add(obd, exp);
77
78         if (rc) {
79                 class_disconnect(exp);
80         } else {
81                 class_export_put(exp);
82         }
83
84         RETURN(rc);
85 }
86
87 static int mgs_disconnect(struct obd_export *exp)
88 {
89         int rc;
90         ENTRY;
91
92         LASSERT(exp);
93
94         class_export_get(exp);
95         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
96
97         /* Disconnect early so that clients can't keep using export */
98         rc = class_disconnect(exp);
99         ldlm_cancel_locks_for_export(exp);
100
101         /* complete all outstanding replies */
102         spin_lock(&exp->exp_lock);
103         while (!list_empty(&exp->exp_outstanding_replies)) {
104                 struct ptlrpc_reply_state *rs =
105                         list_entry(exp->exp_outstanding_replies.next,
106                                    struct ptlrpc_reply_state, rs_exp_list);
107                 struct ptlrpc_service *svc = rs->rs_service;
108
109                 spin_lock(&svc->srv_lock);
110                 list_del_init(&rs->rs_exp_list);
111                 ptlrpc_schedule_difficult_reply(rs);
112                 spin_unlock(&svc->srv_lock);
113         }
114         spin_unlock(&exp->exp_lock);
115
116         class_export_put(exp);
117         RETURN(rc);
118 }
119
120 static int mgs_cleanup(struct obd_device *obd);
121 static int mgs_handle(struct ptlrpc_request *req);
122
123 /* Start the MGS obd */
124 static int mgs_setup(struct obd_device *obd, obd_count len, void *buf)
125 {
126         struct lprocfs_static_vars lvars;
127         struct mgs_obd *mgs = &obd->u.mgs;
128         struct lustre_mount_info *lmi;
129         struct lustre_sb_info *lsi;
130         struct vfsmount *mnt;
131         int rc = 0;
132         ENTRY;
133
134         CDEBUG(D_CONFIG, "Starting MGS\n");
135
136         /* Find our disk */
137         lmi = server_get_mount(obd->obd_name);
138         if (!lmi) 
139                 RETURN(rc = -EINVAL);
140
141         mnt = lmi->lmi_mnt;
142         lsi = s2lsi(lmi->lmi_sb);
143         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
144         if (IS_ERR(obd->obd_fsops))
145                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
146
147         /* namespace for mgs llog */
148         obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER, 
149                                                 LDLM_NAMESPACE_MODEST);
150         if (obd->obd_namespace == NULL)
151                 GOTO(err_ops, rc = -ENOMEM);
152
153         /* ldlm setup */
154         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
155                            "mgs_ldlm_client", &obd->obd_ldlm_client);
156
157         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
158
159         rc = mgs_fs_setup(obd, mnt);
160         if (rc) {
161                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
162                        obd->obd_name, rc);
163                 GOTO(err_ns, rc);
164         }
165
166         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
167                         &llog_lvfs_ops);
168         if (rc)
169                 GOTO(err_fs, rc);
170
171         /* No recovery for MGC's */
172         obd->obd_replayable = 0;
173
174         /* Internal mgs setup */
175         mgs_init_fsdb_list(obd);
176         sema_init(&mgs->mgs_sem, 1);
177
178         /* Start the service threads */
179         mgs->mgs_service =
180                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
181                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
182                                 MGC_REPLY_PORTAL, 2000,
183                                 mgs_handle, LUSTRE_MGS_NAME,
184                                 obd->obd_proc_entry, NULL,
185                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
186                                 "ll_mgs");
187
188         if (!mgs->mgs_service) {
189                 CERROR("failed to start service\n");
190                 GOTO(err_fs, rc = -ENOMEM);
191         }
192
193         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
194         if (rc)
195                 GOTO(err_thread, rc);
196
197         /* Setup proc */
198         lprocfs_mgs_init_vars(&lvars);
199         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
200                 lproc_mgs_setup(obd);
201         }
202
203         ping_evictor_start();
204
205         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
206
207         RETURN(0);
208
209 err_thread:
210         ptlrpc_unregister_service(mgs->mgs_service);
211 err_fs:
212         /* No extra cleanup needed for llog_init_commit_thread() */
213         mgs_fs_cleanup(obd);
214 err_ns:
215         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
216         obd->obd_namespace = NULL;
217 err_ops:
218         fsfilt_put_ops(obd->obd_fsops);
219 err_put:
220         server_put_mount(obd->obd_name, mnt);
221         mgs->mgs_sb = 0;
222         return rc;
223 }
224
225 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
226 {
227         int rc = 0;
228         ENTRY;
229
230         switch (stage) {
231         case OBD_CLEANUP_EARLY:
232         case OBD_CLEANUP_EXPORTS:
233                 break;
234         case OBD_CLEANUP_SELF_EXP:
235                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
236                 rc = obd_llog_finish(obd, 0);
237                 break;
238         case OBD_CLEANUP_OBD:
239                 break;
240         }
241         RETURN(rc);
242 }
243
244 static int mgs_ldlm_nsfree(void *data)
245 {
246         struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
247         ENTRY;
248
249         ptlrpc_daemonize("ll_mgs_nsfree");
250         ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */);
251         RETURN(0);
252 }
253
254 static int mgs_cleanup(struct obd_device *obd)
255 {
256         struct mgs_obd *mgs = &obd->u.mgs;
257         ENTRY;
258
259         if (mgs->mgs_sb == NULL)
260                 RETURN(0);
261         
262         ping_evictor_stop();
263
264         ptlrpc_unregister_service(mgs->mgs_service);
265
266         mgs_cleanup_fsdb_list(obd);
267         lproc_mgs_cleanup(obd);
268         mgs_fs_cleanup(obd);
269
270         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
271         mgs->mgs_sb = NULL;
272
273         /* Free the namespace in it's own thread, so that if the 
274            ldlm_cancel_handler put the last mgs obd ref, we won't 
275            deadlock here. */
276         cfs_kernel_thread(mgs_ldlm_nsfree, obd->obd_namespace, 
277                           CLONE_VM | CLONE_FILES);
278
279         fsfilt_put_ops(obd->obd_fsops);
280
281         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
282         RETURN(0);
283 }
284
285 /* similar to filter_prepare_destroy */
286 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
287                             struct lustre_handle *lockh)
288 {
289         struct ldlm_res_id res_id;
290         int rc, flags = 0;
291         ENTRY;
292
293         rc = mgc_fsname2resid(fsname, &res_id);
294         if (!rc) 
295                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
296                                             LDLM_PLAIN, NULL, LCK_EX,
297                                             &flags, ldlm_blocking_ast,
298                                             ldlm_completion_ast, NULL,
299                                             fsname, 0, NULL, lockh);
300         if (rc) 
301                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
302         
303         RETURN(rc);
304 }
305
306 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
307 {
308         ENTRY;
309         ldlm_lock_decref(lockh, LCK_EX);
310         RETURN(0);
311 }
312
313 /* rc=0 means ok
314       1 means update
315      <0 means error */
316 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
317 {
318         int rc;
319         ENTRY;
320
321         rc = mgs_check_index(obd, mti);
322         if (rc == 0) {
323                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
324                                   "this MGS does not know about it.  Assuming "
325                                   "writeconf.\n", mti->mti_svname);
326                 mti->mti_flags |= LDD_F_WRITECONF;
327                 rc = 1;
328         } else if (rc == -1) {
329                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
330                                    "disappeared! Regenerating all logs.\n",
331                                    mti->mti_fsname);
332                 mti->mti_flags |= LDD_F_WRITECONF;
333                 rc = 1;
334         } else {
335                 /* Index is correctly marked as used */
336
337                 /* If the logs don't contain the mti_nids then add 
338                    them as failover nids */
339                 rc = mgs_check_failnid(obd, mti);
340         }
341
342         RETURN(rc);
343 }
344
345 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
346 static int mgs_handle_target_reg(struct ptlrpc_request *req)
347 {    
348         struct obd_device *obd = req->rq_export->exp_obd;
349         struct lustre_handle lockh;
350         struct mgs_target_info *mti, *rep_mti;
351         int rep_size[] = { sizeof(struct ptlrpc_body), sizeof(*mti) };
352         int rc = 0, lockrc;
353         ENTRY;
354
355         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
356
357         mti = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*mti),
358                                  lustre_swab_mgs_target_info);
359         
360         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
361                                 LDD_F_UPDATE))) {
362                 /* We're just here as a startup ping. */
363                 CDEBUG(D_MGS, "Server %s is running on %s\n",
364                        mti->mti_svname, obd_export_nid2str(req->rq_export));
365                 rc = mgs_check_target(obd, mti);
366                 /* above will set appropriate mti flags */
367                 if (rc <= 0) 
368                         /* Nothing wrong, or fatal error */
369                         GOTO(out_nolock, rc);
370         }
371
372         /* Revoke the config lock to make sure nobody is reading. */
373         /* Although actually I think it should be alright if
374            someone was reading while we were updating the logs - if we 
375            revoke at the end they will just update from where they left off. */
376         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
377         if (lockrc != ELDLM_OK) {
378                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
379                                    "update their configuration (%d). Updating "
380                                    "local logs anyhow; you might have to "
381                                    "manually restart other nodes to get the "
382                                    "latest configuration.\n",
383                                    obd->obd_name, lockrc);
384         }
385
386         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
387
388         /* Log writing contention is handled by the fsdb_sem */
389
390         if (mti->mti_flags & LDD_F_WRITECONF) {
391                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
392                         rc = mgs_erase_logs(obd, mti->mti_fsname);
393                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
394                                       "request.  All servers must be restarted "
395                                       "in order to regenerate the logs."
396                                       "\n", obd->obd_name, mti->mti_fsname);
397                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
398                         rc = mgs_erase_log(obd, mti->mti_svname);
399                         LCONSOLE_WARN("%s: Regenerating %s log by user "
400                                       "request.\n",
401                                       obd->obd_name, mti->mti_svname);
402                 }
403                 mti->mti_flags |= LDD_F_UPDATE;
404                 /* Erased logs means start from scratch. */
405                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
406         }
407
408         /* COMPAT_146 */
409         if (mti->mti_flags & LDD_F_UPGRADE14) {
410                 rc = mgs_upgrade_sv_14(obd, mti);
411                 if (rc) {
412                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
413                         GOTO(out, rc);
414                 }
415                 
416                 /* We're good to go */
417                 mti->mti_flags |= LDD_F_UPDATE;
418         }
419         /* end COMPAT_146 */
420
421         if (mti->mti_flags & LDD_F_UPDATE) {
422                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname, 
423                        mti->mti_stripe_index);
424                 
425                 /* create or update the target log 
426                    and update the client/mdt logs */
427                 rc = mgs_write_log_target(obd, mti);
428                 if (rc) {
429                         CERROR("Failed to write %s log (%d)\n", 
430                                mti->mti_svname, rc);
431                         GOTO(out, rc);
432                 }
433
434                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE | 
435                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
436                                     LDD_F_UPGRADE14);
437                 mti->mti_flags |= LDD_F_REWRITE_LDD;
438         }
439
440 out:
441         /* done with log update */
442         if (lockrc == ELDLM_OK)
443                 mgs_put_cfg_lock(&lockh);
444 out_nolock:
445         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname, 
446                mti->mti_stripe_index, rc);
447         lustre_pack_reply(req, 2, rep_size, NULL); 
448         /* send back the whole mti in the reply */
449         rep_mti = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
450                                  sizeof(*rep_mti));
451         memcpy(rep_mti, mti, sizeof(*rep_mti));
452
453         /* Flush logs to disk */
454         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
455         RETURN(rc);
456 }
457
458 static int mgs_set_info_rpc(struct ptlrpc_request *req)
459 {
460         struct obd_device *obd = req->rq_export->exp_obd;
461         struct mgs_send_param *msp, *rep_msp;
462         struct lustre_handle lockh;
463         int rep_size[] = { sizeof(struct ptlrpc_body), sizeof(*msp) };
464         int lockrc, rc;
465         struct lustre_cfg_bufs bufs;
466         struct lustre_cfg *lcfg;
467         char fsname[MTI_NAME_MAXLEN];
468         ENTRY;
469
470         msp = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*msp), NULL);
471
472         /* Construct lustre_cfg structure to pass to function mgs_setparam */
473         lustre_cfg_bufs_reset(&bufs, NULL);
474         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
475         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
476         rc = mgs_setparam(obd, lcfg, fsname);
477         if (rc) {
478                 CERROR("Error %d in setting the parameter %s for fs %s\n",
479                        rc, msp->mgs_param, fsname);
480                 RETURN(rc);
481         }
482
483         /* Revoke lock so everyone updates.  Should be alright if
484          * someone was already reading while we were updating the logs,
485          * so we don't really need to hold the lock while we're
486          * writing.
487          */
488         if (fsname[0]) {
489                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
490                 if (lockrc != ELDLM_OK)
491                         CERROR("lock error %d for fs %s\n", lockrc,
492                                fsname);
493                 else
494                         mgs_put_cfg_lock(&lockh);
495         }
496         lustre_cfg_free(lcfg);
497
498         lustre_pack_reply(req, 2, rep_size, NULL);
499         rep_msp = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
500                                  sizeof(*rep_msp));
501         memcpy(rep_msp, msp, sizeof(*rep_msp));
502
503         RETURN(rc);
504 }
505
506 /* Called whenever a target cleans up. */
507 /* XXX - Currently unused */
508 static int mgs_handle_target_del(struct ptlrpc_request *req)
509 {
510         ENTRY;
511         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
512         RETURN(0);
513 }
514
515 /* XXX - Currently unused */
516 static int mgs_handle_exception(struct ptlrpc_request *req)
517 {
518         ENTRY;
519         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
520         RETURN(0);
521 }
522
523 int mgs_handle(struct ptlrpc_request *req)
524 {
525         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
526         int opc, rc = 0;
527         ENTRY;
528
529         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
530         OBD_FAIL_RETURN(OBD_FAIL_MGS_ALL_REQUEST_NET, 0);
531
532         LASSERT(current->journal_info == NULL);
533         opc = lustre_msg_get_opc(req->rq_reqmsg);
534         if (opc != MGS_CONNECT) {
535                 if (req->rq_export == NULL) {
536                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
537                                opc);
538                         req->rq_status = -ENOTCONN;
539                         GOTO(out, rc = -ENOTCONN);
540                 }
541         }
542
543         switch (opc) {
544         case MGS_CONNECT:
545                 DEBUG_REQ(D_MGS, req, "connect");
546                 rc = target_handle_connect(req, mgs_handle);
547                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
548                         /* Make clients trying to reconnect after a MGS restart
549                            happy; also requires obd_replayable */
550                         lustre_msg_add_op_flags(req->rq_repmsg,
551                                                 MSG_CONNECT_RECONNECT);
552                 break;
553         case MGS_DISCONNECT:
554                 DEBUG_REQ(D_MGS, req, "disconnect");
555                 rc = target_handle_disconnect(req);
556                 req->rq_status = rc;            /* superfluous? */
557                 break;
558         case MGS_EXCEPTION:
559                 DEBUG_REQ(D_MGS, req, "exception");
560                 rc = mgs_handle_exception(req);
561                 break;
562         case MGS_TARGET_REG:
563                 DEBUG_REQ(D_MGS, req, "target add");
564                 rc = mgs_handle_target_reg(req);
565                 break;
566         case MGS_TARGET_DEL:
567                 DEBUG_REQ(D_MGS, req, "target del");
568                 rc = mgs_handle_target_del(req);
569                 break;
570         case MGS_SET_INFO:
571                 rc = mgs_set_info_rpc(req);
572                 break;
573
574         case LDLM_ENQUEUE:
575                 DEBUG_REQ(D_MGS, req, "enqueue");
576                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
577                                          ldlm_server_blocking_ast, NULL);
578                 break;
579         case LDLM_BL_CALLBACK:
580         case LDLM_CP_CALLBACK:
581                 DEBUG_REQ(D_MGS, req, "callback");
582                 CERROR("callbacks should not happen on MGS\n");
583                 LBUG();
584                 break;
585
586         case OBD_PING:
587                 DEBUG_REQ(D_INFO, req, "ping");
588                 rc = target_handle_ping(req);
589                 break;
590         case OBD_LOG_CANCEL:
591                 DEBUG_REQ(D_MGS, req, "log cancel");
592                 rc = -ENOTSUPP; /* la la la */
593                 break;
594
595         case LLOG_ORIGIN_HANDLE_CREATE:
596                 DEBUG_REQ(D_MGS, req, "llog_init");
597                 rc = llog_origin_handle_create(req);
598                 break;
599         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
600                 DEBUG_REQ(D_MGS, req, "llog next block");
601                 rc = llog_origin_handle_next_block(req);
602                 break;
603         case LLOG_ORIGIN_HANDLE_READ_HEADER:
604                 DEBUG_REQ(D_MGS, req, "llog read header");
605                 rc = llog_origin_handle_read_header(req);
606                 break;
607         case LLOG_ORIGIN_HANDLE_CLOSE:
608                 DEBUG_REQ(D_MGS, req, "llog close");
609                 rc = llog_origin_handle_close(req);
610                 break;
611         case LLOG_CATINFO:
612                 DEBUG_REQ(D_MGS, req, "llog catinfo");
613                 rc = llog_catinfo(req);
614                 break;
615         default:
616                 req->rq_status = -ENOTSUPP;
617                 rc = ptlrpc_error(req);
618                 RETURN(rc);
619         }
620
621         LASSERT(current->journal_info == NULL);
622         
623         if (rc) 
624                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
625
626  out:
627         target_send_reply(req, rc, fail);
628         RETURN(0);
629 }
630
631 static inline int mgs_destroy_export(struct obd_export *exp)
632 {
633         ENTRY;
634
635         target_destroy_export(exp);
636         mgs_client_free(exp);
637
638         RETURN(0);
639 }
640
641 /* from mdt_iocontrol */
642 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
643                   void *karg, void *uarg)
644 {
645         struct obd_device *obd = exp->exp_obd;
646         struct obd_ioctl_data *data = karg;
647         struct lvfs_run_ctxt saved;
648         int rc = 0;
649
650         ENTRY;
651         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
652
653         switch (cmd) {
654
655         case OBD_IOC_PARAM: {
656                 struct lustre_handle lockh;
657                 struct lustre_cfg *lcfg;
658                 struct llog_rec_hdr rec;
659                 char fsname[MTI_NAME_MAXLEN];
660                 int lockrc;
661
662                 rec.lrh_len = llog_data_len(data->ioc_plen1);
663
664                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
665                         rec.lrh_type = OBD_CFG_REC;
666                 } else {
667                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
668                         RETURN(-EINVAL);
669                 }
670
671                 OBD_ALLOC(lcfg, data->ioc_plen1);
672                 if (lcfg == NULL)
673                         RETURN(-ENOMEM);
674                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
675                 if (rc) 
676                         GOTO(out_free, rc);
677
678                 if (lcfg->lcfg_bufcount < 1)
679                         GOTO(out_free, rc = -EINVAL);
680
681                 rc = mgs_setparam(obd, lcfg, fsname);
682                 if (rc) {
683                         CERROR("setparam err %d\n", rc);
684                         GOTO(out_free, rc);
685                 }
686
687                 /* Revoke lock so everyone updates.  Should be alright if
688                    someone was already reading while we were updating the logs,
689                    so we don't really need to hold the lock while we're
690                    writing (above). */
691                 if (fsname[0]) {
692                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
693                         if (lockrc != ELDLM_OK) 
694                                 CERROR("lock error %d for fs %s\n", lockrc, 
695                                        fsname);
696                         else
697                                 mgs_put_cfg_lock(&lockh);
698                 }
699
700 out_free:
701                 OBD_FREE(lcfg, data->ioc_plen1);
702                 RETURN(rc);
703         }
704
705         case OBD_IOC_DUMP_LOG: {
706                 struct llog_ctxt *ctxt =
707                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
708                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
709                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
710                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
711                 llog_ctxt_put(ctxt);
712                 if (rc)
713                         RETURN(rc);
714
715                 RETURN(rc);
716         }
717
718         case OBD_IOC_LLOG_CHECK:
719         case OBD_IOC_LLOG_INFO:
720         case OBD_IOC_LLOG_PRINT: {
721                 struct llog_ctxt *ctxt =
722                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
723
724                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
725                 rc = llog_ioctl(ctxt, cmd, data);
726                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
727                 llog_ctxt_put(ctxt);
728
729                 RETURN(rc);
730         }
731
732         default:
733                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
734                 RETURN(-EINVAL);
735         }
736         RETURN(0);
737 }
738
739 /* use obd ops to offer management infrastructure */
740 static struct obd_ops mgs_obd_ops = {
741         .o_owner           = THIS_MODULE,
742         .o_connect         = mgs_connect,
743         .o_disconnect      = mgs_disconnect,
744         .o_setup           = mgs_setup,
745         .o_precleanup      = mgs_precleanup,
746         .o_cleanup         = mgs_cleanup,
747         .o_destroy_export  = mgs_destroy_export,
748         .o_iocontrol       = mgs_iocontrol,
749 };
750
751 static int __init mgs_init(void)
752 {
753         struct lprocfs_static_vars lvars;
754
755         lprocfs_mgs_init_vars(&lvars);
756         class_register_type(&mgs_obd_ops, lvars.module_vars, LUSTRE_MGS_NAME);
757
758         return 0;
759 }
760
761 static void /*__exit*/ mgs_exit(void)
762 {
763         class_unregister_type(LUSTRE_MGS_NAME);
764 }
765
766 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
767 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
768 MODULE_LICENSE("GPL");
769
770 module_init(mgs_init);
771 module_exit(mgs_exit);