Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  */
40
41 #ifndef EXPORT_SYMTAB
42 # define EXPORT_SYMTAB
43 #endif
44 #define DEBUG_SUBSYSTEM S_MGS
45 #define D_MGS D_CONFIG/*|D_WARNING*/
46
47 #ifdef __KERNEL__
48 # include <linux/module.h>
49 # include <linux/pagemap.h>
50 # include <linux/miscdevice.h>
51 # include <linux/init.h>
52 #else
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_dlm.h>
58 #include <lprocfs_status.h>
59 #include <lustre_fsfilt.h>
60 #include <lustre_disk.h>
61 #include "mgs_internal.h"
62
63
64 /* Establish a connection to the MGS.*/
65 static int mgs_connect(const struct lu_env *env,
66                        struct lustre_handle *conn, struct obd_device *obd,
67                        struct obd_uuid *cluuid, struct obd_connect_data *data,
68                        void *localdata)
69 {
70         struct obd_export *exp;
71         int rc;
72         ENTRY;
73
74         if (!conn || !obd || !cluuid)
75                 RETURN(-EINVAL);
76
77         rc = class_connect(conn, obd, cluuid);
78         if (rc)
79                 RETURN(rc);
80         exp = class_conn2export(conn);
81         LASSERT(exp);
82
83         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
84
85         if (data != NULL) {
86                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
87                 exp->exp_connect_flags = data->ocd_connect_flags;
88                 data->ocd_version = LUSTRE_VERSION_CODE;
89         }
90
91         rc = mgs_client_add(obd, exp, localdata);
92
93         if (rc) {
94                 class_disconnect(exp);
95         } else {
96                 class_export_put(exp);
97         }
98
99         RETURN(rc);
100 }
101
102 static int mgs_reconnect(const struct lu_env *env,
103                          struct obd_export *exp, struct obd_device *obd,
104                          struct obd_uuid *cluuid, struct obd_connect_data *data,
105                          void *localdata)
106 {
107         ENTRY;
108
109         if (exp == NULL || obd == NULL || cluuid == NULL)
110                 RETURN(-EINVAL);
111
112         mgs_counter_incr(exp, LPROC_MGS_CONNECT);
113
114         if (data != NULL) {
115                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
116                 exp->exp_connect_flags = data->ocd_connect_flags;
117                 data->ocd_version = LUSTRE_VERSION_CODE;
118         }
119
120         RETURN(0);
121 }
122
123 static int mgs_disconnect(struct obd_export *exp)
124 {
125         int rc;
126         ENTRY;
127
128         LASSERT(exp);
129
130         class_export_get(exp);
131         mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
132
133         /* Disconnect early so that clients can't keep using export */
134         rc = class_disconnect(exp);
135         ldlm_cancel_locks_for_export(exp);
136
137         lprocfs_exp_cleanup(exp);
138
139         /* complete all outstanding replies */
140         spin_lock(&exp->exp_lock);
141         while (!list_empty(&exp->exp_outstanding_replies)) {
142                 struct ptlrpc_reply_state *rs =
143                         list_entry(exp->exp_outstanding_replies.next,
144                                    struct ptlrpc_reply_state, rs_exp_list);
145                 struct ptlrpc_service *svc = rs->rs_service;
146
147                 spin_lock(&svc->srv_lock);
148                 list_del_init(&rs->rs_exp_list);
149                 ptlrpc_schedule_difficult_reply(rs);
150                 spin_unlock(&svc->srv_lock);
151         }
152         spin_unlock(&exp->exp_lock);
153
154         class_export_put(exp);
155         RETURN(rc);
156 }
157
158 static int mgs_cleanup(struct obd_device *obd);
159 static int mgs_handle(struct ptlrpc_request *req);
160
161 static int mgs_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
162                          struct obd_device *tgt, int count,
163                          struct llog_catid *logid, struct obd_uuid *uuid)
164 {
165         int rc;
166         ENTRY;
167
168         LASSERT(olg == &obd->obd_olg);
169         rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
170                         &llog_lvfs_ops);
171         RETURN(rc);
172 }
173
174 static int mgs_llog_finish(struct obd_device *obd, int count)
175 {
176         struct llog_ctxt *ctxt;
177         int rc = 0;
178         ENTRY;
179
180         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
181         if (ctxt)
182                 rc = llog_cleanup(ctxt);
183
184         RETURN(rc);
185 }
186
187 /* Start the MGS obd */
188 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
189 {
190         struct lprocfs_static_vars lvars;
191         struct mgs_obd *mgs = &obd->u.mgs;
192         struct lustre_mount_info *lmi;
193         struct lustre_sb_info *lsi;
194         struct vfsmount *mnt;
195         int rc = 0;
196         ENTRY;
197
198         CDEBUG(D_CONFIG, "Starting MGS\n");
199
200         /* Find our disk */
201         lmi = server_get_mount(obd->obd_name);
202         if (!lmi)
203                 RETURN(rc = -EINVAL);
204
205         mnt = lmi->lmi_mnt;
206         lsi = s2lsi(lmi->lmi_sb);
207         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
208         if (IS_ERR(obd->obd_fsops))
209                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
210
211         /* namespace for mgs llog */
212         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
213                                                 LDLM_NAMESPACE_MODEST);
214         if (obd->obd_namespace == NULL)
215                 GOTO(err_ops, rc = -ENOMEM);
216
217         /* ldlm setup */
218         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
219                            "mgs_ldlm_client", &obd->obd_ldlm_client);
220
221         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
222
223         rc = mgs_fs_setup(obd, mnt);
224         if (rc) {
225                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
226                        obd->obd_name, rc);
227                 GOTO(err_ns, rc);
228         }
229
230         rc = obd_llog_init(obd, &obd->obd_olg, obd, 0, NULL, NULL);
231         if (rc)
232                 GOTO(err_fs, rc);
233
234         /* No recovery for MGC's */
235         obd->obd_replayable = 0;
236
237         /* Internal mgs setup */
238         mgs_init_fsdb_list(obd);
239         sema_init(&mgs->mgs_sem, 1);
240
241         /* Start the service threads */
242         mgs->mgs_service =
243                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
244                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
245                                 MGC_REPLY_PORTAL, 2000,
246                                 mgs_handle, LUSTRE_MGS_NAME,
247                                 obd->obd_proc_entry, target_print_req,
248                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
249                                 "ll_mgs", LCT_MD_THREAD, NULL);
250
251         if (!mgs->mgs_service) {
252                 CERROR("failed to start service\n");
253                 GOTO(err_llog, rc = -ENOMEM);
254         }
255
256         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
257         if (rc)
258                 GOTO(err_thread, rc);
259
260         /* Setup proc */
261         lprocfs_mgs_init_vars(&lvars);
262         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
263                 lproc_mgs_setup(obd);
264         }
265
266         ping_evictor_start();
267
268         LCONSOLE_INFO("MGS %s started\n", obd->obd_name);
269
270         RETURN(0);
271
272 err_thread:
273         ptlrpc_unregister_service(mgs->mgs_service);
274 err_llog:
275         obd_llog_finish(obd, 0);
276 err_fs:
277         /* No extra cleanup needed for llog_init_commit_thread() */
278         mgs_fs_cleanup(obd);
279 err_ns:
280         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
281         obd->obd_namespace = NULL;
282 err_ops:
283         fsfilt_put_ops(obd->obd_fsops);
284 err_put:
285         server_put_mount(obd->obd_name, mnt);
286         mgs->mgs_sb = 0;
287         return rc;
288 }
289
290 static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
291 {
292         int rc = 0;
293         ENTRY;
294
295         switch (stage) {
296         case OBD_CLEANUP_EARLY:
297         case OBD_CLEANUP_EXPORTS:
298                 rc = obd_llog_finish(obd, 0);
299                 break;
300         }
301         RETURN(rc);
302 }
303
304 /**
305  * Performs cleanup procedures for passed \a obd given it is mgs obd.
306  */
307 static int mgs_cleanup(struct obd_device *obd)
308 {
309         struct mgs_obd *mgs = &obd->u.mgs;
310         ENTRY;
311
312         if (mgs->mgs_sb == NULL)
313                 RETURN(0);
314
315         ping_evictor_stop();
316
317         ptlrpc_unregister_service(mgs->mgs_service);
318
319         mgs_cleanup_fsdb_list(obd);
320         lproc_mgs_cleanup(obd);
321         mgs_fs_cleanup(obd);
322
323         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
324         mgs->mgs_sb = NULL;
325
326         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
327         obd->obd_namespace = NULL;
328
329         fsfilt_put_ops(obd->obd_fsops);
330
331         LCONSOLE_INFO("%s has stopped.\n", obd->obd_name);
332         RETURN(0);
333 }
334
335 /* similar to filter_prepare_destroy */
336 static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
337                             struct lustre_handle *lockh)
338 {
339         struct ldlm_res_id res_id;
340         int rc, flags = 0;
341         ENTRY;
342
343         rc = mgc_fsname2resid(fsname, &res_id);
344         if (!rc)
345                 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
346                                             LDLM_PLAIN, NULL, LCK_EX,
347                                             &flags, ldlm_blocking_ast,
348                                             ldlm_completion_ast, NULL,
349                                             fsname, 0, NULL, NULL, lockh);
350         if (rc)
351                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
352
353         RETURN(rc);
354 }
355
356 static int mgs_put_cfg_lock(struct lustre_handle *lockh)
357 {
358         ENTRY;
359         ldlm_lock_decref(lockh, LCK_EX);
360         RETURN(0);
361 }
362
363 static void mgs_revoke_lock(struct obd_device *obd, char *fsname,
364                             struct lustre_handle *lockh)
365 {
366         int lockrc;
367
368         if (fsname[0]) {
369                 lockrc = mgs_get_cfg_lock(obd, fsname, lockh);
370                 if (lockrc != ELDLM_OK)
371                         CERROR("lock error %d for fs %s\n", lockrc,
372                                fsname);
373                 else
374                         mgs_put_cfg_lock(lockh);
375         }
376 }
377
378 /* rc=0 means ok
379       1 means update
380      <0 means error */
381 static int mgs_check_target(struct obd_device *obd, struct mgs_target_info *mti)
382 {
383         int rc;
384         ENTRY;
385
386         rc = mgs_check_index(obd, mti);
387         if (rc == 0) {
388                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
389                                    "this MGS does not know about it, preventing "
390                                    "registration.\n", mti->mti_svname);
391                 rc = -ENOENT;
392         } else if (rc == -1) {
393                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
394                                    "disappeared! Regenerating all logs.\n",
395                                    mti->mti_fsname);
396                 mti->mti_flags |= LDD_F_WRITECONF;
397                 rc = 1;
398         } else {
399                 /* Index is correctly marked as used */
400
401                 /* If the logs don't contain the mti_nids then add
402                    them as failover nids */
403                 rc = mgs_check_failnid(obd, mti);
404         }
405
406         RETURN(rc);
407 }
408
409 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
410 static int mgs_handle_target_reg(struct ptlrpc_request *req)
411 {
412         struct obd_device *obd = req->rq_export->exp_obd;
413         struct lustre_handle lockh;
414         struct mgs_target_info *mti, *rep_mti;
415         int rc = 0, lockrc;
416         ENTRY;
417
418         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
419
420         mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
421         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
422                                 LDD_F_UPDATE))) {
423                 /* We're just here as a startup ping. */
424                 CDEBUG(D_MGS, "Server %s is running on %s\n",
425                        mti->mti_svname, obd_export_nid2str(req->rq_export));
426                 rc = mgs_check_target(obd, mti);
427                 /* above will set appropriate mti flags */
428                 if (rc <= 0)
429                         /* Nothing wrong, or fatal error */
430                         GOTO(out_nolock, rc);
431         }
432
433         /* Revoke the config lock to make sure nobody is reading. */
434         /* Although actually I think it should be alright if
435            someone was reading while we were updating the logs - if we
436            revoke at the end they will just update from where they left off. */
437         lockrc = mgs_get_cfg_lock(obd, mti->mti_fsname, &lockh);
438         if (lockrc != ELDLM_OK) {
439                 LCONSOLE_ERROR_MSG(0x13d, "%s: Can't signal other nodes to "
440                                    "update their configuration (%d). Updating "
441                                    "local logs anyhow; you might have to "
442                                    "manually restart other nodes to get the "
443                                    "latest configuration.\n",
444                                    obd->obd_name, lockrc);
445         }
446
447         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
448
449         /* Log writing contention is handled by the fsdb_sem */
450
451         if (mti->mti_flags & LDD_F_WRITECONF) {
452                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
453                     mti->mti_stripe_index == 0) {
454                         rc = mgs_erase_logs(obd, mti->mti_fsname);
455                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
456                                       "request.  All servers must be restarted "
457                                       "in order to regenerate the logs."
458                                       "\n", obd->obd_name, mti->mti_fsname);
459                 } else if (mti->mti_flags &
460                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
461                         rc = mgs_erase_log(obd, mti->mti_svname);
462                         LCONSOLE_WARN("%s: Regenerating %s log by user "
463                                       "request.\n",
464                                       obd->obd_name, mti->mti_svname);
465                 }
466                 mti->mti_flags |= LDD_F_UPDATE;
467                 /* Erased logs means start from scratch. */
468                 mti->mti_flags &= ~LDD_F_UPGRADE14;
469         }
470
471         /* COMPAT_146 */
472         if (mti->mti_flags & LDD_F_UPGRADE14) {
473                 rc = mgs_upgrade_sv_14(obd, mti);
474                 if (rc) {
475                         CERROR("Can't upgrade from 1.4 (%d)\n", rc);
476                         GOTO(out, rc);
477                 }
478
479                 /* We're good to go */
480                 mti->mti_flags |= LDD_F_UPDATE;
481         }
482         /* end COMPAT_146 */
483
484         if (mti->mti_flags & LDD_F_UPDATE) {
485                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
486                        mti->mti_stripe_index);
487
488                 /* create or update the target log
489                    and update the client/mdt logs */
490                 rc = mgs_write_log_target(obd, mti);
491                 if (rc) {
492                         CERROR("Failed to write %s log (%d)\n",
493                                mti->mti_svname, rc);
494                         GOTO(out, rc);
495                 }
496
497                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
498                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
499                                     LDD_F_UPGRADE14);
500                 mti->mti_flags |= LDD_F_REWRITE_LDD;
501         }
502
503 out:
504         /* done with log update */
505         if (lockrc == ELDLM_OK)
506                 mgs_put_cfg_lock(&lockh);
507 out_nolock:
508         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
509                mti->mti_stripe_index, rc);
510         rc = req_capsule_server_pack(&req->rq_pill);
511         if (rc)
512                 RETURN(rc);
513
514         /* send back the whole mti in the reply */
515         rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
516         *rep_mti = *mti;
517
518         /* Flush logs to disk */
519         fsfilt_sync(obd, obd->u.mgs.mgs_sb);
520         RETURN(rc);
521 }
522
523 static int mgs_set_info_rpc(struct ptlrpc_request *req)
524 {
525         struct obd_device *obd = req->rq_export->exp_obd;
526         struct mgs_send_param *msp, *rep_msp;
527         struct lustre_handle lockh;
528         int rc;
529         struct lustre_cfg_bufs bufs;
530         struct lustre_cfg *lcfg;
531         char fsname[MTI_NAME_MAXLEN];
532         ENTRY;
533
534         msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
535         LASSERT(msp);
536
537         /* Construct lustre_cfg structure to pass to function mgs_setparam */
538         lustre_cfg_bufs_reset(&bufs, NULL);
539         lustre_cfg_bufs_set_string(&bufs, 1, msp->mgs_param);
540         lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
541         rc = mgs_setparam(obd, lcfg, fsname);
542         if (rc) {
543                 CERROR("Error %d in setting the parameter %s for fs %s\n",
544                        rc, msp->mgs_param, fsname);
545                 RETURN(rc);
546         }
547
548         /* request for update */
549         mgs_revoke_lock(obd, fsname, &lockh);
550
551         lustre_cfg_free(lcfg);
552
553         rc = req_capsule_server_pack(&req->rq_pill);
554         if (rc == 0) {
555                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
556                 rep_msp = msp;
557         }
558         RETURN(rc);
559 }
560
561 /*
562  * similar as in ost_connect_check_sptlrpc()
563  */
564 static int mgs_connect_check_sptlrpc(struct ptlrpc_request *req)
565 {
566         struct obd_export     *exp = req->rq_export;
567         struct obd_device     *obd = exp->exp_obd;
568         struct fs_db          *fsdb;
569         struct sptlrpc_flavor  flvr;
570         int                    rc = 0;
571
572         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
573                 rc = mgs_find_or_make_fsdb(obd, MGSSELF_NAME, &fsdb);
574                 if (rc)
575                         return rc;
576
577                 down(&fsdb->fsdb_sem);
578                 if (sptlrpc_rule_set_choose(&fsdb->fsdb_srpc_gen,
579                                             LUSTRE_SP_MGC, LUSTRE_SP_MGS,
580                                             req->rq_peer.nid,
581                                             &flvr) == 0) {
582                         /* by defualt allow any flavors */
583                         flvr.sf_rpc = SPTLRPC_FLVR_ANY;
584                 }
585                 up(&fsdb->fsdb_sem);
586
587                 spin_lock(&exp->exp_lock);
588
589                 exp->exp_sp_peer = req->rq_sp_from;
590                 exp->exp_flvr = flvr;
591
592                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
593                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
594                         CERROR("invalid rpc flavor %x, expect %x, from %s\n",
595                                req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
596                                libcfs_nid2str(req->rq_peer.nid));
597                         rc = -EACCES;
598                 }
599
600                 spin_unlock(&exp->exp_lock);
601         } else {
602                 if (exp->exp_sp_peer != req->rq_sp_from) {
603                         CERROR("RPC source %s doesn't match %s\n",
604                                sptlrpc_part2name(req->rq_sp_from),
605                                sptlrpc_part2name(exp->exp_sp_peer));
606                         rc = -EACCES;
607                 } else {
608                         rc = sptlrpc_target_export_check(exp, req);
609                 }
610         }
611
612         return rc;
613 }
614
615 /* Called whenever a target cleans up. */
616 /* XXX - Currently unused */
617 static int mgs_handle_target_del(struct ptlrpc_request *req)
618 {
619         ENTRY;
620         mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
621         RETURN(0);
622 }
623
624 /* XXX - Currently unused */
625 static int mgs_handle_exception(struct ptlrpc_request *req)
626 {
627         ENTRY;
628         mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
629         RETURN(0);
630 }
631
632 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
633 int mgs_handle(struct ptlrpc_request *req)
634 {
635         int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
636         int opc, rc = 0;
637         ENTRY;
638
639         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
640         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
641         if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
642                 RETURN(0);
643
644         LASSERT(current->journal_info == NULL);
645         opc = lustre_msg_get_opc(req->rq_reqmsg);
646
647         if (opc == SEC_CTX_INIT ||
648             opc == SEC_CTX_INIT_CONT ||
649             opc == SEC_CTX_FINI)
650                 GOTO(out, rc = 0);
651
652         if (opc != MGS_CONNECT) {
653                 if (req->rq_export == NULL) {
654                         CERROR("lustre_mgs: operation %d on unconnected MGS\n",
655                                opc);
656                         req->rq_status = -ENOTCONN;
657                         GOTO(out, rc = -ENOTCONN);
658                 }
659         }
660
661         switch (opc) {
662         case MGS_CONNECT:
663                 DEBUG_REQ(D_MGS, req, "connect");
664                 /* MGS and MDS have same request format for connect */
665                 req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
666                 rc = target_handle_connect(req);
667                 if (rc == 0)
668                         rc = mgs_connect_check_sptlrpc(req);
669
670                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
671                         /* Make clients trying to reconnect after a MGS restart
672                            happy; also requires obd_replayable */
673                         lustre_msg_add_op_flags(req->rq_repmsg,
674                                                 MSG_CONNECT_RECONNECT);
675                 break;
676         case MGS_DISCONNECT:
677                 DEBUG_REQ(D_MGS, req, "disconnect");
678                 /* MGS and MDS have same request format for disconnect */
679                 req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
680                 rc = target_handle_disconnect(req);
681                 req->rq_status = rc;            /* superfluous? */
682                 break;
683         case MGS_EXCEPTION:
684                 DEBUG_REQ(D_MGS, req, "exception");
685                 rc = mgs_handle_exception(req);
686                 break;
687         case MGS_TARGET_REG:
688                 DEBUG_REQ(D_MGS, req, "target add");
689                 req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
690                 rc = mgs_handle_target_reg(req);
691                 break;
692         case MGS_TARGET_DEL:
693                 DEBUG_REQ(D_MGS, req, "target del");
694                 rc = mgs_handle_target_del(req);
695                 break;
696         case MGS_SET_INFO:
697                 DEBUG_REQ(D_MGS, req, "set_info");
698                 req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
699                 rc = mgs_set_info_rpc(req);
700                 break;
701
702         case LDLM_ENQUEUE:
703                 DEBUG_REQ(D_MGS, req, "enqueue");
704                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
705                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
706                                          ldlm_server_blocking_ast, NULL);
707                 break;
708         case LDLM_BL_CALLBACK:
709         case LDLM_CP_CALLBACK:
710                 DEBUG_REQ(D_MGS, req, "callback");
711                 CERROR("callbacks should not happen on MGS\n");
712                 LBUG();
713                 break;
714
715         case OBD_PING:
716                 DEBUG_REQ(D_INFO, req, "ping");
717                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
718                 rc = target_handle_ping(req);
719                 break;
720         case OBD_LOG_CANCEL:
721                 DEBUG_REQ(D_MGS, req, "log cancel");
722                 rc = -ENOTSUPP; /* la la la */
723                 break;
724
725         case LLOG_ORIGIN_HANDLE_CREATE:
726                 DEBUG_REQ(D_MGS, req, "llog_init");
727                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
728                 rc = llog_origin_handle_create(req);
729                 break;
730         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
731                 DEBUG_REQ(D_MGS, req, "llog next block");
732                 req_capsule_set(&req->rq_pill,
733                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
734                 rc = llog_origin_handle_next_block(req);
735                 break;
736         case LLOG_ORIGIN_HANDLE_READ_HEADER:
737                 DEBUG_REQ(D_MGS, req, "llog read header");
738                 req_capsule_set(&req->rq_pill,
739                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
740                 rc = llog_origin_handle_read_header(req);
741                 break;
742         case LLOG_ORIGIN_HANDLE_CLOSE:
743                 DEBUG_REQ(D_MGS, req, "llog close");
744                 rc = llog_origin_handle_close(req);
745                 break;
746         case LLOG_CATINFO:
747                 DEBUG_REQ(D_MGS, req, "llog catinfo");
748                 req_capsule_set(&req->rq_pill, &RQF_LLOG_CATINFO);
749                 rc = llog_catinfo(req);
750                 break;
751         default:
752                 req->rq_status = -ENOTSUPP;
753                 rc = ptlrpc_error(req);
754                 RETURN(rc);
755         }
756
757         LASSERT(current->journal_info == NULL);
758
759         if (rc)
760                 CERROR("MGS handle cmd=%d rc=%d\n", opc, rc);
761
762 out:
763         target_send_reply(req, rc, fail);
764         RETURN(0);
765 }
766
767 static inline int mgs_init_export(struct obd_export *exp)
768 {
769         return ldlm_init_export(exp);
770 }
771
772 static inline int mgs_destroy_export(struct obd_export *exp)
773 {
774         ENTRY;
775
776         target_destroy_export(exp);
777         mgs_client_free(exp);
778         ldlm_destroy_export(exp);
779
780         RETURN(0);
781 }
782
783 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
784 {
785         char *ptr;
786
787         ENTRY;
788         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
789                 *fsname = *ptr;
790                 fsname++;
791         }
792         if (*ptr == '\0')
793                 return -EINVAL;
794         *fsname = '\0';
795         ptr++;
796         strcpy(poolname, ptr);
797
798         RETURN(0);
799 }
800
801 static int mgs_iocontrol_pool(struct obd_device *obd,
802                               struct obd_ioctl_data *data)
803 {
804         int rc;
805         struct lustre_handle lockh;
806         struct lustre_cfg *lcfg = NULL;
807         struct llog_rec_hdr rec;
808         char *fsname = NULL;
809         char *poolname = NULL;
810         ENTRY;
811
812         OBD_ALLOC(fsname, MTI_NAME_MAXLEN);
813         if (fsname == NULL)
814                 RETURN(-ENOMEM);
815
816         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
817         if (poolname == NULL) {
818                 rc = -ENOMEM;
819                 GOTO(out_pool, rc);
820         }
821         rec.lrh_len = llog_data_len(data->ioc_plen1);
822
823         if (data->ioc_type == LUSTRE_CFG_TYPE) {
824                 rec.lrh_type = OBD_CFG_REC;
825         } else {
826                 CERROR("unknown cfg record type:%d \n", data->ioc_type);
827                 rc = -EINVAL;
828                 GOTO(out_pool, rc);
829         }
830
831         if (data->ioc_plen1 > CFS_PAGE_SIZE) {
832                 rc = -E2BIG;
833                 GOTO(out_pool, rc);
834         }
835
836         OBD_ALLOC(lcfg, data->ioc_plen1);
837         if (lcfg == NULL)
838                 GOTO(out_pool, rc = -ENOMEM);
839
840         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
841                 GOTO(out_pool, rc = -EFAULT);
842
843         if (lcfg->lcfg_bufcount < 2) {
844                 GOTO(out_pool, rc = -EFAULT);
845         }
846
847         /* first arg is always <fsname>.<poolname> */
848         mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), fsname,
849                             poolname);
850
851         switch (lcfg->lcfg_command) {
852         case LCFG_POOL_NEW: {
853                 if (lcfg->lcfg_bufcount != 2)
854                         RETURN(-EINVAL);
855                 rc = mgs_pool_cmd(obd, LCFG_POOL_NEW, fsname,
856                                   poolname, NULL);
857                 break;
858         }
859         case LCFG_POOL_ADD: {
860                 if (lcfg->lcfg_bufcount != 3)
861                         RETURN(-EINVAL);
862                 rc = mgs_pool_cmd(obd, LCFG_POOL_ADD, fsname, poolname,
863                                   lustre_cfg_string(lcfg, 2));
864                 break;
865         }
866         case LCFG_POOL_REM: {
867                 if (lcfg->lcfg_bufcount != 3)
868                         RETURN(-EINVAL);
869                 rc = mgs_pool_cmd(obd, LCFG_POOL_REM, fsname, poolname,
870                                   lustre_cfg_string(lcfg, 2));
871                 break;
872         }
873         case LCFG_POOL_DEL: {
874                 if (lcfg->lcfg_bufcount != 2)
875                         RETURN(-EINVAL);
876                 rc = mgs_pool_cmd(obd, LCFG_POOL_DEL, fsname,
877                                   poolname, NULL);
878                 break;
879         }
880         default: {
881                  rc = -EINVAL;
882                  GOTO(out_pool, rc);
883         }
884         }
885
886         if (rc) {
887                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
888                        rc, lcfg->lcfg_command, fsname, poolname);
889                 GOTO(out_pool, rc);
890         }
891
892         /* request for update */
893         mgs_revoke_lock(obd, fsname, &lockh);
894
895 out_pool:
896         if (lcfg != NULL)
897                 OBD_FREE(lcfg, data->ioc_plen1);
898
899         if (fsname != NULL)
900                 OBD_FREE(fsname, MTI_NAME_MAXLEN);
901
902         if (poolname != NULL)
903                 OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
904
905         RETURN(rc);
906 }
907
908 /* from mdt_iocontrol */
909 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
910                   void *karg, void *uarg)
911 {
912         struct obd_device *obd = exp->exp_obd;
913         struct obd_ioctl_data *data = karg;
914         struct lvfs_run_ctxt saved;
915         int rc = 0;
916
917         ENTRY;
918         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
919
920         switch (cmd) {
921
922         case OBD_IOC_PARAM: {
923                 struct lustre_handle lockh;
924                 struct lustre_cfg *lcfg;
925                 struct llog_rec_hdr rec;
926                 char fsname[MTI_NAME_MAXLEN];
927
928                 rec.lrh_len = llog_data_len(data->ioc_plen1);
929
930                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
931                         rec.lrh_type = OBD_CFG_REC;
932                 } else {
933                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
934                         RETURN(-EINVAL);
935                 }
936
937                 OBD_ALLOC(lcfg, data->ioc_plen1);
938                 if (lcfg == NULL)
939                         RETURN(-ENOMEM);
940                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
941                         GOTO(out_free, rc = -EFAULT);
942
943                 if (lcfg->lcfg_bufcount < 1)
944                         GOTO(out_free, rc = -EINVAL);
945
946                 rc = mgs_setparam(obd, lcfg, fsname);
947                 if (rc) {
948                         CERROR("setparam err %d\n", rc);
949                         GOTO(out_free, rc);
950                 }
951
952                 /* Revoke lock so everyone updates.  Should be alright if
953                    someone was already reading while we were updating the logs,
954                    so we don't really need to hold the lock while we're
955                    writing (above). */
956                 mgs_revoke_lock(obd, fsname, &lockh);
957
958 out_free:
959                 OBD_FREE(lcfg, data->ioc_plen1);
960                 RETURN(rc);
961         }
962
963         case OBD_IOC_POOL: {
964                 RETURN(mgs_iocontrol_pool(obd, data));
965         }
966
967         case OBD_IOC_DUMP_LOG: {
968                 struct llog_ctxt *ctxt;
969                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
970                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
971                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
972                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
973                 llog_ctxt_put(ctxt);
974
975                 RETURN(rc);
976         }
977
978         case OBD_IOC_LLOG_CHECK:
979         case OBD_IOC_LLOG_INFO:
980         case OBD_IOC_LLOG_PRINT: {
981                 struct llog_ctxt *ctxt;
982                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
983
984                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
985                 rc = llog_ioctl(ctxt, cmd, data);
986                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
987                 llog_ctxt_put(ctxt);
988
989                 RETURN(rc);
990         }
991
992         default:
993                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
994                 RETURN(-EINVAL);
995         }
996         RETURN(0);
997 }
998
999 /* use obd ops to offer management infrastructure */
1000 static struct obd_ops mgs_obd_ops = {
1001         .o_owner           = THIS_MODULE,
1002         .o_connect         = mgs_connect,
1003         .o_reconnect       = mgs_reconnect,
1004         .o_disconnect      = mgs_disconnect,
1005         .o_setup           = mgs_setup,
1006         .o_precleanup      = mgs_precleanup,
1007         .o_cleanup         = mgs_cleanup,
1008         .o_init_export     = mgs_init_export,
1009         .o_destroy_export  = mgs_destroy_export,
1010         .o_iocontrol       = mgs_iocontrol,
1011         .o_llog_init       = mgs_llog_init,
1012         .o_llog_finish     = mgs_llog_finish
1013 };
1014
1015 static int __init mgs_init(void)
1016 {
1017         struct lprocfs_static_vars lvars;
1018
1019         lprocfs_mgs_init_vars(&lvars);
1020         class_register_type(&mgs_obd_ops, NULL,
1021                             lvars.module_vars, LUSTRE_MGS_NAME, NULL);
1022
1023         return 0;
1024 }
1025
1026 static void /*__exit*/ mgs_exit(void)
1027 {
1028         class_unregister_type(LUSTRE_MGS_NAME);
1029 }
1030
1031 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1032 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
1033 MODULE_LICENSE("GPL");
1034
1035 module_init(mgs_init);
1036 module_exit(mgs_exit);