Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MGS
30 #define D_MGS D_CONFIG/*|D_WARNING*/
31
32 #ifdef __KERNEL__
33 # include <linux/module.h>
34 # include <linux/pagemap.h>
35 # include <linux/miscdevice.h>
36 # include <linux/init.h>
37 #else
38 # include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lprocfs_status.h>
44 #include <lustre_fsfilt.h>
45 #include <lustre_commit_confd.h>
46 #include <lustre_disk.h>
47 #include "mgs_internal.h"
48
49
50 /* Establish a connection to the MGS.*/
51 static int mgs_connect(const struct lu_env *env,
52                        struct lustre_handle *conn, struct obd_device *obd,
53                        struct obd_uuid *cluuid, struct obd_connect_data *data,
54                        void *localdata)
55 {
56         struct obd_export *exp;
57         int rc;
58         ENTRY;
59
60         if (!conn || !obd || !cluuid)
61                 RETURN(-EINVAL);
62
63         rc = class_connect(conn, obd, cluuid);
64         if (rc)
65                 RETURN(rc);
66         exp = class_conn2export(conn);
67         LASSERT(exp);
68
69         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
70
71         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
72
73         if (data != NULL) {
74                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
75                 exp->exp_connect_flags = data->ocd_connect_flags;
76                 data->ocd_version = LUSTRE_VERSION_CODE;
77         }
78
79         if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
80                 CWARN("MGS requires FID support, but client not\n");
81                 rc = -EBADE;
82         }
83
84         if (rc) {
85                 class_disconnect(exp);
86         } else {
87                 class_export_put(exp);
88         }
89
90         RETURN(rc);
91 }
92
93 static int mgs_disconnect(struct obd_export *exp)
94 {
95         int rc;
96         ENTRY;
97
98         LASSERT(exp);
99
100         class_export_get(exp);
101         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
102
103         /* Disconnect early so that clients can't keep using export */
104         rc = class_disconnect(exp);
105         ldlm_cancel_locks_for_export(exp);
106
107         /* complete all outstanding replies */
108         spin_lock(&exp->exp_lock);
109         while (!list_empty(&exp->exp_outstanding_replies)) {
110                 struct ptlrpc_reply_state *rs =
111                         list_entry(exp->exp_outstanding_replies.next,
112                                    struct ptlrpc_reply_state, rs_exp_list);
113                 struct ptlrpc_service *svc = rs->rs_service;
114
115                 spin_lock(&svc->srv_lock);
116                 list_del_init(&rs->rs_exp_list);
117                 ptlrpc_schedule_difficult_reply(rs);
118                 spin_unlock(&svc->srv_lock);
119         }
120         spin_unlock(&exp->exp_lock);
121
122         class_export_put(exp);
123         RETURN(rc);
124 }
125
126 static int mgs_cleanup(struct obd_device *obd);
127 static int mgs_handle(struct ptlrpc_request *req);
128
129 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
130                          struct obd_device *tgt, int count,
131                          struct llog_catid *logid, struct obd_uuid *uuid)
132 {
133         int rc;
134         ENTRY;
135
136         LASSERT(olg == &obd->obd_olg);
137         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
138                         &llog_lvfs_ops);
139         RETURN(rc);
140 }
141
142 static int mgs_llog_finish(struct obd_device *obd, int count)
143 {
144         struct llog_ctxt *ctxt;
145         int rc = 0;
146         ENTRY;
147
148         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
149         if (ctxt)
150                 rc = llog_cleanup(ctxt);
151
152         RETURN(rc);
153 }
154
155 /* Start the MGS obd */
156 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
157 {
158         struct lprocfs_static_vars lvars;
159         struct mgs_obd *mgs = &obd->u.mgs;
160         struct lustre_mount_info *lmi;
161         struct lustre_sb_info *lsi;
162         struct vfsmount *mnt;
163         int rc = 0;
164         ENTRY;
165
166         CDEBUG(D_CONFIG, "Starting MGS\n");
167
168         /* Find our disk */
169         lmi = server_get_mount(obd->obd_name);
170         if (!lmi)
171                 RETURN(rc = -EINVAL);
172
173         mnt = lmi->lmi_mnt;
174         lsi = s2lsi(lmi->lmi_sb);
175         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
176         if (IS_ERR(obd->obd_fsops))
177                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
178
179         /* namespace for mgs llog */
180         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
181                                                 LDLM_NAMESPACE_MODEST);
182         if (obd->obd_namespace == NULL)
183                 GOTO(err_ops, rc = -ENOMEM);
184
185         /* ldlm setup */
186         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
187                            "mgs_ldlm_client", &obd->obd_ldlm_client);
188
189         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
190
191         rc = mgs_fs_setup(obd, mnt);
192         if (rc) {
193                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
194                        obd->obd_name, rc);
195                 GOTO(err_ns, rc);
196         }
197
198         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
199         if (rc)
200                 GOTO(err_fs, rc);
201
202         /* No recovery for MGC's */
203         obd->obd_replayable = 0;
204
205         /* Internal mgs setup */
206         mgs_init_fsdb_list(obd);
207         sema_init(&mgs->mgs_sem, 1);
208
209         /* Start the service threads */
210         mgs->mgs_service =
211                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
212                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
213                                 MGC_REPLY_PORTAL, MGS_SERVICE_WATCHDOG_TIMEOUT,
214                                 mgs_handle, LUSTRE_MGS_NAME,
215                                 obd->obd_proc_entry, NULL,
216                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
217                                 "ll_mgs", LCT_MD_THREAD);
218
219         if (!mgs->mgs_service) {
220                 CERROR("failed to start service\n");
221                 GOTO(err_llog, rc = -ENOMEM);
222         }
223
224         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
225         if (rc)
226                 GOTO(err_thread, rc);
227
228         /* Setup proc */
229         lprocfs_mgs_init_vars(&lvars);
230         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
231                 lproc_mgs_setup(obd);
232         }
233
234         ping_evictor_start();
235
236         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
237
238         RETURN(0);
239
240 err_thread:
241         ptlrpc_unregister_service(mgs->mgs_service);
242 err_llog:
243         obd_llog_finish(obd, 0);
244 err_fs:
245         /* No extra cleanup needed for llog_init_commit_thread() */
246         mgs_fs_cleanup(obd);
247 err_ns:
248         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
249         obd->obd_namespace = NULL;
250 err_ops:
251         fsfilt_put_ops(obd->obd_fsops);
252 err_put:
253         server_put_mount(obd->obd_name, mnt);
254         mgs->mgs_sb = 0;
255         return rc;
256 }
257
258 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
259 {
260         int rc = 0;
261         ENTRY;
262
263         switch (stage) {
264         case OBD_CLEANUP_EARLY:
265         case OBD_CLEANUP_EXPORTS:
266                 break;
267         case OBD_CLEANUP_SELF_EXP:
268                 rc = obd_llog_finish(obd, 0);
269                 break;
270         case OBD_CLEANUP_OBD:
271                 break;
272         }
273         RETURN(rc);
274 }
275
276 /**
277  * Performs cleanup procedures for passed \a obd given it is mgs obd.
278  */
279 static int mgs_cleanup(struct obd_device *obd)
280 {
281         struct mgs_obd *mgs = &obd->u.mgs;
282         ENTRY;
283
284         if (mgs->mgs_sb == NULL)
285                 RETURN(0);
286
287         ping_evictor_stop();
288
289         ptlrpc_unregister_service(mgs->mgs_service);
290
291         mgs_cleanup_fsdb_list(obd);
292         lproc_mgs_cleanup(obd);
293         mgs_fs_cleanup(obd);
294
295         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
296         mgs->mgs_sb = NULL;
297
298         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
299         obd->obd_namespace = NULL;
300
301         fsfilt_put_ops(obd->obd_fsops);
302
303         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
304         RETURN(0);
305 }
306
307 /* similar to filter_prepare_destroy */
308 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
309                             struct lustre_handle *lockh)
310 {
311         struct ldlm_res_id res_id;
312         int rc, flags = 0;
313         ENTRY;
314
315         rc = mgc_fsname2resid(fsname, &res_id);
316         if (!rc)
317                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
318                                             LDLM_PLAIN, NULL, LCK_EX,
319                                             &flags, ldlm_blocking_ast,
320                                             ldlm_completion_ast, NULL,
321                                             fsname, 0, NULL, lockh);
322         if (rc)
323                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
324
325         RETURN(rc);
326 }
327
328 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
329 {
330         ENTRY;
331         ldlm_lock_decref(lockh, LCK_EX);
332         RETURN(0);
333 }
334
335 /* rc=0 means ok
336       1 means update
337      <0 means error */
338 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
339 {
340         int rc;
341         ENTRY;
342
343         rc = mgs_check_index(obd, mti);
344         if (rc == 0) {
345                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
346                                    "this MGS does not know about it. Assuming"
347                                    " writeconf.\n", mti->mti_svname);
348                 mti->mti_flags |= LDD_F_WRITECONF;
349                 rc = 1;
350         } else if (rc == -1) {
351                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
352                                    "disappeared! Regenerating all logs.\n",
353                                    mti->mti_fsname);
354                 mti->mti_flags |= LDD_F_WRITECONF;
355                 rc = 1;
356         } else {
357                 /* Index is correctly marked as used */
358
359                 /* If the logs don't contain the mti_nids then add
360                    them as failover nids */
361                 rc = mgs_check_failnid(obd, mti);
362         }
363
364         RETURN(rc);
365 }
366
367 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
368 static int mgs_handle_target_reg(struct ptlrpc_request *req)
369 {
370         struct obd_device *obd = req->rq_export->exp_obd;
371         struct lustre_handle lockh;
372         struct mgs_target_info *mti, *rep_mti;
373         int rc = 0, lockrc;
374         ENTRY;
375
376         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
377
378         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
379         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
380                                 LDD_F_UPDATE))) {
381                 /* We're just here as a startup ping. */
382                 CDEBUG(D_MGS, "Server %s is running on %s\n",
383                        mti->mti_svname, obd_export_nid2str(req->rq_export));
384                 rc = mgs_check_target(obd, mti);
385                 /* above will set appropriate mti flags */
386                 if (rc <= 0)
387                         /* Nothing wrong, or fatal error */
388                         GOTO(out_nolock, rc);
389         }
390
391         /* Revoke the config lock to make sure nobody is reading. */
392         /* Although actually I think it should be alright if
393            someone was reading while we were updating the logs - if we
394            revoke at the end they will just update from where they left off. */
395         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
396         if (lockrc != ELDLM_OK) {
397                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
398                                    "update their configuration (%d). Updating "
399                                    "local logs anyhow; you might have to "
400                                    "manually restart other nodes to get the "
401                                    "latest configuration.\n",
402                                    obd->obd_name, lockrc);
403         }
404
405         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_TARGET_REG, 10);
406
407         /* Log writing contention is handled by the fsdb_sem */
408
409         if (mti->mti_flags & LDD_F_WRITECONF) {
410                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
411                     mti->mti_stripe_index == 0) {
412                         rc = mgs_erase_logs(obd, mti->mti_fsname);
413                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
414                                       "request.  All servers must be restarted "
415                                       "in order to regenerate the logs."
416                                       "\n", obd->obd_name, mti->mti_fsname);
417                 } else if (mti->mti_flags &
418                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
419                         rc = mgs_erase_log(obd, mti->mti_svname);
420                         LCONSOLE_WARN("%s: Regenerating %s log by user "
421                                       "request.\n",
422                                       obd->obd_name, mti->mti_svname);
423                 }
424                 mti->mti_flags |= LDD_F_UPDATE;
425                 /* Erased logs means start from scratch. */
426                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
427         }
428
429         /* COMPAT_146 */
430         if (mti->mti_flags & LDD_F_UPGRADE14) {
431                 rc = mgs_upgrade_sv_14(obd, mti);
432                 if (rc) {
433                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
434                         GOTO(out, rc);
435                 }
436                 
437                 /* We're good to go */
438                 mti->mti_flags |= LDD_F_UPDATE;
439         }
440         /* end COMPAT_146 */
441
442         if (mti->mti_flags & LDD_F_UPDATE) {
443                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
444                        mti->mti_stripe_index);
445
446                 /* create or update the target log
447                    and update the client/mdt logs */
448                 rc = mgs_write_log_target(obd, mti);
449                 if (rc) {
450                         CERROR("Failed to write %s log (%d)\n",
451                                mti->mti_svname, rc);
452                         GOTO(out, rc);
453                 }
454
455                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
456                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
457                                     LDD_F_UPGRADE14);
458                 mti->mti_flags |= LDD_F_REWRITE_LDD;
459         }
460
461 out:
462         /* done with log update */
463         if (lockrc == ELDLM_OK)
464                 mgs_put_cfg_lock(&lockh);
465 out_nolock:
466         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
467                mti->mti_stripe_index, rc);
468         rc = req_capsule_server_pack(&req->rq_pill);
469         if (rc)
470                 RETURN(rc);
471
472         /* send back the whole mti in the reply */
473         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
474         *rep_mti = *mti;
475
476         /* Flush logs to disk */
477         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
478         RETURN(rc);
479 }
480
481 static int mgs_set_info_rpc(struct ptlrpc_request *req)
482 {
483         struct obd_device *obd = req->rq_export->exp_obd;
484         struct mgs_send_param *msp, *rep_msp;
485         struct lustre_handle lockh;
486         int lockrc, rc;
487         struct lustre_cfg_bufs bufs;
488         struct lustre_cfg *lcfg;
489         char fsname[MTI_NAME_MAXLEN];
490         ENTRY;
491
492         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
493         LASSERT(msp);
494
495         /* Construct lustre_cfg structure to pass to function mgs_setparam */
496         lustre_cfg_bufs_reset(&bufs, NULL);
497         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
498         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
499         rc = mgs_setparam(obd, lcfg, fsname);
500         if (rc) {
501                 CERROR("Error %d in setting the parameter %s for fs %s\n",
502                        rc, msp->mgs_param, fsname);
503                 RETURN(rc);
504         }
505
506         /* Revoke lock so everyone updates.  Should be alright if
507          * someone was already reading while we were updating the logs,
508          * so we don't really need to hold the lock while we're
509          * writing.
510          */
511         if (fsname[0]) {
512                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
513                 if (lockrc != ELDLM_OK)
514                         CERROR("lock error %d for fs %s\n", lockrc,
515                                fsname);
516                 else
517                         mgs_put_cfg_lock(&lockh);
518         }
519         lustre_cfg_free(lcfg);
520
521         rc = req_capsule_server_pack(&req->rq_pill);
522         if (rc == 0) {
523                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
524                 rep_msp = msp;
525         }
526         RETURN(rc);
527 }
528
529 /* Called whenever a target cleans up. */
530 /* XXX - Currently unused */
531 static int mgs_handle_target_del(struct ptlrpc_request *req)
532 {
533         ENTRY;
534         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
535         RETURN(0);
536 }
537
538 /* XXX - Currently unused */
539 static int mgs_handle_exception(struct ptlrpc_request *req)
540 {
541         ENTRY;
542         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
543         RETURN(0);
544 }
545
546 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
547 int mgs_handle(struct ptlrpc_request *req)
548 {
549         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
550         int opc, rc = 0;
551         ENTRY;
552
553         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
554         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_REQUEST_NET, 2);
555
556         LASSERT(current->journal_info == NULL);
557         opc = lustre_msg_get_opc(req->rq_reqmsg);
558         if (opc != MGS_CONNECT) {
559                 if (req->rq_export == NULL) {
560                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
561                                opc);
562                         req->rq_status = -ENOTCONN;
563                         GOTO(out, rc = -ENOTCONN);
564                 }
565         }
566
567         switch (opc) {
568         case MGS_CONNECT:
569                 DEBUG_REQ(D_MGS, req, "connect");
570                 /* MGS and MDS have same request format for connect */
571                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
572                 rc = target_handle_connect(req);
573                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
574                         /* Make clients trying to reconnect after a MGS restart
575                            happy; also requires obd_replayable */
576                         lustre_msg_add_op_flags(req->rq_repmsg,
577                                                 MSG_CONNECT_RECONNECT);
578                 break;
579         case MGS_DISCONNECT:
580                 DEBUG_REQ(D_MGS, req, "disconnect");
581                 /* MGS and MDS have same request format for disconnect */
582                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
583                 rc = target_handle_disconnect(req);
584                 req->rq_status = rc;            /* superfluous? */
585                 break;
586         case MGS_EXCEPTION:
587                 DEBUG_REQ(D_MGS, req, "exception");
588                 rc = mgs_handle_exception(req);
589                 break;
590         case MGS_TARGET_REG:
591                 DEBUG_REQ(D_MGS, req, "target add");
592                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
593                 rc = mgs_handle_target_reg(req);
594                 break;
595         case MGS_TARGET_DEL:
596                 DEBUG_REQ(D_MGS, req, "target del");
597                 rc = mgs_handle_target_del(req);
598                 break;
599         case MGS_SET_INFO:
600                 DEBUG_REQ(D_MGS, req, "set_info");
601                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
602                 rc = mgs_set_info_rpc(req);
603                 break;
604
605         case LDLM_ENQUEUE:
606                 DEBUG_REQ(D_MGS, req, "enqueue");
607                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
608                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
609                                          ldlm_server_blocking_ast, NULL);
610                 break;
611         case LDLM_BL_CALLBACK:
612         case LDLM_CP_CALLBACK:
613                 DEBUG_REQ(D_MGS, req, "callback");
614                 CERROR("callbacks should not happen on MGS\n");
615                 LBUG();
616                 break;
617
618         case OBD_PING:
619                 DEBUG_REQ(D_INFO, req, "ping");
620                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
621                 rc = target_handle_ping(req);
622                 break;
623         case OBD_LOG_CANCEL:
624                 DEBUG_REQ(D_MGS, req, "log cancel");
625                 rc = -ENOTSUPP; /* la la la */
626                 break;
627
628         case LLOG_ORIGIN_HANDLE_CREATE:
629                 DEBUG_REQ(D_MGS, req, "llog_init");
630                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
631                 rc = llog_origin_handle_create(req);
632                 break;
633         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
634                 DEBUG_REQ(D_MGS, req, "llog next block");
635                 req_capsule_set(&req->rq_pill,
636                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
637                 rc = llog_origin_handle_next_block(req);
638                 break;
639         case LLOG_ORIGIN_HANDLE_READ_HEADER:
640                 DEBUG_REQ(D_MGS, req, "llog read header");
641                 req_capsule_set(&req->rq_pill,
642                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
643                 rc = llog_origin_handle_read_header(req);
644                 break;
645         case LLOG_ORIGIN_HANDLE_CLOSE:
646                 DEBUG_REQ(D_MGS, req, "llog close");
647                 rc = llog_origin_handle_close(req);
648                 break;
649         case LLOG_CATINFO:
650                 DEBUG_REQ(D_MGS, req, "llog catinfo");
651                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
652                 rc = llog_catinfo(req);
653                 break;
654         default:
655                 req->rq_status = -ENOTSUPP;
656                 rc = ptlrpc_error(req);
657                 RETURN(rc);
658         }
659
660         LASSERT(current->journal_info == NULL);
661
662         if (rc)
663                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
664
665 out:
666         target_send_reply(req, rc, fail);
667         RETURN(0);
668 }
669
670 static inline int mgs_destroy_export(struct obd_export *exp)
671 {
672         ENTRY;
673
674         target_destroy_export(exp);
675
676         RETURN(0);
677 }
678
679 /* from mdt_iocontrol */
680 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
681                   void *karg, void *uarg)
682 {
683         struct obd_device *obd = exp->exp_obd;
684         struct obd_ioctl_data *data = karg;
685         struct lvfs_run_ctxt saved;
686         int rc = 0;
687
688         ENTRY;
689         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
690
691         switch (cmd) {
692
693         case OBD_IOC_PARAM: {
694                 struct lustre_handle lockh;
695                 struct lustre_cfg *lcfg;
696                 struct llog_rec_hdr rec;
697                 char fsname[MTI_NAME_MAXLEN];
698                 int lockrc;
699
700                 rec.lrh_len = llog_data_len(data->ioc_plen1);
701
702                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
703                         rec.lrh_type = OBD_CFG_REC;
704                 } else {
705                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
706                         RETURN(-EINVAL);
707                 }
708
709                 OBD_ALLOC(lcfg, data->ioc_plen1);
710                 if (lcfg == NULL)
711                         RETURN(-ENOMEM);
712                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
713                 if (rc)
714                         GOTO(out_free, rc);
715
716                 if (lcfg->lcfg_bufcount < 1)
717                         GOTO(out_free, rc = -EINVAL);
718
719                 rc = mgs_setparam(obd, lcfg, fsname);
720                 if (rc) {
721                         CERROR("setparam err %d\n", rc);
722                         GOTO(out_free, rc);
723                 }
724
725                 /* Revoke lock so everyone updates.  Should be alright if
726                    someone was already reading while we were updating the logs,
727                    so we don't really need to hold the lock while we're
728                    writing (above). */
729                 if (fsname[0]) {
730                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
731                         if (lockrc != ELDLM_OK)
732                                 CERROR("lock error %d for fs %s\n", lockrc,
733                                        fsname);
734                         else
735                                 mgs_put_cfg_lock(&lockh);
736                 }
737
738 out_free:
739                 OBD_FREE(lcfg, data->ioc_plen1);
740                 RETURN(rc);
741         }
742
743         case OBD_IOC_DUMP_LOG: {
744                 struct llog_ctxt *ctxt;
745                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
746                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
747                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
748                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
749                 llog_ctxt_put(ctxt);
750
751                 RETURN(rc);
752         }
753
754         case OBD_IOC_LLOG_CHECK:
755         case OBD_IOC_LLOG_INFO:
756         case OBD_IOC_LLOG_PRINT: {
757                 struct llog_ctxt *ctxt;
758                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
759
760                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
761                 rc = llog_ioctl(ctxt, cmd, data);
762                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
763                 llog_ctxt_put(ctxt);
764
765                 RETURN(rc);
766         }
767
768         default:
769                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
770                 RETURN(-EINVAL);
771         }
772         RETURN(0);
773 }
774
775 /* use obd ops to offer management infrastructure */
776 static struct obd_ops mgs_obd_ops = {
777         .o_owner           = THIS_MODULE,
778         .o_connect         = mgs_connect,
779         .o_disconnect      = mgs_disconnect,
780         .o_setup           = mgs_setup,
781         .o_precleanup      = mgs_precleanup,
782         .o_cleanup         = mgs_cleanup,
783         .o_destroy_export  = mgs_destroy_export,
784         .o_iocontrol       = mgs_iocontrol,
785         .o_llog_init       = mgs_llog_init,
786         .o_llog_finish     = mgs_llog_finish
787 };
788
789 static int __init mgs_init(void)
790 {
791         struct lprocfs_static_vars lvars;
792
793         lprocfs_mgs_init_vars(&lvars);
794         class_register_type(&mgs_obd_ops, NULL,
795                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
796
797         return 0;
798 }
799
800 static void /*__exit*/ mgs_exit(void)
801 {
802         class_unregister_type(LUSTRE_MGS_NAME);
803 }
804
805 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
806 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
807 MODULE_LICENSE("GPL");
808
809 module_init(mgs_init);
810 module_exit(mgs_exit);