Whamcloud - gitweb
a56c781a1b70ba4b7bc3f5caf4f6ccae5b5fb50d
[fs/lustre-release.git] / lustre / mgc / mgc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mgc/mgc_request.c
5  *  Lustre Management Client
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Nathan Rutman <nathan@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #ifndef EXPORT_SYMTAB
28 # define EXPORT_SYMTAB
29 #endif
30 #define DEBUG_SUBSYSTEM S_MGC
31 #define D_MGC D_CONFIG /*|D_WARNING*/
32
33 #ifdef __KERNEL__
34 # include <linux/module.h>
35 # include <linux/pagemap.h>
36 # include <linux/miscdevice.h>
37 # include <linux/init.h>
38 #else
39 # include <liblustre.h>
40 #endif
41
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_log.h>
45 #include <lustre_fsfilt.h>
46 #include <lustre_disk.h>
47
48
49 int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id)
50 {
51         char *name_end;
52         int len;
53         __u64 resname = 0;
54
55         /* fsname is at most 8 chars long at the beginning of the logname
56            e.g. "lustre-MDT0001" or "lustre" */
57         name_end = strrchr(logname, '-');
58         if (name_end)
59                 len = name_end - logname;
60         else
61                 len = strlen(logname);
62         if (len > 8) {
63                 CERROR("fsname too long: %s\n", logname);
64                 return -EINVAL;
65         }
66         if (len <= 0) {
67                 CERROR("missing fsname: %s\n", logname);
68                 return -EINVAL;
69         }
70         memcpy(&resname, logname, len);
71
72         memset(res_id, 0, sizeof(*res_id));
73         /* FIXME are resid names swabbed across the wire? */
74         res_id->name[0] = cpu_to_le64(resname);
75         CDEBUG(D_MGC, "log %s to resid "LPX64"/"LPX64" (%.8s)\n", logname,
76                res_id->name[0], res_id->name[1], (char *)&res_id->name[0]);
77         return 0;
78 }
79 EXPORT_SYMBOL(mgc_logname2resid);
80
81 /********************** config llog list **********************/
82 static struct list_head config_llog_list = LIST_HEAD_INIT(config_llog_list);
83 static spinlock_t       config_list_lock = SPIN_LOCK_UNLOCKED;
84
85 /* Take a reference to a config log */
86 static int config_log_get(struct config_llog_data *cld)
87 {
88         ENTRY;
89         CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
90                atomic_read(&cld->cld_refcount));
91         if (cld->cld_stopping)
92                 RETURN(1);
93         atomic_inc(&cld->cld_refcount);
94         RETURN(0);
95 }
96
97 /* Drop a reference to a config log.  When no longer referenced, 
98    we can free the config log data */
99 static void config_log_put(struct config_llog_data *cld)
100 {
101         ENTRY;
102         CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
103                atomic_read(&cld->cld_refcount));
104         if (atomic_dec_and_test(&cld->cld_refcount)) {
105                 CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
106                 OBD_FREE(cld->cld_logname, strlen(cld->cld_logname) + 1);
107                 if (cld->cld_cfg.cfg_instance != NULL)
108                         OBD_FREE(cld->cld_cfg.cfg_instance,
109                                  strlen(cld->cld_cfg.cfg_instance) + 1);
110                 OBD_FREE(cld, sizeof(*cld));
111         }
112         EXIT;
113 }
114
115 /* Find a config log by name */
116 static struct config_llog_data *config_log_find(char *logname, 
117                                                struct config_llog_instance *cfg)
118 {
119         struct list_head *tmp;
120         struct config_llog_data *cld;
121         char *logid = logname;
122         int match_instance = 0;
123         ENTRY;
124
125         if (cfg && cfg->cfg_instance) {
126                 match_instance++;
127                 logid = cfg->cfg_instance;
128         }
129         if (!logid) {
130                 CERROR("No log specified\n");
131                 RETURN(ERR_PTR(-EINVAL));
132         }
133
134         spin_lock(&config_list_lock);
135         list_for_each(tmp, &config_llog_list) {
136                 cld = list_entry(tmp, struct config_llog_data, cld_list_chain);
137                 if (match_instance && cld->cld_cfg.cfg_instance &&
138                     strcmp(logid, cld->cld_cfg.cfg_instance) == 0)
139                         goto out_found;
140                 if (!match_instance &&
141                     strcmp(logid, cld->cld_logname) == 0)
142                         goto out_found;
143         }
144         spin_unlock(&config_list_lock);
145
146         CERROR("can't get log %s\n", logid);
147         RETURN(ERR_PTR(-ENOENT));
148 out_found:
149         atomic_inc(&cld->cld_refcount);
150         spin_unlock(&config_list_lock);
151         RETURN(cld);
152 }
153
154 /* Add this log to our list of active logs.
155    We have one active log per "mount" - client instance or servername.
156    Each instance may be at a different point in the log. */
157 static int config_log_add(char *logname, struct config_llog_instance *cfg,
158                           struct super_block *sb)
159 {
160         struct config_llog_data *cld;
161         int rc;
162         ENTRY;
163
164         CDEBUG(D_MGC, "adding config log %s:%s\n", logname, cfg->cfg_instance);
165
166         OBD_ALLOC(cld, sizeof(*cld));
167         if (!cld)
168                 RETURN(-ENOMEM);
169         OBD_ALLOC(cld->cld_logname, strlen(logname) + 1);
170         if (!cld->cld_logname) {
171                 OBD_FREE(cld, sizeof(*cld));
172                 RETURN(-ENOMEM);
173         }
174         strcpy(cld->cld_logname, logname);
175         cld->cld_cfg = *cfg;
176         cld->cld_cfg.cfg_last_idx = 0;
177         cld->cld_cfg.cfg_flags = 0;
178         cld->cld_cfg.cfg_sb = sb;
179         atomic_set(&cld->cld_refcount, 1);
180         if (cfg->cfg_instance != NULL) {
181                 OBD_ALLOC(cld->cld_cfg.cfg_instance,
182                           strlen(cfg->cfg_instance) + 1);
183                 strcpy(cld->cld_cfg.cfg_instance, cfg->cfg_instance);
184         }
185         rc = mgc_logname2resid(logname, &cld->cld_resid);
186         if (rc) {
187                 config_log_put(cld);
188                 RETURN(rc);
189         }
190         spin_lock(&config_list_lock);
191         list_add(&cld->cld_list_chain, &config_llog_list);
192         spin_unlock(&config_list_lock);
193
194         RETURN(rc);
195 }
196
197 /* Stop watching for updates on this log. */
198 static int config_log_end(char *logname, struct config_llog_instance *cfg)
199 {
200         struct config_llog_data *cld;
201         int rc = 0;
202         ENTRY;
203
204         cld = config_log_find(logname, cfg);
205         if (IS_ERR(cld))
206                 RETURN(PTR_ERR(cld));
207         /* drop the ref from the find */
208         config_log_put(cld);
209
210         cld->cld_stopping = 1;
211         spin_lock(&config_list_lock);
212         list_del(&cld->cld_list_chain);
213         spin_unlock(&config_list_lock);
214         /* drop the start ref */
215         config_log_put(cld);
216         CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
217                rc);
218         RETURN(rc);
219 }
220
221 /* Failsafe */
222 static void config_log_end_all(void)
223 {
224         struct list_head *tmp, *n;
225         struct config_llog_data *cld;
226         ENTRY;
227
228         spin_lock(&config_list_lock);
229         list_for_each_safe(tmp, n, &config_llog_list) {
230                 cld = list_entry(tmp, struct config_llog_data, cld_list_chain);
231                 CERROR("conflog failsafe %s\n", cld->cld_logname);
232                 list_del(&cld->cld_list_chain);
233                 config_log_put(cld);
234         }
235         spin_unlock(&config_list_lock);
236         EXIT;
237 }
238
239
240 /********************** class fns **********************/
241
242 static int mgc_fs_setup(struct obd_device *obd, struct super_block *sb,
243                         struct vfsmount *mnt)
244 {
245         struct lvfs_run_ctxt saved;
246         struct lustre_sb_info *lsi = s2lsi(sb);
247         struct client_obd *cli = &obd->u.cli;
248         struct dentry *dentry;
249         char *label;
250         int err = 0;
251         ENTRY;
252
253         LASSERT(lsi);
254         LASSERT(lsi->lsi_srv_mnt == mnt);
255
256         /* The mgc fs exclusion sem. Only one fs can be setup at a time. */
257         down(&cli->cl_mgc_sem);
258
259         cleanup_group_info();
260
261         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
262         if (IS_ERR(obd->obd_fsops)) {
263                 up(&cli->cl_mgc_sem);
264                 CERROR("No fstype %s rc=%ld\n", MT_STR(lsi->lsi_ldd),
265                        PTR_ERR(obd->obd_fsops));
266                 RETURN(PTR_ERR(obd->obd_fsops));
267         }
268
269         cli->cl_mgc_vfsmnt = mnt;
270         fsfilt_setup(obd, mnt->mnt_sb);
271
272         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
273         obd->obd_lvfs_ctxt.pwdmnt = mnt;
274         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
275         obd->obd_lvfs_ctxt.fs = get_ds();
276
277         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
278         dentry = lookup_one_len(MOUNT_CONFIGS_DIR, current->fs->pwd,
279                                 strlen(MOUNT_CONFIGS_DIR));
280         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
281         if (IS_ERR(dentry)) {
282                 err = PTR_ERR(dentry);
283                 CERROR("cannot lookup %s directory: rc = %d\n",
284                        MOUNT_CONFIGS_DIR, err);
285                 GOTO(err_ops, err);
286         }
287         cli->cl_mgc_configs_dir = dentry;
288
289         /* We take an obd ref to insure that we can't get to mgc_cleanup
290            without calling mgc_fs_cleanup first. */
291         class_incref(obd);
292
293         label = fsfilt_get_label(obd, mnt->mnt_sb);
294         if (label)
295                 CDEBUG(D_MGC, "MGC using disk labelled=%s\n", label);
296
297         /* We keep the cl_mgc_sem until mgc_fs_cleanup */
298         RETURN(0);
299
300 err_ops:
301         fsfilt_put_ops(obd->obd_fsops);
302         obd->obd_fsops = NULL;
303         cli->cl_mgc_vfsmnt = NULL;
304         up(&cli->cl_mgc_sem);
305         RETURN(err);
306 }
307
308 static int mgc_fs_cleanup(struct obd_device *obd)
309 {
310         struct client_obd *cli = &obd->u.cli;
311         int rc = 0;
312         ENTRY;
313
314         LASSERT(cli->cl_mgc_vfsmnt != NULL);
315
316         if (cli->cl_mgc_configs_dir != NULL) {
317                 struct lvfs_run_ctxt saved;
318                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
319                 l_dput(cli->cl_mgc_configs_dir);
320                 cli->cl_mgc_configs_dir = NULL;
321                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
322                 class_decref(obd);
323         }
324
325         cli->cl_mgc_vfsmnt = NULL;
326         if (obd->obd_fsops)
327                 fsfilt_put_ops(obd->obd_fsops);
328
329         up(&cli->cl_mgc_sem);
330         RETURN(rc);
331 }
332
333 static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
334 {
335         int rc = 0;
336         ENTRY;
337
338         switch (stage) {
339         case OBD_CLEANUP_EARLY: 
340         case OBD_CLEANUP_EXPORTS:
341                 break;
342         case OBD_CLEANUP_SELF_EXP:
343                 rc = obd_llog_finish(obd, 0);
344                 if (rc != 0)
345                         CERROR("failed to cleanup llogging subsystems\n");
346                 break;
347         case OBD_CLEANUP_OBD:
348                 break;
349         }
350         RETURN(rc);
351 }
352
353 static int mgc_cleanup(struct obd_device *obd)
354 {
355         struct client_obd *cli = &obd->u.cli;
356         int rc;
357         ENTRY;
358
359         LASSERT(cli->cl_mgc_vfsmnt == NULL);
360         
361         config_log_end_all();
362
363         ptlrpcd_decref();
364
365         rc = client_obd_cleanup(obd);
366         RETURN(rc);
367 }
368
369 static struct obd_device *the_mgc;
370
371 static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
372 {
373         int rc;
374         ENTRY;
375
376         ptlrpcd_addref();
377
378         rc = client_obd_setup(obd, lcfg);
379         if (rc)
380                 GOTO(err_decref, rc);
381
382         rc = obd_llog_init(obd, obd, 0, NULL);
383         if (rc) {
384                 CERROR("failed to setup llogging subsystems\n");
385                 GOTO(err_cleanup, rc);
386         }
387
388         the_mgc = obd;
389         RETURN(rc);
390
391 err_cleanup:
392         client_obd_cleanup(obd);
393 err_decref:
394         ptlrpcd_decref();
395         RETURN(rc);
396 }
397
398 static int mgc_process_log(struct obd_device *mgc,
399                            struct config_llog_data *cld);
400
401 /* FIXME I don't want a thread for every cld; make a list of cld's to requeue
402    and use only 1 thread. */
403 /* reenqueue the lock, reparse the log */
404 static int mgc_async_requeue(void *data)
405 {
406         wait_queue_head_t   waitq;
407         struct l_wait_info  lwi;
408         struct config_llog_data *cld = (struct config_llog_data *)data;
409         char name[24];
410         int rc = 0;
411         ENTRY;
412
413         if (!data)
414                 RETURN(-EINVAL);
415         if (cld->cld_stopping)
416                 GOTO(out, rc = 0);
417
418         snprintf(name, sizeof(name), "ll_log_%s", cld->cld_logname);
419         name[sizeof(name)-1] = '\0';
420         ptlrpc_daemonize(name);
421
422         CDEBUG(D_MGC, "requeue "LPX64" %s:%s\n",
423                cld->cld_resid.name[0], cld->cld_logname,
424                cld->cld_cfg.cfg_instance);
425
426         /* Sleep a few seconds to allow the server who caused
427            the lock revocation to finish its setup, plus some random
428            so everyone doesn't try to reconnect at once. */
429         init_waitqueue_head(&waitq);
430         lwi = LWI_TIMEOUT(3 * HZ + (ll_rand() & 0x7f), NULL, NULL);
431         l_wait_event(waitq, 0, &lwi);
432
433         LASSERT(the_mgc);
434
435         class_export_get(the_mgc->obd_self_export);
436 #if 0
437         /* Re-send server info every time, in case MGS needs to regen its
438            logs (for write_conf).  Do we need this?  It's extra RPCs for
439            every server at every update.  Turning it off until I'm sure
440            it's needed. */
441         server_register_target(cld->cld_cfg.cfg_sb);
442 #endif
443         rc = mgc_process_log(the_mgc, cld);
444         class_export_put(the_mgc->obd_self_export);
445 out:
446         /* Whether we enqueued again or not in mgc_process_log,
447            we're done with the ref from the old mgc_blocking_ast */
448         config_log_put(cld);
449
450         RETURN(rc);
451 }
452
453 /* based on ll_mdc_blocking_ast */
454 static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
455                             void *data, int flag)
456 {
457         struct lustre_handle lockh;
458         struct config_llog_data *cld = (struct config_llog_data *)data;
459         int rc = 0;
460         ENTRY;
461
462         switch (flag) {
463         case LDLM_CB_BLOCKING:
464                 /* mgs wants the lock, give it up... */
465                 LDLM_DEBUG(lock, "MGC blocking CB");
466                 ldlm_lock2handle(lock, &lockh);
467                 rc = ldlm_cli_cancel(&lockh);
468                 break;
469         case LDLM_CB_CANCELING: {
470                 /* We've given up the lock, prepare ourselves to update. */
471                 LDLM_DEBUG(lock, "MGC cancel CB");
472
473                 CDEBUG(D_MGC, "Lock res "LPX64" (%.8s)\n",
474                        lock->l_resource->lr_name.name[0],
475                        (char *)&lock->l_resource->lr_name.name[0]);
476
477                 /* Make sure not to re-enqueue when the mgc is stopping
478                    (we get called from client_disconnect_export) */
479                 if (!lock->l_conn_export ||
480                     !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) {
481                         CDEBUG(D_MGC, "Disconnecting, don't requeue\n");
482                         goto out_drop;
483                 }
484                 if (lock->l_req_mode != lock->l_granted_mode) {
485                         CERROR("original grant failed, won't requeue\n");
486                         goto out_drop;
487                 }
488                 if (!data) {
489                         CERROR("missing data, won't requeue\n");
490                         goto out_drop;
491                 }
492                 if (cld->cld_stopping) {
493                         CERROR("stopping, won't requeue\n");
494                         goto out_drop;
495                 }
496
497                 /* Re-enqueue the lock in a separate thread, because we must
498                    return from this fn before that lock can be taken. */
499                 rc = cfs_kernel_thread(mgc_async_requeue, data,
500                                        CLONE_VM | CLONE_FILES);
501                 if (rc < 0) {
502                         CERROR("Cannot re-enqueue thread: %d\n", rc);
503                 } else {
504                         rc = 0;
505                         break;
506                 }
507 out_drop:
508                 /* Drop this here or in mgc_async_requeue,
509                    in either case, we're done with the reference
510                    after this. */
511                 config_log_put(cld);
512                 break;
513         }
514         default:
515                 LBUG();
516         }
517
518
519         if (rc) {
520                 CERROR("%s CB failed %d:\n", flag == LDLM_CB_BLOCKING ?
521                        "blocking" : "cancel", rc);
522                 LDLM_ERROR(lock, "MGC ast");
523         }
524         RETURN(rc);
525 }
526
527 /* Take a config lock so we can get cancel notifications */
528 static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
529                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
530                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
531                        void *data, __u32 lvb_len, void *lvb_swabber,
532                        struct lustre_handle *lockh)
533 {
534         struct config_llog_data *cld = (struct config_llog_data *)data;
535         struct obd_device *obd = class_exp2obd(exp);
536         int rc;
537         ENTRY;
538
539         CDEBUG(D_MGC, "Enqueue for %s (res "LPX64")\n", cld->cld_logname,
540                cld->cld_resid.name[0]);
541
542         /* We can only drop this config log ref when we drop the lock */
543         if (config_log_get(cld))
544                 RETURN(ELDLM_LOCK_ABORTED);
545
546         /* We need a callback for every lockholder, so don't try to
547            ldlm_lock_match (see rev 1.1.2.11.2.47) */
548
549         rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, cld->cld_resid,
550                               type, NULL, mode, flags,
551                               mgc_blocking_ast, ldlm_completion_ast, NULL,
552                               data, NULL, 0, NULL, lockh);
553
554         RETURN(rc);
555 }
556
557 static int mgc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
558                       __u32 mode, struct lustre_handle *lockh)
559 {
560         ENTRY;
561
562         ldlm_lock_decref(lockh, mode);
563
564         RETURN(0);
565 }
566
567 #if 0
568 static int mgc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
569                          void *karg, void *uarg)
570 {
571         struct obd_device *obd = exp->exp_obd;
572         struct obd_ioctl_data *data = karg;
573         struct llog_ctxt *ctxt;
574         struct lvfs_run_ctxt saved;
575         int rc;
576         ENTRY;
577
578 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
579         MOD_INC_USE_COUNT;
580 #else
581         if (!try_module_get(THIS_MODULE)) {
582                 CERROR("Can't get module. Is it alive?");
583                 return -EINVAL;
584         }
585 #endif
586         switch (cmd) {
587         /* REPLicator context */
588         case OBD_IOC_PARSE: {
589                 CERROR("MGC parsing llog %s\n", data->ioc_inlbuf1);
590                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
591                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
592                 GOTO(out, rc);
593         }
594 #ifdef __KERNEL__
595         case OBD_IOC_LLOG_INFO:
596         case OBD_IOC_LLOG_PRINT: {
597                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
598                 rc = llog_ioctl(ctxt, cmd, data);
599
600                 GOTO(out, rc);
601         }
602 #endif
603         /* ORIGinator context */
604         case OBD_IOC_DUMP_LOG: {
605                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
606                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
607                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
608                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
609                 if (rc)
610                         RETURN(rc);
611
612                 GOTO(out, rc);
613         }
614         default:
615                 CERROR("mgc_ioctl(): unrecognised ioctl %#x\n", cmd);
616                 GOTO(out, rc = -ENOTTY);
617         }
618 out:
619 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
620         MOD_DEC_USE_COUNT;
621 #else
622         module_put(THIS_MODULE);
623 #endif
624
625         return rc;
626 }
627 #endif
628
629 /* Send target_reg message to MGS */
630 static int mgc_target_register(struct obd_export *exp,
631                                struct mgs_target_info *mti)
632 {
633         struct ptlrpc_request *req;
634         struct mgs_target_info *req_mti, *rep_mti;
635         int size = sizeof(*req_mti);
636         int rep_size = sizeof(*mti);
637         int rc;
638         ENTRY;
639
640         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MGS_VERSION,
641                               MGS_TARGET_REG, 1, &size, NULL);
642         if (!req)
643                 RETURN(-ENOMEM);
644
645         req_mti = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*req_mti));
646         if (!req_mti)
647                 RETURN(-ENOMEM);
648         memcpy(req_mti, mti, sizeof(*req_mti));
649
650         req->rq_replen = lustre_msg_size(1, &rep_size);
651
652         CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
653
654         rc = ptlrpc_queue_wait(req);
655         if (!rc) {
656                 rep_mti = lustre_swab_repbuf(req, 0, sizeof(*rep_mti),
657                                              lustre_swab_mgs_target_info);
658                 memcpy(mti, rep_mti, sizeof(*rep_mti));
659                 CDEBUG(D_MGC, "register %s got index = %d\n",
660                        mti->mti_svname, mti->mti_stripe_index);
661         } else {
662                 CERROR("register failed. rc=%d\n", rc);
663         }
664         ptlrpc_req_finished(req);
665
666         RETURN(rc);
667 }
668
669 int mgc_set_info_async(struct obd_export *exp, obd_count keylen,
670                        void *key, obd_count vallen, void *val, 
671                        struct ptlrpc_request_set *set)
672 {
673         struct obd_import *imp = class_exp2cliimp(exp);
674         int rc = -EINVAL;
675         ENTRY;
676
677         /* Try to "recover" the initial connection; i.e. retry */
678         if (KEY_IS(KEY_INIT_RECOV)) {
679                 if (vallen != sizeof(int))
680                         RETURN(-EINVAL);
681                 imp->imp_initial_recov = *(int *)val;
682                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
683                        exp->exp_obd->obd_name, imp->imp_initial_recov);
684                 RETURN(0);
685         }
686         /* Turn off initial_recov after we try all backup servers once */
687         if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
688                 int value;
689                 if (vallen != sizeof(int))
690                         RETURN(-EINVAL);
691                 value = *(int *)val;
692                 imp->imp_initial_recov_bk = value > 0;
693                 if (imp->imp_invalid || value > 1) {
694                         /* Resurrect if we previously died */
695                         CDEBUG(D_MGC, "Reactivate %s %d:%d:%d:%s\n", 
696                                imp->imp_obd->obd_name, value,
697                                imp->imp_deactive, imp->imp_invalid, 
698                                ptlrpc_import_state_name(imp->imp_state));
699                         /* can't put this in obdclass, module loop with ptlrpc*/
700                         /* This seems to be necessary when restarting a
701                            combo mgs/mdt while the mgc is alive */
702                         ptlrpc_invalidate_import(imp);
703                         /* Remove 'invalid' flag */
704                         ptlrpc_activate_import(imp);
705                         /* Attempt a new connect */
706                         ptlrpc_recover_import(imp, NULL);
707                 }
708                 RETURN(0);
709         }
710         /* Hack alert */
711         if (KEY_IS("register_target")) {
712                 struct mgs_target_info *mti;
713                 if (vallen != sizeof(struct mgs_target_info))
714                         RETURN(-EINVAL);
715                 mti = (struct mgs_target_info *)val;
716                 CDEBUG(D_MGC, "register_target %s %#x\n",
717                        mti->mti_svname, mti->mti_flags);
718                 rc =  mgc_target_register(exp, mti);
719                 RETURN(rc);
720         }
721         if (KEY_IS("set_fs")) {
722                 struct super_block *sb = (struct super_block *)val;
723                 struct lustre_sb_info *lsi;
724                 if (vallen != sizeof(struct super_block))
725                         RETURN(-EINVAL);
726                 lsi = s2lsi(sb);
727                 rc = mgc_fs_setup(exp->exp_obd, sb, lsi->lsi_srv_mnt);
728                 if (rc) {
729                         CERROR("set_fs got %d\n", rc);
730                 }
731                 RETURN(rc);
732         }
733         if (KEY_IS("clear_fs")) {
734                 if (vallen != 0)
735                         RETURN(-EINVAL);
736                 rc = mgc_fs_cleanup(exp->exp_obd);
737                 if (rc) {
738                         CERROR("clear_fs got %d\n", rc);
739                 }
740                 RETURN(rc);
741         }
742
743         RETURN(rc);
744 }
745
746 static int mgc_import_event(struct obd_device *obd,
747                             struct obd_import *imp,
748                             enum obd_import_event event)
749 {
750         int rc = 0;
751
752         LASSERT(imp->imp_obd == obd);
753         CDEBUG(D_MGC, "import event %#x\n", event);
754
755         switch (event) {
756         case IMP_EVENT_INVALIDATE: {
757                 struct ldlm_namespace *ns = obd->obd_namespace;
758                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
759                 break;
760         }
761         case IMP_EVENT_DISCON: 
762                 /* MGC imports should not wait for recovery */
763                 ptlrpc_invalidate_import(imp);
764                 break;
765         case IMP_EVENT_INACTIVE: 
766         case IMP_EVENT_ACTIVE: 
767         case IMP_EVENT_OCD:
768                 break;
769         default:
770                 CERROR("Unknown import event %#x\n", event);
771                 LBUG();
772         }
773         RETURN(rc);
774 }
775
776 static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
777                          int count, struct llog_catid *logid)
778 {
779         struct llog_ctxt *ctxt;
780         int rc;
781         ENTRY;
782
783         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
784                         &llog_lvfs_ops);
785         if (rc)
786                 RETURN(rc);
787
788         rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
789                         &llog_client_ops);
790         if (rc == 0) {
791                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
792                 ctxt->loc_imp = obd->u.cli.cl_import;
793         }
794
795         RETURN(rc);
796 }
797
798 static int mgc_llog_finish(struct obd_device *obd, int count)
799 {
800         int rc;
801         ENTRY;
802
803         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
804         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
805
806         RETURN(rc);
807 }
808
809 /* identical to mgs_log_is_empty */
810 static int mgc_llog_is_empty(struct obd_device *obd, struct llog_ctxt *ctxt,
811                             char *name)
812 {
813         struct lvfs_run_ctxt saved;
814         struct llog_handle *llh;
815         int rc = 0;
816
817         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
818         rc = llog_create(ctxt, &llh, NULL, name);
819         if (rc == 0) {
820                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
821                 rc = llog_get_size(llh);
822                 llog_close(llh);
823         }
824         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
825         /* header is record 1 */
826         return(rc <= 1);
827 }
828
829 static int mgc_copy_handler(struct llog_handle *llh, struct llog_rec_hdr *rec,
830                             void *data)
831 {
832         struct llog_rec_hdr local_rec = *rec;
833         struct llog_handle *local_llh = (struct llog_handle *)data;
834         char *cfg_buf = (char*) (rec + 1);
835         struct lustre_cfg *lcfg;
836         int rc = 0;
837         ENTRY;
838
839         lcfg = (struct lustre_cfg *)cfg_buf;
840
841         /* FIXME we should always write to an empty log, so remove this check.*/
842         /* append new records */
843         if (rec->lrh_index >= llog_get_size(local_llh)) {
844                 rc = llog_write_rec(local_llh, &local_rec, NULL, 0,
845                                     (void *)cfg_buf, -1);
846
847                 CDEBUG(D_INFO, "idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
848                        rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
849                        lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
850         } else {
851                 CDEBUG(D_INFO, "skip idx=%d\n",  rec->lrh_index);
852         }
853
854         RETURN(rc);
855 }
856
857 static int mgc_copy_llog(struct obd_device *obd, struct llog_ctxt *rctxt,
858                          struct llog_ctxt *lctxt, char *logname)
859 {
860         struct llog_handle *local_llh, *remote_llh;
861         struct obd_uuid *uuid;
862         int rc, rc2;
863         ENTRY;
864
865         /* open local log */
866         rc = llog_create(lctxt, &local_llh, NULL, logname);
867         if (rc)
868                 RETURN(rc);
869         /* set the log header uuid for fun */
870         OBD_ALLOC_PTR(uuid);
871         obd_str2uuid(uuid, logname);
872         rc = llog_init_handle(local_llh, LLOG_F_IS_PLAIN, uuid);
873         OBD_FREE_PTR(uuid);
874         if (rc)
875                 GOTO(out_closel, rc);
876
877         /* FIXME write new log to a temp name, then vfs_rename over logname
878            upon successful completion. */
879
880         /* open remote log */
881         rc = llog_create(rctxt, &remote_llh, NULL, logname);
882         if (rc)
883                 GOTO(out_closel, rc);
884         rc = llog_init_handle(remote_llh, LLOG_F_IS_PLAIN, NULL);
885         if (rc)
886                 GOTO(out_closer, rc);
887
888         rc = llog_process(remote_llh, mgc_copy_handler,(void *)local_llh, NULL);
889
890 out_closer:
891         rc2 = llog_close(remote_llh);
892         if (!rc)
893                 rc = rc2;
894 out_closel:
895         rc2 = llog_close(local_llh);
896         if (!rc)
897                 rc = rc2;
898
899         CDEBUG(D_MGC, "Copied remote log %s (%d)\n", logname, rc);
900         RETURN(rc);
901 }
902
903 DECLARE_MUTEX(llog_process_lock);
904
905 /* Get a config log from the MGS and process it.
906    This func is called for both clients and servers. */
907 static int mgc_process_log(struct obd_device *mgc,
908                            struct config_llog_data *cld)
909 {
910         struct llog_ctxt *ctxt, *lctxt;
911         struct lustre_handle lockh;
912         struct client_obd *cli = &mgc->u.cli;
913         struct lvfs_run_ctxt saved;
914         struct lustre_sb_info *lsi;
915         int rc = 0, rcl, flags = 0, must_pop = 0;
916         ENTRY;
917
918         if (!cld || !cld->cld_cfg.cfg_sb) {
919                 /* This should never happen */
920                 CERROR("Missing cld, aborting log update\n");
921                 RETURN(-EINVAL);
922         }
923         if (cld->cld_stopping)
924                 RETURN(0);
925
926         lsi = s2lsi(cld->cld_cfg.cfg_sb);
927
928         CDEBUG(D_MGC, "Process log %s:%s from %d\n", cld->cld_logname,
929                cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1);
930
931         ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
932         if (!ctxt) {
933                 CERROR("missing llog context\n");
934                 RETURN(-EINVAL);
935         }
936
937         /* I don't want mutliple processes running process_log at once --
938            sounds like badness.  It actually might be fine, as long as
939            we're not trying to update from the same log
940            simultaneously (in which case we should use a per-log sem.) */
941         down(&llog_process_lock);
942
943         /* Get the cfg lock on the llog */
944         rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL,
945                           LCK_CR, &flags, NULL, NULL, NULL,
946                           cld, 0, NULL, &lockh);
947         if (rcl)
948                 CERROR("Can't get cfg lock: %d\n", rcl);
949
950         lctxt = llog_get_context(mgc, LLOG_CONFIG_ORIG_CTXT);
951
952         /* Copy the setup log locally if we can. Don't mess around if we're
953            running an MGS though (logs are already local). */
954         if (lctxt && lsi && (lsi->lsi_flags & LSI_SERVER) &&
955             (lsi->lsi_srv_mnt == cli->cl_mgc_vfsmnt) &&
956             !IS_MGS(lsi->lsi_ldd)) {
957                 push_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
958                 must_pop++;
959                 if (rcl == 0)
960                         /* Only try to copy log if we have the lock. */
961                         rc = mgc_copy_llog(mgc, ctxt, lctxt, cld->cld_logname);
962                 if (rcl || rc) {
963                         if (mgc_llog_is_empty(mgc, lctxt, cld->cld_logname)) {
964                                 LCONSOLE_ERROR("Failed to get MGS log %s "
965                                                "and no local copy.\n",
966                                                cld->cld_logname);
967                                 GOTO(out_pop, rc = -ENOTCONN);
968                         }
969                         LCONSOLE_WARN("Failed to get MGS log %s, using "
970                                       "local copy.\n", cld->cld_logname);
971                 }
972                 /* Now, whether we copied or not, start using the local llog.
973                    If we failed to copy, we'll start using whatever the old
974                    log has. */
975                 ctxt = lctxt;
976         }
977
978         /* logname and instance info should be the same, so use our
979            copy of the instance for the update.  The cfg_last_idx will
980            be updated here. */
981         rc = class_config_parse_llog(ctxt, cld->cld_logname, &cld->cld_cfg);
982
983  out_pop:
984         if (must_pop)
985                 pop_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
986
987         /* Now drop the lock so MGS can revoke it */
988         if (!rcl) {
989                 rcl = mgc_cancel(mgc->u.cli.cl_mgc_mgsexp, NULL,
990                                  LCK_CR, &lockh);
991                 if (rcl)
992                         CERROR("Can't drop cfg lock: %d\n", rcl);
993         }
994
995         if (rc) {
996                 CERROR("%s: the configuration '%s' could not be read "
997                        "(%d) from the MGS.\n",
998                        mgc->obd_name, cld->cld_logname, rc);
999         }
1000
1001         up(&llog_process_lock);
1002
1003         RETURN(rc);
1004 }
1005
1006 static int mgc_process_config(struct obd_device *obd, obd_count len, void *buf)
1007 {
1008         struct lustre_cfg *lcfg = buf;
1009         int cmd;
1010         int rc = 0;
1011         ENTRY;
1012
1013         switch(cmd = lcfg->lcfg_command) {
1014         case LCFG_LOV_ADD_OBD: {
1015                 struct mgs_target_info *mti;
1016
1017                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
1018                     sizeof(struct mgs_target_info))
1019                         GOTO(out, rc = -EINVAL);
1020
1021                 mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1);
1022                 CDEBUG(D_MGC, "add_target %s %#x\n",
1023                        mti->mti_svname, mti->mti_flags);
1024                 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
1025                 break;
1026         }
1027         case LCFG_LOV_DEL_OBD:
1028                 /* FIXME */
1029                 CERROR("lov_del_obd unimplemented\n");
1030                 rc = -ENOSYS;
1031                 break;
1032         case LCFG_LOG_START: {
1033                 struct config_llog_data *cld;
1034                 struct config_llog_instance *cfg;
1035                 struct super_block *sb;
1036                 char *logname = lustre_cfg_string(lcfg, 1);
1037                 cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
1038                 sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
1039
1040                 CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
1041                        cfg->cfg_last_idx);
1042
1043                 /* We're only called through here on the initial mount */
1044                 rc = config_log_add(logname, cfg, sb);
1045                 if (rc)
1046                         break;
1047                 cld = config_log_find(logname, cfg);
1048                 if (IS_ERR(cld)) {
1049                         rc = PTR_ERR(cld);
1050                         break;
1051                 }
1052
1053                 /* COMPAT_146 */
1054                 /* For old logs, there was no start marker. */
1055                 /* FIXME only set this for old logs! */
1056                 cld->cld_cfg.cfg_flags |= CFG_F_MARKER;
1057
1058                 rc = mgc_process_log(obd, cld);
1059                 config_log_put(cld);
1060
1061                 break;
1062         }
1063         case LCFG_LOG_END: {
1064                 struct config_llog_instance *cfg = NULL;
1065                 char *logname = lustre_cfg_string(lcfg, 1);
1066                 if (lcfg->lcfg_bufcount >= 2)
1067                         cfg = (struct config_llog_instance *)lustre_cfg_buf(
1068                                 lcfg, 2);
1069                 rc = config_log_end(logname, cfg);
1070                 break;
1071         }
1072         default: {
1073                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1074                 GOTO(out, rc = -EINVAL);
1075
1076         }
1077         }
1078 out:
1079         RETURN(rc);
1080 }
1081
1082 struct obd_ops mgc_obd_ops = {
1083         .o_owner        = THIS_MODULE,
1084         .o_setup        = mgc_setup,
1085         .o_precleanup   = mgc_precleanup,
1086         .o_cleanup      = mgc_cleanup,
1087         .o_add_conn     = client_import_add_conn,
1088         .o_del_conn     = client_import_del_conn,
1089         .o_connect      = client_connect_import,
1090         .o_disconnect   = client_disconnect_export,
1091         //.o_enqueue      = mgc_enqueue,
1092         .o_cancel       = mgc_cancel,
1093         //.o_iocontrol    = mgc_iocontrol,
1094         .o_set_info_async = mgc_set_info_async,
1095         .o_import_event = mgc_import_event,
1096         .o_llog_init    = mgc_llog_init,
1097         .o_llog_finish  = mgc_llog_finish,
1098         .o_process_config = mgc_process_config,
1099 };
1100
1101 int __init mgc_init(void)
1102 {
1103         return class_register_type(&mgc_obd_ops, NULL, NULL, 
1104                                    LUSTRE_MGC_NAME, NULL);
1105 }
1106
1107 #ifdef __KERNEL__
1108 static void /*__exit*/ mgc_exit(void)
1109 {
1110         class_unregister_type(LUSTRE_MGC_NAME);
1111 }
1112
1113 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1114 MODULE_DESCRIPTION("Lustre Management Client");
1115 MODULE_LICENSE("GPL");
1116
1117 module_init(mgc_init);
1118 module_exit(mgc_exit);
1119 #endif