Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/module.h>
32 #include <lustre_mds.h>
33 #include <lustre/lustre_idl.h>
34 #include <obd_class.h>
35 #include <obd_lov.h>
36 #include <lustre_lib.h>
37 #include <lustre_fsfilt.h>
38
39 #include "mds_internal.h"
40
41 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
42 {
43         struct mds_obd *mds = &obd->u.mds;
44         unsigned int i=0, j;
45
46         CDEBUG(D_INFO, "dump from %s\n", label);
47         if (mds->mds_lov_page_dirty == NULL) {
48                 CERROR("NULL bitmap!\n");
49                 GOTO(skip_bitmap, i);
50         }
51
52         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
53                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
54 skip_bitmap:
55         if (mds->mds_lov_page_array == NULL) {
56                 CERROR("not init page array!\n");
57                 GOTO(skip_array, i);
58
59         }
60         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
61                 obd_id *data = mds->mds_lov_page_array[i];
62
63                 if (data == NULL)
64                         continue;
65
66                 for(j=0; j < OBJID_PER_PAGE(); j++) {
67                         if (data[j] == 0)
68                                 continue;
69                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
70                 }
71         }
72 skip_array:
73         EXIT;
74 }
75
76 int mds_lov_init_objids(struct obd_device *obd)
77 {
78         struct mds_obd *mds = &obd->u.mds;
79         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
80         struct file *file;
81         int rc;
82         ENTRY;
83
84         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
85
86         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
87         if (mds->mds_lov_page_dirty == NULL)
88                 RETURN(-ENOMEM);
89
90
91         OBD_ALLOC(mds->mds_lov_page_array, size);
92         if (mds->mds_lov_page_array == NULL)
93                 GOTO(err_free_bitmap, rc = -ENOMEM);
94
95         /* open and test the lov objd file */
96         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
97         if (IS_ERR(file)) {
98                 rc = PTR_ERR(file);
99                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
100                 GOTO(err_free, rc = PTR_ERR(file));
101         }
102         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
103                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
104                        file->f_dentry->d_inode->i_mode);
105                 GOTO(err_open, rc = -ENOENT);
106         }
107         mds->mds_lov_objid_filp = file;
108
109         RETURN (0);
110 err_open:
111         if (filp_close((struct file *)file, 0))
112                 CERROR("can't close %s after error\n", LOV_OBJID);
113 err_free:
114         OBD_FREE(mds->mds_lov_page_array, size);
115 err_free_bitmap:
116         FREE_BITMAP(mds->mds_lov_page_dirty);
117
118         RETURN(rc);
119 }
120
121 void mds_lov_destroy_objids(struct obd_device *obd)
122 {
123         struct mds_obd *mds = &obd->u.mds;
124         int i, rc;
125         ENTRY;
126
127         if (mds->mds_lov_page_array != NULL) {
128                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
129                         obd_id *data = mds->mds_lov_page_array[i];
130                         if (data != NULL)
131                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
132                 }
133                 OBD_FREE(mds->mds_lov_page_array,
134                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
135         }
136
137         if (mds->mds_lov_objid_filp) {
138                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
139                 mds->mds_lov_objid_filp = NULL;
140                 if (rc)
141                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
142         }
143
144         FREE_BITMAP(mds->mds_lov_page_dirty);
145         EXIT;
146 }
147
148 static int mds_lov_read_objids(struct obd_device *obd)
149 {
150         struct mds_obd *mds = &obd->u.mds;
151         loff_t off = 0;
152         int i, rc, count = 0, page = 0;
153         size_t size;
154         ENTRY;
155
156         /* Read everything in the file, even if our current lov desc
157            has fewer targets. Old targets not in the lov descriptor
158            during mds setup may still have valid objids. */
159         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
160         if (size == 0)
161                 RETURN(0);
162
163         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
164         CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
165         for(i=0; i < page; i++) {
166                 obd_id *data =  mds->mds_lov_page_array[i];
167                 loff_t off_old = off;
168
169                 LASSERT(data == NULL);
170                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
171                 if (data == NULL)
172                         GOTO(out, rc = -ENOMEM);
173
174                 mds->mds_lov_page_array[i] = data;
175
176                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
177                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
178                 if (rc < 0) {
179                         CERROR("Error reading objids %d\n", rc);
180                         GOTO(out, rc);
181                 }
182                 if (off == off_old)
183                         break; // eof
184
185                 count += (off-off_old)/sizeof(obd_id);
186         }
187         mds->mds_lov_objid_count = count;
188         if (count) {
189                 count --;
190                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
191                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
192         }
193         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
194                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
195 out:
196         mds_lov_dump_objids("read",obd);
197
198         RETURN(0);
199 }
200
201 int mds_lov_write_objids(struct obd_device *obd)
202 {
203         struct mds_obd *mds = &obd->u.mds;
204         int i, rc = 0;
205         ENTRY;
206
207         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
208                 RETURN(0);
209
210         mds_lov_dump_objids("write", obd);
211
212         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
213                 obd_id *data =  mds->mds_lov_page_array[i];
214                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
215                 loff_t off = i * size;
216
217                 LASSERT(data != NULL);
218
219                 /* check for particaly filled last page */
220                 if (i == mds->mds_lov_objid_lastpage)
221                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
222
223                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
224                                          size, &off, 0);
225                 if (rc < 0)
226                         break;
227                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
228         }
229         if (rc >= 0)
230                 rc = 0;
231
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(mds_lov_write_objids);
235
236 static int mds_lov_get_objid(struct obd_device * obd,
237                              __u32 idx)
238 {
239         struct mds_obd *mds = &obd->u.mds;
240         unsigned int page;
241         unsigned int off;
242         obd_id *data;
243         int rc = 0;
244         ENTRY;
245
246         page = idx / OBJID_PER_PAGE();
247         off = idx % OBJID_PER_PAGE();
248         data = mds->mds_lov_page_array[page];
249         if (data == NULL) {
250                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
251                 if (data == NULL)
252                         GOTO(out, rc = -ENOMEM);
253
254                 mds->mds_lov_page_array[page] = data;
255         }
256
257         if (data[off] == 0) {
258                 /* We never read this lastid; ask the osc */
259                 struct obd_id_info lastid;
260                 __u32 size = sizeof(lastid);
261
262                 lastid.idx = idx;
263                 lastid.data = &data[off];
264                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
265                                   KEY_LAST_ID, &size, &lastid);
266                 if (rc)
267                         GOTO(out, rc);
268
269                 if (idx > mds->mds_lov_objid_count) {
270                         mds->mds_lov_objid_count = idx;
271                         mds->mds_lov_objid_lastpage = page;
272                         mds->mds_lov_objid_lastidx = off;
273                 }
274                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
275         }
276 out:
277         RETURN(rc);
278 }
279
280 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
281 {
282         int rc;
283         struct obdo oa;
284         struct obd_trans_info oti = {0};
285         struct lov_stripe_md  *empty_ea = NULL;
286         ENTRY;
287
288         LASSERT(mds->mds_lov_page_array != NULL);
289
290         /* This create will in fact either create or destroy:  If the OST is
291          * missing objects below this ID, they will be created.  If it finds
292          * objects above this ID, they will be removed. */
293         memset(&oa, 0, sizeof(oa));
294         oa.o_flags = OBD_FL_DELORPHAN;
295         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
296         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
297         if (ost_uuid != NULL) {
298                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
299                 oa.o_valid |= OBD_MD_FLINLINE;
300         }
301         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
302
303         RETURN(rc);
304 }
305
306 /* for one target */
307 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
308 {
309         struct mds_obd *mds = &obd->u.mds;
310         int rc;
311         struct obd_id_info info;
312         ENTRY;
313
314         LASSERT(!obd->obd_recovering);
315
316         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
317         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
318
319         info.idx = idx;
320         info.data = id;
321         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
322                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
323         if (rc)
324                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
325                         obd->obd_name, rc);
326
327         RETURN(rc);
328 }
329
330 static __u32 mds_lov_get_idx(struct obd_export *lov,
331                              struct obd_uuid *ost_uuid)
332 {
333         int rc;
334         int valsize = sizeof(ost_uuid);
335
336         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
337                           &valsize, ost_uuid);
338         LASSERT(rc >= 0);
339
340         RETURN(rc);
341 }
342
343 /* Update the lov desc for a new size lov. */
344 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
345 {
346         struct mds_obd *mds = &obd->u.mds;
347         struct lov_desc *ld;
348         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
349         int rc = 0;
350         ENTRY;
351
352         OBD_ALLOC(ld, sizeof(*ld));
353         if (!ld)
354                 RETURN(-ENOMEM);
355
356         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
357                           &valsize, ld);
358         if (rc)
359                 GOTO(out, rc);
360
361         /* Don't change the mds_lov_desc until the objids size matches the
362            count (paranoia) */
363         mds->mds_lov_desc = *ld;
364         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
365                mds->mds_lov_desc.ld_tgt_count);
366
367         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
368                                mds->mds_lov_desc.ld_tgt_count);
369
370         mds->mds_max_mdsize = lov_mds_md_size(stripes);
371         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
372         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
373                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
374                stripes);
375
376         /* If we added a target we have to reconnect the llogs */
377         /* We only _need_ to do this at first add (idx), or the first time
378            after recovery.  However, it should now be safe to call anytime. */
379         rc = llog_cat_initialize(obd, &obd->obd_olg,
380                                  mds->mds_lov_desc.ld_tgt_count, NULL);
381
382         /*XXX this notifies the MDD until lov handling use old mds code */
383         if (obd->obd_upcall.onu_owner) {
384                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
385                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
386                                                  obd->obd_upcall.onu_owner);
387         }
388 out:
389         OBD_FREE(ld, sizeof(*ld));
390         RETURN(rc);
391 }
392
393
394 #define MDSLOV_NO_INDEX -1
395
396 /* Inform MDS about new/updated target */
397 static int mds_lov_update_mds(struct obd_device *obd,
398                               struct obd_device *watched,
399                               __u32 idx)
400 {
401         struct mds_obd *mds = &obd->u.mds;
402         int rc = 0;
403         int page;
404         int off;
405         obd_id *data;
406
407         ENTRY;
408
409         /* Don't let anyone else mess with mds_lov_objids now */
410         mutex_down(&obd->obd_dev_sem);
411
412         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
413         if (rc)
414                 GOTO(out, rc);
415
416         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
417                idx, obd->obd_recovering, obd->obd_async_recov,
418                mds->mds_lov_desc.ld_tgt_count);
419
420         /* idx is set as data from lov_notify. */
421         if (obd->obd_recovering)
422                 GOTO(out, rc);
423
424         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
425                 CERROR("index %d > count %d!\n", idx,
426                        mds->mds_lov_desc.ld_tgt_count);
427                 GOTO(out, rc = -EINVAL);
428         }
429
430         rc = mds_lov_get_objid(obd, idx);
431         if (rc)
432                 GOTO(out, rc);
433
434         page = idx / OBJID_PER_PAGE();
435         off = idx % OBJID_PER_PAGE();
436         data = mds->mds_lov_page_array[page];
437
438         /* We have read this lastid from disk; tell the osc.
439            Don't call this during recovery. */
440         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
441         if (rc) {
442                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
443                 /* Don't abort the rest of the sync */
444                 rc = 0;
445         } else {
446                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
447                         data[off], idx, rc);
448         }
449 out:
450         mutex_up(&obd->obd_dev_sem);
451         RETURN(rc);
452 }
453
454 /* update the LOV-OSC knowledge of the last used object id's */
455 int mds_lov_connect(struct obd_device *obd, char * lov_name)
456 {
457         struct mds_obd *mds = &obd->u.mds;
458         struct lustre_handle conn = {0,};
459         struct obd_connect_data *data;
460         int rc;
461         ENTRY;
462
463         if (IS_ERR(mds->mds_osc_obd))
464                 RETURN(PTR_ERR(mds->mds_osc_obd));
465
466         if (mds->mds_osc_obd)
467                 RETURN(0);
468
469         mds->mds_osc_obd = class_name2obd(lov_name);
470         if (!mds->mds_osc_obd) {
471                 CERROR("MDS cannot locate LOV %s\n", lov_name);
472                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
473                 RETURN(-ENOTCONN);
474         }
475
476         OBD_ALLOC(data, sizeof(*data));
477         if (data == NULL)
478                 RETURN(-ENOMEM);
479         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
480                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
481                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
482                                   OBD_CONNECT_AT;
483 #ifdef HAVE_LRU_RESIZE_SUPPORT
484         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
485 #endif
486         data->ocd_version = LUSTRE_VERSION_CODE;
487         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
488         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
489         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
490         OBD_FREE(data, sizeof(*data));
491         if (rc) {
492                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
493                 mds->mds_osc_obd = ERR_PTR(rc);
494                 RETURN(rc);
495         }
496         mds->mds_osc_exp = class_conn2export(&conn);
497
498         rc = obd_register_observer(mds->mds_osc_obd, obd);
499         if (rc) {
500                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
501                        lov_name, rc);
502                 GOTO(err_discon, rc);
503         }
504
505         /* Deny new client connections until we are sure we have some OSTs */
506         obd->obd_no_conn = 1;
507
508         mutex_down(&obd->obd_dev_sem);
509         rc = mds_lov_read_objids(obd);
510         if (rc) {
511                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
512                 GOTO(err_reg, rc);
513         }
514
515         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
516         if (rc)
517                 GOTO(err_reg, rc);
518
519         /* tgt_count may be 0! */
520         rc = llog_cat_initialize(obd, &obd->obd_olg, 
521                                  mds->mds_lov_desc.ld_tgt_count, NULL);
522         if (rc) {
523                 CERROR("failed to initialize catalog %d\n", rc);
524                 GOTO(err_reg, rc);
525         }
526
527         /* If we're mounting this code for the first time on an existing FS,
528          * we need to populate the objids array from the real OST values */
529         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
530                 __u32 i = mds->mds_lov_objid_count;
531                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
532                         rc = mds_lov_get_objid(obd, i);
533                         if (rc != 0)
534                                 break;
535                 }
536                 if (rc == 0)
537                         rc = mds_lov_write_objids(obd);
538                 if (rc)
539                         CERROR("got last objids from OSTs, but error "
540                                 "in update objids file: %d\n", rc);
541         }
542         mutex_up(&obd->obd_dev_sem);
543
544         /* I want to see a callback happen when the OBD moves to a
545          * "For General Use" state, and that's when we'll call
546          * set_nextid().  The class driver can help us here, because
547          * it can use the obd_recovering flag to determine when the
548          * the OBD is full available. */
549         /* MDD device will care about that
550         if (!obd->obd_recovering)
551                 rc = mds_postrecov(obd);
552          */
553         RETURN(rc);
554
555 err_reg:
556         mutex_up(&obd->obd_dev_sem);
557         obd_register_observer(mds->mds_osc_obd, NULL);
558 err_discon:
559         obd_disconnect(mds->mds_osc_exp);
560         mds->mds_osc_exp = NULL;
561         mds->mds_osc_obd = ERR_PTR(rc);
562         RETURN(rc);
563 }
564
565 int mds_lov_disconnect(struct obd_device *obd)
566 {
567         struct mds_obd *mds = &obd->u.mds;
568         int rc = 0;
569         ENTRY;
570
571         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
572                 obd_register_observer(mds->mds_osc_obd, NULL);
573
574                 /* The actual disconnect of the mds_lov will be called from
575                  * class_disconnect_exports from mds_lov_clean. So we have to
576                  * ensure that class_cleanup doesn't fail due to the extra ref
577                  * we're holding now. The mechanism to do that already exists -
578                  * the obd_force flag. We'll drop the final ref to the
579                  * mds_osc_exp in mds_cleanup. */
580                 mds->mds_osc_obd->obd_force = 1;
581         }
582
583         RETURN(rc);
584 }
585
586 /* Collect the preconditions we need to allow client connects */
587 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
588 {
589         if (flag & CONFIG_LOG)
590                 obd->u.mds.mds_fl_cfglog = 1;
591         if (flag & CONFIG_SYNC)
592                 obd->u.mds.mds_fl_synced = 1;
593         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
594                 /* Open for clients */
595                 obd->obd_no_conn = 0;
596 }
597
598 struct mds_lov_sync_info {
599         struct obd_device *mlsi_obd;     /* the lov device to sync */
600         struct obd_device *mlsi_watched; /* target osc */
601         __u32              mlsi_index;   /* index of target */
602 };
603
604 static int mds_propagate_capa_keys(struct mds_obd *mds)
605 {
606         struct lustre_capa_key *key;
607         int i, rc = 0;
608
609         ENTRY;
610
611         if (!mds->mds_capa_keys)
612                 RETURN(0);
613
614         for (i = 0; i < 2; i++) {
615                 key = &mds->mds_capa_keys[i];
616                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
617
618                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
619                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
620                 if (rc) {
621                         DEBUG_CAPA_KEY(D_ERROR, key,
622                                        "propagate failed (rc = %d) for", rc);
623                         RETURN(rc);
624                 }
625         }
626
627         RETURN(0);
628 }
629
630 /* We only sync one osc at a time, so that we don't have to hold
631    any kind of lock on the whole mds_lov_desc, which may change
632    (grow) as a result of mds_lov_add_ost.  This also avoids any
633    kind of mismatch between the lov_desc and the mds_lov_desc,
634    which are not in lock-step during lov_add_obd */
635 static int __mds_lov_synchronize(void *data)
636 {
637         struct mds_lov_sync_info *mlsi = data;
638         struct obd_device *obd = mlsi->mlsi_obd;
639         struct obd_device *watched = mlsi->mlsi_watched;
640         struct mds_obd *mds = &obd->u.mds;
641         struct obd_uuid *uuid;
642         __u32  idx = mlsi->mlsi_index;
643         struct mds_group_info mgi;
644         struct llog_ctxt *ctxt;
645         int rc = 0;
646         ENTRY;
647
648         OBD_FREE(mlsi, sizeof(*mlsi));
649
650         LASSERT(obd);
651         LASSERT(watched);
652         uuid = &watched->u.cli.cl_target_uuid;
653         LASSERT(uuid);
654
655         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
656
657         rc = mds_lov_update_mds(obd, watched, idx);
658         if (rc != 0) {
659                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
660                 GOTO(out, rc);
661         }
662
663         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
664         mgi.uuid = uuid;
665
666         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
667                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
668         if (rc != 0)
669                 GOTO(out, rc);
670         /* propagate capability keys */
671         rc = mds_propagate_capa_keys(mds);
672         if (rc)
673                 GOTO(out, rc);
674
675         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
676         if (!ctxt) 
677                 RETURN(-ENODEV);
678
679         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
680
681         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
682                           NULL, NULL, uuid); 
683         llog_ctxt_put(ctxt);
684         if (rc != 0) {
685                 CERROR("%s failed at llog_origin_connect: %d\n",
686                        obd_uuid2str(uuid), rc);
687                 GOTO(out, rc);
688         }
689
690         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
691               obd->obd_name, obd_uuid2str(uuid));
692         /*
693          * FIXME: this obd_stopping was useless, 
694          * since obd in mdt layer was set
695          */
696         if (obd->obd_stopping)
697                 GOTO(out, rc = -ENODEV);
698
699         rc = mds_lov_clear_orphans(mds, uuid);
700         if (rc != 0) {
701                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
702                        obd_uuid2str(uuid), rc);
703                 GOTO(out, rc);
704         }
705
706         if (obd->obd_upcall.onu_owner) {
707                 /*
708                  * This is a hack for mds_notify->mdd_notify. When the mds obd
709                  * in mdd is removed, This hack should be removed.
710                  */
711                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
712                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
713                                                  obd->obd_upcall.onu_owner);
714         }
715         EXIT;
716 out:
717         if (rc) {
718                 /* Deactivate it for safety */
719                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
720                        rc);
721                 if (!obd->obd_stopping && mds->mds_osc_obd &&
722                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
723                         obd_notify(mds->mds_osc_obd, watched,
724                                    OBD_NOTIFY_INACTIVE, NULL);
725         }
726
727         class_decref(obd);
728         return rc;
729 }
730
731 int mds_lov_synchronize(void *data)
732 {
733         struct mds_lov_sync_info *mlsi = data;
734         char name[20];
735
736         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
737         ptlrpc_daemonize(name);
738
739         RETURN(__mds_lov_synchronize(data));
740 }
741
742 int mds_lov_start_synchronize(struct obd_device *obd,
743                               struct obd_device *watched,
744                               void *data, int nonblock)
745 {
746         struct mds_lov_sync_info *mlsi;
747         int rc;
748         struct mds_obd *mds = &obd->u.mds;
749         struct obd_uuid *uuid;
750         ENTRY;
751
752         LASSERT(watched);
753         uuid = &watched->u.cli.cl_target_uuid;
754
755         OBD_ALLOC(mlsi, sizeof(*mlsi));
756         if (mlsi == NULL)
757                 RETURN(-ENOMEM);
758
759         mlsi->mlsi_obd = obd;
760         mlsi->mlsi_watched = watched;
761         if (data)
762                 mlsi->mlsi_index = *(__u32 *)data;
763         else
764                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
765
766         /* Although class_export_get(obd->obd_self_export) would lock
767            the MDS in place, since it's only a self-export
768            it doesn't lock the LOV in place.  The LOV can be disconnected
769            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
770            Simply taking an export ref on the LOV doesn't help, because it's
771            still disconnected. Taking an obd reference insures that we don't
772            disconnect the LOV.  This of course means a cleanup won't
773            finish for as long as the sync is blocking. */
774         class_incref(obd);
775
776         if (nonblock) {
777                 /* Synchronize in the background */
778                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
779                                        CLONE_VM | CLONE_FILES);
780                 if (rc < 0) {
781                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
782                                obd->obd_name, rc);
783                         class_decref(obd);
784                 } else {
785                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
786                                "thread=%d\n", obd->obd_name,
787                                mlsi->mlsi_index, rc);
788                         rc = 0;
789                 }
790         } else {
791                 rc = __mds_lov_synchronize((void *)mlsi);
792         }
793
794         RETURN(rc);
795 }
796
797 int mds_notify(struct obd_device *obd, struct obd_device *watched,
798                enum obd_notify_event ev, void *data)
799 {
800         int rc = 0;
801         ENTRY;
802
803         switch (ev) {
804         /* We only handle these: */
805         case OBD_NOTIFY_ACTIVE:
806         case OBD_NOTIFY_SYNC:
807         case OBD_NOTIFY_SYNC_NONBLOCK:
808                 break;
809         case OBD_NOTIFY_CONFIG:
810                 mds_allow_cli(obd, (unsigned int)data);
811         default:
812                 RETURN(0);
813         }
814
815         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
816         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
817                 CERROR("unexpected notification of %s %s!\n",
818                        watched->obd_type->typ_name, watched->obd_name);
819                 RETURN(-EINVAL);
820         }
821
822         if (obd->obd_recovering) {
823                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
824                       obd->obd_name,
825                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
826                 /* We still have to fix the lov descriptor for ost's added
827                    after the mdt in the config log.  They didn't make it into
828                    mds_lov_connect. */
829                 mutex_down(&obd->obd_dev_sem);
830                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
831                 if (rc) {
832                         mutex_up(&obd->obd_dev_sem);
833                         RETURN(rc);
834                 }
835                 /* We should update init llog here too for replay unlink and 
836                  * possiable llog init race when recovery complete */
837                 llog_cat_initialize(obd, &obd->obd_olg, 
838                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
839                                     &watched->u.cli.cl_target_uuid);
840                 mutex_up(&obd->obd_dev_sem);
841                 mds_allow_cli(obd, CONFIG_SYNC);
842                 RETURN(rc);
843         }
844
845         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
846         rc = mds_lov_start_synchronize(obd, watched, data,
847                                        !(ev == OBD_NOTIFY_SYNC));
848
849         lquota_recovery(mds_quota_interface_ref, obd);
850
851         RETURN(rc);
852 }
853