Whamcloud - gitweb
LU-812 compat: clean up mutex lock to use kernel mutex primitive
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/mds/mds_lov.c
39  *
40  * Lustre Metadata Server (mds) handling of striped file data
41  *
42  * Author: Peter Braam <braam@clusterfs.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MDS
46
47 #include <linux/module.h>
48 #include <lustre_mds.h>
49 #include <lustre/lustre_idl.h>
50 #include <obd_class.h>
51 #include <obd_lov.h>
52 #include <lustre_lib.h>
53 #include <lustre_fsfilt.h>
54 #include <obd_cksum.h>
55
56 #include "mds_internal.h"
57
58 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
59 {
60         struct mds_obd *mds = &obd->u.mds;
61         unsigned int i=0, j;
62
63         if ((libcfs_debug & D_INFO) == 0)
64                 return;
65
66         CDEBUG(D_INFO, "dump from %s\n", label);
67         if (mds->mds_lov_page_dirty == NULL) {
68                 CERROR("NULL bitmap!\n");
69                 GOTO(skip_bitmap, i);
70         }
71
72         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
73                 CDEBUG(D_INFO, "%u - %lx\n", i,
74                        mds->mds_lov_page_dirty->data[i]);
75 skip_bitmap:
76         if (mds->mds_lov_page_array == NULL) {
77                 CERROR("not init page array!\n");
78                 GOTO(skip_array, i);
79
80         }
81         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
82                 obd_id *data = mds->mds_lov_page_array[i];
83
84                 if (data == NULL)
85                         continue;
86
87                 for(j=0; j < OBJID_PER_PAGE(); j++) {
88                         if (data[j] == 0)
89                                 continue;
90                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
91                                i, j, data[j]);
92                 }
93         }
94 skip_array:
95         EXIT;
96 }
97
98 int mds_lov_init_objids(struct obd_device *obd)
99 {
100         struct mds_obd *mds = &obd->u.mds;
101         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
102         struct file *file;
103         int rc;
104         ENTRY;
105
106         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
107
108         mds->mds_lov_page_dirty =
109                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
110         if (mds->mds_lov_page_dirty == NULL)
111                 RETURN(-ENOMEM);
112
113
114         OBD_ALLOC(mds->mds_lov_page_array, size);
115         if (mds->mds_lov_page_array == NULL)
116                 GOTO(err_free_bitmap, rc = -ENOMEM);
117
118         /* open and test the lov objd file */
119         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
120         if (IS_ERR(file)) {
121                 rc = PTR_ERR(file);
122                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
123                 GOTO(err_free, rc = PTR_ERR(file));
124         }
125         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
126                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
127                        file->f_dentry->d_inode->i_mode);
128                 GOTO(err_open, rc = -ENOENT);
129         }
130         mds->mds_lov_objid_filp = file;
131
132         RETURN (0);
133 err_open:
134         if (filp_close((struct file *)file, 0))
135                 CERROR("can't close %s after error\n", LOV_OBJID);
136 err_free:
137         OBD_FREE(mds->mds_lov_page_array, size);
138 err_free_bitmap:
139         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
140
141         RETURN(rc);
142 }
143
144 void mds_lov_destroy_objids(struct obd_device *obd)
145 {
146         struct mds_obd *mds = &obd->u.mds;
147         int i, rc;
148         ENTRY;
149
150         if (mds->mds_lov_page_array != NULL) {
151                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
152                         obd_id *data = mds->mds_lov_page_array[i];
153                         if (data != NULL)
154                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
155                 }
156                 OBD_FREE(mds->mds_lov_page_array,
157                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
158         }
159
160         if (mds->mds_lov_objid_filp) {
161                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
162                 mds->mds_lov_objid_filp = NULL;
163                 if (rc)
164                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165         }
166
167         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
168         EXIT;
169 }
170
171 /**
172  * currently exist two ways for know about ost count and max ost index.
173  * first - after ost is connected to mds and sync process finished
174  * second - get from lmm in recovery process, in case when mds not have configs,
175  * and ost isn't registered in mgs.
176  *
177  * \param mds pointer to mds structure
178  * \param index maxium ost index
179  *
180  * \retval -ENOMEM is not hame memory for new page
181  * \retval 0 is update passed
182  */
183 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
184 {
185         __u32 page = index / OBJID_PER_PAGE();
186         __u32 off = index % OBJID_PER_PAGE();
187         obd_id *data =  mds->mds_lov_page_array[page];
188
189         if (data == NULL) {
190                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
191                 if (data == NULL)
192                         RETURN(-ENOMEM);
193
194                 mds->mds_lov_page_array[page] = data;
195         }
196
197         if (index > mds->mds_lov_objid_max_index) {
198                 mds->mds_lov_objid_lastpage = page;
199                 mds->mds_lov_objid_lastidx = off;
200                 mds->mds_lov_objid_max_index = index;
201         }
202
203         /* workaround - New target not in objids file; increase mdsize */
204         /* ld_tgt_count is used as the max index everywhere, despite its name. */
205         if (data[off] == 0) {
206                 __u32 max_easize;
207                 __u32 stripes;
208
209                 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
210                 data[off] = 1;
211                 mds->mds_lov_objid_count++;
212                 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
213                               mds->mds_lov_objid_count);
214
215                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
216                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
217
218                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
219                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
220                        mds->mds_max_cookiesize);
221         }
222
223         EXIT;
224         return 0;
225 }
226
227 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
228 {
229         __u32 page = index / OBJID_PER_PAGE();
230         __u32 off = index % OBJID_PER_PAGE();
231         obd_id *data =  mds->mds_lov_page_array[page];
232
233         return (data[off] > 0);
234 }
235
236 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
237 {
238         struct lov_ost_data_v1 *data;
239         __u16 count;
240         int rc = 0;
241         __u32 j;
242
243         /* if we create file without objects - lmm is NULL */
244         if (lmm == NULL)
245                 return 0;
246
247         switch (le32_to_cpu(lmm->lmm_magic)) {
248                 case LOV_MAGIC_V1:
249                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
250                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
251                         break;
252                 case LOV_MAGIC_V3:
253                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
254                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
255                         break;
256                 default:
257                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
258                         RETURN(-EINVAL);
259         }
260
261
262         cfs_mutex_lock(&obd->obd_dev_mutex);
263         for (j = 0; j < count; j++) {
264                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
265                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
266                         rc = -ENOMEM;
267                         break;
268                 }
269         }
270         cfs_mutex_unlock(&obd->obd_dev_mutex);
271
272         RETURN(rc);
273 }
274 EXPORT_SYMBOL(mds_lov_prepare_objids);
275
276 /*
277  * write llog orphan record about lost ost object,
278  * Special lsm is allocated with single stripe, caller should deallocated it
279  * after use
280  */
281 static int mds_log_lost_precreated(struct obd_device *obd,
282                                    struct lov_stripe_md **lsmp, __u16 *stripes,
283                                    obd_id id, obd_count count, int idx)
284 {
285         struct lov_stripe_md *lsm = *lsmp;
286         int rc;
287         ENTRY;
288
289         if (*lsmp == NULL) {
290                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
291                 if (rc < 0)
292                         RETURN(rc);
293                 /* need only one stripe, save old value */
294                 *stripes = lsm->lsm_stripe_count;
295                 lsm->lsm_stripe_count = 1;
296                 *lsmp = lsm;
297         }
298
299         lsm->lsm_oinfo[0]->loi_id = id;
300         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
301         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
302
303         rc = mds_log_op_orphan(obd, lsm, count);
304         RETURN(rc);
305 }
306
307 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
308 {
309         struct mds_obd *mds = &obd->u.mds;
310         int j;
311         struct lov_ost_data_v1 *obj;
312         struct lov_stripe_md *lsm = NULL;
313         __u16 stripes = 0;
314         int count;
315         ENTRY;
316
317         /* if we create file without objects - lmm is NULL */
318         if (lmm == NULL)
319                 return;
320
321         switch (le32_to_cpu(lmm->lmm_magic)) {
322                 case LOV_MAGIC_V1:
323                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
324                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
325                         break;
326                 case LOV_MAGIC_V3:
327                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
328                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
329                         break;
330                 default:
331                         CERROR("Unknow lmm type %X !\n",
332                                le32_to_cpu(lmm->lmm_magic));
333                         return;
334         }
335
336         for (j = 0; j < count; j++) {
337                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
338                 obd_id id = le64_to_cpu(obj[j].l_object_id);
339                 __u32 page = i / OBJID_PER_PAGE();
340                 __u32 idx = i % OBJID_PER_PAGE();
341                 obd_id *data;
342
343                 data = mds->mds_lov_page_array[page];
344
345                 CDEBUG(D_INODE,"update last object for ost %u"
346                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
347                 if (id > data[idx]) {
348                         int lost = id - data[idx] - 1;
349                         /* we might have lost precreated objects due to VBR */
350                         if (lost > 0 && obd->obd_recovering) {
351                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
352                                 if (!obd->obd_version_recov)
353                                         CERROR("Unexpected gap in objids\n");
354                                 /* lsm is allocated if NULL */
355                                 mds_log_lost_precreated(obd, &lsm, &stripes,
356                                                         data[idx]+1, lost, i);
357                         }
358                         data[idx] = id;
359                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
360                 }
361         }
362         if (lsm) {
363                 /* restore stripes number */
364                 lsm->lsm_stripe_count = stripes;
365                 obd_free_memmd(mds->mds_lov_exp, &lsm);
366         }
367         EXIT;
368         return;
369 }
370 EXPORT_SYMBOL(mds_lov_update_objids);
371
372 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
373                                     __u32 count)
374 {
375         __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
376         __u32 i, stripes;
377
378         for (i = 0; i < count; i++) {
379                 if (data[i] == 0)
380                         continue;
381
382                 mds->mds_lov_objid_count++;
383         }
384
385         stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
386                          mds->mds_lov_objid_count);
387
388         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
389         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
390
391         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
392                "%d/%d\n", stripes, mds->mds_max_mdsize,
393                mds->mds_max_cookiesize);
394
395         EXIT;
396         return 0;
397 }
398
399 static int mds_lov_read_objids(struct obd_device *obd)
400 {
401         struct mds_obd *mds = &obd->u.mds;
402         loff_t off = 0;
403         int i, rc = 0, count = 0, page = 0;
404         unsigned long size;
405         ENTRY;
406
407         /* Read everything in the file, even if our current lov desc
408            has fewer targets. Old targets not in the lov descriptor
409            during mds setup may still have valid objids. */
410         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
411         if (size == 0)
412                 RETURN(0);
413
414         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
415         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
416         for (i = 0; i < page; i++) {
417                 obd_id *data;
418                 loff_t off_old = off;
419
420                 LASSERT(mds->mds_lov_page_array[i] == NULL);
421                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
422                 if (mds->mds_lov_page_array[i] == NULL)
423                         GOTO(out, rc = -ENOMEM);
424
425                 data = mds->mds_lov_page_array[i];
426
427                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
428                                         MDS_LOV_ALLOC_SIZE, &off);
429                 if (rc < 0) {
430                         CERROR("Error reading objids %d\n", rc);
431                         GOTO(out, rc);
432                 }
433                 if (off == off_old) /* hole is read */
434                         off += MDS_LOV_ALLOC_SIZE;
435
436                 count = (off - off_old) / sizeof(obd_id);
437                 if (mds_lov_update_from_read(mds, data, count)) {
438                         CERROR("Can't update mds data\n");
439                         GOTO(out, rc = -EIO);
440                 }
441         }
442         mds->mds_lov_objid_lastpage = page - 1;
443         mds->mds_lov_objid_lastidx = count - 1;
444
445         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
446                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
447 out:
448         mds_lov_dump_objids("read",obd);
449
450         RETURN(rc);
451 }
452
453 int mds_lov_write_objids(struct obd_device *obd)
454 {
455         struct mds_obd *mds = &obd->u.mds;
456         int i = 0, rc = 0;
457         ENTRY;
458
459         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
460                 RETURN(0);
461
462         mds_lov_dump_objids("write", obd);
463
464         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
465                 obd_id *data =  mds->mds_lov_page_array[i];
466                 unsigned int size = MDS_LOV_ALLOC_SIZE;
467                 loff_t off = i * size;
468
469                 LASSERT(data != NULL);
470
471                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
472                         continue;
473
474                 /* check for particaly filled last page */
475                 if (i == mds->mds_lov_objid_lastpage)
476                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
477
478                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
479                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
480                                          size, &off, 0);
481                 if (rc < 0) {
482                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
483                         break;
484                 }
485         }
486         if (rc >= 0)
487                 rc = 0;
488
489         RETURN(rc);
490 }
491 EXPORT_SYMBOL(mds_lov_write_objids);
492
493 static int mds_lov_get_objid(struct obd_device * obd,
494                              obd_id idx)
495 {
496         struct mds_obd *mds = &obd->u.mds;
497         struct obd_export *lov_exp = mds->mds_lov_exp;
498         unsigned int page;
499         unsigned int off;
500         obd_id *data;
501         __u32 size;
502         int rc = 0;
503         ENTRY;
504
505         page = idx / OBJID_PER_PAGE();
506         off = idx % OBJID_PER_PAGE();
507         data = mds->mds_lov_page_array[page];
508
509         if (data[off] < 2) {
510                 /* We never read this lastid; ask the osc */
511                 struct obd_id_info lastid;
512
513                 size = sizeof(lastid);
514                 lastid.idx = idx;
515                 lastid.data = &data[off];
516                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
517                                   &size, &lastid, NULL);
518                 if (rc)
519                         GOTO(out, rc);
520
521                 /* workaround for clean filter */
522                 if (data[off] == 0)
523                         data[off] = 1;
524
525                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
526         }
527         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
528                idx, data, page, off, data[off]);
529 out:
530         RETURN(rc);
531 }
532
533 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
534 {
535         int rc;
536         struct obdo oa = { 0 };
537         struct obd_trans_info oti = {0};
538         struct lov_stripe_md  *empty_ea = NULL;
539         ENTRY;
540
541         LASSERT(mds->mds_lov_page_array != NULL);
542
543         /* This create will in fact either create or destroy:  If the OST is
544          * missing objects below this ID, they will be created.  If it finds
545          * objects above this ID, they will be removed. */
546         memset(&oa, 0, sizeof(oa));
547         oa.o_flags = OBD_FL_DELORPHAN;
548         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
549         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
550         if (ost_uuid != NULL)
551                 oti.oti_ost_uuid = ost_uuid;
552
553         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
554
555         RETURN(rc);
556 }
557
558 /* for one target */
559 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
560 {
561         struct mds_obd *mds = &obd->u.mds;
562         int rc;
563         struct obd_id_info info;
564         ENTRY;
565
566         LASSERT(!obd->obd_recovering);
567
568         info.idx = idx;
569         info.data = id;
570         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
571                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
572         if (rc)
573                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
574                         obd->obd_name, rc);
575
576         RETURN(rc);
577 }
578
579 /* Update the lov desc for a new size lov. */
580 static int mds_lov_update_desc(struct obd_device *obd, int idx,
581                                struct obd_uuid *uuid)
582 {
583         struct mds_obd *mds = &obd->u.mds;
584         struct lov_desc *ld;
585         __u32 valsize = sizeof(mds->mds_lov_desc);
586         int rc = 0;
587         ENTRY;
588
589         OBD_ALLOC(ld, sizeof(*ld));
590         if (!ld)
591                 RETURN(-ENOMEM);
592
593         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
594                           &valsize, ld, NULL);
595         if (rc)
596                 GOTO(out, rc);
597
598         /* Don't change the mds_lov_desc until the objids size matches the
599            count (paranoia) */
600         mds->mds_lov_desc = *ld;
601         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
602                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
603
604         cfs_mutex_lock(&obd->obd_dev_mutex);
605         rc = mds_lov_update_max_ost(mds, idx);
606         cfs_mutex_unlock(&obd->obd_dev_mutex);
607         if (rc != 0)
608                 GOTO(out, rc );
609
610         /* If we added a target we have to reconnect the llogs */
611         /* We only _need_ to do this at first add (idx), or the first time
612            after recovery.  However, it should now be safe to call anytime. */
613         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
614         if (rc)
615                 GOTO(out, rc);
616
617 out:
618         OBD_FREE(ld, sizeof(*ld));
619         RETURN(rc);
620 }
621
622 /* Inform MDS about new/updated target */
623 static int mds_lov_update_mds(struct obd_device *obd,
624                               struct obd_device *watched,
625                               __u32 idx)
626 {
627         struct mds_obd *mds = &obd->u.mds;
628         int rc = 0;
629         int page;
630         int off;
631         obd_id *data;
632         ENTRY;
633
634         LASSERT(mds_lov_objinit(mds, idx));
635
636         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
637                idx, obd->obd_recovering, obd->obd_async_recov,
638                mds->mds_lov_desc.ld_tgt_count);
639
640         /* idx is set as data from lov_notify. */
641         if (obd->obd_recovering)
642                 GOTO(out, rc);
643
644         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
645                 CERROR("index %d > count %d!\n", idx,
646                        mds->mds_lov_desc.ld_tgt_count);
647                 GOTO(out, rc = -EINVAL);
648         }
649
650         rc = mds_lov_get_objid(obd, idx);
651         if (rc)
652                 GOTO(out, rc);
653
654         page = idx / OBJID_PER_PAGE();
655         off = idx % OBJID_PER_PAGE();
656         data = mds->mds_lov_page_array[page];
657
658         /* We have read this lastid from disk; tell the osc.
659            Don't call this during recovery. */
660         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
661         if (rc) {
662                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
663                 /* Don't abort the rest of the sync */
664                 rc = 0;
665         } else {
666                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
667                         data[off], idx, rc);
668         }
669 out:
670         RETURN(rc);
671 }
672
673 /* update the LOV-OSC knowledge of the last used object id's */
674 int mds_lov_connect(struct obd_device *obd, char * lov_name)
675 {
676         struct mds_obd *mds = &obd->u.mds;
677         struct obd_connect_data *data;
678         int rc;
679         ENTRY;
680
681         if (IS_ERR(mds->mds_lov_obd))
682                 RETURN(PTR_ERR(mds->mds_lov_obd));
683
684         if (mds->mds_lov_obd)
685                 RETURN(0);
686
687         mds->mds_lov_obd = class_name2obd(lov_name);
688         if (!mds->mds_lov_obd) {
689                 CERROR("MDS cannot locate LOV %s\n", lov_name);
690                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
691                 RETURN(-ENOTCONN);
692         }
693
694         cfs_mutex_lock(&obd->obd_dev_mutex);
695         rc = mds_lov_read_objids(obd);
696         cfs_mutex_unlock(&obd->obd_dev_mutex);
697         if (rc) {
698                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
699                 GOTO(err_exit, rc);
700         }
701
702         rc = obd_register_observer(mds->mds_lov_obd, obd);
703         if (rc) {
704                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
705                        lov_name, rc);
706                 GOTO(err_exit, rc);
707         }
708
709         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
710          * targets */
711         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
712
713         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
714
715         OBD_ALLOC(data, sizeof(*data));
716         if (data == NULL)
717                 GOTO(err_exit, rc = -ENOMEM);
718
719         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
720                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
721                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FULL20  |
722                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
723                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
724                                   OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
725 #ifdef HAVE_LRU_RESIZE_SUPPORT
726         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
727 #endif
728         data->ocd_version = LUSTRE_VERSION_CODE;
729         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
730         data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
731
732         /* send max bytes per rpc */
733         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
734         /* send the list of supported checksum types */
735         data->ocd_cksum_types = cksum_types_supported();
736         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
737         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
738         OBD_FREE(data, sizeof(*data));
739         if (rc) {
740                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
741                 mds->mds_lov_obd = ERR_PTR(rc);
742                 RETURN(rc);
743         }
744
745         /* I want to see a callback happen when the OBD moves to a
746          * "For General Use" state, and that's when we'll call
747          * set_nextid().  The class driver can help us here, because
748          * it can use the obd_recovering flag to determine when the
749          * the OBD is full available. */
750         /* MDD device will care about that
751         if (!obd->obd_recovering)
752                 rc = mds_postrecov(obd);
753          */
754         RETURN(rc);
755
756 err_exit:
757         mds->mds_lov_exp = NULL;
758         mds->mds_lov_obd = ERR_PTR(rc);
759         RETURN(rc);
760 }
761
762 int mds_lov_disconnect(struct obd_device *obd)
763 {
764         struct mds_obd *mds = &obd->u.mds;
765         int rc = 0;
766         ENTRY;
767
768         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
769                 obd_register_observer(mds->mds_lov_obd, NULL);
770
771                 /* The actual disconnect of the mds_lov will be called from
772                  * class_disconnect_exports from mds_lov_clean. So we have to
773                  * ensure that class_cleanup doesn't fail due to the extra ref
774                  * we're holding now. The mechanism to do that already exists -
775                  * the obd_force flag. We'll drop the final ref to the
776                  * mds_lov_exp in mds_cleanup. */
777                 mds->mds_lov_obd->obd_force = 1;
778         }
779
780         RETURN(rc);
781 }
782
783 struct mds_lov_sync_info {
784         struct obd_device    *mlsi_obd;     /* the lov device to sync */
785         struct obd_device    *mlsi_watched; /* target osc */
786         __u32                 mlsi_index;   /* index of target */
787 };
788
789 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
790 {
791         struct mds_capa_info    info = { .uuid = uuid };
792         struct lustre_capa_key *key;
793         int i, rc = 0;
794
795         ENTRY;
796
797         if (!mds->mds_capa_keys)
798                 RETURN(0);
799
800         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
801         for (i = 0; i < 2; i++) {
802                 key = &mds->mds_capa_keys[i];
803                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
804
805                 info.capa = key;
806                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
807                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
808                 if (rc) {
809                         DEBUG_CAPA_KEY(D_ERROR, key,
810                                        "propagate failed (rc = %d) for", rc);
811                         RETURN(rc);
812                 }
813         }
814
815         RETURN(0);
816 }
817
818 /* We only sync one osc at a time, so that we don't have to hold
819    any kind of lock on the whole mds_lov_desc, which may change
820    (grow) as a result of mds_lov_add_ost.  This also avoids any
821    kind of mismatch between the lov_desc and the mds_lov_desc,
822    which are not in lock-step during lov_add_obd */
823 static int __mds_lov_synchronize(void *data)
824 {
825         struct mds_lov_sync_info *mlsi = data;
826         struct obd_device *obd = mlsi->mlsi_obd;
827         struct obd_device *watched = mlsi->mlsi_watched;
828         struct mds_obd *mds = &obd->u.mds;
829         struct obd_uuid *uuid;
830         __u32  idx = mlsi->mlsi_index;
831         struct mds_group_info mgi;
832         struct llog_ctxt *ctxt;
833         int rc = 0;
834         ENTRY;
835
836         OBD_FREE_PTR(mlsi);
837
838         LASSERT(obd);
839         LASSERT(watched);
840         uuid = &watched->u.cli.cl_target_uuid;
841         LASSERT(uuid);
842
843         cfs_down_read(&mds->mds_notify_lock);
844         if (obd->obd_stopping || obd->obd_fail)
845                 GOTO(out, rc = -ENODEV);
846
847         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
848         rc = mds_lov_update_mds(obd, watched, idx);
849         if (rc != 0) {
850                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
851                 GOTO(out, rc);
852         }
853         mgi.group = mdt_to_obd_objseq(mds->mds_id);
854         mgi.uuid = uuid;
855
856         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
857                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
858         if (rc != 0)
859                 GOTO(out, rc);
860         /* propagate capability keys */
861         rc = mds_propagate_capa_keys(mds, uuid);
862         if (rc)
863                 GOTO(out, rc);
864
865         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
866         if (!ctxt)
867                 GOTO(out, rc = -ENODEV);
868
869         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
870         rc = llog_connect(ctxt, NULL, NULL, uuid);
871         llog_ctxt_put(ctxt);
872         if (rc != 0) {
873                 CERROR("%s failed at llog_origin_connect: %d\n",
874                        obd_uuid2str(uuid), rc);
875                 GOTO(out, rc);
876         }
877
878         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
879               obd->obd_name, obd_uuid2str(uuid));
880
881         rc = mds_lov_clear_orphans(mds, uuid);
882         if (rc != 0) {
883                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
884                        obd_uuid2str(uuid), rc);
885                 GOTO(out, rc);
886         }
887
888 #ifdef HAVE_QUOTA_SUPPORT
889         if (obd->obd_upcall.onu_owner) {
890                 /*
891                  * This is a hack for mds_notify->mdd_notify. When the mds obd
892                  * in mdd is removed, This hack should be removed.
893                  */
894                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
895                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
896                                                 obd->obd_upcall.onu_owner,NULL);
897         }
898 #endif
899         EXIT;
900 out:
901         cfs_up_read(&mds->mds_notify_lock);
902         if (rc) {
903                 /* Deactivate it for safety */
904                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
905                        rc);
906                 if (!obd->obd_stopping && mds->mds_lov_obd &&
907                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
908                         obd_notify(mds->mds_lov_obd, watched,
909                                    OBD_NOTIFY_INACTIVE, NULL);
910         }
911
912         class_decref(obd, "mds_lov_synchronize", obd);
913         return rc;
914 }
915
916 int mds_lov_synchronize(void *data)
917 {
918         struct mds_lov_sync_info *mlsi = data;
919         char name[20];
920
921         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
922         cfs_daemonize_ctxt(name);
923
924         RETURN(__mds_lov_synchronize(data));
925 }
926
927 int mds_lov_start_synchronize(struct obd_device *obd,
928                               struct obd_device *watched,
929                               void *data, enum obd_notify_event ev)
930 {
931         struct mds_lov_sync_info *mlsi;
932         int rc;
933         struct obd_uuid *uuid;
934         ENTRY;
935
936         LASSERT(watched);
937         uuid = &watched->u.cli.cl_target_uuid;
938
939         OBD_ALLOC(mlsi, sizeof(*mlsi));
940         if (mlsi == NULL)
941                 RETURN(-ENOMEM);
942
943         LASSERT(data);
944         mlsi->mlsi_obd = obd;
945         mlsi->mlsi_watched = watched;
946         mlsi->mlsi_index = *(__u32 *)data;
947
948         /* Although class_export_get(obd->obd_self_export) would lock
949            the MDS in place, since it's only a self-export
950            it doesn't lock the LOV in place.  The LOV can be disconnected
951            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
952            Simply taking an export ref on the LOV doesn't help, because it's
953            still disconnected. Taking an obd reference insures that we don't
954            disconnect the LOV.  This of course means a cleanup won't
955            finish for as long as the sync is blocking. */
956         class_incref(obd, "mds_lov_synchronize", obd);
957
958         if (ev != OBD_NOTIFY_SYNC) {
959                 /* Synchronize in the background */
960                 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
961                                        CFS_DAEMON_FLAGS);
962                 if (rc < 0) {
963                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
964                                obd->obd_name, rc);
965                         class_decref(obd, "mds_lov_synchronize", obd);
966                 } else {
967                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
968                                "thread=%d\n", obd->obd_name,
969                                mlsi->mlsi_index, rc);
970                         rc = 0;
971                 }
972         } else {
973                 rc = __mds_lov_synchronize((void *)mlsi);
974         }
975
976         RETURN(rc);
977 }
978
979 int mds_notify(struct obd_device *obd, struct obd_device *watched,
980                enum obd_notify_event ev, void *data)
981 {
982         struct mds_obd *mds = &obd->u.mds;
983         int rc = 0;
984         ENTRY;
985
986         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
987
988         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
989                 CERROR("unexpected notification of %s %s!\n",
990                        watched->obd_type->typ_name, watched->obd_name);
991                 RETURN(-EINVAL);
992         }
993
994         /*XXX this notifies the MDD until lov handling use old mds code
995          * must non block!
996          */
997         if (obd->obd_upcall.onu_owner) {
998                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
999                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
1000                                                  obd->obd_upcall.onu_owner,
1001                                                  &mds->mds_obt.obt_mount_count);
1002         }
1003
1004         switch (ev) {
1005         /* We only handle these: */
1006         case OBD_NOTIFY_CREATE:
1007                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1008                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1009                 /* We still have to fix the lov descriptor for ost's */
1010                 LASSERT(data);
1011                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1012                                           &watched->u.cli.cl_target_uuid);
1013                 RETURN(rc);
1014         case OBD_NOTIFY_ACTIVE:
1015                 /* lov want one or more _active_ targets for work */
1016                 /* activate event should be pass lov idx as argument */
1017         case OBD_NOTIFY_SYNC:
1018         case OBD_NOTIFY_SYNC_NONBLOCK:
1019                 /* sync event should be pass lov idx as argument */
1020                 break;
1021         default:
1022                 RETURN(0);
1023         }
1024
1025         if (obd->obd_recovering) {
1026                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1027                       obd->obd_name,
1028                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1029                 /* We still have to fix the lov descriptor for ost's added
1030                    after the mdt in the config log.  They didn't make it into
1031                    mds_lov_connect. */
1032                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1033                                          &watched->u.cli.cl_target_uuid);
1034         } else {
1035                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1036         }
1037         RETURN(rc);
1038 }