Whamcloud - gitweb
use userland bitops from libcfs.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         unsigned int i=0, j;
48
49         CDEBUG(D_INFO, "dump from %s\n", label);
50         if (mds->mds_lov_page_dirty == NULL) {
51                 CERROR("NULL bitmap!\n");
52                 GOTO(skip_bitmap, i);
53         }
54
55         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
57 skip_bitmap:
58         if (mds->mds_lov_page_array == NULL) {
59                 CERROR("not init page array!\n");
60                 GOTO(skip_array, i);
61
62         }
63         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64                 obd_id *data = mds->mds_lov_page_array[i];
65
66                 if (data == NULL)
67                         continue;
68
69                 for(j=0; j < OBJID_PER_PAGE(); j++) {
70                         if (data[j] == 0)
71                                 continue;
72                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
73                 }
74         }
75 skip_array:
76         EXIT;
77 }
78
79 int mds_lov_init_objids(struct obd_device *obd)
80 {
81         struct mds_obd *mds = &obd->u.mds;
82         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
83         struct file *file;
84         int rc;
85         ENTRY;
86
87         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
88
89         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90         if (mds->mds_lov_page_dirty == NULL)
91                 RETURN(-ENOMEM);
92
93
94         OBD_ALLOC(mds->mds_lov_page_array, size);
95         if (mds->mds_lov_page_array == NULL)
96                 GOTO(err_free_bitmap, rc = -ENOMEM);
97
98         /* open and test the lov objd file */
99         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
100         if (IS_ERR(file)) {
101                 rc = PTR_ERR(file);
102                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103                 GOTO(err_free, rc = PTR_ERR(file));
104         }
105         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107                        file->f_dentry->d_inode->i_mode);
108                 GOTO(err_open, rc = -ENOENT);
109         }
110         mds->mds_lov_objid_filp = file;
111
112         RETURN (0);
113 err_open:
114         if (filp_close((struct file *)file, 0))
115                 CERROR("can't close %s after error\n", LOV_OBJID);
116 err_free:
117         OBD_FREE(mds->mds_lov_page_array, size);
118 err_free_bitmap:
119         FREE_BITMAP(mds->mds_lov_page_dirty);
120
121         RETURN(rc);
122 }
123 EXPORT_SYMBOL(mds_lov_init_objids);
124
125 void mds_lov_destroy_objids(struct obd_device *obd)
126 {
127         struct mds_obd *mds = &obd->u.mds;
128         int i, rc;
129         ENTRY;
130
131         if (mds->mds_lov_page_array != NULL) {
132                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133                         obd_id *data = mds->mds_lov_page_array[i];
134                         if (data != NULL)
135                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
136                 }
137                 OBD_FREE(mds->mds_lov_page_array,
138                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
139         }
140
141         if (mds->mds_lov_objid_filp) {
142                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143                 mds->mds_lov_objid_filp = NULL;
144                 if (rc)
145                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
146         }
147
148         FREE_BITMAP(mds->mds_lov_page_dirty);
149         EXIT;
150 }
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
152
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         int j;
157         ENTRY;
158
159         /* if we create file without objects - lmm is NULL */
160         if (lmm == NULL)
161                 return;
162
163         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164                 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165                 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166                 int page = i / OBJID_PER_PAGE();
167                 int idx = i % OBJID_PER_PAGE();
168                 obd_id *data = mds->mds_lov_page_array[page];
169
170                 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171                                " old %llu\n", i, id, data[idx]);
172                 if (id > data[idx]) {
173                         data[idx] = id;
174                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
175                 }
176         }
177         EXIT;
178 }
179 EXPORT_SYMBOL(mds_lov_update_objids);
180
181 static int mds_lov_read_objids(struct obd_device *obd)
182 {
183         struct mds_obd *mds = &obd->u.mds;
184         loff_t off = 0;
185         int i, rc, count = 0, page = 0;
186         size_t size;
187         ENTRY;
188
189         /* Read everything in the file, even if our current lov desc
190            has fewer targets. Old targets not in the lov descriptor
191            during mds setup may still have valid objids. */
192         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
193         if (size == 0)
194                 RETURN(0);
195
196         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197         CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
198         for(i=0; i < page; i++) {
199                 obd_id *data =  mds->mds_lov_page_array[i];
200                 loff_t off_old = off;
201
202                 LASSERT(data == NULL);
203                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
204                 if (data == NULL)
205                         GOTO(out, rc = -ENOMEM);
206
207                 mds->mds_lov_page_array[i] = data;
208
209                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
211                 if (rc < 0) {
212                         CERROR("Error reading objids %d\n", rc);
213                         GOTO(out, rc);
214                 }
215                 if (off == off_old)
216                         break; // eof
217
218                 count += (off-off_old)/sizeof(obd_id);
219         }
220         mds->mds_lov_objid_count = count;
221         if (count) {
222                 count --;
223                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
224                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
225         }
226         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
227                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
228 out:
229         mds_lov_dump_objids("read",obd);
230
231         RETURN(0);
232 }
233
234 int mds_lov_write_objids(struct obd_device *obd)
235 {
236         struct mds_obd *mds = &obd->u.mds;
237         int i, rc = 0;
238         ENTRY;
239
240         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
241                 RETURN(0);
242
243         mds_lov_dump_objids("write", obd);
244
245         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
246                 obd_id *data =  mds->mds_lov_page_array[i];
247                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
248                 loff_t off = i * size;
249
250                 LASSERT(data != NULL);
251
252                 /* check for particaly filled last page */
253                 if (i == mds->mds_lov_objid_lastpage)
254                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
255
256                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
257                                          size, &off, 0);
258                 if (rc < 0)
259                         break;
260                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
261         }
262         if (rc >= 0)
263                 rc = 0;
264
265         RETURN(rc);
266 }
267 EXPORT_SYMBOL(mds_lov_write_objids);
268
269 static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
270                              __u32 idx)
271 {
272         struct mds_obd *mds = &obd->u.mds;
273         unsigned int page;
274         unsigned int off;
275         obd_id *data;
276         int rc = 0;
277         ENTRY;
278
279         page = idx / OBJID_PER_PAGE();
280         off = idx % OBJID_PER_PAGE();
281         data = mds->mds_lov_page_array[page];
282         if (data == NULL) {
283                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
284                 if (data == NULL)
285                         GOTO(out, rc = -ENOMEM);
286
287                 mds->mds_lov_page_array[page] = data;
288         }
289
290         if (data[off] == 0) {
291                 /* We never read this lastid; ask the osc */
292                 struct obd_id_info lastid;
293                 __u32 size = sizeof(lastid);
294
295                 lastid.idx = idx;
296                 lastid.data = &data[off];
297                 rc = obd_get_info(export, sizeof(KEY_LAST_ID),
298                                   KEY_LAST_ID, &size, &lastid);
299                 if (rc)
300                         GOTO(out, rc);
301
302                 if (idx > mds->mds_lov_objid_count) {
303                         mds->mds_lov_objid_count = idx;
304                         mds->mds_lov_objid_lastpage = page;
305                         mds->mds_lov_objid_lastidx = off;
306                 }
307                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
308         }
309 out:
310         RETURN(rc);
311 }
312
313 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
314 {
315         int rc;
316         struct obdo oa;
317         struct obd_trans_info oti = {0};
318         struct lov_stripe_md  *empty_ea = NULL;
319         ENTRY;
320
321         LASSERT(mds->mds_lov_page_array != NULL);
322
323         /* This create will in fact either create or destroy:  If the OST is
324          * missing objects below this ID, they will be created.  If it finds
325          * objects above this ID, they will be removed. */
326         memset(&oa, 0, sizeof(oa));
327         oa.o_flags = OBD_FL_DELORPHAN;
328         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
329         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
330         if (ost_uuid != NULL) {
331                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
332                 oa.o_valid |= OBD_MD_FLINLINE;
333         }
334         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
335
336         RETURN(rc);
337 }
338
339 /* for one target */
340 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
341 {
342         struct mds_obd *mds = &obd->u.mds;
343         int rc;
344         struct obd_id_info info;
345         ENTRY;
346
347         LASSERT(!obd->obd_recovering);
348
349         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
350         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
351
352         info.idx = idx;
353         info.data = id;
354
355         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
356                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
357         if (rc)
358                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
359                         obd->obd_name, rc);
360
361         RETURN(rc);
362 }
363
364 static __u32 mds_lov_get_idx(struct obd_export *lov,
365                              struct obd_uuid *ost_uuid)
366 {
367         int rc;
368         int valsize = sizeof(ost_uuid);
369
370         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
371                           &valsize, ost_uuid);
372         LASSERT(rc >= 0);
373
374         RETURN(rc);
375 }
376
377 /* Update the lov desc for a new size lov. */
378 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
379 {
380         struct mds_obd *mds = &obd->u.mds;
381         struct lov_desc *ld;
382         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
383         int page, rc = 0;
384         ENTRY;
385
386         OBD_ALLOC(ld, sizeof(*ld));
387         if (!ld)
388                 RETURN(-ENOMEM);
389
390         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
391                           &valsize, ld);
392         if (rc)
393                 GOTO(out, rc);
394
395         /* The size of the LOV target table may have increased. */
396         page = ld->ld_tgt_count / OBJID_PER_PAGE();
397         if (mds->mds_lov_page_array[page] == NULL) {
398                 obd_id *ids;
399
400                 OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
401                 if (ids == NULL)
402                         GOTO(out, rc = -ENOMEM);
403
404                 mds->mds_lov_page_array[page] = ids;
405         }
406
407         /* Don't change the mds_lov_desc until the objids size matches the
408            count (paranoia) */
409         mds->mds_lov_desc = *ld;
410         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
411                mds->mds_lov_desc.ld_tgt_count);
412
413         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
414                                mds->mds_lov_desc.ld_tgt_count);
415
416         mds->mds_max_mdsize = lov_mds_md_size(stripes);
417         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
418         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
419                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
420                stripes);
421
422         /* If we added a target we have to reconnect the llogs */
423         /* We only _need_ to do this at first add (idx), or the first time
424            after recovery.  However, it should now be safe to call anytime. */
425         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
426
427         /*XXX this notifies the MDD until lov handling use old mds code */
428         if (obd->obd_upcall.onu_owner) {
429                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
430                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
431                                                  obd->obd_upcall.onu_owner);
432         }
433 out:
434         OBD_FREE(ld, sizeof(*ld));
435         RETURN(rc);
436 }
437
438
439 #define MDSLOV_NO_INDEX -1
440
441 /* Inform MDS about new/updated target */
442 static int mds_lov_update_mds(struct obd_device *obd,
443                               struct obd_device *watched,
444                               __u32 idx, struct obd_uuid *uuid)
445 {
446         struct mds_obd *mds = &obd->u.mds;
447         __u32 old_count;
448         int rc = 0;
449         int page;
450         int off;
451         obd_id *data;
452
453         ENTRY;
454
455         /* Don't let anyone else mess with mds_lov_objids now */
456         mutex_down(&obd->obd_dev_sem);
457
458         old_count = mds->mds_lov_desc.ld_tgt_count;
459         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
460         if (rc)
461                 GOTO(out, rc);
462
463         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
464                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
465                mds->mds_lov_desc.ld_tgt_count);
466
467         /* idx is set as data from lov_notify. */
468         if (obd->obd_recovering)
469                 GOTO(out, rc);
470
471         /* mds post recov not know about ost index - ask lov for it */
472         if (idx == MDSLOV_NO_INDEX)
473                 idx = mds_lov_get_idx(mds->mds_osc_exp, uuid);
474
475         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
476                 CERROR("index %d > count %d!\n", idx,
477                        mds->mds_lov_desc.ld_tgt_count);
478                 GOTO(out, rc = -EINVAL);
479         }
480
481         page = idx / OBJID_PER_PAGE();
482         off = idx % OBJID_PER_PAGE();
483         data = mds->mds_lov_page_array[page];
484         CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
485
486         if (data[off] == 0) {
487                 rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
488         }
489         /* We have read this lastid from disk; tell the osc.
490            Don't call this during recovery. */
491         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
492         if (rc) {
493                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
494                 /* Don't abort the rest of the sync */
495                 rc = 0;
496         }
497
498         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
499                data[off], idx, rc);
500 out:
501         mutex_up(&obd->obd_dev_sem);
502         RETURN(rc);
503 }
504
505 /* update the LOV-OSC knowledge of the last used object id's */
506 int mds_lov_connect(struct obd_device *obd, char * lov_name)
507 {
508         struct mds_obd *mds = &obd->u.mds;
509         struct lustre_handle conn = {0,};
510         struct obd_connect_data *data;
511         int rc;
512         ENTRY;
513
514         if (IS_ERR(mds->mds_osc_obd))
515                 RETURN(PTR_ERR(mds->mds_osc_obd));
516
517         if (mds->mds_osc_obd)
518                 RETURN(0);
519
520         mds->mds_osc_obd = class_name2obd(lov_name);
521         if (!mds->mds_osc_obd) {
522                 CERROR("MDS cannot locate LOV %s\n", lov_name);
523                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
524                 RETURN(-ENOTCONN);
525         }
526
527         OBD_ALLOC(data, sizeof(*data));
528         if (data == NULL)
529                 RETURN(-ENOMEM);
530         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
531                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
532                                   OBD_CONNECT_OSS_CAPA;
533 #ifdef HAVE_LRU_RESIZE_SUPPORT
534         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
535 #endif
536         data->ocd_version = LUSTRE_VERSION_CODE;
537         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
538         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
539         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
540         OBD_FREE(data, sizeof(*data));
541         if (rc) {
542                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
543                 mds->mds_osc_obd = ERR_PTR(rc);
544                 RETURN(rc);
545         }
546         mds->mds_osc_exp = class_conn2export(&conn);
547
548         rc = obd_register_observer(mds->mds_osc_obd, obd);
549         if (rc) {
550                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
551                        lov_name, rc);
552                 GOTO(err_discon, rc);
553         }
554
555         /* Deny new client connections until we are sure we have some OSTs */
556         obd->obd_no_conn = 1;
557
558         mutex_down(&obd->obd_dev_sem);
559         rc = mds_lov_read_objids(obd);
560         if (rc) {
561                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
562                 GOTO(err_reg, rc);
563         }
564
565         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
566         if (rc)
567                 GOTO(err_reg, rc);
568
569         /* If we're mounting this code for the first time on an existing FS,
570          * we need to populate the objids array from the real OST values */
571         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
572                 __u32 i = mds->mds_lov_objid_count;
573                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
574                         rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
575                         if (rc != 0)
576                                 break;
577                 }
578                 if (rc == 0)
579                         rc = mds_lov_write_objids(obd);
580                 if (rc)
581                         CERROR("got last objids from OSTs, but error "
582                                 "in update objids file: %d\n", rc);
583         }
584         mutex_up(&obd->obd_dev_sem);
585
586         /* I want to see a callback happen when the OBD moves to a
587          * "For General Use" state, and that's when we'll call
588          * set_nextid().  The class driver can help us here, because
589          * it can use the obd_recovering flag to determine when the
590          * the OBD is full available. */
591         /* MDD device will care about that
592         if (!obd->obd_recovering)
593                 rc = mds_postrecov(obd);
594          */
595         RETURN(rc);
596
597 err_reg:
598         mutex_up(&obd->obd_dev_sem);
599         obd_register_observer(mds->mds_osc_obd, NULL);
600 err_discon:
601         obd_disconnect(mds->mds_osc_exp);
602         mds->mds_osc_exp = NULL;
603         mds->mds_osc_obd = ERR_PTR(rc);
604         RETURN(rc);
605 }
606
607 int mds_lov_disconnect(struct obd_device *obd)
608 {
609         struct mds_obd *mds = &obd->u.mds;
610         int rc = 0;
611         ENTRY;
612
613         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
614                 obd_register_observer(mds->mds_osc_obd, NULL);
615
616                 /* The actual disconnect of the mds_lov will be called from
617                  * class_disconnect_exports from mds_lov_clean. So we have to
618                  * ensure that class_cleanup doesn't fail due to the extra ref
619                  * we're holding now. The mechanism to do that already exists -
620                  * the obd_force flag. We'll drop the final ref to the
621                  * mds_osc_exp in mds_cleanup. */
622                 mds->mds_osc_obd->obd_force = 1;
623         }
624
625         RETURN(rc);
626 }
627
628 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
629                   void *karg, void *uarg)
630 {
631         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
632         struct obd_device *obd = exp->exp_obd;
633         struct mds_obd *mds = &obd->u.mds;
634         struct obd_ioctl_data *data = karg;
635         struct lvfs_run_ctxt saved;
636         int rc = 0;
637
638         ENTRY;
639         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
640
641         switch (cmd) {
642         case OBD_IOC_RECORD: {
643                 char *name = data->ioc_inlbuf1;
644                 if (mds->mds_cfg_llh)
645                         RETURN(-EBUSY);
646
647                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
648                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
649                                  &mds->mds_cfg_llh, NULL, name);
650                 if (rc == 0)
651                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
652                                          &cfg_uuid);
653                 else
654                         mds->mds_cfg_llh = NULL;
655                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
656
657                 RETURN(rc);
658         }
659
660         case OBD_IOC_ENDRECORD: {
661                 if (!mds->mds_cfg_llh)
662                         RETURN(-EBADF);
663
664                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
665                 rc = llog_close(mds->mds_cfg_llh);
666                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
667
668                 mds->mds_cfg_llh = NULL;
669                 RETURN(rc);
670         }
671
672         case OBD_IOC_CLEAR_LOG: {
673                 char *name = data->ioc_inlbuf1;
674                 if (mds->mds_cfg_llh)
675                         RETURN(-EBUSY);
676
677                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
678                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
679                                  &mds->mds_cfg_llh, NULL, name);
680                 if (rc == 0) {
681                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
682                                          NULL);
683
684                         rc = llog_destroy(mds->mds_cfg_llh);
685                         llog_free_handle(mds->mds_cfg_llh);
686                 }
687                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
688
689                 mds->mds_cfg_llh = NULL;
690                 RETURN(rc);
691         }
692
693         case OBD_IOC_DORECORD: {
694                 char *cfg_buf;
695                 struct llog_rec_hdr rec;
696                 if (!mds->mds_cfg_llh)
697                         RETURN(-EBADF);
698
699                 rec.lrh_len = llog_data_len(data->ioc_plen1);
700
701                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
702                         rec.lrh_type = OBD_CFG_REC;
703                 } else {
704                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
705                         RETURN(-EINVAL);
706                 }
707
708                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
709                 if (cfg_buf == NULL)
710                         RETURN(-EINVAL);
711                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
712                 if (rc) {
713                         OBD_FREE(cfg_buf, data->ioc_plen1);
714                         RETURN(rc);
715                 }
716
717                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
718                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
719                                     cfg_buf, -1);
720                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
721
722                 OBD_FREE(cfg_buf, data->ioc_plen1);
723                 RETURN(rc);
724         }
725
726         case OBD_IOC_PARSE: {
727                 struct llog_ctxt *ctxt =
728                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
729                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
730                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
731                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
732                 if (rc)
733                         RETURN(rc);
734
735                 RETURN(rc);
736         }
737
738         case OBD_IOC_DUMP_LOG: {
739                 struct llog_ctxt *ctxt =
740                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
741                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
742                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
743                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
744                 if (rc)
745                         RETURN(rc);
746
747                 RETURN(rc);
748         }
749
750         case OBD_IOC_SYNC: {
751                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
752                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
753                 RETURN(rc);
754         }
755
756         case OBD_IOC_SET_READONLY: {
757                 void *handle;
758                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
759                 BDEVNAME_DECLARE_STORAGE(tmp);
760                 CERROR("*** setting device %s read-only ***\n",
761                        ll_bdevname(obd->u.obt.obt_sb, tmp));
762
763                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
764                 if (!IS_ERR(handle))
765                         rc = fsfilt_commit(obd, inode, handle, 1);
766
767                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
768                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
769
770                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
771                 RETURN(0);
772         }
773
774         case OBD_IOC_CATLOGLIST: {
775                 int count = mds->mds_lov_desc.ld_tgt_count;
776                 rc = llog_catalog_list(obd, count, data);
777                 RETURN(rc);
778
779         }
780         case OBD_IOC_LLOG_CHECK:
781         case OBD_IOC_LLOG_CANCEL:
782         case OBD_IOC_LLOG_REMOVE: {
783                 struct llog_ctxt *ctxt =
784                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
785                 int rc2;
786                 __u32 group;
787
788                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
789                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
790                 rc = llog_ioctl(ctxt, cmd, data);
791                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
792                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
793                 group = FILTER_GROUP_MDS0 + mds->mds_id;
794                 rc2 = obd_set_info_async(mds->mds_osc_exp,
795                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
796                                          sizeof(group), &group, NULL);
797                 if (!rc)
798                         rc = rc2;
799                 RETURN(rc);
800         }
801         case OBD_IOC_LLOG_INFO:
802         case OBD_IOC_LLOG_PRINT: {
803                 struct llog_ctxt *ctxt =
804                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
805
806                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
807                 rc = llog_ioctl(ctxt, cmd, data);
808                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
809
810                 RETURN(rc);
811         }
812
813         case OBD_IOC_ABORT_RECOVERY:
814                 CERROR("aborting recovery for device %s\n", obd->obd_name);
815                 target_stop_recovery_thread(obd);
816                 RETURN(0);
817
818         default:
819                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
820                 RETURN(-EINVAL);
821         }
822         RETURN(0);
823
824 }
825
826 /* Collect the preconditions we need to allow client connects */
827 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
828 {
829         if (flag & CONFIG_LOG)
830                 obd->u.mds.mds_fl_cfglog = 1;
831         if (flag & CONFIG_SYNC)
832                 obd->u.mds.mds_fl_synced = 1;
833         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
834                 /* Open for clients */
835                 obd->obd_no_conn = 0;
836 }
837
838 struct mds_lov_sync_info {
839         struct obd_device *mlsi_obd;     /* the lov device to sync */
840         struct obd_device *mlsi_watched; /* target osc */
841         __u32              mlsi_index;   /* index of target */
842 };
843
844 static int mds_propagate_capa_keys(struct mds_obd *mds)
845 {
846         struct lustre_capa_key *key;
847         int i, rc = 0;
848
849         ENTRY;
850
851         if (!mds->mds_capa_keys)
852                 RETURN(0);
853
854         for (i = 0; i < 2; i++) {
855                 key = &mds->mds_capa_keys[i];
856                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
857
858                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
859                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
860                 if (rc) {
861                         DEBUG_CAPA_KEY(D_ERROR, key,
862                                        "propagate failed (rc = %d) for", rc);
863                         RETURN(rc);
864                 }
865         }
866
867         RETURN(0);
868 }
869
870 /* We only sync one osc at a time, so that we don't have to hold
871    any kind of lock on the whole mds_lov_desc, which may change
872    (grow) as a result of mds_lov_add_ost.  This also avoids any
873    kind of mismatch between the lov_desc and the mds_lov_desc,
874    which are not in lock-step during lov_add_obd */
875 static int __mds_lov_synchronize(void *data)
876 {
877         struct mds_lov_sync_info *mlsi = data;
878         struct obd_device *obd = mlsi->mlsi_obd;
879         struct obd_device *watched = mlsi->mlsi_watched;
880         struct mds_obd *mds = &obd->u.mds;
881         struct obd_uuid *uuid;
882         __u32  idx = mlsi->mlsi_index;
883         struct mds_group_info mgi;
884         int rc = 0;
885         ENTRY;
886
887         OBD_FREE(mlsi, sizeof(*mlsi));
888
889         LASSERT(obd);
890         LASSERT(watched);
891         uuid = &watched->u.cli.cl_target_uuid;
892         LASSERT(uuid);
893
894         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
895
896         rc = mds_lov_update_mds(obd, watched, idx, uuid);
897         if (rc != 0) {
898                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
899                 GOTO(out, rc);
900         }
901         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
902         mgi.uuid = uuid;
903
904         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
905                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
906         if (rc != 0)
907                 GOTO(out, rc);
908
909         /* propagate capability keys */
910         rc = mds_propagate_capa_keys(mds);
911         if (rc)
912                 GOTO(out, rc);
913
914         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
915                           mds->mds_lov_desc.ld_tgt_count,
916                           NULL, NULL, uuid);
917
918         if (rc != 0) {
919                 CERROR("%s failed at llog_origin_connect: %d\n",
920                        obd_uuid2str(uuid), rc);
921                 GOTO(out, rc);
922         }
923
924         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
925               obd->obd_name, obd_uuid2str(uuid));
926         /*
927          * FIXME: this obd_stopping was useless, 
928          * since obd in mdt layer was set
929          */
930         if (obd->obd_stopping)
931                 GOTO(out, rc = -ENODEV);
932
933         rc = mds_lov_clear_orphans(mds, uuid);
934         if (rc != 0) {
935                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
936                        obd_uuid2str(uuid), rc);
937                 GOTO(out, rc);
938         }
939
940         if (obd->obd_upcall.onu_owner) {
941                 /*
942                  * This is a hack for mds_notify->mdd_notify. When the mds obd
943                  * in mdd is removed, This hack should be removed.
944                  */
945                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
946                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
947                                                  obd->obd_upcall.onu_owner);
948         }
949         EXIT;
950 out:
951         if (rc) {
952                 /* Deactivate it for safety */
953                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
954                        rc);
955                 if (!obd->obd_stopping && mds->mds_osc_obd &&
956                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
957                         obd_notify(mds->mds_osc_obd, watched,
958                                    OBD_NOTIFY_INACTIVE, NULL);
959         }
960
961         class_decref(obd);
962         return rc;
963 }
964
965 int mds_lov_synchronize(void *data)
966 {
967         struct mds_lov_sync_info *mlsi = data;
968         char name[20];
969
970         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
971                 /* There is still a watched target,
972                 but we don't know its index */
973                 sprintf(name, "ll_sync_tgt");
974         else
975                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
976         ptlrpc_daemonize(name);
977
978         RETURN(__mds_lov_synchronize(data));
979 }
980
981 int mds_lov_start_synchronize(struct obd_device *obd,
982                               struct obd_device *watched,
983                               void *data, int nonblock)
984 {
985         struct mds_lov_sync_info *mlsi;
986         int rc;
987
988         ENTRY;
989
990         LASSERT(watched);
991
992         OBD_ALLOC(mlsi, sizeof(*mlsi));
993         if (mlsi == NULL)
994                 RETURN(-ENOMEM);
995
996         mlsi->mlsi_obd = obd;
997         mlsi->mlsi_watched = watched;
998         if (data)
999                 mlsi->mlsi_index = *(__u32 *)data;
1000         else
1001                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
1002
1003         /* Although class_export_get(obd->obd_self_export) would lock
1004            the MDS in place, since it's only a self-export
1005            it doesn't lock the LOV in place.  The LOV can be disconnected
1006            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1007            Simply taking an export ref on the LOV doesn't help, because it's
1008            still disconnected. Taking an obd reference insures that we don't
1009            disconnect the LOV.  This of course means a cleanup won't
1010            finish for as long as the sync is blocking. */
1011         class_incref(obd);
1012
1013         if (nonblock) {
1014                 /* Synchronize in the background */
1015                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1016                                        CLONE_VM | CLONE_FILES);
1017                 if (rc < 0) {
1018                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1019                                obd->obd_name, rc);
1020                         class_decref(obd);
1021                 } else {
1022                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1023                                "thread=%d\n", obd->obd_name,
1024                                mlsi->mlsi_index, rc);
1025                         rc = 0;
1026                 }
1027         } else {
1028                 rc = __mds_lov_synchronize((void *)mlsi);
1029         }
1030
1031         RETURN(rc);
1032 }
1033
1034 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1035                enum obd_notify_event ev, void *data)
1036 {
1037         int rc = 0;
1038         ENTRY;
1039
1040         switch (ev) {
1041         /* We only handle these: */
1042         case OBD_NOTIFY_ACTIVE:
1043         case OBD_NOTIFY_SYNC:
1044         case OBD_NOTIFY_SYNC_NONBLOCK:
1045                 break;
1046         case OBD_NOTIFY_CONFIG:
1047                 mds_allow_cli(obd, (unsigned int)data);
1048         default:
1049                 RETURN(0);
1050         }
1051
1052         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1053         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1054                 CERROR("unexpected notification of %s %s!\n",
1055                        watched->obd_type->typ_name, watched->obd_name);
1056                 RETURN(-EINVAL);
1057         }
1058
1059         if (obd->obd_recovering) {
1060                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1061                       obd->obd_name,
1062                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1063                 /* We still have to fix the lov descriptor for ost's added
1064                    after the mdt in the config log.  They didn't make it into
1065                    mds_lov_connect. */
1066                 mutex_down(&obd->obd_dev_sem);
1067                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1068                 if (rc) {
1069                         mutex_up(&obd->obd_dev_sem);
1070                         RETURN(rc);
1071                 }
1072                 /* We should update init llog here too for replay unlink and 
1073                  * possiable llog init race when recovery complete */
1074                 llog_cat_initialize(obd, NULL, 
1075                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
1076                                     &watched->u.cli.cl_target_uuid);
1077                 mutex_up(&obd->obd_dev_sem);
1078                 mds_allow_cli(obd, CONFIG_SYNC);
1079                 RETURN(rc);
1080         }
1081
1082         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1083         rc = mds_lov_start_synchronize(obd, watched, data,
1084                                        !(ev == OBD_NOTIFY_SYNC));
1085
1086         lquota_recovery(mds_quota_interface_ref, obd);
1087
1088         RETURN(rc);
1089 }
1090
1091 /* Convert the on-disk LOV EA structre.
1092  * We always try to convert from an old LOV EA format to the common in-memory
1093  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1094  * then convert back to the new on-disk format and save it back to disk
1095  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1096  * to convert it each time this inode is accessed.
1097  *
1098  * This function is a bit interesting in the error handling.  We can safely
1099  * ship the old lmm to the client in case of failure, since it uses the same
1100  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1101  * reason.  We will not delete the old lmm data until we have written the
1102  * new format lmm data in fsfilt_set_md(). */
1103 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1104                        struct lov_mds_md *lmm, int lmm_size)
1105 {
1106         struct lov_stripe_md *lsm = NULL;
1107         void *handle;
1108         int rc, err;
1109         ENTRY;
1110
1111         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1112             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
1113                 RETURN(0);
1114
1115         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1116                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1117                LOV_MAGIC);
1118
1119         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1120         if (rc < 0)
1121                 GOTO(conv_end, rc);
1122
1123         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1124         if (rc < 0)
1125                 GOTO(conv_free, rc);
1126         lmm_size = rc;
1127
1128         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1129         if (IS_ERR(handle)) {
1130                 rc = PTR_ERR(handle);
1131                 GOTO(conv_free, rc);
1132         }
1133
1134         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1135
1136         err = fsfilt_commit(obd, inode, handle, 0);
1137         if (!rc)
1138                 rc = err ? err : lmm_size;
1139         GOTO(conv_free, rc);
1140 conv_free:
1141         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1142 conv_end:
1143         return rc;
1144 }