Whamcloud - gitweb
99dd197b8e6c4f952d6787b064c77b5bda122fa5
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct obd_export **exp, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *lexp;
71         struct lustre_handle conn = { 0 };
72         int rc;
73         ENTRY;
74
75         if (!exp || !obd || !cluuid)
76                 RETURN(-EINVAL);
77
78         rc = class_connect(&conn, obd, cluuid);
79         if (rc)
80                 RETURN(rc);
81
82         lexp = class_conn2export(&conn);
83         LASSERT(lexp);
84
85         mgs_counter_incr(lexp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 lexp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         rc = mgs_client_add(obd, lexp, localdata);
94
95         if (rc) {
96                 class_disconnect(lexp);
97         } else {
98                 *exp = lexp;
99         }
100
101         RETURN(rc);
102 }
103
104 static int mgs_reconnect(const struct lu_env *env,
105                          struct obd_export *exp, struct obd_device *obd,
106                          struct obd_uuid *cluuid, struct obd_connect_data *data,
107                          void *localdata)
108 {
109         ENTRY;
110
111         if (exp == NULL || obd == NULL || cluuid == NULL)
112                 RETURN(-EINVAL);
113
114         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
115
116         if (data != NULL) {
117                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
118                 exp->exp_connect_flags = data->ocd_connect_flags;
119                 data->ocd_version = LUSTRE_VERSION_CODE;
120         }
121
122         RETURN(0);
123 }
124
125 static int mgs_disconnect(struct obd_export *exp)
126 {
127         int rc;
128         ENTRY;
129
130         LASSERT(exp);
131
132         class_export_get(exp);
133         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
134
135         rc = server_disconnect_export(exp);
136
137         class_export_put(exp);
138         RETURN(rc);
139 }
140
141 static int mgs_cleanup(struct obd_device *obd);
142 static int mgs_handle(struct ptlrpc_request *req);
143
144 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
145                          struct obd_device *tgt, int *index)
146 {
147         int rc;
148         ENTRY;
149
150         LASSERT(olg == &obd->obd_olg);
151         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
152                         &llog_lvfs_ops);
153         RETURN(rc);
154 }
155
156 static int mgs_llog_finish(struct obd_device *obd, int count)
157 {
158         struct llog_ctxt *ctxt;
159         int rc = 0;
160         ENTRY;
161
162         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
163         if (ctxt)
164                 rc = llog_cleanup(ctxt);
165
166         RETURN(rc);
167 }
168
169 /* Start the MGS obd */
170 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
171 {
172         struct lprocfs_static_vars lvars;
173         struct mgs_obd *mgs = &obd->u.mgs;
174         struct lustre_mount_info *lmi;
175         struct lustre_sb_info *lsi;
176         struct vfsmount *mnt;
177         int rc = 0;
178         ENTRY;
179
180         CDEBUG(D_CONFIG, "Starting MGS\n");
181
182         /* Find our disk */
183         lmi = server_get_mount(obd->obd_name);
184         if (!lmi)
185                 RETURN(rc = -EINVAL);
186
187         mnt = lmi->lmi_mnt;
188         lsi = s2lsi(lmi->lmi_sb);
189         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
190         if (IS_ERR(obd->obd_fsops))
191                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
192
193         if (lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb))) {
194                 CERROR("%s: Underlying device is marked as read-only. "
195                        "Setup failed\n", obd->obd_name);
196                 GOTO(err_ops, rc = -EROFS);
197         }
198
199         /* namespace for mgs llog */
200         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
201                                                 LDLM_NAMESPACE_MODEST);
202         if (obd->obd_namespace == NULL)
203                 GOTO(err_ops, rc = -ENOMEM);
204
205         /* ldlm setup */
206         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
207                            "mgs_ldlm_client", &obd->obd_ldlm_client);
208
209         rc = mgs_fs_setup(obd, mnt);
210         if (rc) {
211                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
212                        obd->obd_name, rc);
213                 GOTO(err_ns, rc);
214         }
215
216         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
217         if (rc)
218                 GOTO(err_fs, rc);
219
220         /* No recovery for MGC's */
221         obd->obd_replayable = 0;
222
223         /* Internal mgs setup */
224         mgs_init_fsdb_list(obd);
225         sema_init(&mgs->mgs_sem, 1);
226
227         /* Setup proc */
228         lprocfs_mgs_init_vars(&lvars);
229         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0)
230                 lproc_mgs_setup(obd);
231
232         /* Start the service threads */
233         mgs->mgs_service =
234                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
235                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
236                                 MGC_REPLY_PORTAL, 2,
237                                 mgs_handle, LUSTRE_MGS_NAME,
238                                 obd->obd_proc_entry, target_print_req,
239                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
240                                 "ll_mgs", LCT_MD_THREAD, NULL);
241
242         if (!mgs->mgs_service) {
243                 CERROR("failed to start service\n");
244                 GOTO(err_llog, rc = -ENOMEM);
245         }
246
247         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
248         if (rc)
249                 GOTO(err_thread, rc);
250
251         ping_evictor_start();
252
253         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
254
255         RETURN(0);
256
257 err_thread:
258         ptlrpc_unregister_service(mgs->mgs_service);
259 err_llog:
260         lproc_mgs_cleanup(obd);
261         obd_llog_finish(obd, 0);
262 err_fs:
263         /* No extra cleanup needed for llog_init_commit_thread() */
264         mgs_fs_cleanup(obd);
265 err_ns:
266         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
267         obd->obd_namespace = NULL;
268 err_ops:
269         fsfilt_put_ops(obd->obd_fsops);
270 err_put:
271         server_put_mount(obd->obd_name, mnt);
272         mgs->mgs_sb = 0;
273         return rc;
274 }
275
276 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
277 {
278         int rc = 0;
279         ENTRY;
280
281         switch (stage) {
282         case OBD_CLEANUP_EARLY:
283         case OBD_CLEANUP_EXPORTS:
284                 rc = obd_llog_finish(obd, 0);
285                 break;
286         }
287         RETURN(rc);
288 }
289
290 /**
291  * Performs cleanup procedures for passed \a obd given it is mgs obd.
292  */
293 static int mgs_cleanup(struct obd_device *obd)
294 {
295         struct mgs_obd *mgs = &obd->u.mgs;
296         ENTRY;
297
298         if (mgs->mgs_sb == NULL)
299                 RETURN(0);
300
301         ping_evictor_stop();
302
303         ptlrpc_unregister_service(mgs->mgs_service);
304
305         mgs_cleanup_fsdb_list(obd);
306         lproc_mgs_cleanup(obd);
307         mgs_fs_cleanup(obd);
308
309         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
310         mgs->mgs_sb = NULL;
311
312         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
313         obd->obd_namespace = NULL;
314
315         fsfilt_put_ops(obd->obd_fsops);
316
317         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
318         RETURN(0);
319 }
320
321 /* similar to filter_prepare_destroy */
322 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
323                             struct lustre_handle *lockh)
324 {
325         struct ldlm_res_id res_id;
326         int rc, flags = 0;
327         ENTRY;
328
329         rc = mgc_fsname2resid(fsname, &res_id);
330         if (!rc)
331                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
332                                             LDLM_PLAIN, NULL, LCK_EX,
333                                             &flags, ldlm_blocking_ast,
334                                             ldlm_completion_ast, NULL,
335                                             fsname, 0, NULL, lockh);
336         if (rc)
337                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
338
339         RETURN(rc);
340 }
341
342 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
343 {
344         ENTRY;
345         ldlm_lock_decref(lockh, LCK_EX);
346         RETURN(0);
347 }
348
349 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
350                             struct lustre_handle *lockh)
351 {
352         int lockrc;
353
354         if (fsname[0]) {
355                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
356                 if (lockrc != ELDLM_OK)
357                         CERROR("lock error %d for fs %s\n", lockrc,
358                                fsname);
359                 else
360                         mgs_put_cfg_lock(lockh);
361         }
362 }
363
364 /* rc=0 means ok
365       1 means update
366      <0 means error */
367 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
368 {
369         int rc;
370         ENTRY;
371
372         rc = mgs_check_index(obd, mti);
373         if (rc == 0) {
374                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
375                                    "this MGS does not know about it, preventing "
376                                    "registration.\n", mti->mti_svname);
377                 rc = -ENOENT;
378         } else if (rc == -1) {
379                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
380                                    "disappeared! Regenerating all logs.\n",
381                                    mti->mti_fsname);
382                 mti->mti_flags |= LDD_F_WRITECONF;
383                 rc = 1;
384         } else {
385                 /* Index is correctly marked as used */
386
387                 /* If the logs don't contain the mti_nids then add
388                    them as failover nids */
389                 rc = mgs_check_failnid(obd, mti);
390         }
391
392         RETURN(rc);
393 }
394
395 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
396 static int mgs_handle_target_reg(struct ptlrpc_request *req)
397 {
398         struct obd_device *obd = req->rq_export->exp_obd;
399         struct lustre_handle lockh;
400         struct mgs_target_info *mti, *rep_mti;
401         int rc = 0, lockrc;
402         ENTRY;
403
404         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
405
406         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
407         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
408                                 LDD_F_UPDATE))) {
409                 /* We're just here as a startup ping. */
410                 CDEBUG(D_MGS, "Server %s is running on %s\n",
411                        mti->mti_svname, obd_export_nid2str(req->rq_export));
412                 rc = mgs_check_target(obd, mti);
413                 /* above will set appropriate mti flags */
414                 if (rc <= 0)
415                         /* Nothing wrong, or fatal error */
416                         GOTO(out_nolock, rc);
417         }
418
419         /* Revoke the config lock to make sure nobody is reading. */
420         /* Although actually I think it should be alright if
421            someone was reading while we were updating the logs - if we
422            revoke at the end they will just update from where they left off. */
423         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
424         if (lockrc != ELDLM_OK) {
425                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
426                                    "update their configuration (%d). Updating "
427                                    "local logs anyhow; you might have to "
428                                    "manually restart other nodes to get the "
429                                    "latest configuration.\n",
430                                    obd->obd_name, lockrc);
431         }
432
433         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
434
435         /* Log writing contention is handled by the fsdb_sem */
436
437         if (mti->mti_flags & LDD_F_WRITECONF) {
438                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
439                     mti->mti_stripe_index == 0) {
440                         rc = mgs_erase_logs(obd, mti->mti_fsname);
441                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
442                                       "request.  All servers must be restarted "
443                                       "in order to regenerate the logs."
444                                       "\n", obd->obd_name, mti->mti_fsname);
445                 } else if (mti->mti_flags &
446                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
447                         rc = mgs_erase_log(obd, mti->mti_svname);
448                         LCONSOLE_WARN("%s: Regenerating %s log by user "
449                                       "request.\n",
450                                       obd->obd_name, mti->mti_svname);
451                 }
452                 mti->mti_flags |= LDD_F_UPDATE;
453                 /* Erased logs means start from scratch. */
454                 mti->mti_flags &= ~LDD_F_UPGRADE14;
455         }
456
457         /* COMPAT_146 */
458         if (mti->mti_flags & LDD_F_UPGRADE14) {
459                 rc = mgs_upgrade_sv_14(obd, mti);
460                 if (rc) {
461                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
462                         GOTO(out, rc);
463                 }
464
465                 /* We're good to go */
466                 mti->mti_flags |= LDD_F_UPDATE;
467         }
468         /* end COMPAT_146 */
469
470         if (mti->mti_flags & LDD_F_UPDATE) {
471                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
472                        mti->mti_stripe_index);
473
474                 /* create or update the target log
475                    and update the client/mdt logs */
476                 rc = mgs_write_log_target(obd, mti);
477                 if (rc) {
478                         CERROR("Failed to write %s log (%d)\n",
479                                mti->mti_svname, rc);
480                         GOTO(out, rc);
481                 }
482
483                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
484                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
485                                     LDD_F_UPGRADE14);
486                 mti->mti_flags |= LDD_F_REWRITE_LDD;
487         }
488
489 out:
490         /* done with log update */
491         if (lockrc == ELDLM_OK)
492                 mgs_put_cfg_lock(&lockh);
493 out_nolock:
494         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
495                mti->mti_stripe_index, rc);
496         rc = req_capsule_server_pack(&req->rq_pill);
497         if (rc)
498                 RETURN(rc);
499
500         /* send back the whole mti in the reply */
501         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
502         *rep_mti = *mti;
503
504         /* Flush logs to disk */
505         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
506         RETURN(rc);
507 }
508
509 static int mgs_set_info_rpc(struct ptlrpc_request *req)
510 {
511         struct obd_device *obd = req->rq_export->exp_obd;
512         struct mgs_send_param *msp, *rep_msp;
513         struct lustre_handle lockh;
514         int rc;
515         struct lustre_cfg_bufs bufs;
516         struct lustre_cfg *lcfg;
517         char fsname[MTI_NAME_MAXLEN];
518         ENTRY;
519
520         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
521         LASSERT(msp);
522
523         /* Construct lustre_cfg structure to pass to function mgs_setparam */
524         lustre_cfg_bufs_reset(&bufs, NULL);
525         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
526         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
527         rc = mgs_setparam(obd, lcfg, fsname);
528         if (rc) {
529                 CERROR("Error %d in setting the parameter %s for fs %s\n",
530                        rc, msp->mgs_param, fsname);
531                 RETURN(rc);
532         }
533
534         /* request for update */
535         mgs_revoke_lock(obd, fsname, &lockh);
536
537         lustre_cfg_free(lcfg);
538
539         rc = req_capsule_server_pack(&req->rq_pill);
540         if (rc == 0) {
541                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
542                 rep_msp = msp;
543         }
544         RETURN(rc);
545 }
546
547 /*
548  * similar as in ost_connect_check_sptlrpc()
549  */
550 static int mgs_connect_check_sptlrpc(struct ptlrpc_request *req)
551 {
552         struct obd_export     *exp = req->rq_export;
553         struct obd_device     *obd = exp->exp_obd;
554         struct fs_db          *fsdb;
555         struct sptlrpc_flavor  flvr;
556         int                    rc = 0;
557
558         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
559                 rc = mgs_find_or_make_fsdb(obd, MGSSELF_NAME, &fsdb);
560                 if (rc)
561                         return rc;
562
563                 down(&fsdb->fsdb_sem);
564                 if (sptlrpc_rule_set_choose(&fsdb->fsdb_srpc_gen,
565                                             LUSTRE_SP_MGC, LUSTRE_SP_MGS,
566                                             req->rq_peer.nid,
567                                             &flvr) == 0) {
568                         /* by defualt allow any flavors */
569                         flvr.sf_rpc = SPTLRPC_FLVR_ANY;
570                 }
571                 up(&fsdb->fsdb_sem);
572
573                 spin_lock(&exp->exp_lock);
574
575                 exp->exp_sp_peer = req->rq_sp_from;
576                 exp->exp_flvr = flvr;
577
578                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
579                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
580                         CERROR("invalid rpc flavor %x, expect %x, from %s\n",
581                                req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
582                                libcfs_nid2str(req->rq_peer.nid));
583                         rc = -EACCES;
584                 }
585
586                 spin_unlock(&exp->exp_lock);
587         } else {
588                 if (exp->exp_sp_peer != req->rq_sp_from) {
589                         CERROR("RPC source %s doesn't match %s\n",
590                                sptlrpc_part2name(req->rq_sp_from),
591                                sptlrpc_part2name(exp->exp_sp_peer));
592                         rc = -EACCES;
593                 } else {
594                         rc = sptlrpc_target_export_check(exp, req);
595                 }
596         }
597
598         return rc;
599 }
600
601 /* Called whenever a target cleans up. */
602 /* XXX - Currently unused */
603 static int mgs_handle_target_del(struct ptlrpc_request *req)
604 {
605         ENTRY;
606         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
607         RETURN(0);
608 }
609
610 /* XXX - Currently unused */
611 static int mgs_handle_exception(struct ptlrpc_request *req)
612 {
613         ENTRY;
614         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
615         RETURN(0);
616 }
617
618 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
619 int mgs_handle(struct ptlrpc_request *req)
620 {
621         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
622         int opc, rc = 0;
623         ENTRY;
624
625         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
626         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
627         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
628                 RETURN(0);
629
630         LASSERT(current->journal_info == NULL);
631         opc = lustre_msg_get_opc(req->rq_reqmsg);
632
633         if (opc == SEC_CTX_INIT ||
634             opc == SEC_CTX_INIT_CONT ||
635             opc == SEC_CTX_FINI)
636                 GOTO(out, rc = 0);
637
638         if (opc != MGS_CONNECT) {
639                 if (!class_connected_export(req->rq_export)) {
640                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
641                                opc);
642                         req->rq_status = -ENOTCONN;
643                         GOTO(out, rc = -ENOTCONN);
644                 }
645         }
646
647         switch (opc) {
648         case MGS_CONNECT:
649                 DEBUG_REQ(D_MGS, req, "connect");
650                 /* MGS and MDS have same request format for connect */
651                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
652                 rc = target_handle_connect(req);
653                 if (rc == 0)
654                         rc = mgs_connect_check_sptlrpc(req);
655
656                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
657                         /* Make clients trying to reconnect after a MGS restart
658                            happy; also requires obd_replayable */
659                         lustre_msg_add_op_flags(req->rq_repmsg,
660                                                 MSG_CONNECT_RECONNECT);
661                 break;
662         case MGS_DISCONNECT:
663                 DEBUG_REQ(D_MGS, req, "disconnect");
664                 /* MGS and MDS have same request format for disconnect */
665                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
666                 rc = target_handle_disconnect(req);
667                 req->rq_status = rc;            /* superfluous? */
668                 break;
669         case MGS_EXCEPTION:
670                 DEBUG_REQ(D_MGS, req, "exception");
671                 rc = mgs_handle_exception(req);
672                 break;
673         case MGS_TARGET_REG:
674                 DEBUG_REQ(D_MGS, req, "target add");
675                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
676                 rc = mgs_handle_target_reg(req);
677                 break;
678         case MGS_TARGET_DEL:
679                 DEBUG_REQ(D_MGS, req, "target del");
680                 rc = mgs_handle_target_del(req);
681                 break;
682         case MGS_SET_INFO:
683                 DEBUG_REQ(D_MGS, req, "set_info");
684                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
685                 rc = mgs_set_info_rpc(req);
686                 break;
687
688         case LDLM_ENQUEUE:
689                 DEBUG_REQ(D_MGS, req, "enqueue");
690                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
691                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
692                                          ldlm_server_blocking_ast, NULL);
693                 break;
694         case LDLM_BL_CALLBACK:
695         case LDLM_CP_CALLBACK:
696                 DEBUG_REQ(D_MGS, req, "callback");
697                 CERROR("callbacks should not happen on MGS\n");
698                 LBUG();
699                 break;
700
701         case OBD_PING:
702                 DEBUG_REQ(D_INFO, req, "ping");
703                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
704                 rc = target_handle_ping(req);
705                 break;
706         case OBD_LOG_CANCEL:
707                 DEBUG_REQ(D_MGS, req, "log cancel");
708                 rc = -ENOTSUPP; /* la la la */
709                 break;
710
711         case LLOG_ORIGIN_HANDLE_CREATE:
712                 DEBUG_REQ(D_MGS, req, "llog_init");
713                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
714                 rc = llog_origin_handle_create(req);
715                 break;
716         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
717                 DEBUG_REQ(D_MGS, req, "llog next block");
718                 req_capsule_set(&req->rq_pill,
719                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
720                 rc = llog_origin_handle_next_block(req);
721                 break;
722         case LLOG_ORIGIN_HANDLE_READ_HEADER:
723                 DEBUG_REQ(D_MGS, req, "llog read header");
724                 req_capsule_set(&req->rq_pill,
725                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
726                 rc = llog_origin_handle_read_header(req);
727                 break;
728         case LLOG_ORIGIN_HANDLE_CLOSE:
729                 DEBUG_REQ(D_MGS, req, "llog close");
730                 rc = llog_origin_handle_close(req);
731                 break;
732         case LLOG_CATINFO:
733                 DEBUG_REQ(D_MGS, req, "llog catinfo");
734                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
735                 rc = llog_catinfo(req);
736                 break;
737         default:
738                 req->rq_status = -ENOTSUPP;
739                 rc = ptlrpc_error(req);
740                 RETURN(rc);
741         }
742
743         LASSERT(current->journal_info == NULL);
744
745         if (rc)
746                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
747
748 out:
749         target_send_reply(req, rc, fail);
750         RETURN(0);
751 }
752
753 static inline int mgs_init_export(struct obd_export *exp)
754 {
755         spin_lock(&exp->exp_lock);
756         exp->exp_connecting = 1;
757         spin_unlock(&exp->exp_lock);
758
759         return ldlm_init_export(exp);
760 }
761
762 static inline int mgs_destroy_export(struct obd_export *exp)
763 {
764         ENTRY;
765
766         target_destroy_export(exp);
767         mgs_client_free(exp);
768         ldlm_destroy_export(exp);
769
770         RETURN(0);
771 }
772
773 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
774 {
775         char *ptr;
776
777         ENTRY;
778         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
779                 *fsname = *ptr;
780                 fsname++;
781         }
782         if (*ptr == '\0')
783                 return -EINVAL;
784         *fsname = '\0';
785         ptr++;
786         strcpy(poolname, ptr);
787
788         RETURN(0);
789 }
790
791 static int mgs_iocontrol_pool(struct obd_device *obd,
792                               struct obd_ioctl_data *data)
793 {
794         int rc;
795         struct lustre_handle lockh;
796         struct lustre_cfg *lcfg = NULL;
797         struct llog_rec_hdr rec;
798         char *fsname = NULL;
799         char *poolname = NULL;
800         ENTRY;
801
802         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
803         if (fsname == NULL)
804                 RETURN(-ENOMEM);
805
806         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
807         if (poolname == NULL) {
808                 rc = -ENOMEM;
809                 GOTO(out_pool, rc);
810         }
811         rec.lrh_len = llog_data_len(data->ioc_plen1);
812
813         if (data->ioc_type == LUSTRE_CFG_TYPE) {
814                 rec.lrh_type = OBD_CFG_REC;
815         } else {
816                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
817                 rc = -EINVAL;
818                 GOTO(out_pool, rc);
819         }
820
821         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
822                 rc = -E2BIG;
823                 GOTO(out_pool, rc);
824         }
825
826         OBD_ALLOC(lcfg, data->ioc_plen1);
827         if (lcfg == NULL)
828                 GOTO(out_pool, rc = -ENOMEM);
829
830         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
831                 GOTO(out_pool, rc = -EFAULT);
832
833         if (lcfg->lcfg_bufcount < 2) {
834                 GOTO(out_pool, rc = -EFAULT);
835         }
836
837         /* first arg is always <fsname>.<poolname> */
838         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
839                             poolname);
840
841         switch (lcfg->lcfg_command) {
842         case LCFG_POOL_NEW: {
843                 if (lcfg->lcfg_bufcount != 2)
844                         RETURN(-EINVAL);
845                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
846                                   poolname, NULL);
847                 break;
848         }
849         case LCFG_POOL_ADD: {
850                 if (lcfg->lcfg_bufcount != 3)
851                         RETURN(-EINVAL);
852                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
853                                   lustre_cfg_string(lcfg, 2));
854                 break;
855         }
856         case LCFG_POOL_REM: {
857                 if (lcfg->lcfg_bufcount != 3)
858                         RETURN(-EINVAL);
859                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
860                                   lustre_cfg_string(lcfg, 2));
861                 break;
862         }
863         case LCFG_POOL_DEL: {
864                 if (lcfg->lcfg_bufcount != 2)
865                         RETURN(-EINVAL);
866                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
867                                   poolname, NULL);
868                 break;
869         }
870         default: {
871                  rc = -EINVAL;
872                  GOTO(out_pool, rc);
873         }
874         }
875
876         if (rc) {
877                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
878                        rc, lcfg->lcfg_command, fsname, poolname);
879                 GOTO(out_pool, rc);
880         }
881
882         /* request for update */
883         mgs_revoke_lock(obd, fsname, &lockh);
884
885 out_pool:
886         if (lcfg != NULL)
887                 OBD_FREE(lcfg, data->ioc_plen1);
888
889         if (fsname != NULL)
890                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
891
892         if (poolname != NULL)
893                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
894
895         RETURN(rc);
896 }
897
898 /* from mdt_iocontrol */
899 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
900                   void *karg, void *uarg)
901 {
902         struct obd_device *obd = exp->exp_obd;
903         struct obd_ioctl_data *data = karg;
904         struct lvfs_run_ctxt saved;
905         int rc = 0;
906
907         ENTRY;
908         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
909
910         switch (cmd) {
911
912         case OBD_IOC_PARAM: {
913                 struct lustre_handle lockh;
914                 struct lustre_cfg *lcfg;
915                 struct llog_rec_hdr rec;
916                 char fsname[MTI_NAME_MAXLEN];
917
918                 rec.lrh_len = llog_data_len(data->ioc_plen1);
919
920                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
921                         rec.lrh_type = OBD_CFG_REC;
922                 } else {
923                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
924                         RETURN(-EINVAL);
925                 }
926
927                 OBD_ALLOC(lcfg, data->ioc_plen1);
928                 if (lcfg == NULL)
929                         RETURN(-ENOMEM);
930                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
931                         GOTO(out_free, rc = -EFAULT);
932
933                 if (lcfg->lcfg_bufcount < 1)
934                         GOTO(out_free, rc = -EINVAL);
935
936                 rc = mgs_setparam(obd, lcfg, fsname);
937                 if (rc) {
938                         CERROR("setparam err %d\n", rc);
939                         GOTO(out_free, rc);
940                 }
941
942                 /* Revoke lock so everyone updates.  Should be alright if
943                    someone was already reading while we were updating the logs,
944                    so we don't really need to hold the lock while we're
945                    writing (above). */
946                 mgs_revoke_lock(obd, fsname, &lockh);
947
948 out_free:
949                 OBD_FREE(lcfg, data->ioc_plen1);
950                 RETURN(rc);
951         }
952
953         case OBD_IOC_POOL: {
954                 RETURN(mgs_iocontrol_pool(obd, data));
955         }
956
957         case OBD_IOC_DUMP_LOG: {
958                 struct llog_ctxt *ctxt;
959                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
960                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
961                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
962                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
963                 llog_ctxt_put(ctxt);
964
965                 RETURN(rc);
966         }
967
968         case OBD_IOC_LLOG_CHECK:
969         case OBD_IOC_LLOG_INFO:
970         case OBD_IOC_LLOG_PRINT: {
971                 struct llog_ctxt *ctxt;
972                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
973
974                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
975                 rc = llog_ioctl(ctxt, cmd, data);
976                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
977                 llog_ctxt_put(ctxt);
978
979                 RETURN(rc);
980         }
981
982         default:
983                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
984                 RETURN(-EINVAL);
985         }
986         RETURN(0);
987 }
988
989 /* use obd ops to offer management infrastructure */
990 static struct obd_ops mgs_obd_ops = {
991         .o_owner           = THIS_MODULE,
992         .o_connect         = mgs_connect,
993         .o_reconnect       = mgs_reconnect,
994         .o_disconnect      = mgs_disconnect,
995         .o_setup           = mgs_setup,
996         .o_precleanup      = mgs_precleanup,
997         .o_cleanup         = mgs_cleanup,
998         .o_init_export     = mgs_init_export,
999         .o_destroy_export  = mgs_destroy_export,
1000         .o_iocontrol       = mgs_iocontrol,
1001         .o_llog_init       = mgs_llog_init,
1002         .o_llog_finish     = mgs_llog_finish
1003 };
1004
1005 static int __init mgs_init(void)
1006 {
1007         struct lprocfs_static_vars lvars;
1008
1009         lprocfs_mgs_init_vars(&lvars);
1010         class_register_type(&mgs_obd_ops, NULL,
1011                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
1012
1013         return 0;
1014 }
1015
1016 static void /*__exit*/ mgs_exit(void)
1017 {
1018         class_unregister_type(LUSTRE_MGS_NAME);
1019 }
1020
1021 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1022 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
1023 MODULE_LICENSE("GPL");
1024
1025 module_init(mgs_init);
1026 module_exit(mgs_exit);