Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         unsigned int i=0, j;
48
49         CDEBUG(D_INFO, "dump from %s\n", label);
50         if (mds->mds_lov_page_dirty == NULL) {
51                 CERROR("NULL bitmap!\n");
52                 GOTO(skip_bitmap, i);
53         }
54
55         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
57 skip_bitmap:
58         if (mds->mds_lov_page_array == NULL) {
59                 CERROR("not init page array!\n");
60                 GOTO(skip_array, i);
61
62         }
63         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64                 obd_id *data = mds->mds_lov_page_array[i];
65
66                 if (data == NULL)
67                         continue;
68
69                 for(j=0; j < OBJID_PER_PAGE(); j++) {
70                         if (data[j] == 0)
71                                 continue;
72                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
73                 }
74         }
75 skip_array:
76         EXIT;
77 }
78
79 int mds_lov_init_objids(struct obd_device *obd)
80 {
81         struct mds_obd *mds = &obd->u.mds;
82         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
83         struct file *file;
84         int rc;
85         ENTRY;
86
87         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
88
89         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90         if (mds->mds_lov_page_dirty == NULL)
91                 RETURN(-ENOMEM);
92
93
94         OBD_ALLOC(mds->mds_lov_page_array, size);
95         if (mds->mds_lov_page_array == NULL)
96                 GOTO(err_free_bitmap, rc = -ENOMEM);
97
98         /* open and test the lov objd file */
99         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
100         if (IS_ERR(file)) {
101                 rc = PTR_ERR(file);
102                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103                 GOTO(err_free, rc = PTR_ERR(file));
104         }
105         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107                        file->f_dentry->d_inode->i_mode);
108                 GOTO(err_open, rc = -ENOENT);
109         }
110         mds->mds_lov_objid_filp = file;
111
112         RETURN (0);
113 err_open:
114         if (filp_close((struct file *)file, 0))
115                 CERROR("can't close %s after error\n", LOV_OBJID);
116 err_free:
117         OBD_FREE(mds->mds_lov_page_array, size);
118 err_free_bitmap:
119         FREE_BITMAP(mds->mds_lov_page_dirty);
120
121         RETURN(rc);
122 }
123 EXPORT_SYMBOL(mds_lov_init_objids);
124
125 void mds_lov_destroy_objids(struct obd_device *obd)
126 {
127         struct mds_obd *mds = &obd->u.mds;
128         int i, rc;
129         ENTRY;
130
131         if (mds->mds_lov_page_array != NULL) {
132                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133                         obd_id *data = mds->mds_lov_page_array[i];
134                         if (data != NULL)
135                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
136                 }
137                 OBD_FREE(mds->mds_lov_page_array,
138                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
139         }
140
141         if (mds->mds_lov_objid_filp) {
142                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143                 mds->mds_lov_objid_filp = NULL;
144                 if (rc)
145                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
146         }
147
148         FREE_BITMAP(mds->mds_lov_page_dirty);
149         EXIT;
150 }
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
152
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         int j;
157         ENTRY;
158
159         /* if we create file without objects - lmm is NULL */
160         if (lmm == NULL)
161                 return;
162
163         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164                 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165                 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166                 int page = i / OBJID_PER_PAGE();
167                 int idx = i % OBJID_PER_PAGE();
168                 obd_id *data = mds->mds_lov_page_array[page];
169
170                 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171                                " old %llu\n", i, id, data[idx]);
172                 if (id > data[idx]) {
173                         data[idx] = id;
174                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
175                 }
176         }
177         EXIT;
178 }
179 EXPORT_SYMBOL(mds_lov_update_objids);
180
181 static int mds_lov_read_objids(struct obd_device *obd)
182 {
183         struct mds_obd *mds = &obd->u.mds;
184         loff_t off = 0;
185         int i, rc, count = 0, page = 0;
186         size_t size;
187         ENTRY;
188
189         /* Read everything in the file, even if our current lov desc
190            has fewer targets. Old targets not in the lov descriptor
191            during mds setup may still have valid objids. */
192         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
193         if (size == 0)
194                 RETURN(0);
195
196         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197         CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
198         for(i=0; i < page; i++) {
199                 obd_id *data =  mds->mds_lov_page_array[i];
200                 loff_t off_old = off;
201
202                 LASSERT(data == NULL);
203                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
204                 if (data == NULL)
205                         GOTO(out, rc = -ENOMEM);
206
207                 mds->mds_lov_page_array[i] = data;
208
209                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
211                 if (rc < 0) {
212                         CERROR("Error reading objids %d\n", rc);
213                         GOTO(out, rc);
214                 }
215                 if (off == off_old)
216                         break; // eof
217
218                 count += (off-off_old)/sizeof(obd_id);
219         }
220         mds->mds_lov_objid_count = count;
221         if (count) {
222                 count --;
223                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
224                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
225         }
226         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
227                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
228 out:
229         mds_lov_dump_objids("read",obd);
230
231         RETURN(0);
232 }
233
234 int mds_lov_write_objids(struct obd_device *obd)
235 {
236         struct mds_obd *mds = &obd->u.mds;
237         int i, rc = 0;
238         ENTRY;
239
240         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
241                 RETURN(0);
242
243         mds_lov_dump_objids("write", obd);
244
245         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
246                 obd_id *data =  mds->mds_lov_page_array[i];
247                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
248                 loff_t off = i * size;
249
250                 LASSERT(data != NULL);
251
252                 /* check for particaly filled last page */
253                 if (i == mds->mds_lov_objid_lastpage)
254                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
255
256                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
257                                          size, &off, 0);
258                 if (rc < 0)
259                         break;
260                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
261         }
262         if (rc >= 0)
263                 rc = 0;
264
265         RETURN(rc);
266 }
267 EXPORT_SYMBOL(mds_lov_write_objids);
268
269 static int mds_lov_get_objid(struct obd_device * obd,
270                              __u32 idx)
271 {
272         struct mds_obd *mds = &obd->u.mds;
273         unsigned int page;
274         unsigned int off;
275         obd_id *data;
276         int rc = 0;
277         ENTRY;
278
279         page = idx / OBJID_PER_PAGE();
280         off = idx % OBJID_PER_PAGE();
281         data = mds->mds_lov_page_array[page];
282         if (data == NULL) {
283                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
284                 if (data == NULL)
285                         GOTO(out, rc = -ENOMEM);
286
287                 mds->mds_lov_page_array[page] = data;
288         }
289
290         if (data[off] == 0) {
291                 /* We never read this lastid; ask the osc */
292                 struct obd_id_info lastid;
293                 __u32 size = sizeof(lastid);
294
295                 lastid.idx = idx;
296                 lastid.data = &data[off];
297                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
298                                   KEY_LAST_ID, &size, &lastid);
299                 if (rc)
300                         GOTO(out, rc);
301
302                 if (idx > mds->mds_lov_objid_count) {
303                         mds->mds_lov_objid_count = idx;
304                         mds->mds_lov_objid_lastpage = page;
305                         mds->mds_lov_objid_lastidx = off;
306                 }
307                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
308         }
309 out:
310         RETURN(rc);
311 }
312
313 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
314 {
315         int rc;
316         struct obdo oa;
317         struct obd_trans_info oti = {0};
318         struct lov_stripe_md  *empty_ea = NULL;
319         ENTRY;
320
321         LASSERT(mds->mds_lov_page_array != NULL);
322
323         /* This create will in fact either create or destroy:  If the OST is
324          * missing objects below this ID, they will be created.  If it finds
325          * objects above this ID, they will be removed. */
326         memset(&oa, 0, sizeof(oa));
327         oa.o_flags = OBD_FL_DELORPHAN;
328         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
329         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
330         if (ost_uuid != NULL) {
331                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
332                 oa.o_valid |= OBD_MD_FLINLINE;
333         }
334         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
335
336         RETURN(rc);
337 }
338
339 /* for one target */
340 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
341 {
342         struct mds_obd *mds = &obd->u.mds;
343         int rc;
344         struct obd_id_info info;
345         ENTRY;
346
347         LASSERT(!obd->obd_recovering);
348
349         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
350         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
351
352         info.idx = idx;
353         info.data = id;
354         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
355                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
356         if (rc)
357                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
358                         obd->obd_name, rc);
359
360         RETURN(rc);
361 }
362
363 static __u32 mds_lov_get_idx(struct obd_export *lov,
364                              struct obd_uuid *ost_uuid)
365 {
366         int rc;
367         int valsize = sizeof(ost_uuid);
368
369         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
370                           &valsize, ost_uuid);
371         LASSERT(rc >= 0);
372
373         RETURN(rc);
374 }
375
376 /* Update the lov desc for a new size lov. */
377 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
378 {
379         struct mds_obd *mds = &obd->u.mds;
380         struct lov_desc *ld;
381         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
382         int rc = 0;
383         ENTRY;
384
385         OBD_ALLOC(ld, sizeof(*ld));
386         if (!ld)
387                 RETURN(-ENOMEM);
388
389         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
390                           &valsize, ld);
391         if (rc)
392                 GOTO(out, rc);
393
394         /* Don't change the mds_lov_desc until the objids size matches the
395            count (paranoia) */
396         mds->mds_lov_desc = *ld;
397         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
398                mds->mds_lov_desc.ld_tgt_count);
399
400         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
401                                mds->mds_lov_desc.ld_tgt_count);
402
403         mds->mds_max_mdsize = lov_mds_md_size(stripes);
404         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
405         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
406                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
407                stripes);
408
409         /* If we added a target we have to reconnect the llogs */
410         /* We only _need_ to do this at first add (idx), or the first time
411            after recovery.  However, it should now be safe to call anytime. */
412         rc = llog_cat_initialize(obd, &obd->obd_olg,
413                                  mds->mds_lov_desc.ld_tgt_count, NULL);
414
415         /*XXX this notifies the MDD until lov handling use old mds code */
416         if (obd->obd_upcall.onu_owner) {
417                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
418                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
419                                                  obd->obd_upcall.onu_owner);
420         }
421 out:
422         OBD_FREE(ld, sizeof(*ld));
423         RETURN(rc);
424 }
425
426
427 #define MDSLOV_NO_INDEX -1
428
429 /* Inform MDS about new/updated target */
430 static int mds_lov_update_mds(struct obd_device *obd,
431                               struct obd_device *watched,
432                               __u32 idx)
433 {
434         struct mds_obd *mds = &obd->u.mds;
435         int rc = 0;
436         int page;
437         int off;
438         obd_id *data;
439
440         ENTRY;
441
442         /* Don't let anyone else mess with mds_lov_objids now */
443         mutex_down(&obd->obd_dev_sem);
444
445         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
446         if (rc)
447                 GOTO(out, rc);
448
449         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
450                idx, obd->obd_recovering, obd->obd_async_recov,
451                mds->mds_lov_desc.ld_tgt_count);
452
453         /* idx is set as data from lov_notify. */
454         if (obd->obd_recovering)
455                 GOTO(out, rc);
456
457         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
458                 CERROR("index %d > count %d!\n", idx,
459                        mds->mds_lov_desc.ld_tgt_count);
460                 GOTO(out, rc = -EINVAL);
461         }
462
463         rc = mds_lov_get_objid(obd, idx);
464         if (rc)
465                 GOTO(out, rc);
466
467         page = idx / OBJID_PER_PAGE();
468         off = idx % OBJID_PER_PAGE();
469         data = mds->mds_lov_page_array[page];
470
471         /* We have read this lastid from disk; tell the osc.
472            Don't call this during recovery. */
473         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
474         if (rc) {
475                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
476                 /* Don't abort the rest of the sync */
477                 rc = 0;
478         } else {
479                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
480                         data[off], idx, rc);
481         }
482 out:
483         mutex_up(&obd->obd_dev_sem);
484         RETURN(rc);
485 }
486
487 /* update the LOV-OSC knowledge of the last used object id's */
488 int mds_lov_connect(struct obd_device *obd, char * lov_name)
489 {
490         struct mds_obd *mds = &obd->u.mds;
491         struct lustre_handle conn = {0,};
492         struct obd_connect_data *data;
493         int rc;
494         ENTRY;
495
496         if (IS_ERR(mds->mds_osc_obd))
497                 RETURN(PTR_ERR(mds->mds_osc_obd));
498
499         if (mds->mds_osc_obd)
500                 RETURN(0);
501
502         mds->mds_osc_obd = class_name2obd(lov_name);
503         if (!mds->mds_osc_obd) {
504                 CERROR("MDS cannot locate LOV %s\n", lov_name);
505                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
506                 RETURN(-ENOTCONN);
507         }
508
509         OBD_ALLOC(data, sizeof(*data));
510         if (data == NULL)
511                 RETURN(-ENOMEM);
512         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX |
513                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
514                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID;
515 #ifdef HAVE_LRU_RESIZE_SUPPORT
516         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
517 #endif
518         data->ocd_version = LUSTRE_VERSION_CODE;
519         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
520         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
521         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
522         OBD_FREE(data, sizeof(*data));
523         if (rc) {
524                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
525                 mds->mds_osc_obd = ERR_PTR(rc);
526                 RETURN(rc);
527         }
528         mds->mds_osc_exp = class_conn2export(&conn);
529
530         rc = obd_register_observer(mds->mds_osc_obd, obd);
531         if (rc) {
532                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
533                        lov_name, rc);
534                 GOTO(err_discon, rc);
535         }
536
537         /* Deny new client connections until we are sure we have some OSTs */
538         obd->obd_no_conn = 1;
539
540         mutex_down(&obd->obd_dev_sem);
541         rc = mds_lov_read_objids(obd);
542         if (rc) {
543                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
544                 GOTO(err_reg, rc);
545         }
546
547         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
548         if (rc)
549                 GOTO(err_reg, rc);
550
551         /* tgt_count may be 0! */
552         rc = llog_cat_initialize(obd, &obd->obd_olg, 
553                                  mds->mds_lov_desc.ld_tgt_count, NULL);
554         if (rc) {
555                 CERROR("failed to initialize catalog %d\n", rc);
556                 GOTO(err_reg, rc);
557         }
558
559         /* If we're mounting this code for the first time on an existing FS,
560          * we need to populate the objids array from the real OST values */
561         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
562                 __u32 i = mds->mds_lov_objid_count;
563                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
564                         rc = mds_lov_get_objid(obd, i);
565                         if (rc != 0)
566                                 break;
567                 }
568                 if (rc == 0)
569                         rc = mds_lov_write_objids(obd);
570                 if (rc)
571                         CERROR("got last objids from OSTs, but error "
572                                 "in update objids file: %d\n", rc);
573         }
574         mutex_up(&obd->obd_dev_sem);
575
576         /* I want to see a callback happen when the OBD moves to a
577          * "For General Use" state, and that's when we'll call
578          * set_nextid().  The class driver can help us here, because
579          * it can use the obd_recovering flag to determine when the
580          * the OBD is full available. */
581         /* MDD device will care about that
582         if (!obd->obd_recovering)
583                 rc = mds_postrecov(obd);
584          */
585         RETURN(rc);
586
587 err_reg:
588         mutex_up(&obd->obd_dev_sem);
589         obd_register_observer(mds->mds_osc_obd, NULL);
590 err_discon:
591         obd_disconnect(mds->mds_osc_exp);
592         mds->mds_osc_exp = NULL;
593         mds->mds_osc_obd = ERR_PTR(rc);
594         RETURN(rc);
595 }
596
597 int mds_lov_disconnect(struct obd_device *obd)
598 {
599         struct mds_obd *mds = &obd->u.mds;
600         int rc = 0;
601         ENTRY;
602
603         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
604                 obd_register_observer(mds->mds_osc_obd, NULL);
605
606                 /* The actual disconnect of the mds_lov will be called from
607                  * class_disconnect_exports from mds_lov_clean. So we have to
608                  * ensure that class_cleanup doesn't fail due to the extra ref
609                  * we're holding now. The mechanism to do that already exists -
610                  * the obd_force flag. We'll drop the final ref to the
611                  * mds_osc_exp in mds_cleanup. */
612                 mds->mds_osc_obd->obd_force = 1;
613         }
614
615         RETURN(rc);
616 }
617
618 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
619                   void *karg, void *uarg)
620 {
621         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
622         struct obd_device *obd = exp->exp_obd;
623         struct mds_obd *mds = &obd->u.mds;
624         struct obd_ioctl_data *data = karg;
625         struct lvfs_run_ctxt saved;
626         int rc = 0;
627
628         ENTRY;
629         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
630
631         switch (cmd) {
632         case OBD_IOC_RECORD: {
633                 char *name = data->ioc_inlbuf1;
634                 struct llog_ctxt *ctxt;
635
636                 if (mds->mds_cfg_llh)
637                         RETURN(-EBUSY);
638
639                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
640                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
641                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
642                 llog_ctxt_put(ctxt);
643                 if (rc == 0)
644                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
645                                          &cfg_uuid);
646                 else
647                         mds->mds_cfg_llh = NULL;
648                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
649
650                 RETURN(rc);
651         }
652
653         case OBD_IOC_ENDRECORD: {
654                 if (!mds->mds_cfg_llh)
655                         RETURN(-EBADF);
656
657                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
658                 rc = llog_close(mds->mds_cfg_llh);
659                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
660
661                 mds->mds_cfg_llh = NULL;
662                 RETURN(rc);
663         }
664
665         case OBD_IOC_CLEAR_LOG: {
666                 char *name = data->ioc_inlbuf1;
667                 struct llog_ctxt *ctxt;
668                 if (mds->mds_cfg_llh)
669                         RETURN(-EBUSY);
670
671                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
672                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
673                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
674                 llog_ctxt_put(ctxt);
675                 if (rc == 0) {
676                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
677                                          NULL);
678
679                         rc = llog_destroy(mds->mds_cfg_llh);
680                         llog_free_handle(mds->mds_cfg_llh);
681                 }
682                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
683
684                 mds->mds_cfg_llh = NULL;
685                 RETURN(rc);
686         }
687
688         case OBD_IOC_DORECORD: {
689                 char *cfg_buf;
690                 struct llog_rec_hdr rec;
691                 if (!mds->mds_cfg_llh)
692                         RETURN(-EBADF);
693
694                 rec.lrh_len = llog_data_len(data->ioc_plen1);
695
696                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
697                         rec.lrh_type = OBD_CFG_REC;
698                 } else {
699                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
700                         RETURN(-EINVAL);
701                 }
702
703                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
704                 if (cfg_buf == NULL)
705                         RETURN(-EINVAL);
706                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
707                 if (rc) {
708                         OBD_FREE(cfg_buf, data->ioc_plen1);
709                         RETURN(rc);
710                 }
711
712                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
713                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
714                                     cfg_buf, -1);
715                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
716
717                 OBD_FREE(cfg_buf, data->ioc_plen1);
718                 RETURN(rc);
719         }
720
721         case OBD_IOC_PARSE: {
722                 struct llog_ctxt *ctxt =
723                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
724                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
725                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
726                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
727                 llog_ctxt_put(ctxt);
728                 if (rc)
729                         RETURN(rc);
730
731                 RETURN(rc);
732         }
733
734         case OBD_IOC_DUMP_LOG: {
735                 struct llog_ctxt *ctxt =
736                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
737                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
738                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
739                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
740                 llog_ctxt_put(ctxt);
741                 if (rc)
742                         RETURN(rc);
743
744                 RETURN(rc);
745         }
746
747         case OBD_IOC_SYNC: {
748                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
749                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
750                 RETURN(rc);
751         }
752
753         case OBD_IOC_SET_READONLY: {
754                 void *handle;
755                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
756                 BDEVNAME_DECLARE_STORAGE(tmp);
757                 CERROR("*** setting device %s read-only ***\n",
758                        ll_bdevname(obd->u.obt.obt_sb, tmp));
759
760                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
761                 if (!IS_ERR(handle))
762                         rc = fsfilt_commit(obd, inode, handle, 1);
763
764                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
765                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
766
767                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
768                 RETURN(0);
769         }
770
771         case OBD_IOC_CATLOGLIST: {
772                 int count = mds->mds_lov_desc.ld_tgt_count;
773                 rc = llog_catalog_list(obd, count, data);
774                 RETURN(rc);
775
776         }
777         case OBD_IOC_LLOG_CHECK:
778         case OBD_IOC_LLOG_CANCEL:
779         case OBD_IOC_LLOG_REMOVE: {
780                 struct llog_ctxt *ctxt =
781                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
782                 int rc2;
783                 __u32 group;
784
785                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
786                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
787                 rc = llog_ioctl(ctxt, cmd, data);
788                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
789                 llog_cat_initialize(obd, &obd->obd_olg,
790                                     mds->mds_lov_desc.ld_tgt_count, NULL);
791                 group = FILTER_GROUP_MDS0 + mds->mds_id;
792                 llog_ctxt_put(ctxt);
793                 rc2 = obd_set_info_async(mds->mds_osc_exp,
794                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
795                                          sizeof(group), &group, NULL);
796                 if (!rc)
797                         rc = rc2;
798                 RETURN(rc);
799         }
800         case OBD_IOC_LLOG_INFO:
801         case OBD_IOC_LLOG_PRINT: {
802                 struct llog_ctxt *ctxt =
803                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
804
805                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
806                 rc = llog_ioctl(ctxt, cmd, data);
807                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
808                 llog_ctxt_put(ctxt);
809
810                 RETURN(rc);
811         }
812
813         case OBD_IOC_ABORT_RECOVERY:
814                 CERROR("aborting recovery for device %s\n", obd->obd_name);
815                 target_stop_recovery_thread(obd);
816                 RETURN(0);
817
818         default:
819                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
820                 RETURN(-EINVAL);
821         }
822         RETURN(0);
823
824 }
825
826 /* Collect the preconditions we need to allow client connects */
827 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
828 {
829         if (flag & CONFIG_LOG)
830                 obd->u.mds.mds_fl_cfglog = 1;
831         if (flag & CONFIG_SYNC)
832                 obd->u.mds.mds_fl_synced = 1;
833         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
834                 /* Open for clients */
835                 obd->obd_no_conn = 0;
836 }
837
838 struct mds_lov_sync_info {
839         struct obd_device *mlsi_obd;     /* the lov device to sync */
840         struct obd_device *mlsi_watched; /* target osc */
841         __u32              mlsi_index;   /* index of target */
842 };
843
844 static int mds_propagate_capa_keys(struct mds_obd *mds)
845 {
846         struct lustre_capa_key *key;
847         int i, rc = 0;
848
849         ENTRY;
850
851         if (!mds->mds_capa_keys)
852                 RETURN(0);
853
854         for (i = 0; i < 2; i++) {
855                 key = &mds->mds_capa_keys[i];
856                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
857
858                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
859                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
860                 if (rc) {
861                         DEBUG_CAPA_KEY(D_ERROR, key,
862                                        "propagate failed (rc = %d) for", rc);
863                         RETURN(rc);
864                 }
865         }
866
867         RETURN(0);
868 }
869
870 /* We only sync one osc at a time, so that we don't have to hold
871    any kind of lock on the whole mds_lov_desc, which may change
872    (grow) as a result of mds_lov_add_ost.  This also avoids any
873    kind of mismatch between the lov_desc and the mds_lov_desc,
874    which are not in lock-step during lov_add_obd */
875 static int __mds_lov_synchronize(void *data)
876 {
877         struct mds_lov_sync_info *mlsi = data;
878         struct obd_device *obd = mlsi->mlsi_obd;
879         struct obd_device *watched = mlsi->mlsi_watched;
880         struct mds_obd *mds = &obd->u.mds;
881         struct obd_uuid *uuid;
882         __u32  idx = mlsi->mlsi_index;
883         struct mds_group_info mgi;
884         struct llog_ctxt *ctxt;
885         int rc = 0;
886         ENTRY;
887
888         OBD_FREE(mlsi, sizeof(*mlsi));
889
890         LASSERT(obd);
891         LASSERT(watched);
892         uuid = &watched->u.cli.cl_target_uuid;
893         LASSERT(uuid);
894
895         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
896
897         rc = mds_lov_update_mds(obd, watched, idx);
898         if (rc != 0) {
899                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
900                 GOTO(out, rc);
901         }
902
903         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
904         mgi.uuid = uuid;
905
906         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
907                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
908
909         /* propagate capability keys */
910         rc = mds_propagate_capa_keys(mds);
911         if (rc)
912                 GOTO(out, rc);
913
914         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
915         if (!ctxt) 
916                 RETURN(-ENODEV);
917
918         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
919
920         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
921                           NULL, NULL, uuid); 
922         llog_ctxt_put(ctxt);
923         if (rc != 0) {
924                 CERROR("%s failed at llog_origin_connect: %d\n",
925                        obd_uuid2str(uuid), rc);
926                 GOTO(out, rc);
927         }
928
929         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
930               obd->obd_name, obd_uuid2str(uuid));
931         /*
932          * FIXME: this obd_stopping was useless, 
933          * since obd in mdt layer was set
934          */
935         if (obd->obd_stopping)
936                 GOTO(out, rc = -ENODEV);
937
938         rc = mds_lov_clear_orphans(mds, uuid);
939         if (rc != 0) {
940                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
941                        obd_uuid2str(uuid), rc);
942                 GOTO(out, rc);
943         }
944
945         if (obd->obd_upcall.onu_owner) {
946                 /*
947                  * This is a hack for mds_notify->mdd_notify. When the mds obd
948                  * in mdd is removed, This hack should be removed.
949                  */
950                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
951                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
952                                                  obd->obd_upcall.onu_owner);
953         }
954         EXIT;
955 out:
956         if (rc) {
957                 /* Deactivate it for safety */
958                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
959                        rc);
960                 if (!obd->obd_stopping && mds->mds_osc_obd &&
961                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
962                         obd_notify(mds->mds_osc_obd, watched,
963                                    OBD_NOTIFY_INACTIVE, NULL);
964         }
965
966         class_decref(obd);
967         return rc;
968 }
969
970 int mds_lov_synchronize(void *data)
971 {
972         struct mds_lov_sync_info *mlsi = data;
973         char name[20];
974
975         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
976         ptlrpc_daemonize(name);
977
978         RETURN(__mds_lov_synchronize(data));
979 }
980
981 int mds_lov_start_synchronize(struct obd_device *obd,
982                               struct obd_device *watched,
983                               void *data, int nonblock)
984 {
985         struct mds_lov_sync_info *mlsi;
986         int rc;
987         struct mds_obd *mds = &obd->u.mds;
988         struct obd_uuid *uuid;
989         ENTRY;
990
991         LASSERT(watched);
992         uuid = &watched->u.cli.cl_target_uuid;
993
994         OBD_ALLOC(mlsi, sizeof(*mlsi));
995         if (mlsi == NULL)
996                 RETURN(-ENOMEM);
997
998         mlsi->mlsi_obd = obd;
999         mlsi->mlsi_watched = watched;
1000         if (data)
1001                 mlsi->mlsi_index = *(__u32 *)data;
1002         else
1003                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
1004
1005         /* Although class_export_get(obd->obd_self_export) would lock
1006            the MDS in place, since it's only a self-export
1007            it doesn't lock the LOV in place.  The LOV can be disconnected
1008            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1009            Simply taking an export ref on the LOV doesn't help, because it's
1010            still disconnected. Taking an obd reference insures that we don't
1011            disconnect the LOV.  This of course means a cleanup won't
1012            finish for as long as the sync is blocking. */
1013         class_incref(obd);
1014
1015         if (nonblock) {
1016                 /* Synchronize in the background */
1017                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1018                                        CLONE_VM | CLONE_FILES);
1019                 if (rc < 0) {
1020                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1021                                obd->obd_name, rc);
1022                         class_decref(obd);
1023                 } else {
1024                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1025                                "thread=%d\n", obd->obd_name,
1026                                mlsi->mlsi_index, rc);
1027                         rc = 0;
1028                 }
1029         } else {
1030                 rc = __mds_lov_synchronize((void *)mlsi);
1031         }
1032
1033         RETURN(rc);
1034 }
1035
1036 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1037                enum obd_notify_event ev, void *data)
1038 {
1039         int rc = 0;
1040         ENTRY;
1041
1042         switch (ev) {
1043         /* We only handle these: */
1044         case OBD_NOTIFY_ACTIVE:
1045         case OBD_NOTIFY_SYNC:
1046         case OBD_NOTIFY_SYNC_NONBLOCK:
1047                 break;
1048         case OBD_NOTIFY_CONFIG:
1049                 mds_allow_cli(obd, (unsigned int)data);
1050         default:
1051                 RETURN(0);
1052         }
1053
1054         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1055         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1056                 CERROR("unexpected notification of %s %s!\n",
1057                        watched->obd_type->typ_name, watched->obd_name);
1058                 RETURN(-EINVAL);
1059         }
1060
1061         if (obd->obd_recovering) {
1062                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1063                       obd->obd_name,
1064                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1065                 /* We still have to fix the lov descriptor for ost's added
1066                    after the mdt in the config log.  They didn't make it into
1067                    mds_lov_connect. */
1068                 mutex_down(&obd->obd_dev_sem);
1069                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1070                 if (rc) {
1071                         mutex_up(&obd->obd_dev_sem);
1072                         RETURN(rc);
1073                 }
1074                 /* We should update init llog here too for replay unlink and 
1075                  * possiable llog init race when recovery complete */
1076                 llog_cat_initialize(obd, &obd->obd_olg, 
1077                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
1078                                     &watched->u.cli.cl_target_uuid);
1079                 mutex_up(&obd->obd_dev_sem);
1080                 mds_allow_cli(obd, CONFIG_SYNC);
1081                 RETURN(rc);
1082         }
1083
1084         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
1085         rc = mds_lov_start_synchronize(obd, watched, data,
1086                                        !(ev == OBD_NOTIFY_SYNC));
1087
1088         lquota_recovery(mds_quota_interface_ref, obd);
1089
1090         RETURN(rc);
1091 }
1092
1093 /* Convert the on-disk LOV EA structre.
1094  * We always try to convert from an old LOV EA format to the common in-memory
1095  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1096  * then convert back to the new on-disk format and save it back to disk
1097  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1098  * to convert it each time this inode is accessed.
1099  *
1100  * This function is a bit interesting in the error handling.  We can safely
1101  * ship the old lmm to the client in case of failure, since it uses the same
1102  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1103  * reason.  We will not delete the old lmm data until we have written the
1104  * new format lmm data in fsfilt_set_md(). */
1105 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1106                        struct lov_mds_md *lmm, int lmm_size)
1107 {
1108         struct lov_stripe_md *lsm = NULL;
1109         void *handle;
1110         int rc, err;
1111         ENTRY;
1112
1113         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1114             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
1115                 RETURN(0);
1116
1117         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1118                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1119                LOV_MAGIC);
1120
1121         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1122         if (rc < 0)
1123                 GOTO(conv_end, rc);
1124
1125         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1126         if (rc < 0)
1127                 GOTO(conv_free, rc);
1128         lmm_size = rc;
1129
1130         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1131         if (IS_ERR(handle)) {
1132                 rc = PTR_ERR(handle);
1133                 GOTO(conv_free, rc);
1134         }
1135
1136         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1137
1138         err = fsfilt_commit(obd, inode, handle, 0);
1139         if (!rc)
1140                 rc = err ? err : lmm_size;
1141         GOTO(conv_free, rc);
1142 conv_free:
1143         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1144 conv_end:
1145         return rc;
1146 }