Whamcloud - gitweb
b=14836
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct lustre_handle *conn, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *exp;
71         int rc;
72         ENTRY;
73
74         if (!conn || !obd || !cluuid)
75                 RETURN(-EINVAL);
76
77         rc = class_connect(conn, obd, cluuid);
78         if (rc)
79                 RETURN(rc);
80         exp = class_conn2export(conn);
81         LASSERT(exp);
82
83         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
84
85         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 exp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         if (rc) {
94                 class_disconnect(exp);
95         } else {
96                 class_export_put(exp);
97         }
98
99         RETURN(rc);
100 }
101
102 static int mgs_reconnect(const struct lu_env *env,
103                          struct obd_export *exp, struct obd_device *obd,
104                          struct obd_uuid *cluuid, struct obd_connect_data *data)
105 {
106         ENTRY;
107
108         if (exp == NULL || obd == NULL || cluuid == NULL)
109                 RETURN(-EINVAL);
110
111         if (data != NULL) {
112                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
113                 exp->exp_connect_flags = data->ocd_connect_flags;
114                 data->ocd_version = LUSTRE_VERSION_CODE;
115         }
116
117         RETURN(0);
118 }
119
120 static int mgs_disconnect(struct obd_export *exp)
121 {
122         int rc;
123         ENTRY;
124
125         LASSERT(exp);
126
127         class_export_get(exp);
128         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
129
130         /* Disconnect early so that clients can't keep using export */
131         rc = class_disconnect(exp);
132         ldlm_cancel_locks_for_export(exp);
133
134         lprocfs_exp_cleanup(exp);
135
136         /* complete all outstanding replies */
137         spin_lock(&exp->exp_lock);
138         while (!list_empty(&exp->exp_outstanding_replies)) {
139                 struct ptlrpc_reply_state *rs =
140                         list_entry(exp->exp_outstanding_replies.next,
141                                    struct ptlrpc_reply_state, rs_exp_list);
142                 struct ptlrpc_service *svc = rs->rs_service;
143
144                 spin_lock(&svc->srv_lock);
145                 list_del_init(&rs->rs_exp_list);
146                 ptlrpc_schedule_difficult_reply(rs);
147                 spin_unlock(&svc->srv_lock);
148         }
149         spin_unlock(&exp->exp_lock);
150
151         class_export_put(exp);
152         RETURN(rc);
153 }
154
155 static int mgs_cleanup(struct obd_device *obd);
156 static int mgs_handle(struct ptlrpc_request *req);
157
158 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
159                          struct obd_device *tgt, int count,
160                          struct llog_catid *logid, struct obd_uuid *uuid)
161 {
162         int rc;
163         ENTRY;
164
165         LASSERT(olg == &obd->obd_olg);
166         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
167                         &llog_lvfs_ops);
168         RETURN(rc);
169 }
170
171 static int mgs_llog_finish(struct obd_device *obd, int count)
172 {
173         struct llog_ctxt *ctxt;
174         int rc = 0;
175         ENTRY;
176
177         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
178         if (ctxt)
179                 rc = llog_cleanup(ctxt);
180
181         RETURN(rc);
182 }
183
184 /* Start the MGS obd */
185 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
186 {
187         struct lprocfs_static_vars lvars;
188         struct mgs_obd *mgs = &obd->u.mgs;
189         struct lustre_mount_info *lmi;
190         struct lustre_sb_info *lsi;
191         struct vfsmount *mnt;
192         int rc = 0;
193         ENTRY;
194
195         CDEBUG(D_CONFIG, "Starting MGS\n");
196
197         /* Find our disk */
198         lmi = server_get_mount(obd->obd_name);
199         if (!lmi)
200                 RETURN(rc = -EINVAL);
201
202         mnt = lmi->lmi_mnt;
203         lsi = s2lsi(lmi->lmi_sb);
204         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
205         if (IS_ERR(obd->obd_fsops))
206                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
207
208         /* namespace for mgs llog */
209         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
210                                                 LDLM_NAMESPACE_MODEST);
211         if (obd->obd_namespace == NULL)
212                 GOTO(err_ops, rc = -ENOMEM);
213
214         /* ldlm setup */
215         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
216                            "mgs_ldlm_client", &obd->obd_ldlm_client);
217
218         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
219
220         rc = mgs_fs_setup(obd, mnt);
221         if (rc) {
222                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
223                        obd->obd_name, rc);
224                 GOTO(err_ns, rc);
225         }
226
227         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
228         if (rc)
229                 GOTO(err_fs, rc);
230
231         /* No recovery for MGC's */
232         obd->obd_replayable = 0;
233
234         /* Internal mgs setup */
235         mgs_init_fsdb_list(obd);
236         sema_init(&mgs->mgs_sem, 1);
237
238         /* Start the service threads */
239         mgs->mgs_service =
240                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
241                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
242                                 MGC_REPLY_PORTAL, 2000,
243                                 mgs_handle, LUSTRE_MGS_NAME,
244                                 obd->obd_proc_entry, target_print_req,
245                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
246                                 "ll_mgs", LCT_MD_THREAD);
247
248         if (!mgs->mgs_service) {
249                 CERROR("failed to start service\n");
250                 GOTO(err_llog, rc = -ENOMEM);
251         }
252
253         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
254         if (rc)
255                 GOTO(err_thread, rc);
256
257         /* Setup proc */
258         lprocfs_mgs_init_vars(&lvars);
259         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
260                 lproc_mgs_setup(obd);
261         }
262
263         ping_evictor_start();
264
265         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
266
267         RETURN(0);
268
269 err_thread:
270         ptlrpc_unregister_service(mgs->mgs_service);
271 err_llog:
272         obd_llog_finish(obd, 0);
273 err_fs:
274         /* No extra cleanup needed for llog_init_commit_thread() */
275         mgs_fs_cleanup(obd);
276 err_ns:
277         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
278         obd->obd_namespace = NULL;
279 err_ops:
280         fsfilt_put_ops(obd->obd_fsops);
281 err_put:
282         server_put_mount(obd->obd_name, mnt);
283         mgs->mgs_sb = 0;
284         return rc;
285 }
286
287 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
288 {
289         int rc = 0;
290         ENTRY;
291
292         switch (stage) {
293         case OBD_CLEANUP_EARLY:
294         case OBD_CLEANUP_EXPORTS:
295                 rc = obd_llog_finish(obd, 0);
296                 break;
297         }
298         RETURN(rc);
299 }
300
301 /**
302  * Performs cleanup procedures for passed \a obd given it is mgs obd.
303  */
304 static int mgs_cleanup(struct obd_device *obd)
305 {
306         struct mgs_obd *mgs = &obd->u.mgs;
307         ENTRY;
308
309         if (mgs->mgs_sb == NULL)
310                 RETURN(0);
311
312         ping_evictor_stop();
313
314         ptlrpc_unregister_service(mgs->mgs_service);
315
316         mgs_cleanup_fsdb_list(obd);
317         lproc_mgs_cleanup(obd);
318         mgs_fs_cleanup(obd);
319
320         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
321         mgs->mgs_sb = NULL;
322
323         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
324         obd->obd_namespace = NULL;
325
326         fsfilt_put_ops(obd->obd_fsops);
327
328         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
329         RETURN(0);
330 }
331
332 /* similar to filter_prepare_destroy */
333 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
334                             struct lustre_handle *lockh)
335 {
336         struct ldlm_res_id res_id;
337         int rc, flags = 0;
338         ENTRY;
339
340         rc = mgc_fsname2resid(fsname, &res_id);
341         if (!rc)
342                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
343                                             LDLM_PLAIN, NULL, LCK_EX,
344                                             &flags, ldlm_blocking_ast,
345                                             ldlm_completion_ast, NULL,
346                                             fsname, 0, NULL, lockh);
347         if (rc)
348                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
349
350         RETURN(rc);
351 }
352
353 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
354 {
355         ENTRY;
356         ldlm_lock_decref(lockh, LCK_EX);
357         RETURN(0);
358 }
359
360 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
361                             struct lustre_handle *lockh)
362 {
363         int lockrc;
364
365         if (fsname[0]) {
366                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
367                 if (lockrc != ELDLM_OK)
368                         CERROR("lock error %d for fs %s\n", lockrc,
369                                fsname);
370                 else
371                         mgs_put_cfg_lock(lockh);
372         }
373 }
374
375 /* rc=0 means ok
376       1 means update
377      <0 means error */
378 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
379 {
380         int rc;
381         ENTRY;
382
383         rc = mgs_check_index(obd, mti);
384         if (rc == 0) {
385                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
386                                    "this MGS does not know about it. Assuming"
387                                    " writeconf.\n", mti->mti_svname);
388                 mti->mti_flags |= LDD_F_WRITECONF;
389                 rc = 1;
390         } else if (rc == -1) {
391                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
392                                    "disappeared! Regenerating all logs.\n",
393                                    mti->mti_fsname);
394                 mti->mti_flags |= LDD_F_WRITECONF;
395                 rc = 1;
396         } else {
397                 /* Index is correctly marked as used */
398
399                 /* If the logs don't contain the mti_nids then add
400                    them as failover nids */
401                 rc = mgs_check_failnid(obd, mti);
402         }
403
404         RETURN(rc);
405 }
406
407 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
408 static int mgs_handle_target_reg(struct ptlrpc_request *req)
409 {
410         struct obd_device *obd = req->rq_export->exp_obd;
411         struct lustre_handle lockh;
412         struct mgs_target_info *mti, *rep_mti;
413         int rc = 0, lockrc;
414         ENTRY;
415
416         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
417
418         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
419         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
420                                 LDD_F_UPDATE))) {
421                 /* We're just here as a startup ping. */
422                 CDEBUG(D_MGS, "Server %s is running on %s\n",
423                        mti->mti_svname, obd_export_nid2str(req->rq_export));
424                 rc = mgs_check_target(obd, mti);
425                 /* above will set appropriate mti flags */
426                 if (rc <= 0)
427                         /* Nothing wrong, or fatal error */
428                         GOTO(out_nolock, rc);
429         }
430
431         /* Revoke the config lock to make sure nobody is reading. */
432         /* Although actually I think it should be alright if
433            someone was reading while we were updating the logs - if we
434            revoke at the end they will just update from where they left off. */
435         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
436         if (lockrc != ELDLM_OK) {
437                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
438                                    "update their configuration (%d). Updating "
439                                    "local logs anyhow; you might have to "
440                                    "manually restart other nodes to get the "
441                                    "latest configuration.\n",
442                                    obd->obd_name, lockrc);
443         }
444
445         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
446
447         /* Log writing contention is handled by the fsdb_sem */
448
449         if (mti->mti_flags & LDD_F_WRITECONF) {
450                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
451                     mti->mti_stripe_index == 0) {
452                         rc = mgs_erase_logs(obd, mti->mti_fsname);
453                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
454                                       "request.  All servers must be restarted "
455                                       "in order to regenerate the logs."
456                                       "\n", obd->obd_name, mti->mti_fsname);
457                 } else if (mti->mti_flags &
458                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
459                         rc = mgs_erase_log(obd, mti->mti_svname);
460                         LCONSOLE_WARN("%s: Regenerating %s log by user "
461                                       "request.\n",
462                                       obd->obd_name, mti->mti_svname);
463                 }
464                 mti->mti_flags |= LDD_F_UPDATE;
465                 /* Erased logs means start from scratch. */
466                 mti->mti_flags &= ~LDD_F_UPGRADE14; 
467         }
468
469         /* COMPAT_146 */
470         if (mti->mti_flags & LDD_F_UPGRADE14) {
471                 rc = mgs_upgrade_sv_14(obd, mti);
472                 if (rc) {
473                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
474                         GOTO(out, rc);
475                 }
476                 
477                 /* We're good to go */
478                 mti->mti_flags |= LDD_F_UPDATE;
479         }
480         /* end COMPAT_146 */
481
482         if (mti->mti_flags & LDD_F_UPDATE) {
483                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
484                        mti->mti_stripe_index);
485
486                 /* create or update the target log
487                    and update the client/mdt logs */
488                 rc = mgs_write_log_target(obd, mti);
489                 if (rc) {
490                         CERROR("Failed to write %s log (%d)\n",
491                                mti->mti_svname, rc);
492                         GOTO(out, rc);
493                 }
494
495                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
496                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
497                                     LDD_F_UPGRADE14);
498                 mti->mti_flags |= LDD_F_REWRITE_LDD;
499         }
500
501 out:
502         /* done with log update */
503         if (lockrc == ELDLM_OK)
504                 mgs_put_cfg_lock(&lockh);
505 out_nolock:
506         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
507                mti->mti_stripe_index, rc);
508         rc = req_capsule_server_pack(&req->rq_pill);
509         if (rc)
510                 RETURN(rc);
511
512         /* send back the whole mti in the reply */
513         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
514         *rep_mti = *mti;
515
516         /* Flush logs to disk */
517         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
518         RETURN(rc);
519 }
520
521 static int mgs_set_info_rpc(struct ptlrpc_request *req)
522 {
523         struct obd_device *obd = req->rq_export->exp_obd;
524         struct mgs_send_param *msp, *rep_msp;
525         struct lustre_handle lockh;
526         int rc;
527         struct lustre_cfg_bufs bufs;
528         struct lustre_cfg *lcfg;
529         char fsname[MTI_NAME_MAXLEN];
530         ENTRY;
531
532         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
533         LASSERT(msp);
534
535         /* Construct lustre_cfg structure to pass to function mgs_setparam */
536         lustre_cfg_bufs_reset(&bufs, NULL);
537         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
538         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
539         rc = mgs_setparam(obd, lcfg, fsname);
540         if (rc) {
541                 CERROR("Error %d in setting the parameter %s for fs %s\n",
542                        rc, msp->mgs_param, fsname);
543                 RETURN(rc);
544         }
545
546         /* request for update */
547         mgs_revoke_lock(obd, fsname, &lockh);
548
549         lustre_cfg_free(lcfg);
550
551         rc = req_capsule_server_pack(&req->rq_pill);
552         if (rc == 0) {
553                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
554                 rep_msp = msp;
555         }
556         RETURN(rc);
557 }
558
559 /* Called whenever a target cleans up. */
560 /* XXX - Currently unused */
561 static int mgs_handle_target_del(struct ptlrpc_request *req)
562 {
563         ENTRY;
564         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
565         RETURN(0);
566 }
567
568 /* XXX - Currently unused */
569 static int mgs_handle_exception(struct ptlrpc_request *req)
570 {
571         ENTRY;
572         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
573         RETURN(0);
574 }
575
576 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
577 int mgs_handle(struct ptlrpc_request *req)
578 {
579         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
580         int opc, rc = 0;
581         ENTRY;
582
583         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
584         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
585         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
586                 RETURN(0);
587
588         LASSERT(current->journal_info == NULL);
589         opc = lustre_msg_get_opc(req->rq_reqmsg);
590         if (opc != MGS_CONNECT) {
591                 if (req->rq_export == NULL) {
592                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
593                                opc);
594                         req->rq_status = -ENOTCONN;
595                         GOTO(out, rc = -ENOTCONN);
596                 }
597         }
598
599         switch (opc) {
600         case MGS_CONNECT:
601                 DEBUG_REQ(D_MGS, req, "connect");
602                 /* MGS and MDS have same request format for connect */
603                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
604                 rc = target_handle_connect(req);
605                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
606                         /* Make clients trying to reconnect after a MGS restart
607                            happy; also requires obd_replayable */
608                         lustre_msg_add_op_flags(req->rq_repmsg,
609                                                 MSG_CONNECT_RECONNECT);
610                 break;
611         case MGS_DISCONNECT:
612                 DEBUG_REQ(D_MGS, req, "disconnect");
613                 /* MGS and MDS have same request format for disconnect */
614                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
615                 rc = target_handle_disconnect(req);
616                 req->rq_status = rc;            /* superfluous? */
617                 break;
618         case MGS_EXCEPTION:
619                 DEBUG_REQ(D_MGS, req, "exception");
620                 rc = mgs_handle_exception(req);
621                 break;
622         case MGS_TARGET_REG:
623                 DEBUG_REQ(D_MGS, req, "target add");
624                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
625                 rc = mgs_handle_target_reg(req);
626                 break;
627         case MGS_TARGET_DEL:
628                 DEBUG_REQ(D_MGS, req, "target del");
629                 rc = mgs_handle_target_del(req);
630                 break;
631         case MGS_SET_INFO:
632                 DEBUG_REQ(D_MGS, req, "set_info");
633                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
634                 rc = mgs_set_info_rpc(req);
635                 break;
636
637         case LDLM_ENQUEUE:
638                 DEBUG_REQ(D_MGS, req, "enqueue");
639                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
640                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
641                                          ldlm_server_blocking_ast, NULL);
642                 break;
643         case LDLM_BL_CALLBACK:
644         case LDLM_CP_CALLBACK:
645                 DEBUG_REQ(D_MGS, req, "callback");
646                 CERROR("callbacks should not happen on MGS\n");
647                 LBUG();
648                 break;
649
650         case OBD_PING:
651                 DEBUG_REQ(D_INFO, req, "ping");
652                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
653                 rc = target_handle_ping(req);
654                 break;
655         case OBD_LOG_CANCEL:
656                 DEBUG_REQ(D_MGS, req, "log cancel");
657                 rc = -ENOTSUPP; /* la la la */
658                 break;
659
660         case LLOG_ORIGIN_HANDLE_CREATE:
661                 DEBUG_REQ(D_MGS, req, "llog_init");
662                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
663                 rc = llog_origin_handle_create(req);
664                 break;
665         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
666                 DEBUG_REQ(D_MGS, req, "llog next block");
667                 req_capsule_set(&req->rq_pill,
668                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
669                 rc = llog_origin_handle_next_block(req);
670                 break;
671         case LLOG_ORIGIN_HANDLE_READ_HEADER:
672                 DEBUG_REQ(D_MGS, req, "llog read header");
673                 req_capsule_set(&req->rq_pill,
674                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
675                 rc = llog_origin_handle_read_header(req);
676                 break;
677         case LLOG_ORIGIN_HANDLE_CLOSE:
678                 DEBUG_REQ(D_MGS, req, "llog close");
679                 rc = llog_origin_handle_close(req);
680                 break;
681         case LLOG_CATINFO:
682                 DEBUG_REQ(D_MGS, req, "llog catinfo");
683                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
684                 rc = llog_catinfo(req);
685                 break;
686         default:
687                 req->rq_status = -ENOTSUPP;
688                 rc = ptlrpc_error(req);
689                 RETURN(rc);
690         }
691
692         LASSERT(current->journal_info == NULL);
693
694         if (rc)
695                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
696
697 out:
698         target_send_reply(req, rc, fail);
699         RETURN(0);
700 }
701
702 static inline int mgs_init_export(struct obd_export *exp)
703 {
704         return ldlm_init_export(exp);
705 }
706
707 static inline int mgs_destroy_export(struct obd_export *exp)
708 {
709         ENTRY;
710
711         target_destroy_export(exp);
712         ldlm_destroy_export(exp);
713
714         RETURN(0);
715 }
716
717 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
718 {
719         char *ptr;
720
721         ENTRY;
722         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
723                 *fsname = *ptr;
724                 fsname++;
725         }
726         if (*ptr == '\0')
727                 return -EINVAL;
728         *fsname = '\0';
729         ptr++;
730         strcpy(poolname, ptr);
731
732         RETURN(0);
733 }
734
735 static int mgs_iocontrol_pool(struct obd_device *obd, 
736                               struct obd_ioctl_data *data)
737 {
738         int rc;
739         struct lustre_handle lockh;
740         struct lustre_cfg *lcfg = NULL;
741         struct llog_rec_hdr rec;
742         char *fsname = NULL;
743         char *poolname = NULL;
744         ENTRY;
745
746         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
747         if (fsname == NULL)
748                 RETURN(-ENOMEM);
749
750         OBD_ALLOC(poolname, MAXPOOLNAME + 1);
751         if (poolname == NULL) {
752                 rc = -ENOMEM;
753                 GOTO(out_pool, rc);
754         }
755         rec.lrh_len = llog_data_len(data->ioc_plen1);
756
757         if (data->ioc_type == LUSTRE_CFG_TYPE) {
758                 rec.lrh_type = OBD_CFG_REC;
759         } else {
760                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
761                 rc = -EINVAL;
762                 GOTO(out_pool, rc);
763         }
764
765         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
766                 rc = -E2BIG;
767                 GOTO(out_pool, rc);
768         }
769
770         OBD_ALLOC(lcfg, data->ioc_plen1);
771         if (lcfg == NULL) {
772                 rc = -ENOMEM;
773                 GOTO(out_pool, rc);
774         }
775         rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
776         if (rc)
777                 GOTO(out_pool, rc);
778
779         if (lcfg->lcfg_bufcount < 2) {
780                 rc = -EINVAL;
781                 GOTO(out_pool, rc);
782         }
783
784         /* first arg is always <fsname>.<poolname> */
785         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
786                             poolname);
787
788         switch (lcfg->lcfg_command) {
789         case LCFG_POOL_NEW: {
790                 if (lcfg->lcfg_bufcount != 2)
791                         RETURN(-EINVAL);
792                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
793                                   poolname, NULL);
794                 break;
795         }
796         case LCFG_POOL_ADD: {
797                 if (lcfg->lcfg_bufcount != 3)
798                         RETURN(-EINVAL);
799                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
800                                   lustre_cfg_string(lcfg, 2));
801                 break;
802         }
803         case LCFG_POOL_REM: {
804                 if (lcfg->lcfg_bufcount != 3)
805                         RETURN(-EINVAL);
806                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
807                                   lustre_cfg_string(lcfg, 2));
808                 break;
809         }
810         case LCFG_POOL_DEL: {
811                 if (lcfg->lcfg_bufcount != 2)
812                         RETURN(-EINVAL);
813                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
814                                   poolname, NULL);
815                 break;
816         }
817         default: {
818                  rc = -EINVAL;
819                  GOTO(out_pool, rc);
820         }
821         }
822
823         if (rc) {
824                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
825                        rc, lcfg->lcfg_command, fsname, poolname);
826                 GOTO(out_pool, rc);
827         }
828
829         /* request for update */
830         mgs_revoke_lock(obd, fsname, &lockh);
831
832 out_pool:
833         if (lcfg != NULL)
834                 OBD_FREE(lcfg, data->ioc_plen1);
835
836         if (fsname != NULL)
837                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
838
839         if (poolname != NULL)
840                 OBD_FREE(poolname, MAXPOOLNAME + 1);
841
842         RETURN(rc);
843 }
844
845 /* from mdt_iocontrol */
846 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
847                   void *karg, void *uarg)
848 {
849         struct obd_device *obd = exp->exp_obd;
850         struct obd_ioctl_data *data = karg;
851         struct lvfs_run_ctxt saved;
852         int rc = 0;
853
854         ENTRY;
855         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
856
857         switch (cmd) {
858
859         case OBD_IOC_PARAM: {
860                 struct lustre_handle lockh;
861                 struct lustre_cfg *lcfg;
862                 struct llog_rec_hdr rec;
863                 char fsname[MTI_NAME_MAXLEN];
864
865                 rec.lrh_len = llog_data_len(data->ioc_plen1);
866
867                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
868                         rec.lrh_type = OBD_CFG_REC;
869                 } else {
870                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
871                         RETURN(-EINVAL);
872                 }
873
874                 OBD_ALLOC(lcfg, data->ioc_plen1);
875                 if (lcfg == NULL)
876                         RETURN(-ENOMEM);
877                 rc = copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1);
878                 if (rc)
879                         GOTO(out_free, rc);
880
881                 if (lcfg->lcfg_bufcount < 1)
882                         GOTO(out_free, rc = -EINVAL);
883
884                 rc = mgs_setparam(obd, lcfg, fsname);
885                 if (rc) {
886                         CERROR("setparam err %d\n", rc);
887                         GOTO(out_free, rc);
888                 }
889
890                 /* Revoke lock so everyone updates.  Should be alright if
891                    someone was already reading while we were updating the logs,
892                    so we don't really need to hold the lock while we're
893                    writing (above). */
894                 mgs_revoke_lock(obd, fsname, &lockh);
895
896 out_free:
897                 OBD_FREE(lcfg, data->ioc_plen1);
898                 RETURN(rc);
899         }
900
901         case OBD_IOC_POOL: {
902                 RETURN(mgs_iocontrol_pool(obd, data));
903         }
904
905         case OBD_IOC_DUMP_LOG: {
906                 struct llog_ctxt *ctxt;
907                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
908                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
909                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
910                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
911                 llog_ctxt_put(ctxt);
912
913                 RETURN(rc);
914         }
915
916         case OBD_IOC_LLOG_CHECK:
917         case OBD_IOC_LLOG_INFO:
918         case OBD_IOC_LLOG_PRINT: {
919                 struct llog_ctxt *ctxt;
920                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
921
922                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
923                 rc = llog_ioctl(ctxt, cmd, data);
924                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
925                 llog_ctxt_put(ctxt);
926
927                 RETURN(rc);
928         }
929
930         default:
931                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
932                 RETURN(-EINVAL);
933         }
934         RETURN(0);
935 }
936
937 /* use obd ops to offer management infrastructure */
938 static struct obd_ops mgs_obd_ops = {
939         .o_owner           = THIS_MODULE,
940         .o_connect         = mgs_connect,
941         .o_reconnect       = mgs_reconnect,
942         .o_disconnect      = mgs_disconnect,
943         .o_setup           = mgs_setup,
944         .o_precleanup      = mgs_precleanup,
945         .o_cleanup         = mgs_cleanup,
946         .o_init_export     = mgs_init_export,
947         .o_destroy_export  = mgs_destroy_export,
948         .o_iocontrol       = mgs_iocontrol,
949         .o_llog_init       = mgs_llog_init,
950         .o_llog_finish     = mgs_llog_finish
951 };
952
953 static int __init mgs_init(void)
954 {
955         struct lprocfs_static_vars lvars;
956
957         lprocfs_mgs_init_vars(&lvars);
958         class_register_type(&mgs_obd_ops, NULL,
959                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
960
961         return 0;
962 }
963
964 static void /*__exit*/ mgs_exit(void)
965 {
966         class_unregister_type(LUSTRE_MGS_NAME);
967 }
968
969 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
970 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
971 MODULE_LICENSE("GPL");
972
973 module_init(mgs_init);
974 module_exit(mgs_exit);