Whamcloud - gitweb
cleanup usage obd_set_info_async, obd_get_info.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/module.h>
32 #include <lustre_mds.h>
33 #include <lustre/lustre_idl.h>
34 #include <obd_class.h>
35 #include <obd_lov.h>
36 #include <lustre_lib.h>
37 #include <lustre_fsfilt.h>
38
39 #include "mds_internal.h"
40
41 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
42 {
43         struct mds_obd *mds = &obd->u.mds;
44         unsigned int i=0, j;
45
46         CDEBUG(D_INFO, "dump from %s\n", label);
47         if (mds->mds_lov_page_dirty == NULL) {
48                 CERROR("NULL bitmap!\n");
49                 GOTO(skip_bitmap, i);
50         }
51
52         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
53                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
54 skip_bitmap:
55         if (mds->mds_lov_page_array == NULL) {
56                 CERROR("not init page array!\n");
57                 GOTO(skip_array, i);
58
59         }
60         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
61                 obd_id *data = mds->mds_lov_page_array[i];
62
63                 if (data == NULL)
64                         continue;
65
66                 for(j=0; j < OBJID_PER_PAGE(); j++) {
67                         if (data[j] == 0)
68                                 continue;
69                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
70                 }
71         }
72 skip_array:
73         EXIT;
74 }
75
76 int mds_lov_init_objids(struct obd_device *obd)
77 {
78         struct mds_obd *mds = &obd->u.mds;
79         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
80         struct file *file;
81         int rc;
82         ENTRY;
83
84         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
85
86         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
87         if (mds->mds_lov_page_dirty == NULL)
88                 RETURN(-ENOMEM);
89
90
91         OBD_ALLOC(mds->mds_lov_page_array, size);
92         if (mds->mds_lov_page_array == NULL)
93                 GOTO(err_free_bitmap, rc = -ENOMEM);
94
95         /* open and test the lov objd file */
96         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
97         if (IS_ERR(file)) {
98                 rc = PTR_ERR(file);
99                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
100                 GOTO(err_free, rc = PTR_ERR(file));
101         }
102         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
103                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
104                        file->f_dentry->d_inode->i_mode);
105                 GOTO(err_open, rc = -ENOENT);
106         }
107         mds->mds_lov_objid_filp = file;
108
109         RETURN (0);
110 err_open:
111         if (filp_close((struct file *)file, 0))
112                 CERROR("can't close %s after error\n", LOV_OBJID);
113 err_free:
114         OBD_FREE(mds->mds_lov_page_array, size);
115 err_free_bitmap:
116         FREE_BITMAP(mds->mds_lov_page_dirty);
117
118         RETURN(rc);
119 }
120
121 void mds_lov_destroy_objids(struct obd_device *obd)
122 {
123         struct mds_obd *mds = &obd->u.mds;
124         int i, rc;
125         ENTRY;
126
127         if (mds->mds_lov_page_array != NULL) {
128                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
129                         obd_id *data = mds->mds_lov_page_array[i];
130                         if (data != NULL)
131                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
132                 }
133                 OBD_FREE(mds->mds_lov_page_array,
134                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
135         }
136
137         if (mds->mds_lov_objid_filp) {
138                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
139                 mds->mds_lov_objid_filp = NULL;
140                 if (rc)
141                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
142         }
143
144         FREE_BITMAP(mds->mds_lov_page_dirty);
145         EXIT;
146 }
147
148 static int mds_lov_read_objids(struct obd_device *obd)
149 {
150         struct mds_obd *mds = &obd->u.mds;
151         loff_t off = 0;
152         int i, rc, count = 0, page = 0;
153         size_t size;
154         ENTRY;
155
156         /* Read everything in the file, even if our current lov desc
157            has fewer targets. Old targets not in the lov descriptor
158            during mds setup may still have valid objids. */
159         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
160         if (size == 0)
161                 RETURN(0);
162
163         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
164         CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
165         for(i=0; i < page; i++) {
166                 obd_id *data =  mds->mds_lov_page_array[i];
167                 loff_t off_old = off;
168
169                 LASSERT(data == NULL);
170                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
171                 if (data == NULL)
172                         GOTO(out, rc = -ENOMEM);
173
174                 mds->mds_lov_page_array[i] = data;
175
176                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
177                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
178                 if (rc < 0) {
179                         CERROR("Error reading objids %d\n", rc);
180                         GOTO(out, rc);
181                 }
182                 if (off == off_old)
183                         break; // eof
184
185                 count += (off-off_old)/sizeof(obd_id);
186         }
187         mds->mds_lov_objid_count = count;
188         if (count) {
189                 count --;
190                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
191                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
192         }
193         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
194                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
195 out:
196         mds_lov_dump_objids("read",obd);
197
198         RETURN(0);
199 }
200
201 int mds_lov_write_objids(struct obd_device *obd)
202 {
203         struct mds_obd *mds = &obd->u.mds;
204         int i, rc = 0;
205         ENTRY;
206
207         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
208                 RETURN(0);
209
210         mds_lov_dump_objids("write", obd);
211
212         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
213                 obd_id *data =  mds->mds_lov_page_array[i];
214                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
215                 loff_t off = i * size;
216
217                 LASSERT(data != NULL);
218
219                 /* check for particaly filled last page */
220                 if (i == mds->mds_lov_objid_lastpage)
221                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
222
223                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
224                                          size, &off, 0);
225                 if (rc < 0)
226                         break;
227                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
228         }
229         if (rc >= 0)
230                 rc = 0;
231
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(mds_lov_write_objids);
235
236 static int mds_lov_get_objid(struct obd_device * obd,
237                              __u32 idx)
238 {
239         struct mds_obd *mds = &obd->u.mds;
240         unsigned int page;
241         unsigned int off;
242         obd_id *data;
243         int rc = 0;
244         ENTRY;
245
246         page = idx / OBJID_PER_PAGE();
247         off = idx % OBJID_PER_PAGE();
248         data = mds->mds_lov_page_array[page];
249         if (data == NULL) {
250                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
251                 if (data == NULL)
252                         GOTO(out, rc = -ENOMEM);
253
254                 mds->mds_lov_page_array[page] = data;
255         }
256
257         if (data[off] == 0) {
258                 /* We never read this lastid; ask the osc */
259                 struct obd_id_info lastid;
260                 __u32 size = sizeof(lastid);
261
262                 lastid.idx = idx;
263                 lastid.data = &data[off];
264                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
265                                   KEY_LAST_ID, &size, &lastid);
266                 if (rc)
267                         GOTO(out, rc);
268
269                 if (idx > mds->mds_lov_objid_count) {
270                         mds->mds_lov_objid_count = idx;
271                         mds->mds_lov_objid_lastpage = page;
272                         mds->mds_lov_objid_lastidx = off;
273                 }
274                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
275         }
276 out:
277         RETURN(rc);
278 }
279
280 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
281 {
282         int rc;
283         struct obdo oa;
284         struct obd_trans_info oti = {0};
285         struct lov_stripe_md  *empty_ea = NULL;
286         ENTRY;
287
288         LASSERT(mds->mds_lov_page_array != NULL);
289
290         /* This create will in fact either create or destroy:  If the OST is
291          * missing objects below this ID, they will be created.  If it finds
292          * objects above this ID, they will be removed. */
293         memset(&oa, 0, sizeof(oa));
294         oa.o_flags = OBD_FL_DELORPHAN;
295         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
296         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
297         if (ost_uuid != NULL) {
298                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
299                 oa.o_valid |= OBD_MD_FLINLINE;
300         }
301         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
302
303         RETURN(rc);
304 }
305
306 /* for one target */
307 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
308 {
309         struct mds_obd *mds = &obd->u.mds;
310         int rc;
311         struct obd_id_info info;
312         ENTRY;
313
314         LASSERT(!obd->obd_recovering);
315
316         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
317         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
318
319         info.idx = idx;
320         info.data = id;
321         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
322                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
323         if (rc)
324                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
325                         obd->obd_name, rc);
326
327         RETURN(rc);
328 }
329
330 static __u32 mds_lov_get_idx(struct obd_export *lov,
331                              struct obd_uuid *ost_uuid)
332 {
333         int rc;
334         int valsize = sizeof(ost_uuid);
335
336         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
337                           &valsize, ost_uuid);
338         LASSERT(rc >= 0);
339
340         RETURN(rc);
341 }
342
343 /* Update the lov desc for a new size lov. */
344 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
345 {
346         struct mds_obd *mds = &obd->u.mds;
347         struct lov_desc *ld;
348         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
349         int rc = 0;
350         ENTRY;
351
352         OBD_ALLOC(ld, sizeof(*ld));
353         if (!ld)
354                 RETURN(-ENOMEM);
355
356         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
357                           &valsize, ld);
358         if (rc)
359                 GOTO(out, rc);
360
361         /* Don't change the mds_lov_desc until the objids size matches the
362            count (paranoia) */
363         mds->mds_lov_desc = *ld;
364         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
365                mds->mds_lov_desc.ld_tgt_count);
366
367         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
368                                mds->mds_lov_desc.ld_tgt_count);
369
370         mds->mds_max_mdsize = lov_mds_md_size(stripes);
371         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
372         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
373                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
374                stripes);
375
376         /* If we added a target we have to reconnect the llogs */
377         /* We only _need_ to do this at first add (idx), or the first time
378            after recovery.  However, it should now be safe to call anytime. */
379         rc = llog_cat_initialize(obd, &obd->obd_olg,
380                                  mds->mds_lov_desc.ld_tgt_count, NULL);
381
382         /*XXX this notifies the MDD until lov handling use old mds code */
383         if (obd->obd_upcall.onu_owner) {
384                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
385                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
386                                                  obd->obd_upcall.onu_owner);
387         }
388 out:
389         OBD_FREE(ld, sizeof(*ld));
390         RETURN(rc);
391 }
392
393
394 #define MDSLOV_NO_INDEX -1
395
396 /* Inform MDS about new/updated target */
397 static int mds_lov_update_mds(struct obd_device *obd,
398                               struct obd_device *watched,
399                               __u32 idx)
400 {
401         struct mds_obd *mds = &obd->u.mds;
402         int rc = 0;
403         int page;
404         int off;
405         obd_id *data;
406
407         ENTRY;
408
409         /* Don't let anyone else mess with mds_lov_objids now */
410         mutex_down(&obd->obd_dev_sem);
411
412         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
413         if (rc)
414                 GOTO(out, rc);
415
416         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
417                idx, obd->obd_recovering, obd->obd_async_recov,
418                mds->mds_lov_desc.ld_tgt_count);
419
420         /* idx is set as data from lov_notify. */
421         if (obd->obd_recovering)
422                 GOTO(out, rc);
423
424         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
425                 CERROR("index %d > count %d!\n", idx,
426                        mds->mds_lov_desc.ld_tgt_count);
427                 GOTO(out, rc = -EINVAL);
428         }
429
430         rc = mds_lov_get_objid(obd, idx);
431         if (rc)
432                 GOTO(out, rc);
433
434         page = idx / OBJID_PER_PAGE();
435         off = idx % OBJID_PER_PAGE();
436         data = mds->mds_lov_page_array[page];
437
438         /* We have read this lastid from disk; tell the osc.
439            Don't call this during recovery. */
440         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
441         if (rc) {
442                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
443                 /* Don't abort the rest of the sync */
444                 rc = 0;
445         } else {
446                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
447                         data[off], idx, rc);
448         }
449 out:
450         mutex_up(&obd->obd_dev_sem);
451         RETURN(rc);
452 }
453
454 /* update the LOV-OSC knowledge of the last used object id's */
455 int mds_lov_connect(struct obd_device *obd, char * lov_name)
456 {
457         struct mds_obd *mds = &obd->u.mds;
458         struct lustre_handle conn = {0,};
459         struct obd_connect_data *data;
460         int rc;
461         ENTRY;
462
463         if (IS_ERR(mds->mds_osc_obd))
464                 RETURN(PTR_ERR(mds->mds_osc_obd));
465
466         if (mds->mds_osc_obd)
467                 RETURN(0);
468
469         mds->mds_osc_obd = class_name2obd(lov_name);
470         if (!mds->mds_osc_obd) {
471                 CERROR("MDS cannot locate LOV %s\n", lov_name);
472                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
473                 RETURN(-ENOTCONN);
474         }
475
476         OBD_ALLOC(data, sizeof(*data));
477         if (data == NULL)
478                 RETURN(-ENOMEM);
479         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX |
480                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
481                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID;
482 #ifdef HAVE_LRU_RESIZE_SUPPORT
483         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
484 #endif
485         data->ocd_version = LUSTRE_VERSION_CODE;
486         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
487         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
488         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
489         OBD_FREE(data, sizeof(*data));
490         if (rc) {
491                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
492                 mds->mds_osc_obd = ERR_PTR(rc);
493                 RETURN(rc);
494         }
495         mds->mds_osc_exp = class_conn2export(&conn);
496
497         rc = obd_register_observer(mds->mds_osc_obd, obd);
498         if (rc) {
499                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
500                        lov_name, rc);
501                 GOTO(err_discon, rc);
502         }
503
504         /* Deny new client connections until we are sure we have some OSTs */
505         obd->obd_no_conn = 1;
506
507         mutex_down(&obd->obd_dev_sem);
508         rc = mds_lov_read_objids(obd);
509         if (rc) {
510                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
511                 GOTO(err_reg, rc);
512         }
513
514         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
515         if (rc)
516                 GOTO(err_reg, rc);
517
518         /* tgt_count may be 0! */
519         rc = llog_cat_initialize(obd, &obd->obd_olg, 
520                                  mds->mds_lov_desc.ld_tgt_count, NULL);
521         if (rc) {
522                 CERROR("failed to initialize catalog %d\n", rc);
523                 GOTO(err_reg, rc);
524         }
525
526         /* If we're mounting this code for the first time on an existing FS,
527          * we need to populate the objids array from the real OST values */
528         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
529                 __u32 i = mds->mds_lov_objid_count;
530                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
531                         rc = mds_lov_get_objid(obd, i);
532                         if (rc != 0)
533                                 break;
534                 }
535                 if (rc == 0)
536                         rc = mds_lov_write_objids(obd);
537                 if (rc)
538                         CERROR("got last objids from OSTs, but error "
539                                 "in update objids file: %d\n", rc);
540         }
541         mutex_up(&obd->obd_dev_sem);
542
543         /* I want to see a callback happen when the OBD moves to a
544          * "For General Use" state, and that's when we'll call
545          * set_nextid().  The class driver can help us here, because
546          * it can use the obd_recovering flag to determine when the
547          * the OBD is full available. */
548         /* MDD device will care about that
549         if (!obd->obd_recovering)
550                 rc = mds_postrecov(obd);
551          */
552         RETURN(rc);
553
554 err_reg:
555         mutex_up(&obd->obd_dev_sem);
556         obd_register_observer(mds->mds_osc_obd, NULL);
557 err_discon:
558         obd_disconnect(mds->mds_osc_exp);
559         mds->mds_osc_exp = NULL;
560         mds->mds_osc_obd = ERR_PTR(rc);
561         RETURN(rc);
562 }
563
564 int mds_lov_disconnect(struct obd_device *obd)
565 {
566         struct mds_obd *mds = &obd->u.mds;
567         int rc = 0;
568         ENTRY;
569
570         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
571                 obd_register_observer(mds->mds_osc_obd, NULL);
572
573                 /* The actual disconnect of the mds_lov will be called from
574                  * class_disconnect_exports from mds_lov_clean. So we have to
575                  * ensure that class_cleanup doesn't fail due to the extra ref
576                  * we're holding now. The mechanism to do that already exists -
577                  * the obd_force flag. We'll drop the final ref to the
578                  * mds_osc_exp in mds_cleanup. */
579                 mds->mds_osc_obd->obd_force = 1;
580         }
581
582         RETURN(rc);
583 }
584
585 /* Collect the preconditions we need to allow client connects */
586 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
587 {
588         if (flag & CONFIG_LOG)
589                 obd->u.mds.mds_fl_cfglog = 1;
590         if (flag & CONFIG_SYNC)
591                 obd->u.mds.mds_fl_synced = 1;
592         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
593                 /* Open for clients */
594                 obd->obd_no_conn = 0;
595 }
596
597 struct mds_lov_sync_info {
598         struct obd_device *mlsi_obd;     /* the lov device to sync */
599         struct obd_device *mlsi_watched; /* target osc */
600         __u32              mlsi_index;   /* index of target */
601 };
602
603 static int mds_propagate_capa_keys(struct mds_obd *mds)
604 {
605         struct lustre_capa_key *key;
606         int i, rc = 0;
607
608         ENTRY;
609
610         if (!mds->mds_capa_keys)
611                 RETURN(0);
612
613         for (i = 0; i < 2; i++) {
614                 key = &mds->mds_capa_keys[i];
615                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
616
617                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
618                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
619                 if (rc) {
620                         DEBUG_CAPA_KEY(D_ERROR, key,
621                                        "propagate failed (rc = %d) for", rc);
622                         RETURN(rc);
623                 }
624         }
625
626         RETURN(0);
627 }
628
629 /* We only sync one osc at a time, so that we don't have to hold
630    any kind of lock on the whole mds_lov_desc, which may change
631    (grow) as a result of mds_lov_add_ost.  This also avoids any
632    kind of mismatch between the lov_desc and the mds_lov_desc,
633    which are not in lock-step during lov_add_obd */
634 static int __mds_lov_synchronize(void *data)
635 {
636         struct mds_lov_sync_info *mlsi = data;
637         struct obd_device *obd = mlsi->mlsi_obd;
638         struct obd_device *watched = mlsi->mlsi_watched;
639         struct mds_obd *mds = &obd->u.mds;
640         struct obd_uuid *uuid;
641         __u32  idx = mlsi->mlsi_index;
642         struct mds_group_info mgi;
643         struct llog_ctxt *ctxt;
644         int rc = 0;
645         ENTRY;
646
647         OBD_FREE(mlsi, sizeof(*mlsi));
648
649         LASSERT(obd);
650         LASSERT(watched);
651         uuid = &watched->u.cli.cl_target_uuid;
652         LASSERT(uuid);
653
654         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
655
656         rc = mds_lov_update_mds(obd, watched, idx);
657         if (rc != 0) {
658                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
659                 GOTO(out, rc);
660         }
661
662         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
663         mgi.uuid = uuid;
664
665         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
666                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
667         if (rc != 0)
668                 GOTO(out, rc);
669         /* propagate capability keys */
670         rc = mds_propagate_capa_keys(mds);
671         if (rc)
672                 GOTO(out, rc);
673
674         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
675         if (!ctxt) 
676                 RETURN(-ENODEV);
677
678         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
679
680         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
681                           NULL, NULL, uuid); 
682         llog_ctxt_put(ctxt);
683         if (rc != 0) {
684                 CERROR("%s failed at llog_origin_connect: %d\n",
685                        obd_uuid2str(uuid), rc);
686                 GOTO(out, rc);
687         }
688
689         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
690               obd->obd_name, obd_uuid2str(uuid));
691         /*
692          * FIXME: this obd_stopping was useless, 
693          * since obd in mdt layer was set
694          */
695         if (obd->obd_stopping)
696                 GOTO(out, rc = -ENODEV);
697
698         rc = mds_lov_clear_orphans(mds, uuid);
699         if (rc != 0) {
700                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
701                        obd_uuid2str(uuid), rc);
702                 GOTO(out, rc);
703         }
704
705         if (obd->obd_upcall.onu_owner) {
706                 /*
707                  * This is a hack for mds_notify->mdd_notify. When the mds obd
708                  * in mdd is removed, This hack should be removed.
709                  */
710                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
711                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
712                                                  obd->obd_upcall.onu_owner);
713         }
714         EXIT;
715 out:
716         if (rc) {
717                 /* Deactivate it for safety */
718                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
719                        rc);
720                 if (!obd->obd_stopping && mds->mds_osc_obd &&
721                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
722                         obd_notify(mds->mds_osc_obd, watched,
723                                    OBD_NOTIFY_INACTIVE, NULL);
724         }
725
726         class_decref(obd);
727         return rc;
728 }
729
730 int mds_lov_synchronize(void *data)
731 {
732         struct mds_lov_sync_info *mlsi = data;
733         char name[20];
734
735         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
736         ptlrpc_daemonize(name);
737
738         RETURN(__mds_lov_synchronize(data));
739 }
740
741 int mds_lov_start_synchronize(struct obd_device *obd,
742                               struct obd_device *watched,
743                               void *data, int nonblock)
744 {
745         struct mds_lov_sync_info *mlsi;
746         int rc;
747         struct mds_obd *mds = &obd->u.mds;
748         struct obd_uuid *uuid;
749         ENTRY;
750
751         LASSERT(watched);
752         uuid = &watched->u.cli.cl_target_uuid;
753
754         OBD_ALLOC(mlsi, sizeof(*mlsi));
755         if (mlsi == NULL)
756                 RETURN(-ENOMEM);
757
758         mlsi->mlsi_obd = obd;
759         mlsi->mlsi_watched = watched;
760         if (data)
761                 mlsi->mlsi_index = *(__u32 *)data;
762         else
763                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
764
765         /* Although class_export_get(obd->obd_self_export) would lock
766            the MDS in place, since it's only a self-export
767            it doesn't lock the LOV in place.  The LOV can be disconnected
768            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
769            Simply taking an export ref on the LOV doesn't help, because it's
770            still disconnected. Taking an obd reference insures that we don't
771            disconnect the LOV.  This of course means a cleanup won't
772            finish for as long as the sync is blocking. */
773         class_incref(obd);
774
775         if (nonblock) {
776                 /* Synchronize in the background */
777                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
778                                        CLONE_VM | CLONE_FILES);
779                 if (rc < 0) {
780                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
781                                obd->obd_name, rc);
782                         class_decref(obd);
783                 } else {
784                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
785                                "thread=%d\n", obd->obd_name,
786                                mlsi->mlsi_index, rc);
787                         rc = 0;
788                 }
789         } else {
790                 rc = __mds_lov_synchronize((void *)mlsi);
791         }
792
793         RETURN(rc);
794 }
795
796 int mds_notify(struct obd_device *obd, struct obd_device *watched,
797                enum obd_notify_event ev, void *data)
798 {
799         int rc = 0;
800         ENTRY;
801
802         switch (ev) {
803         /* We only handle these: */
804         case OBD_NOTIFY_ACTIVE:
805         case OBD_NOTIFY_SYNC:
806         case OBD_NOTIFY_SYNC_NONBLOCK:
807                 break;
808         case OBD_NOTIFY_CONFIG:
809                 mds_allow_cli(obd, (unsigned int)data);
810         default:
811                 RETURN(0);
812         }
813
814         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
815         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
816                 CERROR("unexpected notification of %s %s!\n",
817                        watched->obd_type->typ_name, watched->obd_name);
818                 RETURN(-EINVAL);
819         }
820
821         if (obd->obd_recovering) {
822                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
823                       obd->obd_name,
824                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
825                 /* We still have to fix the lov descriptor for ost's added
826                    after the mdt in the config log.  They didn't make it into
827                    mds_lov_connect. */
828                 mutex_down(&obd->obd_dev_sem);
829                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
830                 if (rc) {
831                         mutex_up(&obd->obd_dev_sem);
832                         RETURN(rc);
833                 }
834                 /* We should update init llog here too for replay unlink and 
835                  * possiable llog init race when recovery complete */
836                 llog_cat_initialize(obd, &obd->obd_olg, 
837                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
838                                     &watched->u.cli.cl_target_uuid);
839                 mutex_up(&obd->obd_dev_sem);
840                 mds_allow_cli(obd, CONFIG_SYNC);
841                 RETURN(rc);
842         }
843
844         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
845         rc = mds_lov_start_synchronize(obd, watched, data,
846                                        !(ev == OBD_NOTIFY_SYNC));
847
848         lquota_recovery(mds_quota_interface_ref, obd);
849
850         RETURN(rc);
851 }
852