Whamcloud - gitweb
LU-2145 server: use unified request handler for MGS
[fs/lustre-release.git] / lustre / mgs / mgs_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_handler.c
37  *
38  * Author: Nathan Rutman <nathan@clusterfs.com>
39  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
40  * Author: Mikhail Pershin <tappro@whamcloud.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MGS
44 #define D_MGS D_CONFIG
45
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
48 #include <lustre_param.h>
49
50 #include "mgs_internal.h"
51
52 /*
53  * Regular MGS handlers
54  */
55 static int mgs_connect(struct tgt_session_info *tsi)
56 {
57         struct ptlrpc_request   *req = tgt_ses_req(tsi);
58         int                      rc;
59
60         ENTRY;
61
62         rc = tgt_connect(tsi);
63         if (rc)
64                 RETURN(rc);
65
66         if (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1)
67                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
68
69         RETURN(0);
70 }
71
72 static int mgs_disconnect(struct tgt_session_info *tsi)
73 {
74         int rc;
75
76         ENTRY;
77
78         LASSERT(tsi->tsi_exp);
79
80         rc = tgt_disconnect(tsi);
81         if (rc)
82                 RETURN(err_serious(rc));
83         RETURN(0);
84 }
85
86 static int mgs_exception(struct tgt_session_info *tsi)
87 {
88         ENTRY;
89
90         tgt_counter_incr(tsi->tsi_exp, LPROC_MGS_EXCEPTION);
91
92         RETURN(0);
93 }
94
95 static int mgs_set_info(struct tgt_session_info *tsi)
96 {
97         struct mgs_thread_info  *mgi;
98         struct mgs_send_param   *msp, *rep_msp;
99         struct lustre_cfg       *lcfg;
100         int                      rc;
101
102         ENTRY;
103
104         mgi = mgs_env_info(tsi->tsi_env);
105         if (IS_ERR(mgi))
106                 RETURN(err_serious(PTR_ERR(mgi)));
107
108         msp = req_capsule_client_get(tsi->tsi_pill, &RMF_MGS_SEND_PARAM);
109         if (msp == NULL)
110                 RETURN(err_serious(-EFAULT));
111
112         /* Construct lustre_cfg structure to pass to function mgs_setparam */
113         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
114         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, msp->mgs_param);
115         lcfg = lustre_cfg_new(LCFG_PARAM, &mgi->mgi_bufs);
116
117         rc = mgs_setparam(tsi->tsi_env, exp2mgs_dev(tsi->tsi_exp), lcfg,
118                           mgi->mgi_fsname);
119         if (rc) {
120                 LCONSOLE_WARN("%s: Unable to set parameter %s for %s: %d\n",
121                               tgt_name(tsi->tsi_tgt), msp->mgs_param,
122                               mgi->mgi_fsname, rc);
123                 GOTO(out_cfg, rc);
124         }
125
126         /* send back the whole msp in the reply */
127         rep_msp = req_capsule_server_get(tsi->tsi_pill, &RMF_MGS_SEND_PARAM);
128         *rep_msp = *msp;
129         EXIT;
130 out_cfg:
131         lustre_cfg_free(lcfg);
132         return rc;
133 }
134
135 static int mgs_completion_ast_config(struct ldlm_lock *lock, __u64 flags,
136                                      void *cbdata)
137 {
138         ENTRY;
139
140         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
141                        LDLM_FL_BLOCK_CONV))) {
142                 struct fs_db *fsdb;
143
144                 /* l_ast_data is used as a marker to avoid cancel ldlm lock
145                  * twice. See LU-2317. */
146                 lock_res_and_lock(lock);
147                 fsdb = (struct fs_db *)lock->l_ast_data;
148                 lock->l_ast_data = NULL;
149                 unlock_res_and_lock(lock);
150
151                 if (fsdb != NULL) {
152                         struct lustre_handle lockh;
153
154                         /* clear the bit before lock put */
155                         clear_bit(FSDB_REVOKING_LOCK, &fsdb->fsdb_flags);
156
157                         ldlm_lock2handle(lock, &lockh);
158                         ldlm_lock_decref_and_cancel(&lockh, LCK_EX);
159                 }
160         }
161
162         RETURN(ldlm_completion_ast(lock, flags, cbdata));
163 }
164
165 static int mgs_completion_ast_ir(struct ldlm_lock *lock, __u64 flags,
166                                  void *cbdata)
167 {
168         ENTRY;
169
170         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
171                        LDLM_FL_BLOCK_CONV))) {
172                 struct fs_db *fsdb;
173
174                 /* l_ast_data is used as a marker to avoid cancel ldlm lock
175                  * twice. See LU-1259. */
176                 lock_res_and_lock(lock);
177                 fsdb = (struct fs_db *)lock->l_ast_data;
178                 lock->l_ast_data = NULL;
179                 unlock_res_and_lock(lock);
180
181                 if (fsdb != NULL) {
182                         struct lustre_handle lockh;
183
184                         mgs_ir_notify_complete(fsdb);
185
186                         ldlm_lock2handle(lock, &lockh);
187                         ldlm_lock_decref_and_cancel(&lockh, LCK_EX);
188                 }
189         }
190
191         RETURN(ldlm_completion_ast(lock, flags, cbdata));
192 }
193
194 void mgs_revoke_lock(struct mgs_device *mgs, struct fs_db *fsdb, int type)
195 {
196         ldlm_completion_callback cp = NULL;
197         struct lustre_handle     lockh = { 0 };
198         struct ldlm_res_id       res_id;
199         __u64 flags = LDLM_FL_ATOMIC_CB;
200         int rc;
201         ENTRY;
202
203         LASSERT(fsdb->fsdb_name[0] != '\0');
204         rc = mgc_fsname2resid(fsdb->fsdb_name, &res_id, type);
205         LASSERT(rc == 0);
206
207         switch (type) {
208         case CONFIG_T_CONFIG:
209                 cp = mgs_completion_ast_config;
210                 if (test_and_set_bit(FSDB_REVOKING_LOCK, &fsdb->fsdb_flags))
211                         rc = -EALREADY;
212                 break;
213         case CONFIG_T_RECOVER:
214                 cp = mgs_completion_ast_ir;
215         default:
216                 break;
217         }
218
219         if (!rc) {
220                 LASSERT(cp != NULL);
221                 rc = ldlm_cli_enqueue_local(mgs->mgs_obd->obd_namespace,
222                                             &res_id, LDLM_PLAIN, NULL, LCK_EX,
223                                             &flags, ldlm_blocking_ast, cp,
224                                             NULL, fsdb, 0, LVB_T_NONE, NULL,
225                                             &lockh);
226                 if (rc != ELDLM_OK) {
227                         CERROR("can't take cfg lock for "LPX64"/"LPX64"(%d)\n",
228                                le64_to_cpu(res_id.name[0]),
229                                le64_to_cpu(res_id.name[1]), rc);
230
231                         if (type == CONFIG_T_CONFIG)
232                                 clear_bit(FSDB_REVOKING_LOCK,
233                                               &fsdb->fsdb_flags);
234                 }
235                 /* lock has been cancelled in completion_ast. */
236         }
237
238         RETURN_EXIT;
239 }
240
241 /* rc=0 means ok
242       1 means update
243      <0 means error */
244 static int mgs_check_target(const struct lu_env *env,
245                             struct mgs_device *mgs,
246                             struct mgs_target_info *mti)
247 {
248         int rc;
249         ENTRY;
250
251         rc = mgs_check_index(env, mgs, mti);
252         if (rc == 0) {
253                 LCONSOLE_ERROR_MSG(0x13b, "%s claims to have registered, but "
254                                    "this MGS does not know about it, preventing "
255                                    "registration.\n", mti->mti_svname);
256                 rc = -ENOENT;
257         } else if (rc == -1) {
258                 LCONSOLE_ERROR_MSG(0x13c, "Client log %s-client has "
259                                    "disappeared! Regenerating all logs.\n",
260                                    mti->mti_fsname);
261                 mti->mti_flags |= LDD_F_WRITECONF;
262                 rc = 1;
263         } else {
264                 /* Index is correctly marked as used */
265
266                 /* If the logs don't contain the mti_nids then add
267                    them as failover nids */
268                 rc = mgs_check_failnid(env, mgs, mti);
269         }
270
271         RETURN(rc);
272 }
273
274 /* Ensure this is not a failover node that is connecting first*/
275 static int mgs_check_failover_reg(struct mgs_target_info *mti)
276 {
277         lnet_nid_t nid;
278         char *ptr;
279         int i;
280
281         ptr = mti->mti_params;
282         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
283                 while (class_parse_nid_quiet(ptr, &nid, &ptr) == 0) {
284                         for (i = 0; i < mti->mti_nid_count; i++) {
285                                 if (nid == mti->mti_nids[i]) {
286                                         LCONSOLE_WARN("Denying initial registra"
287                                                       "tion attempt from nid %s"
288                                                       ", specified as failover"
289                                                       "\n",libcfs_nid2str(nid));
290                                         return -EADDRNOTAVAIL;
291                                 }
292                         }
293                 }
294         }
295         return 0;
296 }
297
298 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
299 static int mgs_target_reg(struct tgt_session_info *tsi)
300 {
301         struct obd_device       *obd = tsi->tsi_exp->exp_obd;
302         struct mgs_device       *mgs = exp2mgs_dev(tsi->tsi_exp);
303         struct mgs_target_info  *mti, *rep_mti;
304         struct fs_db            *fsdb;
305         int                      opc;
306         int                      rc = 0;
307
308         ENTRY;
309
310         rc = lu_env_refill((struct lu_env *)tsi->tsi_env);
311         if (rc)
312                 return err_serious(rc);
313
314         tgt_counter_incr(tsi->tsi_exp, LPROC_MGS_TARGET_REG);
315
316         mti = req_capsule_client_get(tsi->tsi_pill, &RMF_MGS_TARGET_INFO);
317         if (mti == NULL) {
318                 DEBUG_REQ(D_HA, tgt_ses_req(tsi), "no mgs_send_param");
319                 RETURN(err_serious(-EFAULT));
320         }
321
322         if (OCD_HAS_FLAG(&tgt_ses_req(tsi)->rq_export->exp_connect_data,
323                          IMP_RECOV))
324                 opc = mti->mti_flags & LDD_F_OPC_MASK;
325         else
326                 opc = LDD_F_OPC_REG;
327
328         if (opc == LDD_F_OPC_READY) {
329                 CDEBUG(D_MGS, "fs: %s index: %d is ready to reconnect.\n",
330                        mti->mti_fsname, mti->mti_stripe_index);
331                 rc = mgs_ir_update(tsi->tsi_env, mgs, mti);
332                 if (rc) {
333                         LASSERT(!(mti->mti_flags & LDD_F_IR_CAPABLE));
334                         CERROR("Update IR return with %d(ignore and IR "
335                                "disabled)\n", rc);
336                 }
337                 GOTO(out_nolock, rc);
338         }
339
340         /* Do not support unregistering right now. */
341         if (opc != LDD_F_OPC_REG)
342                 GOTO(out_nolock, rc = -EINVAL);
343
344         CDEBUG(D_MGS, "fs: %s index: %d is registered to MGS.\n",
345                mti->mti_fsname, mti->mti_stripe_index);
346
347         if (mti->mti_flags & LDD_F_NEED_INDEX)
348                 mti->mti_flags |= LDD_F_WRITECONF;
349
350         if (!(mti->mti_flags & (LDD_F_WRITECONF | LDD_F_UPGRADE14 |
351                                 LDD_F_UPDATE))) {
352                 /* We're just here as a startup ping. */
353                 CDEBUG(D_MGS, "Server %s is running on %s\n",
354                        mti->mti_svname, obd_export_nid2str(tsi->tsi_exp));
355                 rc = mgs_check_target(tsi->tsi_env, mgs, mti);
356                 /* above will set appropriate mti flags */
357                 if (rc <= 0)
358                         /* Nothing wrong, or fatal error */
359                         GOTO(out_nolock, rc);
360         } else if (!(mti->mti_flags & LDD_F_NO_PRIMNODE)) {
361                 rc = mgs_check_failover_reg(mti);
362                 if (rc)
363                         GOTO(out_nolock, rc);
364         }
365
366         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
367
368         if (mti->mti_flags & LDD_F_WRITECONF) {
369                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
370                     mti->mti_stripe_index == 0) {
371                         rc = mgs_erase_logs(tsi->tsi_env, mgs,
372                                             mti->mti_fsname);
373                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
374                                       "request.  All servers must be restarted "
375                                       "in order to regenerate the logs."
376                                       "\n", obd->obd_name, mti->mti_fsname);
377                 } else if (mti->mti_flags &
378                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
379                         rc = mgs_erase_log(tsi->tsi_env, mgs, mti->mti_svname);
380                         LCONSOLE_WARN("%s: Regenerating %s log by user "
381                                       "request.\n",
382                                       obd->obd_name, mti->mti_svname);
383                 }
384                 mti->mti_flags |= LDD_F_UPDATE;
385                 /* Erased logs means start from scratch. */
386                 mti->mti_flags &= ~LDD_F_UPGRADE14;
387                 if (rc)
388                         GOTO(out_nolock, rc);
389         }
390
391         rc = mgs_find_or_make_fsdb(tsi->tsi_env, mgs, mti->mti_fsname, &fsdb);
392         if (rc) {
393                 CERROR("Can't get db for %s: %d\n", mti->mti_fsname, rc);
394                 GOTO(out_nolock, rc);
395         }
396
397         /*
398          * Log writing contention is handled by the fsdb_mutex.
399          *
400          * It should be alright if someone was reading while we were
401          * updating the logs - if we revoke at the end they will just update
402          * from where they left off.
403          */
404
405         if (mti->mti_flags & LDD_F_UPGRADE14) {
406                 CERROR("Can't upgrade from 1.4 (%d)\n", rc);
407                 GOTO(out, rc);
408         }
409
410         if (mti->mti_flags & LDD_F_UPDATE) {
411                 CDEBUG(D_MGS, "updating %s, index=%d\n", mti->mti_svname,
412                        mti->mti_stripe_index);
413
414                 /* create or update the target log
415                    and update the client/mdt logs */
416                 rc = mgs_write_log_target(tsi->tsi_env, mgs, mti, fsdb);
417                 if (rc) {
418                         CERROR("Failed to write %s log (%d)\n",
419                                mti->mti_svname, rc);
420                         GOTO(out, rc);
421                 }
422
423                 mti->mti_flags &= ~(LDD_F_VIRGIN | LDD_F_UPDATE |
424                                     LDD_F_NEED_INDEX | LDD_F_WRITECONF |
425                                     LDD_F_UPGRADE14);
426                 mti->mti_flags |= LDD_F_REWRITE_LDD;
427         }
428
429 out:
430         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
431
432 out_nolock:
433         CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
434                mti->mti_stripe_index, rc);
435          /* An error flag is set in the mti reply rather than an error code */
436         if (rc)
437                 mti->mti_flags |= LDD_F_ERROR;
438
439         /* send back the whole mti in the reply */
440         rep_mti = req_capsule_server_get(tsi->tsi_pill, &RMF_MGS_TARGET_INFO);
441         *rep_mti = *mti;
442
443         /* Flush logs to disk */
444         dt_sync(tsi->tsi_env, mgs->mgs_bottom);
445         RETURN(rc);
446 }
447
448 /* Called whenever a target cleans up. */
449 static int mgs_target_del(struct tgt_session_info *tsi)
450 {
451         ENTRY;
452
453         tgt_counter_incr(tsi->tsi_exp, LPROC_MGS_TARGET_DEL);
454
455         RETURN(0);
456 }
457
458 static int mgs_config_read(struct tgt_session_info *tsi)
459 {
460         struct ptlrpc_request   *req = tgt_ses_req(tsi);
461         struct mgs_config_body  *body;
462         int                      rc;
463
464         ENTRY;
465
466         body = req_capsule_client_get(tsi->tsi_pill, &RMF_MGS_CONFIG_BODY);
467         if (body == NULL) {
468                 DEBUG_REQ(D_HA, req, "no mgs_config_body");
469                 RETURN(err_serious(-EFAULT));
470         }
471
472         switch (body->mcb_type) {
473         case CONFIG_T_RECOVER:
474                 rc = mgs_get_ir_logs(req);
475                 break;
476         case CONFIG_T_CONFIG:
477                 rc = -EOPNOTSUPP;
478                 break;
479         default:
480                 rc = -EINVAL;
481                 break;
482         }
483
484         RETURN(rc);
485 }
486
487 static int mgs_llog_open(struct tgt_session_info *tsi)
488 {
489         struct mgs_thread_info  *mgi;
490         struct ptlrpc_request   *req = tgt_ses_req(tsi);
491         char                    *logname;
492         int                      rc;
493
494         ENTRY;
495
496         rc = tgt_llog_open(tsi);
497         if (rc)
498                 RETURN(rc);
499
500         /*
501          * For old clients there is no direct way of knowing which file system
502          * a client is operating at the MGS side. But we need to pick up those
503          * clients so that the MGS can mark the corresponding file system as
504          * non-IR capable because old clients are not ready to be notified.
505          *
506          * Therefore we attempt to detect the file systems name by hacking the
507          * llog operation which is currently used by the clients to fetch
508          * configuration logs. At present this is fine because this is the
509          * ONLY llog operation between mgc and the MGS.
510          *
511          * If extra llog operation are going to be added, this function needs
512          * further work.
513          *
514          * When releases prior than 2.0 are not supported, the following code
515          * can be removed.
516          */
517         mgi = mgs_env_info(tsi->tsi_env);
518         if (IS_ERR(mgi))
519                 RETURN(PTR_ERR(mgi));
520
521         logname = req_capsule_client_get(tsi->tsi_pill, &RMF_NAME);
522         if (logname) {
523                 char *ptr = strchr(logname, '-');
524                 int   len = (int)(ptr - logname);
525
526                 if (ptr == NULL || len >= sizeof(mgi->mgi_fsname)) {
527                         LCONSOLE_WARN("%s: non-config logname received: %s\n",
528                                       tgt_name(tsi->tsi_tgt), logname);
529                         /* not error, this can be llog test name */
530                 } else {
531                         strncpy(mgi->mgi_fsname, logname, len);
532                         mgi->mgi_fsname[len] = 0;
533
534                         rc = mgs_fsc_attach(tsi->tsi_env, tsi->tsi_exp,
535                                             mgi->mgi_fsname);
536                         if (rc && rc != -EEXIST) {
537                                 LCONSOLE_WARN("%s: Unable to add client %s "
538                                               "to file system %s: %d\n",
539                                               tgt_name(tsi->tsi_tgt),
540                                               libcfs_nid2str(req->rq_peer.nid),
541                                               mgi->mgi_fsname, rc);
542                         } else {
543                                 rc = 0;
544                         }
545                 }
546         } else {
547                 CERROR("%s: no logname in request\n", tgt_name(tsi->tsi_tgt));
548                 RETURN(-EINVAL);
549         }
550         RETURN(rc);
551 }
552
553 /*
554  * sec context handlers
555  */
556 /* XXX: Implement based on mdt_sec_ctx_handle()? */
557 static int mgs_sec_ctx_handle(struct tgt_session_info *tsi)
558 {
559         return 0;
560 }
561
562 static inline int mgs_init_export(struct obd_export *exp)
563 {
564         struct mgs_export_data *data = &exp->u.eu_mgs_data;
565
566         /* init mgs_export_data for fsc */
567         spin_lock_init(&data->med_lock);
568         CFS_INIT_LIST_HEAD(&data->med_clients);
569
570         spin_lock(&exp->exp_lock);
571         exp->exp_connecting = 1;
572         spin_unlock(&exp->exp_lock);
573
574         /* self-export doesn't need client data and ldlm initialization */
575         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
576                                      &exp->exp_client_uuid)))
577                 return 0;
578         return ldlm_init_export(exp);
579 }
580
581 static inline int mgs_destroy_export(struct obd_export *exp)
582 {
583         ENTRY;
584
585         target_destroy_export(exp);
586         mgs_client_free(exp);
587
588         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
589                                      &exp->exp_client_uuid)))
590                 RETURN(0);
591
592         ldlm_destroy_export(exp);
593
594         RETURN(0);
595 }
596
597 static int mgs_extract_fs_pool(char * arg, char *fsname, char *poolname)
598 {
599         char *ptr;
600
601         ENTRY;
602         for (ptr = arg;  (*ptr != '\0') && (*ptr != '.'); ptr++ ) {
603                 *fsname = *ptr;
604                 fsname++;
605         }
606         if (*ptr == '\0')
607                 return -EINVAL;
608         *fsname = '\0';
609         ptr++;
610         strcpy(poolname, ptr);
611
612         RETURN(0);
613 }
614
615 static int mgs_iocontrol_pool(const struct lu_env *env,
616                               struct mgs_device *mgs,
617                               struct obd_ioctl_data *data)
618 {
619         struct mgs_thread_info *mgi = mgs_env_info(env);
620         int rc;
621         struct lustre_cfg *lcfg = NULL;
622         char *poolname = NULL;
623         ENTRY;
624
625         OBD_ALLOC(poolname, LOV_MAXPOOLNAME + 1);
626         if (poolname == NULL)
627                 RETURN(-ENOMEM);
628
629         if (data->ioc_type != LUSTRE_CFG_TYPE) {
630                 CERROR("%s: unknown cfg record type: %d\n",
631                        mgs->mgs_obd->obd_name, data->ioc_type);
632                 GOTO(out_pool, rc = -EINVAL);
633         }
634
635         if (data->ioc_plen1 > PAGE_CACHE_SIZE)
636                 GOTO(out_pool, rc = -E2BIG);
637
638         OBD_ALLOC(lcfg, data->ioc_plen1);
639         if (lcfg == NULL)
640                 GOTO(out_pool, rc = -ENOMEM);
641
642         if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
643                 GOTO(out_lcfg, rc = -EFAULT);
644
645         if (lcfg->lcfg_bufcount < 2)
646                 GOTO(out_lcfg, rc = -EFAULT);
647
648         /* first arg is always <fsname>.<poolname> */
649         rc = mgs_extract_fs_pool(lustre_cfg_string(lcfg, 1), mgi->mgi_fsname,
650                                  poolname);
651         if (rc)
652                 GOTO(out_lcfg, rc);
653
654         switch (lcfg->lcfg_command) {
655         case LCFG_POOL_NEW:
656                 if (lcfg->lcfg_bufcount != 2)
657                         GOTO(out_lcfg, rc = -EINVAL);
658                 rc = mgs_pool_cmd(env, mgs, LCFG_POOL_NEW, mgi->mgi_fsname,
659                                   poolname, NULL);
660                 break;
661         case LCFG_POOL_ADD:
662                 if (lcfg->lcfg_bufcount != 3)
663                         GOTO(out_lcfg, rc = -EINVAL);
664                 rc = mgs_pool_cmd(env, mgs, LCFG_POOL_ADD, mgi->mgi_fsname,
665                                   poolname, lustre_cfg_string(lcfg, 2));
666                 break;
667         case LCFG_POOL_REM:
668                 if (lcfg->lcfg_bufcount != 3)
669                         GOTO(out_lcfg, rc = -EINVAL);
670                 rc = mgs_pool_cmd(env, mgs, LCFG_POOL_REM, mgi->mgi_fsname,
671                                   poolname, lustre_cfg_string(lcfg, 2));
672                 break;
673         case LCFG_POOL_DEL:
674                 if (lcfg->lcfg_bufcount != 2)
675                         GOTO(out_lcfg, rc = -EINVAL);
676                 rc = mgs_pool_cmd(env, mgs, LCFG_POOL_DEL, mgi->mgi_fsname,
677                                   poolname, NULL);
678                 break;
679         default:
680                  rc = -EINVAL;
681         }
682
683         if (rc) {
684                 CERROR("OBD_IOC_POOL err %d, cmd %X for pool %s.%s\n",
685                        rc, lcfg->lcfg_command, mgi->mgi_fsname, poolname);
686                 GOTO(out_lcfg, rc);
687         }
688
689 out_lcfg:
690         OBD_FREE(lcfg, data->ioc_plen1);
691 out_pool:
692         OBD_FREE(poolname, LOV_MAXPOOLNAME + 1);
693         RETURN(rc);
694 }
695
696 /* from mdt_iocontrol */
697 int mgs_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
698                   void *karg, void *uarg)
699 {
700         struct mgs_device *mgs = exp2mgs_dev(exp);
701         struct obd_ioctl_data *data = karg;
702         struct lu_env env;
703         int rc = 0;
704
705         ENTRY;
706         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
707
708         rc = lu_env_init(&env, LCT_MG_THREAD);
709         if (rc)
710                 RETURN(rc);
711
712         switch (cmd) {
713
714         case OBD_IOC_PARAM: {
715                 struct mgs_thread_info *mgi = mgs_env_info(&env);
716                 struct lustre_cfg *lcfg;
717
718                 if (data->ioc_type != LUSTRE_CFG_TYPE) {
719                         CERROR("%s: unknown cfg record type: %d\n",
720                                mgs->mgs_obd->obd_name, data->ioc_type);
721                         GOTO(out, rc = -EINVAL);
722                 }
723
724                 OBD_ALLOC(lcfg, data->ioc_plen1);
725                 if (lcfg == NULL)
726                         GOTO(out, rc = -ENOMEM);
727                 if (copy_from_user(lcfg, data->ioc_pbuf1, data->ioc_plen1))
728                         GOTO(out_free, rc = -EFAULT);
729
730                 if (lcfg->lcfg_bufcount < 1)
731                         GOTO(out_free, rc = -EINVAL);
732
733                 rc = mgs_setparam(&env, mgs, lcfg, mgi->mgi_fsname);
734                 if (rc)
735                         CERROR("%s: setparam err: rc = %d\n",
736                                exp->exp_obd->obd_name, rc);
737 out_free:
738                 OBD_FREE(lcfg, data->ioc_plen1);
739                 break;
740         }
741
742         case OBD_IOC_REPLACE_NIDS: {
743                 if (!data->ioc_inllen1 || !data->ioc_inlbuf1) {
744                         CERROR("No device name specified!\n");
745                         rc = -EINVAL;
746                         break;
747                 }
748
749                 if (data->ioc_inlbuf1[data->ioc_inllen1 - 1] != 0) {
750                         CERROR("Device name is not NUL terminated!\n");
751                         rc = -EINVAL;
752                         break;
753                 }
754
755                 if (data->ioc_plen1 > MTI_NAME_MAXLEN) {
756                         CERROR("Device name is too long\n");
757                         rc = -EOVERFLOW;
758                         break;
759                 }
760
761                 if (!data->ioc_inllen2 || !data->ioc_inlbuf2) {
762                         CERROR("No NIDs were specified!\n");
763                         rc = -EINVAL;
764                         break;
765                 }
766
767                 if (data->ioc_inlbuf2[data->ioc_inllen2 - 1] != 0) {
768                         CERROR("NID list is not NUL terminated!\n");
769                         rc = -EINVAL;
770                         break;
771                 }
772
773                 /* replace nids in llog */
774                 rc = mgs_replace_nids(&env, mgs, data->ioc_inlbuf1,
775                                       data->ioc_inlbuf2);
776                 if (rc)
777                         CERROR("%s: error replacing nids: rc = %d\n",
778                                exp->exp_obd->obd_name, rc);
779
780                 break;
781         }
782
783         case OBD_IOC_POOL:
784                 rc = mgs_iocontrol_pool(&env, mgs, data);
785                 break;
786
787         case OBD_IOC_DUMP_LOG: {
788                 struct llog_ctxt *ctxt;
789
790                 ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
791                 rc = class_config_dump_llog(&env, ctxt, data->ioc_inlbuf1,
792                                             NULL);
793                 llog_ctxt_put(ctxt);
794
795                 break;
796         }
797
798         case OBD_IOC_LLOG_CANCEL:
799         case OBD_IOC_LLOG_REMOVE:
800         case OBD_IOC_LLOG_CHECK:
801         case OBD_IOC_LLOG_INFO:
802         case OBD_IOC_LLOG_PRINT: {
803                 struct llog_ctxt *ctxt;
804
805                 ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
806                 rc = llog_ioctl(&env, ctxt, cmd, data);
807                 llog_ctxt_put(ctxt);
808                 break;
809         }
810
811         default:
812                 CERROR("%s: unknown command %#x\n",
813                        mgs->mgs_obd->obd_name,  cmd);
814                 rc = -ENOTTY;
815                 break;
816         }
817 out:
818         lu_env_fini(&env);
819         RETURN(rc);
820 }
821
822 static int mgs_connect_to_osd(struct mgs_device *m, const char *nextdev)
823 {
824         struct obd_connect_data *data = NULL;
825         struct obd_device       *obd;
826         int                      rc;
827         ENTRY;
828
829         OBD_ALLOC_PTR(data);
830         if (data == NULL)
831                 RETURN(-ENOMEM);
832
833         obd = class_name2obd(nextdev);
834         if (obd == NULL) {
835                 CERROR("can't locate next device: %s\n", nextdev);
836                 GOTO(out, rc = -ENOTCONN);
837         }
838
839         data->ocd_version = LUSTRE_VERSION_CODE;
840
841         rc = obd_connect(NULL, &m->mgs_bottom_exp, obd,
842                          &obd->obd_uuid, data, NULL);
843         if (rc) {
844                 CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
845                 GOTO(out, rc);
846         }
847
848         m->mgs_bottom = lu2dt_dev(m->mgs_bottom_exp->exp_obd->obd_lu_dev);
849         m->mgs_dt_dev.dd_lu_dev.ld_site = m->mgs_bottom->dd_lu_dev.ld_site;
850         LASSERT(m->mgs_dt_dev.dd_lu_dev.ld_site);
851 out:
852         OBD_FREE_PTR(data);
853         RETURN(rc);
854 }
855
856 static struct tgt_handler mgs_mgs_handlers[] = {
857 TGT_RPC_HANDLER(MGS_FIRST_OPC,
858                 0,                      MGS_CONNECT,     mgs_connect,
859                 &RQF_CONNECT, LUSTRE_OBD_VERSION),
860 TGT_RPC_HANDLER(MGS_FIRST_OPC,
861                 0,                      MGS_DISCONNECT,  mgs_disconnect,
862                 &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
863 TGT_MGS_HDL_VAR(0,                      MGS_EXCEPTION,   mgs_exception),
864 TGT_MGS_HDL    (HABEO_REFERO | MUTABOR, MGS_SET_INFO,    mgs_set_info),
865 TGT_MGS_HDL    (HABEO_REFERO | MUTABOR, MGS_TARGET_REG,  mgs_target_reg),
866 TGT_MGS_HDL_VAR(0,                      MGS_TARGET_DEL,  mgs_target_del),
867 TGT_MGS_HDL    (HABEO_REFERO,           MGS_CONFIG_READ, mgs_config_read),
868 };
869
870 static struct tgt_handler mgs_obd_handlers[] = {
871 TGT_OBD_HDL(0,  OBD_PING,       tgt_obd_ping),
872 };
873
874 static struct tgt_handler mgs_dlm_handlers[] = {
875 TGT_DLM_HDL(HABEO_CLAVIS,       LDLM_ENQUEUE,   tgt_enqueue),
876 };
877
878 static struct tgt_handler mgs_llog_handlers[] = {
879 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_CREATE,      mgs_llog_open),
880 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
881 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
882 TGT_LLOG_HDL_VAR(0,     LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
883 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
884 };
885
886 static struct tgt_handler mgs_sec_ctx_handlers[] = {
887 TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT,           mgs_sec_ctx_handle),
888 TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT_CONT,      mgs_sec_ctx_handle),
889 TGT_SEC_HDL_VAR(0,      SEC_CTX_FINI,           mgs_sec_ctx_handle),
890 };
891
892 static struct tgt_opc_slice mgs_common_slice[] = {
893         {
894                 .tos_opc_start = MGS_FIRST_OPC,
895                 .tos_opc_end   = MGS_LAST_OPC,
896                 .tos_hs        = mgs_mgs_handlers
897         },
898         {
899                 .tos_opc_start = OBD_FIRST_OPC,
900                 .tos_opc_end   = OBD_LAST_OPC,
901                 .tos_hs        = mgs_obd_handlers
902         },
903         {
904                 .tos_opc_start = LDLM_FIRST_OPC,
905                 .tos_opc_end   = LDLM_LAST_OPC,
906                 .tos_hs        = mgs_dlm_handlers
907         },
908         {
909                 .tos_opc_start = LLOG_FIRST_OPC,
910                 .tos_opc_end   = LLOG_LAST_OPC,
911                 .tos_hs        = mgs_llog_handlers
912         },
913         {
914                 .tos_opc_start = SEC_FIRST_OPC,
915                 .tos_opc_end   = SEC_LAST_OPC,
916                 .tos_hs        = mgs_sec_ctx_handlers
917         },
918         {
919                 .tos_hs        = NULL
920         }
921 };
922
923 static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
924                      struct lu_device_type *ldt, struct lustre_cfg *lcfg)
925 {
926         struct ptlrpc_service_conf       conf;
927         struct obd_device               *obd;
928         struct lustre_mount_info        *lmi;
929         struct llog_ctxt                *ctxt;
930         int                              rc;
931
932         ENTRY;
933
934         lmi = server_get_mount(lustre_cfg_string(lcfg, 0));
935         if (lmi == NULL)
936                 RETURN(-ENODEV);
937
938         mgs->mgs_dt_dev.dd_lu_dev.ld_ops = &mgs_lu_ops;
939
940         rc = mgs_connect_to_osd(mgs, lustre_cfg_string(lcfg, 3));
941         if (rc)
942                 GOTO(err_lmi, rc);
943
944         obd = class_name2obd(lustre_cfg_string(lcfg, 0));
945         LASSERT(obd);
946         mgs->mgs_obd = obd;
947         mgs->mgs_obd->obd_lu_dev = &mgs->mgs_dt_dev.dd_lu_dev;
948
949         obd->u.obt.obt_magic = OBT_MAGIC;
950         obd->u.obt.obt_instance = 0;
951
952         /* namespace for mgs llog */
953         obd->obd_namespace = ldlm_namespace_new(obd ,"MGS",
954                                                 LDLM_NAMESPACE_SERVER,
955                                                 LDLM_NAMESPACE_MODEST,
956                                                 LDLM_NS_TYPE_MGT);
957         if (obd->obd_namespace == NULL)
958                 GOTO(err_ops, rc = -ENOMEM);
959
960         /* No recovery for MGCs */
961         obd->obd_replayable = 0;
962
963         rc = tgt_init(env, &mgs->mgs_lut, obd, mgs->mgs_bottom,
964                       mgs_common_slice, OBD_FAIL_MGS_ALL_REQUEST_NET,
965                       OBD_FAIL_MGS_ALL_REPLY_NET);
966         if (rc)
967                 GOTO(err_ns, rc);
968
969         rc = mgs_fs_setup(env, mgs);
970         if (rc) {
971                 CERROR("%s: MGS filesystem method init failed: rc = %d\n",
972                        obd->obd_name, rc);
973                 GOTO(err_tgt, rc);
974         }
975
976         rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT,
977                         obd, &llog_osd_ops);
978         if (rc)
979                 GOTO(err_fs, rc);
980
981         /* XXX: we need this trick till N:1 stack is supported
982          * set "current" directory for named llogs */
983         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
984         LASSERT(ctxt);
985         ctxt->loc_dir = mgs->mgs_configs_dir;
986         llog_ctxt_put(ctxt);
987
988         /* Internal mgs setup */
989         mgs_init_fsdb_list(mgs);
990         mutex_init(&mgs->mgs_mutex);
991         mgs->mgs_start_time = cfs_time_current_sec();
992         spin_lock_init(&mgs->mgs_lock);
993
994         rc = lproc_mgs_setup(mgs, lustre_cfg_string(lcfg, 3));
995         if (rc != 0) {
996                 CERROR("%s: cannot initialize proc entry: rc = %d\n",
997                        obd->obd_name, rc);
998                 GOTO(err_llog, rc);
999         }
1000
1001         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1002                            "mgs_ldlm_client", &obd->obd_ldlm_client);
1003
1004         conf = (typeof(conf)) {
1005                 .psc_name               = LUSTRE_MGS_NAME,
1006                 .psc_watchdog_factor    = MGS_SERVICE_WATCHDOG_FACTOR,
1007                 .psc_buf                = {
1008                         .bc_nbufs               = MGS_NBUFS,
1009                         .bc_buf_size            = MGS_BUFSIZE,
1010                         .bc_req_max_size        = MGS_MAXREQSIZE,
1011                         .bc_rep_max_size        = MGS_MAXREPSIZE,
1012                         .bc_req_portal          = MGS_REQUEST_PORTAL,
1013                         .bc_rep_portal          = MGC_REPLY_PORTAL,
1014                 },
1015                 .psc_thr                = {
1016                         .tc_thr_name            = "ll_mgs",
1017                         .tc_nthrs_init          = MGS_NTHRS_INIT,
1018                         .tc_nthrs_max           = MGS_NTHRS_MAX,
1019                         .tc_ctx_tags            = LCT_MG_THREAD,
1020                 },
1021                 .psc_ops                = {
1022                         .so_req_handler         = tgt_request_handle,
1023                         .so_req_printer         = target_print_req,
1024                 },
1025         };
1026
1027         /* Start the service threads */
1028         mgs->mgs_service = ptlrpc_register_service(&conf, obd->obd_proc_entry);
1029         if (IS_ERR(mgs->mgs_service)) {
1030                 rc = PTR_ERR(mgs->mgs_service);
1031                 CERROR("failed to start mgs service: %d\n", rc);
1032                 mgs->mgs_service = NULL;
1033                 GOTO(err_lproc, rc);
1034         }
1035
1036         ping_evictor_start();
1037
1038         CDEBUG(D_INFO, "MGS %s started\n", obd->obd_name);
1039
1040         /* device stack is not yet fully setup to keep no objects behind */
1041         lu_site_purge(env, mgs2lu_dev(mgs)->ld_site, ~0);
1042         RETURN(0);
1043 err_lproc:
1044         lproc_mgs_cleanup(mgs);
1045 err_llog:
1046         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1047         if (ctxt) {
1048                 ctxt->loc_dir = NULL;
1049                 llog_cleanup(env, ctxt);
1050         }
1051 err_tgt:
1052         tgt_fini(env, &mgs->mgs_lut);
1053 err_fs:
1054         /* No extra cleanup needed for llog_init_commit_thread() */
1055         mgs_fs_cleanup(env, mgs);
1056 err_ns:
1057         ldlm_namespace_free(obd->obd_namespace, NULL, 0);
1058         obd->obd_namespace = NULL;
1059 err_ops:
1060         lu_site_purge(env, mgs2lu_dev(mgs)->ld_site, ~0);
1061         if (!cfs_hash_is_empty(mgs2lu_dev(mgs)->ld_site->ls_obj_hash)) {
1062                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1063                 lu_site_print(env, mgs2lu_dev(mgs)->ld_site, &msgdata,
1064                                 lu_cdebug_printer);
1065         }
1066         obd_disconnect(mgs->mgs_bottom_exp);
1067 err_lmi:
1068         if (lmi)
1069                 server_put_mount(lustre_cfg_string(lcfg, 0), lmi->lmi_mnt);
1070         RETURN(rc);
1071 }
1072
1073 static struct lu_device *mgs_device_free(const struct lu_env *env,
1074                                          struct lu_device *lu)
1075 {
1076         struct mgs_device *mgs = lu2mgs_dev(lu);
1077         ENTRY;
1078
1079         dt_device_fini(&mgs->mgs_dt_dev);
1080         OBD_FREE_PTR(mgs);
1081         RETURN(NULL);
1082 }
1083
1084 static int mgs_process_config(const struct lu_env *env,
1085                               struct lu_device *dev,
1086                               struct lustre_cfg *lcfg)
1087 {
1088         LBUG();
1089         return 0;
1090 }
1091
1092 static int mgs_object_init(const struct lu_env *env, struct lu_object *o,
1093                            const struct lu_object_conf *unused)
1094 {
1095         struct mgs_device *d = lu2mgs_dev(o->lo_dev);
1096         struct lu_device  *under;
1097         struct lu_object  *below;
1098         int                rc = 0;
1099         ENTRY;
1100
1101         /* do no set .do_ops as mgs calls to bottom osd directly */
1102
1103         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
1104                         PFID(lu_object_fid(o)));
1105
1106         under = &d->mgs_bottom->dd_lu_dev;
1107         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
1108         if (below != NULL)
1109                 lu_object_add(o, below);
1110         else
1111                 rc = -ENOMEM;
1112
1113         return 0;
1114 }
1115
1116 static void mgs_object_free(const struct lu_env *env, struct lu_object *o)
1117 {
1118         struct mgs_object *obj = lu2mgs_obj(o);
1119         struct lu_object_header *h = o->lo_header;
1120
1121         dt_object_fini(&obj->mgo_obj);
1122         lu_object_header_fini(h);
1123         OBD_FREE_PTR(obj);
1124 }
1125
1126 static int mgs_object_print(const struct lu_env *env, void *cookie,
1127                             lu_printer_t p, const struct lu_object *l)
1128 {
1129         const struct mgs_object *o = lu2mgs_obj((struct lu_object *) l);
1130
1131         return (*p)(env, cookie, LUSTRE_MGS_NAME"-object@%p", o);
1132 }
1133
1134 struct lu_object_operations mgs_lu_obj_ops = {
1135         .loo_object_init        = mgs_object_init,
1136         .loo_object_free        = mgs_object_free,
1137         .loo_object_print       = mgs_object_print,
1138 };
1139
1140 struct lu_object *mgs_object_alloc(const struct lu_env *env,
1141                                    const struct lu_object_header *hdr,
1142                                    struct lu_device *d)
1143 {
1144         struct lu_object_header *h;
1145         struct mgs_object       *o;
1146         struct lu_object        *l;
1147
1148         LASSERT(hdr == NULL);
1149
1150         OBD_ALLOC_PTR(o);
1151         if (o != NULL) {
1152                 l = &o->mgo_obj.do_lu;
1153                 h = &o->mgo_header;
1154
1155                 lu_object_header_init(h);
1156                 dt_object_init(&o->mgo_obj, h, d);
1157                 lu_object_add_top(h, l);
1158
1159                 l->lo_ops = &mgs_lu_obj_ops;
1160
1161                 return l;
1162         } else {
1163                 return NULL;
1164         }
1165 }
1166
1167 const struct lu_device_operations mgs_lu_ops = {
1168         .ldo_object_alloc       = mgs_object_alloc,
1169         .ldo_process_config     = mgs_process_config,
1170 };
1171
1172 static struct lu_device *mgs_device_alloc(const struct lu_env *env,
1173                                           struct lu_device_type *type,
1174                                           struct lustre_cfg *lcfg)
1175 {
1176         struct mgs_device *mgs;
1177         struct lu_device  *ludev;
1178
1179         OBD_ALLOC_PTR(mgs);
1180         if (mgs == NULL) {
1181                 ludev = ERR_PTR(-ENOMEM);
1182         } else {
1183                 int rc;
1184
1185                 ludev = mgs2lu_dev(mgs);
1186                 dt_device_init(&mgs->mgs_dt_dev, type);
1187                 rc = mgs_init0(env, mgs, type, lcfg);
1188                 if (rc != 0) {
1189                         mgs_device_free(env, ludev);
1190                         ludev = ERR_PTR(rc);
1191                 }
1192         }
1193         return ludev;
1194 }
1195
1196 static struct lu_device *mgs_device_fini(const struct lu_env *env,
1197                                          struct lu_device *d)
1198 {
1199         struct mgs_device       *mgs = lu2mgs_dev(d);
1200         struct obd_device       *obd = mgs->mgs_obd;
1201         struct llog_ctxt        *ctxt;
1202
1203         ENTRY;
1204
1205         LASSERT(mgs->mgs_bottom);
1206
1207         class_disconnect_exports(obd);
1208
1209         ping_evictor_stop();
1210
1211         ptlrpc_unregister_service(mgs->mgs_service);
1212
1213         obd_exports_barrier(obd);
1214         obd_zombie_barrier();
1215
1216         tgt_fini(env, &mgs->mgs_lut);
1217         mgs_cleanup_fsdb_list(mgs);
1218         lproc_mgs_cleanup(mgs);
1219
1220         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1221         if (ctxt) {
1222                 ctxt->loc_dir = NULL;
1223                 llog_cleanup(env, ctxt);
1224         }
1225
1226         mgs_fs_cleanup(env, mgs);
1227
1228         ldlm_namespace_free(obd->obd_namespace, NULL, 1);
1229         obd->obd_namespace = NULL;
1230
1231         lu_site_purge(env, d->ld_site, ~0);
1232         if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
1233                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1234                 lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
1235         }
1236
1237         LASSERT(mgs->mgs_bottom_exp);
1238         obd_disconnect(mgs->mgs_bottom_exp);
1239
1240         server_put_mount(obd->obd_name, NULL);
1241
1242         RETURN(NULL);
1243 }
1244
1245 /* context key constructor/destructor: mgs_key_init, mgs_key_fini */
1246 LU_KEY_INIT_FINI(mgs, struct mgs_thread_info);
1247
1248 LU_TYPE_INIT_FINI(mgs, &mgs_thread_key);
1249
1250 LU_CONTEXT_KEY_DEFINE(mgs, LCT_MG_THREAD);
1251
1252 static struct lu_device_type_operations mgs_device_type_ops = {
1253         .ldto_init              = mgs_type_init,
1254         .ldto_fini              = mgs_type_fini,
1255
1256         .ldto_start             = mgs_type_start,
1257         .ldto_stop              = mgs_type_stop,
1258
1259         .ldto_device_alloc      = mgs_device_alloc,
1260         .ldto_device_free       = mgs_device_free,
1261
1262         .ldto_device_fini       = mgs_device_fini
1263 };
1264
1265 static struct lu_device_type mgs_device_type = {
1266         .ldt_tags       = LU_DEVICE_DT,
1267         .ldt_name       = LUSTRE_MGS_NAME,
1268         .ldt_ops        = &mgs_device_type_ops,
1269         .ldt_ctx_tags   = LCT_MG_THREAD
1270 };
1271
1272 static int mgs_obd_connect(const struct lu_env *env, struct obd_export **exp,
1273                            struct obd_device *obd, struct obd_uuid *cluuid,
1274                            struct obd_connect_data *data, void *localdata)
1275 {
1276         struct obd_export       *lexp;
1277         struct lustre_handle     conn = { 0 };
1278         int                      rc;
1279
1280         ENTRY;
1281
1282         if (exp == NULL || obd == NULL || cluuid == NULL)
1283                 RETURN(-EINVAL);
1284
1285         rc = class_connect(&conn, obd, cluuid);
1286         if (rc)
1287                 RETURN(rc);
1288
1289         lexp = class_conn2export(&conn);
1290         if (lexp == NULL)
1291                 RETURN(-EFAULT);
1292
1293         if (data != NULL) {
1294                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
1295                 data->ocd_version = LUSTRE_VERSION_CODE;
1296                 lexp->exp_connect_data = *data;
1297         }
1298
1299         tgt_counter_incr(lexp, LPROC_MGS_CONNECT);
1300
1301         rc = mgs_export_stats_init(obd, lexp, localdata);
1302         if (rc)
1303                 class_disconnect(lexp);
1304         else
1305                 *exp = lexp;
1306
1307         RETURN(rc);
1308 }
1309
1310 static int mgs_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
1311                              struct obd_device *obd, struct obd_uuid *cluuid,
1312                              struct obd_connect_data *data, void *localdata)
1313 {
1314         ENTRY;
1315
1316         if (exp == NULL || obd == NULL || cluuid == NULL)
1317                 RETURN(-EINVAL);
1318
1319         tgt_counter_incr(exp, LPROC_MGS_CONNECT);
1320
1321         if (data != NULL) {
1322                 data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
1323                 data->ocd_version = LUSTRE_VERSION_CODE;
1324                 exp->exp_connect_data = *data;
1325         }
1326
1327         RETURN(mgs_export_stats_init(obd, exp, localdata));
1328 }
1329
1330 static int mgs_obd_disconnect(struct obd_export *exp)
1331 {
1332         int rc;
1333
1334         ENTRY;
1335
1336         LASSERT(exp);
1337
1338         mgs_fsc_cleanup(exp);
1339
1340         class_export_get(exp);
1341         tgt_counter_incr(exp, LPROC_MGS_DISCONNECT);
1342
1343         rc = server_disconnect_export(exp);
1344         class_export_put(exp);
1345         RETURN(rc);
1346 }
1347
1348 /* use obd ops to offer management infrastructure */
1349 static struct obd_ops mgs_obd_device_ops = {
1350         .o_owner                = THIS_MODULE,
1351         .o_connect              = mgs_obd_connect,
1352         .o_reconnect            = mgs_obd_reconnect,
1353         .o_disconnect           = mgs_obd_disconnect,
1354         .o_init_export          = mgs_init_export,
1355         .o_destroy_export       = mgs_destroy_export,
1356         .o_iocontrol            = mgs_iocontrol,
1357 };
1358
1359 static int __init mgs_init(void)
1360 {
1361         struct lprocfs_static_vars lvars;
1362
1363         lprocfs_mgs_init_vars(&lvars);
1364         class_register_type(&mgs_obd_device_ops, NULL, lvars.module_vars,
1365                             LUSTRE_MGS_NAME, &mgs_device_type);
1366
1367         return 0;
1368 }
1369
1370 static void /*__exit*/ mgs_exit(void)
1371 {
1372         class_unregister_type(LUSTRE_MGS_NAME);
1373 }
1374
1375 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1376 MODULE_DESCRIPTION("Lustre  Management Server (MGS)");
1377 MODULE_LICENSE("GPL");
1378
1379 module_init(mgs_init);
1380 module_exit(mgs_exit);