Whamcloud - gitweb
b=15253 add conf_param -d to remove permanent settings
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct obd_export **exp, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *lexp;
71         struct lustre_handle conn = { 0 };
72         int rc;
73         ENTRY;
74
75         if (!exp || !obd || !cluuid)
76                 RETURN(-EINVAL);
77
78         rc = class_connect(&conn, obd, cluuid);
79         if (rc)
80                 RETURN(rc);
81
82         lexp = class_conn2export(&conn);
83         LASSERT(lexp);
84
85         mgs_counter_incr(lexp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 lexp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         rc = mgs_client_add(obd, lexp, localdata);
94
95         if (rc) {
96                 class_disconnect(lexp);
97         } else {
98                 *exp = lexp;
99         }
100
101         RETURN(rc);
102 }
103
104 static int mgs_reconnect(const struct lu_env *env,
105                          struct obd_export *exp, struct obd_device *obd,
106                          struct obd_uuid *cluuid, struct obd_connect_data *data,
107                          void *localdata)
108 {
109         ENTRY;
110
111         if (exp == NULL || obd == NULL || cluuid == NULL)
112                 RETURN(-EINVAL);
113
114         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
115
116         if (data != NULL) {
117                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
118                 exp->exp_connect_flags = data->ocd_connect_flags;
119                 data->ocd_version = LUSTRE_VERSION_CODE;
120         }
121
122         RETURN(0);
123 }
124
125 static int mgs_disconnect(struct obd_export *exp)
126 {
127         int rc;
128         ENTRY;
129
130         LASSERT(exp);
131
132         class_export_get(exp);
133         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
134
135         rc = server_disconnect_export(exp);
136
137         class_export_put(exp);
138         RETURN(rc);
139 }
140
141 static int mgs_cleanup(struct obd_device *obd);
142 static int mgs_handle(struct ptlrpc_request *req);
143
144 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
145                          struct obd_device *tgt, int *index)
146 {
147         int rc;
148         ENTRY;
149
150         LASSERT(olg == &obd->obd_olg);
151         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
152                         &llog_lvfs_ops);
153         RETURN(rc);
154 }
155
156 static int mgs_llog_finish(struct obd_device *obd, int count)
157 {
158         struct llog_ctxt *ctxt;
159         int rc = 0;
160         ENTRY;
161
162         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
163         if (ctxt)
164                 rc = llog_cleanup(ctxt);
165
166         RETURN(rc);
167 }
168
169 /* Start the MGS obd */
170 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
171 {
172         struct lprocfs_static_vars lvars;
173         struct mgs_obd *mgs = &obd->u.mgs;
174         struct lustre_mount_info *lmi;
175         struct lustre_sb_info *lsi;
176         struct vfsmount *mnt;
177         int rc = 0;
178         ENTRY;
179
180         CDEBUG(D_CONFIG, "Starting MGS\n");
181
182         /* Find our disk */
183         lmi = server_get_mount(obd->obd_name);
184         if (!lmi)
185                 RETURN(rc = -EINVAL);
186
187         mnt = lmi->lmi_mnt;
188         lsi = s2lsi(lmi->lmi_sb);
189         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
190         if (IS_ERR(obd->obd_fsops))
191                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
192
193         if (lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb))) {
194                 CERROR("%s: Underlying device is marked as read-only. "
195                        "Setup failed\n", obd->obd_name);
196                 GOTO(err_ops, rc = -EROFS);
197         }
198
199         /* namespace for mgs llog */
200         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
201                                                 LDLM_NAMESPACE_MODEST);
202         if (obd->obd_namespace == NULL)
203                 GOTO(err_ops, rc = -ENOMEM);
204
205         /* ldlm setup */
206         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
207                            "mgs_ldlm_client", &obd->obd_ldlm_client);
208
209         rc = mgs_fs_setup(obd, mnt);
210         if (rc) {
211                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
212                        obd->obd_name, rc);
213                 GOTO(err_ns, rc);
214         }
215
216         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
217         if (rc)
218                 GOTO(err_fs, rc);
219
220         /* No recovery for MGC's */
221         obd->obd_replayable = 0;
222
223         /* Internal mgs setup */
224         mgs_init_fsdb_list(obd);
225         cfs_sema_init(&mgs->mgs_sem, 1);
226
227         /* Setup proc */
228         lprocfs_mgs_init_vars(&lvars);
229         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
230                 lproc_mgs_setup(obd);
231                 rc = lprocfs_alloc_md_stats(obd, LPROC_MGS_LAST);
232                 if (rc)
233                         GOTO(err_llog, rc);
234         }
235
236         /* Start the service threads */
237         mgs->mgs_service =
238                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
239                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
240                                 MGC_REPLY_PORTAL, 2,
241                                 mgs_handle, LUSTRE_MGS_NAME,
242                                 obd->obd_proc_entry, target_print_req,
243                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
244                                 "ll_mgs", LCT_MD_THREAD, NULL);
245
246         if (!mgs->mgs_service) {
247                 CERROR("failed to start service\n");
248                 GOTO(err_llog, rc = -ENOMEM);
249         }
250
251         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
252         if (rc)
253                 GOTO(err_thread, rc);
254
255         ping_evictor_start();
256
257         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
258
259         RETURN(0);
260
261 err_thread:
262         ptlrpc_unregister_service(mgs->mgs_service);
263 err_llog:
264         lproc_mgs_cleanup(obd);
265         obd_llog_finish(obd, 0);
266 err_fs:
267         /* No extra cleanup needed for llog_init_commit_thread() */
268         mgs_fs_cleanup(obd);
269 err_ns:
270         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
271         obd->obd_namespace = NULL;
272 err_ops:
273         fsfilt_put_ops(obd->obd_fsops);
274 err_put:
275         server_put_mount(obd->obd_name, mnt);
276         mgs->mgs_sb = 0;
277         return rc;
278 }
279
280 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
281 {
282         int rc = 0;
283         ENTRY;
284
285         switch (stage) {
286         case OBD_CLEANUP_EARLY:
287         case OBD_CLEANUP_EXPORTS:
288                 rc = obd_llog_finish(obd, 0);
289                 break;
290         }
291         RETURN(rc);
292 }
293
294 /**
295  * Performs cleanup procedures for passed \a obd given it is mgs obd.
296  */
297 static int mgs_cleanup(struct obd_device *obd)
298 {
299         struct mgs_obd *mgs = &obd->u.mgs;
300         ENTRY;
301
302         if (mgs->mgs_sb == NULL)
303                 RETURN(0);
304
305         ping_evictor_stop();
306
307         ptlrpc_unregister_service(mgs->mgs_service);
308
309         mgs_cleanup_fsdb_list(obd);
310         lproc_mgs_cleanup(obd);
311         mgs_fs_cleanup(obd);
312
313         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
314         mgs->mgs_sb = NULL;
315
316         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
317         obd->obd_namespace = NULL;
318
319         fsfilt_put_ops(obd->obd_fsops);
320
321         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
322         RETURN(0);
323 }
324
325 /* similar to filter_prepare_destroy */
326 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
327                             struct lustre_handle *lockh)
328 {
329         struct ldlm_res_id res_id;
330         int rc, flags = 0;
331         ENTRY;
332
333         rc = mgc_fsname2resid(fsname, &res_id);
334         if (!rc)
335                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
336                                             LDLM_PLAIN, NULL, LCK_EX,
337                                             &flags, ldlm_blocking_ast,
338                                             ldlm_completion_ast, NULL,
339                                             fsname, 0, NULL, lockh);
340         if (rc)
341                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
342
343         RETURN(rc);
344 }
345
346 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
347 {
348         ENTRY;
349         ldlm_lock_decref(lockh, LCK_EX);
350         RETURN(0);
351 }
352
353 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
354                             struct lustre_handle *lockh)
355 {
356         int lockrc;
357
358         if (fsname[0]) {
359                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
360                 if (lockrc != ELDLM_OK)
361                         CERROR("lock error %d for fs %s\n", lockrc,
362                                fsname);
363                 else
364                         mgs_put_cfg_lock(lockh);
365         }
366 }
367
368 /* rc=0 means ok
369       1 means update
370      <0 means error */
371 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
372 {
373         int rc;
374         ENTRY;
375
376         rc = mgs_check_index(obd, mti);
377         if (rc == 0) {
378                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
379                                    "this MGS does not know about it, preventing "
380                                    "registration.\n", mti->mti_svname);
381                 rc = -ENOENT;
382         } else if (rc == -1) {
383                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
384                                    "disappeared! Regenerating all logs.\n",
385                                    mti->mti_fsname);
386                 mti->mti_flags |= LDD_F_WRITECONF;
387                 rc = 1;
388         } else {
389                 /* Index is correctly marked as used */
390
391                 /* If the logs don't contain the mti_nids then add
392                    them as failover nids */
393                 rc = mgs_check_failnid(obd, mti);
394         }
395
396         RETURN(rc);
397 }
398
399 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
400 static int mgs_handle_target_reg(struct ptlrpc_request *req)
401 {
402         struct obd_device *obd = req->rq_export->exp_obd;
403         struct lustre_handle lockh;
404         struct mgs_target_info *mti, *rep_mti;
405         int rc = 0, lockrc;
406         ENTRY;
407
408         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
409
410         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
411         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
412                                 LDD_F_UPDATE))) {
413                 /* We're just here as a startup ping. */
414                 CDEBUG(D_MGS, "Server %s is running on %s\n",
415                        mti->mti_svname, obd_export_nid2str(req->rq_export));
416                 rc = mgs_check_target(obd, mti);
417                 /* above will set appropriate mti flags */
418                 if (rc <= 0)
419                         /* Nothing wrong, or fatal error */
420                         GOTO(out_nolock, rc);
421         }
422
423         /* Revoke the config lock to make sure nobody is reading. */
424         /* Although actually I think it should be alright if
425            someone was reading while we were updating the logs - if we
426            revoke at the end they will just update from where they left off. */
427         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
428         if (lockrc != ELDLM_OK) {
429                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
430                                    "update their configuration (%d). Updating "
431                                    "local logs anyhow; you might have to "
432                                    "manually restart other nodes to get the "
433                                    "latest configuration.\n",
434                                    obd->obd_name, lockrc);
435         }
436
437         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
438
439         /* Log writing contention is handled by the fsdb_sem */
440
441         if (mti->mti_flags & LDD_F_WRITECONF) {
442                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
443                     mti->mti_stripe_index == 0) {
444                         rc = mgs_erase_logs(obd, mti->mti_fsname);
445                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
446                                       "request.  All servers must be restarted "
447                                       "in order to regenerate the logs."
448                                       "\n", obd->obd_name, mti->mti_fsname);
449                 } else if (mti->mti_flags &
450                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
451                         rc = mgs_erase_log(obd, mti->mti_svname);
452                         LCONSOLE_WARN("%s: Regenerating %s log by user "
453                                       "request.\n",
454                                       obd->obd_name, mti->mti_svname);
455                 }
456                 mti->mti_flags |= LDD_F_UPDATE;
457                 /* Erased logs means start from scratch. */
458                 mti->mti_flags &= ~LDD_F_UPGRADE14;
459         }
460
461         /* COMPAT_146 */
462         if (mti->mti_flags & LDD_F_UPGRADE14) {
463                 rc = mgs_upgrade_sv_14(obd, mti);
464                 if (rc) {
465                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
466                         GOTO(out, rc);
467                 }
468
469                 /* We're good to go */
470                 mti->mti_flags |= LDD_F_UPDATE;
471         }
472         /* end COMPAT_146 */
473
474         if (mti->mti_flags & LDD_F_UPDATE) {
475                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
476                        mti->mti_stripe_index);
477
478                 /* create or update the target log
479                    and update the client/mdt logs */
480                 rc = mgs_write_log_target(obd, mti);
481                 if (rc) {
482                         CERROR("Failed to write %s log (%d)\n",
483                                mti->mti_svname, rc);
484                         GOTO(out, rc);
485                 }
486
487                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
488                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
489                                     LDD_F_UPGRADE14);
490                 mti->mti_flags |= LDD_F_REWRITE_LDD;
491         }
492
493 out:
494         /* done with log update */
495         if (lockrc == ELDLM_OK)
496                 mgs_put_cfg_lock(&lockh);
497 out_nolock:
498         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
499                mti->mti_stripe_index, rc);
500         rc = req_capsule_server_pack(&req->rq_pill);
501         if (rc)
502                 RETURN(rc);
503
504         /* send back the whole mti in the reply */
505         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
506         *rep_mti = *mti;
507
508         /* Flush logs to disk */
509         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
510         RETURN(rc);
511 }
512
513 static int mgs_set_info_rpc(struct ptlrpc_request *req)
514 {
515         struct obd_device *obd = req->rq_export->exp_obd;
516         struct mgs_send_param *msp, *rep_msp;
517         struct lustre_handle lockh;
518         int rc;
519         struct lustre_cfg_bufs bufs;
520         struct lustre_cfg *lcfg;
521         char fsname[MTI_NAME_MAXLEN];
522         ENTRY;
523
524         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
525         LASSERT(msp);
526
527         /* Construct lustre_cfg structure to pass to function mgs_setparam */
528         lustre_cfg_bufs_reset(&bufs, NULL);
529         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
530         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
531         rc = mgs_setparam(obd, lcfg, fsname);
532         if (rc) {
533                 CERROR("Error %d in setting the parameter %s for fs %s\n",
534                        rc, msp->mgs_param, fsname);
535                 RETURN(rc);
536         }
537
538         /* request for update */
539         mgs_revoke_lock(obd, fsname, &lockh);
540
541         lustre_cfg_free(lcfg);
542
543         rc = req_capsule_server_pack(&req->rq_pill);
544         if (rc == 0) {
545                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
546                 rep_msp = msp;
547         }
548         RETURN(rc);
549 }
550
551 /*
552  * similar as in ost_connect_check_sptlrpc()
553  */
554 static int mgs_connect_check_sptlrpc(struct ptlrpc_request *req)
555 {
556         struct obd_export     *exp = req->rq_export;
557         struct obd_device     *obd = exp->exp_obd;
558         struct fs_db          *fsdb;
559         struct sptlrpc_flavor  flvr;
560         int                    rc = 0;
561
562         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
563                 rc = mgs_find_or_make_fsdb(obd, MGSSELF_NAME, &fsdb);
564                 if (rc)
565                         return rc;
566
567                 cfs_down(&fsdb->fsdb_sem);
568                 if (sptlrpc_rule_set_choose(&fsdb->fsdb_srpc_gen,
569                                             LUSTRE_SP_MGC, LUSTRE_SP_MGS,
570                                             req->rq_peer.nid,
571                                             &flvr) == 0) {
572                         /* by defualt allow any flavors */
573                         flvr.sf_rpc = SPTLRPC_FLVR_ANY;
574                 }
575                 cfs_up(&fsdb->fsdb_sem);
576
577                 cfs_spin_lock(&exp->exp_lock);
578
579                 exp->exp_sp_peer = req->rq_sp_from;
580                 exp->exp_flvr = flvr;
581
582                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
583                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
584                         CERROR("invalid rpc flavor %x, expect %x, from %s\n",
585                                req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
586                                libcfs_nid2str(req->rq_peer.nid));
587                         rc = -EACCES;
588                 }
589
590                 cfs_spin_unlock(&exp->exp_lock);
591         } else {
592                 if (exp->exp_sp_peer != req->rq_sp_from) {
593                         CERROR("RPC source %s doesn't match %s\n",
594                                sptlrpc_part2name(req->rq_sp_from),
595                                sptlrpc_part2name(exp->exp_sp_peer));
596                         rc = -EACCES;
597                 } else {
598                         rc = sptlrpc_target_export_check(exp, req);
599                 }
600         }
601
602         return rc;
603 }
604
605 /* Called whenever a target cleans up. */
606 /* XXX - Currently unused */
607 static int mgs_handle_target_del(struct ptlrpc_request *req)
608 {
609         ENTRY;
610         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
611         RETURN(0);
612 }
613
614 /* XXX - Currently unused */
615 static int mgs_handle_exception(struct ptlrpc_request *req)
616 {
617         ENTRY;
618         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
619         RETURN(0);
620 }
621
622 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
623 int mgs_handle(struct ptlrpc_request *req)
624 {
625         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
626         int opc, rc = 0;
627         ENTRY;
628
629         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
630         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
631         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
632                 RETURN(0);
633
634         LASSERT(current->journal_info == NULL);
635         opc = lustre_msg_get_opc(req->rq_reqmsg);
636
637         if (opc == SEC_CTX_INIT ||
638             opc == SEC_CTX_INIT_CONT ||
639             opc == SEC_CTX_FINI)
640                 GOTO(out, rc = 0);
641
642         if (opc != MGS_CONNECT) {
643                 if (!class_connected_export(req->rq_export)) {
644                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
645                                opc);
646                         req->rq_status = -ENOTCONN;
647                         GOTO(out, rc = -ENOTCONN);
648                 }
649         }
650
651         switch (opc) {
652         case MGS_CONNECT:
653                 DEBUG_REQ(D_MGS, req, "connect");
654                 /* MGS and MDS have same request format for connect */
655                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
656                 rc = target_handle_connect(req);
657                 if (rc == 0)
658                         rc = mgs_connect_check_sptlrpc(req);
659
660                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
661                         /* Make clients trying to reconnect after a MGS restart
662                            happy; also requires obd_replayable */
663                         lustre_msg_add_op_flags(req->rq_repmsg,
664                                                 MSG_CONNECT_RECONNECT);
665                 break;
666         case MGS_DISCONNECT:
667                 DEBUG_REQ(D_MGS, req, "disconnect");
668                 /* MGS and MDS have same request format for disconnect */
669                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
670                 rc = target_handle_disconnect(req);
671                 req->rq_status = rc;            /* superfluous? */
672                 break;
673         case MGS_EXCEPTION:
674                 DEBUG_REQ(D_MGS, req, "exception");
675                 rc = mgs_handle_exception(req);
676                 break;
677         case MGS_TARGET_REG:
678                 DEBUG_REQ(D_MGS, req, "target add");
679                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
680                 rc = mgs_handle_target_reg(req);
681                 break;
682         case MGS_TARGET_DEL:
683                 DEBUG_REQ(D_MGS, req, "target del");
684                 rc = mgs_handle_target_del(req);
685                 break;
686         case MGS_SET_INFO:
687                 DEBUG_REQ(D_MGS, req, "set_info");
688                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
689                 rc = mgs_set_info_rpc(req);
690                 break;
691
692         case LDLM_ENQUEUE:
693                 DEBUG_REQ(D_MGS, req, "enqueue");
694                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
695                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
696                                          ldlm_server_blocking_ast, NULL);
697                 break;
698         case LDLM_BL_CALLBACK:
699         case LDLM_CP_CALLBACK:
700                 DEBUG_REQ(D_MGS, req, "callback");
701                 CERROR("callbacks should not happen on MGS\n");
702                 LBUG();
703                 break;
704
705         case OBD_PING:
706                 DEBUG_REQ(D_INFO, req, "ping");
707                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
708                 rc = target_handle_ping(req);
709                 break;
710         case OBD_LOG_CANCEL:
711                 DEBUG_REQ(D_MGS, req, "log cancel");
712                 rc = -ENOTSUPP; /* la la la */
713                 break;
714
715         case LLOG_ORIGIN_HANDLE_CREATE:
716                 DEBUG_REQ(D_MGS, req, "llog_init");
717                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
718                 rc = llog_origin_handle_create(req);
719                 break;
720         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
721                 DEBUG_REQ(D_MGS, req, "llog next block");
722                 req_capsule_set(&req->rq_pill,
723                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
724                 rc = llog_origin_handle_next_block(req);
725                 break;
726         case LLOG_ORIGIN_HANDLE_READ_HEADER:
727                 DEBUG_REQ(D_MGS, req, "llog read header");
728                 req_capsule_set(&req->rq_pill,
729                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
730                 rc = llog_origin_handle_read_header(req);
731                 break;
732         case LLOG_ORIGIN_HANDLE_CLOSE:
733                 DEBUG_REQ(D_MGS, req, "llog close");
734                 rc = llog_origin_handle_close(req);
735                 break;
736         case LLOG_CATINFO:
737                 DEBUG_REQ(D_MGS, req, "llog catinfo");
738                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
739                 rc = llog_catinfo(req);
740                 break;
741         default:
742                 req->rq_status = -ENOTSUPP;
743                 rc = ptlrpc_error(req);
744                 RETURN(rc);
745         }
746
747         LASSERT(current->journal_info == NULL);
748
749         if (rc)
750                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
751
752 out:
753         target_send_reply(req, rc, fail);
754         RETURN(0);
755 }
756
757 static inline int mgs_init_export(struct obd_export *exp)
758 {
759         cfs_spin_lock(&exp->exp_lock);
760         exp->exp_connecting = 1;
761         cfs_spin_unlock(&exp->exp_lock);
762
763         return ldlm_init_export(exp);
764 }
765
766 static inline int mgs_destroy_export(struct obd_export *exp)
767 {
768         ENTRY;
769
770         target_destroy_export(exp);
771         mgs_client_free(exp);
772         ldlm_destroy_export(exp);
773
774         RETURN(0);
775 }
776
777 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
778 {
779         char *ptr;
780
781         ENTRY;
782         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
783                 *fsname = *ptr;
784                 fsname++;
785         }
786         if (*ptr == '\0')
787                 return -EINVAL;
788         *fsname = '\0';
789         ptr++;
790         strcpy(poolname, ptr);
791
792         RETURN(0);
793 }
794
795 static int mgs_iocontrol_pool(struct obd_device *obd,
796                               struct obd_ioctl_data *data)
797 {
798         int rc;
799         struct lustre_handle lockh;
800         struct lustre_cfg *lcfg = NULL;
801         struct llog_rec_hdr rec;
802         char *fsname = NULL;
803         char *poolname = NULL;
804         ENTRY;
805
806         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
807         if (fsname == NULL)
808                 RETURN(-ENOMEM);
809
810         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
811         if (poolname == NULL) {
812                 rc = -ENOMEM;
813                 GOTO(out_pool, rc);
814         }
815         rec.lrh_len = llog_data_len(data->ioc_plen1);
816
817         if (data->ioc_type == LUSTRE_CFG_TYPE) {
818                 rec.lrh_type = OBD_CFG_REC;
819         } else {
820                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
821                 rc = -EINVAL;
822                 GOTO(out_pool, rc);
823         }
824
825         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
826                 rc = -E2BIG;
827                 GOTO(out_pool, rc);
828         }
829
830         OBD_ALLOC(lcfg, data->ioc_plen1);
831         if (lcfg == NULL)
832                 GOTO(out_pool, rc = -ENOMEM);
833
834         if (cfs_copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
835                 GOTO(out_pool, rc = -EFAULT);
836
837         if (lcfg->lcfg_bufcount < 2) {
838                 GOTO(out_pool, rc = -EFAULT);
839         }
840
841         /* first arg is always <fsname>.<poolname> */
842         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
843                             poolname);
844
845         switch (lcfg->lcfg_command) {
846         case LCFG_POOL_NEW: {
847                 if (lcfg->lcfg_bufcount != 2)
848                         RETURN(-EINVAL);
849                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
850                                   poolname, NULL);
851                 break;
852         }
853         case LCFG_POOL_ADD: {
854                 if (lcfg->lcfg_bufcount != 3)
855                         RETURN(-EINVAL);
856                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
857                                   lustre_cfg_string(lcfg, 2));
858                 break;
859         }
860         case LCFG_POOL_REM: {
861                 if (lcfg->lcfg_bufcount != 3)
862                         RETURN(-EINVAL);
863                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
864                                   lustre_cfg_string(lcfg, 2));
865                 break;
866         }
867         case LCFG_POOL_DEL: {
868                 if (lcfg->lcfg_bufcount != 2)
869                         RETURN(-EINVAL);
870                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
871                                   poolname, NULL);
872                 break;
873         }
874         default: {
875                  rc = -EINVAL;
876                  GOTO(out_pool, rc);
877         }
878         }
879
880         if (rc) {
881                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
882                        rc, lcfg->lcfg_command, fsname, poolname);
883                 GOTO(out_pool, rc);
884         }
885
886         /* request for update */
887         mgs_revoke_lock(obd, fsname, &lockh);
888
889 out_pool:
890         if (lcfg != NULL)
891                 OBD_FREE(lcfg, data->ioc_plen1);
892
893         if (fsname != NULL)
894                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
895
896         if (poolname != NULL)
897                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
898
899         RETURN(rc);
900 }
901
902 /* from mdt_iocontrol */
903 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
904                   void *karg, void *uarg)
905 {
906         struct obd_device *obd = exp->exp_obd;
907         struct obd_ioctl_data *data = karg;
908         struct lvfs_run_ctxt saved;
909         int rc = 0;
910
911         ENTRY;
912         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
913
914         switch (cmd) {
915
916         case OBD_IOC_PARAM: {
917                 struct lustre_handle lockh;
918                 struct lustre_cfg *lcfg;
919                 struct llog_rec_hdr rec;
920                 char fsname[MTI_NAME_MAXLEN];
921
922                 rec.lrh_len = llog_data_len(data->ioc_plen1);
923
924                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
925                         rec.lrh_type = OBD_CFG_REC;
926                 } else {
927                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
928                         RETURN(-EINVAL);
929                 }
930
931                 OBD_ALLOC(lcfg, data->ioc_plen1);
932                 if (lcfg == NULL)
933                         RETURN(-ENOMEM);
934                 if (cfs_copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
935                         GOTO(out_free, rc = -EFAULT);
936
937                 if (lcfg->lcfg_bufcount < 1)
938                         GOTO(out_free, rc = -EINVAL);
939
940                 rc = mgs_setparam(obd, lcfg, fsname);
941                 if (rc) {
942                         CERROR("setparam err %d\n", rc);
943                         GOTO(out_free, rc);
944                 }
945
946                 /* Revoke lock so everyone updates.  Should be alright if
947                    someone was already reading while we were updating the logs,
948                    so we don't really need to hold the lock while we're
949                    writing (above). */
950                 mgs_revoke_lock(obd, fsname, &lockh);
951
952 out_free:
953                 OBD_FREE(lcfg, data->ioc_plen1);
954                 RETURN(rc);
955         }
956
957         case OBD_IOC_POOL: {
958                 RETURN(mgs_iocontrol_pool(obd, data));
959         }
960
961         case OBD_IOC_DUMP_LOG: {
962                 struct llog_ctxt *ctxt;
963                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
964                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
965                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
966                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
967                 llog_ctxt_put(ctxt);
968
969                 RETURN(rc);
970         }
971
972         case OBD_IOC_LLOG_CHECK:
973         case OBD_IOC_LLOG_INFO:
974         case OBD_IOC_LLOG_PRINT: {
975                 struct llog_ctxt *ctxt;
976                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
977
978                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
979                 rc = llog_ioctl(ctxt, cmd, data);
980                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
981                 llog_ctxt_put(ctxt);
982
983                 RETURN(rc);
984         }
985
986         default:
987                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
988                 RETURN(-EINVAL);
989         }
990         RETURN(0);
991 }
992
993 /* use obd ops to offer management infrastructure */
994 static struct obd_ops mgs_obd_ops = {
995         .o_owner           = THIS_MODULE,
996         .o_connect         = mgs_connect,
997         .o_reconnect       = mgs_reconnect,
998         .o_disconnect      = mgs_disconnect,
999         .o_setup           = mgs_setup,
1000         .o_precleanup      = mgs_precleanup,
1001         .o_cleanup         = mgs_cleanup,
1002         .o_init_export     = mgs_init_export,
1003         .o_destroy_export  = mgs_destroy_export,
1004         .o_iocontrol       = mgs_iocontrol,
1005         .o_llog_init       = mgs_llog_init,
1006         .o_llog_finish     = mgs_llog_finish
1007 };
1008
1009 static int __init mgs_init(void)
1010 {
1011         struct lprocfs_static_vars lvars;
1012
1013         lprocfs_mgs_init_vars(&lvars);
1014         class_register_type(&mgs_obd_ops, NULL,
1015                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
1016
1017         return 0;
1018 }
1019
1020 static void /*__exit*/ mgs_exit(void)
1021 {
1022         class_unregister_type(LUSTRE_MGS_NAME);
1023 }
1024
1025 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1026 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
1027 MODULE_LICENSE("GPL");
1028
1029 module_init(mgs_init);
1030 module_exit(mgs_exit);