Whamcloud - gitweb
4781703ed8d61b36914c1e19f1a75d4f291ce65f
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MGS
30 #define D_MGS D_CONFIG/*|D_WARNING*/
31
32 #ifdef __KERNEL__
33 # include <linux/module.h>
34 # include <linux/pagemap.h>
35 # include <linux/miscdevice.h>
36 # include <linux/init.h>
37 #else
38 # include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lprocfs_status.h>
44 #include <lustre_fsfilt.h>
45 #include <lustre_commit_confd.h>
46 #include <lustre_disk.h>
47 #include "mgs_internal.h"
48
49
50 /* Establish a connection to the MGS.*/
51 static int mgs_connect(const struct lu_env *env,
52                        struct lustre_handle *conn, struct obd_device *obd,
53                        struct obd_uuid *cluuid, struct obd_connect_data *data,
54                        void *localdata)
55 {
56         struct obd_export *exp;
57         int rc;
58         ENTRY;
59
60         if (!conn || !obd || !cluuid)
61                 RETURN(-EINVAL);
62
63         rc = class_connect(conn, obd, cluuid);
64         if (rc)
65                 RETURN(rc);
66         exp = class_conn2export(conn);
67         LASSERT(exp);
68
69         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
70
71         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
72
73         if (data != NULL) {
74                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
75                 exp->exp_connect_flags = data->ocd_connect_flags;
76                 data->ocd_version = LUSTRE_VERSION_CODE;
77         }
78
79         if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
80                 CWARN("MGS requires FID support, but client not\n");
81                 rc = -EBADE;
82         }
83
84         if (rc) {
85                 class_disconnect(exp);
86         } else {
87                 class_export_put(exp);
88         }
89
90         RETURN(rc);
91 }
92
93 static int mgs_disconnect(struct obd_export *exp)
94 {
95         int rc;
96         ENTRY;
97
98         LASSERT(exp);
99
100         class_export_get(exp);
101         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
102
103         /* Disconnect early so that clients can't keep using export */
104         rc = class_disconnect(exp);
105         ldlm_cancel_locks_for_export(exp);
106
107         /* complete all outstanding replies */
108         spin_lock(&exp->exp_lock);
109         while (!list_empty(&exp->exp_outstanding_replies)) {
110                 struct ptlrpc_reply_state *rs =
111                         list_entry(exp->exp_outstanding_replies.next,
112                                    struct ptlrpc_reply_state, rs_exp_list);
113                 struct ptlrpc_service *svc = rs->rs_service;
114
115                 spin_lock(&svc->srv_lock);
116                 list_del_init(&rs->rs_exp_list);
117                 ptlrpc_schedule_difficult_reply(rs);
118                 spin_unlock(&svc->srv_lock);
119         }
120         spin_unlock(&exp->exp_lock);
121
122         class_export_put(exp);
123         RETURN(rc);
124 }
125
126 static int mgs_cleanup(struct obd_device *obd);
127 static int mgs_handle(struct ptlrpc_request *req);
128
129 static int mgs_llog_init(struct obd_device *obd, int group,
130                          struct obd_device *tgt, int count,
131                          struct llog_catid *logid, struct obd_uuid *uuid)
132 {
133         struct obd_llog_group *olg = &obd->obd_olg;
134         int rc;
135         ENTRY;
136
137         LASSERT(group == OBD_LLOG_GROUP);
138         LASSERT(olg->olg_group == group);
139
140         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
141                         &llog_lvfs_ops);
142         RETURN(rc);
143 }
144
145 static int mgs_llog_finish(struct obd_device *obd, int count)
146 {
147         struct llog_ctxt *ctxt;
148         int rc = 0;
149         ENTRY;
150
151         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
152         if (ctxt)
153                 rc = llog_cleanup(ctxt);
154
155         RETURN(rc);
156 }
157
158 /* Start the MGS obd */
159 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
160 {
161         struct lprocfs_static_vars lvars;
162         struct mgs_obd *mgs = &obd->u.mgs;
163         struct lustre_mount_info *lmi;
164         struct lustre_sb_info *lsi;
165         struct vfsmount *mnt;
166         int rc = 0;
167         ENTRY;
168
169         CDEBUG(D_CONFIG, "Starting MGS\n");
170
171         /* Find our disk */
172         lmi = server_get_mount(obd->obd_name);
173         if (!lmi)
174                 RETURN(rc = -EINVAL);
175
176         mnt = lmi->lmi_mnt;
177         lsi = s2lsi(lmi->lmi_sb);
178         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
179         if (IS_ERR(obd->obd_fsops))
180                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
181
182         /* namespace for mgs llog */
183         obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER,
184                                                 LDLM_NAMESPACE_MODEST);
185         if (obd->obd_namespace == NULL)
186                 GOTO(err_ops, rc = -ENOMEM);
187
188         /* ldlm setup */
189         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
190                            "mgs_ldlm_client", &obd->obd_ldlm_client);
191
192         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
193
194         rc = mgs_fs_setup(obd, mnt);
195         if (rc) {
196                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
197                        obd->obd_name, rc);
198                 GOTO(err_ns, rc);
199         }
200
201         rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
202         if (rc)
203                 GOTO(err_fs, rc);
204
205         /* No recovery for MGC's */
206         obd->obd_replayable = 0;
207
208         /* Internal mgs setup */
209         mgs_init_fsdb_list(obd);
210         sema_init(&mgs->mgs_sem, 1);
211
212         /* Start the service threads */
213         mgs->mgs_service =
214                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
215                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
216                                 MGC_REPLY_PORTAL, MGS_SERVICE_WATCHDOG_TIMEOUT,
217                                 mgs_handle, LUSTRE_MGS_NAME,
218                                 obd->obd_proc_entry, NULL,
219                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
220                                 "ll_mgs", LCT_MD_THREAD);
221
222         if (!mgs->mgs_service) {
223                 CERROR("failed to start service\n");
224                 GOTO(err_llog, rc = -ENOMEM);
225         }
226
227         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
228         if (rc)
229                 GOTO(err_thread, rc);
230
231         /* Setup proc */
232         lprocfs_mgs_init_vars(&lvars);
233         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
234                 lproc_mgs_setup(obd);
235         }
236
237         ping_evictor_start();
238
239         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
240
241         RETURN(0);
242
243 err_thread:
244         ptlrpc_unregister_service(mgs->mgs_service);
245 err_llog:
246         obd_llog_finish(obd, 0);
247 err_fs:
248         /* No extra cleanup needed for llog_init_commit_thread() */
249         mgs_fs_cleanup(obd);
250 err_ns:
251         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
252         obd->obd_namespace = NULL;
253 err_ops:
254         fsfilt_put_ops(obd->obd_fsops);
255 err_put:
256         server_put_mount(obd->obd_name, mnt);
257         mgs->mgs_sb = 0;
258         return rc;
259 }
260
261 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
262 {
263         int rc = 0;
264         ENTRY;
265
266         switch (stage) {
267         case OBD_CLEANUP_EARLY:
268         case OBD_CLEANUP_EXPORTS:
269                 break;
270         case OBD_CLEANUP_SELF_EXP:
271                 rc = obd_llog_finish(obd, 0);
272                 break;
273         case OBD_CLEANUP_OBD:
274                 break;
275         }
276         RETURN(rc);
277 }
278
279 static int mgs_ldlm_nsfree(void *data)
280 {
281         struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
282         ENTRY;
283
284         ptlrpc_daemonize("ll_mgs_nsfree");
285         ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */);
286         RETURN(0);
287 }
288
289 static int mgs_cleanup(struct obd_device *obd)
290 {
291         struct mgs_obd *mgs = &obd->u.mgs;
292         ENTRY;
293
294         if (mgs->mgs_sb == NULL)
295                 RETURN(0);
296
297         ping_evictor_stop();
298
299         ptlrpc_unregister_service(mgs->mgs_service);
300
301         mgs_cleanup_fsdb_list(obd);
302         lproc_mgs_cleanup(obd);
303         mgs_fs_cleanup(obd);
304
305         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
306         mgs->mgs_sb = NULL;
307
308         /* Free the namespace in it's own thread, so that if the
309            ldlm_cancel_handler put the last mgs obd ref, we won't
310            deadlock here. */
311         cfs_kernel_thread(mgs_ldlm_nsfree, obd->obd_namespace,
312                           CLONE_VM | CLONE_FILES);
313
314
315         fsfilt_put_ops(obd->obd_fsops);
316
317         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
318         RETURN(0);
319 }
320
321 /* similar to filter_prepare_destroy */
322 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
323                             struct lustre_handle *lockh)
324 {
325         struct ldlm_res_id res_id;
326         int rc, flags = 0;
327         ENTRY;
328
329         rc = mgc_fsname2resid(fsname, &res_id);
330         if (!rc)
331                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
332                                             LDLM_PLAIN, NULL, LCK_EX,
333                                             &flags, ldlm_blocking_ast,
334                                             ldlm_completion_ast, NULL,
335                                             fsname, 0, NULL, lockh);
336         if (rc)
337                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
338
339         RETURN(rc);
340 }
341
342 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
343 {
344         ENTRY;
345         ldlm_lock_decref(lockh, LCK_EX);
346         RETURN(0);
347 }
348
349 /* rc=0 means ok
350       1 means update
351      <0 means error */
352 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
353 {
354         int rc;
355         ENTRY;
356
357         rc = mgs_check_index(obd, mti);
358         if (rc == 0) {
359                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
360                                    "this MGS does not know about it. Assuming"
361                                    " writeconf.\n", mti->mti_svname);
362                 mti->mti_flags |= LDD_F_WRITECONF;
363                 rc = 1;
364         } else if (rc == -1) {
365                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
366                                    "disappeared! Regenerating all logs.\n",
367                                    mti->mti_fsname);
368                 mti->mti_flags |= LDD_F_WRITECONF;
369                 rc = 1;
370         } else {
371                 /* Index is correctly marked as used */
372
373                 /* If the logs don't contain the mti_nids then add
374                    them as failover nids */
375                 rc = mgs_check_failnid(obd, mti);
376         }
377
378         RETURN(rc);
379 }
380
381 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
382 static int mgs_handle_target_reg(struct ptlrpc_request *req)
383 {
384         struct obd_device *obd = req->rq_export->exp_obd;
385         struct lustre_handle lockh;
386         struct mgs_target_info *mti, *rep_mti;
387         int rc = 0, lockrc;
388         ENTRY;
389
390         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
391
392         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
393         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
394                                 LDD_F_UPDATE))) {
395                 /* We're just here as a startup ping. */
396                 CDEBUG(D_MGS, "Server %s is running on %s\n",
397                        mti->mti_svname, obd_export_nid2str(req->rq_export));
398                 rc = mgs_check_target(obd, mti);
399                 /* above will set appropriate mti flags */
400                 if (rc <= 0)
401                         /* Nothing wrong, or fatal error */
402                         GOTO(out_nolock, rc);
403         }
404
405         /* Revoke the config lock to make sure nobody is reading. */
406         /* Although actually I think it should be alright if
407            someone was reading while we were updating the logs - if we
408            revoke at the end they will just update from where they left off. */
409         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
410         if (lockrc != ELDLM_OK) {
411                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
412                                    "update their configuration (%d). Updating "
413                                    "local logs anyhow; you might have to "
414                                    "manually restart other nodes to get the "
415                                    "latest configuration.\n",
416                                    obd->obd_name, lockrc);
417         }
418
419         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_TARGET_REG, 10);
420
421         /* Log writing contention is handled by the fsdb_sem */
422
423         if (mti->mti_flags & LDD_F_WRITECONF) {
424                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
425                     mti->mti_stripe_index == 0) {
426                         rc = mgs_erase_logs(obd, mti->mti_fsname);
427                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
428                                       "request.  All servers must be restarted "
429                                       "in order to regenerate the logs."
430                                       "\n", obd->obd_name, mti->mti_fsname);
431                 } else if (mti->mti_flags &
432                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
433                         rc = mgs_erase_log(obd, mti->mti_svname);
434                         LCONSOLE_WARN("%s: Regenerating %s log by user "
435                                       "request.\n",
436                                       obd->obd_name, mti->mti_svname);
437                 }
438                 mti->mti_flags |= LDD_F_UPDATE;
439                 /* Erased logs means start from scratch. */
440                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
441         }
442
443         /* COMPAT_146 */
444         if (mti->mti_flags & LDD_F_UPGRADE14) {
445                 rc = mgs_upgrade_sv_14(obd, mti);
446                 if (rc) {
447                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
448                         GOTO(out, rc);
449                 }
450                 
451                 /* We're good to go */
452                 mti->mti_flags |= LDD_F_UPDATE;
453         }
454         /* end COMPAT_146 */
455
456         if (mti->mti_flags & LDD_F_UPDATE) {
457                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
458                        mti->mti_stripe_index);
459
460                 /* create or update the target log
461                    and update the client/mdt logs */
462                 rc = mgs_write_log_target(obd, mti);
463                 if (rc) {
464                         CERROR("Failed to write %s log (%d)\n",
465                                mti->mti_svname, rc);
466                         GOTO(out, rc);
467                 }
468
469                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
470                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
471                                     LDD_F_UPGRADE14);
472                 mti->mti_flags |= LDD_F_REWRITE_LDD;
473         }
474
475 out:
476         /* done with log update */
477         if (lockrc == ELDLM_OK)
478                 mgs_put_cfg_lock(&lockh);
479 out_nolock:
480         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
481                mti->mti_stripe_index, rc);
482         rc = req_capsule_server_pack(&req->rq_pill);
483         if (rc)
484                 RETURN(rc);
485
486         /* send back the whole mti in the reply */
487         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
488         *rep_mti = *mti;
489
490         /* Flush logs to disk */
491         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
492         RETURN(rc);
493 }
494
495 static int mgs_set_info_rpc(struct ptlrpc_request *req)
496 {
497         struct obd_device *obd = req->rq_export->exp_obd;
498         struct mgs_send_param *msp, *rep_msp;
499         struct lustre_handle lockh;
500         int lockrc, rc;
501         struct lustre_cfg_bufs bufs;
502         struct lustre_cfg *lcfg;
503         char fsname[MTI_NAME_MAXLEN];
504         ENTRY;
505
506         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
507         LASSERT(msp);
508
509         /* Construct lustre_cfg structure to pass to function mgs_setparam */
510         lustre_cfg_bufs_reset(&bufs, NULL);
511         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
512         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
513         rc = mgs_setparam(obd, lcfg, fsname);
514         if (rc) {
515                 CERROR("Error %d in setting the parameter %s for fs %s\n",
516                        rc, msp->mgs_param, fsname);
517                 RETURN(rc);
518         }
519
520         /* Revoke lock so everyone updates.  Should be alright if
521          * someone was already reading while we were updating the logs,
522          * so we don't really need to hold the lock while we're
523          * writing.
524          */
525         if (fsname[0]) {
526                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
527                 if (lockrc != ELDLM_OK)
528                         CERROR("lock error %d for fs %s\n", lockrc,
529                                fsname);
530                 else
531                         mgs_put_cfg_lock(&lockh);
532         }
533         lustre_cfg_free(lcfg);
534
535         rc = req_capsule_server_pack(&req->rq_pill);
536         if (rc == 0) {
537                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
538                 rep_msp = msp;
539         }
540         RETURN(rc);
541 }
542
543 /* Called whenever a target cleans up. */
544 /* XXX - Currently unused */
545 static int mgs_handle_target_del(struct ptlrpc_request *req)
546 {
547         ENTRY;
548         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
549         RETURN(0);
550 }
551
552 /* XXX - Currently unused */
553 static int mgs_handle_exception(struct ptlrpc_request *req)
554 {
555         ENTRY;
556         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
557         RETURN(0);
558 }
559
560 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
561 int mgs_handle(struct ptlrpc_request *req)
562 {
563         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
564         int opc, rc = 0;
565         ENTRY;
566
567         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
568         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_REQUEST_NET, 2);
569
570         LASSERT(current->journal_info == NULL);
571         opc = lustre_msg_get_opc(req->rq_reqmsg);
572         if (opc != MGS_CONNECT) {
573                 if (req->rq_export == NULL) {
574                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
575                                opc);
576                         req->rq_status = -ENOTCONN;
577                         GOTO(out, rc = -ENOTCONN);
578                 }
579         }
580
581         switch (opc) {
582         case MGS_CONNECT:
583                 DEBUG_REQ(D_MGS, req, "connect");
584                 /* MGS and MDS have same request format for connect */
585                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
586                 rc = target_handle_connect(req);
587                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
588                         /* Make clients trying to reconnect after a MGS restart
589                            happy; also requires obd_replayable */
590                         lustre_msg_add_op_flags(req->rq_repmsg,
591                                                 MSG_CONNECT_RECONNECT);
592                 break;
593         case MGS_DISCONNECT:
594                 DEBUG_REQ(D_MGS, req, "disconnect");
595                 /* MGS and MDS have same request format for disconnect */
596                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
597                 rc = target_handle_disconnect(req);
598                 req->rq_status = rc;            /* superfluous? */
599                 break;
600         case MGS_EXCEPTION:
601                 DEBUG_REQ(D_MGS, req, "exception");
602                 rc = mgs_handle_exception(req);
603                 break;
604         case MGS_TARGET_REG:
605                 DEBUG_REQ(D_MGS, req, "target add");
606                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
607                 rc = mgs_handle_target_reg(req);
608                 break;
609         case MGS_TARGET_DEL:
610                 DEBUG_REQ(D_MGS, req, "target del");
611                 rc = mgs_handle_target_del(req);
612                 break;
613         case MGS_SET_INFO:
614                 DEBUG_REQ(D_MGS, req, "set_info");
615                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
616                 rc = mgs_set_info_rpc(req);
617                 break;
618
619         case LDLM_ENQUEUE:
620                 DEBUG_REQ(D_MGS, req, "enqueue");
621                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
622                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
623                                          ldlm_server_blocking_ast, NULL);
624                 break;
625         case LDLM_BL_CALLBACK:
626         case LDLM_CP_CALLBACK:
627                 DEBUG_REQ(D_MGS, req, "callback");
628                 CERROR("callbacks should not happen on MGS\n");
629                 LBUG();
630                 break;
631
632         case OBD_PING:
633                 DEBUG_REQ(D_INFO, req, "ping");
634                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
635                 rc = target_handle_ping(req);
636                 break;
637         case OBD_LOG_CANCEL:
638                 DEBUG_REQ(D_MGS, req, "log cancel");
639                 rc = -ENOTSUPP; /* la la la */
640                 break;
641
642         case LLOG_ORIGIN_HANDLE_CREATE:
643                 DEBUG_REQ(D_MGS, req, "llog_init");
644                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
645                 rc = llog_origin_handle_create(req);
646                 break;
647         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
648                 DEBUG_REQ(D_MGS, req, "llog next block");
649                 req_capsule_set(&req->rq_pill,
650                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
651                 rc = llog_origin_handle_next_block(req);
652                 break;
653         case LLOG_ORIGIN_HANDLE_READ_HEADER:
654                 DEBUG_REQ(D_MGS, req, "llog read header");
655                 req_capsule_set(&req->rq_pill,
656                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
657                 rc = llog_origin_handle_read_header(req);
658                 break;
659         case LLOG_ORIGIN_HANDLE_CLOSE:
660                 DEBUG_REQ(D_MGS, req, "llog close");
661                 rc = llog_origin_handle_close(req);
662                 break;
663         case LLOG_CATINFO:
664                 DEBUG_REQ(D_MGS, req, "llog catinfo");
665                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
666                 rc = llog_catinfo(req);
667                 break;
668         default:
669                 req->rq_status = -ENOTSUPP;
670                 rc = ptlrpc_error(req);
671                 RETURN(rc);
672         }
673
674         LASSERT(current->journal_info == NULL);
675
676         if (rc)
677                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
678
679 out:
680         target_send_reply(req, rc, fail);
681         RETURN(0);
682 }
683
684 static inline int mgs_destroy_export(struct obd_export *exp)
685 {
686         ENTRY;
687
688         target_destroy_export(exp);
689
690         RETURN(0);
691 }
692
693 /* from mdt_iocontrol */
694 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
695                   void *karg, void *uarg)
696 {
697         struct obd_device *obd = exp->exp_obd;
698         struct obd_ioctl_data *data = karg;
699         struct lvfs_run_ctxt saved;
700         int rc = 0;
701
702         ENTRY;
703         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
704
705         switch (cmd) {
706
707         case OBD_IOC_PARAM: {
708                 struct lustre_handle lockh;
709                 struct lustre_cfg *lcfg;
710                 struct llog_rec_hdr rec;
711                 char fsname[MTI_NAME_MAXLEN];
712                 int lockrc;
713
714                 rec.lrh_len = llog_data_len(data->ioc_plen1);
715
716                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
717                         rec.lrh_type = OBD_CFG_REC;
718                 } else {
719                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
720                         RETURN(-EINVAL);
721                 }
722
723                 OBD_ALLOC(lcfg, data->ioc_plen1);
724                 if (lcfg == NULL)
725                         RETURN(-ENOMEM);
726                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
727                 if (rc)
728                         GOTO(out_free, rc);
729
730                 if (lcfg->lcfg_bufcount < 1)
731                         GOTO(out_free, rc = -EINVAL);
732
733                 rc = mgs_setparam(obd, lcfg, fsname);
734                 if (rc) {
735                         CERROR("setparam err %d\n", rc);
736                         GOTO(out_free, rc);
737                 }
738
739                 /* Revoke lock so everyone updates.  Should be alright if
740                    someone was already reading while we were updating the logs,
741                    so we don't really need to hold the lock while we're
742                    writing (above). */
743                 if (fsname[0]) {
744                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
745                         if (lockrc != ELDLM_OK)
746                                 CERROR("lock error %d for fs %s\n", lockrc,
747                                        fsname);
748                         else
749                                 mgs_put_cfg_lock(&lockh);
750                 }
751
752 out_free:
753                 OBD_FREE(lcfg, data->ioc_plen1);
754                 RETURN(rc);
755         }
756
757         case OBD_IOC_DUMP_LOG: {
758                 struct llog_ctxt *ctxt;
759                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
760                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
761                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
762                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
763                 llog_ctxt_put(ctxt);
764
765                 RETURN(rc);
766         }
767
768         case OBD_IOC_LLOG_CHECK:
769         case OBD_IOC_LLOG_INFO:
770         case OBD_IOC_LLOG_PRINT: {
771                 struct llog_ctxt *ctxt;
772                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
773
774                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
775                 rc = llog_ioctl(ctxt, cmd, data);
776                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
777                 llog_ctxt_put(ctxt);
778
779                 RETURN(rc);
780         }
781
782         default:
783                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
784                 RETURN(-EINVAL);
785         }
786         RETURN(0);
787 }
788
789 /* use obd ops to offer management infrastructure */
790 static struct obd_ops mgs_obd_ops = {
791         .o_owner           = THIS_MODULE,
792         .o_connect         = mgs_connect,
793         .o_disconnect      = mgs_disconnect,
794         .o_setup           = mgs_setup,
795         .o_precleanup      = mgs_precleanup,
796         .o_cleanup         = mgs_cleanup,
797         .o_destroy_export  = mgs_destroy_export,
798         .o_iocontrol       = mgs_iocontrol,
799         .o_llog_init       = mgs_llog_init,
800         .o_llog_finish     = mgs_llog_finish
801 };
802
803 static int __init mgs_init(void)
804 {
805         struct lprocfs_static_vars lvars;
806
807         lprocfs_mgs_init_vars(&lvars);
808         class_register_type(&mgs_obd_ops, NULL,
809                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
810
811         return 0;
812 }
813
814 static void /*__exit*/ mgs_exit(void)
815 {
816         class_unregister_type(LUSTRE_MGS_NAME);
817 }
818
819 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
820 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
821 MODULE_LICENSE("GPL");
822
823 module_init(mgs_init);
824 module_exit(mgs_exit);