Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgs/mgs_handler.c
5  *  Lustre Management Server (mgs) request handler
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MGS
30 #define D_MGS D_CONFIG/*|D_WARNING*/
31
32 #ifdef __KERNEL__
33 # include <linux/module.h>
34 # include <linux/pagemap.h>
35 # include <linux/miscdevice.h>
36 # include <linux/init.h>
37 #else
38 # include <liblustre.h>
39 #endif
40
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lprocfs_status.h>
44 #include <lustre_fsfilt.h>
45 #include <lustre_commit_confd.h>
46 #include <lustre_disk.h>
47 #include "mgs_internal.h"
48
49
50 /* Establish a connection to the MGS.*/
51 static int mgs_connect(const struct lu_env *env,
52                        struct lustre_handle *conn, struct obd_device *obd,
53                        struct obd_uuid *cluuid, struct obd_connect_data *data,
54                        void *localdata)
55 {
56         struct obd_export *exp;
57         int rc;
58         ENTRY;
59
60         if (!conn || !obd || !cluuid)
61                 RETURN(-EINVAL);
62
63         rc = class_connect(conn, obd, cluuid);
64         if (rc)
65                 RETURN(rc);
66         exp = class_conn2export(conn);
67         LASSERT(exp);
68
69         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
70
71         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
72
73         if (data != NULL) {
74                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
75                 exp->exp_connect_flags = data->ocd_connect_flags;
76                 data->ocd_version = LUSTRE_VERSION_CODE;
77         }
78
79         if (rc) {
80                 class_disconnect(exp);
81         } else {
82                 class_export_put(exp);
83         }
84
85         RETURN(rc);
86 }
87
88 static int mgs_disconnect(struct obd_export *exp)
89 {
90         int rc;
91         ENTRY;
92
93         LASSERT(exp);
94
95         class_export_get(exp);
96         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
97
98         /* Disconnect early so that clients can't keep using export */
99         rc = class_disconnect(exp);
100         ldlm_cancel_locks_for_export(exp);
101
102         /* complete all outstanding replies */
103         spin_lock(&exp->exp_lock);
104         while (!list_empty(&exp->exp_outstanding_replies)) {
105                 struct ptlrpc_reply_state *rs =
106                         list_entry(exp->exp_outstanding_replies.next,
107                                    struct ptlrpc_reply_state, rs_exp_list);
108                 struct ptlrpc_service *svc = rs->rs_service;
109
110                 spin_lock(&svc->srv_lock);
111                 list_del_init(&rs->rs_exp_list);
112                 ptlrpc_schedule_difficult_reply(rs);
113                 spin_unlock(&svc->srv_lock);
114         }
115         spin_unlock(&exp->exp_lock);
116
117         class_export_put(exp);
118         RETURN(rc);
119 }
120
121 static int mgs_cleanup(struct obd_device *obd);
122 static int mgs_handle(struct ptlrpc_request *req);
123
124 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
125                          struct obd_device *tgt, int count,
126                          struct llog_catid *logid, struct obd_uuid *uuid)
127 {
128         int rc;
129         ENTRY;
130
131         LASSERT(olg == &obd->obd_olg);
132         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
133                         &llog_lvfs_ops);
134         RETURN(rc);
135 }
136
137 static int mgs_llog_finish(struct obd_device *obd, int count)
138 {
139         struct llog_ctxt *ctxt;
140         int rc = 0;
141         ENTRY;
142
143         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
144         if (ctxt)
145                 rc = llog_cleanup(ctxt);
146
147         RETURN(rc);
148 }
149
150 /* Start the MGS obd */
151 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
152 {
153         struct lprocfs_static_vars lvars;
154         struct mgs_obd *mgs = &obd->u.mgs;
155         struct lustre_mount_info *lmi;
156         struct lustre_sb_info *lsi;
157         struct vfsmount *mnt;
158         int rc = 0;
159         ENTRY;
160
161         CDEBUG(D_CONFIG, "Starting MGS\n");
162
163         /* Find our disk */
164         lmi = server_get_mount(obd->obd_name);
165         if (!lmi)
166                 RETURN(rc = -EINVAL);
167
168         mnt = lmi->lmi_mnt;
169         lsi = s2lsi(lmi->lmi_sb);
170         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
171         if (IS_ERR(obd->obd_fsops))
172                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
173
174         /* namespace for mgs llog */
175         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
176                                                 LDLM_NAMESPACE_MODEST);
177         if (obd->obd_namespace == NULL)
178                 GOTO(err_ops, rc = -ENOMEM);
179
180         /* ldlm setup */
181         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
182                            "mgs_ldlm_client", &obd->obd_ldlm_client);
183
184         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
185
186         rc = mgs_fs_setup(obd, mnt);
187         if (rc) {
188                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
189                        obd->obd_name, rc);
190                 GOTO(err_ns, rc);
191         }
192
193         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
194         if (rc)
195                 GOTO(err_fs, rc);
196
197         /* No recovery for MGC's */
198         obd->obd_replayable = 0;
199
200         /* Internal mgs setup */
201         mgs_init_fsdb_list(obd);
202         sema_init(&mgs->mgs_sem, 1);
203
204         /* Start the service threads */
205         mgs->mgs_service =
206                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
207                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
208                                 MGC_REPLY_PORTAL, 2000,
209                                 mgs_handle, LUSTRE_MGS_NAME,
210                                 obd->obd_proc_entry, target_print_req,
211                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
212                                 "ll_mgs", LCT_MD_THREAD);
213
214         if (!mgs->mgs_service) {
215                 CERROR("failed to start service\n");
216                 GOTO(err_llog, rc = -ENOMEM);
217         }
218
219         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
220         if (rc)
221                 GOTO(err_thread, rc);
222
223         /* Setup proc */
224         lprocfs_mgs_init_vars(&lvars);
225         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
226                 lproc_mgs_setup(obd);
227         }
228
229         ping_evictor_start();
230
231         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
232
233         RETURN(0);
234
235 err_thread:
236         ptlrpc_unregister_service(mgs->mgs_service);
237 err_llog:
238         obd_llog_finish(obd, 0);
239 err_fs:
240         /* No extra cleanup needed for llog_init_commit_thread() */
241         mgs_fs_cleanup(obd);
242 err_ns:
243         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
244         obd->obd_namespace = NULL;
245 err_ops:
246         fsfilt_put_ops(obd->obd_fsops);
247 err_put:
248         server_put_mount(obd->obd_name, mnt);
249         mgs->mgs_sb = 0;
250         return rc;
251 }
252
253 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
254 {
255         int rc = 0;
256         ENTRY;
257
258         switch (stage) {
259         case OBD_CLEANUP_EARLY:
260         case OBD_CLEANUP_EXPORTS:
261                 break;
262         case OBD_CLEANUP_SELF_EXP:
263                 rc = obd_llog_finish(obd, 0);
264                 break;
265         case OBD_CLEANUP_OBD:
266                 break;
267         }
268         RETURN(rc);
269 }
270
271 /**
272  * Performs cleanup procedures for passed \a obd given it is mgs obd.
273  */
274 static int mgs_cleanup(struct obd_device *obd)
275 {
276         struct mgs_obd *mgs = &obd->u.mgs;
277         ENTRY;
278
279         if (mgs->mgs_sb == NULL)
280                 RETURN(0);
281
282         ping_evictor_stop();
283
284         ptlrpc_unregister_service(mgs->mgs_service);
285
286         mgs_cleanup_fsdb_list(obd);
287         lproc_mgs_cleanup(obd);
288         mgs_fs_cleanup(obd);
289
290         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
291         mgs->mgs_sb = NULL;
292
293         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
294         obd->obd_namespace = NULL;
295
296         fsfilt_put_ops(obd->obd_fsops);
297
298         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
299         RETURN(0);
300 }
301
302 /* similar to filter_prepare_destroy */
303 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
304                             struct lustre_handle *lockh)
305 {
306         struct ldlm_res_id res_id;
307         int rc, flags = 0;
308         ENTRY;
309
310         rc = mgc_fsname2resid(fsname, &res_id);
311         if (!rc)
312                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
313                                             LDLM_PLAIN, NULL, LCK_EX,
314                                             &flags, ldlm_blocking_ast,
315                                             ldlm_completion_ast, NULL,
316                                             fsname, 0, NULL, lockh);
317         if (rc)
318                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
319
320         RETURN(rc);
321 }
322
323 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
324 {
325         ENTRY;
326         ldlm_lock_decref(lockh, LCK_EX);
327         RETURN(0);
328 }
329
330 /* rc=0 means ok
331       1 means update
332      <0 means error */
333 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
334 {
335         int rc;
336         ENTRY;
337
338         rc = mgs_check_index(obd, mti);
339         if (rc == 0) {
340                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
341                                    "this MGS does not know about it. Assuming"
342                                    " writeconf.\n", mti->mti_svname);
343                 mti->mti_flags |= LDD_F_WRITECONF;
344                 rc = 1;
345         } else if (rc == -1) {
346                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
347                                    "disappeared! Regenerating all logs.\n",
348                                    mti->mti_fsname);
349                 mti->mti_flags |= LDD_F_WRITECONF;
350                 rc = 1;
351         } else {
352                 /* Index is correctly marked as used */
353
354                 /* If the logs don't contain the mti_nids then add
355                    them as failover nids */
356                 rc = mgs_check_failnid(obd, mti);
357         }
358
359         RETURN(rc);
360 }
361
362 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
363 static int mgs_handle_target_reg(struct ptlrpc_request *req)
364 {
365         struct obd_device *obd = req->rq_export->exp_obd;
366         struct lustre_handle lockh;
367         struct mgs_target_info *mti, *rep_mti;
368         int rc = 0, lockrc;
369         ENTRY;
370
371         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
372
373         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
374         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
375                                 LDD_F_UPDATE))) {
376                 /* We're just here as a startup ping. */
377                 CDEBUG(D_MGS, "Server %s is running on %s\n",
378                        mti->mti_svname, obd_export_nid2str(req->rq_export));
379                 rc = mgs_check_target(obd, mti);
380                 /* above will set appropriate mti flags */
381                 if (rc <= 0)
382                         /* Nothing wrong, or fatal error */
383                         GOTO(out_nolock, rc);
384         }
385
386         /* Revoke the config lock to make sure nobody is reading. */
387         /* Although actually I think it should be alright if
388            someone was reading while we were updating the logs - if we
389            revoke at the end they will just update from where they left off. */
390         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
391         if (lockrc != ELDLM_OK) {
392                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
393                                    "update their configuration (%d). Updating "
394                                    "local logs anyhow; you might have to "
395                                    "manually restart other nodes to get the "
396                                    "latest configuration.\n",
397                                    obd->obd_name, lockrc);
398         }
399
400         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
401
402         /* Log writing contention is handled by the fsdb_sem */
403
404         if (mti->mti_flags & LDD_F_WRITECONF) {
405                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
406                     mti->mti_stripe_index == 0) {
407                         rc = mgs_erase_logs(obd, mti->mti_fsname);
408                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
409                                       "request.  All servers must be restarted "
410                                       "in order to regenerate the logs."
411                                       "\n", obd->obd_name, mti->mti_fsname);
412                 } else if (mti->mti_flags &
413                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
414                         rc = mgs_erase_log(obd, mti->mti_svname);
415                         LCONSOLE_WARN("%s: Regenerating %s log by user "
416                                       "request.\n",
417                                       obd->obd_name, mti->mti_svname);
418                 }
419                 mti->mti_flags |= LDD_F_UPDATE;
420                 /* Erased logs means start from scratch. */
421                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
422         }
423
424         /* COMPAT_146 */
425         if (mti->mti_flags & LDD_F_UPGRADE14) {
426                 rc = mgs_upgrade_sv_14(obd, mti);
427                 if (rc) {
428                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
429                         GOTO(out, rc);
430                 }
431                 
432                 /* We're good to go */
433                 mti->mti_flags |= LDD_F_UPDATE;
434         }
435         /* end COMPAT_146 */
436
437         if (mti->mti_flags & LDD_F_UPDATE) {
438                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
439                        mti->mti_stripe_index);
440
441                 /* create or update the target log
442                    and update the client/mdt logs */
443                 rc = mgs_write_log_target(obd, mti);
444                 if (rc) {
445                         CERROR("Failed to write %s log (%d)\n",
446                                mti->mti_svname, rc);
447                         GOTO(out, rc);
448                 }
449
450                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
451                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
452                                     LDD_F_UPGRADE14);
453                 mti->mti_flags |= LDD_F_REWRITE_LDD;
454         }
455
456 out:
457         /* done with log update */
458         if (lockrc == ELDLM_OK)
459                 mgs_put_cfg_lock(&lockh);
460 out_nolock:
461         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
462                mti->mti_stripe_index, rc);
463         rc = req_capsule_server_pack(&req->rq_pill);
464         if (rc)
465                 RETURN(rc);
466
467         /* send back the whole mti in the reply */
468         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
469         *rep_mti = *mti;
470
471         /* Flush logs to disk */
472         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
473         RETURN(rc);
474 }
475
476 static int mgs_set_info_rpc(struct ptlrpc_request *req)
477 {
478         struct obd_device *obd = req->rq_export->exp_obd;
479         struct mgs_send_param *msp, *rep_msp;
480         struct lustre_handle lockh;
481         int lockrc, rc;
482         struct lustre_cfg_bufs bufs;
483         struct lustre_cfg *lcfg;
484         char fsname[MTI_NAME_MAXLEN];
485         ENTRY;
486
487         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
488         LASSERT(msp);
489
490         /* Construct lustre_cfg structure to pass to function mgs_setparam */
491         lustre_cfg_bufs_reset(&bufs, NULL);
492         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
493         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
494         rc = mgs_setparam(obd, lcfg, fsname);
495         if (rc) {
496                 CERROR("Error %d in setting the parameter %s for fs %s\n",
497                        rc, msp->mgs_param, fsname);
498                 RETURN(rc);
499         }
500
501         /* Revoke lock so everyone updates.  Should be alright if
502          * someone was already reading while we were updating the logs,
503          * so we don't really need to hold the lock while we're
504          * writing.
505          */
506         if (fsname[0]) {
507                 lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
508                 if (lockrc != ELDLM_OK)
509                         CERROR("lock error %d for fs %s\n", lockrc,
510                                fsname);
511                 else
512                         mgs_put_cfg_lock(&lockh);
513         }
514         lustre_cfg_free(lcfg);
515
516         rc = req_capsule_server_pack(&req->rq_pill);
517         if (rc == 0) {
518                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
519                 rep_msp = msp;
520         }
521         RETURN(rc);
522 }
523
524 /* Called whenever a target cleans up. */
525 /* XXX - Currently unused */
526 static int mgs_handle_target_del(struct ptlrpc_request *req)
527 {
528         ENTRY;
529         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
530         RETURN(0);
531 }
532
533 /* XXX - Currently unused */
534 static int mgs_handle_exception(struct ptlrpc_request *req)
535 {
536         ENTRY;
537         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
538         RETURN(0);
539 }
540
541 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
542 int mgs_handle(struct ptlrpc_request *req)
543 {
544         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
545         int opc, rc = 0;
546         ENTRY;
547
548         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
549         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
550         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
551                 RETURN(0);
552
553         LASSERT(current->journal_info == NULL);
554         opc = lustre_msg_get_opc(req->rq_reqmsg);
555         if (opc != MGS_CONNECT) {
556                 if (req->rq_export == NULL) {
557                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
558                                opc);
559                         req->rq_status = -ENOTCONN;
560                         GOTO(out, rc = -ENOTCONN);
561                 }
562         }
563
564         switch (opc) {
565         case MGS_CONNECT:
566                 DEBUG_REQ(D_MGS, req, "connect");
567                 /* MGS and MDS have same request format for connect */
568                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
569                 rc = target_handle_connect(req);
570                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
571                         /* Make clients trying to reconnect after a MGS restart
572                            happy; also requires obd_replayable */
573                         lustre_msg_add_op_flags(req->rq_repmsg,
574                                                 MSG_CONNECT_RECONNECT);
575                 break;
576         case MGS_DISCONNECT:
577                 DEBUG_REQ(D_MGS, req, "disconnect");
578                 /* MGS and MDS have same request format for disconnect */
579                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
580                 rc = target_handle_disconnect(req);
581                 req->rq_status = rc;            /* superfluous? */
582                 break;
583         case MGS_EXCEPTION:
584                 DEBUG_REQ(D_MGS, req, "exception");
585                 rc = mgs_handle_exception(req);
586                 break;
587         case MGS_TARGET_REG:
588                 DEBUG_REQ(D_MGS, req, "target add");
589                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
590                 rc = mgs_handle_target_reg(req);
591                 break;
592         case MGS_TARGET_DEL:
593                 DEBUG_REQ(D_MGS, req, "target del");
594                 rc = mgs_handle_target_del(req);
595                 break;
596         case MGS_SET_INFO:
597                 DEBUG_REQ(D_MGS, req, "set_info");
598                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
599                 rc = mgs_set_info_rpc(req);
600                 break;
601
602         case LDLM_ENQUEUE:
603                 DEBUG_REQ(D_MGS, req, "enqueue");
604                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
605                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
606                                          ldlm_server_blocking_ast, NULL);
607                 break;
608         case LDLM_BL_CALLBACK:
609         case LDLM_CP_CALLBACK:
610                 DEBUG_REQ(D_MGS, req, "callback");
611                 CERROR("callbacks should not happen on MGS\n");
612                 LBUG();
613                 break;
614
615         case OBD_PING:
616                 DEBUG_REQ(D_INFO, req, "ping");
617                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
618                 rc = target_handle_ping(req);
619                 break;
620         case OBD_LOG_CANCEL:
621                 DEBUG_REQ(D_MGS, req, "log cancel");
622                 rc = -ENOTSUPP; /* la la la */
623                 break;
624
625         case LLOG_ORIGIN_HANDLE_CREATE:
626                 DEBUG_REQ(D_MGS, req, "llog_init");
627                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
628                 rc = llog_origin_handle_create(req);
629                 break;
630         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
631                 DEBUG_REQ(D_MGS, req, "llog next block");
632                 req_capsule_set(&req->rq_pill,
633                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
634                 rc = llog_origin_handle_next_block(req);
635                 break;
636         case LLOG_ORIGIN_HANDLE_READ_HEADER:
637                 DEBUG_REQ(D_MGS, req, "llog read header");
638                 req_capsule_set(&req->rq_pill,
639                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
640                 rc = llog_origin_handle_read_header(req);
641                 break;
642         case LLOG_ORIGIN_HANDLE_CLOSE:
643                 DEBUG_REQ(D_MGS, req, "llog close");
644                 rc = llog_origin_handle_close(req);
645                 break;
646         case LLOG_CATINFO:
647                 DEBUG_REQ(D_MGS, req, "llog catinfo");
648                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
649                 rc = llog_catinfo(req);
650                 break;
651         default:
652                 req->rq_status = -ENOTSUPP;
653                 rc = ptlrpc_error(req);
654                 RETURN(rc);
655         }
656
657         LASSERT(current->journal_info == NULL);
658
659         if (rc)
660                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
661
662 out:
663         target_send_reply(req, rc, fail);
664         RETURN(0);
665 }
666
667 static inline int mgs_destroy_export(struct obd_export *exp)
668 {
669         ENTRY;
670
671         target_destroy_export(exp);
672
673         RETURN(0);
674 }
675
676 /* from mdt_iocontrol */
677 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
678                   void *karg, void *uarg)
679 {
680         struct obd_device *obd = exp->exp_obd;
681         struct obd_ioctl_data *data = karg;
682         struct lvfs_run_ctxt saved;
683         int rc = 0;
684
685         ENTRY;
686         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
687
688         switch (cmd) {
689
690         case OBD_IOC_PARAM: {
691                 struct lustre_handle lockh;
692                 struct lustre_cfg *lcfg;
693                 struct llog_rec_hdr rec;
694                 char fsname[MTI_NAME_MAXLEN];
695                 int lockrc;
696
697                 rec.lrh_len = llog_data_len(data->ioc_plen1);
698
699                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
700                         rec.lrh_type = OBD_CFG_REC;
701                 } else {
702                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
703                         RETURN(-EINVAL);
704                 }
705
706                 OBD_ALLOC(lcfg, data->ioc_plen1);
707                 if (lcfg == NULL)
708                         RETURN(-ENOMEM);
709                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
710                 if (rc)
711                         GOTO(out_free, rc);
712
713                 if (lcfg->lcfg_bufcount < 1)
714                         GOTO(out_free, rc = -EINVAL);
715
716                 rc = mgs_setparam(obd, lcfg, fsname);
717                 if (rc) {
718                         CERROR("setparam err %d\n", rc);
719                         GOTO(out_free, rc);
720                 }
721
722                 /* Revoke lock so everyone updates.  Should be alright if
723                    someone was already reading while we were updating the logs,
724                    so we don't really need to hold the lock while we're
725                    writing (above). */
726                 if (fsname[0]) {
727                         lockrc = mgs_get_cfg_lock(obd, fsname, &lockh);
728                         if (lockrc != ELDLM_OK)
729                                 CERROR("lock error %d for fs %s\n", lockrc,
730                                        fsname);
731                         else
732                                 mgs_put_cfg_lock(&lockh);
733                 }
734
735 out_free:
736                 OBD_FREE(lcfg, data->ioc_plen1);
737                 RETURN(rc);
738         }
739
740         case OBD_IOC_DUMP_LOG: {
741                 struct llog_ctxt *ctxt;
742                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
743                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
744                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
745                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
746                 llog_ctxt_put(ctxt);
747
748                 RETURN(rc);
749         }
750
751         case OBD_IOC_LLOG_CHECK:
752         case OBD_IOC_LLOG_INFO:
753         case OBD_IOC_LLOG_PRINT: {
754                 struct llog_ctxt *ctxt;
755                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
756
757                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
758                 rc = llog_ioctl(ctxt, cmd, data);
759                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
760                 llog_ctxt_put(ctxt);
761
762                 RETURN(rc);
763         }
764
765         default:
766                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
767                 RETURN(-EINVAL);
768         }
769         RETURN(0);
770 }
771
772 /* use obd ops to offer management infrastructure */
773 static struct obd_ops mgs_obd_ops = {
774         .o_owner           = THIS_MODULE,
775         .o_connect         = mgs_connect,
776         .o_disconnect      = mgs_disconnect,
777         .o_setup           = mgs_setup,
778         .o_precleanup      = mgs_precleanup,
779         .o_cleanup         = mgs_cleanup,
780         .o_destroy_export  = mgs_destroy_export,
781         .o_iocontrol       = mgs_iocontrol,
782         .o_llog_init       = mgs_llog_init,
783         .o_llog_finish     = mgs_llog_finish
784 };
785
786 static int __init mgs_init(void)
787 {
788         struct lprocfs_static_vars lvars;
789
790         lprocfs_mgs_init_vars(&lvars);
791         class_register_type(&mgs_obd_ops, NULL,
792                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
793
794         return 0;
795 }
796
797 static void /*__exit*/ mgs_exit(void)
798 {
799         class_unregister_type(LUSTRE_MGS_NAME);
800 }
801
802 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
803 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
804 MODULE_LICENSE("GPL");
805
806 module_init(mgs_init);
807 module_exit(mgs_exit);