Whamcloud - gitweb
cleanup in userland bitops code.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         unsigned int i=0, j;
48
49         CDEBUG(D_INFO, "dump from %s\n", label);
50         if (mds->mds_lov_page_dirty == NULL) {
51                 CERROR("NULL bitmap!\n");
52                 GOTO(skip_bitmap, i);
53         }
54
55         for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
56                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
57 skip_bitmap:
58         if (mds->mds_lov_page_array == NULL) {
59                 CERROR("not init page array!\n");
60                 GOTO(skip_array, i);
61
62         }
63         for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
64                 obd_id *data = mds->mds_lov_page_array[i];
65
66                 if (data == NULL)
67                         continue;
68
69                 for(j=0; j < OBJID_PER_PAGE(); j++) {
70                         if (data[j] == 0)
71                                 continue;
72                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
73                                i,j,data[j]);
74                 }
75         }
76 skip_array:
77         EXIT;
78 }
79
80 int mds_lov_init_objids(struct obd_device *obd)
81 {
82         struct mds_obd *mds = &obd->u.mds;
83         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
84         struct file *file;
85         int rc;
86         ENTRY;
87
88         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
89
90         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
91         if (mds->mds_lov_page_dirty == NULL)
92                 RETURN(-ENOMEM);
93
94
95         OBD_ALLOC(mds->mds_lov_page_array, size);
96         if (mds->mds_lov_page_array == NULL)
97                 GOTO(err_free_bitmap, rc = -ENOMEM);
98
99         /* open and test the lov objd file */
100         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
101         if (IS_ERR(file)) {
102                 rc = PTR_ERR(file);
103                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
104                 GOTO(err_free, rc = PTR_ERR(file));
105         }
106         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
107                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
108                        file->f_dentry->d_inode->i_mode);
109                 GOTO(err_open, rc = -ENOENT);
110         }
111         mds->mds_lov_objid_filp = file;
112
113         RETURN (0);
114 err_open:
115         if (filp_close((struct file *)file, 0))
116                 CERROR("can't close %s after error\n", LOV_OBJID);
117 err_free:
118         OBD_FREE(mds->mds_lov_page_array, size);
119 err_free_bitmap:
120         FREE_BITMAP(mds->mds_lov_page_dirty);
121
122         RETURN(rc);
123 }
124 EXPORT_SYMBOL(mds_lov_init_objids);
125
126 void mds_lov_destroy_objids(struct obd_device *obd)
127 {
128         struct mds_obd *mds = &obd->u.mds;
129         int i, rc;
130         ENTRY;
131
132         if (mds->mds_lov_page_array != NULL) {
133                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
134                         obd_id *data = mds->mds_lov_page_array[i];
135                         if (data != NULL)
136                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
137                 }
138                 OBD_FREE(mds->mds_lov_page_array,
139                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
140         }
141
142         if (mds->mds_lov_objid_filp) {
143                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
144                 mds->mds_lov_objid_filp = NULL;
145                 if (rc)
146                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
147         }
148
149         FREE_BITMAP(mds->mds_lov_page_dirty);
150         EXIT;
151 }
152 EXPORT_SYMBOL(mds_lov_destroy_objids);
153
154 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
155 {
156         struct mds_obd *mds = &obd->u.mds;
157         int j;
158         ENTRY;
159
160         /* if we create file without objects - lmm is NULL */
161         if (lmm == NULL)
162                 return;
163
164         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
165                 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
166                 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
167                 int page = i / OBJID_PER_PAGE();
168                 int idx = i % OBJID_PER_PAGE();
169                 obd_id *data = mds->mds_lov_page_array[page];
170
171                 CDEBUG(D_INODE,"update last object for ost %d"
172                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
173                 if (id > data[idx]) {
174                         data[idx] = id;
175                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
176                 }
177         }
178         EXIT;
179 }
180 EXPORT_SYMBOL(mds_lov_update_objids);
181
182 static int mds_lov_read_objids(struct obd_device *obd)
183 {
184         struct mds_obd *mds = &obd->u.mds;
185         loff_t off = 0;
186         int i, rc = 0, count = 0, page = 0;
187         size_t size;
188         ENTRY;
189
190         /* Read everything in the file, even if our current lov desc
191            has fewer targets. Old targets not in the lov descriptor
192            during mds setup may still have valid objids. */
193         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
194         if (size == 0)
195                 RETURN(0);
196
197         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
198         CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
199         for(i=0; i < page; i++) {
200                 obd_id *data =  mds->mds_lov_page_array[i];
201                 loff_t off_old = off;
202
203                 LASSERT(data == NULL);
204                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
205                 if (data == NULL)
206                         GOTO(out, rc = -ENOMEM);
207
208                 mds->mds_lov_page_array[i] = data;
209
210                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
211                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
212                 if (rc < 0) {
213                         CERROR("Error reading objids %d\n", rc);
214                         GOTO(out, rc);
215                 }
216                 if (off == off_old)
217                         break; /* eof */
218
219                 count += (off-off_old)/sizeof(obd_id);
220         }
221         mds->mds_lov_objid_count = count;
222         if (count) {
223                 count --;
224                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
225                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
226         }
227         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
228                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
229 out:
230         mds_lov_dump_objids("read",obd);
231
232         RETURN(rc);
233 }
234
235 int mds_lov_write_objids(struct obd_device *obd)
236 {
237         struct mds_obd *mds = &obd->u.mds;
238         int i, rc = 0;
239         ENTRY;
240
241         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
242                 RETURN(0);
243
244         mds_lov_dump_objids("write", obd);
245
246         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
247                 obd_id *data =  mds->mds_lov_page_array[i];
248                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
249                 loff_t off = i * size;
250
251                 LASSERT(data != NULL);
252
253                 /* check for particaly filled last page */
254                 if (i == mds->mds_lov_objid_lastpage)
255                         size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
256
257                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
258                                          size, &off, 0);
259                 if (rc < 0)
260                         break;
261                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
262         }
263         if (rc >= 0)
264                 rc = 0;
265
266         RETURN(rc);
267 }
268 EXPORT_SYMBOL(mds_lov_write_objids);
269
270 static int mds_lov_get_objid(struct obd_device * obd,
271                              __u32 idx)
272 {
273         struct mds_obd *mds = &obd->u.mds;
274         unsigned int page;
275         unsigned int off;
276         obd_id *data;
277         int rc = 0;
278         ENTRY;
279
280         page = idx / OBJID_PER_PAGE();
281         off = idx % OBJID_PER_PAGE();
282         data = mds->mds_lov_page_array[page];
283         if (data == NULL) {
284                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
285                 if (data == NULL)
286                         GOTO(out, rc = -ENOMEM);
287
288                 mds->mds_lov_page_array[page] = data;
289         }
290
291         if (data[off] == 0) {
292                 /* We never read this lastid; ask the osc */
293                 struct obd_id_info lastid;
294                 __u32 size = sizeof(lastid);
295
296                 lastid.idx = idx;
297                 lastid.data = &data[off];
298                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
299                                   KEY_LAST_ID, &size, &lastid);
300                 if (rc)
301                         GOTO(out, rc);
302
303                 if (idx > mds->mds_lov_objid_count) {
304                         mds->mds_lov_objid_count = idx;
305                         mds->mds_lov_objid_lastpage = page;
306                         mds->mds_lov_objid_lastidx = off;
307                 }
308                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
309         }
310         CDEBUG(D_INFO, "idx %d - %p - %d/%d - "LPU64"\n",
311                idx, data, page, off, data[off]);
312 out:
313         RETURN(rc);
314 }
315
316 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
317 {
318         int rc;
319         struct obdo oa;
320         struct obd_trans_info oti = {0};
321         struct lov_stripe_md  *empty_ea = NULL;
322         ENTRY;
323
324         LASSERT(mds->mds_lov_page_array != NULL);
325
326         /* This create will in fact either create or destroy:  If the OST is
327          * missing objects below this ID, they will be created.  If it finds
328          * objects above this ID, they will be removed. */
329         memset(&oa, 0, sizeof(oa));
330         oa.o_flags = OBD_FL_DELORPHAN;
331         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
332         if (ost_uuid != NULL) {
333                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
334                 oa.o_valid |= OBD_MD_FLINLINE;
335         }
336         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
337
338         RETURN(rc);
339 }
340
341 /* for one target */
342 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
343 {
344         struct mds_obd *mds = &obd->u.mds;
345         int rc;
346         struct obd_id_info info;
347         ENTRY;
348
349         LASSERT(!obd->obd_recovering);
350
351         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
352         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
353
354         info.idx = idx;
355         info.data = id;
356
357         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
358                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
359         if (rc)
360                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
361                         obd->obd_name, rc);
362
363         RETURN(rc);
364 }
365
366 static __u32 mds_lov_get_idx(struct obd_export *lov,
367                              struct obd_uuid *ost_uuid)
368 {
369         int rc;
370         int valsize = sizeof(ost_uuid);
371
372         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
373                           &valsize, ost_uuid);
374         LASSERT(rc >= 0);
375
376         RETURN(rc);
377 }
378
379 /* Update the lov desc for a new size lov. */
380 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
381 {
382         struct mds_obd *mds = &obd->u.mds;
383         struct lov_desc *ld;
384         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
385         int rc = 0;
386         ENTRY;
387
388         OBD_ALLOC(ld, sizeof(*ld));
389         if (!ld)
390                 RETURN(-ENOMEM);
391
392         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
393                           &valsize, ld);
394         if (rc)
395                 GOTO(out, rc);
396
397         /* Don't change the mds_lov_desc until the objids size matches the
398            count (paranoia) */
399         mds->mds_lov_desc = *ld;
400         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
401                mds->mds_lov_desc.ld_tgt_count);
402
403         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
404                         mds->mds_lov_desc.ld_tgt_count);
405
406         mds->mds_max_mdsize = lov_mds_md_size(stripes);
407         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
408         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
409                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
410                stripes);
411
412         /* If we added a target we have to reconnect the llogs */
413         /* We only _need_ to do this at first add (idx), or the first time
414            after recovery.  However, it should now be safe to call anytime. */
415         rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
416
417 out:
418         OBD_FREE(ld, sizeof(*ld));
419         RETURN(rc);
420 }
421
422
423 /* Inform MDS about new/updated target */
424 static int mds_lov_update_mds(struct obd_device *obd,
425                               struct obd_device *watched,
426                               __u32 idx)
427 {
428         struct mds_obd *mds = &obd->u.mds;
429         __u32 old_count;
430         int rc = 0;
431         int page;
432         int off;
433         obd_id *data;
434
435         ENTRY;
436
437         /* Don't let anyone else mess with mds_lov_objids now */
438         mutex_down(&obd->obd_dev_sem);
439
440         old_count = mds->mds_lov_desc.ld_tgt_count;
441         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
442         if (rc)
443                 GOTO(out, rc);
444
445         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
446                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
447                mds->mds_lov_desc.ld_tgt_count);
448
449         /* idx is set as data from lov_notify. */
450         if (obd->obd_recovering)
451                 GOTO(out, rc);
452
453         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
454                 CERROR("index %d > count %d!\n", idx,
455                        mds->mds_lov_desc.ld_tgt_count);
456                 GOTO(out, rc = -EINVAL);
457         }
458
459         rc = mds_lov_get_objid(obd, idx);
460         if (rc) {
461                 CERROR("Failed to get objid - %d\n", rc);
462                 GOTO(out, rc);
463         }
464
465         page = idx / OBJID_PER_PAGE();
466         off = idx % OBJID_PER_PAGE();
467         data = mds->mds_lov_page_array[page];
468         /* We have read this lastid from disk; tell the osc.
469            Don't call this during recovery. */
470         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
471         if (rc) {
472                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
473                 /* Don't abort the rest of the sync */
474                 rc = 0;
475         }
476
477         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
478                data[off], idx, rc);
479 out:
480         mutex_up(&obd->obd_dev_sem);
481         RETURN(rc);
482 }
483
484 /* update the LOV-OSC knowledge of the last used object id's */
485 int mds_lov_connect(struct obd_device *obd, char * lov_name)
486 {
487         struct mds_obd *mds = &obd->u.mds;
488         struct lustre_handle conn = {0,};
489         struct obd_connect_data *data;
490         int rc;
491         ENTRY;
492
493         if (IS_ERR(mds->mds_osc_obd))
494                 RETURN(PTR_ERR(mds->mds_osc_obd));
495
496         if (mds->mds_osc_obd)
497                 RETURN(0);
498
499         mds->mds_osc_obd = class_name2obd(lov_name);
500         if (!mds->mds_osc_obd) {
501                 CERROR("MDS cannot locate LOV %s\n", lov_name);
502                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
503                 RETURN(-ENOTCONN);
504         }
505
506         OBD_ALLOC(data, sizeof(*data));
507         if (data == NULL)
508                 RETURN(-ENOMEM);
509         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
510                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT;
511 #ifdef HAVE_LRU_RESIZE_SUPPORT
512         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
513 #endif
514         data->ocd_version = LUSTRE_VERSION_CODE;
515         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
516         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
517         OBD_FREE(data, sizeof(*data));
518         if (rc) {
519                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
520                 mds->mds_osc_obd = ERR_PTR(rc);
521                 RETURN(rc);
522         }
523         mds->mds_osc_exp = class_conn2export(&conn);
524
525         rc = obd_register_observer(mds->mds_osc_obd, obd);
526         if (rc) {
527                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
528                        lov_name, rc);
529                 GOTO(err_discon, rc);
530         }
531
532         /* Deny new client connections until we are sure we have some OSTs */
533         obd->obd_no_conn = 1;
534
535         mutex_down(&obd->obd_dev_sem);
536         rc = mds_lov_read_objids(obd);
537         if (rc) {
538                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
539                 GOTO(err_reg, rc);
540         }
541
542         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
543         if (rc)
544                 GOTO(err_reg, rc);
545
546         /* If we're mounting this code for the first time on an existing FS,
547          * we need to populate the objids array from the real OST values */
548         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
549                 __u32 i = mds->mds_lov_objid_count;
550                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
551                         rc = mds_lov_get_objid(obd, i);
552                         if (rc != 0)
553                                 break;
554                 }
555                 if (rc == 0)
556                         rc = mds_lov_write_objids(obd);
557                 if (rc)
558                         CERROR("got last objids from OSTs, but error "
559                                 "in update objids file: %d\n", rc);
560         }
561
562         mutex_up(&obd->obd_dev_sem);
563
564         /* I want to see a callback happen when the OBD moves to a
565          * "For General Use" state, and that's when we'll call
566          * set_nextid().  The class driver can help us here, because
567          * it can use the obd_recovering flag to determine when the
568          * the OBD is full available. */
569         if (!obd->obd_recovering)
570                 rc = mds_postrecov(obd);
571         RETURN(rc);
572
573 err_reg:
574         mutex_up(&obd->obd_dev_sem);
575         obd_register_observer(mds->mds_osc_obd, NULL);
576 err_discon:
577         obd_disconnect(mds->mds_osc_exp);
578         mds->mds_osc_exp = NULL;
579         mds->mds_osc_obd = ERR_PTR(rc);
580         RETURN(rc);
581 }
582
583 int mds_lov_disconnect(struct obd_device *obd)
584 {
585         struct mds_obd *mds = &obd->u.mds;
586         int rc = 0;
587         ENTRY;
588
589         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
590                 obd_register_observer(mds->mds_osc_obd, NULL);
591
592                 /* The actual disconnect of the mds_lov will be called from
593                  * class_disconnect_exports from mds_lov_clean. So we have to
594                  * ensure that class_cleanup doesn't fail due to the extra ref
595                  * we're holding now. The mechanism to do that already exists -
596                  * the obd_force flag. We'll drop the final ref to the
597                  * mds_osc_exp in mds_cleanup. */
598                 mds->mds_osc_obd->obd_force = 1;
599         }
600
601         RETURN(rc);
602 }
603
604 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
605                   void *karg, void *uarg)
606 {
607         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
608         struct obd_device *obd = exp->exp_obd;
609         struct mds_obd *mds = &obd->u.mds;
610         struct obd_ioctl_data *data = karg;
611         struct lvfs_run_ctxt saved;
612         int rc = 0;
613
614         ENTRY;
615         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
616
617         switch (cmd) {
618         case OBD_IOC_RECORD: {
619                 char *name = data->ioc_inlbuf1;
620                 struct llog_ctxt *ctxt;
621
622                 if (mds->mds_cfg_llh)
623                         RETURN(-EBUSY);
624
625                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
626                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
627                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
628                 llog_ctxt_put(ctxt);
629                 if (rc == 0)
630                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
631                                          &cfg_uuid);
632                 else
633                         mds->mds_cfg_llh = NULL;
634                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
635
636                 RETURN(rc);
637         }
638
639         case OBD_IOC_ENDRECORD: {
640                 if (!mds->mds_cfg_llh)
641                         RETURN(-EBADF);
642
643                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
644                 rc = llog_close(mds->mds_cfg_llh);
645                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
646
647                 mds->mds_cfg_llh = NULL;
648                 RETURN(rc);
649         }
650
651         case OBD_IOC_CLEAR_LOG: {
652                 char *name = data->ioc_inlbuf1;
653                 struct llog_ctxt *ctxt;
654                 if (mds->mds_cfg_llh)
655                         RETURN(-EBUSY);
656
657                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
658                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
659                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
660                 llog_ctxt_put(ctxt);
661                 if (rc == 0) {
662                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
663                                          NULL);
664
665                         rc = llog_destroy(mds->mds_cfg_llh);
666                         llog_free_handle(mds->mds_cfg_llh);
667                 }
668                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
669
670                 mds->mds_cfg_llh = NULL;
671                 RETURN(rc);
672         }
673
674         case OBD_IOC_DORECORD: {
675                 char *cfg_buf;
676                 struct llog_rec_hdr rec;
677                 if (!mds->mds_cfg_llh)
678                         RETURN(-EBADF);
679
680                 rec.lrh_len = llog_data_len(data->ioc_plen1);
681
682                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
683                         rec.lrh_type = OBD_CFG_REC;
684                 } else {
685                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
686                         RETURN(-EINVAL);
687                 }
688
689                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
690                 if (cfg_buf == NULL)
691                         RETURN(-EINVAL);
692                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
693                 if (rc) {
694                         OBD_FREE(cfg_buf, data->ioc_plen1);
695                         RETURN(rc);
696                 }
697
698                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
699                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
700                                     cfg_buf, -1);
701                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
702
703                 OBD_FREE(cfg_buf, data->ioc_plen1);
704                 RETURN(rc);
705         }
706
707         case OBD_IOC_PARSE: {
708                 struct llog_ctxt *ctxt =
709                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
710                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
711                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
712                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
713                 llog_ctxt_put(ctxt);
714                 if (rc)
715                         RETURN(rc);
716
717                 RETURN(rc);
718         }
719
720         case OBD_IOC_DUMP_LOG: {
721                 struct llog_ctxt *ctxt =
722                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
723                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
724                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
725                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
726                 llog_ctxt_put(ctxt);
727                 if (rc)
728                         RETURN(rc);
729
730                 RETURN(rc);
731         }
732
733         case OBD_IOC_SYNC: {
734                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
735                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
736                 RETURN(rc);
737         }
738
739         case OBD_IOC_SET_READONLY: {
740                 void *handle;
741                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
742                 BDEVNAME_DECLARE_STORAGE(tmp);
743                 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
744                        obd->obd_name, ll_bdevname(obd->u.obt.obt_sb, tmp));
745
746                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
747                 if (!IS_ERR(handle))
748                         rc = fsfilt_commit(obd, inode, handle, 1);
749
750                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
751                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
752
753                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
754                 RETURN(0);
755         }
756
757         case OBD_IOC_CATLOGLIST: {
758                 int count = mds->mds_lov_desc.ld_tgt_count;
759                 rc = llog_catalog_list(obd, count, data);
760                 RETURN(rc);
761
762         }
763         case OBD_IOC_LLOG_CHECK:
764         case OBD_IOC_LLOG_CANCEL:
765         case OBD_IOC_LLOG_REMOVE: {
766                 struct llog_ctxt *ctxt =
767                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
768                 int rc2;
769
770                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
771                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
772                 rc = llog_ioctl(ctxt, cmd, data);
773                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
774                 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
775                 llog_ctxt_put(ctxt);
776                 rc2 = obd_set_info_async(mds->mds_osc_exp,
777                                          sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
778                                          0, NULL, NULL);
779                 if (!rc)
780                         rc = rc2;
781                 RETURN(rc);
782         }
783         case OBD_IOC_LLOG_INFO:
784         case OBD_IOC_LLOG_PRINT: {
785                 struct llog_ctxt *ctxt =
786                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
787
788                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
789                 rc = llog_ioctl(ctxt, cmd, data);
790                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
791                 llog_ctxt_put(ctxt);
792
793                 RETURN(rc);
794         }
795
796         case OBD_IOC_ABORT_RECOVERY:
797                 CERROR("aborting recovery for device %s\n", obd->obd_name);
798                 target_abort_recovery(obd);
799                 RETURN(0);
800
801         default:
802                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
803                 RETURN(-EINVAL);
804         }
805         RETURN(0);
806
807 }
808
809 /* Collect the preconditions we need to allow client connects */
810 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
811 {
812         if (flag & CONFIG_LOG)
813                 obd->u.mds.mds_fl_cfglog = 1;
814         if (flag & CONFIG_SYNC)
815                 obd->u.mds.mds_fl_synced = 1;
816         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
817                 /* Open for clients */
818                 obd->obd_no_conn = 0;
819 }
820
821 struct mds_lov_sync_info {
822         struct obd_device *mlsi_obd;     /* the lov device to sync */
823         struct obd_device *mlsi_watched; /* target osc */
824         __u32              mlsi_index;   /* index of target */
825 };
826
827 /* We only sync one osc at a time, so that we don't have to hold
828    any kind of lock on the whole mds_lov_desc, which may change
829    (grow) as a result of mds_lov_add_ost.  This also avoids any
830    kind of mismatch between the lov_desc and the mds_lov_desc,
831    which are not in lock-step during lov_add_obd */
832 static int __mds_lov_synchronize(void *data)
833 {
834         struct mds_lov_sync_info *mlsi = data;
835         struct obd_device *obd = mlsi->mlsi_obd;
836         struct obd_device *watched = mlsi->mlsi_watched;
837         struct mds_obd *mds = &obd->u.mds;
838         struct obd_uuid *uuid;
839         __u32  idx = mlsi->mlsi_index;
840         struct llog_ctxt *ctxt;
841         int rc = 0;
842         ENTRY;
843
844         OBD_FREE(mlsi, sizeof(*mlsi));
845
846         LASSERT(obd);
847         LASSERT(watched);
848         uuid = &watched->u.cli.cl_target_uuid;
849         LASSERT(uuid);
850
851         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
852
853         rc = mds_lov_update_mds(obd, watched, idx);
854         if (rc != 0) {
855                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
856                 GOTO(out, rc);
857         }
858
859         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
860                                 KEY_MDS_CONN, 0, uuid, NULL);
861         if (rc != 0)
862                 GOTO(out, rc);
863
864         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
865         if (!ctxt)
866               RETURN(-ENODEV);
867
868         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
869
870         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
871                           NULL, NULL, uuid);
872         llog_ctxt_put(ctxt);
873
874         if (rc != 0) {
875                 CERROR("%s failed at llog_origin_connect: %d\n",
876                        obd_uuid2str(uuid), rc);
877                 GOTO(out, rc);
878         }
879
880         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
881               obd->obd_name, obd_uuid2str(uuid));
882
883         if (obd->obd_stopping)
884                 GOTO(out, rc = -ENODEV);
885
886         rc = mds_lov_clear_orphans(mds, uuid);
887         if (rc != 0) {
888                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
889                        obd_uuid2str(uuid), rc);
890                 GOTO(out, rc);
891         }
892
893         EXIT;
894 out:
895         if (rc) {
896                 /* Deactivate it for safety */
897                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
898                        rc);
899                 if (!obd->obd_stopping && mds->mds_osc_obd &&
900                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping) 
901                         obd_notify(mds->mds_osc_obd, watched,
902                                    OBD_NOTIFY_INACTIVE, NULL);
903         } else {
904                 /* We've successfully synced at least 1 OST and are ready
905                    to handle client requests */
906                 mds_allow_cli(obd, CONFIG_SYNC);
907         }
908
909         class_decref(obd);
910         return rc;
911 }
912
913 int mds_lov_synchronize(void *data)
914 {
915         struct mds_lov_sync_info *mlsi = data;
916         char name[20];
917
918         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
919         ptlrpc_daemonize(name);
920
921         RETURN(__mds_lov_synchronize(data));
922 }
923
924 int mds_lov_start_synchronize(struct obd_device *obd,
925                               struct obd_device *watched,
926                               void *data, int nonblock)
927 {
928         struct mds_lov_sync_info *mlsi;
929         struct mds_obd *mds = &obd->u.mds;
930         int rc;
931         struct obd_uuid *uuid;
932         ENTRY;
933
934         LASSERT(watched);
935         uuid = &watched->u.cli.cl_target_uuid;
936
937         OBD_ALLOC(mlsi, sizeof(*mlsi));
938         if (mlsi == NULL)
939                 RETURN(-ENOMEM);
940
941         mlsi->mlsi_obd = obd;
942         mlsi->mlsi_watched = watched;
943         if (data)
944                 mlsi->mlsi_index = *(__u32 *)data;
945         else
946                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
947
948         /* Although class_export_get(obd->obd_self_export) would lock
949            the MDS in place, since it's only a self-export
950            it doesn't lock the LOV in place.  The LOV can be disconnected
951            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
952            Simply taking an export ref on the LOV doesn't help, because it's
953            still disconnected. Taking an obd reference insures that we don't
954            disconnect the LOV.  This of course means a cleanup won't
955            finish for as long as the sync is blocking. */
956         class_incref(obd);
957
958         if (nonblock) {
959                 /* Synchronize in the background */
960                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
961                                        CLONE_VM | CLONE_FILES);
962                 if (rc < 0) {
963                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
964                                obd->obd_name, rc);
965                         class_decref(obd);
966                 } else {
967                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
968                                "thread=%d\n", obd->obd_name,
969                                mlsi->mlsi_index, rc);
970                         rc = 0;
971                 }
972         } else {
973                 rc = __mds_lov_synchronize((void *)mlsi);
974         }
975
976         RETURN(rc);
977 }
978
979 int mds_notify(struct obd_device *obd, struct obd_device *watched,
980                enum obd_notify_event ev, void *data)
981 {
982         int rc = 0;
983         ENTRY;
984
985         switch (ev) {
986         /* We only handle these: */
987         case OBD_NOTIFY_ACTIVE:
988         case OBD_NOTIFY_SYNC:
989         case OBD_NOTIFY_SYNC_NONBLOCK:
990                 break;
991         case OBD_NOTIFY_CONFIG:
992                 mds_allow_cli(obd, (unsigned long)data);
993         default:
994                 RETURN(0);
995         }
996
997         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
998
999         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1000                 CERROR("unexpected notification of %s %s!\n",
1001                        watched->obd_type->typ_name, watched->obd_name);
1002                 RETURN(-EINVAL);
1003         }
1004
1005         if (obd->obd_recovering) {
1006                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1007                       obd->obd_name,
1008                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1009                 /* We still have to fix the lov descriptor for ost's added
1010                    after the mdt in the config log.  They didn't make it into
1011                    mds_lov_connect. */
1012                 mutex_down(&obd->obd_dev_sem);
1013                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1014                 mutex_up(&obd->obd_dev_sem);
1015                 mds_allow_cli(obd, CONFIG_SYNC);
1016                 RETURN(rc);
1017         }
1018
1019         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
1020         rc = mds_lov_start_synchronize(obd, watched, data,
1021                                        !(ev == OBD_NOTIFY_SYNC));
1022
1023         lquota_recovery(mds_quota_interface_ref, obd);
1024
1025         RETURN(rc);
1026 }
1027
1028 int mds_get_default_md(struct obd_device *obd, struct lov_mds_md *lmm,
1029                        int *size)
1030 {
1031         struct lov_desc *ldesc;
1032         ENTRY;
1033
1034         ldesc = &obd->u.mds.mds_lov_desc;
1035         LASSERT(ldesc != NULL);
1036
1037         if (!lmm)
1038                 RETURN(0);
1039
1040         lmm->lmm_magic = LOV_MAGIC_V1;
1041         lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1042         lmm->lmm_pattern = ldesc->ld_pattern;
1043         lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
1044         lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
1045         *size = sizeof(struct lov_mds_md);
1046
1047         RETURN(sizeof(struct lov_mds_md));
1048 }
1049
1050 /* Convert the on-disk LOV EA structre.
1051  * We always try to convert from an old LOV EA format to the common in-memory
1052  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1053  * then convert back to the new on-disk format and save it back to disk
1054  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1055  * to convert it each time this inode is accessed.
1056  *
1057  * This function is a bit interesting in the error handling.  We can safely
1058  * ship the old lmm to the client in case of failure, since it uses the same
1059  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1060  * reason.  We will not delete the old lmm data until we have written the
1061  * new format lmm data in fsfilt_set_md(). */
1062 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1063                        struct lov_mds_md *lmm, int lmm_size)
1064 {
1065         struct lov_stripe_md *lsm = NULL;
1066         void *handle;
1067         int rc, err;
1068         ENTRY;
1069
1070         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1071             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
1072                 RETURN(0);
1073
1074         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1075                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1076                LOV_MAGIC);
1077
1078         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1079         if (rc < 0)
1080                 GOTO(conv_end, rc);
1081
1082         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1083         if (rc < 0)
1084                 GOTO(conv_free, rc);
1085         lmm_size = rc;
1086
1087         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1088         if (IS_ERR(handle)) {
1089                 rc = PTR_ERR(handle);
1090                 GOTO(conv_free, rc);
1091         }
1092
1093         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1094
1095         err = fsfilt_commit(obd, inode, handle, 0);
1096         if (!rc)
1097                 rc = err ? err : lmm_size;
1098         GOTO(conv_free, rc);
1099 conv_free:
1100         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1101 conv_end:
1102         return rc;
1103 }