Whamcloud - gitweb
Land b_head_interop_disk on HEAD (20081119_1314)
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 static int mds_lov_read_objids(struct obd_device *obd)
163 {
164         struct mds_obd *mds = &obd->u.mds;
165         loff_t off = 0;
166         int i, rc, count = 0, page = 0;
167         unsigned long size;
168         ENTRY;
169
170         /* Read everything in the file, even if our current lov desc
171            has fewer targets. Old targets not in the lov descriptor
172            during mds setup may still have valid objids. */
173         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
174         if (size == 0)
175                 RETURN(0);
176
177         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
178         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
179
180         for (i = 0; i < page; i++) {
181                 obd_id *data =  mds->mds_lov_page_array[i];
182                 loff_t off_old = off;
183
184                 LASSERT(data == NULL);
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         GOTO(out, rc = -ENOMEM);
188
189                 mds->mds_lov_page_array[i] = data;
190
191                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
192                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
193                 if (rc < 0) {
194                         CERROR("Error reading objids %d\n", rc);
195                         GOTO(out, rc);
196                 }
197                 if (off == off_old)
198                         break; // eof
199
200                 count += (off - off_old)/sizeof(obd_id);
201         }
202         mds->mds_lov_objid_count = count;
203         if (count) {
204                 count --;
205                 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
206                 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
207         }
208         CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
209                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
210 out:
211         mds_lov_dump_objids("read",obd);
212
213         RETURN(0);
214 }
215
216 int mds_lov_write_objids(struct obd_device *obd)
217 {
218         struct mds_obd *mds = &obd->u.mds;
219         int i, rc = 0;
220         ENTRY;
221
222         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
223                 RETURN(0);
224
225         mds_lov_dump_objids("write", obd);
226
227         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
228                 obd_id *data =  mds->mds_lov_page_array[i];
229                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
230                 loff_t off = i * size;
231
232                 LASSERT(data != NULL);
233
234                 /* check for particaly filled last page */
235                 if (i == mds->mds_lov_objid_lastpage)
236                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
237
238                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
239                                          size, &off, 0);
240                 if (rc < 0)
241                         break;
242                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
243         }
244         if (rc >= 0)
245                 rc = 0;
246
247         RETURN(rc);
248 }
249 EXPORT_SYMBOL(mds_lov_write_objids);
250
251 static int mds_lov_get_objid(struct obd_device * obd,
252                              __u32 idx)
253 {
254         struct mds_obd *mds = &obd->u.mds;
255         unsigned int page;
256         unsigned int off;
257         obd_id *data;
258         int rc = 0;
259         ENTRY;
260
261         page = idx / OBJID_PER_PAGE();
262         off = idx % OBJID_PER_PAGE();
263         data = mds->mds_lov_page_array[page];
264         if (data == NULL) {
265                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
266                 if (data == NULL)
267                         GOTO(out, rc = -ENOMEM);
268
269                 mds->mds_lov_page_array[page] = data;
270         }
271
272         if (data[off] == 0) {
273                 /* We never read this lastid; ask the osc */
274                 struct obd_id_info lastid;
275                 __u32 size = sizeof(lastid);
276
277                 lastid.idx = idx;
278                 lastid.data = &data[off];
279                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
280                                   KEY_LAST_ID, &size, &lastid, NULL);
281                 if (rc)
282                         GOTO(out, rc);
283
284                 if (idx > mds->mds_lov_objid_count) {
285                         mds->mds_lov_objid_count = idx;
286                         mds->mds_lov_objid_lastpage = page;
287                         mds->mds_lov_objid_lastidx = off;
288                 }
289                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
290         }
291 out:
292         RETURN(rc);
293 }
294
295 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
296 {
297         int rc;
298         struct obdo oa;
299         struct obd_trans_info oti = {0};
300         struct lov_stripe_md  *empty_ea = NULL;
301         ENTRY;
302
303         LASSERT(mds->mds_lov_page_array != NULL);
304
305         /* This create will in fact either create or destroy:  If the OST is
306          * missing objects below this ID, they will be created.  If it finds
307          * objects above this ID, they will be removed. */
308         memset(&oa, 0, sizeof(oa));
309         oa.o_flags = OBD_FL_DELORPHAN;
310         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
311         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
312         if (ost_uuid != NULL)
313                 oti.oti_ost_uuid = ost_uuid;
314         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
315
316         RETURN(rc);
317 }
318
319 /* for one target */
320 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
321 {
322         struct mds_obd *mds = &obd->u.mds;
323         int rc;
324         struct obd_id_info info;
325         ENTRY;
326
327         LASSERT(!obd->obd_recovering);
328
329         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
330         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
331
332         info.idx = idx;
333         info.data = id;
334         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
335                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
336         if (rc)
337                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
338                         obd->obd_name, rc);
339
340         RETURN(rc);
341 }
342
343 /* Update the lov desc for a new size lov. */
344 static int mds_lov_update_desc(struct obd_device *obd, int idx,
345                                struct obd_uuid *uuid)
346 {
347         struct mds_obd *mds = &obd->u.mds;
348         struct lov_desc *ld;
349         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
350         int rc = 0;
351         ENTRY;
352
353         OBD_ALLOC(ld, sizeof(*ld));
354         if (!ld)
355                 RETURN(-ENOMEM);
356
357         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
358                           &valsize, ld, NULL);
359         if (rc)
360                 GOTO(out, rc);
361
362         /* Don't change the mds_lov_desc until the objids size matches the
363            count (paranoia) */
364         mds->mds_lov_desc = *ld;
365         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
366                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
367
368         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369                                mds->mds_lov_desc.ld_tgt_count);
370
371         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
372         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
373         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
374                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
375                stripes);
376
377         /* If we added a target we have to reconnect the llogs */
378         /* We only _need_ to do this at first add (idx), or the first time
379            after recovery.  However, it should now be safe to call anytime. */
380         rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
381         if (rc)
382                 GOTO(out, rc);
383
384         /*XXX this notifies the MDD until lov handling use old mds code */
385         if (obd->obd_upcall.onu_owner) {
386                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
387                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
388                                                  obd->obd_upcall.onu_owner);
389         }
390 out:
391         OBD_FREE(ld, sizeof(*ld));
392         RETURN(rc);
393 }
394
395 /* Inform MDS about new/updated target */
396 static int mds_lov_update_mds(struct obd_device *obd,
397                               struct obd_device *watched,
398                               __u32 idx)
399 {
400         struct mds_obd *mds = &obd->u.mds;
401         int rc = 0;
402         int page;
403         int off;
404         obd_id *data;
405
406         ENTRY;
407
408         /* Don't let anyone else mess with mds_lov_objids now */
409         mutex_down(&obd->obd_dev_sem);
410
411         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
412         if (rc)
413                 GOTO(out, rc);
414
415         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
416                idx, obd->obd_recovering, obd->obd_async_recov,
417                mds->mds_lov_desc.ld_tgt_count);
418
419         /* idx is set as data from lov_notify. */
420         if (obd->obd_recovering)
421                 GOTO(out, rc);
422
423         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
424                 CERROR("index %d > count %d!\n", idx,
425                        mds->mds_lov_desc.ld_tgt_count);
426                 GOTO(out, rc = -EINVAL);
427         }
428
429         rc = mds_lov_get_objid(obd, idx);
430         if (rc)
431                 GOTO(out, rc);
432
433         page = idx / OBJID_PER_PAGE();
434         off = idx % OBJID_PER_PAGE();
435         data = mds->mds_lov_page_array[page];
436
437         /* We have read this lastid from disk; tell the osc.
438            Don't call this during recovery. */
439         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
440         if (rc) {
441                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
442                 /* Don't abort the rest of the sync */
443                 rc = 0;
444         } else {
445                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
446                         data[off], idx, rc);
447         }
448 out:
449         mutex_up(&obd->obd_dev_sem);
450         RETURN(rc);
451 }
452
453 /* update the LOV-OSC knowledge of the last used object id's */
454 int mds_lov_connect(struct obd_device *obd, char * lov_name)
455 {
456         struct mds_obd *mds = &obd->u.mds;
457         struct lustre_handle conn = {0,};
458         struct obd_connect_data *data;
459         int rc;
460         ENTRY;
461
462         if (IS_ERR(mds->mds_osc_obd))
463                 RETURN(PTR_ERR(mds->mds_osc_obd));
464
465         if (mds->mds_osc_obd)
466                 RETURN(0);
467
468         mds->mds_osc_obd = class_name2obd(lov_name);
469         if (!mds->mds_osc_obd) {
470                 CERROR("MDS cannot locate LOV %s\n", lov_name);
471                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
472                 RETURN(-ENOTCONN);
473         }
474
475         OBD_ALLOC(data, sizeof(*data));
476         if (data == NULL)
477                 RETURN(-ENOMEM);
478         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
479                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
480                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
481                                   OBD_CONNECT_AT | OBD_CONNECT_CHANGE_QS;
482 #ifdef HAVE_LRU_RESIZE_SUPPORT
483         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
484 #endif
485         data->ocd_version = LUSTRE_VERSION_CODE;
486         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
487         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
488         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
489         OBD_FREE(data, sizeof(*data));
490         if (rc) {
491                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
492                 mds->mds_osc_obd = ERR_PTR(rc);
493                 RETURN(rc);
494         }
495         mds->mds_osc_exp = class_conn2export(&conn);
496
497         rc = obd_register_observer(mds->mds_osc_obd, obd);
498         if (rc) {
499                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
500                        lov_name, rc);
501                 GOTO(err_discon, rc);
502         }
503
504         /* Deny new client connections until we are sure we have some OSTs */
505         obd->obd_no_conn = 1;
506
507         mutex_down(&obd->obd_dev_sem);
508         rc = mds_lov_read_objids(obd);
509         if (rc) {
510                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
511                 GOTO(err_reg, rc);
512         }
513         mutex_up(&obd->obd_dev_sem);
514
515         /* I want to see a callback happen when the OBD moves to a
516          * "For General Use" state, and that's when we'll call
517          * set_nextid().  The class driver can help us here, because
518          * it can use the obd_recovering flag to determine when the
519          * the OBD is full available. */
520         /* MDD device will care about that
521         if (!obd->obd_recovering)
522                 rc = mds_postrecov(obd);
523          */
524         RETURN(rc);
525
526 err_reg:
527         mutex_up(&obd->obd_dev_sem);
528         obd_register_observer(mds->mds_osc_obd, NULL);
529 err_discon:
530         obd_disconnect(mds->mds_osc_exp);
531         mds->mds_osc_exp = NULL;
532         mds->mds_osc_obd = ERR_PTR(rc);
533         RETURN(rc);
534 }
535
536 int mds_lov_disconnect(struct obd_device *obd)
537 {
538         struct mds_obd *mds = &obd->u.mds;
539         int rc = 0;
540         ENTRY;
541
542         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
543                 obd_register_observer(mds->mds_osc_obd, NULL);
544
545                 /* The actual disconnect of the mds_lov will be called from
546                  * class_disconnect_exports from mds_lov_clean. So we have to
547                  * ensure that class_cleanup doesn't fail due to the extra ref
548                  * we're holding now. The mechanism to do that already exists -
549                  * the obd_force flag. We'll drop the final ref to the
550                  * mds_osc_exp in mds_cleanup. */
551                 mds->mds_osc_obd->obd_force = 1;
552         }
553
554         RETURN(rc);
555 }
556
557 /* Collect the preconditions we need to allow client connects */
558 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
559 {
560         if (flag & CONFIG_LOG)
561                 obd->u.mds.mds_fl_cfglog = 1;
562         if (flag & CONFIG_SYNC)
563                 obd->u.mds.mds_fl_synced = 1;
564         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
565                 /* Open for clients */
566                 obd->obd_no_conn = 0;
567 }
568
569 struct mds_lov_sync_info {
570         struct obd_device *mlsi_obd;     /* the lov device to sync */
571         struct obd_device *mlsi_watched; /* target osc */
572         __u32              mlsi_index;   /* index of target */
573 };
574
575 static int mds_propagate_capa_keys(struct mds_obd *mds)
576 {
577         struct lustre_capa_key *key;
578         int i, rc = 0;
579
580         ENTRY;
581
582         if (!mds->mds_capa_keys)
583                 RETURN(0);
584
585         for (i = 0; i < 2; i++) {
586                 key = &mds->mds_capa_keys[i];
587                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
588
589                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
590                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
591                 if (rc) {
592                         DEBUG_CAPA_KEY(D_ERROR, key,
593                                        "propagate failed (rc = %d) for", rc);
594                         RETURN(rc);
595                 }
596         }
597
598         RETURN(0);
599 }
600
601 /* We only sync one osc at a time, so that we don't have to hold
602    any kind of lock on the whole mds_lov_desc, which may change
603    (grow) as a result of mds_lov_add_ost.  This also avoids any
604    kind of mismatch between the lov_desc and the mds_lov_desc,
605    which are not in lock-step during lov_add_obd */
606 static int __mds_lov_synchronize(void *data)
607 {
608         struct mds_lov_sync_info *mlsi = data;
609         struct obd_device *obd = mlsi->mlsi_obd;
610         struct obd_device *watched = mlsi->mlsi_watched;
611         struct mds_obd *mds = &obd->u.mds;
612         struct obd_uuid *uuid;
613         __u32  idx = mlsi->mlsi_index;
614         struct mds_group_info mgi;
615         struct llog_ctxt *ctxt;
616         int rc = 0;
617         ENTRY;
618
619         OBD_FREE_PTR(mlsi);
620
621         LASSERT(obd);
622         LASSERT(watched);
623         uuid = &watched->u.cli.cl_target_uuid;
624         LASSERT(uuid);
625
626         down_read(&mds->mds_notify_lock);
627         if (obd->obd_stopping || obd->obd_fail)
628                 GOTO(out, rc = -ENODEV);
629
630         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
631         rc = mds_lov_update_mds(obd, watched, idx);
632         if (rc != 0) {
633                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
634                 GOTO(out, rc);
635         }
636         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
637         mgi.uuid = uuid;
638
639         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
640                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
641         if (rc != 0)
642                 GOTO(out, rc);
643         /* propagate capability keys */
644         rc = mds_propagate_capa_keys(mds);
645         if (rc)
646                 GOTO(out, rc);
647
648         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
649         if (!ctxt)
650                 GOTO(out, rc = -ENODEV);
651
652         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
653         rc = llog_connect(ctxt, NULL, NULL, uuid);
654         llog_ctxt_put(ctxt);
655         if (rc != 0) {
656                 CERROR("%s failed at llog_origin_connect: %d\n",
657                        obd_uuid2str(uuid), rc);
658                 GOTO(out, rc);
659         }
660
661         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
662               obd->obd_name, obd_uuid2str(uuid));
663         rc = mds_lov_clear_orphans(mds, uuid);
664         if (rc != 0) {
665                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
666                        obd_uuid2str(uuid), rc);
667                 GOTO(out, rc);
668         }
669
670         if (obd->obd_upcall.onu_owner) {
671                 /*
672                  * This is a hack for mds_notify->mdd_notify. When the mds obd
673                  * in mdd is removed, This hack should be removed.
674                  */
675                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
676                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
677                                                  obd->obd_upcall.onu_owner);
678         }
679         EXIT;
680 out:
681         up_read(&mds->mds_notify_lock);
682         if (rc) {
683                 /* Deactivate it for safety */
684                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
685                        rc);
686                 if (!obd->obd_stopping && mds->mds_osc_obd &&
687                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
688                         obd_notify(mds->mds_osc_obd, watched,
689                                    OBD_NOTIFY_INACTIVE, NULL);
690         }
691
692         class_decref(obd, "mds_lov_synchronize", obd);
693         return rc;
694 }
695
696 int mds_lov_synchronize(void *data)
697 {
698         struct mds_lov_sync_info *mlsi = data;
699         char name[20];
700
701         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
702         ptlrpc_daemonize(name);
703
704         RETURN(__mds_lov_synchronize(data));
705 }
706
707 int mds_lov_start_synchronize(struct obd_device *obd,
708                               struct obd_device *watched,
709                               void *data, int nonblock)
710 {
711         struct mds_lov_sync_info *mlsi;
712         int rc;
713         struct obd_uuid *uuid;
714         ENTRY;
715
716         LASSERT(watched);
717         uuid = &watched->u.cli.cl_target_uuid;
718
719         OBD_ALLOC(mlsi, sizeof(*mlsi));
720         if (mlsi == NULL)
721                 RETURN(-ENOMEM);
722
723         mlsi->mlsi_obd = obd;
724         mlsi->mlsi_watched = watched;
725         mlsi->mlsi_index = *(__u32 *)data;
726
727         /* Although class_export_get(obd->obd_self_export) would lock
728            the MDS in place, since it's only a self-export
729            it doesn't lock the LOV in place.  The LOV can be disconnected
730            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
731            Simply taking an export ref on the LOV doesn't help, because it's
732            still disconnected. Taking an obd reference insures that we don't
733            disconnect the LOV.  This of course means a cleanup won't
734            finish for as long as the sync is blocking. */
735         class_incref(obd, "mds_lov_synchronize", obd);
736
737         if (nonblock) {
738                 /* Synchronize in the background */
739                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
740                                        CLONE_VM | CLONE_FILES);
741                 if (rc < 0) {
742                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
743                                obd->obd_name, rc);
744                         class_decref(obd, "mds_lov_synchronize", obd);
745                 } else {
746                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
747                                "thread=%d\n", obd->obd_name,
748                                mlsi->mlsi_index, rc);
749                         rc = 0;
750                 }
751         } else {
752                 rc = __mds_lov_synchronize((void *)mlsi);
753         }
754
755         RETURN(rc);
756 }
757
758 int mds_notify(struct obd_device *obd, struct obd_device *watched,
759                enum obd_notify_event ev, void *data)
760 {
761         int rc = 0;
762         ENTRY;
763
764         switch (ev) {
765         /* We only handle these: */
766         case OBD_NOTIFY_ACTIVE:
767         case OBD_NOTIFY_SYNC:
768         case OBD_NOTIFY_SYNC_NONBLOCK:
769                 break;
770         case OBD_NOTIFY_CONFIG:
771                 mds_allow_cli(obd, (unsigned long)data);
772         default:
773                 RETURN(0);
774         }
775
776         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
777         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
778                 CERROR("unexpected notification of %s %s!\n",
779                        watched->obd_type->typ_name, watched->obd_name);
780                 RETURN(-EINVAL);
781         }
782
783         if (obd->obd_recovering) {
784                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
785                       obd->obd_name,
786                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
787                 /* We still have to fix the lov descriptor for ost's added
788                    after the mdt in the config log.  They didn't make it into
789                    mds_lov_connect. */
790                 mutex_down(&obd->obd_dev_sem);
791                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
792                                         &watched->u.cli.cl_target_uuid);
793                 mutex_up(&obd->obd_dev_sem);
794                 if (rc == 0)
795                         mds_allow_cli(obd, CONFIG_SYNC);
796                 RETURN(rc);
797         }
798
799         rc = mds_lov_start_synchronize(obd, watched, data,
800                                        !(ev == OBD_NOTIFY_SYNC));
801
802         RETURN(rc);
803 }