Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         unsigned int i=0, j;
48
49         CDEBUG(D_INFO, "dump from %s\n", label);
50         if (mds->mds_lov_page_dirty == NULL) {
51                 CERROR("NULL bitmap!\n");
52                 GOTO(skip_bitmap, i);
53         }
54
55         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
57 skip_bitmap:
58         if (mds->mds_lov_page_array == NULL) {
59                 CERROR("not init page array!\n");
60                 GOTO(skip_array, i);
61
62         }
63         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64                 obd_id *data = mds->mds_lov_page_array[i];
65
66                 if (data == NULL)
67                         continue;
68
69                 for(j=0; j < OBJID_PER_PAGE(); j++) {
70                         if (data[j] == 0)
71                                 continue;
72                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
73                 }
74         }
75 skip_array:
76         EXIT;
77 }
78
79 int mds_lov_init_objids(struct obd_device *obd)
80 {
81         struct mds_obd *mds = &obd->u.mds;
82         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
83         struct file *file;
84         int rc;
85         ENTRY;
86
87         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
88
89         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90         if (mds->mds_lov_page_dirty == NULL)
91                 RETURN(-ENOMEM);
92
93
94         OBD_ALLOC(mds->mds_lov_page_array, size);
95         if (mds->mds_lov_page_array == NULL)
96                 GOTO(err_free_bitmap, rc = -ENOMEM);
97
98         /* open and test the lov objd file */
99         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
100         if (IS_ERR(file)) {
101                 rc = PTR_ERR(file);
102                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103                 GOTO(err_free, rc = PTR_ERR(file));
104         }
105         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107                        file->f_dentry->d_inode->i_mode);
108                 GOTO(err_open, rc = -ENOENT);
109         }
110         mds->mds_lov_objid_filp = file;
111
112         RETURN (0);
113 err_open:
114         if (filp_close((struct file *)file, 0))
115                 CERROR("can't close %s after error\n", LOV_OBJID);
116 err_free:
117         OBD_FREE(mds->mds_lov_page_array, size);
118 err_free_bitmap:
119         FREE_BITMAP(mds->mds_lov_page_dirty);
120
121         RETURN(rc);
122 }
123 EXPORT_SYMBOL(mds_lov_init_objids);
124
125 void mds_lov_destroy_objids(struct obd_device *obd)
126 {
127         struct mds_obd *mds = &obd->u.mds;
128         int i, rc;
129         ENTRY;
130
131         if (mds->mds_lov_page_array != NULL) {
132                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133                         obd_id *data = mds->mds_lov_page_array[i];
134                         if (data != NULL)
135                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
136                 }
137                 OBD_FREE(mds->mds_lov_page_array,
138                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
139         }
140
141         if (mds->mds_lov_objid_filp) {
142                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143                 mds->mds_lov_objid_filp = NULL;
144                 if (rc)
145                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
146         }
147
148         FREE_BITMAP(mds->mds_lov_page_dirty);
149         EXIT;
150 }
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
152
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         int j;
157         ENTRY;
158
159         /* if we create file without objects - lmm is NULL */
160         if (lmm == NULL)
161                 return;
162
163         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164                 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165                 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166                 int page = i / OBJID_PER_PAGE();
167                 int idx = i % OBJID_PER_PAGE();
168                 obd_id *data = mds->mds_lov_page_array[page];
169
170                 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171                                " old %llu\n", i, id, data[idx]);
172                 if (id > data[idx]) {
173                         data[idx] = id;
174                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
175                 }
176         }
177         EXIT;
178 }
179 EXPORT_SYMBOL(mds_lov_update_objids);
180
181 static int mds_lov_read_objids(struct obd_device *obd)
182 {
183         struct mds_obd *mds = &obd->u.mds;
184         loff_t off = 0;
185         int i, rc, count = 0, page = 0;
186         size_t size;
187         ENTRY;
188
189         /* Read everything in the file, even if our current lov desc
190            has fewer targets. Old targets not in the lov descriptor
191            during mds setup may still have valid objids. */
192         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
193         if (size == 0)
194                 RETURN(0);
195
196         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197         CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
198         for(i=0; i < page; i++) {
199                 obd_id *data =  mds->mds_lov_page_array[i];
200                 loff_t off_old = off;
201
202                 LASSERT(data == NULL);
203                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
204                 if (data == NULL)
205                         GOTO(out, rc = -ENOMEM);
206
207                 mds->mds_lov_page_array[i] = data;
208
209                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
211                 if (rc < 0) {
212                         CERROR("Error reading objids %d\n", rc);
213                         GOTO(out, rc);
214                 }
215                 if (off == off_old)
216                         break; // eof
217
218                 count += (off-off_old)/sizeof(obd_id);
219         }
220         mds->mds_lov_objid_count = count;
221         if (count) {
222                 count --;
223                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
224                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
225         }
226         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
227                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
228 out:
229         mds_lov_dump_objids("read",obd);
230
231         RETURN(0);
232 }
233
234 int mds_lov_write_objids(struct obd_device *obd)
235 {
236         struct mds_obd *mds = &obd->u.mds;
237         int i, rc = 0;
238         ENTRY;
239
240         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
241                 RETURN(0);
242
243         mds_lov_dump_objids("write", obd);
244
245         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
246                 obd_id *data =  mds->mds_lov_page_array[i];
247                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
248                 loff_t off = i * size;
249
250                 LASSERT(data != NULL);
251
252                 /* check for particaly filled last page */
253                 if (i == mds->mds_lov_objid_lastpage)
254                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
255
256                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
257                                          size, &off, 0);
258                 if (rc < 0)
259                         break;
260                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
261         }
262         if (rc >= 0)
263                 rc = 0;
264
265         RETURN(rc);
266 }
267 EXPORT_SYMBOL(mds_lov_write_objids);
268
269 static int mds_lov_get_objid(struct obd_device * obd,
270                              __u32 idx)
271 {
272         struct mds_obd *mds = &obd->u.mds;
273         unsigned int page;
274         unsigned int off;
275         obd_id *data;
276         int rc = 0;
277         ENTRY;
278
279         page = idx / OBJID_PER_PAGE();
280         off = idx % OBJID_PER_PAGE();
281         data = mds->mds_lov_page_array[page];
282         if (data == NULL) {
283                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
284                 if (data == NULL)
285                         GOTO(out, rc = -ENOMEM);
286
287                 mds->mds_lov_page_array[page] = data;
288         }
289
290         if (data[off] == 0) {
291                 /* We never read this lastid; ask the osc */
292                 struct obd_id_info lastid;
293                 __u32 size = sizeof(lastid);
294
295                 lastid.idx = idx;
296                 lastid.data = &data[off];
297                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
298                                   KEY_LAST_ID, &size, &lastid);
299                 if (rc)
300                         GOTO(out, rc);
301
302                 if (idx > mds->mds_lov_objid_count) {
303                         mds->mds_lov_objid_count = idx;
304                         mds->mds_lov_objid_lastpage = page;
305                         mds->mds_lov_objid_lastidx = off;
306                 }
307                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
308         }
309 out:
310         RETURN(rc);
311 }
312
313 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
314 {
315         int rc;
316         struct obdo oa;
317         struct obd_trans_info oti = {0};
318         struct lov_stripe_md  *empty_ea = NULL;
319         ENTRY;
320
321         LASSERT(mds->mds_lov_page_array != NULL);
322
323         /* This create will in fact either create or destroy:  If the OST is
324          * missing objects below this ID, they will be created.  If it finds
325          * objects above this ID, they will be removed. */
326         memset(&oa, 0, sizeof(oa));
327         oa.o_flags = OBD_FL_DELORPHAN;
328         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
329         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
330         if (ost_uuid != NULL) {
331                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
332                 oa.o_valid |= OBD_MD_FLINLINE;
333         }
334         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
335
336         RETURN(rc);
337 }
338
339 /* for one target */
340 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
341 {
342         struct mds_obd *mds = &obd->u.mds;
343         int rc;
344         struct obd_id_info info;
345         ENTRY;
346
347         LASSERT(!obd->obd_recovering);
348
349         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
350         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
351
352         info.idx = idx;
353         info.data = id;
354         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
355                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
356         if (rc)
357                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
358                         obd->obd_name, rc);
359
360         RETURN(rc);
361 }
362
363 static __u32 mds_lov_get_idx(struct obd_export *lov,
364                              struct obd_uuid *ost_uuid)
365 {
366         int rc;
367         int valsize = sizeof(ost_uuid);
368
369         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
370                           &valsize, ost_uuid);
371         LASSERT(rc >= 0);
372
373         RETURN(rc);
374 }
375
376 /* Update the lov desc for a new size lov. */
377 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
378 {
379         struct mds_obd *mds = &obd->u.mds;
380         struct lov_desc *ld;
381         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
382         int rc = 0;
383         ENTRY;
384
385         OBD_ALLOC(ld, sizeof(*ld));
386         if (!ld)
387                 RETURN(-ENOMEM);
388
389         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
390                           &valsize, ld);
391         if (rc)
392                 GOTO(out, rc);
393
394         /* Don't change the mds_lov_desc until the objids size matches the
395            count (paranoia) */
396         mds->mds_lov_desc = *ld;
397         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
398                mds->mds_lov_desc.ld_tgt_count);
399
400         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
401                                mds->mds_lov_desc.ld_tgt_count);
402
403         mds->mds_max_mdsize = lov_mds_md_size(stripes);
404         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
405         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
406                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
407                stripes);
408
409         /* If we added a target we have to reconnect the llogs */
410         /* We only _need_ to do this at first add (idx), or the first time
411            after recovery.  However, it should now be safe to call anytime. */
412         rc = llog_cat_initialize(obd, &obd->obd_olg,
413                                  mds->mds_lov_desc.ld_tgt_count, NULL);
414
415         /*XXX this notifies the MDD until lov handling use old mds code */
416         if (obd->obd_upcall.onu_owner) {
417                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
418                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
419                                                  obd->obd_upcall.onu_owner);
420         }
421 out:
422         OBD_FREE(ld, sizeof(*ld));
423         RETURN(rc);
424 }
425
426
427 #define MDSLOV_NO_INDEX -1
428
429 /* Inform MDS about new/updated target */
430 static int mds_lov_update_mds(struct obd_device *obd,
431                               struct obd_device *watched,
432                               __u32 idx)
433 {
434         struct mds_obd *mds = &obd->u.mds;
435         int rc = 0;
436         int page;
437         int off;
438         obd_id *data;
439
440         ENTRY;
441
442         /* Don't let anyone else mess with mds_lov_objids now */
443         mutex_down(&obd->obd_dev_sem);
444
445         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
446         if (rc)
447                 GOTO(out, rc);
448
449         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
450                idx, obd->obd_recovering, obd->obd_async_recov,
451                mds->mds_lov_desc.ld_tgt_count);
452
453         /* idx is set as data from lov_notify. */
454         if (obd->obd_recovering)
455                 GOTO(out, rc);
456
457         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
458                 CERROR("index %d > count %d!\n", idx,
459                        mds->mds_lov_desc.ld_tgt_count);
460                 GOTO(out, rc = -EINVAL);
461         }
462
463         rc = mds_lov_get_objid(obd, idx);
464         if (rc)
465                 GOTO(out, rc);
466
467         page = idx / OBJID_PER_PAGE();
468         off = idx % OBJID_PER_PAGE();
469         data = mds->mds_lov_page_array[page];
470
471         /* We have read this lastid from disk; tell the osc.
472            Don't call this during recovery. */
473         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
474         if (rc) {
475                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
476                 /* Don't abort the rest of the sync */
477                 rc = 0;
478         } else {
479                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
480                         data[off], idx, rc);
481         }
482 out:
483         mutex_up(&obd->obd_dev_sem);
484         RETURN(rc);
485 }
486
487 /* update the LOV-OSC knowledge of the last used object id's */
488 int mds_lov_connect(struct obd_device *obd, char * lov_name)
489 {
490         struct mds_obd *mds = &obd->u.mds;
491         struct lustre_handle conn = {0,};
492         struct obd_connect_data *data;
493         int rc;
494         ENTRY;
495
496         if (IS_ERR(mds->mds_osc_obd))
497                 RETURN(PTR_ERR(mds->mds_osc_obd));
498
499         if (mds->mds_osc_obd)
500                 RETURN(0);
501
502         mds->mds_osc_obd = class_name2obd(lov_name);
503         if (!mds->mds_osc_obd) {
504                 CERROR("MDS cannot locate LOV %s\n", lov_name);
505                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
506                 RETURN(-ENOTCONN);
507         }
508
509         OBD_ALLOC(data, sizeof(*data));
510         if (data == NULL)
511                 RETURN(-ENOMEM);
512         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX |
513                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
514                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID;
515 #ifdef HAVE_LRU_RESIZE_SUPPORT
516         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
517 #endif
518         data->ocd_version = LUSTRE_VERSION_CODE;
519         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
520         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
521         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
522         OBD_FREE(data, sizeof(*data));
523         if (rc) {
524                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
525                 mds->mds_osc_obd = ERR_PTR(rc);
526                 RETURN(rc);
527         }
528         mds->mds_osc_exp = class_conn2export(&conn);
529
530         rc = obd_register_observer(mds->mds_osc_obd, obd);
531         if (rc) {
532                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
533                        lov_name, rc);
534                 GOTO(err_discon, rc);
535         }
536
537         /* Deny new client connections until we are sure we have some OSTs */
538         obd->obd_no_conn = 1;
539
540         mutex_down(&obd->obd_dev_sem);
541         rc = mds_lov_read_objids(obd);
542         if (rc) {
543                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
544                 GOTO(err_reg, rc);
545         }
546
547         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
548         if (rc)
549                 GOTO(err_reg, rc);
550
551         /* tgt_count may be 0! */
552         rc = llog_cat_initialize(obd, &obd->obd_olg, 
553                                  mds->mds_lov_desc.ld_tgt_count, NULL);
554         if (rc) {
555                 CERROR("failed to initialize catalog %d\n", rc);
556                 GOTO(err_reg, rc);
557         }
558
559         /* If we're mounting this code for the first time on an existing FS,
560          * we need to populate the objids array from the real OST values */
561         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
562                 __u32 i = mds->mds_lov_objid_count;
563                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
564                         rc = mds_lov_get_objid(obd, i);
565                         if (rc != 0)
566                                 break;
567                 }
568                 if (rc == 0)
569                         rc = mds_lov_write_objids(obd);
570                 if (rc)
571                         CERROR("got last objids from OSTs, but error "
572                                 "in update objids file: %d\n", rc);
573         }
574         mutex_up(&obd->obd_dev_sem);
575
576         /* I want to see a callback happen when the OBD moves to a
577          * "For General Use" state, and that's when we'll call
578          * set_nextid().  The class driver can help us here, because
579          * it can use the obd_recovering flag to determine when the
580          * the OBD is full available. */
581         /* MDD device will care about that
582         if (!obd->obd_recovering)
583                 rc = mds_postrecov(obd);
584          */
585         RETURN(rc);
586
587 err_reg:
588         mutex_up(&obd->obd_dev_sem);
589         obd_register_observer(mds->mds_osc_obd, NULL);
590 err_discon:
591         obd_disconnect(mds->mds_osc_exp);
592         mds->mds_osc_exp = NULL;
593         mds->mds_osc_obd = ERR_PTR(rc);
594         RETURN(rc);
595 }
596
597 int mds_lov_disconnect(struct obd_device *obd)
598 {
599         struct mds_obd *mds = &obd->u.mds;
600         int rc = 0;
601         ENTRY;
602
603         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
604                 obd_register_observer(mds->mds_osc_obd, NULL);
605
606                 /* The actual disconnect of the mds_lov will be called from
607                  * class_disconnect_exports from mds_lov_clean. So we have to
608                  * ensure that class_cleanup doesn't fail due to the extra ref
609                  * we're holding now. The mechanism to do that already exists -
610                  * the obd_force flag. We'll drop the final ref to the
611                  * mds_osc_exp in mds_cleanup. */
612                 mds->mds_osc_obd->obd_force = 1;
613         }
614
615         RETURN(rc);
616 }
617
618 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
619                   void *karg, void *uarg)
620 {
621         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
622         struct obd_device *obd = exp->exp_obd;
623         struct mds_obd *mds = &obd->u.mds;
624         struct obd_ioctl_data *data = karg;
625         struct lvfs_run_ctxt saved;
626         int rc = 0;
627
628         ENTRY;
629         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
630
631         switch (cmd) {
632         case OBD_IOC_RECORD: {
633                 char *name = data->ioc_inlbuf1;
634                 struct llog_ctxt *ctxt;
635
636                 if (mds->mds_cfg_llh)
637                         RETURN(-EBUSY);
638
639                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
640                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
641                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
642                 llog_ctxt_put(ctxt);
643                 if (rc == 0)
644                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
645                                          &cfg_uuid);
646                 else
647                         mds->mds_cfg_llh = NULL;
648                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
649
650                 RETURN(rc);
651         }
652
653         case OBD_IOC_ENDRECORD: {
654                 if (!mds->mds_cfg_llh)
655                         RETURN(-EBADF);
656
657                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
658                 rc = llog_close(mds->mds_cfg_llh);
659                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
660
661                 mds->mds_cfg_llh = NULL;
662                 RETURN(rc);
663         }
664
665         case OBD_IOC_CLEAR_LOG: {
666                 char *name = data->ioc_inlbuf1;
667                 struct llog_ctxt *ctxt;
668                 if (mds->mds_cfg_llh)
669                         RETURN(-EBUSY);
670
671                 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
672                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
673                 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
674                 llog_ctxt_put(ctxt);
675                 if (rc == 0) {
676                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
677                                          NULL);
678
679                         rc = llog_destroy(mds->mds_cfg_llh);
680                         llog_free_handle(mds->mds_cfg_llh);
681                 }
682                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
683
684                 mds->mds_cfg_llh = NULL;
685                 RETURN(rc);
686         }
687
688         case OBD_IOC_DORECORD: {
689                 char *cfg_buf;
690                 struct llog_rec_hdr rec;
691                 if (!mds->mds_cfg_llh)
692                         RETURN(-EBADF);
693
694                 rec.lrh_len = llog_data_len(data->ioc_plen1);
695
696                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
697                         rec.lrh_type = OBD_CFG_REC;
698                 } else {
699                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
700                         RETURN(-EINVAL);
701                 }
702
703                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
704                 if (cfg_buf == NULL)
705                         RETURN(-EINVAL);
706                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
707                 if (rc) {
708                         OBD_FREE(cfg_buf, data->ioc_plen1);
709                         RETURN(rc);
710                 }
711
712                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
713                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
714                                     cfg_buf, -1);
715                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
716
717                 OBD_FREE(cfg_buf, data->ioc_plen1);
718                 RETURN(rc);
719         }
720
721         case OBD_IOC_PARSE: {
722                 struct llog_ctxt *ctxt =
723                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
724                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
725                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
726                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
727                 llog_ctxt_put(ctxt);
728                 if (rc)
729                         RETURN(rc);
730
731                 RETURN(rc);
732         }
733
734         case OBD_IOC_DUMP_LOG: {
735                 struct llog_ctxt *ctxt =
736                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
737                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
738                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
739                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
740                 llog_ctxt_put(ctxt);
741                 if (rc)
742                         RETURN(rc);
743
744                 RETURN(rc);
745         }
746
747         case OBD_IOC_SYNC: {
748                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
749                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
750                 RETURN(rc);
751         }
752
753         case OBD_IOC_SET_READONLY: {
754                 void *handle;
755                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
756                 BDEVNAME_DECLARE_STORAGE(tmp);
757                 CERROR("*** setting device %s read-only ***\n",
758                        ll_bdevname(obd->u.obt.obt_sb, tmp));
759
760                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
761                 if (!IS_ERR(handle))
762                         rc = fsfilt_commit(obd, inode, handle, 1);
763
764                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
765                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
766
767                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
768                 RETURN(0);
769         }
770
771         case OBD_IOC_CATLOGLIST: {
772                 int count = mds->mds_lov_desc.ld_tgt_count;
773                 rc = llog_catalog_list(obd, count, data);
774                 RETURN(rc);
775
776         }
777         case OBD_IOC_LLOG_CHECK:
778         case OBD_IOC_LLOG_CANCEL:
779         case OBD_IOC_LLOG_REMOVE: {
780                 struct llog_ctxt *ctxt =
781                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
782                 int rc2;
783                 __u32 group;
784
785                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
786                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
787                 rc = llog_ioctl(ctxt, cmd, data);
788                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
789                 llog_cat_initialize(obd, &obd->obd_olg,
790                                     mds->mds_lov_desc.ld_tgt_count, NULL);
791                 group = FILTER_GROUP_MDS0 + mds->mds_id;
792                 llog_ctxt_put(ctxt);
793                 rc2 = obd_set_info_async(mds->mds_osc_exp,
794                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
795                                          sizeof(group), &group, NULL);
796                 if (!rc)
797                         rc = rc2;
798                 RETURN(rc);
799         }
800         case OBD_IOC_LLOG_INFO:
801         case OBD_IOC_LLOG_PRINT: {
802                 struct llog_ctxt *ctxt =
803                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
804
805                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
806                 rc = llog_ioctl(ctxt, cmd, data);
807                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
808                 llog_ctxt_put(ctxt);
809
810                 RETURN(rc);
811         }
812
813         case OBD_IOC_ABORT_RECOVERY:
814                 CERROR("aborting recovery for device %s\n", obd->obd_name);
815                 target_stop_recovery_thread(obd);
816                 RETURN(0);
817
818         default:
819                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
820                 RETURN(-EINVAL);
821         }
822         RETURN(0);
823
824 }
825
826 /* Collect the preconditions we need to allow client connects */
827 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
828 {
829         if (flag & CONFIG_LOG)
830                 obd->u.mds.mds_fl_cfglog = 1;
831         if (flag & CONFIG_SYNC)
832                 obd->u.mds.mds_fl_synced = 1;
833         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
834                 /* Open for clients */
835                 obd->obd_no_conn = 0;
836 }
837
838 struct mds_lov_sync_info {
839         struct obd_device *mlsi_obd;     /* the lov device to sync */
840         struct obd_device *mlsi_watched; /* target osc */
841         __u32              mlsi_index;   /* index of target */
842 };
843
844 static int mds_propagate_capa_keys(struct mds_obd *mds)
845 {
846         struct lustre_capa_key *key;
847         int i, rc = 0;
848
849         ENTRY;
850
851         if (!mds->mds_capa_keys)
852                 RETURN(0);
853
854         for (i = 0; i < 2; i++) {
855                 key = &mds->mds_capa_keys[i];
856                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
857
858                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
859                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
860                 if (rc) {
861                         DEBUG_CAPA_KEY(D_ERROR, key,
862                                        "propagate failed (rc = %d) for", rc);
863                         RETURN(rc);
864                 }
865         }
866
867         RETURN(0);
868 }
869
870 /* We only sync one osc at a time, so that we don't have to hold
871    any kind of lock on the whole mds_lov_desc, which may change
872    (grow) as a result of mds_lov_add_ost.  This also avoids any
873    kind of mismatch between the lov_desc and the mds_lov_desc,
874    which are not in lock-step during lov_add_obd */
875 static int __mds_lov_synchronize(void *data)
876 {
877         struct mds_lov_sync_info *mlsi = data;
878         struct obd_device *obd = mlsi->mlsi_obd;
879         struct obd_device *watched = mlsi->mlsi_watched;
880         struct mds_obd *mds = &obd->u.mds;
881         struct obd_uuid *uuid;
882         __u32  idx = mlsi->mlsi_index;
883         struct mds_group_info mgi;
884         struct llog_ctxt *ctxt;
885         int rc = 0;
886         ENTRY;
887
888         OBD_FREE(mlsi, sizeof(*mlsi));
889
890         LASSERT(obd);
891         LASSERT(watched);
892         uuid = &watched->u.cli.cl_target_uuid;
893         LASSERT(uuid);
894
895         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
896
897         rc = mds_lov_update_mds(obd, watched, idx);
898         if (rc != 0) {
899                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
900                 GOTO(out, rc);
901         }
902
903         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
904         mgi.uuid = uuid;
905
906         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
907                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
908         if (rc != 0)
909                 GOTO(out, rc);
910         /* propagate capability keys */
911         rc = mds_propagate_capa_keys(mds);
912         if (rc)
913                 GOTO(out, rc);
914
915         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
916         if (!ctxt) 
917                 RETURN(-ENODEV);
918
919         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
920
921         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
922                           NULL, NULL, uuid); 
923         llog_ctxt_put(ctxt);
924         if (rc != 0) {
925                 CERROR("%s failed at llog_origin_connect: %d\n",
926                        obd_uuid2str(uuid), rc);
927                 GOTO(out, rc);
928         }
929
930         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
931               obd->obd_name, obd_uuid2str(uuid));
932         /*
933          * FIXME: this obd_stopping was useless, 
934          * since obd in mdt layer was set
935          */
936         if (obd->obd_stopping)
937                 GOTO(out, rc = -ENODEV);
938
939         rc = mds_lov_clear_orphans(mds, uuid);
940         if (rc != 0) {
941                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
942                        obd_uuid2str(uuid), rc);
943                 GOTO(out, rc);
944         }
945
946         if (obd->obd_upcall.onu_owner) {
947                 /*
948                  * This is a hack for mds_notify->mdd_notify. When the mds obd
949                  * in mdd is removed, This hack should be removed.
950                  */
951                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
952                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
953                                                  obd->obd_upcall.onu_owner);
954         }
955         EXIT;
956 out:
957         if (rc) {
958                 /* Deactivate it for safety */
959                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
960                        rc);
961                 if (!obd->obd_stopping && mds->mds_osc_obd &&
962                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
963                         obd_notify(mds->mds_osc_obd, watched,
964                                    OBD_NOTIFY_INACTIVE, NULL);
965         }
966
967         class_decref(obd);
968         return rc;
969 }
970
971 int mds_lov_synchronize(void *data)
972 {
973         struct mds_lov_sync_info *mlsi = data;
974         char name[20];
975
976         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
977         ptlrpc_daemonize(name);
978
979         RETURN(__mds_lov_synchronize(data));
980 }
981
982 int mds_lov_start_synchronize(struct obd_device *obd,
983                               struct obd_device *watched,
984                               void *data, int nonblock)
985 {
986         struct mds_lov_sync_info *mlsi;
987         int rc;
988         struct mds_obd *mds = &obd->u.mds;
989         struct obd_uuid *uuid;
990         ENTRY;
991
992         LASSERT(watched);
993         uuid = &watched->u.cli.cl_target_uuid;
994
995         OBD_ALLOC(mlsi, sizeof(*mlsi));
996         if (mlsi == NULL)
997                 RETURN(-ENOMEM);
998
999         mlsi->mlsi_obd = obd;
1000         mlsi->mlsi_watched = watched;
1001         if (data)
1002                 mlsi->mlsi_index = *(__u32 *)data;
1003         else
1004                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
1005
1006         /* Although class_export_get(obd->obd_self_export) would lock
1007            the MDS in place, since it's only a self-export
1008            it doesn't lock the LOV in place.  The LOV can be disconnected
1009            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1010            Simply taking an export ref on the LOV doesn't help, because it's
1011            still disconnected. Taking an obd reference insures that we don't
1012            disconnect the LOV.  This of course means a cleanup won't
1013            finish for as long as the sync is blocking. */
1014         class_incref(obd);
1015
1016         if (nonblock) {
1017                 /* Synchronize in the background */
1018                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1019                                        CLONE_VM | CLONE_FILES);
1020                 if (rc < 0) {
1021                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1022                                obd->obd_name, rc);
1023                         class_decref(obd);
1024                 } else {
1025                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1026                                "thread=%d\n", obd->obd_name,
1027                                mlsi->mlsi_index, rc);
1028                         rc = 0;
1029                 }
1030         } else {
1031                 rc = __mds_lov_synchronize((void *)mlsi);
1032         }
1033
1034         RETURN(rc);
1035 }
1036
1037 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1038                enum obd_notify_event ev, void *data)
1039 {
1040         int rc = 0;
1041         ENTRY;
1042
1043         switch (ev) {
1044         /* We only handle these: */
1045         case OBD_NOTIFY_ACTIVE:
1046         case OBD_NOTIFY_SYNC:
1047         case OBD_NOTIFY_SYNC_NONBLOCK:
1048                 break;
1049         case OBD_NOTIFY_CONFIG:
1050                 mds_allow_cli(obd, (unsigned int)data);
1051         default:
1052                 RETURN(0);
1053         }
1054
1055         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1056         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1057                 CERROR("unexpected notification of %s %s!\n",
1058                        watched->obd_type->typ_name, watched->obd_name);
1059                 RETURN(-EINVAL);
1060         }
1061
1062         if (obd->obd_recovering) {
1063                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1064                       obd->obd_name,
1065                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1066                 /* We still have to fix the lov descriptor for ost's added
1067                    after the mdt in the config log.  They didn't make it into
1068                    mds_lov_connect. */
1069                 mutex_down(&obd->obd_dev_sem);
1070                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1071                 if (rc) {
1072                         mutex_up(&obd->obd_dev_sem);
1073                         RETURN(rc);
1074                 }
1075                 /* We should update init llog here too for replay unlink and 
1076                  * possiable llog init race when recovery complete */
1077                 llog_cat_initialize(obd, &obd->obd_olg, 
1078                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
1079                                     &watched->u.cli.cl_target_uuid);
1080                 mutex_up(&obd->obd_dev_sem);
1081                 mds_allow_cli(obd, CONFIG_SYNC);
1082                 RETURN(rc);
1083         }
1084
1085         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
1086         rc = mds_lov_start_synchronize(obd, watched, data,
1087                                        !(ev == OBD_NOTIFY_SYNC));
1088
1089         lquota_recovery(mds_quota_interface_ref, obd);
1090
1091         RETURN(rc);
1092 }
1093
1094 /* Convert the on-disk LOV EA structre.
1095  * We always try to convert from an old LOV EA format to the common in-memory
1096  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1097  * then convert back to the new on-disk format and save it back to disk
1098  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1099  * to convert it each time this inode is accessed.
1100  *
1101  * This function is a bit interesting in the error handling.  We can safely
1102  * ship the old lmm to the client in case of failure, since it uses the same
1103  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1104  * reason.  We will not delete the old lmm data until we have written the
1105  * new format lmm data in fsfilt_set_md(). */
1106 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1107                        struct lov_mds_md *lmm, int lmm_size)
1108 {
1109         struct lov_stripe_md *lsm = NULL;
1110         void *handle;
1111         int rc, err;
1112         ENTRY;
1113
1114         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1115             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
1116                 RETURN(0);
1117
1118         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1119                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1120                LOV_MAGIC);
1121
1122         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1123         if (rc < 0)
1124                 GOTO(conv_end, rc);
1125
1126         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1127         if (rc < 0)
1128                 GOTO(conv_free, rc);
1129         lmm_size = rc;
1130
1131         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1132         if (IS_ERR(handle)) {
1133                 rc = PTR_ERR(handle);
1134                 GOTO(conv_free, rc);
1135         }
1136
1137         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1138
1139         err = fsfilt_commit(obd, inode, handle, 0);
1140         if (!rc)
1141                 rc = err ? err : lmm_size;
1142         GOTO(conv_free, rc);
1143 conv_free:
1144         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1145 conv_end:
1146         return rc;
1147 }