Whamcloud - gitweb
581f1e1d45d4e70302aeeaa42bdc847c3bb10e5e
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct lustre_handle *conn, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *exp;
71         int rc;
72         ENTRY;
73
74         if (!conn || !obd || !cluuid)
75                 RETURN(-EINVAL);
76
77         rc = class_connect(conn, obd, cluuid);
78         if (rc)
79                 RETURN(rc);
80         exp = class_conn2export(conn);
81         LASSERT(exp);
82
83         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
84
85         if (data != NULL) {
86                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
87                 exp->exp_connect_flags = data->ocd_connect_flags;
88                 data->ocd_version = LUSTRE_VERSION_CODE;
89         }
90
91         rc = mgs_client_add(obd, exp, localdata);
92
93         if (rc) {
94                 class_disconnect(exp);
95         } else {
96                 class_export_put(exp);
97         }
98
99         RETURN(rc);
100 }
101
102 static int mgs_reconnect(const struct lu_env *env,
103                          struct obd_export *exp, struct obd_device *obd,
104                          struct obd_uuid *cluuid, struct obd_connect_data *data,
105                          void *localdata)
106 {
107         ENTRY;
108
109         if (exp == NULL || obd == NULL || cluuid == NULL)
110                 RETURN(-EINVAL);
111
112         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
113
114         if (data != NULL) {
115                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
116                 exp->exp_connect_flags = data->ocd_connect_flags;
117                 data->ocd_version = LUSTRE_VERSION_CODE;
118         }
119
120         RETURN(0);
121 }
122
123 static int mgs_disconnect(struct obd_export *exp)
124 {
125         int rc;
126         ENTRY;
127
128         LASSERT(exp);
129
130         class_export_get(exp);
131         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
132
133         /* Disconnect early so that clients can't keep using export */
134         rc = class_disconnect(exp);
135         ldlm_cancel_locks_for_export(exp);
136
137         lprocfs_exp_cleanup(exp);
138
139         /* complete all outstanding replies */
140         spin_lock(&exp->exp_lock);
141         while (!list_empty(&exp->exp_outstanding_replies)) {
142                 struct ptlrpc_reply_state *rs =
143                         list_entry(exp->exp_outstanding_replies.next,
144                                    struct ptlrpc_reply_state, rs_exp_list);
145                 struct ptlrpc_service *svc = rs->rs_service;
146
147                 spin_lock(&svc->srv_lock);
148                 list_del_init(&rs->rs_exp_list);
149                 spin_lock(&rs->rs_lock);
150                 ptlrpc_schedule_difficult_reply(rs);
151                 spin_unlock(&rs->rs_lock);
152                 spin_unlock(&svc->srv_lock);
153         }
154         spin_unlock(&exp->exp_lock);
155
156         class_export_put(exp);
157         RETURN(rc);
158 }
159
160 static int mgs_cleanup(struct obd_device *obd);
161 static int mgs_handle(struct ptlrpc_request *req);
162
163 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
164                          struct obd_device *tgt, int count,
165                          struct llog_catid *logid, struct obd_uuid *uuid)
166 {
167         int rc;
168         ENTRY;
169
170         LASSERT(olg == &obd->obd_olg);
171         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
172                         &llog_lvfs_ops);
173         RETURN(rc);
174 }
175
176 static int mgs_llog_finish(struct obd_device *obd, int count)
177 {
178         struct llog_ctxt *ctxt;
179         int rc = 0;
180         ENTRY;
181
182         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
183         if (ctxt)
184                 rc = llog_cleanup(ctxt);
185
186         RETURN(rc);
187 }
188
189 /* Start the MGS obd */
190 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
191 {
192         struct lprocfs_static_vars lvars;
193         struct mgs_obd *mgs = &obd->u.mgs;
194         struct lustre_mount_info *lmi;
195         struct lustre_sb_info *lsi;
196         struct vfsmount *mnt;
197         int rc = 0;
198         ENTRY;
199
200         CDEBUG(D_CONFIG, "Starting MGS\n");
201
202         /* Find our disk */
203         lmi = server_get_mount(obd->obd_name);
204         if (!lmi)
205                 RETURN(rc = -EINVAL);
206
207         mnt = lmi->lmi_mnt;
208         lsi = s2lsi(lmi->lmi_sb);
209         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
210         if (IS_ERR(obd->obd_fsops))
211                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
212
213         /* namespace for mgs llog */
214         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
215                                                 LDLM_NAMESPACE_MODEST);
216         if (obd->obd_namespace == NULL)
217                 GOTO(err_ops, rc = -ENOMEM);
218
219         /* ldlm setup */
220         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
221                            "mgs_ldlm_client", &obd->obd_ldlm_client);
222
223         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
224
225         rc = mgs_fs_setup(obd, mnt);
226         if (rc) {
227                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
228                        obd->obd_name, rc);
229                 GOTO(err_ns, rc);
230         }
231
232         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
233         if (rc)
234                 GOTO(err_fs, rc);
235
236         /* No recovery for MGC's */
237         obd->obd_replayable = 0;
238
239         /* Internal mgs setup */
240         mgs_init_fsdb_list(obd);
241         sema_init(&mgs->mgs_sem, 1);
242
243         /* Start the service threads */
244         mgs->mgs_service =
245                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
246                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
247                                 MGC_REPLY_PORTAL, 2000,
248                                 mgs_handle, LUSTRE_MGS_NAME,
249                                 obd->obd_proc_entry, target_print_req,
250                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
251                                 "ll_mgs", LCT_MD_THREAD, NULL);
252
253         if (!mgs->mgs_service) {
254                 CERROR("failed to start service\n");
255                 GOTO(err_llog, rc = -ENOMEM);
256         }
257
258         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
259         if (rc)
260                 GOTO(err_thread, rc);
261
262         /* Setup proc */
263         lprocfs_mgs_init_vars(&lvars);
264         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
265                 lproc_mgs_setup(obd);
266         }
267
268         ping_evictor_start();
269
270         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
271
272         RETURN(0);
273
274 err_thread:
275         ptlrpc_unregister_service(mgs->mgs_service);
276 err_llog:
277         obd_llog_finish(obd, 0);
278 err_fs:
279         /* No extra cleanup needed for llog_init_commit_thread() */
280         mgs_fs_cleanup(obd);
281 err_ns:
282         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
283         obd->obd_namespace = NULL;
284 err_ops:
285         fsfilt_put_ops(obd->obd_fsops);
286 err_put:
287         server_put_mount(obd->obd_name, mnt);
288         mgs->mgs_sb = 0;
289         return rc;
290 }
291
292 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
293 {
294         int rc = 0;
295         ENTRY;
296
297         switch (stage) {
298         case OBD_CLEANUP_EARLY:
299         case OBD_CLEANUP_EXPORTS:
300                 rc = obd_llog_finish(obd, 0);
301                 break;
302         }
303         RETURN(rc);
304 }
305
306 /**
307  * Performs cleanup procedures for passed \a obd given it is mgs obd.
308  */
309 static int mgs_cleanup(struct obd_device *obd)
310 {
311         struct mgs_obd *mgs = &obd->u.mgs;
312         ENTRY;
313
314         if (mgs->mgs_sb == NULL)
315                 RETURN(0);
316
317         ping_evictor_stop();
318
319         ptlrpc_unregister_service(mgs->mgs_service);
320
321         mgs_cleanup_fsdb_list(obd);
322         lproc_mgs_cleanup(obd);
323         mgs_fs_cleanup(obd);
324
325         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
326         mgs->mgs_sb = NULL;
327
328         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
329         obd->obd_namespace = NULL;
330
331         fsfilt_put_ops(obd->obd_fsops);
332
333         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
334         RETURN(0);
335 }
336
337 /* similar to filter_prepare_destroy */
338 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
339                             struct lustre_handle *lockh)
340 {
341         struct ldlm_res_id res_id;
342         int rc, flags = 0;
343         ENTRY;
344
345         rc = mgc_fsname2resid(fsname, &res_id);
346         if (!rc)
347                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
348                                             LDLM_PLAIN, NULL, LCK_EX,
349                                             &flags, ldlm_blocking_ast,
350                                             ldlm_completion_ast, NULL,
351                                             fsname, 0, NULL, NULL, lockh);
352         if (rc)
353                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
354
355         RETURN(rc);
356 }
357
358 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
359 {
360         ENTRY;
361         ldlm_lock_decref(lockh, LCK_EX);
362         RETURN(0);
363 }
364
365 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
366                             struct lustre_handle *lockh)
367 {
368         int lockrc;
369
370         if (fsname[0]) {
371                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
372                 if (lockrc != ELDLM_OK)
373                         CERROR("lock error %d for fs %s\n", lockrc,
374                                fsname);
375                 else
376                         mgs_put_cfg_lock(lockh);
377         }
378 }
379
380 /* rc=0 means ok
381       1 means update
382      <0 means error */
383 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
384 {
385         int rc;
386         ENTRY;
387
388         rc = mgs_check_index(obd, mti);
389         if (rc == 0) {
390                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
391                                    "this MGS does not know about it, preventing "
392                                    "registration.\n", mti->mti_svname);
393                 rc = -ENOENT;
394         } else if (rc == -1) {
395                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
396                                    "disappeared! Regenerating all logs.\n",
397                                    mti->mti_fsname);
398                 mti->mti_flags |= LDD_F_WRITECONF;
399                 rc = 1;
400         } else {
401                 /* Index is correctly marked as used */
402
403                 /* If the logs don't contain the mti_nids then add
404                    them as failover nids */
405                 rc = mgs_check_failnid(obd, mti);
406         }
407
408         RETURN(rc);
409 }
410
411 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
412 static int mgs_handle_target_reg(struct ptlrpc_request *req)
413 {
414         struct obd_device *obd = req->rq_export->exp_obd;
415         struct lustre_handle lockh;
416         struct mgs_target_info *mti, *rep_mti;
417         int rc = 0, lockrc;
418         ENTRY;
419
420         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
421
422         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
423         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
424                                 LDD_F_UPDATE))) {
425                 /* We're just here as a startup ping. */
426                 CDEBUG(D_MGS, "Server %s is running on %s\n",
427                        mti->mti_svname, obd_export_nid2str(req->rq_export));
428                 rc = mgs_check_target(obd, mti);
429                 /* above will set appropriate mti flags */
430                 if (rc <= 0)
431                         /* Nothing wrong, or fatal error */
432                         GOTO(out_nolock, rc);
433         }
434
435         /* Revoke the config lock to make sure nobody is reading. */
436         /* Although actually I think it should be alright if
437            someone was reading while we were updating the logs - if we
438            revoke at the end they will just update from where they left off. */
439         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
440         if (lockrc != ELDLM_OK) {
441                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
442                                    "update their configuration (%d). Updating "
443                                    "local logs anyhow; you might have to "
444                                    "manually restart other nodes to get the "
445                                    "latest configuration.\n",
446                                    obd->obd_name, lockrc);
447         }
448
449         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
450
451         /* Log writing contention is handled by the fsdb_sem */
452
453         if (mti->mti_flags & LDD_F_WRITECONF) {
454                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
455                     mti->mti_stripe_index == 0) {
456                         rc = mgs_erase_logs(obd, mti->mti_fsname);
457                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
458                                       "request.  All servers must be restarted "
459                                       "in order to regenerate the logs."
460                                       "\n", obd->obd_name, mti->mti_fsname);
461                 } else if (mti->mti_flags &
462                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
463                         rc = mgs_erase_log(obd, mti->mti_svname);
464                         LCONSOLE_WARN("%s: Regenerating %s log by user "
465                                       "request.\n",
466                                       obd->obd_name, mti->mti_svname);
467                 }
468                 mti->mti_flags |= LDD_F_UPDATE;
469                 /* Erased logs means start from scratch. */
470                 mti->mti_flags &= ~LDD_F_UPGRADE14;
471         }
472
473         /* COMPAT_146 */
474         if (mti->mti_flags & LDD_F_UPGRADE14) {
475                 rc = mgs_upgrade_sv_14(obd, mti);
476                 if (rc) {
477                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
478                         GOTO(out, rc);
479                 }
480
481                 /* We're good to go */
482                 mti->mti_flags |= LDD_F_UPDATE;
483         }
484         /* end COMPAT_146 */
485
486         if (mti->mti_flags & LDD_F_UPDATE) {
487                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
488                        mti->mti_stripe_index);
489
490                 /* create or update the target log
491                    and update the client/mdt logs */
492                 rc = mgs_write_log_target(obd, mti);
493                 if (rc) {
494                         CERROR("Failed to write %s log (%d)\n",
495                                mti->mti_svname, rc);
496                         GOTO(out, rc);
497                 }
498
499                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
500                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
501                                     LDD_F_UPGRADE14);
502                 mti->mti_flags |= LDD_F_REWRITE_LDD;
503         }
504
505 out:
506         /* done with log update */
507         if (lockrc == ELDLM_OK)
508                 mgs_put_cfg_lock(&lockh);
509 out_nolock:
510         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
511                mti->mti_stripe_index, rc);
512         rc = req_capsule_server_pack(&req->rq_pill);
513         if (rc)
514                 RETURN(rc);
515
516         /* send back the whole mti in the reply */
517         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
518         *rep_mti = *mti;
519
520         /* Flush logs to disk */
521         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
522         RETURN(rc);
523 }
524
525 static int mgs_set_info_rpc(struct ptlrpc_request *req)
526 {
527         struct obd_device *obd = req->rq_export->exp_obd;
528         struct mgs_send_param *msp, *rep_msp;
529         struct lustre_handle lockh;
530         int rc;
531         struct lustre_cfg_bufs bufs;
532         struct lustre_cfg *lcfg;
533         char fsname[MTI_NAME_MAXLEN];
534         ENTRY;
535
536         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
537         LASSERT(msp);
538
539         /* Construct lustre_cfg structure to pass to function mgs_setparam */
540         lustre_cfg_bufs_reset(&bufs, NULL);
541         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
542         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
543         rc = mgs_setparam(obd, lcfg, fsname);
544         if (rc) {
545                 CERROR("Error %d in setting the parameter %s for fs %s\n",
546                        rc, msp->mgs_param, fsname);
547                 RETURN(rc);
548         }
549
550         /* request for update */
551         mgs_revoke_lock(obd, fsname, &lockh);
552
553         lustre_cfg_free(lcfg);
554
555         rc = req_capsule_server_pack(&req->rq_pill);
556         if (rc == 0) {
557                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
558                 rep_msp = msp;
559         }
560         RETURN(rc);
561 }
562
563 /*
564  * similar as in ost_connect_check_sptlrpc()
565  */
566 static int mgs_connect_check_sptlrpc(struct ptlrpc_request *req)
567 {
568         struct obd_export     *exp = req->rq_export;
569         struct obd_device     *obd = exp->exp_obd;
570         struct fs_db          *fsdb;
571         struct sptlrpc_flavor  flvr;
572         int                    rc = 0;
573
574         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
575                 rc = mgs_find_or_make_fsdb(obd, MGSSELF_NAME, &fsdb);
576                 if (rc)
577                         return rc;
578
579                 down(&fsdb->fsdb_sem);
580                 if (sptlrpc_rule_set_choose(&fsdb->fsdb_srpc_gen,
581                                             LUSTRE_SP_MGC, LUSTRE_SP_MGS,
582                                             req->rq_peer.nid,
583                                             &flvr) == 0) {
584                         /* by defualt allow any flavors */
585                         flvr.sf_rpc = SPTLRPC_FLVR_ANY;
586                 }
587                 up(&fsdb->fsdb_sem);
588
589                 spin_lock(&exp->exp_lock);
590
591                 exp->exp_sp_peer = req->rq_sp_from;
592                 exp->exp_flvr = flvr;
593
594                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
595                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
596                         CERROR("invalid rpc flavor %x, expect %x, from %s\n",
597                                req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
598                                libcfs_nid2str(req->rq_peer.nid));
599                         rc = -EACCES;
600                 }
601
602                 spin_unlock(&exp->exp_lock);
603         } else {
604                 if (exp->exp_sp_peer != req->rq_sp_from) {
605                         CERROR("RPC source %s doesn't match %s\n",
606                                sptlrpc_part2name(req->rq_sp_from),
607                                sptlrpc_part2name(exp->exp_sp_peer));
608                         rc = -EACCES;
609                 } else {
610                         rc = sptlrpc_target_export_check(exp, req);
611                 }
612         }
613
614         return rc;
615 }
616
617 /* Called whenever a target cleans up. */
618 /* XXX - Currently unused */
619 static int mgs_handle_target_del(struct ptlrpc_request *req)
620 {
621         ENTRY;
622         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
623         RETURN(0);
624 }
625
626 /* XXX - Currently unused */
627 static int mgs_handle_exception(struct ptlrpc_request *req)
628 {
629         ENTRY;
630         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
631         RETURN(0);
632 }
633
634 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
635 int mgs_handle(struct ptlrpc_request *req)
636 {
637         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
638         int opc, rc = 0;
639         ENTRY;
640
641         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
642         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
643         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
644                 RETURN(0);
645
646         LASSERT(current->journal_info == NULL);
647         opc = lustre_msg_get_opc(req->rq_reqmsg);
648
649         if (opc == SEC_CTX_INIT ||
650             opc == SEC_CTX_INIT_CONT ||
651             opc == SEC_CTX_FINI)
652                 GOTO(out, rc = 0);
653
654         if (opc != MGS_CONNECT) {
655                 if (req->rq_export == NULL) {
656                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
657                                opc);
658                         req->rq_status = -ENOTCONN;
659                         GOTO(out, rc = -ENOTCONN);
660                 }
661         }
662
663         switch (opc) {
664         case MGS_CONNECT:
665                 DEBUG_REQ(D_MGS, req, "connect");
666                 /* MGS and MDS have same request format for connect */
667                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
668                 rc = target_handle_connect(req);
669                 if (rc == 0)
670                         rc = mgs_connect_check_sptlrpc(req);
671
672                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
673                         /* Make clients trying to reconnect after a MGS restart
674                            happy; also requires obd_replayable */
675                         lustre_msg_add_op_flags(req->rq_repmsg,
676                                                 MSG_CONNECT_RECONNECT);
677                 break;
678         case MGS_DISCONNECT:
679                 DEBUG_REQ(D_MGS, req, "disconnect");
680                 /* MGS and MDS have same request format for disconnect */
681                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
682                 rc = target_handle_disconnect(req);
683                 req->rq_status = rc;            /* superfluous? */
684                 break;
685         case MGS_EXCEPTION:
686                 DEBUG_REQ(D_MGS, req, "exception");
687                 rc = mgs_handle_exception(req);
688                 break;
689         case MGS_TARGET_REG:
690                 DEBUG_REQ(D_MGS, req, "target add");
691                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
692                 rc = mgs_handle_target_reg(req);
693                 break;
694         case MGS_TARGET_DEL:
695                 DEBUG_REQ(D_MGS, req, "target del");
696                 rc = mgs_handle_target_del(req);
697                 break;
698         case MGS_SET_INFO:
699                 DEBUG_REQ(D_MGS, req, "set_info");
700                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
701                 rc = mgs_set_info_rpc(req);
702                 break;
703
704         case LDLM_ENQUEUE:
705                 DEBUG_REQ(D_MGS, req, "enqueue");
706                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
707                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
708                                          ldlm_server_blocking_ast, NULL);
709                 break;
710         case LDLM_BL_CALLBACK:
711         case LDLM_CP_CALLBACK:
712                 DEBUG_REQ(D_MGS, req, "callback");
713                 CERROR("callbacks should not happen on MGS\n");
714                 LBUG();
715                 break;
716
717         case OBD_PING:
718                 DEBUG_REQ(D_INFO, req, "ping");
719                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
720                 rc = target_handle_ping(req);
721                 break;
722         case OBD_LOG_CANCEL:
723                 DEBUG_REQ(D_MGS, req, "log cancel");
724                 rc = -ENOTSUPP; /* la la la */
725                 break;
726
727         case LLOG_ORIGIN_HANDLE_CREATE:
728                 DEBUG_REQ(D_MGS, req, "llog_init");
729                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
730                 rc = llog_origin_handle_create(req);
731                 break;
732         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
733                 DEBUG_REQ(D_MGS, req, "llog next block");
734                 req_capsule_set(&req->rq_pill,
735                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
736                 rc = llog_origin_handle_next_block(req);
737                 break;
738         case LLOG_ORIGIN_HANDLE_READ_HEADER:
739                 DEBUG_REQ(D_MGS, req, "llog read header");
740                 req_capsule_set(&req->rq_pill,
741                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
742                 rc = llog_origin_handle_read_header(req);
743                 break;
744         case LLOG_ORIGIN_HANDLE_CLOSE:
745                 DEBUG_REQ(D_MGS, req, "llog close");
746                 rc = llog_origin_handle_close(req);
747                 break;
748         case LLOG_CATINFO:
749                 DEBUG_REQ(D_MGS, req, "llog catinfo");
750                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
751                 rc = llog_catinfo(req);
752                 break;
753         default:
754                 req->rq_status = -ENOTSUPP;
755                 rc = ptlrpc_error(req);
756                 RETURN(rc);
757         }
758
759         LASSERT(current->journal_info == NULL);
760
761         if (rc)
762                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
763
764 out:
765         target_send_reply(req, rc, fail);
766         RETURN(0);
767 }
768
769 static inline int mgs_init_export(struct obd_export *exp)
770 {
771         return ldlm_init_export(exp);
772 }
773
774 static inline int mgs_destroy_export(struct obd_export *exp)
775 {
776         ENTRY;
777
778         target_destroy_export(exp);
779         mgs_client_free(exp);
780         ldlm_destroy_export(exp);
781
782         RETURN(0);
783 }
784
785 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
786 {
787         char *ptr;
788
789         ENTRY;
790         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
791                 *fsname = *ptr;
792                 fsname++;
793         }
794         if (*ptr == '\0')
795                 return -EINVAL;
796         *fsname = '\0';
797         ptr++;
798         strcpy(poolname, ptr);
799
800         RETURN(0);
801 }
802
803 static int mgs_iocontrol_pool(struct obd_device *obd,
804                               struct obd_ioctl_data *data)
805 {
806         int rc;
807         struct lustre_handle lockh;
808         struct lustre_cfg *lcfg = NULL;
809         struct llog_rec_hdr rec;
810         char *fsname = NULL;
811         char *poolname = NULL;
812         ENTRY;
813
814         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
815         if (fsname == NULL)
816                 RETURN(-ENOMEM);
817
818         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
819         if (poolname == NULL) {
820                 rc = -ENOMEM;
821                 GOTO(out_pool, rc);
822         }
823         rec.lrh_len = llog_data_len(data->ioc_plen1);
824
825         if (data->ioc_type == LUSTRE_CFG_TYPE) {
826                 rec.lrh_type = OBD_CFG_REC;
827         } else {
828                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
829                 rc = -EINVAL;
830                 GOTO(out_pool, rc);
831         }
832
833         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
834                 rc = -E2BIG;
835                 GOTO(out_pool, rc);
836         }
837
838         OBD_ALLOC(lcfg, data->ioc_plen1);
839         if (lcfg == NULL)
840                 GOTO(out_pool, rc = -ENOMEM);
841
842         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
843                 GOTO(out_pool, rc = -EFAULT);
844
845         if (lcfg->lcfg_bufcount < 2) {
846                 GOTO(out_pool, rc = -EFAULT);
847         }
848
849         /* first arg is always <fsname>.<poolname> */
850         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
851                             poolname);
852
853         switch (lcfg->lcfg_command) {
854         case LCFG_POOL_NEW: {
855                 if (lcfg->lcfg_bufcount != 2)
856                         RETURN(-EINVAL);
857                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
858                                   poolname, NULL);
859                 break;
860         }
861         case LCFG_POOL_ADD: {
862                 if (lcfg->lcfg_bufcount != 3)
863                         RETURN(-EINVAL);
864                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
865                                   lustre_cfg_string(lcfg, 2));
866                 break;
867         }
868         case LCFG_POOL_REM: {
869                 if (lcfg->lcfg_bufcount != 3)
870                         RETURN(-EINVAL);
871                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
872                                   lustre_cfg_string(lcfg, 2));
873                 break;
874         }
875         case LCFG_POOL_DEL: {
876                 if (lcfg->lcfg_bufcount != 2)
877                         RETURN(-EINVAL);
878                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
879                                   poolname, NULL);
880                 break;
881         }
882         default: {
883                  rc = -EINVAL;
884                  GOTO(out_pool, rc);
885         }
886         }
887
888         if (rc) {
889                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
890                        rc, lcfg->lcfg_command, fsname, poolname);
891                 GOTO(out_pool, rc);
892         }
893
894         /* request for update */
895         mgs_revoke_lock(obd, fsname, &lockh);
896
897 out_pool:
898         if (lcfg != NULL)
899                 OBD_FREE(lcfg, data->ioc_plen1);
900
901         if (fsname != NULL)
902                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
903
904         if (poolname != NULL)
905                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
906
907         RETURN(rc);
908 }
909
910 /* from mdt_iocontrol */
911 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
912                   void *karg, void *uarg)
913 {
914         struct obd_device *obd = exp->exp_obd;
915         struct obd_ioctl_data *data = karg;
916         struct lvfs_run_ctxt saved;
917         int rc = 0;
918
919         ENTRY;
920         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
921
922         switch (cmd) {
923
924         case OBD_IOC_PARAM: {
925                 struct lustre_handle lockh;
926                 struct lustre_cfg *lcfg;
927                 struct llog_rec_hdr rec;
928                 char fsname[MTI_NAME_MAXLEN];
929
930                 rec.lrh_len = llog_data_len(data->ioc_plen1);
931
932                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
933                         rec.lrh_type = OBD_CFG_REC;
934                 } else {
935                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
936                         RETURN(-EINVAL);
937                 }
938
939                 OBD_ALLOC(lcfg, data->ioc_plen1);
940                 if (lcfg == NULL)
941                         RETURN(-ENOMEM);
942                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
943                         GOTO(out_free, rc = -EFAULT);
944
945                 if (lcfg->lcfg_bufcount < 1)
946                         GOTO(out_free, rc = -EINVAL);
947
948                 rc = mgs_setparam(obd, lcfg, fsname);
949                 if (rc) {
950                         CERROR("setparam err %d\n", rc);
951                         GOTO(out_free, rc);
952                 }
953
954                 /* Revoke lock so everyone updates.  Should be alright if
955                    someone was already reading while we were updating the logs,
956                    so we don't really need to hold the lock while we're
957                    writing (above). */
958                 mgs_revoke_lock(obd, fsname, &lockh);
959
960 out_free:
961                 OBD_FREE(lcfg, data->ioc_plen1);
962                 RETURN(rc);
963         }
964
965         case OBD_IOC_POOL: {
966                 RETURN(mgs_iocontrol_pool(obd, data));
967         }
968
969         case OBD_IOC_DUMP_LOG: {
970                 struct llog_ctxt *ctxt;
971                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
972                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
973                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
974                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
975                 llog_ctxt_put(ctxt);
976
977                 RETURN(rc);
978         }
979
980         case OBD_IOC_LLOG_CHECK:
981         case OBD_IOC_LLOG_INFO:
982         case OBD_IOC_LLOG_PRINT: {
983                 struct llog_ctxt *ctxt;
984                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
985
986                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
987                 rc = llog_ioctl(ctxt, cmd, data);
988                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
989                 llog_ctxt_put(ctxt);
990
991                 RETURN(rc);
992         }
993
994         default:
995                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
996                 RETURN(-EINVAL);
997         }
998         RETURN(0);
999 }
1000
1001 /* use obd ops to offer management infrastructure */
1002 static struct obd_ops mgs_obd_ops = {
1003         .o_owner           = THIS_MODULE,
1004         .o_connect         = mgs_connect,
1005         .o_reconnect       = mgs_reconnect,
1006         .o_disconnect      = mgs_disconnect,
1007         .o_setup           = mgs_setup,
1008         .o_precleanup      = mgs_precleanup,
1009         .o_cleanup         = mgs_cleanup,
1010         .o_init_export     = mgs_init_export,
1011         .o_destroy_export  = mgs_destroy_export,
1012         .o_iocontrol       = mgs_iocontrol,
1013         .o_llog_init       = mgs_llog_init,
1014         .o_llog_finish     = mgs_llog_finish
1015 };
1016
1017 static int __init mgs_init(void)
1018 {
1019         struct lprocfs_static_vars lvars;
1020
1021         lprocfs_mgs_init_vars(&lvars);
1022         class_register_type(&mgs_obd_ops, NULL,
1023                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
1024
1025         return 0;
1026 }
1027
1028 static void /*__exit*/ mgs_exit(void)
1029 {
1030         class_unregister_type(LUSTRE_MGS_NAME);
1031 }
1032
1033 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1034 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
1035 MODULE_LICENSE("GPL");
1036
1037 module_init(mgs_init);
1038 module_exit(mgs_exit);