Whamcloud - gitweb
b=20595
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct obd_export **exp, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *lexp;
71         struct lustre_handle conn = { 0 };
72         int rc;
73         ENTRY;
74
75         if (!exp || !obd || !cluuid)
76                 RETURN(-EINVAL);
77
78         rc = class_connect(&conn, obd, cluuid);
79         if (rc)
80                 RETURN(rc);
81
82         lexp = class_conn2export(&conn);
83         LASSERT(lexp);
84
85         mgs_counter_incr(lexp, LPROC_MGS_CONNECT);
86
87         if (data != NULL) {
88                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
89                 lexp->exp_connect_flags = data->ocd_connect_flags;
90                 data->ocd_version = LUSTRE_VERSION_CODE;
91         }
92
93         rc = mgs_client_add(obd, lexp, localdata);
94
95         if (rc) {
96                 class_disconnect(lexp);
97         } else {
98                 *exp = lexp;
99         }
100
101         RETURN(rc);
102 }
103
104 static int mgs_reconnect(const struct lu_env *env,
105                          struct obd_export *exp, struct obd_device *obd,
106                          struct obd_uuid *cluuid, struct obd_connect_data *data,
107                          void *localdata)
108 {
109         ENTRY;
110
111         if (exp == NULL || obd == NULL || cluuid == NULL)
112                 RETURN(-EINVAL);
113
114         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
115
116         if (data != NULL) {
117                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
118                 exp->exp_connect_flags = data->ocd_connect_flags;
119                 data->ocd_version = LUSTRE_VERSION_CODE;
120         }
121
122         RETURN(0);
123 }
124
125 static int mgs_disconnect(struct obd_export *exp)
126 {
127         int rc;
128         ENTRY;
129
130         LASSERT(exp);
131
132         class_export_get(exp);
133         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
134
135         /* Disconnect early so that clients can't keep using export */
136         rc = class_disconnect(exp);
137         ldlm_cancel_locks_for_export(exp);
138
139         lprocfs_exp_cleanup(exp);
140
141         /* complete all outstanding replies */
142         spin_lock(&exp->exp_lock);
143         while (!list_empty(&exp->exp_outstanding_replies)) {
144                 struct ptlrpc_reply_state *rs =
145                         list_entry(exp->exp_outstanding_replies.next,
146                                    struct ptlrpc_reply_state, rs_exp_list);
147                 struct ptlrpc_service *svc = rs->rs_service;
148
149                 spin_lock(&svc->srv_lock);
150                 list_del_init(&rs->rs_exp_list);
151                 spin_lock(&rs->rs_lock);
152                 ptlrpc_schedule_difficult_reply(rs);
153                 spin_unlock(&rs->rs_lock);
154                 spin_unlock(&svc->srv_lock);
155         }
156         spin_unlock(&exp->exp_lock);
157
158         class_export_put(exp);
159         RETURN(rc);
160 }
161
162 static int mgs_cleanup(struct obd_device *obd);
163 static int mgs_handle(struct ptlrpc_request *req);
164
165 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
166                          struct obd_device *tgt, int *index)
167 {
168         int rc;
169         ENTRY;
170
171         LASSERT(olg == &obd->obd_olg);
172         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
173                         &llog_lvfs_ops);
174         RETURN(rc);
175 }
176
177 static int mgs_llog_finish(struct obd_device *obd, int count)
178 {
179         struct llog_ctxt *ctxt;
180         int rc = 0;
181         ENTRY;
182
183         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
184         if (ctxt)
185                 rc = llog_cleanup(ctxt);
186
187         RETURN(rc);
188 }
189
190 /* Start the MGS obd */
191 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
192 {
193         struct lprocfs_static_vars lvars;
194         struct mgs_obd *mgs = &obd->u.mgs;
195         struct lustre_mount_info *lmi;
196         struct lustre_sb_info *lsi;
197         struct vfsmount *mnt;
198         int rc = 0;
199         ENTRY;
200
201         CDEBUG(D_CONFIG, "Starting MGS\n");
202
203         /* Find our disk */
204         lmi = server_get_mount(obd->obd_name);
205         if (!lmi)
206                 RETURN(rc = -EINVAL);
207
208         mnt = lmi->lmi_mnt;
209         lsi = s2lsi(lmi->lmi_sb);
210         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
211         if (IS_ERR(obd->obd_fsops))
212                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
213
214         if (lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb))) {
215                 CERROR("%s: Underlying device is marked as read-only. "
216                        "Setup failed\n", obd->obd_name);
217                 GOTO(err_ops, rc = -EROFS);
218         }
219
220         /* namespace for mgs llog */
221         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
222                                                 LDLM_NAMESPACE_MODEST);
223         if (obd->obd_namespace == NULL)
224                 GOTO(err_ops, rc = -ENOMEM);
225
226         /* ldlm setup */
227         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
228                            "mgs_ldlm_client", &obd->obd_ldlm_client);
229
230         rc = mgs_fs_setup(obd, mnt);
231         if (rc) {
232                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
233                        obd->obd_name, rc);
234                 GOTO(err_ns, rc);
235         }
236
237         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
238         if (rc)
239                 GOTO(err_fs, rc);
240
241         /* No recovery for MGC's */
242         obd->obd_replayable = 0;
243
244         /* Internal mgs setup */
245         mgs_init_fsdb_list(obd);
246         sema_init(&mgs->mgs_sem, 1);
247
248         /* Setup proc */
249         lprocfs_mgs_init_vars(&lvars);
250         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0)
251                 lproc_mgs_setup(obd);
252
253         /* Start the service threads */
254         mgs->mgs_service =
255                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
256                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
257                                 MGC_REPLY_PORTAL, 2,
258                                 mgs_handle, LUSTRE_MGS_NAME,
259                                 obd->obd_proc_entry, target_print_req,
260                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
261                                 "ll_mgs", LCT_MD_THREAD, NULL);
262
263         if (!mgs->mgs_service) {
264                 CERROR("failed to start service\n");
265                 GOTO(err_llog, rc = -ENOMEM);
266         }
267
268         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
269         if (rc)
270                 GOTO(err_thread, rc);
271
272         ping_evictor_start();
273
274         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
275
276         RETURN(0);
277
278 err_thread:
279         ptlrpc_unregister_service(mgs->mgs_service);
280 err_llog:
281         lproc_mgs_cleanup(obd);
282         obd_llog_finish(obd, 0);
283 err_fs:
284         /* No extra cleanup needed for llog_init_commit_thread() */
285         mgs_fs_cleanup(obd);
286 err_ns:
287         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
288         obd->obd_namespace = NULL;
289 err_ops:
290         fsfilt_put_ops(obd->obd_fsops);
291 err_put:
292         server_put_mount(obd->obd_name, mnt);
293         mgs->mgs_sb = 0;
294         return rc;
295 }
296
297 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
298 {
299         int rc = 0;
300         ENTRY;
301
302         switch (stage) {
303         case OBD_CLEANUP_EARLY:
304         case OBD_CLEANUP_EXPORTS:
305                 rc = obd_llog_finish(obd, 0);
306                 break;
307         }
308         RETURN(rc);
309 }
310
311 /**
312  * Performs cleanup procedures for passed \a obd given it is mgs obd.
313  */
314 static int mgs_cleanup(struct obd_device *obd)
315 {
316         struct mgs_obd *mgs = &obd->u.mgs;
317         ENTRY;
318
319         if (mgs->mgs_sb == NULL)
320                 RETURN(0);
321
322         ping_evictor_stop();
323
324         ptlrpc_unregister_service(mgs->mgs_service);
325
326         mgs_cleanup_fsdb_list(obd);
327         lproc_mgs_cleanup(obd);
328         mgs_fs_cleanup(obd);
329
330         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
331         mgs->mgs_sb = NULL;
332
333         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
334         obd->obd_namespace = NULL;
335
336         fsfilt_put_ops(obd->obd_fsops);
337
338         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
339         RETURN(0);
340 }
341
342 /* similar to filter_prepare_destroy */
343 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
344                             struct lustre_handle *lockh)
345 {
346         struct ldlm_res_id res_id;
347         int rc, flags = 0;
348         ENTRY;
349
350         rc = mgc_fsname2resid(fsname, &res_id);
351         if (!rc)
352                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
353                                             LDLM_PLAIN, NULL, LCK_EX,
354                                             &flags, ldlm_blocking_ast,
355                                             ldlm_completion_ast, NULL,
356                                             fsname, 0, NULL, NULL, lockh);
357         if (rc)
358                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
359
360         RETURN(rc);
361 }
362
363 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
364 {
365         ENTRY;
366         ldlm_lock_decref(lockh, LCK_EX);
367         RETURN(0);
368 }
369
370 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
371                             struct lustre_handle *lockh)
372 {
373         int lockrc;
374
375         if (fsname[0]) {
376                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
377                 if (lockrc != ELDLM_OK)
378                         CERROR("lock error %d for fs %s\n", lockrc,
379                                fsname);
380                 else
381                         mgs_put_cfg_lock(lockh);
382         }
383 }
384
385 /* rc=0 means ok
386       1 means update
387      <0 means error */
388 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
389 {
390         int rc;
391         ENTRY;
392
393         rc = mgs_check_index(obd, mti);
394         if (rc == 0) {
395                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
396                                    "this MGS does not know about it, preventing "
397                                    "registration.\n", mti->mti_svname);
398                 rc = -ENOENT;
399         } else if (rc == -1) {
400                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
401                                    "disappeared! Regenerating all logs.\n",
402                                    mti->mti_fsname);
403                 mti->mti_flags |= LDD_F_WRITECONF;
404                 rc = 1;
405         } else {
406                 /* Index is correctly marked as used */
407
408                 /* If the logs don't contain the mti_nids then add
409                    them as failover nids */
410                 rc = mgs_check_failnid(obd, mti);
411         }
412
413         RETURN(rc);
414 }
415
416 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
417 static int mgs_handle_target_reg(struct ptlrpc_request *req)
418 {
419         struct obd_device *obd = req->rq_export->exp_obd;
420         struct lustre_handle lockh;
421         struct mgs_target_info *mti, *rep_mti;
422         int rc = 0, lockrc;
423         ENTRY;
424
425         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
426
427         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
428         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
429                                 LDD_F_UPDATE))) {
430                 /* We're just here as a startup ping. */
431                 CDEBUG(D_MGS, "Server %s is running on %s\n",
432                        mti->mti_svname, obd_export_nid2str(req->rq_export));
433                 rc = mgs_check_target(obd, mti);
434                 /* above will set appropriate mti flags */
435                 if (rc <= 0)
436                         /* Nothing wrong, or fatal error */
437                         GOTO(out_nolock, rc);
438         }
439
440         /* Revoke the config lock to make sure nobody is reading. */
441         /* Although actually I think it should be alright if
442            someone was reading while we were updating the logs - if we
443            revoke at the end they will just update from where they left off. */
444         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
445         if (lockrc != ELDLM_OK) {
446                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
447                                    "update their configuration (%d). Updating "
448                                    "local logs anyhow; you might have to "
449                                    "manually restart other nodes to get the "
450                                    "latest configuration.\n",
451                                    obd->obd_name, lockrc);
452         }
453
454         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
455
456         /* Log writing contention is handled by the fsdb_sem */
457
458         if (mti->mti_flags & LDD_F_WRITECONF) {
459                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
460                     mti->mti_stripe_index == 0) {
461                         rc = mgs_erase_logs(obd, mti->mti_fsname);
462                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
463                                       "request.  All servers must be restarted "
464                                       "in order to regenerate the logs."
465                                       "\n", obd->obd_name, mti->mti_fsname);
466                 } else if (mti->mti_flags &
467                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
468                         rc = mgs_erase_log(obd, mti->mti_svname);
469                         LCONSOLE_WARN("%s: Regenerating %s log by user "
470                                       "request.\n",
471                                       obd->obd_name, mti->mti_svname);
472                 }
473                 mti->mti_flags |= LDD_F_UPDATE;
474                 /* Erased logs means start from scratch. */
475                 mti->mti_flags &= ~LDD_F_UPGRADE14;
476         }
477
478         /* COMPAT_146 */
479         if (mti->mti_flags & LDD_F_UPGRADE14) {
480                 rc = mgs_upgrade_sv_14(obd, mti);
481                 if (rc) {
482                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
483                         GOTO(out, rc);
484                 }
485
486                 /* We're good to go */
487                 mti->mti_flags |= LDD_F_UPDATE;
488         }
489         /* end COMPAT_146 */
490
491         if (mti->mti_flags & LDD_F_UPDATE) {
492                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
493                        mti->mti_stripe_index);
494
495                 /* create or update the target log
496                    and update the client/mdt logs */
497                 rc = mgs_write_log_target(obd, mti);
498                 if (rc) {
499                         CERROR("Failed to write %s log (%d)\n",
500                                mti->mti_svname, rc);
501                         GOTO(out, rc);
502                 }
503
504                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
505                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
506                                     LDD_F_UPGRADE14);
507                 mti->mti_flags |= LDD_F_REWRITE_LDD;
508         }
509
510 out:
511         /* done with log update */
512         if (lockrc == ELDLM_OK)
513                 mgs_put_cfg_lock(&lockh);
514 out_nolock:
515         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
516                mti->mti_stripe_index, rc);
517         rc = req_capsule_server_pack(&req->rq_pill);
518         if (rc)
519                 RETURN(rc);
520
521         /* send back the whole mti in the reply */
522         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
523         *rep_mti = *mti;
524
525         /* Flush logs to disk */
526         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
527         RETURN(rc);
528 }
529
530 static int mgs_set_info_rpc(struct ptlrpc_request *req)
531 {
532         struct obd_device *obd = req->rq_export->exp_obd;
533         struct mgs_send_param *msp, *rep_msp;
534         struct lustre_handle lockh;
535         int rc;
536         struct lustre_cfg_bufs bufs;
537         struct lustre_cfg *lcfg;
538         char fsname[MTI_NAME_MAXLEN];
539         ENTRY;
540
541         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
542         LASSERT(msp);
543
544         /* Construct lustre_cfg structure to pass to function mgs_setparam */
545         lustre_cfg_bufs_reset(&bufs, NULL);
546         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
547         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
548         rc = mgs_setparam(obd, lcfg, fsname);
549         if (rc) {
550                 CERROR("Error %d in setting the parameter %s for fs %s\n",
551                        rc, msp->mgs_param, fsname);
552                 RETURN(rc);
553         }
554
555         /* request for update */
556         mgs_revoke_lock(obd, fsname, &lockh);
557
558         lustre_cfg_free(lcfg);
559
560         rc = req_capsule_server_pack(&req->rq_pill);
561         if (rc == 0) {
562                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
563                 rep_msp = msp;
564         }
565         RETURN(rc);
566 }
567
568 /*
569  * similar as in ost_connect_check_sptlrpc()
570  */
571 static int mgs_connect_check_sptlrpc(struct ptlrpc_request *req)
572 {
573         struct obd_export     *exp = req->rq_export;
574         struct obd_device     *obd = exp->exp_obd;
575         struct fs_db          *fsdb;
576         struct sptlrpc_flavor  flvr;
577         int                    rc = 0;
578
579         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
580                 rc = mgs_find_or_make_fsdb(obd, MGSSELF_NAME, &fsdb);
581                 if (rc)
582                         return rc;
583
584                 down(&fsdb->fsdb_sem);
585                 if (sptlrpc_rule_set_choose(&fsdb->fsdb_srpc_gen,
586                                             LUSTRE_SP_MGC, LUSTRE_SP_MGS,
587                                             req->rq_peer.nid,
588                                             &flvr) == 0) {
589                         /* by defualt allow any flavors */
590                         flvr.sf_rpc = SPTLRPC_FLVR_ANY;
591                 }
592                 up(&fsdb->fsdb_sem);
593
594                 spin_lock(&exp->exp_lock);
595
596                 exp->exp_sp_peer = req->rq_sp_from;
597                 exp->exp_flvr = flvr;
598
599                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
600                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
601                         CERROR("invalid rpc flavor %x, expect %x, from %s\n",
602                                req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
603                                libcfs_nid2str(req->rq_peer.nid));
604                         rc = -EACCES;
605                 }
606
607                 spin_unlock(&exp->exp_lock);
608         } else {
609                 if (exp->exp_sp_peer != req->rq_sp_from) {
610                         CERROR("RPC source %s doesn't match %s\n",
611                                sptlrpc_part2name(req->rq_sp_from),
612                                sptlrpc_part2name(exp->exp_sp_peer));
613                         rc = -EACCES;
614                 } else {
615                         rc = sptlrpc_target_export_check(exp, req);
616                 }
617         }
618
619         return rc;
620 }
621
622 /* Called whenever a target cleans up. */
623 /* XXX - Currently unused */
624 static int mgs_handle_target_del(struct ptlrpc_request *req)
625 {
626         ENTRY;
627         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
628         RETURN(0);
629 }
630
631 /* XXX - Currently unused */
632 static int mgs_handle_exception(struct ptlrpc_request *req)
633 {
634         ENTRY;
635         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
636         RETURN(0);
637 }
638
639 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
640 int mgs_handle(struct ptlrpc_request *req)
641 {
642         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
643         int opc, rc = 0;
644         ENTRY;
645
646         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
647         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
648         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
649                 RETURN(0);
650
651         LASSERT(current->journal_info == NULL);
652         opc = lustre_msg_get_opc(req->rq_reqmsg);
653
654         if (opc == SEC_CTX_INIT ||
655             opc == SEC_CTX_INIT_CONT ||
656             opc == SEC_CTX_FINI)
657                 GOTO(out, rc = 0);
658
659         if (opc != MGS_CONNECT) {
660                 if (req->rq_export == NULL) {
661                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
662                                opc);
663                         req->rq_status = -ENOTCONN;
664                         GOTO(out, rc = -ENOTCONN);
665                 }
666         }
667
668         switch (opc) {
669         case MGS_CONNECT:
670                 DEBUG_REQ(D_MGS, req, "connect");
671                 /* MGS and MDS have same request format for connect */
672                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
673                 rc = target_handle_connect(req);
674                 if (rc == 0)
675                         rc = mgs_connect_check_sptlrpc(req);
676
677                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
678                         /* Make clients trying to reconnect after a MGS restart
679                            happy; also requires obd_replayable */
680                         lustre_msg_add_op_flags(req->rq_repmsg,
681                                                 MSG_CONNECT_RECONNECT);
682                 break;
683         case MGS_DISCONNECT:
684                 DEBUG_REQ(D_MGS, req, "disconnect");
685                 /* MGS and MDS have same request format for disconnect */
686                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
687                 rc = target_handle_disconnect(req);
688                 req->rq_status = rc;            /* superfluous? */
689                 break;
690         case MGS_EXCEPTION:
691                 DEBUG_REQ(D_MGS, req, "exception");
692                 rc = mgs_handle_exception(req);
693                 break;
694         case MGS_TARGET_REG:
695                 DEBUG_REQ(D_MGS, req, "target add");
696                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
697                 rc = mgs_handle_target_reg(req);
698                 break;
699         case MGS_TARGET_DEL:
700                 DEBUG_REQ(D_MGS, req, "target del");
701                 rc = mgs_handle_target_del(req);
702                 break;
703         case MGS_SET_INFO:
704                 DEBUG_REQ(D_MGS, req, "set_info");
705                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
706                 rc = mgs_set_info_rpc(req);
707                 break;
708
709         case LDLM_ENQUEUE:
710                 DEBUG_REQ(D_MGS, req, "enqueue");
711                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
712                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
713                                          ldlm_server_blocking_ast, NULL);
714                 break;
715         case LDLM_BL_CALLBACK:
716         case LDLM_CP_CALLBACK:
717                 DEBUG_REQ(D_MGS, req, "callback");
718                 CERROR("callbacks should not happen on MGS\n");
719                 LBUG();
720                 break;
721
722         case OBD_PING:
723                 DEBUG_REQ(D_INFO, req, "ping");
724                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
725                 rc = target_handle_ping(req);
726                 break;
727         case OBD_LOG_CANCEL:
728                 DEBUG_REQ(D_MGS, req, "log cancel");
729                 rc = -ENOTSUPP; /* la la la */
730                 break;
731
732         case LLOG_ORIGIN_HANDLE_CREATE:
733                 DEBUG_REQ(D_MGS, req, "llog_init");
734                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
735                 rc = llog_origin_handle_create(req);
736                 break;
737         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
738                 DEBUG_REQ(D_MGS, req, "llog next block");
739                 req_capsule_set(&req->rq_pill,
740                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
741                 rc = llog_origin_handle_next_block(req);
742                 break;
743         case LLOG_ORIGIN_HANDLE_READ_HEADER:
744                 DEBUG_REQ(D_MGS, req, "llog read header");
745                 req_capsule_set(&req->rq_pill,
746                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
747                 rc = llog_origin_handle_read_header(req);
748                 break;
749         case LLOG_ORIGIN_HANDLE_CLOSE:
750                 DEBUG_REQ(D_MGS, req, "llog close");
751                 rc = llog_origin_handle_close(req);
752                 break;
753         case LLOG_CATINFO:
754                 DEBUG_REQ(D_MGS, req, "llog catinfo");
755                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
756                 rc = llog_catinfo(req);
757                 break;
758         default:
759                 req->rq_status = -ENOTSUPP;
760                 rc = ptlrpc_error(req);
761                 RETURN(rc);
762         }
763
764         LASSERT(current->journal_info == NULL);
765
766         if (rc)
767                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
768
769 out:
770         target_send_reply(req, rc, fail);
771         RETURN(0);
772 }
773
774 static inline int mgs_init_export(struct obd_export *exp)
775 {
776         spin_lock(&exp->exp_lock);
777         exp->exp_connecting = 1;
778         spin_unlock(&exp->exp_lock);
779
780         return ldlm_init_export(exp);
781 }
782
783 static inline int mgs_destroy_export(struct obd_export *exp)
784 {
785         ENTRY;
786
787         target_destroy_export(exp);
788         mgs_client_free(exp);
789         ldlm_destroy_export(exp);
790
791         RETURN(0);
792 }
793
794 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
795 {
796         char *ptr;
797
798         ENTRY;
799         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
800                 *fsname = *ptr;
801                 fsname++;
802         }
803         if (*ptr == '\0')
804                 return -EINVAL;
805         *fsname = '\0';
806         ptr++;
807         strcpy(poolname, ptr);
808
809         RETURN(0);
810 }
811
812 static int mgs_iocontrol_pool(struct obd_device *obd,
813                               struct obd_ioctl_data *data)
814 {
815         int rc;
816         struct lustre_handle lockh;
817         struct lustre_cfg *lcfg = NULL;
818         struct llog_rec_hdr rec;
819         char *fsname = NULL;
820         char *poolname = NULL;
821         ENTRY;
822
823         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
824         if (fsname == NULL)
825                 RETURN(-ENOMEM);
826
827         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
828         if (poolname == NULL) {
829                 rc = -ENOMEM;
830                 GOTO(out_pool, rc);
831         }
832         rec.lrh_len = llog_data_len(data->ioc_plen1);
833
834         if (data->ioc_type == LUSTRE_CFG_TYPE) {
835                 rec.lrh_type = OBD_CFG_REC;
836         } else {
837                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
838                 rc = -EINVAL;
839                 GOTO(out_pool, rc);
840         }
841
842         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
843                 rc = -E2BIG;
844                 GOTO(out_pool, rc);
845         }
846
847         OBD_ALLOC(lcfg, data->ioc_plen1);
848         if (lcfg == NULL)
849                 GOTO(out_pool, rc = -ENOMEM);
850
851         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
852                 GOTO(out_pool, rc = -EFAULT);
853
854         if (lcfg->lcfg_bufcount < 2) {
855                 GOTO(out_pool, rc = -EFAULT);
856         }
857
858         /* first arg is always <fsname>.<poolname> */
859         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
860                             poolname);
861
862         switch (lcfg->lcfg_command) {
863         case LCFG_POOL_NEW: {
864                 if (lcfg->lcfg_bufcount != 2)
865                         RETURN(-EINVAL);
866                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
867                                   poolname, NULL);
868                 break;
869         }
870         case LCFG_POOL_ADD: {
871                 if (lcfg->lcfg_bufcount != 3)
872                         RETURN(-EINVAL);
873                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
874                                   lustre_cfg_string(lcfg, 2));
875                 break;
876         }
877         case LCFG_POOL_REM: {
878                 if (lcfg->lcfg_bufcount != 3)
879                         RETURN(-EINVAL);
880                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
881                                   lustre_cfg_string(lcfg, 2));
882                 break;
883         }
884         case LCFG_POOL_DEL: {
885                 if (lcfg->lcfg_bufcount != 2)
886                         RETURN(-EINVAL);
887                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
888                                   poolname, NULL);
889                 break;
890         }
891         default: {
892                  rc = -EINVAL;
893                  GOTO(out_pool, rc);
894         }
895         }
896
897         if (rc) {
898                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
899                        rc, lcfg->lcfg_command, fsname, poolname);
900                 GOTO(out_pool, rc);
901         }
902
903         /* request for update */
904         mgs_revoke_lock(obd, fsname, &lockh);
905
906 out_pool:
907         if (lcfg != NULL)
908                 OBD_FREE(lcfg, data->ioc_plen1);
909
910         if (fsname != NULL)
911                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
912
913         if (poolname != NULL)
914                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
915
916         RETURN(rc);
917 }
918
919 /* from mdt_iocontrol */
920 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
921                   void *karg, void *uarg)
922 {
923         struct obd_device *obd = exp->exp_obd;
924         struct obd_ioctl_data *data = karg;
925         struct lvfs_run_ctxt saved;
926         int rc = 0;
927
928         ENTRY;
929         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
930
931         switch (cmd) {
932
933         case OBD_IOC_PARAM: {
934                 struct lustre_handle lockh;
935                 struct lustre_cfg *lcfg;
936                 struct llog_rec_hdr rec;
937                 char fsname[MTI_NAME_MAXLEN];
938
939                 rec.lrh_len = llog_data_len(data->ioc_plen1);
940
941                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
942                         rec.lrh_type = OBD_CFG_REC;
943                 } else {
944                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
945                         RETURN(-EINVAL);
946                 }
947
948                 OBD_ALLOC(lcfg, data->ioc_plen1);
949                 if (lcfg == NULL)
950                         RETURN(-ENOMEM);
951                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
952                         GOTO(out_free, rc = -EFAULT);
953
954                 if (lcfg->lcfg_bufcount < 1)
955                         GOTO(out_free, rc = -EINVAL);
956
957                 rc = mgs_setparam(obd, lcfg, fsname);
958                 if (rc) {
959                         CERROR("setparam err %d\n", rc);
960                         GOTO(out_free, rc);
961                 }
962
963                 /* Revoke lock so everyone updates.  Should be alright if
964                    someone was already reading while we were updating the logs,
965                    so we don't really need to hold the lock while we're
966                    writing (above). */
967                 mgs_revoke_lock(obd, fsname, &lockh);
968
969 out_free:
970                 OBD_FREE(lcfg, data->ioc_plen1);
971                 RETURN(rc);
972         }
973
974         case OBD_IOC_POOL: {
975                 RETURN(mgs_iocontrol_pool(obd, data));
976         }
977
978         case OBD_IOC_DUMP_LOG: {
979                 struct llog_ctxt *ctxt;
980                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
981                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
982                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
983                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
984                 llog_ctxt_put(ctxt);
985
986                 RETURN(rc);
987         }
988
989         case OBD_IOC_LLOG_CHECK:
990         case OBD_IOC_LLOG_INFO:
991         case OBD_IOC_LLOG_PRINT: {
992                 struct llog_ctxt *ctxt;
993                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
994
995                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
996                 rc = llog_ioctl(ctxt, cmd, data);
997                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
998                 llog_ctxt_put(ctxt);
999
1000                 RETURN(rc);
1001         }
1002
1003         default:
1004                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
1005                 RETURN(-EINVAL);
1006         }
1007         RETURN(0);
1008 }
1009
1010 /* use obd ops to offer management infrastructure */
1011 static struct obd_ops mgs_obd_ops = {
1012         .o_owner           = THIS_MODULE,
1013         .o_connect         = mgs_connect,
1014         .o_reconnect       = mgs_reconnect,
1015         .o_disconnect      = mgs_disconnect,
1016         .o_setup           = mgs_setup,
1017         .o_precleanup      = mgs_precleanup,
1018         .o_cleanup         = mgs_cleanup,
1019         .o_init_export     = mgs_init_export,
1020         .o_destroy_export  = mgs_destroy_export,
1021         .o_iocontrol       = mgs_iocontrol,
1022         .o_llog_init       = mgs_llog_init,
1023         .o_llog_finish     = mgs_llog_finish
1024 };
1025
1026 static int __init mgs_init(void)
1027 {
1028         struct lprocfs_static_vars lvars;
1029
1030         lprocfs_mgs_init_vars(&lvars);
1031         class_register_type(&mgs_obd_ops, NULL,
1032                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
1033
1034         return 0;
1035 }
1036
1037 static void /*__exit*/ mgs_exit(void)
1038 {
1039         class_unregister_type(LUSTRE_MGS_NAME);
1040 }
1041
1042 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1043 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
1044 MODULE_LICENSE("GPL");
1045
1046 module_init(mgs_init);
1047 module_exit(mgs_exit);