Whamcloud - gitweb
It is be possible to send the lock handle along with each read
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 static int mds_lov_read_objids(struct obd_device *obd)
163 {
164         struct mds_obd *mds = &obd->u.mds;
165         loff_t off = 0;
166         int i, rc, count = 0, page = 0;
167         unsigned long size;
168         ENTRY;
169
170         /* Read everything in the file, even if our current lov desc
171            has fewer targets. Old targets not in the lov descriptor
172            during mds setup may still have valid objids. */
173         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
174         if (size == 0)
175                 RETURN(0);
176
177         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
179
180         for (i = 0; i < page; i++) {
181                 obd_id *data =  mds->mds_lov_page_array[i];
182                 loff_t off_old = off;
183
184                 LASSERT(data == NULL);
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         GOTO(out, rc = -ENOMEM);
188
189                 mds->mds_lov_page_array[i] = data;
190
191                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
193                 if (rc < 0) {
194                         CERROR("Error reading objids %d\n", rc);
195                         GOTO(out, rc);
196                 }
197                 if (off == off_old)
198                         break; // eof
199
200                 count += (off - off_old)/sizeof(obd_id);
201         }
202         mds->mds_lov_objid_count = count;
203         if (count) {
204                 count --;
205                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
207         }
208         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
210 out:
211         mds_lov_dump_objids("read",obd);
212
213         RETURN(0);
214 }
215
216 int mds_lov_write_objids(struct obd_device *obd)
217 {
218         struct mds_obd *mds = &obd->u.mds;
219         int i, rc = 0;
220         ENTRY;
221
222         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
223                 RETURN(0);
224
225         mds_lov_dump_objids("write", obd);
226
227         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228                 obd_id *data =  mds->mds_lov_page_array[i];
229                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230                 loff_t off = i * size;
231
232                 LASSERT(data != NULL);
233
234                 /* check for particaly filled last page */
235                 if (i == mds->mds_lov_objid_lastpage)
236                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
237
238                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
239                                          size, &off, 0);
240                 if (rc < 0)
241                         break;
242                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
243         }
244         if (rc >= 0)
245                 rc = 0;
246
247         RETURN(rc);
248 }
249 EXPORT_SYMBOL(mds_lov_write_objids);
250
251 static int mds_lov_get_objid(struct obd_device * obd,
252                              __u32 idx)
253 {
254         struct mds_obd *mds = &obd->u.mds;
255         unsigned int page;
256         unsigned int off;
257         obd_id *data;
258         int rc = 0;
259         ENTRY;
260
261         page = idx / OBJID_PER_PAGE();
262         off = idx % OBJID_PER_PAGE();
263         data = mds->mds_lov_page_array[page];
264         if (data == NULL) {
265                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
266                 if (data == NULL)
267                         GOTO(out, rc = -ENOMEM);
268
269                 mds->mds_lov_page_array[page] = data;
270         }
271
272         if (data[off] == 0) {
273                 /* We never read this lastid; ask the osc */
274                 struct obd_id_info lastid;
275                 __u32 size = sizeof(lastid);
276
277                 lastid.idx = idx;
278                 lastid.data = &data[off];
279                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280                                   KEY_LAST_ID, &size, &lastid, NULL);
281                 if (rc)
282                         GOTO(out, rc);
283
284                 if (idx > mds->mds_lov_objid_count) {
285                         mds->mds_lov_objid_count = idx;
286                         mds->mds_lov_objid_lastpage = page;
287                         mds->mds_lov_objid_lastidx = off;
288                 }
289                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
290         }
291 out:
292         RETURN(rc);
293 }
294
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
296 {
297         int rc;
298         struct obdo oa;
299         struct obd_trans_info oti = {0};
300         struct lov_stripe_md  *empty_ea = NULL;
301         ENTRY;
302
303         LASSERT(mds->mds_lov_page_array != NULL);
304
305         /* This create will in fact either create or destroy:  If the OST is
306          * missing objects below this ID, they will be created.  If it finds
307          * objects above this ID, they will be removed. */
308         memset(&oa, 0, sizeof(oa));
309         oa.o_flags = OBD_FL_DELORPHAN;
310         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
311         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312         if (ost_uuid != NULL)
313                 oti.oti_ost_uuid = ost_uuid;       
314         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
315
316         RETURN(rc);
317 }
318
319 /* for one target */
320 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
321 {
322         struct mds_obd *mds = &obd->u.mds;
323         int rc;
324         struct obd_id_info info;
325         ENTRY;
326
327         LASSERT(!obd->obd_recovering);
328
329         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
330         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
331
332         info.idx = idx;
333         info.data = id;
334         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
335                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
336         if (rc)
337                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
338                         obd->obd_name, rc);
339
340         RETURN(rc);
341 }
342
343 static __u32 mds_lov_get_idx(struct obd_export *lov,
344                              struct obd_uuid *ost_uuid)
345 {
346         int rc;
347         int valsize = sizeof(ost_uuid);
348
349         rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
350                           &valsize, ost_uuid, NULL);
351         LASSERT(rc >= 0);
352
353         RETURN(rc);
354 }
355
356 /* Update the lov desc for a new size lov. */
357 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
358 {
359         struct mds_obd *mds = &obd->u.mds;
360         struct lov_desc *ld;
361         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
362         int rc = 0;
363         ENTRY;
364
365         OBD_ALLOC(ld, sizeof(*ld));
366         if (!ld)
367                 RETURN(-ENOMEM);
368
369         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
370                           &valsize, ld, NULL);
371         if (rc)
372                 GOTO(out, rc);
373
374         /* Don't change the mds_lov_desc until the objids size matches the
375            count (paranoia) */
376         mds->mds_lov_desc = *ld;
377         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
378                mds->mds_lov_desc.ld_tgt_count);
379
380         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
381                                mds->mds_lov_desc.ld_tgt_count);
382
383         mds->mds_max_mdsize = lov_mds_md_size(stripes);
384         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
385         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
386                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
387                stripes);
388
389         /* If we added a target we have to reconnect the llogs */
390         /* We only _need_ to do this at first add (idx), or the first time
391            after recovery.  However, it should now be safe to call anytime. */
392         rc = llog_cat_initialize(obd, &obd->obd_olg,
393                                  mds->mds_lov_desc.ld_tgt_count, NULL);
394
395         /*XXX this notifies the MDD until lov handling use old mds code */
396         if (obd->obd_upcall.onu_owner) {
397                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
398                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
399                                                  obd->obd_upcall.onu_owner);
400         }
401 out:
402         OBD_FREE(ld, sizeof(*ld));
403         RETURN(rc);
404 }
405
406
407 #define MDSLOV_NO_INDEX -1
408
409 /* Inform MDS about new/updated target */
410 static int mds_lov_update_mds(struct obd_device *obd,
411                               struct obd_device *watched,
412                               __u32 idx)
413 {
414         struct mds_obd *mds = &obd->u.mds;
415         int rc = 0;
416         int page;
417         int off;
418         obd_id *data;
419
420         ENTRY;
421
422         /* Don't let anyone else mess with mds_lov_objids now */
423         mutex_down(&obd->obd_dev_sem);
424
425         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
426         if (rc)
427                 GOTO(out, rc);
428
429         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
430                idx, obd->obd_recovering, obd->obd_async_recov,
431                mds->mds_lov_desc.ld_tgt_count);
432
433         /* idx is set as data from lov_notify. */
434         if (obd->obd_recovering)
435                 GOTO(out, rc);
436
437         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
438                 CERROR("index %d > count %d!\n", idx,
439                        mds->mds_lov_desc.ld_tgt_count);
440                 GOTO(out, rc = -EINVAL);
441         }
442
443         rc = mds_lov_get_objid(obd, idx);
444         if (rc)
445                 GOTO(out, rc);
446
447         page = idx / OBJID_PER_PAGE();
448         off = idx % OBJID_PER_PAGE();
449         data = mds->mds_lov_page_array[page];
450
451         /* We have read this lastid from disk; tell the osc.
452            Don't call this during recovery. */
453         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
454         if (rc) {
455                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
456                 /* Don't abort the rest of the sync */
457                 rc = 0;
458         } else {
459                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
460                         data[off], idx, rc);
461         }
462 out:
463         mutex_up(&obd->obd_dev_sem);
464         RETURN(rc);
465 }
466
467 /* update the LOV-OSC knowledge of the last used object id's */
468 int mds_lov_connect(struct obd_device *obd, char * lov_name)
469 {
470         struct mds_obd *mds = &obd->u.mds;
471         struct lustre_handle conn = {0,};
472         struct obd_connect_data *data;
473         int rc;
474         ENTRY;
475
476         if (IS_ERR(mds->mds_osc_obd))
477                 RETURN(PTR_ERR(mds->mds_osc_obd));
478
479         if (mds->mds_osc_obd)
480                 RETURN(0);
481
482         mds->mds_osc_obd = class_name2obd(lov_name);
483         if (!mds->mds_osc_obd) {
484                 CERROR("MDS cannot locate LOV %s\n", lov_name);
485                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
486                 RETURN(-ENOTCONN);
487         }
488
489         OBD_ALLOC(data, sizeof(*data));
490         if (data == NULL)
491                 RETURN(-ENOMEM);
492         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
493                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
494                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
495                                   OBD_CONNECT_AT;
496 #ifdef HAVE_LRU_RESIZE_SUPPORT
497         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
498 #endif
499         data->ocd_version = LUSTRE_VERSION_CODE;
500         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
501         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
502         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
503         OBD_FREE(data, sizeof(*data));
504         if (rc) {
505                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
506                 mds->mds_osc_obd = ERR_PTR(rc);
507                 RETURN(rc);
508         }
509         mds->mds_osc_exp = class_conn2export(&conn);
510
511         rc = obd_register_observer(mds->mds_osc_obd, obd);
512         if (rc) {
513                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
514                        lov_name, rc);
515                 GOTO(err_discon, rc);
516         }
517
518         /* Deny new client connections until we are sure we have some OSTs */
519         obd->obd_no_conn = 1;
520
521         mutex_down(&obd->obd_dev_sem);
522         rc = mds_lov_read_objids(obd);
523         if (rc) {
524                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
525                 GOTO(err_reg, rc);
526         }
527
528         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
529         if (rc)
530                 GOTO(err_reg, rc);
531
532         /* tgt_count may be 0! */
533         rc = llog_cat_initialize(obd, &obd->obd_olg, 
534                                  mds->mds_lov_desc.ld_tgt_count, NULL);
535         if (rc) {
536                 CERROR("failed to initialize catalog %d\n", rc);
537                 GOTO(err_reg, rc);
538         }
539
540         /* If we're mounting this code for the first time on an existing FS,
541          * we need to populate the objids array from the real OST values */
542         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
543                 __u32 i = mds->mds_lov_objid_count;
544                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
545                         rc = mds_lov_get_objid(obd, i);
546                         if (rc != 0)
547                                 break;
548                 }
549                 if (rc == 0)
550                         rc = mds_lov_write_objids(obd);
551                 if (rc)
552                         CERROR("got last objids from OSTs, but error "
553                                 "in update objids file: %d\n", rc);
554         }
555         mutex_up(&obd->obd_dev_sem);
556
557         /* I want to see a callback happen when the OBD moves to a
558          * "For General Use" state, and that's when we'll call
559          * set_nextid().  The class driver can help us here, because
560          * it can use the obd_recovering flag to determine when the
561          * the OBD is full available. */
562         /* MDD device will care about that
563         if (!obd->obd_recovering)
564                 rc = mds_postrecov(obd);
565          */
566         RETURN(rc);
567
568 err_reg:
569         mutex_up(&obd->obd_dev_sem);
570         obd_register_observer(mds->mds_osc_obd, NULL);
571 err_discon:
572         obd_disconnect(mds->mds_osc_exp);
573         mds->mds_osc_exp = NULL;
574         mds->mds_osc_obd = ERR_PTR(rc);
575         RETURN(rc);
576 }
577
578 int mds_lov_disconnect(struct obd_device *obd)
579 {
580         struct mds_obd *mds = &obd->u.mds;
581         int rc = 0;
582         ENTRY;
583
584         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
585                 obd_register_observer(mds->mds_osc_obd, NULL);
586
587                 /* The actual disconnect of the mds_lov will be called from
588                  * class_disconnect_exports from mds_lov_clean. So we have to
589                  * ensure that class_cleanup doesn't fail due to the extra ref
590                  * we're holding now. The mechanism to do that already exists -
591                  * the obd_force flag. We'll drop the final ref to the
592                  * mds_osc_exp in mds_cleanup. */
593                 mds->mds_osc_obd->obd_force = 1;
594         }
595
596         RETURN(rc);
597 }
598
599 /* Collect the preconditions we need to allow client connects */
600 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
601 {
602         if (flag & CONFIG_LOG)
603                 obd->u.mds.mds_fl_cfglog = 1;
604         if (flag & CONFIG_SYNC)
605                 obd->u.mds.mds_fl_synced = 1;
606         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
607                 /* Open for clients */
608                 obd->obd_no_conn = 0;
609 }
610
611 struct mds_lov_sync_info {
612         struct obd_device *mlsi_obd;     /* the lov device to sync */
613         struct obd_device *mlsi_watched; /* target osc */
614         __u32              mlsi_index;   /* index of target */
615 };
616
617 static int mds_propagate_capa_keys(struct mds_obd *mds)
618 {
619         struct lustre_capa_key *key;
620         int i, rc = 0;
621
622         ENTRY;
623
624         if (!mds->mds_capa_keys)
625                 RETURN(0);
626
627         for (i = 0; i < 2; i++) {
628                 key = &mds->mds_capa_keys[i];
629                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
630
631                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
632                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
633                 if (rc) {
634                         DEBUG_CAPA_KEY(D_ERROR, key,
635                                        "propagate failed (rc = %d) for", rc);
636                         RETURN(rc);
637                 }
638         }
639
640         RETURN(0);
641 }
642
643 /* We only sync one osc at a time, so that we don't have to hold
644    any kind of lock on the whole mds_lov_desc, which may change
645    (grow) as a result of mds_lov_add_ost.  This also avoids any
646    kind of mismatch between the lov_desc and the mds_lov_desc,
647    which are not in lock-step during lov_add_obd */
648 static int __mds_lov_synchronize(void *data)
649 {
650         struct mds_lov_sync_info *mlsi = data;
651         struct obd_device *obd = mlsi->mlsi_obd;
652         struct obd_device *watched = mlsi->mlsi_watched;
653         struct mds_obd *mds = &obd->u.mds;
654         struct obd_uuid *uuid;
655         __u32  idx = mlsi->mlsi_index;
656         struct mds_group_info mgi;
657         struct llog_ctxt *ctxt;
658         int rc = 0;
659         ENTRY;
660
661         OBD_FREE_PTR(mlsi);
662
663         LASSERT(obd);
664         LASSERT(watched);
665         uuid = &watched->u.cli.cl_target_uuid;
666         LASSERT(uuid);
667
668         down_read(&mds->mds_notify_lock);
669         if (obd->obd_stopping || obd->obd_fail)
670                 GOTO(out, rc = -ENODEV);
671
672         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
673         rc = mds_lov_update_mds(obd, watched, idx);
674         if (rc != 0) {
675                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
676                 GOTO(out, rc);
677         }
678         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
679         mgi.uuid = uuid;
680
681         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
682                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
683         if (rc != 0)
684                 GOTO(out, rc);
685         /* propagate capability keys */
686         rc = mds_propagate_capa_keys(mds);
687         if (rc)
688                 GOTO(out, rc);
689
690         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
691         if (!ctxt) 
692                 GOTO(out, rc = -ENODEV);
693
694         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
695         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
696                           NULL, NULL, uuid); 
697         llog_ctxt_put(ctxt);
698         if (rc != 0) {
699                 CERROR("%s failed at llog_origin_connect: %d\n",
700                        obd_uuid2str(uuid), rc);
701                 GOTO(out, rc);
702         }
703
704         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
705               obd->obd_name, obd_uuid2str(uuid));
706         rc = mds_lov_clear_orphans(mds, uuid);
707         if (rc != 0) {
708                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
709                        obd_uuid2str(uuid), rc);
710                 GOTO(out, rc);
711         }
712
713         if (obd->obd_upcall.onu_owner) {
714                 /*
715                  * This is a hack for mds_notify->mdd_notify. When the mds obd
716                  * in mdd is removed, This hack should be removed.
717                  */
718                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
719                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
720                                                  obd->obd_upcall.onu_owner);
721         }
722         EXIT;
723 out:
724         up_read(&mds->mds_notify_lock);
725         if (rc) {
726                 /* Deactivate it for safety */
727                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
728                        rc);
729                 if (!obd->obd_stopping && mds->mds_osc_obd &&
730                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
731                         obd_notify(mds->mds_osc_obd, watched,
732                                    OBD_NOTIFY_INACTIVE, NULL);
733         }
734
735         class_decref(obd);
736         return rc;
737 }
738
739 int mds_lov_synchronize(void *data)
740 {
741         struct mds_lov_sync_info *mlsi = data;
742         char name[20];
743
744         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
745         ptlrpc_daemonize(name);
746
747         RETURN(__mds_lov_synchronize(data));
748 }
749
750 int mds_lov_start_synchronize(struct obd_device *obd,
751                               struct obd_device *watched,
752                               void *data, int nonblock)
753 {
754         struct mds_lov_sync_info *mlsi;
755         int rc;
756         struct mds_obd *mds = &obd->u.mds;
757         struct obd_uuid *uuid;
758         ENTRY;
759
760         LASSERT(watched);
761         uuid = &watched->u.cli.cl_target_uuid;
762
763         OBD_ALLOC(mlsi, sizeof(*mlsi));
764         if (mlsi == NULL)
765                 RETURN(-ENOMEM);
766
767         mlsi->mlsi_obd = obd;
768         mlsi->mlsi_watched = watched;
769         if (data)
770                 mlsi->mlsi_index = *(__u32 *)data;
771         else
772                 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
773
774         /* Although class_export_get(obd->obd_self_export) would lock
775            the MDS in place, since it's only a self-export
776            it doesn't lock the LOV in place.  The LOV can be disconnected
777            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
778            Simply taking an export ref on the LOV doesn't help, because it's
779            still disconnected. Taking an obd reference insures that we don't
780            disconnect the LOV.  This of course means a cleanup won't
781            finish for as long as the sync is blocking. */
782         class_incref(obd);
783
784         if (nonblock) {
785                 /* Synchronize in the background */
786                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
787                                        CLONE_VM | CLONE_FILES);
788                 if (rc < 0) {
789                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
790                                obd->obd_name, rc);
791                         class_decref(obd);
792                 } else {
793                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
794                                "thread=%d\n", obd->obd_name,
795                                mlsi->mlsi_index, rc);
796                         rc = 0;
797                 }
798         } else {
799                 rc = __mds_lov_synchronize((void *)mlsi);
800         }
801
802         RETURN(rc);
803 }
804
805 int mds_notify(struct obd_device *obd, struct obd_device *watched,
806                enum obd_notify_event ev, void *data)
807 {
808         int rc = 0;
809         ENTRY;
810
811         switch (ev) {
812         /* We only handle these: */
813         case OBD_NOTIFY_ACTIVE:
814         case OBD_NOTIFY_SYNC:
815         case OBD_NOTIFY_SYNC_NONBLOCK:
816                 break;
817         case OBD_NOTIFY_CONFIG:
818                 mds_allow_cli(obd, (unsigned long)data);
819         default:
820                 RETURN(0);
821         }
822
823         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
824         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
825                 CERROR("unexpected notification of %s %s!\n",
826                        watched->obd_type->typ_name, watched->obd_name);
827                 RETURN(-EINVAL);
828         }
829
830         if (obd->obd_recovering) {
831                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
832                       obd->obd_name,
833                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
834                 /* We still have to fix the lov descriptor for ost's added
835                    after the mdt in the config log.  They didn't make it into
836                    mds_lov_connect. */
837                 mutex_down(&obd->obd_dev_sem);
838                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
839                 if (rc) {
840                         mutex_up(&obd->obd_dev_sem);
841                         RETURN(rc);
842                 }
843                 /* We should update init llog here too for replay unlink and 
844                  * possiable llog init race when recovery complete */
845                 llog_cat_initialize(obd, &obd->obd_olg, 
846                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
847                                     &watched->u.cli.cl_target_uuid);
848                 mutex_up(&obd->obd_dev_sem);
849                 mds_allow_cli(obd, CONFIG_SYNC);
850                 RETURN(rc);
851         }
852
853         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
854         rc = mds_lov_start_synchronize(obd, watched, data,
855                                        !(ev == OBD_NOTIFY_SYNC));
856
857         lquota_recovery(mds_quota_interface_ref, obd);
858
859         RETURN(rc);
860 }