Whamcloud - gitweb
fix code comment.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/module.h>
32 #include <lustre_mds.h>
33 #include <lustre/lustre_idl.h>
34 #include <obd_class.h>
35 #include <obd_lov.h>
36 #include <lustre_lib.h>
37 #include <lustre_fsfilt.h>
38
39 #include "mds_internal.h"
40
41 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
42 {
43         struct mds_obd *mds = &obd->u.mds;
44         unsigned int i=0, j;
45
46         CDEBUG(D_INFO, "dump from %s\n", label);
47         if (mds->mds_lov_page_dirty == NULL) {
48                 CERROR("NULL bitmap!\n");
49                 GOTO(skip_bitmap, i);
50         }
51
52         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
53                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
54 skip_bitmap:
55         if (mds->mds_lov_page_array == NULL) {
56                 CERROR("not init page array!\n");
57                 GOTO(skip_array, i);
58
59         }
60         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
61                 obd_id *data = mds->mds_lov_page_array[i];
62
63                 if (data == NULL)
64                         continue;
65
66                 for(j=0; j < OBJID_PER_PAGE(); j++) {
67                         if (data[j] == 0)
68                                 continue;
69                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
70                 }
71         }
72 skip_array:
73         EXIT;
74 }
75
76 int mds_lov_init_objids(struct obd_device *obd)
77 {
78         struct mds_obd *mds = &obd->u.mds;
79         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
80         struct file *file;
81         int rc;
82         ENTRY;
83
84         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
85
86         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
87         if (mds->mds_lov_page_dirty == NULL)
88                 RETURN(-ENOMEM);
89
90
91         OBD_ALLOC(mds->mds_lov_page_array, size);
92         if (mds->mds_lov_page_array == NULL)
93                 GOTO(err_free_bitmap, rc = -ENOMEM);
94
95         /* open and test the lov objd file */
96         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
97         if (IS_ERR(file)) {
98                 rc = PTR_ERR(file);
99                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
100                 GOTO(err_free, rc = PTR_ERR(file));
101         }
102         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
103                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
104                        file->f_dentry->d_inode->i_mode);
105                 GOTO(err_open, rc = -ENOENT);
106         }
107         mds->mds_lov_objid_filp = file;
108
109         RETURN (0);
110 err_open:
111         if (filp_close((struct file *)file, 0))
112                 CERROR("can't close %s after error\n", LOV_OBJID);
113 err_free:
114         OBD_FREE(mds->mds_lov_page_array, size);
115 err_free_bitmap:
116         FREE_BITMAP(mds->mds_lov_page_dirty);
117
118         RETURN(rc);
119 }
120
121 void mds_lov_destroy_objids(struct obd_device *obd)
122 {
123         struct mds_obd *mds = &obd->u.mds;
124         int i, rc;
125         ENTRY;
126
127         if (mds->mds_lov_page_array != NULL) {
128                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
129                         obd_id *data = mds->mds_lov_page_array[i];
130                         if (data != NULL)
131                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
132                 }
133                 OBD_FREE(mds->mds_lov_page_array,
134                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
135         }
136
137         if (mds->mds_lov_objid_filp) {
138                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
139                 mds->mds_lov_objid_filp = NULL;
140                 if (rc)
141                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
142         }
143
144         FREE_BITMAP(mds->mds_lov_page_dirty);
145         EXIT;
146 }
147
148 static int mds_lov_read_objids(struct obd_device *obd)
149 {
150         struct mds_obd *mds = &obd->u.mds;
151         loff_t off = 0;
152         int i, rc, count = 0, page = 0;
153         unsigned long size;
154         ENTRY;
155
156         /* Read everything in the file, even if our current lov desc
157            has fewer targets. Old targets not in the lov descriptor
158            during mds setup may still have valid objids. */
159         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
160         if (size == 0)
161                 RETURN(0);
162
163         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
164         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
165
166         for (i = 0; i < page; i++) {
167                 obd_id *data =  mds->mds_lov_page_array[i];
168                 loff_t off_old = off;
169
170                 LASSERT(data == NULL);
171                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
172                 if (data == NULL)
173                         GOTO(out, rc = -ENOMEM);
174
175                 mds->mds_lov_page_array[i] = data;
176
177                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
178                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
179                 if (rc < 0) {
180                         CERROR("Error reading objids %d\n", rc);
181                         GOTO(out, rc);
182                 }
183                 if (off == off_old)
184                         break; // eof
185
186                 count += (off - off_old)/sizeof(obd_id);
187         }
188         mds->mds_lov_objid_count = count;
189         if (count) {
190                 count --;
191                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
192                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
193         }
194         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
195                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
196 out:
197         mds_lov_dump_objids("read",obd);
198
199         RETURN(0);
200 }
201
202 int mds_lov_write_objids(struct obd_device *obd)
203 {
204         struct mds_obd *mds = &obd->u.mds;
205         int i, rc = 0;
206         ENTRY;
207
208         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
209                 RETURN(0);
210
211         mds_lov_dump_objids("write", obd);
212
213         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
214                 obd_id *data =  mds->mds_lov_page_array[i];
215                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
216                 loff_t off = i * size;
217
218                 LASSERT(data != NULL);
219
220                 /* check for particaly filled last page */
221                 if (i == mds->mds_lov_objid_lastpage)
222                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
223
224                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
225                                          size, &off, 0);
226                 if (rc < 0)
227                         break;
228                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
229         }
230         if (rc >= 0)
231                 rc = 0;
232
233         RETURN(rc);
234 }
235 EXPORT_SYMBOL(mds_lov_write_objids);
236
237 static int mds_lov_get_objid(struct obd_device * obd,
238                              __u32 idx)
239 {
240         struct mds_obd *mds = &obd->u.mds;
241         unsigned int page;
242         unsigned int off;
243         obd_id *data;
244         int rc = 0;
245         ENTRY;
246
247         page = idx / OBJID_PER_PAGE();
248         off = idx % OBJID_PER_PAGE();
249         data = mds->mds_lov_page_array[page];
250         if (data == NULL) {
251                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
252                 if (data == NULL)
253                         GOTO(out, rc = -ENOMEM);
254
255                 mds->mds_lov_page_array[page] = data;
256         }
257
258         if (data[off] == 0) {
259                 /* We never read this lastid; ask the osc */
260                 struct obd_id_info lastid;
261                 __u32 size = sizeof(lastid);
262
263                 lastid.idx = idx;
264                 lastid.data = &data[off];
265                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
266                                   KEY_LAST_ID, &size, &lastid);
267                 if (rc)
268                         GOTO(out, rc);
269
270                 if (idx > mds->mds_lov_objid_count) {
271                         mds->mds_lov_objid_count = idx;
272                         mds->mds_lov_objid_lastpage = page;
273                         mds->mds_lov_objid_lastidx = off;
274                 }
275                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
276         }
277 out:
278         RETURN(rc);
279 }
280
281 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
282 {
283         int rc;
284         struct obdo oa;
285         struct obd_trans_info oti = {0};
286         struct lov_stripe_md  *empty_ea = NULL;
287         ENTRY;
288
289         LASSERT(mds->mds_lov_page_array != NULL);
290
291         /* This create will in fact either create or destroy:  If the OST is
292          * missing objects below this ID, they will be created.  If it finds
293          * objects above this ID, they will be removed. */
294         memset(&oa, 0, sizeof(oa));
295         oa.o_flags = OBD_FL_DELORPHAN;
296         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
297         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
298         if (ost_uuid != NULL) {
299                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
300                 oa.o_valid |= OBD_MD_FLINLINE;
301         }
302         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
303
304         RETURN(rc);
305 }
306
307 /* for one target */
308 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
309 {
310         struct mds_obd *mds = &obd->u.mds;
311         int rc;
312         struct obd_id_info info;
313         ENTRY;
314
315         LASSERT(!obd->obd_recovering);
316
317         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
318         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
319
320         info.idx = idx;
321         info.data = id;
322         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
323                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
324         if (rc)
325                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
326                         obd->obd_name, rc);
327
328         RETURN(rc);
329 }
330
331 static __u32 mds_lov_get_idx(struct obd_export *lov,
332                              struct obd_uuid *ost_uuid)
333 {
334         int rc;
335         int valsize = sizeof(ost_uuid);
336
337         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
338                           &valsize, ost_uuid);
339         LASSERT(rc >= 0);
340
341         RETURN(rc);
342 }
343
344 /* Update the lov desc for a new size lov. */
345 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
346 {
347         struct mds_obd *mds = &obd->u.mds;
348         struct lov_desc *ld;
349         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
350         int rc = 0;
351         ENTRY;
352
353         OBD_ALLOC(ld, sizeof(*ld));
354         if (!ld)
355                 RETURN(-ENOMEM);
356
357         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
358                           &valsize, ld);
359         if (rc)
360                 GOTO(out, rc);
361
362         /* Don't change the mds_lov_desc until the objids size matches the
363            count (paranoia) */
364         mds->mds_lov_desc = *ld;
365         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
366                mds->mds_lov_desc.ld_tgt_count);
367
368         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369                                mds->mds_lov_desc.ld_tgt_count);
370
371         mds->mds_max_mdsize = lov_mds_md_size(stripes);
372         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
373         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
374                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
375                stripes);
376
377         /* If we added a target we have to reconnect the llogs */
378         /* We only _need_ to do this at first add (idx), or the first time
379            after recovery.  However, it should now be safe to call anytime. */
380         rc = llog_cat_initialize(obd, &obd->obd_olg,
381                                  mds->mds_lov_desc.ld_tgt_count, NULL);
382
383         /*XXX this notifies the MDD until lov handling use old mds code */
384         if (obd->obd_upcall.onu_owner) {
385                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
386                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
387                                                  obd->obd_upcall.onu_owner);
388         }
389 out:
390         OBD_FREE(ld, sizeof(*ld));
391         RETURN(rc);
392 }
393
394
395 #define MDSLOV_NO_INDEX -1
396
397 /* Inform MDS about new/updated target */
398 static int mds_lov_update_mds(struct obd_device *obd,
399                               struct obd_device *watched,
400                               __u32 idx)
401 {
402         struct mds_obd *mds = &obd->u.mds;
403         int rc = 0;
404         int page;
405         int off;
406         obd_id *data;
407
408         ENTRY;
409
410         /* Don't let anyone else mess with mds_lov_objids now */
411         mutex_down(&obd->obd_dev_sem);
412
413         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
414         if (rc)
415                 GOTO(out, rc);
416
417         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
418                idx, obd->obd_recovering, obd->obd_async_recov,
419                mds->mds_lov_desc.ld_tgt_count);
420
421         /* idx is set as data from lov_notify. */
422         if (obd->obd_recovering)
423                 GOTO(out, rc);
424
425         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
426                 CERROR("index %d > count %d!\n", idx,
427                        mds->mds_lov_desc.ld_tgt_count);
428                 GOTO(out, rc = -EINVAL);
429         }
430
431         rc = mds_lov_get_objid(obd, idx);
432         if (rc)
433                 GOTO(out, rc);
434
435         page = idx / OBJID_PER_PAGE();
436         off = idx % OBJID_PER_PAGE();
437         data = mds->mds_lov_page_array[page];
438
439         /* We have read this lastid from disk; tell the osc.
440            Don't call this during recovery. */
441         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
442         if (rc) {
443                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
444                 /* Don't abort the rest of the sync */
445                 rc = 0;
446         } else {
447                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
448                         data[off], idx, rc);
449         }
450 out:
451         mutex_up(&obd->obd_dev_sem);
452         RETURN(rc);
453 }
454
455 /* update the LOV-OSC knowledge of the last used object id's */
456 int mds_lov_connect(struct obd_device *obd, char * lov_name)
457 {
458         struct mds_obd *mds = &obd->u.mds;
459         struct lustre_handle conn = {0,};
460         struct obd_connect_data *data;
461         int rc;
462         ENTRY;
463
464         if (IS_ERR(mds->mds_osc_obd))
465                 RETURN(PTR_ERR(mds->mds_osc_obd));
466
467         if (mds->mds_osc_obd)
468                 RETURN(0);
469
470         mds->mds_osc_obd = class_name2obd(lov_name);
471         if (!mds->mds_osc_obd) {
472                 CERROR("MDS cannot locate LOV %s\n", lov_name);
473                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
474                 RETURN(-ENOTCONN);
475         }
476
477         OBD_ALLOC(data, sizeof(*data));
478         if (data == NULL)
479                 RETURN(-ENOMEM);
480         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
481                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
482                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
483                                   OBD_CONNECT_AT;
484 #ifdef HAVE_LRU_RESIZE_SUPPORT
485         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
486 #endif
487         data->ocd_version = LUSTRE_VERSION_CODE;
488         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
489         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
490         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
491         OBD_FREE(data, sizeof(*data));
492         if (rc) {
493                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
494                 mds->mds_osc_obd = ERR_PTR(rc);
495                 RETURN(rc);
496         }
497         mds->mds_osc_exp = class_conn2export(&conn);
498
499         rc = obd_register_observer(mds->mds_osc_obd, obd);
500         if (rc) {
501                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
502                        lov_name, rc);
503                 GOTO(err_discon, rc);
504         }
505
506         /* Deny new client connections until we are sure we have some OSTs */
507         obd->obd_no_conn = 1;
508
509         mutex_down(&obd->obd_dev_sem);
510         rc = mds_lov_read_objids(obd);
511         if (rc) {
512                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
513                 GOTO(err_reg, rc);
514         }
515
516         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
517         if (rc)
518                 GOTO(err_reg, rc);
519
520         /* tgt_count may be 0! */
521         rc = llog_cat_initialize(obd, &obd->obd_olg, 
522                                  mds->mds_lov_desc.ld_tgt_count, NULL);
523         if (rc) {
524                 CERROR("failed to initialize catalog %d\n", rc);
525                 GOTO(err_reg, rc);
526         }
527
528         /* If we're mounting this code for the first time on an existing FS,
529          * we need to populate the objids array from the real OST values */
530         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
531                 __u32 i = mds->mds_lov_objid_count;
532                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
533                         rc = mds_lov_get_objid(obd, i);
534                         if (rc != 0)
535                                 break;
536                 }
537                 if (rc == 0)
538                         rc = mds_lov_write_objids(obd);
539                 if (rc)
540                         CERROR("got last objids from OSTs, but error "
541                                 "in update objids file: %d\n", rc);
542         }
543         mutex_up(&obd->obd_dev_sem);
544
545         /* I want to see a callback happen when the OBD moves to a
546          * "For General Use" state, and that's when we'll call
547          * set_nextid().  The class driver can help us here, because
548          * it can use the obd_recovering flag to determine when the
549          * the OBD is full available. */
550         /* MDD device will care about that
551         if (!obd->obd_recovering)
552                 rc = mds_postrecov(obd);
553          */
554         RETURN(rc);
555
556 err_reg:
557         mutex_up(&obd->obd_dev_sem);
558         obd_register_observer(mds->mds_osc_obd, NULL);
559 err_discon:
560         obd_disconnect(mds->mds_osc_exp);
561         mds->mds_osc_exp = NULL;
562         mds->mds_osc_obd = ERR_PTR(rc);
563         RETURN(rc);
564 }
565
566 int mds_lov_disconnect(struct obd_device *obd)
567 {
568         struct mds_obd *mds = &obd->u.mds;
569         int rc = 0;
570         ENTRY;
571
572         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
573                 obd_register_observer(mds->mds_osc_obd, NULL);
574
575                 /* The actual disconnect of the mds_lov will be called from
576                  * class_disconnect_exports from mds_lov_clean. So we have to
577                  * ensure that class_cleanup doesn't fail due to the extra ref
578                  * we're holding now. The mechanism to do that already exists -
579                  * the obd_force flag. We'll drop the final ref to the
580                  * mds_osc_exp in mds_cleanup. */
581                 mds->mds_osc_obd->obd_force = 1;
582         }
583
584         RETURN(rc);
585 }
586
587 /* Collect the preconditions we need to allow client connects */
588 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
589 {
590         if (flag & CONFIG_LOG)
591                 obd->u.mds.mds_fl_cfglog = 1;
592         if (flag & CONFIG_SYNC)
593                 obd->u.mds.mds_fl_synced = 1;
594         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
595                 /* Open for clients */
596                 obd->obd_no_conn = 0;
597 }
598
599 struct mds_lov_sync_info {
600         struct obd_device *mlsi_obd;     /* the lov device to sync */
601         struct obd_device *mlsi_watched; /* target osc */
602         __u32              mlsi_index;   /* index of target */
603 };
604
605 static int mds_propagate_capa_keys(struct mds_obd *mds)
606 {
607         struct lustre_capa_key *key;
608         int i, rc = 0;
609
610         ENTRY;
611
612         if (!mds->mds_capa_keys)
613                 RETURN(0);
614
615         for (i = 0; i < 2; i++) {
616                 key = &mds->mds_capa_keys[i];
617                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
618
619                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
620                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
621                 if (rc) {
622                         DEBUG_CAPA_KEY(D_ERROR, key,
623                                        "propagate failed (rc = %d) for", rc);
624                         RETURN(rc);
625                 }
626         }
627
628         RETURN(0);
629 }
630
631 /* We only sync one osc at a time, so that we don't have to hold
632    any kind of lock on the whole mds_lov_desc, which may change
633    (grow) as a result of mds_lov_add_ost.  This also avoids any
634    kind of mismatch between the lov_desc and the mds_lov_desc,
635    which are not in lock-step during lov_add_obd */
636 static int __mds_lov_synchronize(void *data)
637 {
638         struct mds_lov_sync_info *mlsi = data;
639         struct obd_device *obd = mlsi->mlsi_obd;
640         struct obd_device *watched = mlsi->mlsi_watched;
641         struct mds_obd *mds = &obd->u.mds;
642         struct obd_uuid *uuid;
643         __u32  idx = mlsi->mlsi_index;
644         struct mds_group_info mgi;
645         struct llog_ctxt *ctxt;
646         int rc = 0;
647         ENTRY;
648
649         OBD_FREE_PTR(mlsi);
650
651         LASSERT(obd);
652         LASSERT(watched);
653         uuid = &watched->u.cli.cl_target_uuid;
654         LASSERT(uuid);
655
656         down_read(&mds->mds_notify_lock);
657         if (obd->obd_stopping || obd->obd_fail)
658                 GOTO(out, rc = -ENODEV);
659
660         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
661         rc = mds_lov_update_mds(obd, watched, idx);
662         if (rc != 0) {
663                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
664                 GOTO(out, rc);
665         }
666         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
667         mgi.uuid = uuid;
668
669         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
670                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
671         if (rc != 0)
672                 GOTO(out, rc);
673         /* propagate capability keys */
674         rc = mds_propagate_capa_keys(mds);
675         if (rc)
676                 GOTO(out, rc);
677
678         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
679         if (!ctxt) 
680                 GOTO(out, rc = -ENODEV);
681
682         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
683         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
684                           NULL, NULL, uuid); 
685         llog_ctxt_put(ctxt);
686         if (rc != 0) {
687                 CERROR("%s failed at llog_origin_connect: %d\n",
688                        obd_uuid2str(uuid), rc);
689                 GOTO(out, rc);
690         }
691
692         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
693               obd->obd_name, obd_uuid2str(uuid));
694         rc = mds_lov_clear_orphans(mds, uuid);
695         if (rc != 0) {
696                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
697                        obd_uuid2str(uuid), rc);
698                 GOTO(out, rc);
699         }
700
701         if (obd->obd_upcall.onu_owner) {
702                 /*
703                  * This is a hack for mds_notify->mdd_notify. When the mds obd
704                  * in mdd is removed, This hack should be removed.
705                  */
706                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
707                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
708                                                  obd->obd_upcall.onu_owner);
709         }
710         EXIT;
711 out:
712         up_read(&mds->mds_notify_lock);
713         if (rc) {
714                 /* Deactivate it for safety */
715                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
716                        rc);
717                 if (!obd->obd_stopping && mds->mds_osc_obd &&
718                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
719                         obd_notify(mds->mds_osc_obd, watched,
720                                    OBD_NOTIFY_INACTIVE, NULL);
721         }
722
723         class_decref(obd);
724         return rc;
725 }
726
727 int mds_lov_synchronize(void *data)
728 {
729         struct mds_lov_sync_info *mlsi = data;
730         char name[20];
731
732         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
733         ptlrpc_daemonize(name);
734
735         RETURN(__mds_lov_synchronize(data));
736 }
737
738 int mds_lov_start_synchronize(struct obd_device *obd,
739                               struct obd_device *watched,
740                               void *data, int nonblock)
741 {
742         struct mds_lov_sync_info *mlsi;
743         int rc;
744         struct mds_obd *mds = &obd->u.mds;
745         struct obd_uuid *uuid;
746         ENTRY;
747
748         LASSERT(watched);
749         uuid = &watched->u.cli.cl_target_uuid;
750
751         OBD_ALLOC(mlsi, sizeof(*mlsi));
752         if (mlsi == NULL)
753                 RETURN(-ENOMEM);
754
755         mlsi->mlsi_obd = obd;
756         mlsi->mlsi_watched = watched;
757         if (data)
758                 mlsi->mlsi_index = *(__u32 *)data;
759         else
760                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
761
762         /* Although class_export_get(obd->obd_self_export) would lock
763            the MDS in place, since it's only a self-export
764            it doesn't lock the LOV in place.  The LOV can be disconnected
765            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
766            Simply taking an export ref on the LOV doesn't help, because it's
767            still disconnected. Taking an obd reference insures that we don't
768            disconnect the LOV.  This of course means a cleanup won't
769            finish for as long as the sync is blocking. */
770         class_incref(obd);
771
772         if (nonblock) {
773                 /* Synchronize in the background */
774                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
775                                        CLONE_VM | CLONE_FILES);
776                 if (rc < 0) {
777                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
778                                obd->obd_name, rc);
779                         class_decref(obd);
780                 } else {
781                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
782                                "thread=%d\n", obd->obd_name,
783                                mlsi->mlsi_index, rc);
784                         rc = 0;
785                 }
786         } else {
787                 rc = __mds_lov_synchronize((void *)mlsi);
788         }
789
790         RETURN(rc);
791 }
792
793 int mds_notify(struct obd_device *obd, struct obd_device *watched,
794                enum obd_notify_event ev, void *data)
795 {
796         int rc = 0;
797         ENTRY;
798
799         switch (ev) {
800         /* We only handle these: */
801         case OBD_NOTIFY_ACTIVE:
802         case OBD_NOTIFY_SYNC:
803         case OBD_NOTIFY_SYNC_NONBLOCK:
804                 break;
805         case OBD_NOTIFY_CONFIG:
806                 mds_allow_cli(obd, (unsigned long)data);
807         default:
808                 RETURN(0);
809         }
810
811         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
812         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
813                 CERROR("unexpected notification of %s %s!\n",
814                        watched->obd_type->typ_name, watched->obd_name);
815                 RETURN(-EINVAL);
816         }
817
818         if (obd->obd_recovering) {
819                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
820                       obd->obd_name,
821                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
822                 /* We still have to fix the lov descriptor for ost's added
823                    after the mdt in the config log.  They didn't make it into
824                    mds_lov_connect. */
825                 mutex_down(&obd->obd_dev_sem);
826                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
827                 if (rc) {
828                         mutex_up(&obd->obd_dev_sem);
829                         RETURN(rc);
830                 }
831                 /* We should update init llog here too for replay unlink and 
832                  * possiable llog init race when recovery complete */
833                 llog_cat_initialize(obd, &obd->obd_olg, 
834                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
835                                     &watched->u.cli.cl_target_uuid);
836                 mutex_up(&obd->obd_dev_sem);
837                 mds_allow_cli(obd, CONFIG_SYNC);
838                 RETURN(rc);
839         }
840
841         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
842         rc = mds_lov_start_synchronize(obd, watched, data,
843                                        !(ev == OBD_NOTIFY_SYNC));
844
845         lquota_recovery(mds_quota_interface_ref, obd);
846
847         RETURN(rc);
848 }
849