Whamcloud - gitweb
b=13537
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59
60 static int mds_lov_read_objids(struct obd_device *obd)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         obd_id *ids;
64         loff_t off = 0;
65         int i, rc, size;
66         ENTRY;
67
68         LASSERT(!mds->mds_lov_objids_size);
69         LASSERT(!mds->mds_lov_objids_dirty);
70
71         /* Read everything in the file, even if our current lov desc
72            has fewer targets. Old targets not in the lov descriptor
73            during mds setup may still have valid objids. */
74         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
75         if (size == 0)
76                 RETURN(0);
77
78         OBD_ALLOC(ids, size);
79         if (ids == NULL)
80                 RETURN(-ENOMEM);
81         mds->mds_lov_objids = ids;
82         mds->mds_lov_objids_size = size;
83
84         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
85         if (rc < 0) {
86                 CERROR("Error reading objids %d\n", rc);
87                 RETURN(rc);
88         }
89
90         mds->mds_lov_objids_in_file = size / sizeof(*ids);
91
92         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
93                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
94                        mds->mds_lov_objids[i], i);
95         }
96         RETURN(0);
97 }
98
99 int mds_lov_write_objids(struct obd_device *obd)
100 {
101         struct mds_obd *mds = &obd->u.mds;
102         loff_t off = 0;
103         int i, rc, tgts;
104         ENTRY;
105
106         if (!mds->mds_lov_objids_dirty)
107                 RETURN(0);
108
109         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
110
111         if (!tgts)
112                 RETURN(0);
113
114         for (i = 0; i < tgts; i++)
115                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
116                        mds->mds_lov_objids[i], i);
117
118         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
119                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
120                                  &off, 0);
121         if (rc >= 0) {
122                 mds->mds_lov_objids_dirty = 0;
123                 rc = 0;
124         }
125
126         RETURN(rc);
127 }
128
129 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
130 {
131         int rc;
132         struct obdo oa;
133         struct obd_trans_info oti = {0};
134         struct lov_stripe_md  *empty_ea = NULL;
135         ENTRY;
136
137         LASSERT(mds->mds_lov_objids != NULL);
138
139         /* This create will in fact either create or destroy:  If the OST is
140          * missing objects below this ID, they will be created.  If it finds
141          * objects above this ID, they will be removed. */
142         memset(&oa, 0, sizeof(oa));
143         oa.o_valid = OBD_MD_FLFLAGS;
144         oa.o_flags = OBD_FL_DELORPHAN;
145         if (ost_uuid != NULL) {
146                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
147                 oa.o_valid |= OBD_MD_FLINLINE;
148         }
149         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
150
151         RETURN(rc);
152 }
153
154 /* update the LOV-OSC knowledge of the last used object id's */
155 int mds_lov_set_nextid(struct obd_device *obd)
156 {
157         struct mds_obd *mds = &obd->u.mds;
158         int rc;
159         ENTRY;
160
161         LASSERT(!obd->obd_recovering);
162         LASSERT(mds->mds_lov_objids != NULL);
163      
164         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
165         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
166
167         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
168                                 KEY_NEXT_ID,
169                                 mds->mds_lov_desc.ld_tgt_count *
170                                 sizeof(*mds->mds_lov_objids),
171                                 mds->mds_lov_objids, NULL);
172
173         if (rc)
174                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
175                         obd->obd_name, rc);
176
177         RETURN(rc);
178 }
179
180 /* Update the lov desc for a new size lov. */
181 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
182 {
183         struct mds_obd *mds = &obd->u.mds;
184         struct lov_desc *ld;
185         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
186         int rc = 0;
187         ENTRY;
188
189         OBD_ALLOC(ld, sizeof(*ld));
190         if (!ld)
191                 RETURN(-ENOMEM);
192
193         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
194                           &valsize, ld);
195         if (rc)
196                 GOTO(out, rc);
197
198         /* The size of the LOV target table may have increased. */
199         size = ld->ld_tgt_count * sizeof(obd_id);
200         if ((mds->mds_lov_objids_size == 0) ||
201             (size > mds->mds_lov_objids_size)) {
202                 obd_id *ids;
203
204                 /* add room by powers of 2 */
205                 size = 1;
206                 while (size < ld->ld_tgt_count)
207                         size = size << 1;
208                 size = size * sizeof(obd_id);
209
210                 OBD_ALLOC(ids, size);
211                 if (ids == NULL)
212                         GOTO(out, rc = -ENOMEM);
213                 memset(ids, 0, size);
214                 if (mds->mds_lov_objids_size) {
215                         obd_id *old_ids = mds->mds_lov_objids;
216                         memcpy(ids, mds->mds_lov_objids,
217                                mds->mds_lov_objids_size);
218                         mds->mds_lov_objids = ids;
219                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
220                 }
221                 mds->mds_lov_objids = ids;
222                 mds->mds_lov_objids_size = size;
223         }
224
225         /* Don't change the mds_lov_desc until the objids size matches the
226            count (paranoia) */
227         mds->mds_lov_desc = *ld;
228         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
229                mds->mds_lov_desc.ld_tgt_count);
230
231         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
232                         max(mds->mds_lov_desc.ld_tgt_count,
233                             mds->mds_lov_objids_in_file));
234         mds->mds_max_mdsize = lov_mds_md_size(stripes);
235         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
236         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
237                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
238                stripes);
239
240         /* If we added a target we have to reconnect the llogs */
241         /* We only _need_ to do this at first add (idx), or the first time
242            after recovery.  However, it should now be safe to call anytime. */
243         llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
244
245 out:
246         OBD_FREE(ld, sizeof(*ld));
247         RETURN(rc);
248 }
249
250
251 #define MDSLOV_NO_INDEX -1
252
253 /* Inform MDS about new/updated target */
254 static int mds_lov_update_mds(struct obd_device *obd,
255                               struct obd_device *watched,
256                               __u32 idx, struct obd_uuid *uuid)
257 {
258         struct mds_obd *mds = &obd->u.mds;
259         int old_count;
260         int rc = 0;
261         ENTRY;
262
263         /* Don't let anyone else mess with mds_lov_objids now */
264         mutex_down(&obd->obd_dev_sem);
265
266         old_count = mds->mds_lov_desc.ld_tgt_count;
267         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
268         if (rc) 
269                 GOTO(out, rc);
270
271         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
272                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
273                mds->mds_lov_desc.ld_tgt_count);
274
275         /* idx is set as data from lov_notify. */
276         if (idx == MDSLOV_NO_INDEX || obd->obd_recovering) 
277                 GOTO(out, rc);
278                 
279         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
280                 CERROR("index %d > count %d!\n", idx, 
281                        mds->mds_lov_desc.ld_tgt_count);
282                 GOTO(out, rc = -EINVAL);
283         }
284
285         if (idx >= mds->mds_lov_objids_in_file) {
286                 /* We never read this lastid; ask the osc */
287                 obd_id lastid;
288                 __u32 size = sizeof(lastid);
289                 rc = obd_get_info(watched->obd_self_export, strlen("last_id"),
290                                   "last_id", &size, &lastid);
291                 if (rc)
292                         GOTO(out, rc);
293                 mds->mds_lov_objids[idx] = lastid;
294                 mds->mds_lov_objids_dirty = 1;
295                 mds_lov_write_objids(obd);
296         } else {
297                 /* We have read this lastid from disk; tell the osc.
298                    Don't call this during recovery. */
299                 rc = mds_lov_set_nextid(obd);
300                 if (rc) {
301                         CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
302                         /* Don't abort the rest of the sync */
303                         rc = 0;
304                 }
305         }
306
307         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
308                mds->mds_lov_objids[idx], idx, rc);
309 out:
310         mutex_up(&obd->obd_dev_sem);
311         RETURN(rc);
312 }
313
314 /* update the LOV-OSC knowledge of the last used object id's */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
316 {
317         struct mds_obd *mds = &obd->u.mds;
318         struct lustre_handle conn = {0,};
319         struct obd_connect_data *data;
320         int rc, i;
321         ENTRY;
322
323         if (IS_ERR(mds->mds_osc_obd))
324                 RETURN(PTR_ERR(mds->mds_osc_obd));
325
326         if (mds->mds_osc_obd)
327                 RETURN(0);
328
329         mds->mds_osc_obd = class_name2obd(lov_name);
330         if (!mds->mds_osc_obd) {
331                 CERROR("MDS cannot locate LOV %s\n", lov_name);
332                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
333                 RETURN(-ENOTCONN);
334         }
335
336         OBD_ALLOC(data, sizeof(*data));
337         if (data == NULL)
338                 RETURN(-ENOMEM);
339         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT;
341 #ifdef HAVE_LRU_RESIZE_SUPPORT
342         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
343 #endif
344         data->ocd_version = LUSTRE_VERSION_CODE;
345         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
346         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
347         OBD_FREE(data, sizeof(*data));
348         if (rc) {
349                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
350                 mds->mds_osc_obd = ERR_PTR(rc);
351                 RETURN(rc);
352         }
353         mds->mds_osc_exp = class_conn2export(&conn);
354
355         rc = obd_register_observer(mds->mds_osc_obd, obd);
356         if (rc) {
357                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
358                        lov_name, rc);
359                 GOTO(err_discon, rc);
360         }
361         
362         /* Deny new client connections until we are sure we have some OSTs */
363         obd->obd_no_conn = 1;
364
365         mutex_down(&obd->obd_dev_sem);
366         rc = mds_lov_read_objids(obd);
367         if (rc) {
368                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
369                 GOTO(err_reg, rc);
370         }
371
372         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
373         if (rc)
374                 GOTO(err_reg, rc);
375
376         /* tgt_count may be 0! */
377         rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
378         if (rc) {
379                 CERROR("failed to initialize catalog %d\n", rc);
380                 GOTO(err_reg, rc);
381         }
382
383         /* If we're mounting this code for the first time on an existing FS,
384          * we need to populate the objids array from the real OST values */
385         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
386                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
387                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
388                                   "last_id", &size, mds->mds_lov_objids);
389                 if (!rc) {
390                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
391                                 CWARN("got last object "LPU64" from OST %d\n",
392                                       mds->mds_lov_objids[i], i);
393                         mds->mds_lov_objids_dirty = 1;
394                         rc = mds_lov_write_objids(obd);
395                         if (rc)
396                                 CERROR("got last objids from OSTs, but error "
397                                        "writing objids file: %d\n", rc);
398                 }
399         }
400         mutex_up(&obd->obd_dev_sem);
401
402         /* I want to see a callback happen when the OBD moves to a
403          * "For General Use" state, and that's when we'll call
404          * set_nextid().  The class driver can help us here, because
405          * it can use the obd_recovering flag to determine when the
406          * the OBD is full available. */
407         if (!obd->obd_recovering)
408                 rc = mds_postrecov(obd);
409         RETURN(rc);
410
411 err_reg:
412         mutex_up(&obd->obd_dev_sem);
413         obd_register_observer(mds->mds_osc_obd, NULL);
414 err_discon:
415         obd_disconnect(mds->mds_osc_exp);
416         mds->mds_osc_exp = NULL;
417         mds->mds_osc_obd = ERR_PTR(rc);
418         RETURN(rc);
419 }
420
421 int mds_lov_disconnect(struct obd_device *obd)
422 {
423         struct mds_obd *mds = &obd->u.mds;
424         int rc = 0;
425         ENTRY;
426
427         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
428                 obd_register_observer(mds->mds_osc_obd, NULL);
429
430                 /* The actual disconnect of the mds_lov will be called from
431                  * class_disconnect_exports from mds_lov_clean. So we have to
432                  * ensure that class_cleanup doesn't fail due to the extra ref
433                  * we're holding now. The mechanism to do that already exists -
434                  * the obd_force flag. We'll drop the final ref to the
435                  * mds_osc_exp in mds_cleanup. */
436                 mds->mds_osc_obd->obd_force = 1;
437         }
438
439         RETURN(rc);
440 }
441
442 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
443                   void *karg, void *uarg)
444 {
445         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
446         struct obd_device *obd = exp->exp_obd;
447         struct mds_obd *mds = &obd->u.mds;
448         struct obd_ioctl_data *data = karg;
449         struct lvfs_run_ctxt saved;
450         int rc = 0;
451
452         ENTRY;
453         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
454
455         switch (cmd) {
456         case OBD_IOC_RECORD: {
457                 char *name = data->ioc_inlbuf1;
458                 struct llog_ctxt *ctxt;
459
460                 if (mds->mds_cfg_llh)
461                         RETURN(-EBUSY);
462
463                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
464                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
465                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
466                 llog_ctxt_put(ctxt);
467                 if (rc == 0)
468                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
469                                          &cfg_uuid);
470                 else
471                         mds->mds_cfg_llh = NULL;
472                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
473
474                 RETURN(rc);
475         }
476
477         case OBD_IOC_ENDRECORD: {
478                 if (!mds->mds_cfg_llh)
479                         RETURN(-EBADF);
480
481                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
482                 rc = llog_close(mds->mds_cfg_llh);
483                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
484
485                 mds->mds_cfg_llh = NULL;
486                 RETURN(rc);
487         }
488
489         case OBD_IOC_CLEAR_LOG: {
490                 char *name = data->ioc_inlbuf1;
491                 struct llog_ctxt *ctxt;
492                 if (mds->mds_cfg_llh)
493                         RETURN(-EBUSY);
494
495                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
496                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
497                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
498                 llog_ctxt_put(ctxt);
499                 if (rc == 0) {
500                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
501                                          NULL);
502
503                         rc = llog_destroy(mds->mds_cfg_llh);
504                         llog_free_handle(mds->mds_cfg_llh);
505                 }
506                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
507
508                 mds->mds_cfg_llh = NULL;
509                 RETURN(rc);
510         }
511
512         case OBD_IOC_DORECORD: {
513                 char *cfg_buf;
514                 struct llog_rec_hdr rec;
515                 if (!mds->mds_cfg_llh)
516                         RETURN(-EBADF);
517
518                 rec.lrh_len = llog_data_len(data->ioc_plen1);
519
520                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
521                         rec.lrh_type = OBD_CFG_REC;
522                 } else {
523                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
524                         RETURN(-EINVAL);
525                 }
526
527                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
528                 if (cfg_buf == NULL)
529                         RETURN(-EINVAL);
530                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
531                 if (rc) {
532                         OBD_FREE(cfg_buf, data->ioc_plen1);
533                         RETURN(rc);
534                 }
535
536                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
537                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
538                                     cfg_buf, -1);
539                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
540
541                 OBD_FREE(cfg_buf, data->ioc_plen1);
542                 RETURN(rc);
543         }
544
545         case OBD_IOC_PARSE: {
546                 struct llog_ctxt *ctxt =
547                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
548                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
549                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
550                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
551                 llog_ctxt_put(ctxt);
552                 if (rc)
553                         RETURN(rc);
554
555                 RETURN(rc);
556         }
557
558         case OBD_IOC_DUMP_LOG: {
559                 struct llog_ctxt *ctxt =
560                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
561                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
562                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
563                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
564                 llog_ctxt_put(ctxt);
565                 if (rc)
566                         RETURN(rc);
567
568                 RETURN(rc);
569         }
570
571         case OBD_IOC_SYNC: {
572                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
573                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
574                 RETURN(rc);
575         }
576
577         case OBD_IOC_SET_READONLY: {
578                 void *handle;
579                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
580                 BDEVNAME_DECLARE_STORAGE(tmp);
581                 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
582                        obd->obd_name, ll_bdevname(obd->u.obt.obt_sb, tmp));
583
584                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
585                 if (!IS_ERR(handle))
586                         rc = fsfilt_commit(obd, inode, handle, 1);
587
588                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
589                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
590
591                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
592                 RETURN(0);
593         }
594
595         case OBD_IOC_CATLOGLIST: {
596                 int count = mds->mds_lov_desc.ld_tgt_count;
597                 rc = llog_catalog_list(obd, count, data);
598                 RETURN(rc);
599
600         }
601         case OBD_IOC_LLOG_CHECK:
602         case OBD_IOC_LLOG_CANCEL:
603         case OBD_IOC_LLOG_REMOVE: {
604                 struct llog_ctxt *ctxt =
605                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
606                 int rc2;
607
608                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
609                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
610                 rc = llog_ioctl(ctxt, cmd, data);
611                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
612                 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
613                 llog_ctxt_put(ctxt);
614                 rc2 = obd_set_info_async(mds->mds_osc_exp,
615                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
616                                          0, NULL, NULL);
617                 if (!rc)
618                         rc = rc2;
619                 RETURN(rc);
620         }
621         case OBD_IOC_LLOG_INFO:
622         case OBD_IOC_LLOG_PRINT: {
623                 struct llog_ctxt *ctxt =
624                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
625
626                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
627                 rc = llog_ioctl(ctxt, cmd, data);
628                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
629                 llog_ctxt_put(ctxt);
630
631                 RETURN(rc);
632         }
633
634         case OBD_IOC_ABORT_RECOVERY:
635                 CERROR("aborting recovery for device %s\n", obd->obd_name);
636                 target_abort_recovery(obd);
637                 RETURN(0);
638
639         default:
640                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
641                 RETURN(-EINVAL);
642         }
643         RETURN(0);
644
645 }
646
647 /* Collect the preconditions we need to allow client connects */
648 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
649 {
650         if (flag & CONFIG_LOG)
651                 obd->u.mds.mds_fl_cfglog = 1;
652         if (flag & CONFIG_SYNC)
653                 obd->u.mds.mds_fl_synced = 1;
654         if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_synced)
655                 /* Open for clients */
656                 obd->obd_no_conn = 0;
657 }
658
659 struct mds_lov_sync_info {
660         struct obd_device *mlsi_obd;     /* the lov device to sync */
661         struct obd_device *mlsi_watched; /* target osc */
662         __u32              mlsi_index;   /* index of target */
663 };
664
665 /* We only sync one osc at a time, so that we don't have to hold
666    any kind of lock on the whole mds_lov_desc, which may change
667    (grow) as a result of mds_lov_add_ost.  This also avoids any
668    kind of mismatch between the lov_desc and the mds_lov_desc,
669    which are not in lock-step during lov_add_obd */
670 static int __mds_lov_synchronize(void *data)
671 {
672         struct mds_lov_sync_info *mlsi = data;
673         struct obd_device *obd = mlsi->mlsi_obd;
674         struct obd_device *watched = mlsi->mlsi_watched;
675         struct mds_obd *mds = &obd->u.mds;
676         struct obd_uuid *uuid;
677         __u32  idx = mlsi->mlsi_index;
678         struct llog_ctxt *ctxt;
679         int rc = 0;
680         ENTRY;
681
682         OBD_FREE(mlsi, sizeof(*mlsi));
683
684         LASSERT(obd);
685         LASSERT(watched);
686         uuid = &watched->u.cli.cl_target_uuid;
687         LASSERT(uuid);
688
689         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
690
691         rc = mds_lov_update_mds(obd, watched, idx, uuid);
692         if (rc != 0) {
693                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
694                 GOTO(out, rc);
695         }
696
697         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
698                                 KEY_MDS_CONN, 0, uuid, NULL);
699         if (rc != 0)
700                 GOTO(out, rc);
701
702         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
703         if (!ctxt) 
704               RETURN(-ENODEV); 
705         
706         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
707
708         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
709                           NULL, NULL, uuid); 
710         llog_ctxt_put(ctxt);
711
712         if (rc != 0) {
713                 CERROR("%s failed at llog_origin_connect: %d\n",
714                        obd_uuid2str(uuid), rc);
715                 GOTO(out, rc);
716         }
717
718         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
719               obd->obd_name, obd_uuid2str(uuid));
720
721         if (obd->obd_stopping)
722                 GOTO(out, rc = -ENODEV);
723
724         rc = mds_lov_clear_orphans(mds, uuid);
725         if (rc != 0) {
726                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
727                        obd_uuid2str(uuid), rc);
728                 GOTO(out, rc);
729         }
730
731         EXIT;
732 out:
733         if (rc) {
734                 /* Deactivate it for safety */
735                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
736                        rc);
737                 if (!obd->obd_stopping && mds->mds_osc_obd &&
738                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping) 
739                         obd_notify(mds->mds_osc_obd, watched,
740                                    OBD_NOTIFY_INACTIVE, NULL);
741         } else {
742                 /* We've successfully synced at least 1 OST and are ready
743                    to handle client requests */
744                 mds_allow_cli(obd, CONFIG_SYNC);
745         }
746
747         class_decref(obd);
748         return rc;
749 }
750
751 int mds_lov_synchronize(void *data)
752 {
753         struct mds_lov_sync_info *mlsi = data;
754         char name[20];
755
756         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
757                 /* There is still a watched target,
758                 but we don't know its index */
759                 sprintf(name, "ll_sync_tgt");
760         else
761                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
762         ptlrpc_daemonize(name);
763
764         RETURN(__mds_lov_synchronize(data));
765 }
766
767 int mds_lov_start_synchronize(struct obd_device *obd,
768                               struct obd_device *watched,
769                               void *data, int nonblock)
770 {
771         struct mds_lov_sync_info *mlsi;
772         int rc;
773
774         ENTRY;
775
776         LASSERT(watched);
777
778         OBD_ALLOC(mlsi, sizeof(*mlsi));
779         if (mlsi == NULL)
780                 RETURN(-ENOMEM);
781
782         mlsi->mlsi_obd = obd;
783         mlsi->mlsi_watched = watched;
784         if (data)
785                 mlsi->mlsi_index = *(__u32 *)data;
786         else
787                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
788
789         /* Although class_export_get(obd->obd_self_export) would lock
790            the MDS in place, since it's only a self-export
791            it doesn't lock the LOV in place.  The LOV can be disconnected
792            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
793            Simply taking an export ref on the LOV doesn't help, because it's
794            still disconnected. Taking an obd reference insures that we don't
795            disconnect the LOV.  This of course means a cleanup won't
796            finish for as long as the sync is blocking. */
797         class_incref(obd);
798
799         if (nonblock) {
800                 /* Synchronize in the background */
801                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
802                                        CLONE_VM | CLONE_FILES);
803                 if (rc < 0) {
804                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
805                                obd->obd_name, rc);
806                         class_decref(obd);
807                 } else {
808                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
809                                "thread=%d\n", obd->obd_name,
810                                mlsi->mlsi_index, rc);
811                         rc = 0;
812                 }
813         } else {
814                 rc = __mds_lov_synchronize((void *)mlsi);
815         }
816
817         RETURN(rc);
818 }
819
820 int mds_notify(struct obd_device *obd, struct obd_device *watched,
821                enum obd_notify_event ev, void *data)
822 {
823         int rc = 0;
824         ENTRY;
825
826         switch (ev) {
827         /* We only handle these: */
828         case OBD_NOTIFY_ACTIVE:
829         case OBD_NOTIFY_SYNC:
830         case OBD_NOTIFY_SYNC_NONBLOCK:
831                 break;
832         case OBD_NOTIFY_CONFIG:
833                 mds_allow_cli(obd, (unsigned long)data);
834         default:
835                 RETURN(0);
836         }
837
838         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
839
840         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
841                 CERROR("unexpected notification of %s %s!\n",
842                        watched->obd_type->typ_name, watched->obd_name);
843                 RETURN(-EINVAL);
844         }
845
846         if (obd->obd_recovering) {
847                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
848                       obd->obd_name,
849                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
850                 /* We still have to fix the lov descriptor for ost's added
851                    after the mdt in the config log.  They didn't make it into
852                    mds_lov_connect. */
853                 mutex_down(&obd->obd_dev_sem);
854                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
855                 mutex_up(&obd->obd_dev_sem);
856                 mds_allow_cli(obd, CONFIG_SYNC);
857                 RETURN(rc);
858         }
859
860         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
861         rc = mds_lov_start_synchronize(obd, watched, data,
862                                        !(ev == OBD_NOTIFY_SYNC));
863
864         lquota_recovery(mds_quota_interface_ref, obd);
865
866         RETURN(rc);
867 }
868
869 /* Convert the on-disk LOV EA structre.
870  * We always try to convert from an old LOV EA format to the common in-memory
871  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
872  * then convert back to the new on-disk format and save it back to disk
873  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
874  * to convert it each time this inode is accessed.
875  *
876  * This function is a bit interesting in the error handling.  We can safely
877  * ship the old lmm to the client in case of failure, since it uses the same
878  * obd_unpackmd() code and can do the conversion if the MDS fails for some
879  * reason.  We will not delete the old lmm data until we have written the
880  * new format lmm data in fsfilt_set_md(). */
881 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
882                        struct lov_mds_md *lmm, int lmm_size)
883 {
884         struct lov_stripe_md *lsm = NULL;
885         void *handle;
886         int rc, err;
887         ENTRY;
888
889         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
890             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
891                 RETURN(0);
892
893         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
894                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
895                LOV_MAGIC);
896
897         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
898         if (rc < 0)
899                 GOTO(conv_end, rc);
900
901         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
902         if (rc < 0)
903                 GOTO(conv_free, rc);
904         lmm_size = rc;
905
906         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
907         if (IS_ERR(handle)) {
908                 rc = PTR_ERR(handle);
909                 GOTO(conv_free, rc);
910         }
911
912         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
913
914         err = fsfilt_commit(obd, inode, handle, 0);
915         if (!rc)
916                 rc = err ? err : lmm_size;
917         GOTO(conv_free, rc);
918 conv_free:
919         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
920 conv_end:
921         return rc;
922 }
923
924 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
925                          struct lov_desc *desc)
926 {
927         int i;
928         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
929                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
930                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
931         }
932 }
933